fix: stop runaway re-extraction loop in extract.py

Three changes reduce extraction cost and stop the flood of duplicate PRs:

1. 4-hour cooldown gate — skip sources with ANY PR (merged/closed/open)
   created in the last 4h. Prevents the same source from being re-extracted
   every 60s while the archive step lags behind the merge.

2. DB-authoritative status — sources.status is now updated in the pipeline DB
   at each extraction terminal point (null_result, success). Queue scan checks
   DB first so sources with failed archives (e.g., root-owned worktree files
   blocking git pull --rebase) don't get re-extracted forever. Also moves
   archival into the extraction branch so it goes through PR merge instead
   of a fragile separate main-worktree push.

3. source_channel wiring — extract.py's PR INSERT now sets source_channel from
   classify_source_channel(branch). Previously, daemon-created PRs had a NULL
   source_channel, which broke Argus dashboard filters. Combined with Ship's
   in-branch archive refactor. (A hypothetical sketch of the classifier follows
   this list.)
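
For orientation, a minimal sketch of what a branch-to-channel classifier like
classify_source_channel could look like. This is an assumption for illustration
only: the real function lives in .db, and the branch prefixes here are invented.

```python
# Hypothetical sketch: not the real classify_source_channel from .db.
# The branch-prefix conventions below are invented for illustration.
def classify_source_channel(branch: str) -> str | None:
    """Map a PR branch name to the channel that produced it."""
    if branch.startswith("extract/"):
        return "daemon"      # PRs opened by the extraction daemon
    if branch.startswith("contrib/"):
        return "external"    # PRs submitted by outside contributors
    return None              # unknown; dashboards can surface these for triage
```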

Root incident: blockworks-metadao-strategic-reset.md was extracted 31 times in
12 hours, and nine other sources hit 10-22 extractions each. The near-duplicate
rejection rate jumped to 94%.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
m3taversal 2026-04-22 11:19:30 +01:00
parent 8de28d6ee0
commit 469cb7f2da


@@ -33,6 +33,7 @@ from pathlib import Path
 from . import config
 from .costs import record_usage
+from .db import classify_source_channel
 from .domains import agent_for_domain
 from .extraction_prompt import build_extraction_prompt
 from .forgejo import api as forgejo_api
@@ -492,6 +493,17 @@ async def _extract_one_source(
     if not claim_files and not entity_files and not enrichments:
         logger.info("No valid claims/entities/enrichments after validation for %s — archiving as null-result", source_file)
+        # Mark DB as null_result so queue scan won't re-extract even if file stays in queue
+        # (the main-worktree push in _archive_source frequently fails — DB is authoritative).
+        try:
+            conn.execute(
+                """INSERT INTO sources (path, status, updated_at) VALUES (?, 'null_result', datetime('now'))
+                   ON CONFLICT(path) DO UPDATE SET status='null_result', updated_at=datetime('now')""",
+                (source_path,),
+            )
+            conn.commit()
+        except Exception:
+            logger.debug("Failed to mark source as null_result in DB", exc_info=True)
         await _archive_source(source_path, domain, "null-result")
         return 0, 0
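
The ON CONFLICT(path) upsert only works if the sources table enforces
uniqueness on path. A runnable sketch of that assumption (the DDL is
hypothetical; the real schema lives in the pipeline's DB setup):

```python
import sqlite3

# Hypothetical DDL: assumes path is the primary key, which is what makes
# ON CONFLICT(path) a valid upsert target.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE sources (
           path       TEXT PRIMARY KEY,
           status     TEXT NOT NULL DEFAULT 'unprocessed',
           updated_at TEXT
       )"""
)
# Same statement the daemon issues at the null-result terminal point.
conn.execute(
    """INSERT INTO sources (path, status, updated_at) VALUES (?, 'null_result', datetime('now'))
       ON CONFLICT(path) DO UPDATE SET status='null_result', updated_at=datetime('now')""",
    ("inbox/queue/example-source.md",),
)
conn.commit()
print(conn.execute("SELECT status FROM sources").fetchone())  # ('null_result',)
```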
@@ -578,6 +590,22 @@ async def _extract_one_source(
     except Exception:
         logger.warning("Extract-connect failed (non-fatal)", exc_info=True)
+
+    # Archive the source WITHIN the extract branch (not via separate push on main).
+    # Prevents the runaway-extraction race: when archive-to-main push fails (non-FF,
+    # non-pushable worktree state), file returns to queue and gets re-extracted every
+    # cycle. Moving the archive into the extract branch makes it atomic with the PR
+    # merge — when the PR merges, the source is archived automatically.
+    try:
+        archive_rel = _archive_source_in_worktree(
+            worktree, source_path, domain, "processed", agent_lower, extract_model,
+        )
+        if archive_rel:
+            files_written.append(archive_rel["new"])
+            # The queue file was deleted; git add handles the removal
+            await _git("add", "inbox/queue/", cwd=str(EXTRACT_WORKTREE))
+    except Exception:
+        logger.exception("In-branch archive failed for %s (continuing)", source_file)
     # Stage and commit
     for f in files_written:
         await _git("add", f, cwd=str(EXTRACT_WORKTREE))
@@ -662,15 +690,17 @@ async def _extract_one_source(
     # Upsert: if discover_external_prs already created the row, update it;
     # if not, create a partial row that discover will complete.
+    source_channel = classify_source_channel(branch)
     try:
         conn.execute(
-            """INSERT INTO prs (number, branch, status, submitted_by, source_path, description)
-               VALUES (?, ?, 'open', ?, ?, ?)
+            """INSERT INTO prs (number, branch, status, submitted_by, source_path, description, source_channel)
+               VALUES (?, ?, 'open', ?, ?, ?, ?)
                ON CONFLICT(number) DO UPDATE SET
                    submitted_by = excluded.submitted_by,
                    source_path = excluded.source_path,
-                   description = COALESCE(excluded.description, prs.description)""",
-            (pr_num, branch, contributor, source_path, claim_titles),
+                   description = COALESCE(excluded.description, prs.description),
+                   source_channel = COALESCE(prs.source_channel, excluded.source_channel)""",
+            (pr_num, branch, contributor, source_path, claim_titles, source_channel),
         )
         conn.commit()
     except Exception:
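
The COALESCE(prs.source_channel, excluded.source_channel) clause makes the
first non-NULL value sticky: a partial row created by discover gets backfilled,
but a later write can never clobber a channel that is already set. A
self-contained sketch with a toy schema (only the columns involved, names
assumed):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prs (number INTEGER PRIMARY KEY, source_channel TEXT)")

upsert = """INSERT INTO prs (number, source_channel) VALUES (?, ?)
            ON CONFLICT(number) DO UPDATE SET
                source_channel = COALESCE(prs.source_channel, excluded.source_channel)"""

conn.execute(upsert, (7, None))      # discover creates a partial row, channel unknown
conn.execute(upsert, (7, "daemon"))  # extract backfills the channel
conn.execute(upsert, (7, "other"))   # later writes cannot overwrite the set value
print(conn.execute("SELECT source_channel FROM prs WHERE number = 7").fetchone())
# ('daemon',)
```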
@@ -691,12 +721,69 @@ async def _extract_one_source(
     # Clean up extract worktree
     await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))
-    # 10. Archive source on main
-    await _archive_source(source_path, domain, "processed", agent_lower)
+    # Note: source archival happened in-branch before commit (see _archive_source_in_worktree).
+    # Do NOT call _archive_source() here — the broken main-worktree-push path caused the
+    # runaway extraction bug. Archive is now atomic with PR merge.
     return 1, 0
+
+
+def _archive_source_in_worktree(
+    worktree: Path,
+    source_path: str,
+    domain: str,
+    status: str,
+    agent: str | None,
+    extraction_model: str,
+) -> dict | None:
+    """Move source file from inbox/queue/ to inbox/archive/<domain>/ WITHIN extract worktree.
+
+    Updates frontmatter (status, processed_by, processed_date, extraction_model) and
+    returns {"old": old_rel_path, "new": new_rel_path} or None if not found.
+
+    The caller commits this change as part of the extract branch, so the archive lands
+    atomically with the PR merge — no separate push on main required.
+    """
+    queue_path = worktree / source_path
+    if not queue_path.exists():
+        logger.warning("Source %s not found in worktree queue — skipping in-branch archive", source_path)
+        return None
+    if status == "null-result":
+        dest_dir = worktree / "inbox" / "null-result"
+    else:
+        dest_dir = worktree / "inbox" / "archive" / (domain or "unknown")
+    dest_dir.mkdir(parents=True, exist_ok=True)
+    dest_path = dest_dir / queue_path.name
+    content = queue_path.read_text(encoding="utf-8")
+    today = date.today().isoformat()
+    content = re.sub(r"^status: unprocessed", f"status: {status}", content, flags=re.MULTILINE)
+    if agent and "processed_by:" not in content:
+        content = re.sub(
+            r"(^status: \w+)",
+            rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
+            content,
+            count=1,
+            flags=re.MULTILINE,
+        )
+    if "extraction_model:" not in content:
+        content = re.sub(
+            r"(^status: \w+.*?)(\n---)",
+            rf'\1\nextraction_model: "{extraction_model}"\2',
+            content,
+            count=1,
+            flags=re.MULTILINE | re.DOTALL,
+        )
+    dest_path.write_text(content, encoding="utf-8")
+    queue_path.unlink()
+    old_rel = str(queue_path.relative_to(worktree))
+    new_rel = str(dest_path.relative_to(worktree))
+    return {"old": old_rel, "new": new_rel}
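
To make the three re.sub passes concrete, here is a self-contained run against
a made-up queue file (the sample document, agent, and model names are invented
for illustration):

```python
import re
from datetime import date

# Invented sample file and values, purely for illustration.
sample = """---
title: Example source
status: unprocessed
---
Body text.
"""
status, agent, model = "processed", "hermes", "example-model"
today = date.today().isoformat()

# Pass 1: flip the status line.
out = re.sub(r"^status: unprocessed", f"status: {status}", sample, flags=re.MULTILINE)
# Pass 2: add processed_by / processed_date right after the status line.
if agent and "processed_by:" not in out:
    out = re.sub(
        r"(^status: \w+)",
        rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
        out, count=1, flags=re.MULTILINE,
    )
# Pass 3: slot extraction_model in just before the closing frontmatter fence.
if "extraction_model:" not in out:
    out = re.sub(
        r"(^status: \w+.*?)(\n---)",
        rf'\1\nextraction_model: "{model}"\2',
        out, count=1, flags=re.MULTILINE | re.DOTALL,
    )
print(out)
```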
 async def _archive_source(
     source_path: str,
     domain: str,
@@ -788,13 +875,26 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
     if not queue_dir.exists():
         return 0, 0
+
+    # DB-authoritative status filter: exclude sources where DB records non-unprocessed state.
+    # File frontmatter alone isn't reliable — archive pushes can fail, leaving stale file state.
+    # The sources table is the authoritative record of whether a source has been processed.
+    db_non_unprocessed = {
+        r["path"] for r in conn.execute(
+            "SELECT path FROM sources WHERE status != 'unprocessed'"
+        ).fetchall()
+    }
     unprocessed = []
     for f in sorted(queue_dir.glob("*.md")):
         try:
             content = f.read_text(encoding="utf-8")
             fm = _parse_source_frontmatter(content)
-            if fm.get("status") == "unprocessed":
-                unprocessed.append((str(f.relative_to(main)), content, fm))
+            if fm.get("status") != "unprocessed":
+                continue
+            rel_path = str(f.relative_to(main))
+            if rel_path in db_non_unprocessed:
+                continue
+            unprocessed.append((rel_path, content, fm))
         except Exception:
             logger.debug("Failed to read source %s", f, exc_info=True)
@@ -831,6 +931,28 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
     if skipped:
         logger.info("Skipped %d source(s) with existing open PRs", skipped)
+
+    # Emergency cooldown: skip sources with ANY PR (merged/closed/open) in last 4h.
+    # Prevents runaway re-extraction when archive step lags behind merge.
+    # Root cause: source file remains in inbox/queue/ after PR merges until
+    # _archive_source moves it — next daemon tick extracts the same source again.
+    if unprocessed:
+        recent_source_paths = {
+            r["source_path"] for r in conn.execute(
+                """SELECT DISTINCT source_path FROM prs
+                   WHERE source_path IS NOT NULL
+                   AND created_at > datetime('now', '-4 hours')"""
+            ).fetchall() if r["source_path"]
+        }
+        if recent_source_paths:
+            before = len(unprocessed)
+            unprocessed = [
+                (sp, c, f) for sp, c, f in unprocessed
+                if sp not in recent_source_paths
+            ]
+            cooled = before - len(unprocessed)
+            if cooled:
+                logger.info("Cooldown: skipped %d source(s) with PRs in last 4h", cooled)
     # ── Check for re-extraction sources (must run even when queue is empty) ──
     reextract_rows = conn.execute(
         """SELECT path, feedback FROM sources