teleo-infrastructure/backfill-sources.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter (see the sketch after this list)
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
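
A minimal Python sketch of the recovery shape from item 1 (merge_pr, the
status-code sets, and the backoff constants are illustrative assumptions,
not the deployed code):

    import random
    import time

    TRANSIENT = {409, 500, 502, 503}  # assumed retryable merge-API statuses
    PERMANENT = {401, 403, 404, 422}  # assumed non-retryable: cycle the PR instead

    def merge_with_recovery(merge_pr, pr_id, attempts=3, base_delay=2.0):
        """Retry transient merge failures with jittered backoff; fail fast
        on permanent ones. merge_pr is assumed to return an HTTP status."""
        for attempt in range(1, attempts + 1):
            status = merge_pr(pr_id)
            if status in (200, 201):
                return True
            if status in PERMANENT or attempt == attempts:
                return False
            # full jitter keeps parallel workers from retrying in lockstep
            time.sleep(random.uniform(0, base_delay * 2 ** attempt))
        return False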

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors (see the sketch below)
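
Roughly what the 405 fallback does, as a hedged Python sketch (repo layout,
ref names, and the helper are assumptions; the real handler lives in the
pipeline code):

    import subprocess

    def local_ff_merge(repo_dir: str, base: str, head: str) -> None:
        """When the Forgejo merge API answers 405, fast-forward base onto
        the PR head locally and push, so Forgejo marks the PR merged."""
        def git(*args):
            subprocess.run(["git", "-C", repo_dir, *args], check=True)
        git("fetch", "origin")
        git("checkout", base)
        git("merge", "--ff-only", f"origin/{head}")  # refuse anything non-ff
        git("push", "origin", base)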

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat; sketched after this list)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
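
The conversation-window bookkeeping, sketched in Python (class shape and
names are illustrative; only the 5-message window comes from the list above):

    from collections import defaultdict

    WINDOW = 5  # unanswered messages before the bot drops out of a conversation

    class ConversationWindows:
        """Per-user-per-chat counter: once the bot has replied to a user,
        it keeps treating their messages as addressed to it until WINDOW
        of them pass without the bot being tagged again."""

        def __init__(self):
            self._left = defaultdict(int)  # (chat_id, user_id) -> messages left

        def opened(self, chat_id: int, user_id: int) -> None:
            """Call when the bot replies to this user in this chat."""
            self._left[(chat_id, user_id)] = WINDOW

        def active(self, chat_id: int, user_id: int) -> bool:
            """Call per incoming message; consumes one slot from the window."""
            key = (chat_id, user_id)
            if self._left[key] <= 0:
                return False
            self._left[key] -= 1
            return True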

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination (sketched after this list)
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
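
worktree_lock.py in miniature (a sketch assuming a flock file inside the
worktree; the deployed module's path and API may differ):

    import fcntl
    from contextlib import contextmanager
    from pathlib import Path

    @contextmanager
    def worktree_lock(lock_path: Path):
        """Blocking exclusive lock so the Telegram archive writer and the
        pipeline never mutate the main worktree concurrently. flock is
        advisory, so every writer must enter through this context manager."""
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        with lock_path.open("w") as fh:
            fcntl.flock(fh, fcntl.LOCK_EX)  # blocks until the current holder exits
            try:
                yield
            finally:
                fcntl.flock(fh, fcntl.LOCK_UN)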

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

#!/usr/bin/env python3
"""Backfill the sources table from filesystem.
Scans inbox/queue/, inbox/archive/{domain}/, inbox/null-result/
and registers every source file in the pipeline DB.
Reads frontmatter to determine status, domain, priority.
Skips files already in the DB (by path).
"""
import sqlite3
from pathlib import Path

REPO_DIR = Path("/opt/teleo-eval/workspaces/main")
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"


def parse_frontmatter(path: Path) -> dict:
    """Extract key fields from YAML frontmatter."""
    try:
        text = path.read_text(errors="replace")
    except Exception:
        return {}
    if not text.startswith("---"):
        return {}
    end = text.find("\n---", 3)
    if end == -1:
        return {}
    fm = {}
    for line in text[3:end].split("\n"):
        line = line.strip()
        if ":" in line:
            key, _, val = line.partition(":")
            key = key.strip()
            val = val.strip().strip('"').strip("'")
            if key in ("status", "domain", "priority", "claims_extracted"):
                fm[key] = val
    return fm


def map_dir_to_status(rel_path: str) -> str:
    """Map filesystem location to DB status."""
    if rel_path.startswith("inbox/queue/"):
        return "unprocessed"
    elif rel_path.startswith("inbox/archive/"):
        return "extracted"
    elif rel_path.startswith("inbox/null-result/"):
        return "null_result"
    return "unprocessed"


def main():
    conn = sqlite3.connect(DB_PATH, timeout=10)
    conn.row_factory = sqlite3.Row
    # Get existing paths
    existing = set(r["path"] for r in conn.execute("SELECT path FROM sources").fetchall())
    print(f"Existing in DB: {len(existing)}")
    # Scan filesystem
    dirs_to_scan = [
        REPO_DIR / "inbox" / "queue",
        REPO_DIR / "inbox" / "null-result",
    ]
    # Add archive subdirectories
    archive_dir = REPO_DIR / "inbox" / "archive"
    if archive_dir.exists():
        for d in archive_dir.iterdir():
            if d.is_dir():
                dirs_to_scan.append(d)
    inserted = 0
    updated = 0
    for scan_dir in dirs_to_scan:
        if not scan_dir.exists():
            continue
        for md_file in scan_dir.glob("*.md"):
            rel_path = str(md_file.relative_to(REPO_DIR))
            fm = parse_frontmatter(md_file)
            # Determine status from directory location (overrides frontmatter)
            status = map_dir_to_status(rel_path)
            # Use frontmatter status if it's more specific
            fm_status = fm.get("status", "")
            if fm_status == "null-result":
                status = "null_result"
            elif fm_status == "processed":
                status = "extracted"
            domain = fm.get("domain", "unknown")
            priority = fm.get("priority", "medium")
            raw_claims = fm.get("claims_extracted", "0") or "0"
            try:
                claims_count = int(raw_claims)
            except (ValueError, TypeError):
                claims_count = 0
            if rel_path in existing:
                # Update status if different
                current = conn.execute("SELECT status FROM sources WHERE path = ?", (rel_path,)).fetchone()
                if current and current["status"] != status:
                    conn.execute(
                        "UPDATE sources SET status = ?, updated_at = datetime('now') WHERE path = ?",
                        (status, rel_path),
                    )
                    updated += 1
            else:
                conn.execute(
                    """INSERT INTO sources (path, status, priority, claims_count, created_at, updated_at)
                       VALUES (?, ?, ?, ?, datetime('now'), datetime('now'))""",
                    (rel_path, status, priority, claims_count),
                )
                inserted += 1
    conn.commit()
    # Report
    totals = conn.execute("SELECT status, COUNT(*) as n FROM sources GROUP BY status").fetchall()
    print(f"Inserted: {inserted}, Updated: {updated}")
    print("DB totals:")
    for r in totals:
        print(f" {r['status']}: {r['n']}")
    total = conn.execute("SELECT COUNT(*) as n FROM sources").fetchone()["n"]
    print(f"Total: {total}")
    conn.close()


if __name__ == "__main__":
    main()