From ed4af4d72eadb5724bf598ff4b3323c1b6e5b60e Mon Sep 17 00:00:00 2001 From: m3taversal Date: Thu, 30 Apr 2026 11:05:39 +0100 Subject: [PATCH] fix(extract): dedup queue sources whose basename is already in archive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Daemon re-extracted same source every ~4h cycle when research-session commits on agent branches re-introduced already-archived queue files. Existing daemon filters (DB-status, open-PR, 4h cooldown) all missed this pattern because the queue file gets a fresh sources row at status='unprocessed' on each re-add, the cooldown lapses exactly at the cycle interval, and the open-PR filter only catches in-flight extractions. Add an archive-basename filter immediately after the queue scan: if a file with this basename exists anywhere under inbox/archive/, skip. Archive copy is the source of truth — once extracted, the queue copy is stale by definition. Validation against pipeline.db (last 7d): 78 sources had multiple extract PRs (32% duplicate rate) 73/78 (94%) carry an archive copy and would have been caught. Current queue: 35/99 sources (35%) have archive duplicates today. Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887> --- lib/extract.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/lib/extract.py b/lib/extract.py index a1e017f..c73e29f 100644 --- a/lib/extract.py +++ b/lib/extract.py @@ -923,6 +923,29 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]: except Exception: logger.debug("Failed to read source %s", f, exc_info=True) + # Archive-basename filter: skip queue files whose basename already exists in + # inbox/archive/. Research-session commits on agent branches occasionally + # re-introduce already-archived queue files when the branch is re-merged, + # producing same-source re-extractions every cooldown cycle. The archive + # copy is the source of truth — if a file with this basename is in archive, + # the source is processed regardless of queue state. Single archive scan + # per cycle, cheap (~1k files). + if unprocessed: + archive_dir = main / "inbox" / "archive" + archived_basenames: set[str] = set() + if archive_dir.exists(): + for af in archive_dir.rglob("*.md"): + archived_basenames.add(af.name) + if archived_basenames: + before = len(unprocessed) + unprocessed = [ + (sp, c, f) for sp, c, f in unprocessed + if Path(sp).name not in archived_basenames + ] + skipped = before - len(unprocessed) + if skipped: + logger.info("Skipped %d queue source(s) — basename already in inbox/archive/", skipped) + # Don't early-return here — re-extraction sources may exist even when queue is empty # (the re-extraction check runs after open-PR filtering below)