teleo-infrastructure/scripts/backfill-sources.py
m3taversal a053a8ebf9
Some checks are pending
CI / lint-and-test (push) Waiting to run
fix(backfill): don't regress terminal source statuses to unprocessed
backfill-sources.py runs every 15 minutes and derives sources.status
purely from directory location. If a source file is in inbox/queue/,
it blindly overwrites the DB status to 'unprocessed' — even when the
DB already had 'extracted' or 'null_result'.

This is why the 43 zombies kept coming back after manual backfill:
cron re-reset them every 15 minutes, then each 4h cooldown expiry
re-triggered runaway extraction on the same source.

Fix: never regress from a terminal status (extracted, null_result,
error, ghost_no_file) to 'unprocessed'. File location is ambiguous
(legitimately new vs. zombie from failed archive); DB is authoritative.
Legitimate re-extraction still works — it goes through the needs_reextraction
path which is unaffected by this gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 21:29:33 +01:00

147 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""Backfill the sources table from filesystem.
Scans inbox/queue/, inbox/archive/{domain}/, inbox/null-result/
and registers every source file in the pipeline DB.
Reads frontmatter to determine status, domain, priority.
Skips files already in the DB (by path).
"""
import os
import re
import sqlite3
import sys
from pathlib import Path
# Git working tree that holds inbox/queue/, inbox/archive/, inbox/null-result/.
REPO_DIR = Path("/opt/teleo-eval/workspaces/main")
# SQLite pipeline database containing the `sources` table this script syncs.
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
def parse_frontmatter(path: Path) -> dict:
    """Pull selected fields out of a file's YAML-style frontmatter.

    Only the keys status, domain, priority and claims_extracted are kept.
    Returns an empty dict when the file cannot be read, does not begin
    with a '---' delimiter, or the closing '---' is missing.
    """
    try:
        text = path.read_text(errors="replace")
    except Exception:
        # Unreadable file (permissions, race with deletion, ...): no frontmatter.
        return {}
    if not text.startswith("---"):
        return {}
    closing = text.find("\n---", 3)
    if closing == -1:
        return {}
    wanted = ("status", "domain", "priority", "claims_extracted")
    result = {}
    for raw in text[3:closing].split("\n"):
        stripped = raw.strip()
        if ":" not in stripped:
            continue
        key, _, value = stripped.partition(":")
        key = key.strip()
        if key in wanted:
            # Drop surrounding double quotes first, then single quotes,
            # matching the original normalization order.
            result[key] = value.strip().strip('"').strip("'")
    return result
def map_dir_to_status(rel_path: str) -> str:
    """Translate a repo-relative file location into a pipeline status.

    queue/ -> 'unprocessed', archive/ -> 'extracted',
    null-result/ -> 'null_result'; any other location defaults
    to 'unprocessed'.
    """
    prefix_to_status = (
        ("inbox/queue/", "unprocessed"),
        ("inbox/archive/", "extracted"),
        ("inbox/null-result/", "null_result"),
    )
    for prefix, status in prefix_to_status:
        if rel_path.startswith(prefix):
            return status
    return "unprocessed"
def main():
    """Scan source directories and sync the `sources` table.

    Inserts rows for files not yet in the DB and updates the status of
    known files — except that terminal statuses are never downgraded to
    'unprocessed': a file sitting in inbox/queue/ may be a zombie from a
    failed archive push, and downgrading would re-trigger the runaway
    re-extraction loop. The DB is authoritative for terminal states.
    """
    # Hoisted out of the per-file loop (was rebuilt on every iteration).
    terminal_statuses = {"extracted", "null_result", "error", "ghost_no_file"}
    conn = sqlite3.connect(DB_PATH, timeout=10)
    conn.row_factory = sqlite3.Row
    try:
        # Load path -> status in one query; avoids a per-file SELECT (N+1).
        status_by_path = {
            r["path"]: r["status"]
            for r in conn.execute("SELECT path, status FROM sources")
        }
        print(f"Existing in DB: {len(status_by_path)}")
        # Flat dirs first; archive is partitioned into per-domain subdirs.
        dirs_to_scan = [
            REPO_DIR / "inbox" / "queue",
            REPO_DIR / "inbox" / "null-result",
        ]
        archive_dir = REPO_DIR / "inbox" / "archive"
        if archive_dir.exists():
            dirs_to_scan.extend(d for d in archive_dir.iterdir() if d.is_dir())
        inserted = 0
        updated = 0
        for scan_dir in dirs_to_scan:
            if not scan_dir.exists():
                continue
            for md_file in scan_dir.glob("*.md"):
                rel_path = str(md_file.relative_to(REPO_DIR))
                fm = parse_frontmatter(md_file)
                # Directory location gives the baseline status...
                status = map_dir_to_status(rel_path)
                # ...but a more specific frontmatter status overrides it.
                fm_status = fm.get("status", "")
                if fm_status == "null-result":
                    status = "null_result"
                elif fm_status == "processed":
                    status = "extracted"
                priority = fm.get("priority", "medium")
                try:
                    claims_count = int(fm.get("claims_extracted", "0") or "0")
                except (ValueError, TypeError):
                    claims_count = 0
                if rel_path in status_by_path:
                    current_status = status_by_path[rel_path]
                    # Never regress terminal -> unprocessed. File location is
                    # ambiguous (new vs. zombie); the DB wins.
                    regress = (
                        current_status in terminal_statuses
                        and status == "unprocessed"
                    )
                    if current_status != status and not regress:
                        conn.execute(
                            "UPDATE sources SET status = ?, updated_at = datetime('now') WHERE path = ?",
                            (status, rel_path),
                        )
                        updated += 1
                else:
                    conn.execute(
                        """INSERT INTO sources (path, status, priority, claims_count, created_at, updated_at)
                        VALUES (?, ?, ?, ?, datetime('now'), datetime('now'))""",
                        (rel_path, status, priority, claims_count),
                    )
                    inserted += 1
                    # Record the insert so a duplicate path later in the scan
                    # cannot trigger a second INSERT on the same key.
                    status_by_path[rel_path] = status
        conn.commit()
        # Report
        print(f"Inserted: {inserted}, Updated: {updated}")
        print("DB totals:")
        for row in conn.execute(
            "SELECT status, COUNT(*) as n FROM sources GROUP BY status"
        ):
            print(f" {row['status']}: {row['n']}")
        total = conn.execute("SELECT COUNT(*) as n FROM sources").fetchone()["n"]
        print(f"Total: {total}")
    finally:
        # Close even if a scan or query raises (was unguarded before).
        conn.close()
# Standard script entry point: run the backfill only when executed directly,
# not when imported (e.g. for testing).
if __name__ == "__main__":
    main()