diff --git a/backfill-domains.py b/backfill-domains.py new file mode 100644 index 0000000..9fca9fd --- /dev/null +++ b/backfill-domains.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +# ONE-SHOT BACKFILL — do not cron. Idempotent. +"""Reclassify PRs with domain='general' or NULL using file paths from diffs. + +The extraction prompt defaults to 'general' when it can't determine domain. +This script re-derives domains from actual file paths in merged PR diffs, +which are more reliable than extraction-time heuristics. + +Usage: + python3 backfill-domains.py [--dry-run] + +Pentagon-Agent: Epimetheus <0144398E-4ED3-4FE2-95A3-3D72E1ABF887> +""" + +import argparse +import json +import re +import sqlite3 +import subprocess +from collections import Counter +from pathlib import Path + +DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db" +REPO_DIR = "/opt/teleo-eval/workspaces/main" + +# Canonical domains — must match lib/domains.py DOMAIN_AGENT_MAP +VALID_DOMAINS = frozenset({ + "internet-finance", "entertainment", "health", "ai-alignment", + "space-development", "mechanisms", "living-capital", "living-agents", + "teleohumanity", "grand-strategy", "critical-systems", + "collective-intelligence", "teleological-economics", "cultural-dynamics", +}) + +# Agent → primary domain (same as lib/domains.py) +AGENT_PRIMARY_DOMAIN = { + "rio": "internet-finance", + "clay": "entertainment", + "theseus": "ai-alignment", + "vida": "health", + "astra": "space-development", + "leo": "grand-strategy", +} + + +def detect_domain_from_paths(file_paths: list[str]) -> str | None: + """Detect domain from file paths in a diff. + + Checks domains/, entities/, core/, foundations/ directory structure. + Returns the most frequently referenced valid domain, or None. + """ + domain_counts: Counter = Counter() + for path in file_paths: + for prefix in ("domains/", "entities/"): + if path.startswith(prefix): + parts = path.split("/") + if len(parts) >= 2: + d = parts[1] + if d in VALID_DOMAINS: + domain_counts[d] += 1 + break + else: + for prefix in ("core/", "foundations/"): + if path.startswith(prefix): + parts = path.split("/") + if len(parts) >= 2: + d = parts[1] + if d in VALID_DOMAINS: + domain_counts[d] += 1 + break + + if domain_counts: + return domain_counts.most_common(1)[0][0] + return None + + +def get_diff_files(pr_number: int, branch: str) -> list[str]: + """Get list of changed file paths for a PR from git.""" + try: + result = subprocess.run( + ["git", "diff", "--name-only", f"origin/main...origin/{branch}"], + capture_output=True, text=True, timeout=10, + cwd=REPO_DIR, + ) + if result.returncode == 0: + return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()] + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + # Fallback: try merge commit if branch is gone + try: + result = subprocess.run( + ["git", "log", "--merges", f"--grep=#{pr_number}", "--format=%H", "-1"], + capture_output=True, text=True, timeout=10, + cwd=REPO_DIR, + ) + if result.returncode == 0 and result.stdout.strip(): + merge_sha = result.stdout.strip() + result2 = subprocess.run( + ["git", "diff", "--name-only", f"{merge_sha}~1..{merge_sha}"], + capture_output=True, text=True, timeout=10, + cwd=REPO_DIR, + ) + if result2.returncode == 0: + return [f.strip() for f in result2.stdout.strip().split("\n") if f.strip()] + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + return [] + + +def detect_domain_from_agent(agent: str | None) -> str | None: + """Infer domain from agent's primary domain.""" + if agent: + 
return AGENT_PRIMARY_DOMAIN.get(agent.lower()) + return None + + +def main(): + parser = argparse.ArgumentParser(description="Backfill domain for 'general'/NULL PRs") + parser.add_argument("--dry-run", action="store_true", help="Print changes without applying") + args = parser.parse_args() + + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + + # Find PRs with missing or 'general' domain + rows = conn.execute( + """SELECT number, branch, domain, agent FROM prs + WHERE status = 'merged' + AND (domain IS NULL OR domain = 'general') + ORDER BY number""" + ).fetchall() + + print(f"Found {len(rows)} merged PRs with domain=NULL or 'general'") + + reclassified = 0 + unchanged = 0 + distribution: Counter = Counter() + log_entries = [] + + for row in rows: + pr_num = row["number"] + branch = row["branch"] + old_domain = row["domain"] or "NULL" + agent = row["agent"] + + new_domain = None + + # Strategy 1: File paths from diff + if branch: + files = get_diff_files(pr_num, branch) + new_domain = detect_domain_from_paths(files) + + # Strategy 2: Agent's primary domain + if new_domain is None: + new_domain = detect_domain_from_agent(agent) + + if new_domain and new_domain != old_domain: + log_entries.append(f"PR #{pr_num}: {old_domain} → {new_domain} (agent={agent}, branch={branch})") + distribution[new_domain] += 1 + + if not args.dry_run: + conn.execute( + "UPDATE prs SET domain = ? WHERE number = ?", + (new_domain, pr_num), + ) + reclassified += 1 + else: + unchanged += 1 + + if not args.dry_run and reclassified > 0: + conn.commit() + + conn.close() + + # Report + print(f"\nReclassified: {reclassified}") + print(f"Unchanged (still general): {unchanged}") + print(f"\nDistribution of reclassified PRs:") + for domain, count in distribution.most_common(): + print(f" {domain}: {count}") + + if log_entries: + print(f"\nDetailed log ({len(log_entries)} changes):") + for entry in log_entries: + print(f" {entry}") + + if args.dry_run: + print("\n[DRY RUN — no changes applied]") + + +if __name__ == "__main__": + main() diff --git a/embed-claims.py b/embed-claims.py index f0a45d3..b81bc2b 100644 --- a/embed-claims.py +++ b/embed-claims.py @@ -148,8 +148,8 @@ def embed_file(path: Path, api_key: str, dry_run: bool = False) -> bool: file_type, domain, confidence, title = classify_file(fm, path) rel_path = str(path.relative_to(REPO_DIR)) - # Build embed text: title + first ~2000 chars of body - embed_text_str = f"{title}\n\n{body[:2000]}" if body else title + # Build embed text: title + first ~6000 chars of body (model handles 8191 tokens) + embed_text_str = f"{title}\n\n{body[:6000]}" if body else title if dry_run: print(f" [{file_type}] {rel_path}: {title[:60]}") diff --git a/lib/attribution.py b/lib/attribution.py index 365c0e6..7ca5233 100644 --- a/lib/attribution.py +++ b/lib/attribution.py @@ -100,12 +100,15 @@ def parse_attribution_from_file(filepath: str) -> dict[str, list[dict]]: # ─── Validate attribution ────────────────────────────────────────────────── -def validate_attribution(fm: dict) -> list[str]: +def validate_attribution(fm: dict, agent: str | None = None) -> list[str]: """Validate attribution block in claim frontmatter. Returns list of issues. Block on missing extractor, warn on missing sourcer. (Leo: extractor is always known, sourcer is best-effort.) + If agent is provided and extractor is missing, auto-fix by setting the + agent as extractor (same pattern as created-date auto-fix). + Only validates if an attribution block is explicitly present. 
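+    A minimal sketch of the auto-fix path (illustrative frontmatter; assumes
+    parse_attribution reports no extractor for it):
+
+        fm = {"attribution": {"sourcer": [{"handle": "rio"}]}}
+        issues = validate_attribution(fm, agent="epimetheus")
+        # "fixed_missing_extractor" in issues
+        # fm["attribution"]["extractor"] == [{"handle": "epimetheus"}]
+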
Legacy claims without attribution blocks are not blocked — they'll get attribution when enriched. New claims from v2 extraction always have attribution. @@ -123,7 +126,16 @@ def validate_attribution(fm: dict) -> list[str]: attribution = parse_attribution(fm) if not attribution["extractor"]: - issues.append("missing_attribution_extractor") + if agent: + # Auto-fix: set the processing agent as extractor + attr = fm.get("attribution") + if isinstance(attr, dict): + attr["extractor"] = [{"handle": agent}] + else: + fm["attribution"] = {"extractor": [{"handle": agent}]} + issues.append("fixed_missing_extractor") + else: + issues.append("missing_attribution_extractor") return issues diff --git a/lib/connect.py b/lib/connect.py new file mode 100644 index 0000000..a8444c8 --- /dev/null +++ b/lib/connect.py @@ -0,0 +1,202 @@ +"""Atomic extract-and-connect — wire new claims to the KB at extraction time. + +After extraction writes claim files to disk, this module: +1. Embeds each new claim (title + description + body snippet) +2. Searches Qdrant for semantically similar existing claims +3. Adds found neighbors as `related` edges on the NEW claim's frontmatter + +Key design decision: edges are written on the NEW claim, not on existing claims. +Writing on existing claims would cause merge conflicts (same reason entities are +queued, not written on branches). When the PR merges, embed-on-merge adds the +new claim to Qdrant, and reweave can later add reciprocal edges on neighbors. + +Cost: ~$0.0001 per claim (embedding only). No LLM classification — defaults to +"related". Reweave handles supports/challenges classification in a separate pass. + +Owner: Epimetheus +""" + +import logging +import os +import re +import sys +from pathlib import Path + +logger = logging.getLogger("pipeline.connect") + +# Similarity threshold for auto-connecting (lower than reweave's 0.70 because +# we're using "related" not "supports/challenges" — less precision needed) +CONNECT_THRESHOLD = 0.55 +CONNECT_MAX_NEIGHBORS = 5 + +# --- Import search functions --- +# This module is called from openrouter-extract-v2.py which may not have lib/ on path +# via the package, so handle both import paths. +try: + from .search import embed_query, search_qdrant + from .post_extract import parse_frontmatter, _rebuild_content +except ImportError: + sys.path.insert(0, os.path.dirname(__file__)) + from search import embed_query, search_qdrant + from post_extract import parse_frontmatter, _rebuild_content + + +def _build_search_text(content: str) -> str: + """Extract title + description + first 500 chars of body for embedding.""" + fm, body = parse_frontmatter(content) + parts = [] + if fm: + desc = fm.get("description", "") + if isinstance(desc, str) and desc: + parts.append(desc.strip('"').strip("'")) + # Get H1 title from body + h1_match = re.search(r"^# (.+)$", body, re.MULTILINE) if body else None + if h1_match: + parts.append(h1_match.group(1).strip()) + # Add body snippet (skip H1 line) + if body: + body_text = re.sub(r"^# .+\n*", "", body).strip() + # Stop at "Relevant Notes" or "Topics" sections + body_text = re.split(r"\n---\n", body_text)[0].strip() + if body_text: + parts.append(body_text[:500]) + return " ".join(parts) + + +def _add_related_edges(claim_path: str, neighbor_titles: list[str]) -> bool: + """Add related edges to a claim's frontmatter. 
Returns True if modified.""" + try: + with open(claim_path) as f: + content = f.read() + except Exception as e: + logger.warning("Cannot read %s: %s", claim_path, e) + return False + + fm, body = parse_frontmatter(content) + if fm is None: + return False + + # Get existing related edges to avoid duplicates + existing = fm.get("related", []) + if isinstance(existing, str): + existing = [existing] + elif not isinstance(existing, list): + existing = [] + + existing_lower = {str(e).strip().lower() for e in existing} + + # Add new edges + added = [] + for title in neighbor_titles: + if title.strip().lower() not in existing_lower: + added.append(title) + existing_lower.add(title.strip().lower()) + + if not added: + return False + + fm["related"] = existing + added + + # Rebuild and write + new_content = _rebuild_content(fm, body) + with open(claim_path, "w") as f: + f.write(new_content) + + return True + + +def connect_new_claims( + claim_paths: list[str], + domain: str | None = None, + threshold: float = CONNECT_THRESHOLD, + max_neighbors: int = CONNECT_MAX_NEIGHBORS, +) -> dict: + """Connect newly-written claims to the existing KB via vector search. + + Args: + claim_paths: List of file paths to newly-written claim files. + domain: Optional domain filter for Qdrant search. + threshold: Minimum cosine similarity for connection. + max_neighbors: Maximum edges to add per claim. + + Returns: + { + "total": int, + "connected": int, + "edges_added": int, + "skipped_embed_failed": int, + "skipped_no_neighbors": int, + "connections": [{"claim": str, "neighbors": [str]}], + } + """ + stats = { + "total": len(claim_paths), + "connected": 0, + "edges_added": 0, + "skipped_embed_failed": 0, + "skipped_no_neighbors": 0, + "connections": [], + } + + for claim_path in claim_paths: + try: + with open(claim_path) as f: + content = f.read() + except Exception: + continue + + # Build search text from claim content + search_text = _build_search_text(content) + if not search_text or len(search_text) < 20: + stats["skipped_no_neighbors"] += 1 + continue + + # Embed the claim + vector = embed_query(search_text) + if vector is None: + stats["skipped_embed_failed"] += 1 + continue + + # Search Qdrant for neighbors (exclude nothing — new claim isn't in Qdrant yet) + hits = search_qdrant( + vector, + limit=max_neighbors, + domain=None, # Cross-domain connections are valuable + score_threshold=threshold, + ) + + if not hits: + stats["skipped_no_neighbors"] += 1 + continue + + # Extract neighbor titles + neighbor_titles = [] + for hit in hits: + payload = hit.get("payload", {}) + title = payload.get("claim_title", "") + if title: + neighbor_titles.append(title) + + if not neighbor_titles: + stats["skipped_no_neighbors"] += 1 + continue + + # Add edges to the new claim's frontmatter + if _add_related_edges(claim_path, neighbor_titles): + stats["connected"] += 1 + stats["edges_added"] += len(neighbor_titles) + stats["connections"].append({ + "claim": os.path.basename(claim_path), + "neighbors": neighbor_titles, + }) + logger.info("Connected %s → %d neighbors", os.path.basename(claim_path), len(neighbor_titles)) + else: + stats["skipped_no_neighbors"] += 1 + + logger.info( + "Extract-and-connect: %d/%d claims connected (%d edges added, %d embed failed, %d no neighbors)", + stats["connected"], stats["total"], stats["edges_added"], + stats["skipped_embed_failed"], stats["skipped_no_neighbors"], + ) + + return stats diff --git a/lib/db.py b/lib/db.py index 246442a..dc8323d 100644 --- a/lib/db.py +++ b/lib/db.py @@ -9,7 +9,7 @@ 
from . import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 6 +SCHEMA_VERSION = 9 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -48,6 +48,7 @@ CREATE TABLE IF NOT EXISTS prs ( -- conflict: rebase failed or merge timed out — needs human intervention domain TEXT, agent TEXT, + commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'challenge', 'enrich', 'synthesize', 'unknown')), tier TEXT, -- LIGHT, STANDARD, DEEP tier0_pass INTEGER, @@ -103,11 +104,52 @@ CREATE TABLE IF NOT EXISTS audit_log ( detail TEXT ); +CREATE TABLE IF NOT EXISTS response_audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (datetime('now')), + chat_id INTEGER, + user TEXT, + agent TEXT DEFAULT 'rio', + model TEXT, + query TEXT, + conversation_window TEXT, + -- JSON: prior N messages for context + -- NOTE: intentional duplication of transcript data for audit self-containment. + -- Transcripts live in /opt/teleo-eval/transcripts/ but audit rows need prompt + -- context inline for retrieval-quality diagnosis. Primary driver of row size — + -- target for cleanup when 90-day retention policy lands. + entities_matched TEXT, + -- JSON: [{name, path, score, used_in_response}] + claims_matched TEXT, + -- JSON: [{path, title, score, source, used_in_response}] + retrieval_layers_hit TEXT, + -- JSON: ["keyword","qdrant","graph"] + retrieval_gap TEXT, + -- What the KB was missing (if anything) + market_data TEXT, + -- JSON: injected token prices + research_context TEXT, + -- Haiku pre-pass results if any + kb_context_text TEXT, + -- Full context string sent to model + tool_calls TEXT, + -- JSON: ordered array [{tool, input, output, duration_ms, ts}] + raw_response TEXT, + display_response TEXT, + confidence_score REAL, + -- Model self-rated retrieval quality 0.0-1.0 + response_time_ms INTEGER, + created_at TEXT DEFAULT (datetime('now')) +); + CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status); CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status); CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain); CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date); CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage); +CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp); +CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent); +CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp); """ @@ -140,6 +182,37 @@ def transaction(conn: sqlite3.Connection): raise +# Branch prefix → (agent, commit_type) mapping. +# Single source of truth — used by merge.py at INSERT time and migration v7 backfill. +# Unknown prefixes → ('unknown', 'unknown') + warning log. +BRANCH_PREFIX_MAP = { + "extract": ("pipeline", "extract"), + "ingestion": ("pipeline", "extract"), + "epimetheus": ("epimetheus", "extract"), + "rio": ("rio", "research"), + "theseus": ("theseus", "research"), + "astra": ("astra", "research"), + "vida": ("vida", "research"), + "clay": ("clay", "research"), + "leo": ("leo", "entity"), + "reweave": ("pipeline", "reweave"), + "fix": ("pipeline", "fix"), +} + + +def classify_branch(branch: str) -> tuple[str, str]: + """Derive (agent, commit_type) from branch prefix. + + Returns ('unknown', 'unknown') and logs a warning for unrecognized prefixes. 
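+
+    Examples (illustrative branch names, mapped via BRANCH_PREFIX_MAP above):
+
+        classify_branch("rio/market-structure-notes")    # → ("rio", "research")
+        classify_branch("extract/futardio-2024-11-03")   # → ("pipeline", "extract")
+        classify_branch("hotfix-typo")                   # → ("unknown", "unknown"), logs a warning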
+ """ + prefix = branch.split("/", 1)[0] if "/" in branch else branch + result = BRANCH_PREFIX_MAP.get(prefix) + if result is None: + logger.warning("Unknown branch prefix %r in branch %r — defaulting to ('unknown', 'unknown')", prefix, branch) + return ("unknown", "unknown") + return result + + def migrate(conn: sqlite3.Connection): """Run schema migrations.""" conn.executescript(SCHEMA_SQL) @@ -251,6 +324,121 @@ def migrate(conn: sqlite3.Connection): """) logger.info("Migration v6: added metrics_snapshots table for analytics dashboard") + if current < 7: + # Phase 7: agent attribution + commit_type for dashboard + # commit_type column + backfill agent/commit_type from branch prefix + try: + conn.execute("ALTER TABLE prs ADD COLUMN commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'unknown'))") + except sqlite3.OperationalError: + pass # column already exists from CREATE TABLE + # Backfill agent and commit_type from branch prefix + rows = conn.execute("SELECT number, branch FROM prs WHERE branch IS NOT NULL").fetchall() + for row in rows: + agent, commit_type = classify_branch(row["branch"]) + conn.execute( + "UPDATE prs SET agent = ?, commit_type = ? WHERE number = ? AND (agent IS NULL OR commit_type IS NULL)", + (agent, commit_type, row["number"]), + ) + backfilled = len(rows) + logger.info("Migration v7: added commit_type column, backfilled %d PRs with agent/commit_type", backfilled) + + if current < 8: + # Phase 8: response audit — full-chain visibility for agent response quality + # Captures: query → tool calls → retrieval → context → response → confidence + # Approved by Ganymede (architecture), Rio (agent needs), Rhea (ops) + conn.executescript(""" + CREATE TABLE IF NOT EXISTS response_audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (datetime('now')), + chat_id INTEGER, + user TEXT, + agent TEXT DEFAULT 'rio', + model TEXT, + query TEXT, + conversation_window TEXT, -- intentional transcript duplication for audit self-containment + entities_matched TEXT, + claims_matched TEXT, + retrieval_layers_hit TEXT, + retrieval_gap TEXT, + market_data TEXT, + research_context TEXT, + kb_context_text TEXT, + tool_calls TEXT, + raw_response TEXT, + display_response TEXT, + confidence_score REAL, + response_time_ms INTEGER, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp); + CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent); + CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp); + """) + logger.info("Migration v8: added response_audit table for agent response auditing") + + if current < 9: + # Phase 9: rebuild prs table to expand CHECK constraint on commit_type. + # SQLite cannot ALTER CHECK constraints in-place — must rebuild table. + # Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown + # New constraint: adds challenge,enrich,synthesize + # Also re-derive commit_type from branch prefix for rows with invalid/NULL values. 
+ + # Step 1: Get all column names from existing table + cols_info = conn.execute("PRAGMA table_info(prs)").fetchall() + col_names = [c["name"] for c in cols_info] + col_list = ", ".join(col_names) + + # Step 2: Create new table with expanded CHECK constraint + conn.executescript(f""" + CREATE TABLE prs_new ( + number INTEGER PRIMARY KEY, + source_path TEXT REFERENCES sources(path), + branch TEXT, + status TEXT NOT NULL DEFAULT 'open', + domain TEXT, + agent TEXT, + commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')), + tier TEXT, + tier0_pass INTEGER, + leo_verdict TEXT DEFAULT 'pending', + domain_verdict TEXT DEFAULT 'pending', + domain_agent TEXT, + domain_model TEXT, + priority TEXT, + origin TEXT DEFAULT 'pipeline', + transient_retries INTEGER DEFAULT 0, + substantive_retries INTEGER DEFAULT 0, + last_error TEXT, + last_attempt TEXT, + cost_usd REAL DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')), + merged_at TEXT + ); + INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs; + DROP TABLE prs; + ALTER TABLE prs_new RENAME TO prs; + """) + logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint") + + # Step 3: Re-derive commit_type from branch prefix for invalid/NULL values + rows = conn.execute( + """SELECT number, branch FROM prs + WHERE branch IS NOT NULL + AND (commit_type IS NULL + OR commit_type NOT IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown'))""" + ).fetchall() + fixed = 0 + for row in rows: + agent, commit_type = classify_branch(row["branch"]) + conn.execute( + "UPDATE prs SET agent = COALESCE(agent, ?), commit_type = ? WHERE number = ?", + (agent, commit_type, row["number"]), + ) + fixed += 1 + conn.commit() + logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed) + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", @@ -296,6 +484,27 @@ def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priorit raise +def insert_response_audit(conn: sqlite3.Connection, **kwargs): + """Insert a response audit record. All fields optional except query.""" + cols = [ + "timestamp", "chat_id", "user", "agent", "model", "query", + "conversation_window", "entities_matched", "claims_matched", + "retrieval_layers_hit", "retrieval_gap", "market_data", + "research_context", "kb_context_text", "tool_calls", + "raw_response", "display_response", "confidence_score", + "response_time_ms", + ] + present = {k: v for k, v in kwargs.items() if k in cols and v is not None} + if not present: + return + col_names = ", ".join(present.keys()) + placeholders = ", ".join("?" for _ in present) + conn.execute( + f"INSERT INTO response_audit ({col_names}) VALUES ({placeholders})", + tuple(present.values()), + ) + + def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"): """Set a source's authoritative priority. Used for human overrides and initial triage.""" conn.execute( diff --git a/lib/evaluate.py b/lib/evaluate.py index 3636afb..ddb850b 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -25,7 +25,7 @@ import re from datetime import datetime, timezone from . 
import config, db -from .domains import agent_for_domain, detect_domain_from_diff +from .domains import agent_for_domain, detect_domain_from_branch, detect_domain_from_diff from .forgejo import api as forgejo_api from .forgejo import get_agent_token, get_pr_diff, repo_path from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr @@ -556,13 +556,15 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: review_diff = diff files = _extract_changed_files(diff) - # Detect domain + # Detect domain — try diff paths first, then branch prefix, then 'general' domain = detect_domain_from_diff(diff) - agent = agent_for_domain(domain) - - # Default NULL domain to 'general' (archive-only PRs have no domain files) + if domain is None: + pr_row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone() + if pr_row and pr_row["branch"]: + domain = detect_domain_from_branch(pr_row["branch"]) if domain is None: domain = "general" + agent = agent_for_domain(domain) # Update PR domain if not set conn.execute( @@ -1272,7 +1274,7 @@ def _build_domain_batches( individual.append(row) continue - domain = existing["domain"] if existing and existing["domain"] else "general" + domain = existing["domain"] if existing and existing["domain"] and existing["domain"] != "general" else "general" domain_candidates.setdefault(domain, []).append(row) # Build sized batches per domain diff --git a/lib/merge.py b/lib/merge.py index 00ab1f3..97c610b 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -19,6 +19,7 @@ import shutil from collections import defaultdict from . import config, db +from .db import classify_branch from .domains import detect_domain_from_branch from .forgejo import api as forgejo_api @@ -96,12 +97,13 @@ async def discover_external_prs(conn) -> int: origin = "pipeline" if is_pipeline else "human" priority = "high" if origin == "human" else None domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"]) + agent, commit_type = classify_branch(pr["head"]["ref"]) conn.execute( """INSERT OR IGNORE INTO prs - (number, branch, status, origin, priority, domain) - VALUES (?, ?, 'open', ?, ?, ?)""", - (pr["number"], pr["head"]["ref"], origin, priority, domain), + (number, branch, status, origin, priority, domain, agent, commit_type) + VALUES (?, ?, 'open', ?, ?, ?, ?, ?)""", + (pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type), ) db.audit( conn, @@ -409,37 +411,78 @@ async def _delete_remote_branch(branch: str): # --- Contributor attribution --- -def _classify_commit_type(diff: str) -> str: - """Classify a PR as 'knowledge' or 'pipeline' by files changed. +def _is_knowledge_pr(diff: str) -> bool: + """Check if a PR touches knowledge files (claims, decisions, core, foundations). - Knowledge: claims, decisions, core, foundations (full CI weight) - Pipeline: inbox, entities, agents, archive (zero CI weight) + Knowledge PRs get full CI attribution weight. + Pipeline-only PRs (inbox, entities, agents, archive) get zero CI weight. - Mixed PRs (knowledge + pipeline files) classify as 'knowledge' — - if a PR adds a claim, it gets attribution even if it also moves - source files. Knowledge takes priority. (Ganymede review) + Mixed PRs count as knowledge — if a PR adds a claim, it gets attribution + even if it also moves source files. Knowledge takes priority. 
(Ganymede review) """ knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/") - pipeline_prefixes = ("inbox/", "entities/", "agents/") - has_knowledge = False for line in diff.split("\n"): if line.startswith("+++ b/") or line.startswith("--- a/"): path = line.split("/", 1)[1] if "/" in line else "" if any(path.startswith(p) for p in knowledge_prefixes): - has_knowledge = True - break + return True - return "knowledge" if has_knowledge else "pipeline" + return False + + +def _refine_commit_type(diff: str, branch_commit_type: str) -> str: + """Refine commit_type from diff content when branch prefix is ambiguous. + + Branch prefix gives initial classification (extract, research, entity, etc.). + For 'extract' branches, diff content can distinguish: + - challenge: adds challenged_by edges to existing claims + - enrich: modifies existing claim frontmatter without new files + - extract: creates new claim files (default for extract branches) + + Only refines 'extract' type — other branch types (research, entity, reweave, fix) + are already specific enough. + """ + if branch_commit_type != "extract": + return branch_commit_type + + new_files = 0 + modified_files = 0 + has_challenge_edge = False + + in_diff_header = False + current_is_new = False + for line in diff.split("\n"): + if line.startswith("diff --git"): + in_diff_header = True + current_is_new = False + elif line.startswith("new file"): + current_is_new = True + elif line.startswith("+++ b/"): + path = line[6:] + if any(path.startswith(p) for p in ("domains/", "core/", "foundations/")): + if current_is_new: + new_files += 1 + else: + modified_files += 1 + in_diff_header = False + elif line.startswith("+") and not line.startswith("+++"): + if "challenged_by:" in line or "challenges:" in line: + has_challenge_edge = True + + if has_challenge_edge and new_files == 0: + return "challenge" + if modified_files > 0 and new_files == 0: + return "enrich" + return "extract" async def _record_contributor_attribution(conn, pr_number: int, branch: str): """Record contributor attribution after a successful merge. Parses git trailers and claim frontmatter to identify contributors - and their roles. Upserts into contributors table. - Pipeline commits (inbox/, entities/, agents/) get commit_type='pipeline' - and don't increment role counts. + and their roles. Upserts into contributors table. Refines commit_type + from diff content. Pipeline-only PRs (no knowledge files) are skipped. """ import re as _re from datetime import date as _date, datetime as _dt @@ -451,14 +494,19 @@ async def _record_contributor_attribution(conn, pr_number: int, branch: str): if not diff: return - # Classify commit type — pipeline commits don't count toward CI - commit_type = _classify_commit_type(diff) - conn.execute("UPDATE prs SET commit_type = ? 
WHERE number = ?", (commit_type, pr_number)) - - if commit_type == "pipeline": - logger.info("PR #%d: pipeline commit — skipping CI attribution", pr_number) + # Pipeline-only PRs (inbox, entities, agents) don't count toward CI + if not _is_knowledge_pr(diff): + logger.info("PR #%d: pipeline-only commit — skipping CI attribution", pr_number) return + # Refine commit_type from diff content (branch prefix may be too broad) + row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone() + branch_type = row["commit_type"] if row and row["commit_type"] else "extract" + refined_type = _refine_commit_type(diff, branch_type) + if refined_type != branch_type: + conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number)) + logger.info("PR #%d: commit_type refined %s → %s", pr_number, branch_type, refined_type) + # Parse Pentagon-Agent trailer from branch commit messages agents_found: set[str] = set() rc, log_output = await _git( @@ -612,17 +660,20 @@ def _update_source_frontmatter_status(path: str, new_status: str): logger.warning("Failed to update source status in %s: %s", path, e) -async def _embed_merged_claims(branch_sha: str): +async def _embed_merged_claims(main_sha: str, branch_sha: str): """Embed new/changed claim files from a merged PR into Qdrant. - Finds .md files changed between main~1 and the merged SHA, then calls - embed-claims.py --file for each. Non-fatal — embedding failure does not - block the merge pipeline. + Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip) + to find ALL changed files across the entire branch, not just the last commit. + Also deletes Qdrant vectors for files removed by the branch. + + Non-fatal — embedding failure does not block the merge pipeline. 
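+
+    Illustrative comparison of the two diff ranges (SHAs are placeholders):
+
+        git diff --name-only <main_sha> <branch_sha>      # new: every file the branch touched
+        git diff --name-only <branch_sha>~1 <branch_sha>  # old: only the final commit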
""" try: + # --- Embed added/changed files --- rc, diff_out = await _git( "diff", "--name-only", "--diff-filter=ACMR", - f"{branch_sha}~1", branch_sha, + main_sha, branch_sha, cwd=str(config.MAIN_WORKTREE), timeout=10, ) @@ -638,9 +689,6 @@ async def _embed_merged_claims(branch_sha: str): and not f.split("/")[-1].startswith("_") ] - if not md_files: - return - embedded = 0 for fpath in md_files: full_path = config.MAIN_WORKTREE / fpath @@ -659,6 +707,35 @@ async def _embed_merged_claims(branch_sha: str): if embedded: logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files)) + + # --- Delete vectors for removed files (Ganymede: stale vector cleanup) --- + rc, del_out = await _git( + "diff", "--name-only", "--diff-filter=D", + main_sha, branch_sha, + cwd=str(config.MAIN_WORKTREE), + timeout=10, + ) + if rc == 0 and del_out.strip(): + deleted_files = [ + f for f in del_out.strip().split("\n") + if f.endswith(".md") + and any(f.startswith(d) for d in embed_dirs) + ] + if deleted_files: + import hashlib + point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files] + try: + import urllib.request + req = urllib.request.Request( + "http://localhost:6333/collections/teleo-claims/points/delete", + data=json.dumps({"points": point_ids}).encode(), + headers={"Content-Type": "application/json"}, + method="POST", + ) + urllib.request.urlopen(req, timeout=10) + logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids)) + except Exception: + logger.warning("embed: failed to delete stale vectors (non-fatal)") except Exception: logger.exception("embed: post-merge embedding failed (non-fatal)") @@ -882,7 +959,7 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: _archive_source_for_pr(branch, domain) # Embed new/changed claims into Qdrant (non-fatal) - await _embed_merged_claims(branch_sha) + await _embed_merged_claims(main_sha, branch_sha) # Delete remote branch immediately (Ganymede Q4) await _delete_remote_branch(branch) diff --git a/lib/post_extract.py b/lib/post_extract.py index 735786f..7d033cb 100644 --- a/lib/post_extract.py +++ b/lib/post_extract.py @@ -212,7 +212,7 @@ def fix_h1_title_match(content: str, filename: str) -> tuple[str, list[str]]: # ─── Validators (check without modifying, return issues) ────────────────── -def validate_claim(filename: str, content: str, existing_claims: set[str]) -> list[str]: +def validate_claim(filename: str, content: str, existing_claims: set[str], agent: str | None = None) -> list[str]: """Validate a claim file. Returns list of issues (empty = pass).""" issues = [] fm, body = parse_frontmatter(content) @@ -271,7 +271,7 @@ def validate_claim(filename: str, content: str, existing_claims: set[str]) -> li # Attribution check: extractor must be identified. (Leo: block extractor, warn sourcer) if ftype == "claim": from .attribution import validate_attribution - issues.extend(validate_attribution(fm)) + issues.extend(validate_attribution(fm, agent=agent)) # OPSEC check: flag claims containing dollar amounts + internal entity references. # Rio's rule: never extract LivingIP/Teleo deal terms to public codex. 
(Ganymede review) @@ -358,7 +358,7 @@ def validate_and_fix_claims( all_fixes.extend([f"{filename}:{f}" for f in fixes]) # Phase 2: Validate (after fixes) - issues = validate_claim(filename, content, existing_claims) + issues = validate_claim(filename, content, existing_claims, agent=agent) # Separate hard failures from warnings hard_failures = [i for i in issues if not i.startswith("near_duplicate")] @@ -504,6 +504,24 @@ def _rebuild_content(fm: dict, body: str) -> str: def _yaml_line(key: str, val) -> str: """Format a single YAML key-value line.""" + if isinstance(val, dict): + # Nested YAML block (e.g. attribution with sub-keys) + lines = [f"{key}:"] + for sub_key, sub_val in val.items(): + if isinstance(sub_val, list) and sub_val: + lines.append(f" {sub_key}:") + for item in sub_val: + if isinstance(item, dict): + first = True + for ik, iv in item.items(): + prefix = " - " if first else " " + lines.append(f'{prefix}{ik}: "{iv}"') + first = False + else: + lines.append(f' - "{item}"') + else: + lines.append(f" {sub_key}: []") + return "\n".join(lines) if isinstance(val, list): return f"{key}: {json.dumps(val)}" if isinstance(val, bool): diff --git a/lib/stale_pr.py b/lib/stale_pr.py new file mode 100644 index 0000000..0a0d009 --- /dev/null +++ b/lib/stale_pr.py @@ -0,0 +1,220 @@ +"""Stale PR monitor — auto-close extraction PRs that produced no claims. + +Catches the failure mode where batch-extract creates a PR but extraction +produces only source-file updates (no actual claims). These PRs sit open +indefinitely, consuming merge queue bandwidth and confusing metrics. + +Rules: + - PR branch starts with "extract/" + - PR is open for >30 minutes + - PR diff contains 0 files in domains/*/ or decisions/*/ + → Auto-close with comment, log to audit_log as stale_extraction_closed + + - If same source branch has been stale-closed 2+ times + → Mark source as extraction_failed in pipeline.db sources table + +Called from the pipeline daemon (piggyback on validate_cycle interval) +or standalone via: python3 -m lib.stale_pr + +Owner: Epimetheus +""" + +import logging +import json +import os +import re +import sqlite3 +import urllib.request +from datetime import datetime, timedelta, timezone + +from . import config + +logger = logging.getLogger("pipeline.stale_pr") + +STALE_THRESHOLD_MINUTES = 30 +MAX_STALE_FAILURES = 2 # After this many stale closures, mark source as failed + + +def _forgejo_api(method: str, path: str, body: dict | None = None) -> dict | list | None: + """Call Forgejo API. 
Returns parsed JSON or None on failure.""" + token_file = config.FORGEJO_TOKEN_FILE + if not token_file.exists(): + logger.error("No Forgejo token at %s", token_file) + return None + token = token_file.read_text().strip() + + url = f"{config.FORGEJO_URL}/api/v1/{path}" + data = json.dumps(body).encode() if body else None + req = urllib.request.Request( + url, + data=data, + headers={ + "Authorization": f"token {token}", + "Content-Type": "application/json", + }, + method=method, + ) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + return json.loads(resp.read()) + except Exception as e: + logger.warning("Forgejo API %s %s failed: %s", method, path, e) + return None + + +def _pr_has_claim_files(pr_number: int) -> bool: + """Check if a PR's diff contains any files in domains/ or decisions/.""" + diff_data = _forgejo_api("GET", f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/files") + if not diff_data or not isinstance(diff_data, list): + return False + + for file_entry in diff_data: + filename = file_entry.get("filename", "") + if filename.startswith("domains/") or filename.startswith("decisions/"): + # Check it's a .md file, not a directory marker + if filename.endswith(".md"): + return True + return False + + +def _close_pr(pr_number: int, reason: str) -> bool: + """Close a PR with a comment explaining why.""" + # Add comment + _forgejo_api("POST", + f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments", + {"body": f"Auto-closed by stale PR monitor: {reason}\n\nPentagon-Agent: Epimetheus"}, + ) + # Close PR + result = _forgejo_api("PATCH", + f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}", + {"state": "closed"}, + ) + return result is not None + + +def _log_audit(conn: sqlite3.Connection, pr_number: int, branch: str): + """Log stale closure to audit_log.""" + try: + conn.execute( + "INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)", + ("monitor", "stale_extraction_closed", json.dumps({"pr": pr_number, "branch": branch})), + ) + conn.commit() + except Exception as e: + logger.warning("Audit log write failed: %s", e) + + +def _count_stale_closures(conn: sqlite3.Connection, branch: str) -> int: + """Count how many times this branch has been stale-closed.""" + try: + row = conn.execute( + "SELECT COUNT(*) FROM audit_log WHERE event = 'stale_extraction_closed' AND detail LIKE ?", + (f'%"branch": "{branch}"%',), + ).fetchone() + return row[0] if row else 0 + except Exception: + return 0 + + +def _mark_source_failed(conn: sqlite3.Connection, branch: str): + """Mark the source as extraction_failed after repeated stale closures.""" + # Extract source name from branch: extract/source-name → source-name + source_name = branch.removeprefix("extract/") + try: + conn.execute( + "UPDATE sources SET status = 'extraction_failed', last_error = 'repeated_stale_extraction', updated_at = datetime('now') WHERE path LIKE ?", + (f"%{source_name}%",), + ) + conn.commit() + logger.info("Marked source %s as extraction_failed (repeated stale closures)", source_name) + except Exception as e: + logger.warning("Failed to mark source as failed: %s", e) + + +def check_stale_prs(conn: sqlite3.Connection) -> tuple[int, int]: + """Check for and close stale extraction PRs. + + Returns (closed_count, error_count). 
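+
+    A minimal usage sketch (daemon side; assumes an open sqlite3 connection
+    with row_factory = sqlite3.Row, as in the __main__ block below):
+
+        closed, errors = check_stale_prs(conn)
+        if errors:
+            logger.warning("stale_pr: %d close attempts failed", errors)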
+ """ + closed = 0 + errors = 0 + + # Fetch all open PRs (paginated) + page = 1 + all_prs = [] + while True: + prs = _forgejo_api("GET", + f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls?state=open&limit=50&page={page}") + if not prs: + break + all_prs.extend(prs) + if len(prs) < 50: + break + page += 1 + + now = datetime.now(timezone.utc) + + for pr in all_prs: + branch = pr.get("head", {}).get("ref", "") + if not branch.startswith("extract/"): + continue + + # Check age + created_str = pr.get("created_at", "") + if not created_str: + continue + try: + # Forgejo returns ISO format with Z suffix + created = datetime.fromisoformat(created_str.replace("Z", "+00:00")) + except ValueError: + continue + + age_minutes = (now - created).total_seconds() / 60 + if age_minutes < STALE_THRESHOLD_MINUTES: + continue + + pr_number = pr["number"] + + # Check if PR has claim files + if _pr_has_claim_files(pr_number): + continue # PR has claims — not stale + + # PR is stale — close it + logger.info("Stale PR #%d: branch=%s, age=%.0f min, no claim files — closing", + pr_number, branch, age_minutes) + + if _close_pr(pr_number, f"No claim files after {int(age_minutes)} minutes. Branch: {branch}"): + closed += 1 + _log_audit(conn, pr_number, branch) + + # Check for repeated failures + failure_count = _count_stale_closures(conn, branch) + if failure_count >= MAX_STALE_FAILURES: + _mark_source_failed(conn, branch) + logger.warning("Source %s marked as extraction_failed after %d stale closures", + branch, failure_count) + else: + errors += 1 + logger.warning("Failed to close stale PR #%d", pr_number) + + if closed: + logger.info("Stale PR monitor: closed %d PRs", closed) + + return closed, errors + + +# Allow standalone execution +if __name__ == "__main__": + import sys + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + + db_path = config.DB_PATH + if not db_path.exists(): + print(f"ERROR: Database not found at {db_path}", file=sys.stderr) + sys.exit(1) + + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + closed, errs = check_stale_prs(conn) + print(f"Stale PR monitor: {closed} closed, {errs} errors") + conn.close() diff --git a/lib/watchdog.py b/lib/watchdog.py index e6b2ebd..af7d86f 100644 --- a/lib/watchdog.py +++ b/lib/watchdog.py @@ -19,6 +19,7 @@ import logging from datetime import datetime, timezone from . import config, db +from .stale_pr import check_stale_prs logger = logging.getLogger("pipeline.watchdog") @@ -115,6 +116,26 @@ async def watchdog_check(conn) -> dict: "action": "Check validate.py — may be the modified-file or wiki-link bug recurring", }) + # 6. 
Stale extraction PRs: open >30 min with no claim files + try: + stale_closed, stale_errors = check_stale_prs(conn) + if stale_closed > 0: + issues.append({ + "type": "stale_prs_closed", + "severity": "info", + "detail": f"Auto-closed {stale_closed} stale extraction PRs (no claims after {30} min)", + "action": "Check batch-extract logs for extraction failures", + }) + if stale_errors > 0: + issues.append({ + "type": "stale_pr_close_failed", + "severity": "warning", + "detail": f"Failed to close {stale_errors} stale PRs", + "action": "Check Forgejo API connectivity", + }) + except Exception as e: + logger.warning("Stale PR check failed: %s", e) + # Log issues healthy = len(issues) == 0 if not healthy: @@ -124,7 +145,7 @@ async def watchdog_check(conn) -> dict: else: logger.info("WATCHDOG: %s — %s", issue["type"], issue["detail"]) - return {"healthy": healthy, "issues": issues, "checks_run": 5} + return {"healthy": healthy, "issues": issues, "checks_run": 6} async def watchdog_cycle(conn, max_workers=None) -> tuple[int, int]: diff --git a/openrouter-extract-v2.py b/openrouter-extract-v2.py index 04eaeeb..b8a677c 100644 --- a/openrouter-extract-v2.py +++ b/openrouter-extract-v2.py @@ -40,6 +40,7 @@ from lib.post_extract import ( validate_and_fix_claims, validate_and_fix_entities, ) +from lib.connect import connect_new_claims # ─── Source registration (Argus: pipeline funnel tracking) ───────────────── @@ -455,6 +456,21 @@ def main(): written.append(filename) print(f" Wrote: {claim_path}") + # ── Atomic connect: wire new claims to existing KB via vector search ── + connect_stats = {"connected": 0, "edges_added": 0} + if written: + written_paths = [os.path.join(domain_dir, f) for f in written] + try: + connect_stats = connect_new_claims(written_paths, domain=domain) + if connect_stats["connected"] > 0: + print(f" Connected: {connect_stats['connected']}/{len(written)} claims → {connect_stats['edges_added']} edges") + for conn in connect_stats.get("connections", []): + print(f" {conn['claim']} → {', '.join(n[:40] for n in conn['neighbors'][:3])}") + if connect_stats.get("skipped_embed_failed"): + print(f" WARN: {connect_stats['skipped_embed_failed']} claims failed embedding (Qdrant unreachable?)") + except Exception as e: + print(f" WARN: Extract-and-connect failed (non-fatal): {e}", file=sys.stderr) + # ── Apply enrichments ── enriched = [] for enr in enrichments: @@ -616,6 +632,7 @@ def main(): print(f" Model: {args.model} ({p1_in} in / {p1_out} out)") print(f" Pass 2: Python validator ($0)") print(f" Claims: {len(written)} written, {claim_stats['rejected']} rejected, {claim_stats['fixed']} auto-fixed") + print(f" Connected: {connect_stats.get('connected', 0)} claims → {connect_stats.get('edges_added', 0)} edges (Qdrant)") print(f" Enrichments: {len(enriched)} applied") if entities_enqueued: print(f" Entities: {len(entities_enqueued)} enqueued (applied by batch on main)") diff --git a/ops/reconcile-source-status.sh b/ops/reconcile-source-status.sh new file mode 100755 index 0000000..61f2785 --- /dev/null +++ b/ops/reconcile-source-status.sh @@ -0,0 +1,115 @@ +#!/bin/bash +# Reconcile source archive status: mark sources as processed if claims already exist +# Usage: ./reconcile-source-status.sh [--apply] +# Default: dry-run (preview only) +# --apply: actually modify files + +CODEX_DIR="/Users/coryabdalla/Pentagon/teleo-codex" +ARCHIVE_DIR="$CODEX_DIR/inbox/archive" +DOMAINS_DIR="$CODEX_DIR/domains" + +MODE="dry-run" +[[ "${1:-}" == "--apply" ]] && MODE="apply" + +echo "=== Source Status 
Reconciliation ===" +echo "Mode: $MODE" +echo "" + +matched=0 +null_result=0 +skipped=0 +already_ok=0 + +while read -r src; do + # Only process unprocessed sources + status=$(grep "^status:" "$src" 2>/dev/null | head -1 | sed 's/^status: *//') + if [[ "$status" != "unprocessed" ]]; then + already_ok=$((already_ok + 1)) + continue + fi + + url=$(grep "^url:" "$src" 2>/dev/null | head -1 | sed 's/^url: *"*//;s/"*$//') + title=$(grep "^title:" "$src" 2>/dev/null | head -1 | sed 's/^title: *"*//;s/"*$//') + fname=$(basename "$src") + + # Check 1: Is this a test/spam source? + is_test=false + if echo "$title" | grep -qiE "^(Futardio: )?test[ -]"; then + is_test=true + fi + + # Check 2: URL-based match — search for the unique URL identifier in claims + url_matched=false + if [[ -n "$url" ]]; then + # Extract the unique hash/slug from the URL (the long alphanumeric key) + url_key=$(echo "$url" | grep -oE '[A-Za-z0-9]{20,}' | tail -1 || true) + if [[ -n "$url_key" ]]; then + if grep -rq "$url_key" "$DOMAINS_DIR" 2>/dev/null; then + url_matched=true + fi + fi + # Also try the full URL domain+path + if ! $url_matched; then + # Try matching the last path segment + path_seg=$(echo "$url" | grep -oE '[^/]+$' || true) + if [[ -n "$path_seg" ]] && [[ ${#path_seg} -gt 10 ]]; then + if grep -rq "$path_seg" "$DOMAINS_DIR" 2>/dev/null; then + url_matched=true + fi + fi + fi + fi + + # Check 3: Title match — search for a distinctive part of the title in claim source: fields + title_matched=false + if [[ -n "$title" ]]; then + # Strip "Futardio: " prefix and grab a distinctive portion + clean_title=$(echo "$title" | sed 's/^Futardio: //') + # Use first 30 chars as search key (enough to be distinctive) + title_key=$(echo "$clean_title" | cut -c1-30) + if [[ ${#title_key} -gt 8 ]]; then + if grep -rqi "$title_key" "$DOMAINS_DIR" 2>/dev/null; then + title_matched=true + fi + fi + fi + + if $is_test; then + echo " NULL-RESULT (test/spam): $fname" + null_result=$((null_result + 1)) + if [[ "$MODE" == "apply" ]]; then + sed -i '' "s/^status: unprocessed/status: null-result/" "$src" + if ! grep -q "^processed_by:" "$src"; then + sed -i '' "/^status: null-result/a\\ +processed_by: epimetheus-reconcile\\ +processed_date: $(date +%Y-%m-%d)\\ +notes: \"auto-reconciled: test/spam source\"" "$src" + fi + fi + elif $url_matched || $title_matched; then + match_type="" + $url_matched && match_type="url" || true + $title_matched && match_type="${match_type:+$match_type+}title" || true + echo " PROCESSED ($match_type): $fname" + matched=$((matched + 1)) + if [[ "$MODE" == "apply" ]]; then + sed -i '' "s/^status: unprocessed/status: processed/" "$src" + if ! grep -q "^processed_by:" "$src"; then + sed -i '' "/^status: processed/a\\ +processed_by: epimetheus-reconcile\\ +processed_date: $(date +%Y-%m-%d)\\ +notes: \"auto-reconciled: claims found matching this source\"" "$src" + fi + fi + else + skipped=$((skipped + 1)) + fi +done < <(find "$ARCHIVE_DIR" -name "*.md" -type f) + +echo "" +echo "=== Summary ===" +echo "Already correct status: $already_ok" +echo "Matched → processed: $matched" +echo "Test/spam → null-result: $null_result" +echo "Still unprocessed: $skipped" +echo "Total archive files: $(find "$ARCHIVE_DIR" -name '*.md' -type f 2>/dev/null | wc -l | tr -d ' ')" diff --git a/reconcile-sources.py b/reconcile-sources.py new file mode 100644 index 0000000..e004eb7 --- /dev/null +++ b/reconcile-sources.py @@ -0,0 +1,450 @@ +#!/usr/bin/env python3 +""" +Reconcile archive source status and add bidirectional links. 
+ +Matches unprocessed archive sources to existing decisions, entities, and claims. +Updates status to 'processed' or 'null-result' and adds frontmatter links. + +Linking pattern (Ganymede Option A — frontmatter only): + - Archive sources get `derived_items:` listing decision/entity paths + - Decisions/entities get `source_archive:` pointing to archive source path + - All paths relative to repo root + +Usage: + python3 reconcile-sources.py [--apply] # default: dry-run + python3 reconcile-sources.py --apply # apply changes +""" + +import os +import re +import sys +from pathlib import Path +from urllib.parse import urlparse +from collections import defaultdict + +REPO_ROOT = Path("/opt/teleo-eval/workspaces/main") +ARCHIVE_DIR = REPO_ROOT / "inbox" / "archive" +DECISIONS_DIR = REPO_ROOT / "decisions" +ENTITIES_DIR = REPO_ROOT / "entities" +DOMAINS_DIR = REPO_ROOT / "domains" + +DRY_RUN = "--apply" not in sys.argv + +# --- YAML frontmatter helpers --- + +def read_frontmatter(filepath): + """Read file, return (frontmatter_text, body_text, raw_content).""" + content = filepath.read_text(encoding="utf-8") + if not content.startswith("---"): + return None, content, content + end = content.find("\n---", 3) + if end == -1: + return None, content, content + fm = content[3:end].strip() + body = content[end + 4:] # skip \n--- + return fm, body, content + + +def get_field(fm_text, field): + """Get a single YAML field value from frontmatter text.""" + if fm_text is None: + return None + m = re.search(rf'^{field}:\s*["\']?(.+?)["\']?\s*$', fm_text, re.MULTILINE) + return m.group(1) if m else None + + +def get_status(fm_text): + return get_field(fm_text, "status") + + +def get_url(fm_text): + return get_field(fm_text, "url") + + +def get_proposal_url(fm_text): + return get_field(fm_text, "proposal_url") + + +def get_title(fm_text): + return get_field(fm_text, "title") + + +def extract_hash_from_url(url): + """Extract the proposal hash (last path segment) from a URL.""" + if not url: + return None + parsed = urlparse(url.strip('"').strip("'")) + parts = [p for p in parsed.path.split("/") if p] + if parts: + last = parts[-1] + # Proposal hashes are base58-like, 32-50 chars + if len(last) >= 20 and re.match(r'^[A-Za-z0-9]+$', last): + return last + return None + + +def rel_path(filepath): + """Get path relative to repo root.""" + return str(filepath.relative_to(REPO_ROOT)) + + +# --- Test/spam detection --- + +TEST_PATTERNS = [ + r'\btest\b', r'\btesting\b', r'\bmy-test\b', r'\bq\b$', + r'\ba-very-unique', r'\btext-mint', r'\bsample\b', + r'\basdf\b', r'\bfoo\b', r'\bbar\b', r'\bhello-world\b', + r'\bgrpc-indexer\b', r'\brocks{0,2}wd\b', + r'spending-limit', r'\btest-proposal\b', + r'\bdummy\b', +] +TEST_RE = re.compile('|'.join(TEST_PATTERNS), re.IGNORECASE) + +# Title-based patterns +TEST_TITLE_PATTERNS = [ + r'^test\b', r'^testing\b', r'^q$', r'^a$', r'^asdf', + r'^my test', r'^sample', r'^hello', + r'text mint ix', r'a very unique title', + r'testing spending limit', r'testing.*grpc', + r'my-test-proposal', +] +TEST_TITLE_RE = re.compile('|'.join(TEST_TITLE_PATTERNS), re.IGNORECASE) + + +def is_test_spam(filepath, fm_text): + """Detect test/spam sources.""" + name = filepath.stem + if TEST_RE.search(name): + return True + title = get_title(fm_text) or "" + if TEST_TITLE_RE.search(title): + return True + return False + + +# --- Build indexes --- + +def build_decision_hash_index(): + """Map proposal hash → decision file path.""" + index = {} + if not DECISIONS_DIR.exists(): + return index + for f in 
DECISIONS_DIR.rglob("*.md"): + fm, _, _ = read_frontmatter(f) + url = get_proposal_url(fm) + h = extract_hash_from_url(url) + if h: + index[h] = f + return index + + +def build_entity_name_index(): + """Map normalized entity name → entity file path.""" + index = {} + if not ENTITIES_DIR.exists(): + return index + for f in ENTITIES_DIR.rglob("*.md"): + # Use filename as entity name + name = f.stem.lower().replace("-", " ").replace("_", " ") + index[name] = f + return index + + +def build_claim_source_index(): + """Map archive source slug → list of claim file paths (via wiki-links).""" + index = defaultdict(list) + if not DOMAINS_DIR.exists(): + return index + for f in DOMAINS_DIR.rglob("*.md"): + try: + content = f.read_text(encoding="utf-8") + except Exception: + continue + # Find wiki-links to archive: [[inbox/archive/...]] + for m in re.finditer(r'\[\[inbox/archive/([^\]]+)\]\]', content): + slug = m.group(1) + index[slug].append(f) + return index + + +# --- Frontmatter modification --- + +def add_frontmatter_field(filepath, field_name, field_value): + """Add a YAML field to frontmatter. Returns modified content or None if already present.""" + content = filepath.read_text(encoding="utf-8") + if not content.startswith("---"): + return None + + end = content.find("\n---", 3) + if end == -1: + return None + + fm = content[3:end] + + # Check if field already exists + if re.search(rf'^{field_name}:', fm, re.MULTILINE): + return None # Already has this field + + # Add before closing --- + if isinstance(field_value, list): + lines = f"\n{field_name}:" + for v in field_value: + lines += f'\n - "{v}"' + new_fm = fm.rstrip() + lines + "\n" + else: + new_fm = fm.rstrip() + f'\n{field_name}: "{field_value}"\n' + + return "---" + new_fm + "---" + content[end + 4:] + + +def set_status(filepath, new_status): + """Change status field in frontmatter.""" + content = filepath.read_text(encoding="utf-8") + if not content.startswith("---"): + return None + # Replace status field + new_content = re.sub( + r'^(status:\s*).*$', + f'\\1{new_status}', + content, + count=1, + flags=re.MULTILINE + ) + if new_content == content: + return None + return new_content + + +# --- Main reconciliation --- + +def main(): + print(f"{'DRY RUN' if DRY_RUN else 'APPLYING CHANGES'}") + print(f"Repo root: {REPO_ROOT}") + print() + + # Build indexes + print("Building indexes...") + decision_hash_idx = build_decision_hash_index() + print(f" Decision hash index: {len(decision_hash_idx)} entries") + + entity_name_idx = build_entity_name_index() + print(f" Entity name index: {len(entity_name_idx)} entries") + + claim_source_idx = build_claim_source_index() + print(f" Claim source index: {len(claim_source_idx)} entries") + print() + + # Find all unprocessed archive sources + unprocessed = [] + for f in sorted(ARCHIVE_DIR.rglob("*.md")): + if ".extraction-debug" in str(f): + continue + fm, _, _ = read_frontmatter(f) + if get_status(fm) == "unprocessed": + unprocessed.append(f) + + print(f"Found {len(unprocessed)} unprocessed sources") + print() + + # Categorize and match + matched = [] # (source_path, [target_paths], match_type) + test_spam = [] + futardio_unmatched = [] # futardio proposals with no KB output → null-result + genuine_backlog = [] # non-futardio sources still awaiting extraction → keep unprocessed + + def is_futardio_source(filepath): + """Check if file is a futardio/metadao governance proposal (not research).""" + name = filepath.name.lower() + return "futardio" in name + + for src in unprocessed: + fm, _, _ = 
read_frontmatter(src) + + # Check test/spam first + if is_test_spam(src, fm): + test_spam.append(src) + continue + + targets = [] + match_types = [] + + # Match 1: proposal hash → decision + url = get_url(fm) + src_hash = extract_hash_from_url(url) + if src_hash and src_hash in decision_hash_idx: + targets.append(decision_hash_idx[src_hash]) + match_types.append("hash→decision") + + # Match 2: wiki-links from claims + # Try multiple slug variants + src_rel = rel_path(src) + slug_no_ext = src_rel.replace("inbox/archive/", "").replace(".md", "") + # Also try just the filename without extension + slug_basename = src.stem + for slug in [slug_no_ext, slug_basename]: + if slug in claim_source_idx: + for claim_path in claim_source_idx[slug]: + if claim_path not in targets: + targets.append(claim_path) + match_types.append("wiki→claim") + + # Match 3: entity name matching (for launches/fundraises) + title = get_title(fm) or "" + # Extract project name from title like "Futardio: ProjectName ..." + title_match = re.match(r'Futardio:\s*(.+?)(?:\s*[-—]|\s+Launch|\s+Fundraise|$)', title, re.IGNORECASE) + if title_match: + project_name = title_match.group(1).strip().lower().replace("-", " ") + if project_name in entity_name_idx: + entity_path = entity_name_idx[project_name] + if entity_path not in targets: + targets.append(entity_path) + match_types.append("name→entity") + + if targets: + matched.append((src, targets, match_types)) + elif is_futardio_source(src): + futardio_unmatched.append(src) + else: + genuine_backlog.append(src) + + print(f"Results:") + print(f" Matched: {len(matched)}") + print(f" Test/spam: {len(test_spam)}") + print(f" Futardio unmatched (→ null-result): {len(futardio_unmatched)}") + print(f" Genuine backlog (kept unprocessed): {len(genuine_backlog)}") + print() + + # Validate all link targets exist + broken_links = [] + for src, targets, _ in matched: + for t in targets: + if isinstance(t, Path) and not t.exists(): + broken_links.append((src, t)) + + if broken_links: + print(f"ERROR: {len(broken_links)} broken link targets!") + for src, target in broken_links: + print(f" {rel_path(src)} → {rel_path(target)}") + if not DRY_RUN: + print("Aborting — fix broken links first.") + sys.exit(1) + + # Show match samples + print("Sample matches:") + for src, targets, types in matched[:5]: + print(f" {src.name}") + for t, mt in zip(targets, types): + print(f" → {rel_path(t)} ({mt})") + print() + + # Show test/spam samples + if test_spam: + print(f"Test/spam samples ({len(test_spam)} total):") + for src in test_spam[:5]: + print(f" {src.name}") + print() + + # Show futardio unmatched samples + if futardio_unmatched: + print(f"Futardio unmatched samples ({len(futardio_unmatched)} total):") + for src in futardio_unmatched[:10]: + print(f" {src.name}") + print() + + # Show genuine backlog + if genuine_backlog: + print(f"Genuine backlog — kept unprocessed ({len(genuine_backlog)} total):") + from collections import Counter + backlog_domains = Counter() + for src in genuine_backlog: + parts = src.relative_to(ARCHIVE_DIR).parts + domain = parts[0] if len(parts) > 1 else "root" + backlog_domains[domain] += 1 + for d, c in backlog_domains.most_common(): + print(f" {d}: {c}") + print() + + if DRY_RUN: + print("=== DRY RUN — no changes made. Use --apply to apply. ===") + return + + # --- Apply changes --- + files_modified = 0 + links_created = 0 + + # 1. 
Matched sources → processed + bidirectional links + for src, targets, _ in matched: + # Update source status + new_content = set_status(src, "processed") + if new_content: + # Also add derived_items + decision_entity_targets = [ + rel_path(t) for t in targets + if isinstance(t, Path) and ( + str(t).startswith(str(DECISIONS_DIR)) or + str(t).startswith(str(ENTITIES_DIR)) + ) + ] + if decision_entity_targets: + # Add derived_items to the already-modified content + # Write status change first, then add field + src.write_text(new_content, encoding="utf-8") + linked = add_frontmatter_field(src, "derived_items", decision_entity_targets) + if linked: + src.write_text(linked, encoding="utf-8") + links_created += len(decision_entity_targets) + else: + src.write_text(new_content, encoding="utf-8") + files_modified += 1 + + # Add source_archive to decision/entity targets + src_rel = rel_path(src) + for t in targets: + if isinstance(t, Path) and ( + str(t).startswith(str(DECISIONS_DIR)) or + str(t).startswith(str(ENTITIES_DIR)) + ): + linked = add_frontmatter_field(t, "source_archive", src_rel) + if linked: + t.write_text(linked, encoding="utf-8") + files_modified += 1 + links_created += 1 + + # 2. Test/spam → null-result + for src in test_spam: + new_content = set_status(src, "null-result") + if new_content: + src.write_text(new_content, encoding="utf-8") + files_modified += 1 + + # 3. Futardio unmatched → null-result (no extraction output, won't be re-extracted) + for src in futardio_unmatched: + new_content = set_status(src, "null-result") + if new_content: + src.write_text(new_content, encoding="utf-8") + files_modified += 1 + + # 4. Genuine backlog → KEEP unprocessed (these are real extraction targets) + # No changes needed + + print(f"\n=== APPLIED ===") + print(f"Files modified: {files_modified}") + print(f"Bidirectional links created: {links_created}") + print(f"Matched → processed: {len(matched)}") + print(f"Test/spam → null-result: {len(test_spam)}") + print(f"Futardio unmatched → null-result: {len(futardio_unmatched)}") + print(f"Genuine backlog → kept unprocessed: {len(genuine_backlog)}") + + # Verify + remaining = 0 + for f in ARCHIVE_DIR.rglob("*.md"): + if ".extraction-debug" in str(f): + continue + fm, _, _ = read_frontmatter(f) + if get_status(fm) == "unprocessed": + remaining += 1 + print(f"\nRemaining unprocessed: {remaining}") + + +if __name__ == "__main__": + main() diff --git a/reweave.py b/reweave.py new file mode 100644 index 0000000..5c00427 --- /dev/null +++ b/reweave.py @@ -0,0 +1,901 @@ +#!/usr/bin/env python3 +"""Orphan Reweave — connect isolated claims via vector similarity + Haiku classification. + +Finds claims with zero incoming links (orphans), uses Qdrant to find semantically +similar neighbors, classifies the relationship with Haiku, and writes edges on the +neighbor's frontmatter pointing TO the orphan. 
+ +Usage: + python3 reweave.py --dry-run # Show what would be connected + python3 reweave.py --max-orphans 50 # Process up to 50 orphans + python3 reweave.py --threshold 0.72 # Override similarity floor + +Design: + - Orphan = zero incoming links (no other claim's supports/challenges/related/depends_on points to it) + - Write edge on NEIGHBOR (not orphan) so orphan gains an incoming link + - Haiku classifies: supports | challenges | related (>=0.85 confidence for supports/challenges) + - reweave_edges parallel field for tooling-readable provenance + - Single PR per run for Leo review + +Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887> +""" + +import argparse +import datetime +import hashlib +import json +import logging +import os +import re +import subprocess +import sys +import time +import urllib.request +from pathlib import Path + +import yaml + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger("reweave") + +# --- Config --- +REPO_DIR = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")) +SECRETS_DIR = Path(os.environ.get("SECRETS_DIR", "/opt/teleo-eval/secrets")) +QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333") +QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION", "teleo-claims") +FORGEJO_URL = os.environ.get("FORGEJO_URL", "http://localhost:3000") + +EMBED_DIRS = ["domains", "core", "foundations", "decisions", "entities"] +EDGE_FIELDS = ("supports", "challenges", "depends_on", "related") +WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]") + +# Thresholds (from calibration data — Mar 28) +DEFAULT_THRESHOLD = 0.70 # Elbow in score distribution +DEFAULT_MAX_ORPHANS = 50 # Keep PRs reviewable +DEFAULT_MAX_NEIGHBORS = 3 # Don't over-connect +HAIKU_CONFIDENCE_FLOOR = 0.85 # Below this → default to "related" +PER_FILE_EDGE_CAP = 10 # Max total reweave edges per neighbor file + +# Domain processing order: diversity first, internet-finance last (Leo) +DOMAIN_PRIORITY = [ + "ai-alignment", "health", "space-development", "entertainment", + "creative-industries", "collective-intelligence", "governance", + # internet-finance last — batch-imported futarchy cluster, lower cross-domain value + "internet-finance", +] + + +# ─── Orphan Detection ──────────────────────────────────────────────────────── + + +def _parse_frontmatter(path: Path) -> dict | None: + """Parse YAML frontmatter from a markdown file. 
Returns dict or None.""" + try: + text = path.read_text(errors="replace") + except Exception: + return None + if not text.startswith("---"): + return None + end = text.find("\n---", 3) + if end == -1: + return None + try: + fm = yaml.safe_load(text[3:end]) + return fm if isinstance(fm, dict) else None + except Exception: + return None + + +def _get_body(path: Path) -> str: + """Get body text (after frontmatter) from a markdown file.""" + try: + text = path.read_text(errors="replace") + except Exception: + return "" + if not text.startswith("---"): + return text + end = text.find("\n---", 3) + if end == -1: + return text + return text[end + 4:].strip() + + +def _get_edge_targets(path: Path) -> list[str]: + """Extract all outgoing edge targets from a claim's frontmatter + wiki links.""" + targets = [] + fm = _parse_frontmatter(path) + if fm: + for field in EDGE_FIELDS: + val = fm.get(field) + if isinstance(val, list): + targets.extend(str(v).strip().lower() for v in val if v) + elif isinstance(val, str) and val.strip(): + targets.append(val.strip().lower()) + # Also check reweave_edges (from previous runs) + rw = fm.get("reweave_edges") + if isinstance(rw, list): + targets.extend(str(v).strip().lower() for v in rw if v) + + # Wiki links in body + try: + text = path.read_text(errors="replace") + end = text.find("\n---", 3) + if end > 0: + body = text[end + 4:] + for link in WIKI_LINK_RE.findall(body): + targets.append(link.strip().lower()) + except Exception: + pass + + return targets + + +def _claim_name_variants(path: Path, repo_root: Path = None) -> list[str]: + """Generate name variants for a claim file (used for incoming link matching). + + A claim at domains/ai-alignment/rlhf-reward-hacking.md could be referenced as: + - "rlhf-reward-hacking" + - "rlhf reward hacking" + - "RLHF reward hacking" (title case) + - The actual 'name' or 'title' from frontmatter + - "domains/ai-alignment/rlhf-reward-hacking" (relative path without .md) + """ + variants = set() + stem = path.stem + variants.add(stem.lower()) + variants.add(stem.lower().replace("-", " ")) + + # Also match by relative path (Ganymede Q1: some edges use path references) + if repo_root: + try: + rel = str(path.relative_to(repo_root)).removesuffix(".md") + variants.add(rel.lower()) + except ValueError: + pass + + fm = _parse_frontmatter(path) + if fm: + for key in ("name", "title"): + val = fm.get(key) + if isinstance(val, str) and val.strip(): + variants.add(val.strip().lower()) + + return list(variants) + + +def find_all_claims(repo_root: Path) -> list[Path]: + """Find all knowledge files (claim, framework, entity, decision) in the KB.""" + claims = [] + for d in EMBED_DIRS: + base = repo_root / d + if not base.is_dir(): + continue + for md in base.rglob("*.md"): + if md.name.startswith("_"): + continue + fm = _parse_frontmatter(md) + if fm and fm.get("type") not in ("source", "musing", None): + claims.append(md) + return claims + + +def build_reverse_link_index(claims: list[Path]) -> dict[str, set[Path]]: + """Build a reverse index: claim_name_variant → set of files that link TO it. + + For each claim, extract all outgoing edges. For each target name, record + the source claim as an incoming link for that target. 
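+
+    Example (illustrative): if domains/x/a.md lists "B Claim" under supports,
+    the index maps "b claim" (lowercased) to a set containing that file's Path.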
+ """ + # name_variant → set of source paths that point to it + incoming: dict[str, set[Path]] = {} + + for claim_path in claims: + targets = _get_edge_targets(claim_path) + for target in targets: + if target not in incoming: + incoming[target] = set() + incoming[target].add(claim_path) + + return incoming + + +def find_orphans(claims: list[Path], incoming: dict[str, set[Path]], + repo_root: Path = None) -> list[Path]: + """Find claims with zero incoming links.""" + orphans = [] + for claim_path in claims: + variants = _claim_name_variants(claim_path, repo_root) + has_incoming = any( + len(incoming.get(v, set()) - {claim_path}) > 0 + for v in variants + ) + if not has_incoming: + orphans.append(claim_path) + return orphans + + +def sort_orphans_by_domain(orphans: list[Path], repo_root: Path) -> list[Path]: + """Sort orphans by domain priority (diversity first, internet-finance last).""" + def domain_key(path: Path) -> tuple[int, str]: + rel = path.relative_to(repo_root) + parts = rel.parts + domain = "" + if len(parts) >= 2 and parts[0] in ("domains", "entities", "decisions"): + domain = parts[1] + elif parts[0] == "foundations" and len(parts) >= 2: + domain = parts[1] + elif parts[0] == "core": + domain = "core" + + try: + priority = DOMAIN_PRIORITY.index(domain) + except ValueError: + # Unknown domain goes before internet-finance but after known ones + priority = len(DOMAIN_PRIORITY) - 1 + + return (priority, path.stem) + + return sorted(orphans, key=domain_key) + + +# ─── Qdrant Search ─────────────────────────────────────────────────────────── + + +def _get_api_key() -> str: + """Load OpenRouter API key.""" + key_file = SECRETS_DIR / "openrouter-key" + if key_file.exists(): + return key_file.read_text().strip() + key = os.environ.get("OPENROUTER_API_KEY", "") + if key: + return key + logger.error("No OpenRouter API key found") + sys.exit(1) + + +def make_point_id(rel_path: str) -> str: + """Deterministic point ID from repo-relative path (matches embed-claims.py).""" + return hashlib.md5(rel_path.encode()).hexdigest() + + +def get_vector_from_qdrant(rel_path: str) -> list[float] | None: + """Retrieve a claim's existing vector from Qdrant by its point ID.""" + point_id = make_point_id(rel_path) + body = json.dumps({"ids": [point_id], "with_vector": True}).encode() + req = urllib.request.Request( + f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points", + data=body, + headers={"Content-Type": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read()) + points = data.get("result", []) + if points and points[0].get("vector"): + return points[0]["vector"] + except Exception as e: + logger.warning("Qdrant point lookup failed for %s: %s", rel_path, e) + return None + + +def search_neighbors(vector: list[float], exclude_path: str, + threshold: float, limit: int) -> list[dict]: + """Search Qdrant for nearest neighbors above threshold, excluding self.""" + body = { + "vector": vector, + "limit": limit + 5, # over-fetch to account for self + filtered + "with_payload": True, + "score_threshold": threshold, + "filter": { + "must_not": [{"key": "claim_path", "match": {"value": exclude_path}}] + }, + } + req = urllib.request.Request( + f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points/search", + data=json.dumps(body).encode(), + headers={"Content-Type": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read()) + hits = data.get("result", []) + return hits[:limit] + except 
Exception as e: + logger.warning("Qdrant search failed: %s", e) + return [] + + +# ─── Haiku Edge Classification ─────────────────────────────────────────────── + + +CLASSIFY_PROMPT = """You are classifying the relationship between two knowledge claims. + +CLAIM A (the orphan — needs to be connected): +Title: {orphan_title} +Body: {orphan_body} + +CLAIM B (the neighbor — already connected in the knowledge graph): +Title: {neighbor_title} +Body: {neighbor_body} + +What is the relationship FROM Claim B TO Claim A? + +Options: +- "supports" — Claim B provides evidence, reasoning, or examples that strengthen Claim A +- "challenges" — Claim B contradicts, undermines, or provides counter-evidence to Claim A +- "related" — Claims are topically connected but neither supports nor challenges the other + +Respond with EXACTLY this JSON format, nothing else: +{{"edge_type": "supports|challenges|related", "confidence": 0.0-1.0, "reason": "one sentence explanation"}} +""" + + +def classify_edge(orphan_title: str, orphan_body: str, + neighbor_title: str, neighbor_body: str, + api_key: str) -> dict: + """Use Haiku to classify the edge type between two claims. + + Returns {"edge_type": str, "confidence": float, "reason": str}. + Falls back to "related" on any failure. + """ + default = {"edge_type": "related", "confidence": 0.5, "reason": "classification failed"} + + prompt = CLASSIFY_PROMPT.format( + orphan_title=orphan_title, + orphan_body=orphan_body[:500], + neighbor_title=neighbor_title, + neighbor_body=neighbor_body[:500], + ) + + payload = json.dumps({ + "model": "anthropic/claude-3.5-haiku", + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 200, + "temperature": 0.1, + }).encode() + + req = urllib.request.Request( + "https://openrouter.ai/api/v1/chat/completions", + data=payload, + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + ) + + try: + with urllib.request.urlopen(req, timeout=15) as resp: + data = json.loads(resp.read()) + content = data["choices"][0]["message"]["content"].strip() + + # Parse JSON from response (handle markdown code blocks) + if content.startswith("```"): + content = content.split("\n", 1)[-1].rsplit("```", 1)[0].strip() + + result = json.loads(content) + edge_type = result.get("edge_type", "related") + confidence = float(result.get("confidence", 0.5)) + + # Enforce confidence floor for supports/challenges + if edge_type in ("supports", "challenges") and confidence < HAIKU_CONFIDENCE_FLOOR: + edge_type = "related" + + return { + "edge_type": edge_type, + "confidence": confidence, + "reason": result.get("reason", ""), + } + except Exception as e: + logger.warning("Haiku classification failed: %s", e) + return default + + +# ─── YAML Frontmatter Editing ──────────────────────────────────────────────── + + +def _count_reweave_edges(path: Path) -> int: + """Count existing reweave_edges in a file's frontmatter.""" + fm = _parse_frontmatter(path) + if not fm: + return 0 + rw = fm.get("reweave_edges") + if isinstance(rw, list): + return len(rw) + return 0 + + +def write_edge(neighbor_path: Path, orphan_title: str, edge_type: str, + date_str: str, dry_run: bool = False) -> bool: + """Write a reweave edge on the neighbor's frontmatter. + + Adds to both the edge_type list (related/supports/challenges) and + the parallel reweave_edges list for provenance tracking. + + Uses ruamel.yaml for round-trip YAML preservation. 
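+
+    Resulting frontmatter shape (illustrative values):
+
+        related:
+          - "Some orphan claim title"
+        reweave_edges:
+          - "Some orphan claim title|related|2026-03-28"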
+ """ + # Check per-file cap + if _count_reweave_edges(neighbor_path) >= PER_FILE_EDGE_CAP: + logger.info(" Skip %s — per-file edge cap (%d) reached", neighbor_path.name, PER_FILE_EDGE_CAP) + return False + + try: + text = neighbor_path.read_text(errors="replace") + except Exception as e: + logger.warning(" Cannot read %s: %s", neighbor_path, e) + return False + + if not text.startswith("---"): + logger.warning(" No frontmatter in %s", neighbor_path.name) + return False + + end = text.find("\n---", 3) + if end == -1: + return False + + fm_text = text[3:end] + body_text = text[end:] # includes the closing --- + + # Try ruamel.yaml for round-trip editing + try: + from ruamel.yaml import YAML + ry = YAML() + ry.preserve_quotes = True + ry.width = 4096 # prevent line wrapping + + import io + fm = ry.load(fm_text) + if not isinstance(fm, dict): + return False + + # Add to edge_type list (related/supports/challenges) + # Clean value only — provenance tracked in reweave_edges (Ganymede: comment-in-string bug) + if edge_type not in fm: + fm[edge_type] = [] + elif not isinstance(fm[edge_type], list): + fm[edge_type] = [fm[edge_type]] + + # Check for duplicate + existing = [str(v).strip().lower() for v in fm[edge_type] if v] + if orphan_title.strip().lower() in existing: + logger.info(" Skip duplicate edge: %s → %s", neighbor_path.name, orphan_title) + return False + + fm[edge_type].append(orphan_title) + + # Add to reweave_edges with provenance (edge_type + date for audit trail) + if "reweave_edges" not in fm: + fm["reweave_edges"] = [] + elif not isinstance(fm["reweave_edges"], list): + fm["reweave_edges"] = [fm["reweave_edges"]] + fm["reweave_edges"].append(f"{orphan_title}|{edge_type}|{date_str}") + + # Serialize back + buf = io.StringIO() + ry.dump(fm, buf) + new_fm = buf.getvalue().rstrip("\n") + + new_text = f"---\n{new_fm}{body_text}" + + if not dry_run: + neighbor_path.write_text(new_text) + return True + + except ImportError: + # Fallback: regex-based editing (no ruamel.yaml installed) + logger.info(" ruamel.yaml not available, using regex fallback") + return _write_edge_regex(neighbor_path, fm_text, body_text, orphan_title, + edge_type, date_str, dry_run) + + +def _write_edge_regex(neighbor_path: Path, fm_text: str, body_text: str, + orphan_title: str, edge_type: str, date_str: str, + dry_run: bool) -> bool: + """Fallback: add edge via regex when ruamel.yaml is unavailable.""" + # Check if edge_type field exists + field_re = re.compile(rf"^{edge_type}:\s*$", re.MULTILINE) + inline_re = re.compile(rf'^{edge_type}:\s*\[', re.MULTILINE) + + entry_line = f' - "{orphan_title}"' + rw_line = f' - "{orphan_title}|{edge_type}|{date_str}"' + + if field_re.search(fm_text): + # Multi-line list exists — find end of list, append + lines = fm_text.split("\n") + new_lines = [] + in_field = False + inserted = False + for line in lines: + new_lines.append(line) + if re.match(rf"^{edge_type}:\s*$", line): + in_field = True + elif in_field and not line.startswith(" -"): + # End of list — insert before this line + new_lines.insert(-1, entry_line) + in_field = False + inserted = True + if in_field and not inserted: + # Field was last in frontmatter + new_lines.append(entry_line) + fm_text = "\n".join(new_lines) + + elif inline_re.search(fm_text): + # Inline list — skip, too complex for regex + logger.warning(" Inline list format for %s in %s, skipping", edge_type, neighbor_path.name) + return False + else: + # Field doesn't exist — add at end of frontmatter + fm_text = fm_text.rstrip("\n") + 
f"\n{edge_type}:\n{entry_line}" + + # Add reweave_edges field + if "reweave_edges:" in fm_text: + lines = fm_text.split("\n") + new_lines = [] + in_rw = False + inserted_rw = False + for line in lines: + new_lines.append(line) + if re.match(r"^reweave_edges:\s*$", line): + in_rw = True + elif in_rw and not line.startswith(" -"): + new_lines.insert(-1, rw_line) + in_rw = False + inserted_rw = True + if in_rw and not inserted_rw: + new_lines.append(rw_line) + fm_text = "\n".join(new_lines) + else: + fm_text = fm_text.rstrip("\n") + f"\nreweave_edges:\n{rw_line}" + + new_text = f"---\n{fm_text}{body_text}" + + if not dry_run: + neighbor_path.write_text(new_text) + return True + + +# ─── Git + PR ──────────────────────────────────────────────────────────────── + + +def create_branch(repo_root: Path, branch_name: str) -> bool: + """Create and checkout a new branch.""" + try: + subprocess.run(["git", "checkout", "-b", branch_name], + cwd=str(repo_root), check=True, capture_output=True) + return True + except subprocess.CalledProcessError as e: + logger.error("Failed to create branch %s: %s", branch_name, e.stderr.decode()) + return False + + +def commit_and_push(repo_root: Path, branch_name: str, modified_files: list[Path], + orphan_count: int) -> bool: + """Stage modified files, commit, and push.""" + # Stage only modified files + for f in modified_files: + subprocess.run(["git", "add", str(f)], cwd=str(repo_root), + check=True, capture_output=True) + + # Check if anything staged + result = subprocess.run(["git", "diff", "--cached", "--name-only"], + cwd=str(repo_root), capture_output=True, text=True) + if not result.stdout.strip(): + logger.info("No files staged — nothing to commit") + return False + + msg = ( + f"reweave: connect {orphan_count} orphan claims via vector similarity\n\n" + f"Threshold: {DEFAULT_THRESHOLD}, Haiku classification, {len(modified_files)} files modified.\n\n" + f"Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887>" + ) + subprocess.run(["git", "commit", "-m", msg], cwd=str(repo_root), + check=True, capture_output=True) + + # Push — inject token + token_file = SECRETS_DIR / "forgejo-admin-token" + if not token_file.exists(): + logger.error("No Forgejo token found at %s", token_file) + return False + token = token_file.read_text().strip() + push_url = f"http://teleo:{token}@localhost:3000/teleo/teleo-codex.git" + + subprocess.run(["git", "push", "-u", push_url, branch_name], + cwd=str(repo_root), check=True, capture_output=True) + return True + + +def create_pr(branch_name: str, orphan_count: int, summary_lines: list[str]) -> str | None: + """Create a Forgejo PR for the reweave batch.""" + token_file = SECRETS_DIR / "forgejo-admin-token" + if not token_file.exists(): + return None + token = token_file.read_text().strip() + + summary = "\n".join(f"- {line}" for line in summary_lines[:30]) + body = ( + f"## Orphan Reweave\n\n" + f"Connected **{orphan_count}** orphan claims to the knowledge graph " + f"via vector similarity (threshold {DEFAULT_THRESHOLD}) + Haiku edge classification.\n\n" + f"### Edges Added\n{summary}\n\n" + f"### Review Guide\n" + f"- Each edge has a `# reweave:YYYY-MM-DD` comment — strip after review\n" + f"- `reweave_edges` field tracks automated edges for tooling (graph_expand weights them 0.75x)\n" + f"- Upgrade `related` → `supports`/`challenges` where you have better judgment\n" + f"- Delete any edges that don't make sense\n\n" + f"Pentagon-Agent: Epimetheus" + ) + + payload = json.dumps({ + "title": f"reweave: connect {orphan_count} 
orphan claims", + "body": body, + "head": branch_name, + "base": "main", + }).encode() + + req = urllib.request.Request( + f"{FORGEJO_URL}/api/v1/repos/teleo/teleo-codex/pulls", + data=payload, + headers={ + "Authorization": f"token {token}", + "Content-Type": "application/json", + }, + ) + + try: + with urllib.request.urlopen(req, timeout=30) as resp: + data = json.loads(resp.read()) + return data.get("html_url", "") + except Exception as e: + logger.error("PR creation failed: %s", e) + return None + + +# ─── Worktree Lock ─────────────────────────────────────────────────────────── + +_lock_fd = None # Module-level to prevent GC and avoid function-attribute fragility + + +def acquire_lock(lock_path: Path, timeout: int = 30) -> bool: + """Acquire file lock for worktree access. Returns True if acquired.""" + global _lock_fd + import fcntl + try: + lock_path.parent.mkdir(parents=True, exist_ok=True) + _lock_fd = open(lock_path, "w") + fcntl.flock(_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + _lock_fd.write(f"reweave:{os.getpid()}\n") + _lock_fd.flush() + return True + except (IOError, OSError): + logger.warning("Could not acquire worktree lock at %s — another process has it", lock_path) + _lock_fd = None + return False + + +def release_lock(lock_path: Path): + """Release worktree lock.""" + global _lock_fd + import fcntl + fd = _lock_fd + _lock_fd = None + if fd: + try: + fcntl.flock(fd, fcntl.LOCK_UN) + fd.close() + except Exception: + pass + try: + lock_path.unlink(missing_ok=True) + except Exception: + pass + + +# ─── Main ──────────────────────────────────────────────────────────────────── + + +def main(): + global REPO_DIR, DEFAULT_THRESHOLD + + parser = argparse.ArgumentParser(description="Orphan Reweave — connect isolated claims") + parser.add_argument("--dry-run", action="store_true", + help="Show what would be connected without modifying files") + parser.add_argument("--max-orphans", type=int, default=DEFAULT_MAX_ORPHANS, + help=f"Max orphans to process (default {DEFAULT_MAX_ORPHANS})") + parser.add_argument("--max-neighbors", type=int, default=DEFAULT_MAX_NEIGHBORS, + help=f"Max neighbors per orphan (default {DEFAULT_MAX_NEIGHBORS})") + parser.add_argument("--threshold", type=float, default=DEFAULT_THRESHOLD, + help=f"Minimum cosine similarity (default {DEFAULT_THRESHOLD})") + parser.add_argument("--repo-dir", type=str, default=None, + help="Override repo directory") + args = parser.parse_args() + + if args.repo_dir: + REPO_DIR = Path(args.repo_dir) + DEFAULT_THRESHOLD = args.threshold + + date_str = datetime.date.today().isoformat() + branch_name = f"reweave/{date_str}" + + logger.info("=== Orphan Reweave ===") + logger.info("Repo: %s", REPO_DIR) + logger.info("Threshold: %.2f, Max orphans: %d, Max neighbors: %d", + args.threshold, args.max_orphans, args.max_neighbors) + if args.dry_run: + logger.info("DRY RUN — no files will be modified") + + # Step 1: Find all claims and build reverse-link index + logger.info("Step 1: Scanning KB for claims...") + claims = find_all_claims(REPO_DIR) + logger.info(" Found %d knowledge files", len(claims)) + + logger.info("Step 2: Building reverse-link index...") + incoming = build_reverse_link_index(claims) + + logger.info("Step 3: Finding orphans...") + orphans = find_orphans(claims, incoming, REPO_DIR) + orphans = sort_orphans_by_domain(orphans, REPO_DIR) + logger.info(" Found %d orphans (%.1f%% of %d claims)", + len(orphans), 100 * len(orphans) / max(len(claims), 1), len(claims)) + + if not orphans: + logger.info("No orphans found — KB is fully 
connected!") + return + + # Cap to max_orphans + batch = orphans[:args.max_orphans] + logger.info(" Processing batch of %d orphans", len(batch)) + + # Step 4: For each orphan, find neighbors and classify edges + api_key = _get_api_key() + edges_to_write: list[dict] = [] # {neighbor_path, orphan_title, edge_type, reason, score} + skipped_no_vector = 0 + skipped_no_neighbors = 0 + + for i, orphan_path in enumerate(batch): + rel_path = str(orphan_path.relative_to(REPO_DIR)) + fm = _parse_frontmatter(orphan_path) + orphan_title = fm.get("name", fm.get("title", orphan_path.stem.replace("-", " "))) if fm else orphan_path.stem + orphan_body = _get_body(orphan_path) + + logger.info("[%d/%d] %s", i + 1, len(batch), orphan_title[:80]) + + # Get vector from Qdrant + vector = get_vector_from_qdrant(rel_path) + if not vector: + logger.info(" No vector in Qdrant — skipping (not embedded yet)") + skipped_no_vector += 1 + continue + + # Find neighbors + hits = search_neighbors(vector, rel_path, args.threshold, args.max_neighbors) + if not hits: + logger.info(" No neighbors above threshold %.2f", args.threshold) + skipped_no_neighbors += 1 + continue + + for hit in hits: + payload = hit.get("payload", {}) + neighbor_rel = payload.get("claim_path", "") + neighbor_title = payload.get("claim_title", "") + score = hit.get("score", 0) + + if not neighbor_rel: + continue + + neighbor_path = REPO_DIR / neighbor_rel + if not neighbor_path.exists(): + logger.info(" Neighbor %s not found on disk — skipping", neighbor_rel) + continue + + neighbor_body = _get_body(neighbor_path) + + # Classify with Haiku + result = classify_edge(orphan_title, orphan_body, + neighbor_title, neighbor_body, api_key) + edge_type = result["edge_type"] + confidence = result["confidence"] + reason = result["reason"] + + logger.info(" → %s (%.3f) %s [%.2f]: %s", + neighbor_title[:50], score, edge_type, confidence, reason[:60]) + + edges_to_write.append({ + "neighbor_path": neighbor_path, + "neighbor_rel": neighbor_rel, + "neighbor_title": neighbor_title, + "orphan_title": str(orphan_title), + "orphan_rel": rel_path, + "edge_type": edge_type, + "score": score, + "confidence": confidence, + "reason": reason, + }) + + # Rate limit courtesy + if not args.dry_run and i < len(batch) - 1: + time.sleep(0.3) + + logger.info("\n=== Summary ===") + logger.info("Orphans processed: %d", len(batch)) + logger.info("Edges to write: %d", len(edges_to_write)) + logger.info("Skipped (no vector): %d", skipped_no_vector) + logger.info("Skipped (no neighbors): %d", skipped_no_neighbors) + + if not edges_to_write: + logger.info("Nothing to write.") + return + + if args.dry_run: + logger.info("\n=== Dry Run — Edges That Would Be Written ===") + for e in edges_to_write: + logger.info(" %s → [%s] → %s (score=%.3f, conf=%.2f)", + e["neighbor_title"][:40], e["edge_type"], + e["orphan_title"][:40], e["score"], e["confidence"]) + return + + # Step 5: Acquire lock, create branch, write edges, commit, push, create PR + lock_path = REPO_DIR.parent / ".main-worktree.lock" + if not acquire_lock(lock_path): + logger.error("Cannot acquire worktree lock — aborting") + sys.exit(1) + + try: + # Create branch + if not create_branch(REPO_DIR, branch_name): + logger.error("Failed to create branch %s", branch_name) + sys.exit(1) + + # Write edges + modified_files = set() + written = 0 + summary_lines = [] + + for e in edges_to_write: + ok = write_edge( + e["neighbor_path"], e["orphan_title"], e["edge_type"], + date_str, dry_run=False, + ) + if ok: + 
modified_files.add(e["neighbor_path"]) + written += 1 + summary_lines.append( + f"`{e['neighbor_title'][:50]}` → [{e['edge_type']}] → " + f"`{e['orphan_title'][:50]}` (score={e['score']:.3f})" + ) + + logger.info("Wrote %d edges across %d files", written, len(modified_files)) + + if not modified_files: + logger.info("No edges written — cleaning up branch") + subprocess.run(["git", "checkout", "main"], cwd=str(REPO_DIR), + capture_output=True) + subprocess.run(["git", "branch", "-d", branch_name], cwd=str(REPO_DIR), + capture_output=True) + return + + # Commit and push + orphan_count = len(set(e["orphan_title"] for e in edges_to_write if e["neighbor_path"] in modified_files)) + if commit_and_push(REPO_DIR, branch_name, list(modified_files), orphan_count): + logger.info("Pushed branch %s", branch_name) + + # Create PR + pr_url = create_pr(branch_name, orphan_count, summary_lines) + if pr_url: + logger.info("PR created: %s", pr_url) + else: + logger.warning("PR creation failed — branch is pushed, create manually") + else: + logger.error("Commit/push failed") + + finally: + # Always return to main — even on exception (Ganymede: branch cleanup) + try: + subprocess.run(["git", "checkout", "main"], cwd=str(REPO_DIR), + capture_output=True) + except Exception: + pass + release_lock(lock_path) + + logger.info("Done.") + + +if __name__ == "__main__": + main() diff --git a/telegram/bot.py b/telegram/bot.py index e9907fb..25c5005 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -41,6 +41,7 @@ from telegram.ext import ( ) sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +import json as _json from kb_retrieval import KBIndex, format_context_for_prompt, retrieve_context from market_data import get_token_price, format_price_context from worktree_lock import main_worktree_lock @@ -57,6 +58,10 @@ MAIN_WORKTREE = "/opt/teleo-eval/workspaces/main" # For git operations only LEARNINGS_FILE = "/opt/teleo-eval/workspaces/main/agents/rio/learnings.md" # Agent memory (Option D) LOG_FILE = "/opt/teleo-eval/logs/telegram-bot.log" +# Persistent audit connection — opened once at startup, reused for all writes +# (Ganymede + Rhea: no per-response sqlite3.connect / migrate) +_audit_conn: sqlite3.Connection | None = None + # Triage interval (seconds) TRIAGE_INTERVAL = 900 # 15 minutes @@ -828,6 +833,10 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100]) + # ─── Audit: init timing and tool call tracking ────────────────── + response_start = time.monotonic() + tool_calls = [] + # Check for /research command — run search BEFORE Opus so results are in context research_context = "" research_match = RESEARCH_PATTERN.search(text) @@ -885,6 +894,7 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.warning("Failed to fetch X link %s: %s", url, e) # Haiku pre-pass: does this message need an X search? 
(Option A: two-pass) + t_haiku = time.monotonic() if not research_context: # Skip if /research already ran try: haiku_prompt = ( @@ -922,14 +932,94 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.warning("Haiku research archive failed: %s", e) except Exception as e: logger.warning("Haiku pre-pass failed: %s", e) + haiku_duration = int((time.monotonic() - t_haiku) * 1000) + if research_context: + tool_calls.append({ + "tool": "haiku_prepass", "input": {"query": text[:200]}, + "output": {"triggered": True, "result_length": len(research_context)}, + "duration_ms": haiku_duration, + }) + + # ─── Query reformulation for follow-ups ──────────────────────── + # Conversational follow-ups ("you're wrong", "tell me more") are unsearchable. + # Use Haiku to rewrite them into standalone queries using conversation context. + search_query_text = text # default: use raw message + user_key = (msg.chat_id, user.id if user else 0) + hist = conversation_history.get(user_key, []) + if hist: + # There's conversation history — check if this is a follow-up + try: + last_exchange = hist[-1] + recent_context = "" + if last_exchange.get("user"): + recent_context += f"User: {last_exchange['user'][:300]}\n" + if last_exchange.get("bot"): + recent_context += f"Bot: {last_exchange['bot'][:300]}\n" + reformulate_prompt = ( + f"A user is in a conversation. Given the recent exchange and their new message, " + f"rewrite the new message as a STANDALONE search query that captures what they're " + f"actually asking about. The query should work for semantic search — specific topics, " + f"entities, and concepts.\n\n" + f"Recent exchange:\n{recent_context}\n" + f"New message: {text}\n\n" + f"If the message is already a clear standalone question or topic, return it unchanged.\n" + f"If it's a follow-up, correction, or reference to the conversation, rewrite it.\n\n" + f"Return ONLY the rewritten query, nothing else. Max 30 words." 
+ ) + reformulated = await call_openrouter("anthropic/claude-haiku-4.5", reformulate_prompt, max_tokens=80) + if reformulated and reformulated.strip() and len(reformulated.strip()) > 3: + search_query_text = reformulated.strip() + logger.info("Query reformulated: '%s' → '%s'", text[:60], search_query_text[:60]) + tool_calls.append({ + "tool": "query_reformulate", "input": {"original": text[:200], "history_turns": len(hist)}, + "output": {"reformulated": search_query_text[:200]}, + "duration_ms": 0, # included in haiku timing + }) + except Exception as e: + logger.warning("Query reformulation failed: %s", e) + # Fall through — use raw text # Retrieve full KB context (entity resolution + claim search + agent positions) - kb_ctx = retrieve_context(text, KB_READ_DIR, index=kb_index) + t_kb = time.monotonic() + kb_ctx = retrieve_context(search_query_text, KB_READ_DIR, index=kb_index) kb_context_text = format_context_for_prompt(kb_ctx) + kb_duration = int((time.monotonic() - t_kb) * 1000) + retrieval_layers = ["keyword"] if (kb_ctx and (kb_ctx.entities or kb_ctx.claims)) else [] + tool_calls.append({ + "tool": "retrieve_context", + "input": {"query": search_query_text[:200], "original_query": text[:200] if search_query_text != text else None}, + "output": {"entities": len(kb_ctx.entities) if kb_ctx else 0, + "claims": len(kb_ctx.claims) if kb_ctx else 0}, + "duration_ms": kb_duration, + }) + + # Layer 1+2: Qdrant vector search + graph expansion (semantic, complements keyword) + # Pass keyword-matched paths to exclude duplicates at Qdrant query level + # Normalize: KBIndex stores absolute paths, Qdrant stores repo-relative paths + keyword_paths = [] + if kb_ctx and kb_ctx.claims: + for c in kb_ctx.claims: + p = c.path + if KB_READ_DIR and p.startswith(KB_READ_DIR): + p = p[len(KB_READ_DIR):].lstrip("/") + keyword_paths.append(p) + from kb_retrieval import retrieve_vector_context + vector_context, vector_meta = retrieve_vector_context(search_query_text, keyword_paths=keyword_paths) + if vector_context: + kb_context_text = kb_context_text + "\n\n" + vector_context + retrieval_layers.extend(vector_meta.get("layers_hit", [])) + tool_calls.append({ + "tool": "retrieve_qdrant_context", "input": {"query": text[:200]}, + "output": {"direct_hits": len(vector_meta.get("direct_results", [])), + "expanded": len(vector_meta.get("expanded_results", []))}, + "duration_ms": vector_meta.get("duration_ms", 0), + }) + stats = get_db_stats() # Fetch live market data for any tokens mentioned (Rhea: market-data API) market_context = "" + market_data_audit = {} token_mentions = re.findall(r"\$([A-Z]{2,10})", text.upper()) # Entity name → token mapping for natural language mentions ENTITY_TOKEN_MAP = { @@ -945,6 +1035,7 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): for tag in ent.tags: if tag.upper() in ENTITY_TOKEN_MAP.values(): token_mentions.append(tag.upper()) + t_market = time.monotonic() for token in set(token_mentions): try: data = await get_token_price(token) @@ -952,8 +1043,16 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): price_str = format_price_context(data, token) if price_str: market_context += price_str + "\n" + market_data_audit[token] = data except Exception: pass # Market data is supplementary — never block on failure + market_duration = int((time.monotonic() - t_market) * 1000) + if token_mentions: + tool_calls.append({ + "tool": "market_data", "input": {"tickers": list(set(token_mentions))}, + "output": market_data_audit, + 
"duration_ms": market_duration, + }) # Build Opus prompt — Rio's voice prompt = f"""You are Rio, the Teleo internet finance agent. Your Telegram handle is @FutAIrdBot — that IS you. Users tag @FutAIrdBot to reach you. Never say "I'm not FutAIrdBot." You are also @futaRdIO on X. You have deep knowledge about futarchy, prediction markets, token governance, and the MetaDAO ecosystem. @@ -1005,7 +1104,10 @@ IMPORTANT: Special tags you can append at the end of your response (after your m When a user shares valuable source material (X posts, articles, data). Creates a source file in the ingestion pipeline, attributed to the user. Include the verbatim content — don't alter or summarize the user's contribution. Use this when someone drops a link or shares original analysis worth preserving. 4. CLAIM: [specific, disagreeable assertion] - When a user makes a specific claim with evidence that could enter the KB. Creates a draft claim file attributed to them. Only for genuine claims — not opinions or questions.""" + When a user makes a specific claim with evidence that could enter the KB. Creates a draft claim file attributed to them. Only for genuine claims — not opinions or questions. + +5. CONFIDENCE: [0.0-1.0] + ALWAYS include this tag. Rate how well the KB context above actually helped you answer this question. 1.0 = KB had exactly what was needed. 0.5 = KB had partial/tangential info. 0.0 = KB had nothing relevant, you answered from general knowledge. This is for internal audit only — never visible to users.""" # Call Opus response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024) @@ -1054,6 +1156,90 @@ IMPORTANT: Special tags you can append at the end of your response (after your m _create_inline_claim(claim_text.strip(), text, user, msg) logger.info("Inline CLAIM drafted: %s", claim_text[:80]) + # CONFIDENCE: tag — model self-rated retrieval quality (audit only) + # Handles: "CONFIDENCE: 0.8", "CONFIDENCE: [0.8]", "Confidence: 0.8", case-insensitive + # Ganymede: must strip from display even if the model deviates from exact format + confidence_score = None + confidence_match = re.search(r'^CONFIDENCE:\s*\[?([\d.]+)\]?', response, re.MULTILINE | re.IGNORECASE) + if confidence_match: + try: + confidence_score = max(0.0, min(1.0, float(confidence_match.group(1)))) + except ValueError: + pass + # Strip ANY line starting with CONFIDENCE (broad match — catches format deviations) + display_response = re.sub(r'\n?^CONFIDENCE\s*:.*$', '', display_response, flags=re.MULTILINE | re.IGNORECASE).rstrip() + + # ─── Audit: write response_audit record ──────────────────────── + response_time_ms = int((time.monotonic() - response_start) * 1000) + tool_calls.append({ + "tool": "llm_call", "input": {"model": RESPONSE_MODEL}, + "output": {"response_length": len(response), "tags_found": { + "learning": len(learning_lines) if learning_lines else 0, + "research": len(research_lines) if research_lines else 0, + "source": len(source_lines) if source_lines else 0, + "claim": len(claim_lines) if claim_lines else 0, + }}, + "duration_ms": response_time_ms - sum(tc.get("duration_ms", 0) for tc in tool_calls), + }) + + # Build claims_matched with rank + source info (Rio: rank order matters) + claims_audit = [] + for i, c in enumerate(kb_ctx.claims if kb_ctx else []): + claims_audit.append({"path": c.path, "title": c.title, "score": c.score, + "rank": i + 1, "source": "keyword"}) + for r in vector_meta.get("direct_results", []): + claims_audit.append({"path": r["path"], "title": r["title"], "score": 
r["score"], + "rank": len(claims_audit) + 1, "source": "qdrant"}) + for r in vector_meta.get("expanded_results", []): + claims_audit.append({"path": r["path"], "title": r["title"], "score": 0, + "rank": len(claims_audit) + 1, "source": "graph", + "edge_type": r.get("edge_type", "")}) + + # Detect retrieval gap (Rio: most valuable signal for KB improvement) + retrieval_gap = None + if not claims_audit and not (kb_ctx and kb_ctx.entities): + retrieval_gap = f"No KB matches for: {text[:200]}" + elif confidence_score is not None and confidence_score < 0.3: + retrieval_gap = f"Low confidence ({confidence_score}) — KB may lack coverage for: {text[:200]}" + + # Conversation window (Ganymede + Rio: capture prior messages) + conv_window = None + if user: + hist = conversation_history.get((msg.chat_id, user.id), []) + if hist: + conv_window = _json.dumps(hist[-5:]) + + try: + from lib.db import insert_response_audit + insert_response_audit( + _audit_conn, + timestamp=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + chat_id=msg.chat_id, + user=f"@{user.username}" if user and user.username else "unknown", + agent="rio", + model=RESPONSE_MODEL, + query=text[:2000], + conversation_window=conv_window, + entities_matched=_json.dumps([{"name": e.name, "path": e.path} + for e in (kb_ctx.entities if kb_ctx else [])]), + claims_matched=_json.dumps(claims_audit), + retrieval_layers_hit=_json.dumps(list(set(retrieval_layers))), + retrieval_gap=retrieval_gap, + market_data=_json.dumps(market_data_audit) if market_data_audit else None, + research_context=research_context[:2000] if research_context else None, + kb_context_text=kb_context_text[:10000], + tool_calls=_json.dumps(tool_calls), + raw_response=response[:5000], + display_response=display_response[:5000], + confidence_score=confidence_score, + response_time_ms=response_time_ms, + ) + _audit_conn.commit() + logger.info("Audit record written (confidence=%.2f, layers=%s, %d claims, %dms)", + confidence_score or 0, retrieval_layers, len(claims_audit), response_time_ms) + except Exception as e: + logger.warning("Failed to write audit record: %s", e) + # Post response (without tag lines) # Telegram has a 4096 char limit — split long messages if len(display_response) <= 4096: @@ -1515,6 +1701,19 @@ def main(): logger.info("Starting Teleo Telegram bot (Rio)...") + # Initialize persistent audit connection (Ganymede + Rhea: once at startup, not per-response) + global _audit_conn + _audit_conn = sqlite3.connect(PIPELINE_DB, timeout=30) + _audit_conn.row_factory = sqlite3.Row + _audit_conn.execute("PRAGMA journal_mode=WAL") + _audit_conn.execute("PRAGMA busy_timeout=10000") + try: + from lib.db import migrate + migrate(_audit_conn) + logger.info("Audit DB connection initialized, schema migrated") + except Exception as e: + logger.error("Audit DB migration failed — audit writes will fail: %s", e) + # Build application app = Application.builder().token(token).build() @@ -1557,6 +1756,21 @@ def main(): first=3600, ) + # Audit retention cleanup — daily, 90-day window (Ganymede: match transcript policy) + async def _cleanup_audit(context=None): + try: + _audit_conn.execute("DELETE FROM response_audit WHERE timestamp < datetime('now', '-90 days')") + _audit_conn.commit() + logger.info("Audit retention cleanup complete") + except Exception as e: + logger.warning("Audit cleanup failed: %s", e) + + app.job_queue.run_repeating( + _cleanup_audit, + interval=86400, # daily + first=86400, + ) + # Run logger.info("Bot running. 
Triage interval: %ds, transcript dump: 1h", TRIAGE_INTERVAL) app.run_polling(drop_pending_updates=True) diff --git a/tests/test_attribution.py b/tests/test_attribution.py index 30a0daa..b46e8dd 100644 --- a/tests/test_attribution.py +++ b/tests/test_attribution.py @@ -86,6 +86,21 @@ class TestValidateAttribution: issues = validate_attribution(fm) assert "missing_attribution_extractor" in issues + def test_missing_extractor_auto_fix_with_agent(self): + """When agent is provided, auto-fix missing extractor instead of blocking.""" + fm = {"attribution": {"sourcer": [{"handle": "someone"}]}} + issues = validate_attribution(fm, agent="leo") + assert "fixed_missing_extractor" in issues + assert "missing_attribution_extractor" not in issues + # Verify the fix was applied in-place + assert fm["attribution"]["extractor"] == [{"handle": "leo"}] + + def test_missing_extractor_no_agent_still_blocks(self): + """Without agent context, missing extractor is still a hard failure.""" + fm = {"attribution": {"sourcer": [{"handle": "someone"}]}} + issues = validate_attribution(fm, agent=None) + assert "missing_attribution_extractor" in issues + class TestBuildAttributionBlock: def test_basic_build(self): diff --git a/tests/test_post_extract.py b/tests/test_post_extract.py index 150d0bb..98dddfb 100644 --- a/tests/test_post_extract.py +++ b/tests/test_post_extract.py @@ -544,3 +544,71 @@ description: "Test" ) assert len(rejected) == 1 assert any("dm_missing" in i for i in stats["issues"]) + + +# ─── _yaml_line dict handling (attribution round-trip) ────────────────── + + +class TestYamlLineDict: + """Verify _yaml_line produces valid YAML for nested dicts (attribution block).""" + + def test_attribution_round_trip(self): + """Attribution dict → _yaml_line → parse_frontmatter should survive.""" + from lib.post_extract import _rebuild_content, parse_frontmatter + + fm = { + "type": "claim", + "domain": "ai-alignment", + "description": "Test claim for round-trip", + "confidence": "experimental", + "source": "unit test", + "created": "2026-03-28", + "attribution": { + "extractor": [{"handle": "rio", "agent_id": "760F7FE7"}], + "sourcer": [{"handle": "someone", "context": "test source"}], + "challenger": [], + "synthesizer": [], + "reviewer": [], + }, + } + body = "# Test claim for attribution round-trip\n\nBody text." + + rebuilt = _rebuild_content(fm, body) + parsed_fm, parsed_body = parse_frontmatter(rebuilt) + + assert parsed_fm is not None + # Attribution must survive as a dict, not a string + attr = parsed_fm.get("attribution") + assert isinstance(attr, dict), f"attribution is {type(attr)}, expected dict" + assert attr["extractor"][0]["handle"] == "rio" + assert attr["sourcer"][0]["handle"] == "someone" + + def test_empty_attribution_roles(self): + """Empty role lists should serialize as [] and survive round-trip.""" + from lib.post_extract import _rebuild_content, parse_frontmatter + + fm = { + "type": "claim", + "domain": "ai-alignment", + "description": "Test", + "confidence": "experimental", + "source": "test", + "created": "2026-03-28", + "attribution": { + "extractor": [{"handle": "leo"}], + "sourcer": [], + "challenger": [], + "synthesizer": [], + "reviewer": [], + }, + } + body = "# Test claim with empty roles\n\nBody." 
+
+        # Serialize and re-parse to confirm the nested attribution block survives
+        rebuilt = _rebuild_content(fm, body)
+        parsed_fm, _ = parse_frontmatter(rebuilt)
+
+        assert parsed_fm is not None
+        attr = parsed_fm.get("attribution")
+        assert isinstance(attr, dict)
+        assert attr["extractor"][0]["handle"] == "leo"
+        # Empty role lists may serialize as [] or be dropped entirely; accept either
+        assert attr.get("sourcer") == [] or attr.get("sourcer") is None
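+
+    # A possible additional case, mirroring the round-trip tests above: several
+    # contributors in one role. A sketch only; handles and values are illustrative.
+    def test_multiple_contributors_round_trip(self):
+        """Multiple handles per role should survive as a list of dicts."""
+        from lib.post_extract import _rebuild_content, parse_frontmatter
+
+        fm = {
+            "type": "claim",
+            "domain": "ai-alignment",
+            "description": "Test claim with multiple contributors",
+            "confidence": "experimental",
+            "source": "unit test",
+            "created": "2026-03-28",
+            "attribution": {
+                "extractor": [{"handle": "rio"}, {"handle": "leo"}],
+                "sourcer": [{"handle": "someone", "context": "test source"}],
+                "challenger": [],
+                "synthesizer": [],
+                "reviewer": [],
+            },
+        }
+        body = "# Test claim with multiple contributors\n\nBody."
+
+        rebuilt = _rebuild_content(fm, body)
+        parsed_fm, _ = parse_frontmatter(rebuilt)
+
+        assert parsed_fm is not None
+        attr = parsed_fm.get("attribution")
+        assert isinstance(attr, dict)
+        assert [e["handle"] for e in attr["extractor"]] == ["rio", "leo"]
+        assert attr["sourcer"][0]["context"] == "test source"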