teleo-infrastructure/backfill-domains.py
m3taversal 5f554bc2de
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
feat: atomic extract-and-connect + stale PR monitor + response audit
Atomic extract-and-connect (lib/connect.py):
- After extraction writes claim files, each new claim is embedded via
  OpenRouter, searched against Qdrant, and top-5 neighbors (cosine > 0.55)
  are added as `related` edges in the claim's frontmatter
- Edges written on NEW claim only — avoids merge conflicts
- Cross-domain connections enabled, non-fatal on Qdrant failure
- Wired into openrouter-extract-v2.py post-extraction step

Stale PR monitor (lib/stale_pr.py):
- Every watchdog cycle checks open extract/* PRs
- If open >30 min AND 0 claim files → auto-close with comment
- After 2 stale closures → marks source as extraction_failed
- Wired into watchdog.py as check #6

Response audit system:
- response_audit table (migration v8), persistent audit conn in bot.py
- 90-day retention cleanup, tool_calls JSON column
- Confidence tag stripping, systemd ReadWritePaths for pipeline.db

Supporting infrastructure:
- reweave.py: nightly edge reconnection for orphan claims
- reconcile-sources.py: source status reconciliation
- backfill-domains.py: domain classification backfill
- ops/reconcile-source-status.sh: operational reconciliation script
- Attribution improvements, post-extract enrichments, merge improvements

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:34:20 +00:00

193 lines
6.1 KiB
Python

#!/usr/bin/env python3
# ONE-SHOT BACKFILL — do not cron. Idempotent.
"""Reclassify PRs with domain='general' or NULL using file paths from diffs.
The extraction prompt defaults to 'general' when it can't determine domain.
This script re-derives domains from actual file paths in merged PR diffs,
which are more reliable than extraction-time heuristics.
Usage:
python3 backfill-domains.py [--dry-run]
Pentagon-Agent: Epimetheus <0144398E-4ED3-4FE2-95A3-3D72E1ABF887>
"""
import argparse
import json
import re
import sqlite3
import subprocess
from collections import Counter
from pathlib import Path
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
REPO_DIR = "/opt/teleo-eval/workspaces/main"
# Canonical domains — must match lib/domains.py DOMAIN_AGENT_MAP
VALID_DOMAINS = frozenset({
"internet-finance", "entertainment", "health", "ai-alignment",
"space-development", "mechanisms", "living-capital", "living-agents",
"teleohumanity", "grand-strategy", "critical-systems",
"collective-intelligence", "teleological-economics", "cultural-dynamics",
})
# Agent → primary domain (same as lib/domains.py)
AGENT_PRIMARY_DOMAIN = {
"rio": "internet-finance",
"clay": "entertainment",
"theseus": "ai-alignment",
"vida": "health",
"astra": "space-development",
"leo": "grand-strategy",
}
def detect_domain_from_paths(file_paths: list[str]) -> str | None:
"""Detect domain from file paths in a diff.
Checks domains/, entities/, core/, foundations/ directory structure.
Returns the most frequently referenced valid domain, or None.
"""
domain_counts: Counter = Counter()
for path in file_paths:
for prefix in ("domains/", "entities/"):
if path.startswith(prefix):
parts = path.split("/")
if len(parts) >= 2:
d = parts[1]
if d in VALID_DOMAINS:
domain_counts[d] += 1
break
else:
for prefix in ("core/", "foundations/"):
if path.startswith(prefix):
parts = path.split("/")
if len(parts) >= 2:
d = parts[1]
if d in VALID_DOMAINS:
domain_counts[d] += 1
break
if domain_counts:
return domain_counts.most_common(1)[0][0]
return None
def get_diff_files(pr_number: int, branch: str) -> list[str]:
    """Get list of changed file paths for a PR from git.

    First diffs origin/main against the PR branch; if that fails (branch
    deleted after merge, git missing, or a 10s timeout), falls back to
    locating the merge commit by PR number and diffing it against its
    first parent. Returns an empty list when both strategies fail.
    """

    def _run(args: list[str]) -> subprocess.CompletedProcess:
        # All git invocations share the same workspace, timeout, and capture.
        return subprocess.run(
            args, capture_output=True, text=True, timeout=10, cwd=REPO_DIR,
        )

    def _names(stdout: str) -> list[str]:
        return [line.strip() for line in stdout.strip().split("\n") if line.strip()]

    try:
        diff = _run(["git", "diff", "--name-only", f"origin/main...origin/{branch}"])
        if diff.returncode == 0:
            return _names(diff.stdout)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass

    # Fallback: try merge commit if branch is gone
    try:
        lookup = _run(["git", "log", "--merges", f"--grep=#{pr_number}", "--format=%H", "-1"])
        if lookup.returncode == 0 and lookup.stdout.strip():
            merge_sha = lookup.stdout.strip()
            diff = _run(["git", "diff", "--name-only", f"{merge_sha}~1..{merge_sha}"])
            if diff.returncode == 0:
                return _names(diff.stdout)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass

    return []
def detect_domain_from_agent(agent: str | None) -> str | None:
    """Infer domain from agent's primary domain.

    Returns None when the agent is missing/empty or has no mapping in
    AGENT_PRIMARY_DOMAIN. Lookup is case-insensitive.
    """
    if not agent:
        return None
    return AGENT_PRIMARY_DOMAIN.get(agent.lower())
def main() -> None:
    """Backfill domain on merged PRs where domain is NULL or 'general'.

    Strategy per PR: (1) derive the domain from the file paths in the PR
    diff; (2) fall back to the agent's primary domain. Updates are applied
    in one transaction unless --dry-run is given, then a summary report is
    printed.
    """
    parser = argparse.ArgumentParser(description="Backfill domain for 'general'/NULL PRs")
    parser.add_argument("--dry-run", action="store_true", help="Print changes without applying")
    args = parser.parse_args()

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    reclassified = 0
    unchanged = 0
    distribution: Counter = Counter()
    log_entries: list[str] = []
    # Fix: close the connection even if a query/update raises mid-run
    # (the original leaked the handle on any exception before close()).
    try:
        # Find PRs with missing or 'general' domain
        rows = conn.execute(
            """SELECT number, branch, domain, agent FROM prs
            WHERE status = 'merged'
            AND (domain IS NULL OR domain = 'general')
            ORDER BY number"""
        ).fetchall()
        print(f"Found {len(rows)} merged PRs with domain=NULL or 'general'")

        for row in rows:
            pr_num = row["number"]
            branch = row["branch"]
            old_domain = row["domain"] or "NULL"
            agent = row["agent"]
            new_domain = None

            # Strategy 1: File paths from diff
            if branch:
                files = get_diff_files(pr_num, branch)
                new_domain = detect_domain_from_paths(files)
            # Strategy 2: Agent's primary domain
            if new_domain is None:
                new_domain = detect_domain_from_agent(agent)

            if new_domain and new_domain != old_domain:
                log_entries.append(
                    f"PR #{pr_num}: {old_domain} → {new_domain} (agent={agent}, branch={branch})"
                )
                distribution[new_domain] += 1
                if not args.dry_run:
                    conn.execute(
                        "UPDATE prs SET domain = ? WHERE number = ?",
                        (new_domain, pr_num),
                    )
                reclassified += 1
            else:
                unchanged += 1

        if not args.dry_run and reclassified > 0:
            conn.commit()
    finally:
        conn.close()

    # Report (uses only local state, so it can run after the DB is closed)
    print(f"\nReclassified: {reclassified}")
    print(f"Unchanged (still general): {unchanged}")
    print("\nDistribution of reclassified PRs:")
    for domain, count in distribution.most_common():
        print(f" {domain}: {count}")
    if log_entries:
        print(f"\nDetailed log ({len(log_entries)} changes):")
        for entry in log_entries:
            print(f" {entry}")
    if args.dry_run:
        print("\n[DRY RUN — no changes applied]")


if __name__ == "__main__":
    main()