#!/usr/bin/env python3
"""Backfill the sources table from the filesystem.

Scans inbox/queue/, inbox/archive/{domain}/ and inbox/null-result/ and
registers every Markdown source file in the pipeline DB. A file's status is
derived from the directory it lives in, refined by an explicit frontmatter
``status``; priority and claim count are read from frontmatter.

Files not yet in the DB (matched by repo-relative path) are inserted. Files
already in the DB only have their status updated, and only when it differs;
their other columns are left untouched.
"""
import os
import re
import sqlite3
import sys
from pathlib import Path

REPO_DIR = Path("/opt/teleo-eval/workspaces/main")
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"

# Frontmatter keys this script extracts; every other key is ignored.
_FM_KEYS = ("status", "domain", "priority", "claims_extracted")


def parse_frontmatter(path: Path) -> dict:
    """Extract a few flat ``key: value`` fields from a file's YAML frontmatter.

    This is a deliberately minimal line-based reader, not a YAML parser: it
    only understands single-line scalar values and strips surrounding quote
    characters from them.

    Args:
        path: Markdown file to read.

    Returns:
        Mapping of the keys listed in ``_FM_KEYS`` that were found to their
        string values. Empty if the file cannot be read or does not start
        with a ``---`` ... ``---`` frontmatter block.
    """
    try:
        # utf-8-sig drops a leading BOM, which would otherwise hide the
        # opening "---" delimiter; undecodable bytes are replaced, not fatal.
        text = path.read_text(encoding="utf-8-sig", errors="replace")
    except OSError:
        return {}
    if not text.startswith("---"):
        return {}
    end = text.find("\n---", 3)
    if end == -1:
        return {}
    fm = {}
    for line in text[3:end].split("\n"):
        key, sep, val = line.strip().partition(":")
        if not sep:
            continue
        key = key.strip()
        if key in _FM_KEYS:
            fm[key] = val.strip().strip('"').strip("'")
    return fm


def map_dir_to_status(rel_path: str) -> str:
    """Map a repo-relative, '/'-separated path to a DB status by its location.

    Paths outside the three known inbox directories default to "unprocessed".
    """
    if rel_path.startswith("inbox/queue/"):
        return "unprocessed"
    if rel_path.startswith("inbox/archive/"):
        return "extracted"
    if rel_path.startswith("inbox/null-result/"):
        return "null_result"
    return "unprocessed"


def _resolve_status(rel_path: str, fm: dict) -> str:
    """Return the DB status: frontmatter "null-result"/"processed" wins, else the directory decides."""
    fm_status = fm.get("status", "")
    if fm_status == "null-result":
        return "null_result"
    if fm_status == "processed":
        return "extracted"
    return map_dir_to_status(rel_path)


def _dirs_to_scan(repo_dir: Path) -> list:
    """List the inbox directories to scan: queue, null-result, and each archive subdirectory."""
    dirs = [repo_dir / "inbox" / "queue", repo_dir / "inbox" / "null-result"]
    archive_dir = repo_dir / "inbox" / "archive"
    if archive_dir.is_dir():
        dirs.extend(sorted(d for d in archive_dir.iterdir() if d.is_dir()))
    return dirs


def main(repo_dir=None, db_path=None):
    """Register every inbox source file in the ``sources`` table and print a report.

    Args:
        repo_dir: Repository root to scan. Defaults to ``REPO_DIR``.
        db_path: SQLite database path. Defaults to ``DB_PATH``.

    All inserts/updates are committed once, after the scan completes; the
    connection is always closed, even if the scan raises.
    """
    repo_dir = REPO_DIR if repo_dir is None else Path(repo_dir)
    db_path = DB_PATH if db_path is None else db_path

    conn = sqlite3.connect(db_path, timeout=10)
    try:
        conn.row_factory = sqlite3.Row

        # Load every known path with its status in one query, instead of
        # issuing a SELECT per already-registered file inside the loop.
        existing = {
            r["path"]: r["status"]
            for r in conn.execute("SELECT path, status FROM sources")
        }
        print(f"Existing in DB: {len(existing)}")

        inserted = 0
        updated = 0
        for scan_dir in _dirs_to_scan(repo_dir):
            if not scan_dir.is_dir():
                continue
            for md_file in sorted(scan_dir.glob("*.md")):
                # A directory named "*.md" is not a source file.
                if not md_file.is_file():
                    continue
                # as_posix() keeps the '/'-prefix checks in map_dir_to_status
                # valid regardless of the OS path separator.
                rel_path = md_file.relative_to(repo_dir).as_posix()
                fm = parse_frontmatter(md_file)

                status = _resolve_status(rel_path, fm)
                # NOTE(review): frontmatter "domain" is parsed but not stored;
                # the INSERT below has no domain column — confirm the table
                # schema before persisting it.
                # An empty "priority:" value falls back to the default too.
                priority = fm.get("priority") or "medium"
                try:
                    claims_count = int(fm.get("claims_extracted") or "0")
                except ValueError:
                    claims_count = 0

                if rel_path in existing:
                    # Known file: only the status is reconciled.
                    if existing[rel_path] != status:
                        conn.execute(
                            "UPDATE sources SET status = ?, updated_at = datetime('now') WHERE path = ?",
                            (status, rel_path),
                        )
                        updated += 1
                else:
                    conn.execute(
                        """INSERT INTO sources (path, status, priority, claims_count, created_at, updated_at)
                           VALUES (?, ?, ?, ?, datetime('now'), datetime('now'))""",
                        (rel_path, status, priority, claims_count),
                    )
                    inserted += 1
                # Guard against inserting the same path twice in one run.
                existing[rel_path] = status

        conn.commit()

        # Report
        totals = conn.execute("SELECT status, COUNT(*) as n FROM sources GROUP BY status").fetchall()
        print(f"Inserted: {inserted}, Updated: {updated}")
        print("DB totals:")
        for r in totals:
            print(f"  {r['status']}: {r['n']}")
        total = conn.execute("SELECT COUNT(*) as n FROM sources").fetchone()["n"]
        print(f"Total: {total}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()