#!/usr/bin/env python3 # ONE-SHOT BACKFILL — do not cron. Idempotent. """Reclassify PRs with domain='general' or NULL using file paths from diffs. The extraction prompt defaults to 'general' when it can't determine domain. This script re-derives domains from actual file paths in merged PR diffs, which are more reliable than extraction-time heuristics. Usage: python3 backfill-domains.py [--dry-run] Pentagon-Agent: Epimetheus <0144398E-4ED3-4FE2-95A3-3D72E1ABF887> """ import argparse import json import re import sqlite3 import subprocess from collections import Counter from pathlib import Path DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db" REPO_DIR = "/opt/teleo-eval/workspaces/main" # Canonical domains — must match lib/domains.py DOMAIN_AGENT_MAP VALID_DOMAINS = frozenset({ "internet-finance", "entertainment", "health", "ai-alignment", "space-development", "mechanisms", "living-capital", "living-agents", "teleohumanity", "grand-strategy", "critical-systems", "collective-intelligence", "teleological-economics", "cultural-dynamics", }) # Agent → primary domain (same as lib/domains.py) AGENT_PRIMARY_DOMAIN = { "rio": "internet-finance", "clay": "entertainment", "theseus": "ai-alignment", "vida": "health", "astra": "space-development", "leo": "grand-strategy", } def detect_domain_from_paths(file_paths: list[str]) -> str | None: """Detect domain from file paths in a diff. Checks domains/, entities/, core/, foundations/ directory structure. Returns the most frequently referenced valid domain, or None. """ domain_counts: Counter = Counter() for path in file_paths: for prefix in ("domains/", "entities/"): if path.startswith(prefix): parts = path.split("/") if len(parts) >= 2: d = parts[1] if d in VALID_DOMAINS: domain_counts[d] += 1 break else: for prefix in ("core/", "foundations/"): if path.startswith(prefix): parts = path.split("/") if len(parts) >= 2: d = parts[1] if d in VALID_DOMAINS: domain_counts[d] += 1 break if domain_counts: return domain_counts.most_common(1)[0][0] return None def get_diff_files(pr_number: int, branch: str) -> list[str]: """Get list of changed file paths for a PR from git.""" try: result = subprocess.run( ["git", "diff", "--name-only", f"origin/main...origin/{branch}"], capture_output=True, text=True, timeout=10, cwd=REPO_DIR, ) if result.returncode == 0: return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()] except (subprocess.TimeoutExpired, FileNotFoundError): pass # Fallback: try merge commit if branch is gone try: result = subprocess.run( ["git", "log", "--merges", f"--grep=#{pr_number}", "--format=%H", "-1"], capture_output=True, text=True, timeout=10, cwd=REPO_DIR, ) if result.returncode == 0 and result.stdout.strip(): merge_sha = result.stdout.strip() result2 = subprocess.run( ["git", "diff", "--name-only", f"{merge_sha}~1..{merge_sha}"], capture_output=True, text=True, timeout=10, cwd=REPO_DIR, ) if result2.returncode == 0: return [f.strip() for f in result2.stdout.strip().split("\n") if f.strip()] except (subprocess.TimeoutExpired, FileNotFoundError): pass return [] def detect_domain_from_agent(agent: str | None) -> str | None: """Infer domain from agent's primary domain.""" if agent: return AGENT_PRIMARY_DOMAIN.get(agent.lower()) return None def main(): parser = argparse.ArgumentParser(description="Backfill domain for 'general'/NULL PRs") parser.add_argument("--dry-run", action="store_true", help="Print changes without applying") args = parser.parse_args() conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row # Find PRs with missing or 'general' domain rows = conn.execute( """SELECT number, branch, domain, agent FROM prs WHERE status = 'merged' AND (domain IS NULL OR domain = 'general') ORDER BY number""" ).fetchall() print(f"Found {len(rows)} merged PRs with domain=NULL or 'general'") reclassified = 0 unchanged = 0 distribution: Counter = Counter() log_entries = [] for row in rows: pr_num = row["number"] branch = row["branch"] old_domain = row["domain"] or "NULL" agent = row["agent"] new_domain = None # Strategy 1: File paths from diff if branch: files = get_diff_files(pr_num, branch) new_domain = detect_domain_from_paths(files) # Strategy 2: Agent's primary domain if new_domain is None: new_domain = detect_domain_from_agent(agent) if new_domain and new_domain != old_domain: log_entries.append(f"PR #{pr_num}: {old_domain} → {new_domain} (agent={agent}, branch={branch})") distribution[new_domain] += 1 if not args.dry_run: conn.execute( "UPDATE prs SET domain = ? WHERE number = ?", (new_domain, pr_num), ) reclassified += 1 else: unchanged += 1 if not args.dry_run and reclassified > 0: conn.commit() conn.close() # Report print(f"\nReclassified: {reclassified}") print(f"Unchanged (still general): {unchanged}") print(f"\nDistribution of reclassified PRs:") for domain, count in distribution.most_common(): print(f" {domain}: {count}") if log_entries: print(f"\nDetailed log ({len(log_entries)} changes):") for entry in log_entries: print(f" {entry}") if args.dry_run: print("\n[DRY RUN — no changes applied]") if __name__ == "__main__": main()