fix: stop runaway re-extraction loop in extract.py
Three changes reduce extraction cost and the duplicate-PR flood:

1. 4-hour cooldown gate — skip sources with ANY PR (merged/closed/open)
   created in the last 4h. Prevents the same source from re-extracting
   every 60s while the archive step lags behind merge.

2. DB-authoritative status — sources.status is now updated in the
   pipeline DB at each extraction terminal point (null_result, success).
   The queue scan checks the DB first, so sources with failed archives
   (e.g., root-owned worktree files blocking `git pull --rebase`) don't
   get re-extracted forever. Also moves archival into the extraction
   branch so it goes through the PR merge instead of a fragile separate
   main-worktree push.

3. source_channel wiring — the extract.py PR INSERT now sets
   source_channel from classify_source_channel(branch). Previously,
   daemon-created PRs had a NULL source_channel, breaking the Argus
   dashboard filters.

Combined with Ship's in-branch archive refactor.

Root incident: blockworks-metadao-strategic-reset.md was extracted
31 times in 12 hours. Nine other sources hit 10-22 extractions each.
The near-duplicate rejection rate jumped to 94%.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent 8de28d6ee0
commit 469cb7f2da

1 changed file with 130 additions and 8 deletions: lib/extract.py (138 lines changed)
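The upserts and scans in this diff read and write two tables in the pipeline DB. For orientation, here is a minimal sketch of the schema they imply. Column names come from the queries below; the types, defaults, and anything not referenced in the diff are assumptions, not the pipeline's actual DDL:

import sqlite3

conn = sqlite3.connect("pipeline.db")          # hypothetical path
conn.row_factory = sqlite3.Row                 # the queries index rows by name, e.g. r["path"]
conn.executescript("""
CREATE TABLE IF NOT EXISTS sources (
    path       TEXT PRIMARY KEY,               -- e.g. inbox/queue/<file>.md, relative to repo root
    status     TEXT DEFAULT 'unprocessed',     -- terminal points write 'null_result' / 'success'
    feedback   TEXT,                           -- read by the re-extraction scan
    updated_at TEXT                            -- written via datetime('now')
);
CREATE TABLE IF NOT EXISTS prs (
    number         INTEGER PRIMARY KEY,        -- ON CONFLICT(number) upsert target
    branch         TEXT,
    status         TEXT,                       -- 'open' / 'merged' / 'closed'
    submitted_by   TEXT,
    source_path    TEXT,                       -- drives the 4-hour cooldown gate
    description    TEXT,
    source_channel TEXT,                       -- now set via classify_source_channel(branch)
    created_at     TEXT DEFAULT (datetime('now'))
);
""")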
@@ -33,6 +33,7 @@ from pathlib import Path
 from . import config
 from .costs import record_usage
+from .db import classify_source_channel
 from .domains import agent_for_domain
 from .extraction_prompt import build_extraction_prompt
 from .forgejo import api as forgejo_api
@@ -492,6 +493,17 @@ async def _extract_one_source(

     if not claim_files and not entity_files and not enrichments:
         logger.info("No valid claims/entities/enrichments after validation for %s — archiving as null-result", source_file)
+        # Mark DB as null_result so queue scan won't re-extract even if file stays in queue
+        # (the main-worktree push in _archive_source frequently fails — DB is authoritative).
+        try:
+            conn.execute(
+                """INSERT INTO sources (path, status, updated_at) VALUES (?, 'null_result', datetime('now'))
+                   ON CONFLICT(path) DO UPDATE SET status='null_result', updated_at=datetime('now')""",
+                (source_path,),
+            )
+            conn.commit()
+        except Exception:
+            logger.debug("Failed to mark source as null_result in DB", exc_info=True)
         await _archive_source(source_path, domain, "null-result")
         return 0, 0
@@ -578,6 +590,22 @@ async def _extract_one_source(
     except Exception:
         logger.warning("Extract-connect failed (non-fatal)", exc_info=True)

+    # Archive the source WITHIN the extract branch (not via separate push on main).
+    # Prevents the runaway-extraction race: when archive-to-main push fails (non-FF,
+    # non-pushable worktree state), file returns to queue and gets re-extracted every
+    # cycle. Moving the archive into the extract branch makes it atomic with the PR
+    # merge — when the PR merges, the source is archived automatically.
+    try:
+        archive_rel = _archive_source_in_worktree(
+            worktree, source_path, domain, "processed", agent_lower, extract_model,
+        )
+        if archive_rel:
+            files_written.append(archive_rel["new"])
+            # The queue file was deleted; git add handles the removal
+            await _git("add", "inbox/queue/", cwd=str(EXTRACT_WORKTREE))
+    except Exception:
+        logger.exception("In-branch archive failed for %s (continuing)", source_file)
+
     # Stage and commit
     for f in files_written:
         await _git("add", f, cwd=str(EXTRACT_WORKTREE))
@@ -662,15 +690,17 @@ async def _extract_one_source(

     # Upsert: if discover_external_prs already created the row, update it;
     # if not, create a partial row that discover will complete.
+    source_channel = classify_source_channel(branch)
     try:
         conn.execute(
-            """INSERT INTO prs (number, branch, status, submitted_by, source_path, description)
-               VALUES (?, ?, 'open', ?, ?, ?)
+            """INSERT INTO prs (number, branch, status, submitted_by, source_path, description, source_channel)
+               VALUES (?, ?, 'open', ?, ?, ?, ?)
                ON CONFLICT(number) DO UPDATE SET
                  submitted_by = excluded.submitted_by,
                  source_path = excluded.source_path,
-                 description = COALESCE(excluded.description, prs.description)""",
-            (pr_num, branch, contributor, source_path, claim_titles),
+                 description = COALESCE(excluded.description, prs.description),
+                 source_channel = COALESCE(prs.source_channel, excluded.source_channel)""",
+            (pr_num, branch, contributor, source_path, claim_titles, source_channel),
         )
         conn.commit()
     except Exception:
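The COALESCE on the conflict path is what makes this upsert safe to run after discover_external_prs: an existing non-NULL source_channel wins, and only a NULL gets filled in. A quick in-memory check (the table here is a hypothetical reduction to the relevant columns, not the pipeline's real schema):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prs (number INTEGER PRIMARY KEY, branch TEXT, source_channel TEXT)")

upsert = """INSERT INTO prs (number, branch, source_channel) VALUES (?, ?, ?)
            ON CONFLICT(number) DO UPDATE SET
              source_channel = COALESCE(prs.source_channel, excluded.source_channel)"""

# Row created first by another path, with the channel already classified...
conn.execute("INSERT INTO prs VALUES (7, 'extract/foo', 'external')")
# ...the daemon's later upsert must not clobber it:
conn.execute(upsert, (7, "extract/foo", "daemon"))
print(conn.execute("SELECT source_channel FROM prs WHERE number = 7").fetchone()[0])
# prints: external   (a NULL, by contrast, would have been filled with 'daemon')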
@@ -691,12 +721,69 @@ async def _extract_one_source(
     # Clean up extract worktree
     await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))

-    # 10. Archive source on main
-    await _archive_source(source_path, domain, "processed", agent_lower)
+    # Note: source archival happened in-branch before commit (see _archive_source_in_worktree).
+    # Do NOT call _archive_source() here — the broken main-worktree-push path caused the
+    # runaway extraction bug. Archive is now atomic with PR merge.

     return 1, 0


+def _archive_source_in_worktree(
+    worktree: Path,
+    source_path: str,
+    domain: str,
+    status: str,
+    agent: str | None,
+    extraction_model: str,
+) -> dict | None:
+    """Move source file from inbox/queue/ to inbox/archive/<domain>/ WITHIN extract worktree.
+
+    Updates frontmatter (status, processed_by, processed_date, extraction_model) and
+    returns {"old": old_rel_path, "new": new_rel_path} or None if not found.
+
+    The caller commits this change as part of the extract branch, so the archive lands
+    atomically with the PR merge — no separate push on main required.
+    """
+    queue_path = worktree / source_path
+    if not queue_path.exists():
+        logger.warning("Source %s not found in worktree queue — skipping in-branch archive", source_path)
+        return None
+
+    if status == "null-result":
+        dest_dir = worktree / "inbox" / "null-result"
+    else:
+        dest_dir = worktree / "inbox" / "archive" / (domain or "unknown")
+    dest_dir.mkdir(parents=True, exist_ok=True)
+    dest_path = dest_dir / queue_path.name
+
+    content = queue_path.read_text(encoding="utf-8")
+    today = date.today().isoformat()
+    content = re.sub(r"^status: unprocessed", f"status: {status}", content, flags=re.MULTILINE)
+    if agent and "processed_by:" not in content:
+        content = re.sub(
+            r"(^status: \w+)",
+            rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
+            content,
+            count=1,
+            flags=re.MULTILINE,
+        )
+    if "extraction_model:" not in content:
+        content = re.sub(
+            r"(^status: \w+.*?)(\n---)",
+            rf'\1\nextraction_model: "{extraction_model}"\2',
+            content,
+            count=1,
+            flags=re.MULTILINE | re.DOTALL,
+        )
+
+    dest_path.write_text(content, encoding="utf-8")
+    queue_path.unlink()
+
+    old_rel = str(queue_path.relative_to(worktree))
+    new_rel = str(dest_path.relative_to(worktree))
+    return {"old": old_rel, "new": new_rel}
+
+
 async def _archive_source(
     source_path: str,
     domain: str,
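To make the three re.sub passes above concrete, here is a self-contained run on made-up frontmatter. Only the regexes are taken from _archive_source_in_worktree; the file content and field values are hypothetical:

import re
from datetime import date

content = """---
title: example-source
status: unprocessed
---
body text
"""
status, agent, extraction_model = "processed", "scout", "example-model"
today = date.today().isoformat()

# Pass 1: flip the status field.
content = re.sub(r"^status: unprocessed", f"status: {status}", content, flags=re.MULTILINE)
# Pass 2: append processed_by/processed_date right after the status line.
if agent and "processed_by:" not in content:
    content = re.sub(r"(^status: \w+)", rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
                     content, count=1, flags=re.MULTILINE)
# Pass 3: insert extraction_model just before the closing --- of the frontmatter.
if "extraction_model:" not in content:
    content = re.sub(r"(^status: \w+.*?)(\n---)", rf'\1\nextraction_model: "{extraction_model}"\2',
                     content, count=1, flags=re.MULTILINE | re.DOTALL)

print(content)
# ---
# title: example-source
# status: processed
# processed_by: scout
# processed_date: <today's date>
# extraction_model: "example-model"
# ---
# body text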
@@ -788,13 +875,26 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
     if not queue_dir.exists():
        return 0, 0

+    # DB-authoritative status filter: exclude sources where DB records non-unprocessed state.
+    # File frontmatter alone isn't reliable — archive pushes can fail, leaving stale file state.
+    # The sources table is the authoritative record of whether a source has been processed.
+    db_non_unprocessed = {
+        r["path"] for r in conn.execute(
+            "SELECT path FROM sources WHERE status != 'unprocessed'"
+        ).fetchall()
+    }
+
     unprocessed = []
     for f in sorted(queue_dir.glob("*.md")):
         try:
             content = f.read_text(encoding="utf-8")
             fm = _parse_source_frontmatter(content)
-            if fm.get("status") == "unprocessed":
-                unprocessed.append((str(f.relative_to(main)), content, fm))
+            if fm.get("status") != "unprocessed":
+                continue
+            rel_path = str(f.relative_to(main))
+            if rel_path in db_non_unprocessed:
+                continue
+            unprocessed.append((rel_path, content, fm))
         except Exception:
             logger.debug("Failed to read source %s", f, exc_info=True)
@@ -831,6 +931,28 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
     if skipped:
         logger.info("Skipped %d source(s) with existing open PRs", skipped)

+    # Emergency cooldown: skip sources with ANY PR (merged/closed/open) in last 4h.
+    # Prevents runaway re-extraction when archive step lags behind merge.
+    # Root cause: source file remains in inbox/queue/ after PR merges until
+    # _archive_source moves it — next daemon tick extracts the same source again.
+    if unprocessed:
+        recent_source_paths = {
+            r["source_path"] for r in conn.execute(
+                """SELECT DISTINCT source_path FROM prs
+                   WHERE source_path IS NOT NULL
+                     AND created_at > datetime('now', '-4 hours')"""
+            ).fetchall() if r["source_path"]
+        }
+        if recent_source_paths:
+            before = len(unprocessed)
+            unprocessed = [
+                (sp, c, f) for sp, c, f in unprocessed
+                if sp not in recent_source_paths
+            ]
+            cooled = before - len(unprocessed)
+            if cooled:
+                logger.info("Cooldown: skipped %d source(s) with PRs in last 4h", cooled)
+
     # ── Check for re-extraction sources (must run even when queue is empty) ──
     reextract_rows = conn.execute(
         """SELECT path, feedback FROM sources
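One subtlety in the cooldown query: created_at > datetime('now', '-4 hours') is a plain string comparison, so it only works if created_at is stored as an ISO-8601 UTC string, which the datetime('now') writes elsewhere in this diff suggest it is (an assumption; local-time or epoch timestamps would break the gate silently). The modifier itself is easy to check:

import sqlite3

conn = sqlite3.connect(":memory:")
now, cutoff = conn.execute("SELECT datetime('now'), datetime('now', '-4 hours')").fetchone()
print(now, cutoff)  # e.g. 2025-01-01 12:00:00  2025-01-01 08:00:00 (UTC, lexicographically comparable)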