From 469cb7f2da7cc949fe3cd94eb643b863c467bb85 Mon Sep 17 00:00:00 2001
From: m3taversal
Date: Wed, 22 Apr 2026 11:19:30 +0100
Subject: [PATCH] fix: stop runaway re-extraction loop in extract.py
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three changes reduce extraction cost and the duplicate-PR flood:

1. 4-hour cooldown gate: skip sources with ANY PR (merged/closed/open)
   created in the last 4h. Prevents the same source from being
   re-extracted every 60s while the archive step lags behind the merge.

2. DB-authoritative status: sources.status is now updated in the
   pipeline DB at each extraction terminal point (null_result, success).
   The queue scan checks the DB first, so sources with failed archives
   (e.g., root-owned worktree files blocking git pull --rebase) are no
   longer re-extracted forever. Also moves archival into the extraction
   branch so it goes through the PR merge instead of a fragile separate
   main-worktree push.

3. source_channel wiring: the extract.py PR INSERT now sets
   source_channel from classify_source_channel(branch). Previously
   daemon-created PRs had NULL source_channel, breaking Argus dashboard
   filters.

Combined with Ship's in-branch archive refactor.

Root incident: blockworks-metadao-strategic-reset.md was extracted 31
times in 12 hours. Nine other sources hit 10-22 extractions each. The
near-duplicate rejection rate jumped to 94%.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 lib/extract.py | 138 ++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 130 insertions(+), 8 deletions(-)

diff --git a/lib/extract.py b/lib/extract.py
index 4681a9f..a1bdc78 100644
--- a/lib/extract.py
+++ b/lib/extract.py
@@ -33,6 +33,7 @@ from pathlib import Path
 
 from . import config
 from .costs import record_usage
+from .db import classify_source_channel
 from .domains import agent_for_domain
 from .extraction_prompt import build_extraction_prompt
 from .forgejo import api as forgejo_api
@@ -492,6 +493,17 @@ async def _extract_one_source(
 
     if not claim_files and not entity_files and not enrichments:
         logger.info("No valid claims/entities/enrichments after validation for %s — archiving as null-result", source_file)
+        # Mark DB as null_result so queue scan won't re-extract even if file stays in queue
+        # (the main-worktree push in _archive_source frequently fails — DB is authoritative).
+        try:
+            conn.execute(
+                """INSERT INTO sources (path, status, updated_at) VALUES (?, 'null_result', datetime('now'))
+                   ON CONFLICT(path) DO UPDATE SET status='null_result', updated_at=datetime('now')""",
+                (source_path,),
+            )
+            conn.commit()
+        except Exception:
+            logger.debug("Failed to mark source as null_result in DB", exc_info=True)
         await _archive_source(source_path, domain, "null-result")
         return 0, 0
 
@@ -578,6 +590,22 @@ async def _extract_one_source(
     except Exception:
         logger.warning("Extract-connect failed (non-fatal)", exc_info=True)
 
+    # Archive the source WITHIN the extract branch (not via separate push on main).
+    # Prevents the runaway-extraction race: when archive-to-main push fails (non-FF,
+    # non-pushable worktree state), file returns to queue and gets re-extracted every
+    # cycle. Moving the archive into the extract branch makes it atomic with the PR
+    # merge — when the PR merges, the source is archived automatically.
+    try:
+        archive_rel = _archive_source_in_worktree(
+            worktree, source_path, domain, "processed", agent_lower, extract_model,
+        )
+        if archive_rel:
+            files_written.append(archive_rel["new"])
+            # The queue file was deleted; git add handles the removal
+            await _git("add", "inbox/queue/", cwd=str(EXTRACT_WORKTREE))
+    except Exception:
+        logger.exception("In-branch archive failed for %s (continuing)", source_file)
+
     # Stage and commit
     for f in files_written:
         await _git("add", f, cwd=str(EXTRACT_WORKTREE))
@@ -662,15 +690,17 @@ async def _extract_one_source(
 
     # Upsert: if discover_external_prs already created the row, update it;
     # if not, create a partial row that discover will complete.
+    source_channel = classify_source_channel(branch)
     try:
         conn.execute(
-            """INSERT INTO prs (number, branch, status, submitted_by, source_path, description)
-               VALUES (?, ?, 'open', ?, ?, ?)
+            """INSERT INTO prs (number, branch, status, submitted_by, source_path, description, source_channel)
+               VALUES (?, ?, 'open', ?, ?, ?, ?)
                ON CONFLICT(number) DO UPDATE SET
                    submitted_by = excluded.submitted_by,
                    source_path = excluded.source_path,
-                   description = COALESCE(excluded.description, prs.description)""",
-            (pr_num, branch, contributor, source_path, claim_titles),
+                   description = COALESCE(excluded.description, prs.description),
+                   source_channel = COALESCE(prs.source_channel, excluded.source_channel)""",
+            (pr_num, branch, contributor, source_path, claim_titles, source_channel),
         )
         conn.commit()
     except Exception:
@@ -691,12 +721,69 @@ async def _extract_one_source(
 
     # Clean up extract worktree
     await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))
 
-    # 10. Archive source on main
-    await _archive_source(source_path, domain, "processed", agent_lower)
+    # Note: source archival happened in-branch before commit (see _archive_source_in_worktree).
+    # Do NOT call _archive_source() here — the broken main-worktree-push path caused the
+    # runaway extraction bug. Archive is now atomic with PR merge.
 
     return 1, 0
 
 
+def _archive_source_in_worktree(
+    worktree: Path,
+    source_path: str,
+    domain: str,
+    status: str,
+    agent: str | None,
+    extraction_model: str,
+) -> dict | None:
+    """Move a source file from inbox/queue/ to inbox/archive/<domain>/ WITHIN the extract worktree.
+
+    Updates frontmatter (status, processed_by, processed_date, extraction_model) and
+    returns {"old": old_rel_path, "new": new_rel_path} or None if not found.
+
+    The caller commits this change as part of the extract branch, so the archive lands
+    atomically with the PR merge — no separate push on main required.
+    """
+    queue_path = worktree / source_path
+    if not queue_path.exists():
+        logger.warning("Source %s not found in worktree queue — skipping in-branch archive", source_path)
+        return None
+
+    if status == "null-result":
+        dest_dir = worktree / "inbox" / "null-result"
+    else:
+        dest_dir = worktree / "inbox" / "archive" / (domain or "unknown")
+    dest_dir.mkdir(parents=True, exist_ok=True)
+    dest_path = dest_dir / queue_path.name
+
+    content = queue_path.read_text(encoding="utf-8")
+    today = date.today().isoformat()
+    content = re.sub(r"^status: unprocessed", f"status: {status}", content, flags=re.MULTILINE)
+    if agent and "processed_by:" not in content:
+        content = re.sub(
+            r"(^status: [\w-]+)",
+            rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
+            content,
+            count=1,
+            flags=re.MULTILINE,
+        )
+    if "extraction_model:" not in content:
+        content = re.sub(
+            r"(^status: [\w-]+.*?)(\n---)",
+            rf'\1\nextraction_model: "{extraction_model}"\2',
+            content,
+            count=1,
+            flags=re.MULTILINE | re.DOTALL,
+        )
+
+    dest_path.write_text(content, encoding="utf-8")
+    queue_path.unlink()
+
+    old_rel = str(queue_path.relative_to(worktree))
+    new_rel = str(dest_path.relative_to(worktree))
+    return {"old": old_rel, "new": new_rel}
+
+
 async def _archive_source(
     source_path: str,
     domain: str,
@@ -788,13 +875,26 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
     if not queue_dir.exists():
         return 0, 0
 
+    # DB-authoritative status filter: exclude sources where DB records non-unprocessed state.
+    # File frontmatter alone isn't reliable — archive pushes can fail, leaving stale file state.
+    # The sources table is the authoritative record of whether a source has been processed.
+    db_non_unprocessed = {
+        r["path"] for r in conn.execute(
+            "SELECT path FROM sources WHERE status != 'unprocessed'"
+        ).fetchall()
+    }
+
     unprocessed = []
     for f in sorted(queue_dir.glob("*.md")):
         try:
             content = f.read_text(encoding="utf-8")
             fm = _parse_source_frontmatter(content)
-            if fm.get("status") == "unprocessed":
-                unprocessed.append((str(f.relative_to(main)), content, fm))
+            if fm.get("status") != "unprocessed":
+                continue
+            rel_path = str(f.relative_to(main))
+            if rel_path in db_non_unprocessed:
+                continue
+            unprocessed.append((rel_path, content, fm))
         except Exception:
             logger.debug("Failed to read source %s", f, exc_info=True)
 
@@ -831,6 +931,28 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
     if skipped:
         logger.info("Skipped %d source(s) with existing open PRs", skipped)
 
+    # Emergency cooldown: skip sources with ANY PR (merged/closed/open) in last 4h.
+    # Prevents runaway re-extraction when archive step lags behind merge.
+    # Root cause: source file remains in inbox/queue/ after PR merges until
+    # _archive_source moves it — next daemon tick extracts the same source again.
+    if unprocessed:
+        recent_source_paths = {
+            r["source_path"] for r in conn.execute(
+                """SELECT DISTINCT source_path FROM prs
+                   WHERE source_path IS NOT NULL
+                     AND created_at > datetime('now', '-4 hours')"""
+            ).fetchall() if r["source_path"]
+        }
+        if recent_source_paths:
+            before = len(unprocessed)
+            unprocessed = [
+                (sp, c, f) for sp, c, f in unprocessed
+                if sp not in recent_source_paths
+            ]
+            cooled = before - len(unprocessed)
+            if cooled:
+                logger.info("Cooldown: skipped %d source(s) with PRs in last 4h", cooled)
+
     # ── Check for re-extraction sources (must run even when queue is empty) ──
     reextract_rows = conn.execute(
         """SELECT path, feedback FROM sources
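
Reviewer note on assumed schema: the upserts above rely on DDL this diff
does not show. ON CONFLICT(path) in the sources upsert only resolves if
sources.path carries a PRIMARY KEY or UNIQUE constraint, the prs upsert
needs a source_channel column to exist, and the queue scans index rows by
name (r["path"], r["source_path"]), which assumes conn.row_factory is
sqlite3.Row. A minimal sketch of those assumptions; ensure_schema is
hypothetical, the column types are guesses, and the real DDL presumably
lives in lib/db.py.

import sqlite3

def ensure_schema(conn: sqlite3.Connection) -> None:
    # Named row access (r["path"]) elsewhere in extract.py assumes sqlite3.Row.
    conn.row_factory = sqlite3.Row
    # sources.path must be PRIMARY KEY (or UNIQUE) for ON CONFLICT(path) to resolve.
    conn.execute(
        """CREATE TABLE IF NOT EXISTS sources (
               path       TEXT PRIMARY KEY,
               status     TEXT NOT NULL DEFAULT 'unprocessed',
               updated_at TEXT
           )"""
    )
    # SQLite has no ADD COLUMN IF NOT EXISTS, so probe the pragma first.
    cols = {row[1] for row in conn.execute("PRAGMA table_info(prs)")}
    if "source_channel" not in cols:
        conn.execute("ALTER TABLE prs ADD COLUMN source_channel TEXT")
    conn.commit()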
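
Reviewer note on classify_source_channel: the helper is imported from .db
but not shown in this diff. The patch only relies on its shape: branch
name in, channel label (or None) out, stored in prs.source_channel for
the Argus dashboard filters. A sketch under that assumption; the branch
prefixes and labels below are illustrative, not the actual implementation.

def classify_source_channel(branch: str | None) -> str | None:
    """Map a PR branch name to a source channel label (illustrative sketch)."""
    if not branch:
        return None
    if branch.startswith(("extract/", "reextract/")):
        return "daemon"  # daemon-created extraction PRs, previously stored as NULL
    return "external"    # contributor-submitted branches

Whatever the real labels are, the upsert's COALESCE(prs.source_channel,
excluded.source_channel) keeps any channel discover_external_prs already
recorded, so the daemon only fills NULLs.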