teleo-infrastructure/reconcile-sources.py
m3taversal 5f554bc2de
feat: atomic extract-and-connect + stale PR monitor + response audit
Atomic extract-and-connect (lib/connect.py):
- After extraction writes claim files, each new claim is embedded via
  OpenRouter, searched against Qdrant, and top-5 neighbors (cosine > 0.55)
  are added as `related` edges in the claim's frontmatter
- Edges written on NEW claim only — avoids merge conflicts
- Cross-domain connections enabled, non-fatal on Qdrant failure
- Wired into the openrouter-extract-v2.py post-extraction step (see the sketch below)
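
A minimal sketch of the post-extraction hook, assuming the stock
qdrant-client search API; embed_text and add_frontmatter_list are
hypothetical helper names, not the actual lib/connect.py interface:

    # Sketch only: embed_text and add_frontmatter_list are hypothetical helpers.
    from qdrant_client import QdrantClient

    COSINE_THRESHOLD = 0.55  # top-5 neighbors above this score become edges

    def connect_new_claim(claim_path, claim_text, client: QdrantClient):
        """Embed a freshly written claim and record its nearest neighbors."""
        try:
            vector = embed_text(claim_text)  # embedding via OpenRouter (hypothetical helper)
            hits = client.search(
                collection_name="claims",    # assumed collection name
                query_vector=vector,
                limit=5,
                score_threshold=COSINE_THRESHOLD,
            )
            related = [hit.payload["path"] for hit in hits]
            # Write `related` edges on the NEW claim only, so concurrent
            # extractions never rewrite each other's files (no merge conflicts).
            add_frontmatter_list(claim_path, "related", related)
        except Exception:
            pass  # non-fatal on Qdrant failure, per the design above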

Stale PR monitor (lib/stale_pr.py):
- Every watchdog cycle checks open extract/* PRs
- If open >30 min AND 0 claim files → auto-close with comment
- After 2 stale closures → marks source as extraction_failed
- Wired into watchdog.py as check #6 (see the sketch below)
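
A minimal sketch of the check, assuming the GitHub CLI (gh) is on PATH;
record_stale_closure and the .md claim-file heuristic are assumptions,
not the real lib/stale_pr.py code:

    import json
    import subprocess
    from datetime import datetime, timezone

    STALE_MINUTES = 30

    def check_stale_extract_prs():
        """Close extract/* PRs that sat open >30 min without claim files."""
        prs = json.loads(subprocess.check_output(
            ["gh", "pr", "list", "--state", "open",
             "--json", "number,headRefName,createdAt,files"]))
        for pr in prs:
            if not pr["headRefName"].startswith("extract/"):
                continue
            created = datetime.fromisoformat(pr["createdAt"].replace("Z", "+00:00"))
            age_min = (datetime.now(timezone.utc) - created).total_seconds() / 60
            claim_files = [f for f in pr["files"] if f["path"].endswith(".md")]
            if age_min > STALE_MINUTES and not claim_files:
                subprocess.run(["gh", "pr", "close", str(pr["number"]),
                                "--comment", "Auto-closed: no claim files after 30 min"])
                # Hypothetical: after 2 such closures, mark the source extraction_failed.
                record_stale_closure(pr["headRefName"])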

Response audit system:
- response_audit table (migration v8), persistent audit conn in bot.py
- 90-day retention cleanup, tool_calls JSON column (see the sketch after this list)
- Confidence tag stripping, systemd ReadWritePaths for pipeline.db
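
A minimal sketch of the retention side, assuming pipeline.db is SQLite and
reducing the migration v8 schema to the columns named above; every column
except tool_calls is an assumption:

    import sqlite3

    AUDIT_DDL = """
    CREATE TABLE IF NOT EXISTS response_audit (
        id         INTEGER PRIMARY KEY AUTOINCREMENT,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        response   TEXT,
        tool_calls TEXT  -- JSON-encoded list of tool calls
    );
    """

    def cleanup_old_audits(conn: sqlite3.Connection):
        """90-day retention cleanup, run on the persistent audit connection."""
        conn.execute("DELETE FROM response_audit "
                     "WHERE created_at < datetime('now', '-90 days')")
        conn.commit()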

Supporting infrastructure:
- reweave.py: nightly edge reconnection for orphan claims
- reconcile-sources.py: source status reconciliation
- backfill-domains.py: domain classification backfill
- ops/reconcile-source-status.sh: operational reconciliation script
- Attribution improvements, post-extract enrichments, merge improvements

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:34:20 +00:00


#!/usr/bin/env python3
"""
Reconcile archive source status and add bidirectional links.

Matches unprocessed archive sources to existing decisions, entities, and claims.
Updates status to 'processed' or 'null-result' and adds frontmatter links.

Linking pattern (Ganymede Option A — frontmatter only):
- Archive sources get `derived_items:` listing decision/entity paths
- Decisions/entities get `source_archive:` pointing to archive source path
- All paths relative to repo root

Usage:
    python3 reconcile-sources.py          # default: dry-run
    python3 reconcile-sources.py --apply  # apply changes
"""
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
from urllib.parse import urlparse

REPO_ROOT = Path("/opt/teleo-eval/workspaces/main")
ARCHIVE_DIR = REPO_ROOT / "inbox" / "archive"
DECISIONS_DIR = REPO_ROOT / "decisions"
ENTITIES_DIR = REPO_ROOT / "entities"
DOMAINS_DIR = REPO_ROOT / "domains"

DRY_RUN = "--apply" not in sys.argv

# --- YAML frontmatter helpers ---

def read_frontmatter(filepath):
    """Read file, return (frontmatter_text, body_text, raw_content)."""
    content = filepath.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return None, content, content
    end = content.find("\n---", 3)
    if end == -1:
        return None, content, content
    fm = content[3:end].strip()
    body = content[end + 4:]  # skip the closing "\n---"
    return fm, body, content


def get_field(fm_text, field):
    """Get a single YAML field value from frontmatter text."""
    if fm_text is None:
        return None
    m = re.search(rf'^{field}:\s*["\']?(.+?)["\']?\s*$', fm_text, re.MULTILINE)
    return m.group(1) if m else None


def get_status(fm_text):
    return get_field(fm_text, "status")


def get_url(fm_text):
    return get_field(fm_text, "url")


def get_proposal_url(fm_text):
    return get_field(fm_text, "proposal_url")


def get_title(fm_text):
    return get_field(fm_text, "title")


def extract_hash_from_url(url):
    """Extract the proposal hash (last path segment) from a URL."""
    if not url:
        return None
    parsed = urlparse(url.strip('"').strip("'"))
    parts = [p for p in parsed.path.split("/") if p]
    if parts:
        last = parts[-1]
        # Proposal hashes are base58-like, typically 32-50 chars;
        # accept anything alphanumeric of length >= 20 to be safe.
        if len(last) >= 20 and re.match(r'^[A-Za-z0-9]+$', last):
            return last
    return None


def rel_path(filepath):
    """Get path relative to repo root."""
    return str(filepath.relative_to(REPO_ROOT))

# --- Test/spam detection ---

TEST_PATTERNS = [
    r'\btest\b', r'\btesting\b', r'\bmy-test\b', r'\bq\b$',
    r'\ba-very-unique', r'\btext-mint', r'\bsample\b',
    r'\basdf\b', r'\bfoo\b', r'\bbar\b', r'\bhello-world\b',
    r'\bgrpc-indexer\b', r'\brocks{0,2}wd\b',
    r'spending-limit', r'\btest-proposal\b',
    r'\bdummy\b',
]
TEST_RE = re.compile('|'.join(TEST_PATTERNS), re.IGNORECASE)

# Title-based patterns
TEST_TITLE_PATTERNS = [
    r'^test\b', r'^testing\b', r'^q$', r'^a$', r'^asdf',
    r'^my test', r'^sample', r'^hello',
    r'text mint ix', r'a very unique title',
    r'testing spending limit', r'testing.*grpc',
    r'my-test-proposal',
]
TEST_TITLE_RE = re.compile('|'.join(TEST_TITLE_PATTERNS), re.IGNORECASE)


def is_test_spam(filepath, fm_text):
    """Detect test/spam sources by filename or title."""
    name = filepath.stem
    if TEST_RE.search(name):
        return True
    title = get_title(fm_text) or ""
    if TEST_TITLE_RE.search(title):
        return True
    return False

# --- Build indexes ---

def build_decision_hash_index():
    """Map proposal hash → decision file path."""
    index = {}
    if not DECISIONS_DIR.exists():
        return index
    for f in DECISIONS_DIR.rglob("*.md"):
        fm, _, _ = read_frontmatter(f)
        url = get_proposal_url(fm)
        h = extract_hash_from_url(url)
        if h:
            index[h] = f
    return index


def build_entity_name_index():
    """Map normalized entity name → entity file path."""
    index = {}
    if not ENTITIES_DIR.exists():
        return index
    for f in ENTITIES_DIR.rglob("*.md"):
        # Use filename as entity name
        name = f.stem.lower().replace("-", " ").replace("_", " ")
        index[name] = f
    return index


def build_claim_source_index():
    """Map archive source slug → list of claim file paths (via wiki-links)."""
    index = defaultdict(list)
    if not DOMAINS_DIR.exists():
        return index
    for f in DOMAINS_DIR.rglob("*.md"):
        try:
            content = f.read_text(encoding="utf-8")
        except Exception:
            continue
        # Find wiki-links to archive: [[inbox/archive/...]]
        for m in re.finditer(r'\[\[inbox/archive/([^\]]+)\]\]', content):
            slug = m.group(1)
            index[slug].append(f)
    return index

# --- Frontmatter modification ---

def add_frontmatter_field(filepath, field_name, field_value):
    """Add a YAML field to frontmatter. Returns modified content or None if already present."""
    content = filepath.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return None
    end = content.find("\n---", 3)
    if end == -1:
        return None
    fm = content[3:end]
    # Check if field already exists
    if re.search(rf'^{field_name}:', fm, re.MULTILINE):
        return None  # Already has this field
    # Add before closing ---
    if isinstance(field_value, list):
        lines = f"\n{field_name}:"
        for v in field_value:
            lines += f'\n - "{v}"'
        new_fm = fm.rstrip() + lines + "\n"
    else:
        new_fm = fm.rstrip() + f'\n{field_name}: "{field_value}"\n'
    return "---" + new_fm + "---" + content[end + 4:]


def set_status(filepath, new_status):
    """Change status field in frontmatter."""
    content = filepath.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return None
    # Replace status field
    new_content = re.sub(
        r'^(status:\s*).*$',
        f'\\1{new_status}',
        content,
        count=1,
        flags=re.MULTILINE,
    )
    if new_content == content:
        return None
    return new_content

# --- Main reconciliation ---

def main():
    print("DRY RUN" if DRY_RUN else "APPLYING CHANGES")
    print(f"Repo root: {REPO_ROOT}")
    print()

    # Build indexes
    print("Building indexes...")
    decision_hash_idx = build_decision_hash_index()
    print(f"  Decision hash index: {len(decision_hash_idx)} entries")
    entity_name_idx = build_entity_name_index()
    print(f"  Entity name index: {len(entity_name_idx)} entries")
    claim_source_idx = build_claim_source_index()
    print(f"  Claim source index: {len(claim_source_idx)} entries")
    print()

    # Find all unprocessed archive sources
    unprocessed = []
    for f in sorted(ARCHIVE_DIR.rglob("*.md")):
        if ".extraction-debug" in str(f):
            continue
        fm, _, _ = read_frontmatter(f)
        if get_status(fm) == "unprocessed":
            unprocessed.append(f)
    print(f"Found {len(unprocessed)} unprocessed sources")
    print()

    # Categorize and match
    matched = []             # (source_path, [target_paths], [match_types])
    test_spam = []
    futardio_unmatched = []  # futardio proposals with no KB output → null-result
    genuine_backlog = []     # non-futardio sources still awaiting extraction → keep unprocessed

    def is_futardio_source(filepath):
        """Check if file is a futardio/metadao governance proposal (not research)."""
        name = filepath.name.lower()
        return "futardio" in name
    for src in unprocessed:
        fm, _, _ = read_frontmatter(src)

        # Check test/spam first
        if is_test_spam(src, fm):
            test_spam.append(src)
            continue

        targets = []
        match_types = []  # kept parallel to targets: one type per target

        # Match 1: proposal hash → decision
        url = get_url(fm)
        src_hash = extract_hash_from_url(url)
        if src_hash and src_hash in decision_hash_idx:
            targets.append(decision_hash_idx[src_hash])
            match_types.append("hash→decision")

        # Match 2: wiki-links from claims
        # Try multiple slug variants
        src_rel = rel_path(src)
        slug_no_ext = src_rel.replace("inbox/archive/", "").replace(".md", "")
        # Also try just the filename without extension
        slug_basename = src.stem
        for slug in [slug_no_ext, slug_basename]:
            if slug in claim_source_idx:
                for claim_path in claim_source_idx[slug]:
                    if claim_path not in targets:
                        targets.append(claim_path)
                        match_types.append("wiki→claim")

        # Match 3: entity name matching (for launches/fundraises)
        title = get_title(fm) or ""
        # Extract project name from title like "Futardio: ProjectName ..."
        title_match = re.match(r'Futardio:\s*(.+?)(?:\s*[-—]|\s+Launch|\s+Fundraise|$)', title, re.IGNORECASE)
        if title_match:
            project_name = title_match.group(1).strip().lower().replace("-", " ")
            if project_name in entity_name_idx:
                entity_path = entity_name_idx[project_name]
                if entity_path not in targets:
                    targets.append(entity_path)
                    match_types.append("name→entity")

        if targets:
            matched.append((src, targets, match_types))
        elif is_futardio_source(src):
            futardio_unmatched.append(src)
        else:
            genuine_backlog.append(src)
print(f"Results:")
print(f" Matched: {len(matched)}")
print(f" Test/spam: {len(test_spam)}")
print(f" Futardio unmatched (→ null-result): {len(futardio_unmatched)}")
print(f" Genuine backlog (kept unprocessed): {len(genuine_backlog)}")
print()
# Validate all link targets exist
broken_links = []
for src, targets, _ in matched:
for t in targets:
if isinstance(t, Path) and not t.exists():
broken_links.append((src, t))
if broken_links:
print(f"ERROR: {len(broken_links)} broken link targets!")
for src, target in broken_links:
print(f" {rel_path(src)}{rel_path(target)}")
if not DRY_RUN:
print("Aborting — fix broken links first.")
sys.exit(1)
# Show match samples
print("Sample matches:")
for src, targets, types in matched[:5]:
print(f" {src.name}")
for t, mt in zip(targets, types):
print(f"{rel_path(t)} ({mt})")
print()
# Show test/spam samples
if test_spam:
print(f"Test/spam samples ({len(test_spam)} total):")
for src in test_spam[:5]:
print(f" {src.name}")
print()
# Show futardio unmatched samples
if futardio_unmatched:
print(f"Futardio unmatched samples ({len(futardio_unmatched)} total):")
for src in futardio_unmatched[:10]:
print(f" {src.name}")
print()
# Show genuine backlog
if genuine_backlog:
print(f"Genuine backlog — kept unprocessed ({len(genuine_backlog)} total):")
from collections import Counter
backlog_domains = Counter()
for src in genuine_backlog:
parts = src.relative_to(ARCHIVE_DIR).parts
domain = parts[0] if len(parts) > 1 else "root"
backlog_domains[domain] += 1
for d, c in backlog_domains.most_common():
print(f" {d}: {c}")
print()
if DRY_RUN:
print("=== DRY RUN — no changes made. Use --apply to apply. ===")
return
    # --- Apply changes ---
    files_modified = 0
    links_created = 0

    # 1. Matched sources → processed + bidirectional links
    for src, targets, _ in matched:
        # Update source status
        new_content = set_status(src, "processed")
        if new_content:
            # Also add derived_items for decision/entity targets
            decision_entity_targets = [
                rel_path(t) for t in targets
                if isinstance(t, Path) and (
                    str(t).startswith(str(DECISIONS_DIR)) or
                    str(t).startswith(str(ENTITIES_DIR))
                )
            ]
            if decision_entity_targets:
                # Write the status change first, then add the field
                src.write_text(new_content, encoding="utf-8")
                linked = add_frontmatter_field(src, "derived_items", decision_entity_targets)
                if linked:
                    src.write_text(linked, encoding="utf-8")
                    links_created += len(decision_entity_targets)
            else:
                src.write_text(new_content, encoding="utf-8")
            files_modified += 1

        # Add source_archive to decision/entity targets
        src_rel = rel_path(src)
        for t in targets:
            if isinstance(t, Path) and (
                str(t).startswith(str(DECISIONS_DIR)) or
                str(t).startswith(str(ENTITIES_DIR))
            ):
                linked = add_frontmatter_field(t, "source_archive", src_rel)
                if linked:
                    t.write_text(linked, encoding="utf-8")
                    files_modified += 1
                    links_created += 1

    # 2. Test/spam → null-result
    for src in test_spam:
        new_content = set_status(src, "null-result")
        if new_content:
            src.write_text(new_content, encoding="utf-8")
            files_modified += 1

    # 3. Futardio unmatched → null-result (no extraction output, won't be re-extracted)
    for src in futardio_unmatched:
        new_content = set_status(src, "null-result")
        if new_content:
            src.write_text(new_content, encoding="utf-8")
            files_modified += 1

    # 4. Genuine backlog → KEEP unprocessed (these are real extraction targets)
    # No changes needed.

    print("\n=== APPLIED ===")
    print(f"Files modified: {files_modified}")
    print(f"Bidirectional links created: {links_created}")
    print(f"Matched → processed: {len(matched)}")
    print(f"Test/spam → null-result: {len(test_spam)}")
    print(f"Futardio unmatched → null-result: {len(futardio_unmatched)}")
    print(f"Genuine backlog → kept unprocessed: {len(genuine_backlog)}")
    # Verify: count sources still marked unprocessed
    remaining = 0
    for f in ARCHIVE_DIR.rglob("*.md"):
        if ".extraction-debug" in str(f):
            continue
        fm, _, _ = read_frontmatter(f)
        if get_status(fm) == "unprocessed":
            remaining += 1
    print(f"\nRemaining unprocessed: {remaining}")


if __name__ == "__main__":
    main()