diff --git a/ops/deploy.sh b/ops/deploy.sh
index 31a2f6d1d..c571e9fca 100755
--- a/ops/deploy.sh
+++ b/ops/deploy.sh
@@ -93,7 +93,115 @@ echo "Deploy complete."
 
 if $RESTART; then
     echo ""
-    echo "=== Restarting services ==="
-    ssh "$VPS_HOST" "sudo systemctl restart teleo-pipeline teleo-diagnostics"
-    echo "Services restarted."
+    echo "=== Detecting services to restart ==="
+
+    # Determine which services need restart based on what was deployed.
+    # rsync touched these paths → these services:
+    #   pipeline-v2/lib/, pipeline-v2/*.py → teleo-pipeline
+    #   diagnostics/ → teleo-diagnostics
+    #   agent-state/, research-session.sh → no restart (not daemons)
+    RESTART_SVCS=""
+
+    # Check VPS for recent file changes from this deploy
+    # Compare local files against VPS to see what actually changed
+    PIPELINE_CHANGED=false
+    DIAG_CHANGED=false
+
+    # Pipeline: lib/ or top-level scripts
+    # (dry-run rsync; any listed .py file means the service code changed)
+    if rsync -avzn --exclude='__pycache__' --exclude='*.pyc' --exclude='*.bak*' \
+        "$REPO_ROOT/ops/pipeline-v2/lib/" "$VPS_HOST:$VPS_PIPELINE/lib/" 2>/dev/null | grep -q '\.py$'; then
+        PIPELINE_CHANGED=true
+    fi
+    for f in teleo-pipeline.py reweave.py; do
+        if [ -f "$REPO_ROOT/ops/pipeline-v2/$f" ]; then
+            # -F: match the filename literally (dots in "$f" are not regex wildcards)
+            if rsync -avzn "$REPO_ROOT/ops/pipeline-v2/$f" "$VPS_HOST:$VPS_PIPELINE/$f" 2>/dev/null | grep -qF "$f"; then
+                PIPELINE_CHANGED=true
+            fi
+        fi
+    done
+
+    # Diagnostics
+    if rsync -avzn --exclude='__pycache__' --exclude='*.pyc' --exclude='*.bak*' \
+        "$REPO_ROOT/ops/diagnostics/" "$VPS_HOST:$VPS_DIAGNOSTICS/" 2>/dev/null | grep -q '\.py$'; then
+        DIAG_CHANGED=true
+    fi
+
+    if $PIPELINE_CHANGED; then
+        RESTART_SVCS="$RESTART_SVCS teleo-pipeline"
+        echo "  teleo-pipeline: files changed, will restart"
+    else
+        echo "  teleo-pipeline: no changes, skipping"
+    fi
+
+    if $DIAG_CHANGED; then
+        RESTART_SVCS="$RESTART_SVCS teleo-diagnostics"
+        echo "  teleo-diagnostics: files changed, will restart"
+    else
+        echo "  teleo-diagnostics: no changes, skipping"
+    fi
+
+    if [ -z "$RESTART_SVCS" ]; then
+        echo ""
+        echo "No service files changed. Skipping restart."
+    else
+        echo ""
+        echo "=== Restarting:$RESTART_SVCS ==="
+        ssh "$VPS_HOST" "sudo systemctl restart $RESTART_SVCS"
+        echo "Services restarted. Waiting 5s for startup..."
+        sleep 5
+
+        echo ""
+        echo "=== Smoke test ==="
+        SMOKE_FAIL=0
+
+        # Check systemd unit status for restarted services
+        for svc in $RESTART_SVCS; do
+            if ssh "$VPS_HOST" "systemctl is-active --quiet $svc"; then
+                echo "  $svc: active"
+            else
+                echo "  $svc: FAILED"
+                ssh "$VPS_HOST" "journalctl -u $svc -n 10 --no-pager" || true
+                SMOKE_FAIL=1
+            fi
+        done
+
+        # Hit health endpoints for restarted services
+        if echo "$RESTART_SVCS" | grep -q "teleo-pipeline"; then
+            if ssh "$VPS_HOST" "curl -sf --connect-timeout 3 http://localhost:8080/health > /dev/null"; then
+                echo "  pipeline health (8080): OK"
+            else
+                echo "  pipeline health (8080): FAILED"
+                SMOKE_FAIL=1
+            fi
+        fi
+
+        if echo "$RESTART_SVCS" | grep -q "teleo-diagnostics"; then
+            if ssh "$VPS_HOST" "curl -sf --connect-timeout 3 http://localhost:8081/ops > /dev/null"; then
+                echo "  diagnostics (8081): OK"
+            else
+                echo "  diagnostics (8081): FAILED"
+                SMOKE_FAIL=1
+            fi
+        fi
+
+        # Tail logs for quick visual check
+        echo ""
+        echo "=== Recent logs (10s) ==="
+        JOURNAL_UNITS=""
+        for svc in $RESTART_SVCS; do
+            JOURNAL_UNITS="$JOURNAL_UNITS -u $svc"
+        done
+        ssh "$VPS_HOST" "journalctl $JOURNAL_UNITS --since '-10s' --no-pager -n 20" || true
+
+        if [ "$SMOKE_FAIL" -gt 0 ]; then
+            echo ""
+            echo "WARNING: Smoke test detected failures. Check logs above."
+            exit 1
+        fi
+
+        echo ""
+        echo "Smoke test passed."
+    fi
 fi
diff --git a/ops/diagnostics/review_queue.py b/ops/diagnostics/review_queue.py
index c15a4beba..241171d5c 100644
--- a/ops/diagnostics/review_queue.py
+++ b/ops/diagnostics/review_queue.py
@@ -140,7 +140,7 @@ async def fetch_review_queue(
     if forgejo_token:
         headers["Authorization"] = f"token {forgejo_token}"
 
-    connector = aiohttp.TCPConnector(ssl=False)
+    connector = aiohttp.TCPConnector()  # Default SSL verification — Forgejo token must not be exposed to MITM
     async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
         # Fetch open PRs
         url = f"{FORGEJO_BASE}/repos/{REPO}/pulls?state=open&limit=50&sort=oldest"
diff --git a/ops/pipeline-v2/backfill-descriptions.py b/ops/pipeline-v2/backfill-descriptions.py
new file mode 100644
index 000000000..0e7c32a8a
--- /dev/null
+++ b/ops/pipeline-v2/backfill-descriptions.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+"""One-time backfill: populate prs.description with claim titles from merged files.
+
+For PRs that have description=NULL or empty, reads the claim files on main
+(for merged PRs) or on the branch (for open PRs) and extracts H1 titles.
+
+Usage: python3 backfill-descriptions.py [--dry-run]
+
+Requires: run from the teleo-codex git worktree (main branch).
+"""
+
+import sqlite3
+import subprocess
+import sys
+from pathlib import Path
+
+DB_PATH = Path("/opt/teleo-eval/pipeline/pipeline.db")
+MAIN_WORKTREE = Path("/opt/teleo-eval/teleo-codex")
+CLAIM_DIRS = ("domains/", "core/", "foundations/")
+
+dry_run = "--dry-run" in sys.argv
+
+
+def get_pr_claim_titles(pr_number: int, branch: str, status: str) -> list[str]:
+    """Extract H1 claim titles from a PR's changed files."""
+    titles = []
+
+    # Merged and open PRs use the same three-dot diff: origin/main...origin/<branch>
+    # lists the branch's changes since its merge base. For merged PRs the branch
+    # ref may already be deleted — the diff then fails and we return no titles
+    # (the changed-file list cannot be reconstructed here).
+    try:
+        result = subprocess.run(
+            ["git", "diff", "--name-only", f"origin/main...origin/{branch}"],
+            capture_output=True, text=True, timeout=10,
+            cwd=str(MAIN_WORKTREE),
+        )
+        if result.returncode != 0:
+            return titles
+
+        changed_files = [
+            f.strip() for f in result.stdout.strip().split("\n")
+            if f.strip() and any(f.strip().startswith(d) for d in CLAIM_DIRS) and f.strip().endswith(".md")
+        ]
+
+        for fpath in changed_files:
+            # Read from main for merged, from branch for open
+            ref = "origin/main" if status == "merged" else f"origin/{branch}"
+            show = subprocess.run(
+                ["git", "show", f"{ref}:{fpath}"],
+                capture_output=True, text=True, timeout=5,
+                cwd=str(MAIN_WORKTREE),
+            )
+            if show.returncode == 0:
+                for line in show.stdout.split("\n"):
+                    if line.startswith("# ") and len(line) > 3:
+                        titles.append(line[2:].strip())
+                        break
+
+    except Exception as e:
+        # subprocess.TimeoutExpired is an Exception subclass — one handler covers both
+        print(f"  PR #{pr_number}: error — {e}")
+
+    return titles
+
+
+def main():
+    conn = sqlite3.connect(str(DB_PATH))
+    conn.row_factory = sqlite3.Row
+
+    # Find PRs with empty description
+    rows = conn.execute(
+        "SELECT number, branch, status FROM prs WHERE description IS NULL OR description = '' ORDER BY number DESC"
+    ).fetchall()
+
+    print(f"Found {len(rows)} PRs with empty description")
+
+    updated = 0
+    skipped = 0
+
+    for row in rows:
+        pr_num = row["number"]
+        branch = row["branch"]
+        status = row["status"]
+
+        if not branch:
+            skipped += 1
+            continue
+
+        titles = get_pr_claim_titles(pr_num, branch, status)
+
+        if titles:
+            desc = " | ".join(titles)
+            if dry_run:
+                print(f"  PR #{pr_num} ({status}): would set → {desc[:100]}...")
+            else:
+                conn.execute(
+                    "UPDATE prs SET description = ? WHERE number = ?",
+                    (desc, pr_num),
+                )
+                updated += 1
+                if updated % 50 == 0:
+                    conn.commit()
+                    print(f"  ...{updated} updated so far")
+        else:
+            skipped += 1
+
+    if not dry_run:
+        conn.commit()
+
+    conn.close()
+    print(f"\nDone. Updated: {updated}, Skipped: {skipped}, Total: {len(rows)}")
+    if dry_run:
+        print("(dry run — no changes written)")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/ops/pipeline-v2/lib/cascade.py b/ops/pipeline-v2/lib/cascade.py
index 1f8241f3f..350d9c89e 100644
--- a/ops/pipeline-v2/lib/cascade.py
+++ b/ops/pipeline-v2/lib/cascade.py
@@ -9,7 +9,7 @@ the same atomic-write pattern as lib-state.sh.
""" import asyncio -import hashlib +import secrets import json import logging import os @@ -116,8 +116,8 @@ def _write_inbox_message(agent: str, subject: str, body: str) -> bool: return False ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") - file_hash = hashlib.md5(f"{agent}-{subject}-{body[:200]}".encode()).hexdigest()[:8] - filename = f"cascade-{ts}-{subject[:60]}-{file_hash}.md" + nonce = secrets.token_hex(3) + filename = f"cascade-{ts}-{nonce}-{subject[:60]}.md" final_path = inbox_dir / filename try: diff --git a/ops/pipeline-v2/lib/db.py b/ops/pipeline-v2/lib/db.py index 653b80376..06833f176 100644 --- a/ops/pipeline-v2/lib/db.py +++ b/ops/pipeline-v2/lib/db.py @@ -479,6 +479,9 @@ def migrate(conn: sqlite3.Connection): logger.info("Migration v11: added auto_merge column to prs table") + # v12-v16 ran manually on VPS before code was version-controlled. + # Their changes are consolidated into v17+ migrations below. + if current < 17: # Add prompt/pipeline version tracking per PR for col, default in [ diff --git a/ops/pipeline-v2/lib/extract.py b/ops/pipeline-v2/lib/extract.py index d71fbb076..ab663c2d2 100644 --- a/ops/pipeline-v2/lib/extract.py +++ b/ops/pipeline-v2/lib/extract.py @@ -376,6 +376,7 @@ async def _extract_one_source( filename = c.get("filename", "") if not filename: continue + filename = Path(filename).name # Strip directory components — LLM output may contain path traversal if not filename.endswith(".md"): filename += ".md" content = _build_claim_content(c, agent_lower) @@ -387,6 +388,7 @@ async def _extract_one_source( filename = e.get("filename", "") if not filename: continue + filename = Path(filename).name # Strip directory components — LLM output may contain path traversal if not filename.endswith(".md"): filename += ".md" action = e.get("action", "create") diff --git a/ops/pipeline-v2/lib/stale_pr.py b/ops/pipeline-v2/lib/stale_pr.py new file mode 100644 index 000000000..c2873d070 --- /dev/null +++ 
b/ops/pipeline-v2/lib/stale_pr.py @@ -0,0 +1,94 @@ +"""Stale extraction PR cleanup — closes extraction PRs that produce no claims. + +When an extraction PR sits open >30 min with claims_count=0, it indicates: +- Extraction failed (model couldn't extract anything useful) +- Batch job stalled (no claims written) +- Source material is empty/junk + +Auto-closing prevents zombie PRs from blocking the pipeline. +Logs each close for root cause analysis (model failures, bad sources, etc.). + +Epimetheus owns this module. +""" + +import json +import logging +from datetime import datetime, timezone + +from . import config, db +from .forgejo import api, repo_path + +logger = logging.getLogger("pipeline.stale_pr") + +STALE_THRESHOLD_MINUTES = 30 + + +async def check_stale_prs(conn) -> tuple[int, int]: + """Auto-close extraction PRs open >30 min with zero claims. + + Returns (stale_closed, stale_errors) — count of closed PRs and close failures. + """ + stale_closed = 0 + stale_errors = 0 + + # Find extraction PRs: open >30 min, source has 0 claims + stale_prs = conn.execute( + """SELECT p.number, p.branch, p.source_path, p.created_at + FROM prs p + LEFT JOIN sources s ON p.source_path = s.path + WHERE p.status = 'open' + AND p.commit_type = 'extract' + AND datetime(p.created_at) < datetime('now', '-' || ? 
|| ' minutes') + AND COALESCE(s.claims_count, 0) = 0""", + (STALE_THRESHOLD_MINUTES,), + ).fetchall() + + for pr in stale_prs: + pr_num = pr["number"] + source_path = pr["source_path"] or "unknown" + + try: + # Close the PR via Forgejo + result = await api( + "PATCH", + repo_path(f"pulls/{pr_num}"), + body={"state": "closed"}, + ) + if result is None: + stale_errors += 1 + logger.warning( + "Failed to close stale extraction PR #%d (%s, %s)", + pr_num, source_path, pr["branch"], + ) + continue + + # Update local DB status + conn.execute( + "UPDATE prs SET status = 'closed' WHERE number = ?", + (pr_num,), + ) + db.audit( + conn, + "watchdog", + "stale_pr_closed", + json.dumps({ + "pr": pr_num, + "branch": pr["branch"], + "source": source_path, + "open_minutes": STALE_THRESHOLD_MINUTES, + }), + ) + stale_closed += 1 + logger.info( + "WATCHDOG: closed stale extraction PR #%d (no claims after %d min): %s", + pr_num, STALE_THRESHOLD_MINUTES, source_path, + ) + + except Exception as e: + stale_errors += 1 + logger.warning( + "Stale PR close exception for #%d: %s", + pr_num, e, + ) + + return stale_closed, stale_errors diff --git a/ops/pipeline-v2/lib/validate.py b/ops/pipeline-v2/lib/validate.py index d32ee9e60..f064fb44a 100644 --- a/ops/pipeline-v2/lib/validate.py +++ b/ops/pipeline-v2/lib/validate.py @@ -620,6 +620,27 @@ async def validate_pr(conn, pr_number: int) -> dict: # Extract claim files (domains/, core/, foundations/) claim_files = extract_claim_files_from_diff(diff) + # ── Backfill description (claim titles) if missing ── + # discover_external_prs creates rows without description. Extract H1 titles + # from the diff so the dashboard shows what the PR actually contains. 
+    existing_desc = conn.execute(
+        "SELECT description FROM prs WHERE number = ?", (pr_number,)
+    ).fetchone()
+    if existing_desc and not (existing_desc["description"] or "").strip() and claim_files:
+        titles = []
+        for _fp, content in claim_files.items():
+            for line in content.split("\n"):
+                if line.startswith("# ") and len(line) > 3:
+                    titles.append(line[2:].strip())
+                    break
+        if titles:
+            desc = " | ".join(titles)
+            conn.execute(
+                "UPDATE prs SET description = ? WHERE number = ? AND (description IS NULL OR description = '')",
+                (desc, pr_number),
+            )
+            logger.info("PR #%d: backfilled description with %d claim titles", pr_number, len(titles))
+
 
     # ── Tier 0: per-claim validation ──
     # Only validates NEW files (not modified). Modified files have partial content
     # from diffs (only + lines) — frontmatter parsing fails on partial content,
diff --git a/ops/pipeline-v2/lib/watchdog.py b/ops/pipeline-v2/lib/watchdog.py
index e6b2ebdec..40c8f37e8 100644
--- a/ops/pipeline-v2/lib/watchdog.py
+++ b/ops/pipeline-v2/lib/watchdog.py
@@ -19,6 +19,8 @@
+import json  # required by the json.dumps(...) audit payloads added below
 import logging
 from datetime import datetime, timezone
 
 from . import config, db
+from .stale_pr import check_stale_prs
 
 logger = logging.getLogger("pipeline.watchdog")
@@ -103,17 +105,94 @@ async def watchdog_check(conn) -> dict:
             "action": "GC should auto-close these — check fixer.py GC logic",
         })
 
-    # 5. Tier0 blockage: many PRs with tier0_pass=0 (potential validation bug)
+    # 5. Tier0 blockage: auto-reset stuck PRs with retry cap
+    MAX_TIER0_RESETS = 3
+    TIER0_RESET_COOLDOWN_S = 3600
     tier0_blocked = conn.execute(
-        "SELECT COUNT(*) as n FROM prs WHERE status = 'open' AND tier0_pass = 0"
-    ).fetchone()["n"]
-    if tier0_blocked >= 5:
-        issues.append({
-            "type": "tier0_blockage",
-            "severity": "warning",
-            "detail": f"{tier0_blocked} PRs blocked at tier0_pass=0",
-            "action": "Check validate.py — may be the modified-file or wiki-link bug recurring",
-        })
+        "SELECT number, branch FROM prs WHERE status = 'open' AND tier0_pass = 0"
+    ).fetchall()
+
+    if tier0_blocked:
+        reset_count = 0
+        permanent_count = 0
+
+        for pr in tier0_blocked:
+            row = conn.execute(
+                """SELECT COUNT(*) as n, MAX(timestamp) as last_ts FROM audit_log
+                   WHERE stage = 'watchdog' AND event = 'tier0_reset'
+                   AND json_extract(detail, '$.pr') = ?""",
+                (pr["number"],),
+            ).fetchone()
+            prior_resets = row["n"]
+
+            if prior_resets >= MAX_TIER0_RESETS:
+                permanent_count += 1
+                continue
+
+            last_reset = row["last_ts"]
+
+            if last_reset:
+                try:
+                    last_ts = datetime.fromisoformat(last_reset).replace(tzinfo=timezone.utc)
+                    age = (datetime.now(timezone.utc) - last_ts).total_seconds()
+                    if age < TIER0_RESET_COOLDOWN_S:
+                        continue
+                except (ValueError, TypeError):
+                    pass
+
+            conn.execute(
+                "UPDATE prs SET tier0_pass = NULL WHERE number = ?",
+                (pr["number"],),
+            )
+            db.audit(
+                conn, "watchdog", "tier0_reset",
+                json.dumps({
+                    "pr": pr["number"],
+                    "branch": pr["branch"],
+                    "attempt": prior_resets + 1,
+                    "max": MAX_TIER0_RESETS,
+                }),
+            )
+            reset_count += 1
+            logger.info(
+                "WATCHDOG: auto-reset tier0 for PR #%d (attempt %d/%d)",
+                pr["number"], prior_resets + 1, MAX_TIER0_RESETS,
+            )
+
+        if reset_count:
+            issues.append({
+                "type": "tier0_reset",
+                "severity": "info",
+                "detail": f"Auto-reset {reset_count} PRs stuck at tier0_pass=0 for re-validation",
+                "action": "Monitor — if same PRs fail again, check validate.py",
+            })
+        if permanent_count:
+            issues.append({
+                "type": "tier0_permanent_failure",
+                "severity": "warning",
+                "detail": f"{permanent_count} PRs exhausted {MAX_TIER0_RESETS} tier0 retries — manual intervention needed",
+                "action": "Inspect PR content or close stale PRs",
+            })
+
+    # 6. Stale extraction PRs: open >30 min with no claim files
+    try:
+        stale_closed, stale_errors = await check_stale_prs(conn)
+        if stale_closed > 0:
+            issues.append({
+                "type": "stale_prs_closed",
+                "severity": "info",
+                "detail": f"Auto-closed {stale_closed} stale extraction PRs (no claims after 30 min)",
+                "action": "Check batch-extract logs for extraction failures",
+            })
+        if stale_errors > 0:
+            issues.append({
+                "type": "stale_pr_close_failed",
+                "severity": "warning",
+                "detail": f"Failed to close {stale_errors} stale PRs",
+                "action": "Check Forgejo API connectivity",
+            })
+    except Exception as e:
+        logger.warning("Stale PR check failed: %s", e)
 
     # Log issues
     healthy = len(issues) == 0
@@ -124,7 +203,7 @@ async def watchdog_check(conn) -> dict:
         else:
             logger.info("WATCHDOG: %s — %s", issue["type"], issue["detail"])
 
-    return {"healthy": healthy, "issues": issues, "checks_run": 5}
+    return {"healthy": healthy, "issues": issues, "checks_run": 6}
 
 
 async def watchdog_cycle(conn, max_workers=None) -> tuple[int, int]: