diff --git a/lib/config.py b/lib/config.py
index d5bbf7f..9f1b401 100644
--- a/lib/config.py
+++ b/lib/config.py
@@ -200,6 +200,9 @@
 MERGE_INTERVAL = 30
 FIX_INTERVAL = 60
 HEALTH_CHECK_INTERVAL = 60
+# --- Extraction gates ---
+EXTRACTION_COOLDOWN_HOURS = 4  # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter.
+
 # --- Retrieval (Telegram bot) ---
 RETRIEVAL_RRF_K = 20  # RRF smoothing constant — tuned for 5-10 results per source
 RETRIEVAL_ENTITY_BOOST = 1.5  # RRF score multiplier for claims wiki-linked from matched entities
diff --git a/lib/extract.py b/lib/extract.py
index a1bdc78..a1e017f 100644
--- a/lib/extract.py
+++ b/lib/extract.py
@@ -572,6 +572,18 @@ async def _extract_one_source(
 
     if not files_written:
         logger.info("No files written for %s — cleaning up", source_file)
+        # Path B null-result: enrichments existed but all targets missing in worktree.
+        # No PR, no cooldown match — without DB update this re-extracts every 60s.
+        # (Ganymede review, commit 469cb7f follow-up.)
+        try:
+            conn.execute(
+                """INSERT INTO sources (path, status, updated_at) VALUES (?, 'null_result', datetime('now'))
+                   ON CONFLICT(path) DO UPDATE SET status='null_result', updated_at=datetime('now')""",
+                (source_path,),
+            )
+            conn.commit()
+        except Exception:
+            logger.debug("Failed to mark source as null_result (path B)", exc_info=True)
         await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))
         await _git("branch", "-D", branch, cwd=str(EXTRACT_WORKTREE))
         await _archive_source(source_path, domain, "null-result")
@@ -688,6 +700,19 @@ async def _extract_one_source(
         for c in claims_raw
         if c.get("title") or c.get("filename")
     )
+    # Success path: mark source as 'extracting' so queue scan's DB-status filter
+    # skips it between PR creation and merge. Without this, cooldown is load-bearing
+    # (Ganymede review, commit 469cb7f follow-up).
+    try:
+        conn.execute(
+            """INSERT INTO sources (path, status, updated_at) VALUES (?, 'extracting', datetime('now'))
+               ON CONFLICT(path) DO UPDATE SET status='extracting', updated_at=datetime('now')""",
+            (source_path,),
+        )
+        conn.commit()
+    except Exception:
+        logger.debug("Failed to mark source as extracting", exc_info=True)
+
     # Upsert: if discover_external_prs already created the row, update it;
     # if not, create a partial row that discover will complete.
     source_channel = classify_source_channel(branch)
@@ -931,18 +956,19 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
     if skipped:
         logger.info("Skipped %d source(s) with existing open PRs", skipped)
 
-    # Emergency cooldown: skip sources with ANY PR (merged/closed/open) in last 4h.
-    # Prevents runaway re-extraction when archive step lags behind merge.
-    # Root cause: source file remains in inbox/queue/ after PR merges until
-    # _archive_source moves it — next daemon tick extracts the same source again.
+    # Cooldown: skip sources with ANY PR in last EXTRACTION_COOLDOWN_HOURS.
+    # Defense-in-depth for DB-status filter — catches the window between PR
+    # creation and DB status update if anything races.
     if unprocessed:
+        cooldown_hours = config.EXTRACTION_COOLDOWN_HOURS
         recent_source_paths = {
             r["source_path"]
             for r in conn.execute(
                 """SELECT DISTINCT source_path FROM prs
                    WHERE source_path IS NOT NULL
-                     AND created_at > datetime('now', '-4 hours')"""
+                     AND created_at > datetime('now', ? || ' hours')""",
+                (f"-{cooldown_hours}",),
             ).fetchall()
             if r["source_path"]
         }
         if recent_source_paths:
@@ -951,7 +977,7 @@
             ]
             cooled = before - len(unprocessed)
             if cooled:
-                logger.info("Cooldown: skipped %d source(s) with PRs in last 4h", cooled)
+                logger.info("Cooldown: skipped %d source(s) with PRs in last %dh", cooled, cooldown_hours)
 
     # ── Check for re-extraction sources (must run even when queue is empty) ──
     reextract_rows = conn.execute(
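
A note on the parameterized cooldown query: SQLite will not accept a placeholder inside a string literal, so the patch binds the signed hour count as text and splices the ' hours' suffix on with ||, producing a modifier such as '-4 hours'. The sketch below is a minimal sanity check of that equivalence, not part of the patch; it assumes only the stdlib sqlite3 module, and cooldown_hours stands in for config.EXTRACTION_COOLDOWN_HOURS:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    cooldown_hours = 4  # stand-in for config.EXTRACTION_COOLDOWN_HOURS

    # Evaluate both forms in a single statement so 'now' is the same instant:
    # the bound "-4" concatenated with ' hours' must equal the old literal.
    fixed, param = conn.execute(
        "SELECT datetime('now', '-4 hours'), datetime('now', ? || ' hours')",
        (f"-{cooldown_hours}",),
    ).fetchone()
    assert fixed == param, (fixed, param)

Binding the whole modifier as a single parameter (f"-{cooldown_hours} hours") would work equally well; the || form merely keeps the unit visible in the SQL next to datetime().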