fix(extract): dedup queue sources whose basename is already in archive
The daemon re-extracted the same source every ~4h cycle when research-session
commits on agent branches re-introduced already-archived queue files. The
existing daemon filters (DB status, open PR, 4h cooldown) all missed this
pattern: the queue file gets a fresh sources row at status='unprocessed' on
each re-add, the cooldown lapses exactly at the cycle interval, and the
open-PR filter only catches in-flight extractions.

Add an archive-basename filter immediately after the queue scan: if a file
with this basename exists anywhere under inbox/archive/, skip it. The archive
copy is the source of truth — once extracted, the queue copy is stale by
definition.

Validation against pipeline.db (last 7d): 78 sources had multiple extract PRs
(32% duplicate rate); 73/78 (94%) carry an archive copy and would have been
caught. Current queue: 35/99 sources (35%) have archive duplicates today.

Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887>
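The duplicate-rate validation above can be reproduced with a query along these lines. This is a minimal sketch against an in-memory database: the table and column names (`sources`, `source_path`, `pr_url`, `created_at`) are assumptions, since the commit does not show the pipeline.db schema.

```python
import sqlite3

# Hypothetical schema — the real pipeline.db layout is not shown in the commit.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sources (
        id INTEGER PRIMARY KEY,
        source_path TEXT NOT NULL,
        pr_url TEXT,
        created_at TEXT NOT NULL
    );
    INSERT INTO sources (source_path, pr_url, created_at) VALUES
        ('inbox/queue/a.md', 'PR#1', '2024-01-01'),
        ('inbox/queue/a.md', 'PR#2', '2024-01-02'),  -- same source, second extract PR
        ('inbox/queue/b.md', 'PR#3', '2024-01-02');
""")

# Count sources that accumulated more than one extract PR (the "duplicate" set).
row = conn.execute("""
    SELECT COUNT(*) FROM (
        SELECT source_path
        FROM sources
        WHERE pr_url IS NOT NULL
        GROUP BY source_path
        HAVING COUNT(DISTINCT pr_url) > 1
    )
""").fetchone()
print(row[0])  # -> 1 (only a.md has multiple extract PRs)
```

On the real database the same shape of query, restricted to the last 7 days via `created_at`, would yield the 78-source figure quoted above.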
This commit is contained in:
parent ed5f7ef6cc
commit ed4af4d72e
1 changed file with 23 additions and 0 deletions
@@ -923,6 +923,29 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
         except Exception:
             logger.debug("Failed to read source %s", f, exc_info=True)
 
+    # Archive-basename filter: skip queue files whose basename already exists in
+    # inbox/archive/. Research-session commits on agent branches occasionally
+    # re-introduce already-archived queue files when the branch is re-merged,
+    # producing same-source re-extractions every cooldown cycle. The archive
+    # copy is the source of truth — if a file with this basename is in archive,
+    # the source is processed regardless of queue state. Single archive scan
+    # per cycle, cheap (~1k files).
+    if unprocessed:
+        archive_dir = main / "inbox" / "archive"
+        archived_basenames: set[str] = set()
+        if archive_dir.exists():
+            for af in archive_dir.rglob("*.md"):
+                archived_basenames.add(af.name)
+        if archived_basenames:
+            before = len(unprocessed)
+            unprocessed = [
+                (sp, c, f) for sp, c, f in unprocessed
+                if Path(sp).name not in archived_basenames
+            ]
+            skipped = before - len(unprocessed)
+            if skipped:
+                logger.info("Skipped %d queue source(s) — basename already in inbox/archive/", skipped)
+
     # Don't early-return here — re-extraction sources may exist even when queue is empty
     # (the re-extraction check runs after open-PR filtering below)
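The added filter can also be exercised in isolation. Below is a self-contained sketch of the same basename-dedup logic against a temporary archive directory; `filter_archived` is a hypothetical helper introduced here for illustration, not a function in the daemon, and the `(source_path, content, file)` tuple shape mirrors the `(sp, c, f)` unpacking in the diff.

```python
import tempfile
from pathlib import Path

def filter_archived(unprocessed, archive_dir: Path):
    # Collect every .md basename under archive_dir (recursively), then drop
    # queue entries whose basename matches — the archive copy wins.
    if archive_dir.exists():
        archived = {af.name for af in archive_dir.rglob("*.md")}
    else:
        archived = set()
    return [entry for entry in unprocessed if Path(entry[0]).name not in archived]

with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "inbox" / "archive"
    archive.mkdir(parents=True)
    (archive / "a.md").write_text("already extracted")

    queue = [("inbox/queue/a.md", None, None), ("inbox/queue/b.md", None, None)]
    kept = filter_archived(queue, archive)
    print([Path(sp).name for sp, _, _ in kept])  # -> ['b.md']
```

Because the basename set is built once per call, the cost is a single recursive archive scan per cycle, matching the "cheap (~1k files)" note in the diff's comment.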