Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors
Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
277 lines
11 KiB
Python
"""Auto-fixer stage — mechanical fixes for known issue types.
|
|
|
|
Currently fixes:
|
|
- broken_wiki_links: strips [[ ]] brackets from links that don't resolve
|
|
|
|
Runs as a pipeline stage on FIX_INTERVAL. Only fixes mechanical issues
|
|
that don't require content understanding. Does NOT fix frontmatter_schema,
|
|
near_duplicate, or any substantive issues.
|
|
|
|
Key design decisions (Ganymede):
|
|
- Only fix files in the PR diff (not the whole worktree/repo)
|
|
- Add intra-PR file stems to valid set (avoids stripping cross-references
|
|
between new claims in the same PR)
|
|
- Atomic claim via status='fixing' (same pattern as eval's 'reviewing')
|
|
- fix_attempts cap prevents infinite fix loops
|
|
- Reset eval_attempts + tier0_pass on successful fix for re-evaluation
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from . import config, db
|
|
from .validate import WIKI_LINK_RE, load_existing_claims
|
|
|
|
logger = logging.getLogger("pipeline.fixer")
|
|
|
|
|
|
# ─── Git helper (async subprocess, same pattern as merge.py) ─────────────
|
|
|
|
|
|
async def _git(*args, cwd: str | None = None, timeout: int = 60) -> tuple[int, str]:
    """Run a git command asynchronously.

    Args:
        *args: arguments passed to ``git`` (e.g. ``"fetch", "origin", branch``).
        cwd: working directory for the command; defaults to ``config.REPO_DIR``.
        timeout: seconds to wait for completion before killing the process.

    Returns:
        ``(returncode, combined_output)`` where the output is stdout and
        stderr joined with a newline. On timeout the process is killed and
        ``(-1, "<timeout message>")`` is returned.
    """
    proc = await asyncio.create_subprocess_exec(
        "git",
        *args,
        cwd=cwd or str(config.REPO_DIR),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        # Reap the killed process so it doesn't linger as a zombie.
        await proc.wait()
        return -1, f"git {args[0]} timed out after {timeout}s"
    # Join only the non-empty streams — avoids a spurious leading newline
    # when the command wrote solely to stderr (common for git progress).
    parts = [s.decode().strip() for s in (stdout, stderr) if s]
    return proc.returncode, "\n".join(p for p in parts if p)
|
|
|
|
|
|
# ─── Wiki link fixer ─────────────────────────────────────────────────────
|
|
|
|
|
|
async def _fix_wiki_links_in_pr(conn, pr_number: int) -> dict:
    """Fix broken wiki links in a single PR by stripping brackets.

    Only processes files in the PR diff (not the whole repo).
    Adds intra-PR file stems to the valid set so cross-references
    between new claims in the same PR are preserved.

    Args:
        conn: database connection — uses DB-API ``execute``/``rowcount`` and
            ``row["col"]`` access (presumably sqlite3 with a Row factory;
            confirm against the db module).
        pr_number: PR number in the ``prs`` table / Forgejo.

    Returns:
        ``{"pr", "fixed": True, "links_fixed": n}`` on success, or
        ``{"pr", "skipped": True, "reason": <str>}`` when nothing was done.
        Every skip path restores ``status = 'open'`` first.
    """
    # Atomic claim — prevent concurrent fixers and evaluators.
    # The status='open' guard makes the UPDATE a compare-and-swap:
    # rowcount == 0 means someone else holds the PR.
    cursor = conn.execute(
        "UPDATE prs SET status = 'fixing', last_attempt = datetime('now') WHERE number = ? AND status = 'open'",
        (pr_number,),
    )
    if cursor.rowcount == 0:
        return {"pr": pr_number, "skipped": True, "reason": "not_open"}

    # Increment fix_attempts up-front so even a failed/skipped run counts
    # against the cap (prevents infinite retry loops).
    conn.execute(
        "UPDATE prs SET fix_attempts = COALESCE(fix_attempts, 0) + 1 WHERE number = ?",
        (pr_number,),
    )

    # Get PR branch from DB first, fall back to Forgejo API
    row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone()
    branch = row["branch"] if row and row["branch"] else None

    if not branch:
        # Imported lazily to keep the API client out of the module's
        # import-time dependencies.
        from .forgejo import api as forgejo_api
        from .forgejo import repo_path

        pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
        if pr_info:
            branch = pr_info.get("head", {}).get("ref")

    if not branch:
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "no_branch"}

    # Fetch latest refs (best-effort: return code intentionally ignored;
    # a stale ref will surface at worktree creation below).
    await _git("fetch", "origin", branch, timeout=30)

    # Create a throwaway worktree so fixes don't touch the main checkout.
    worktree_path = str(config.BASE_DIR / "workspaces" / f"fix-{pr_number}")

    rc, out = await _git("worktree", "add", "--detach", worktree_path, f"origin/{branch}")
    if rc != 0:
        logger.error("PR #%d: worktree creation failed: %s", pr_number, out)
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "worktree_failed"}

    try:
        # Checkout the actual branch (so we can push)
        rc, out = await _git("checkout", "-B", branch, f"origin/{branch}", cwd=worktree_path)
        if rc != 0:
            logger.error("PR #%d: checkout failed: %s", pr_number, out)
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "checkout_failed"}

        # Get files changed in PR (only fix these, not the whole repo).
        # Three-dot diff: changes on the branch since it diverged from main.
        rc, out = await _git("diff", "--name-only", "origin/main...HEAD", cwd=worktree_path)
        if rc != 0:
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "diff_failed"}

        pr_files = [f for f in out.split("\n") if f.strip() and f.endswith(".md")]

        if not pr_files:
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "no_md_files"}

        # Load existing claims from main + add intra-PR stems
        # (avoids stripping cross-references between new claims in same PR)
        existing_claims = load_existing_claims()
        for f in pr_files:
            existing_claims.add(Path(f).stem)

        # Fix broken links in each PR file
        total_fixed = 0

        for filepath in pr_files:
            full_path = Path(worktree_path) / filepath
            if not full_path.is_file():
                # Deleted/renamed in the PR — nothing to fix here.
                continue

            content = full_path.read_text(encoding="utf-8")
            file_fixes = 0

            def replace_broken_link(match):
                # re.sub callback: group(1) is the link target text
                # (per WIKI_LINK_RE — presumably captures inside [[ ]];
                # confirm against validate module).
                nonlocal file_fixes
                link_text = match.group(1)
                if link_text.strip() not in existing_claims:
                    file_fixes += 1
                    return link_text  # Strip brackets, keep text
                return match.group(0)  # Keep valid link

            new_content = WIKI_LINK_RE.sub(replace_broken_link, content)
            if new_content != content:
                full_path.write_text(new_content, encoding="utf-8")
                total_fixed += file_fixes

        if total_fixed == 0:
            # No broken links found — issue might be something else
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "no_broken_links"}

        # Commit and push
        rc, out = await _git("add", *pr_files, cwd=worktree_path)
        if rc != 0:
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "git_add_failed"}

        commit_msg = (
            f"auto-fix: strip {total_fixed} broken wiki links\n\n"
            f"Pipeline auto-fixer: removed [[ ]] brackets from links\n"
            f"that don't resolve to existing claims in the knowledge base."
        )
        rc, out = await _git("commit", "-m", commit_msg, cwd=worktree_path)
        if rc != 0:
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "commit_failed"}

        # Reset eval state BEFORE push — if daemon crashes between push and
        # reset, the PR would be permanently stuck at max eval_attempts.
        # Reset-first: worst case is one wasted eval cycle on old content.
        conn.execute(
            """UPDATE prs SET
               status = 'open',
               eval_attempts = 0,
               eval_issues = '[]',
               tier0_pass = NULL,
               domain_verdict = 'pending',
               leo_verdict = 'pending',
               last_error = NULL
               WHERE number = ?""",
            (pr_number,),
        )

        rc, out = await _git("push", "origin", branch, cwd=worktree_path, timeout=30)
        if rc != 0:
            logger.error("PR #%d: push failed: %s", pr_number, out)
            # Eval state already reset — PR will re-evaluate old content,
            # find same issues, and fixer will retry next cycle. No harm.
            return {"pr": pr_number, "skipped": True, "reason": "push_failed"}

        db.audit(
            conn,
            "fixer",
            "wiki_links_fixed",
            json.dumps({"pr": pr_number, "links_fixed": total_fixed}),
        )
        logger.info("PR #%d: fixed %d broken wiki links, reset for re-evaluation", pr_number, total_fixed)

        return {"pr": pr_number, "fixed": True, "links_fixed": total_fixed}

    finally:
        # Always cleanup worktree (runs on every return path above).
        await _git("worktree", "remove", "--force", worktree_path)
|
|
|
|
|
|
# ─── Stage entry point ───────────────────────────────────────────────────
|
|
|
|
|
|
async def fix_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one fix cycle and return ``(fixed, errors)``.

    Selects open PRs flagged with broken_wiki_links whose fix budget is
    not exhausted, and attempts a mechanical fix on each. The batch is
    capped per cycle so the fixer doesn't overlap with eval.
    """
    # Garbage-collect zombie PRs: reviewed, rejected, fix budget spent,
    # yet still sitting in 'open' with nothing left to act on them.
    # (Epimetheus session 2 — prevents zombie PR accumulation)
    gc_cursor = conn.execute(
        """UPDATE prs SET status = 'closed', last_error = 'fix budget exhausted — auto-closed'
           WHERE status = 'open'
           AND fix_attempts >= ?
           AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')""",
        # Threshold = mechanical budget + substantive fixer budget.
        (config.MAX_FIX_ATTEMPTS + 2,),
    )
    if gc_cursor.rowcount > 0:
        logger.info("GC: closed %d exhausted PRs", gc_cursor.rowcount)

    requested = max_workers or config.MAX_FIX_PER_CYCLE
    batch_limit = min(requested, config.MAX_FIX_PER_CYCLE)

    # Candidates must have PASSED tier0 and carry a broken_wiki_links eval
    # issue. PRs with tier0_pass=0 whose only issue is wiki links are
    # deliberately excluded: those links are warnings rather than gates,
    # and fixing them spins a fixer→validate→fixer loop forever.
    # (Epimetheus session 2 — root cause of overnight stall)
    candidates = conn.execute(
        """SELECT number FROM prs
           WHERE status = 'open'
           AND tier0_pass = 1
           AND eval_issues LIKE '%broken_wiki_links%'
           AND COALESCE(fix_attempts, 0) < ?
           AND (last_attempt IS NULL OR last_attempt < datetime('now', '-5 minutes'))
           ORDER BY created_at ASC
           LIMIT ?""",
        (config.MAX_FIX_ATTEMPTS, batch_limit),
    ).fetchall()

    if not candidates:
        return 0, 0

    fixed = errors = 0

    for candidate in candidates:
        number = candidate["number"]
        try:
            outcome = await _fix_wiki_links_in_pr(conn, number)
            if outcome.get("fixed"):
                fixed += 1
            elif outcome.get("skipped"):
                logger.debug("PR #%d fix skipped: %s", number, outcome.get("reason"))
        except Exception:
            # A crashed fix must not leave the PR stuck in 'fixing'.
            logger.exception("Failed to fix PR #%d", number)
            errors += 1
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (number,))

    if fixed or errors:
        logger.info("Fix cycle: %d fixed, %d errors", fixed, errors)

    return fixed, errors
|