fix(extract): dedup queue sources whose basename is already in archive

The daemon re-extracted the same source every ~4h cycle when
research-session commits on agent branches re-introduced
already-archived queue files. The existing daemon filters (DB status,
open-PR check, 4h cooldown) all missed this pattern: the queue file
gets a fresh sources row at status='unprocessed' on each re-add, the
cooldown lapses exactly at the cycle interval, and the open-PR filter
only catches in-flight extractions.

Add an archive-basename filter immediately after the queue scan: if a
file with the same basename exists anywhere under inbox/archive/, skip
the queue file. The archive copy is the source of truth: once a source
has been extracted, its queue copy is stale by definition.

Validation against pipeline.db (last 7d):
  78 sources had multiple extract PRs (32% duplicate rate).
  73/78 (94%) carry an archive copy and would have been caught.
  Current queue: 35/99 sources (35%) have archive duplicates today.
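
The figures above come from counting extract PRs per source in
pipeline.db. A sketch of that kind of query is below; the table and
column names (prs, source_path, kind, created_at) are assumptions for
illustration, not the actual pipeline.db schema.

    import sqlite3

    # Hypothetical schema for illustration: prs(source_path, kind, created_at).
    # Counts queue sources with more than one extract PR in the last 7 days.
    conn = sqlite3.connect("pipeline.db")
    rows = conn.execute(
        """
        SELECT source_path, COUNT(*) AS n
        FROM prs
        WHERE kind = 'extract'
          AND created_at >= datetime('now', '-7 days')
        GROUP BY source_path
        HAVING COUNT(*) > 1
        """
    ).fetchall()
    print(f"{len(rows)} sources with multiple extract PRs in the last 7 days")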

Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887>

@@ -923,6 +923,29 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
        except Exception:
            logger.debug("Failed to read source %s", f, exc_info=True)
    # Archive-basename filter: skip queue files whose basename already exists in
    # inbox/archive/. Research-session commits on agent branches occasionally
    # re-introduce already-archived queue files when the branch is re-merged,
    # producing same-source re-extractions every cooldown cycle. The archive
    # copy is the source of truth — if a file with this basename is in archive,
    # the source is processed regardless of queue state. Single archive scan
    # per cycle, cheap (~1k files).
    if unprocessed:
        archive_dir = main / "inbox" / "archive"
        archived_basenames: set[str] = set()
        if archive_dir.exists():
            for af in archive_dir.rglob("*.md"):
                archived_basenames.add(af.name)
        if archived_basenames:
            before = len(unprocessed)
            unprocessed = [
                (sp, c, f) for sp, c, f in unprocessed
                if Path(sp).name not in archived_basenames
            ]
            skipped = before - len(unprocessed)
            if skipped:
                logger.info("Skipped %d queue source(s) — basename already in inbox/archive/", skipped)
    # Don't early-return here — re-extraction sources may exist even when queue is empty
    # (the re-extraction check runs after open-PR filtering below).
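
The filter logic can be exercised in isolation. A minimal sketch
follows, assuming the three-element (sp, c, f) tuples shown in the
diff, where sp is a path-like string; the other elements and the
example paths are placeholders, not real pipeline data.

    from pathlib import Path

    # Standalone check of the basename filter from the diff above. Only sp is
    # consulted; the other tuple elements are opaque placeholders here.
    archived_basenames = {"2026-04-12-notes.md"}
    unprocessed = [
        ("inbox/queue/2026-04-12-notes.md", None, None),  # archived copy exists -> skipped
        ("inbox/queue/2026-04-29-fresh.md", None, None),  # no archive copy -> kept
    ]
    unprocessed = [
        (sp, c, f) for sp, c, f in unprocessed
        if Path(sp).name not in archived_basenames
    ]
    assert [Path(sp).name for sp, _, _ in unprocessed] == ["2026-04-29-fresh.md"]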