From 143adb09e988d47fa6457a119dcee2f70f19d2a0 Mon Sep 17 00:00:00 2001
From: m3taversal
Date: Tue, 14 Apr 2026 11:37:12 +0100
Subject: [PATCH 1/4] epimetheus: merge root/diagnostics fixes into canonical ops/diagnostics
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

dashboard_routes.py — root copy is superset:
- Extraction yield query: source_url→path, s.url→s.path (truth audit)
- insufficient_data flag on cascade-coverage endpoint
- Rejection reasons fallback to prs.eval_issues when review_records empty
- rejection_source field replaces disagreement_types in review-summary
- New /api/agent-scorecard endpoint (Argus truth audit)
- Route registration for agent-scorecard

alerting.py — merged from both copies:
- FROM ROOT: "unknown" agent filter in check_agent_health (bug #3)
- FROM ROOT: prs.eval_issues queries in check_rejection_spike,
  check_stuck_loops, check_domain_rejection_patterns,
  generate_failure_report (truth audit correction Apr 2)
- FROM CANONICAL: _ALLOWED_DIM_EXPRS SQL whitelist + validation in
  _check_approval_by_dimension (Ganymede security fix)

Files verified canonical=newer (no changes needed):
  IDENTICAL: dashboard_prs.py, shared_ui.py, dashboard_ops.py,
    dashboard_health.py, research_tracking.py, response_audit_routes.py
  CANONICAL WINS: dashboard_epistemic.py, tier1_metrics.py,
    dashboard_agents.py, alerting_routes.py, tier1_routes.py

NOTE: dashboard_routes.py review-summary API no longer returns
disagreement_types, but canonical dashboard_epistemic.py still renders it —
UI will show empty data. Flag for Ganymede review.

Root /diagnostics/ copies are now safe to delete for these 2 files.
Remaining root files already match or are older than canonical.

Pentagon-Agent: Epimetheus <0144398E-4ED3-4FE2-95A3-3D72E1ABF887> --- ops/diagnostics/alerting.py | 61 +++++------ ops/diagnostics/dashboard_routes.py | 150 +++++++++++++++++++++++++--- 2 files changed, 164 insertions(+), 47 deletions(-) diff --git a/ops/diagnostics/alerting.py b/ops/diagnostics/alerting.py index c0dab371a..3de381946 100644 --- a/ops/diagnostics/alerting.py +++ b/ops/diagnostics/alerting.py @@ -67,6 +67,8 @@ def check_agent_health(conn: sqlite3.Connection) -> list[dict]: now = datetime.now(timezone.utc) for r in rows: agent = r["agent"] + if agent in ("unknown", None): + continue latest = r["latest"] if not latest: continue @@ -266,24 +268,22 @@ def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]: """Detect single rejection reason exceeding REJECTION_SPIKE_RATIO of recent rejections.""" alerts = [] - # Total rejections in 24h + # Total rejected PRs in 24h (prs.eval_issues is the canonical source — Epimetheus 2026-04-02) total = conn.execute( - """SELECT COUNT(*) as n FROM audit_log - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours')""" + """SELECT COUNT(*) as n FROM prs + WHERE eval_issues IS NOT NULL AND eval_issues != '[]' + AND created_at > datetime('now', '-24 hours')""" ).fetchone()["n"] if total < 10: return alerts # Not enough data - # Count by rejection tag + # Count by rejection tag from prs.eval_issues tags = conn.execute( """SELECT value as tag, COUNT(*) as cnt - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours') + FROM prs, json_each(prs.eval_issues) + WHERE eval_issues IS NOT NULL AND eval_issues != '[]' + AND created_at > datetime('now', '-24 hours') GROUP BY tag ORDER BY cnt DESC""" ).fetchall() @@ -315,16 +315,13 @@ def check_stuck_loops(conn: sqlite3.Connection) -> 
list[dict]: """Detect agents repeatedly failing on the same rejection reason.""" alerts = [] - # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28) + # Agent + rejection reason from prs table directly (Epimetheus correction 2026-04-02) rows = conn.execute( - """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, - value as tag, - COUNT(*) as cnt - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-6 hours') - AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL + """SELECT agent, value as tag, COUNT(*) as cnt + FROM prs, json_each(prs.eval_issues) + WHERE eval_issues IS NOT NULL AND eval_issues != '[]' + AND agent IS NOT NULL + AND created_at > datetime('now', '-6 hours') GROUP BY agent, tag HAVING cnt > ?""", (STUCK_LOOP_THRESHOLD,), @@ -412,16 +409,13 @@ def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]: """Track rejection reason shift per domain — surfaces domain maturity issues.""" alerts = [] - # Per-domain rejection breakdown in 24h + # Per-domain rejection breakdown in 24h from prs table (Epimetheus correction 2026-04-02) rows = conn.execute( - """SELECT json_extract(detail, '$.domain') as domain, - value as tag, - COUNT(*) as cnt - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours') - AND json_extract(detail, '$.domain') IS NOT NULL + """SELECT domain, value as tag, COUNT(*) as cnt + FROM prs, json_each(prs.eval_issues) + WHERE eval_issues IS NOT NULL AND eval_issues != '[]' + AND domain IS NOT NULL + AND created_at > datetime('now', '-24 hours') GROUP BY domain, tag ORDER BY domain, cnt DESC""" ).fetchall() 
@@ -473,12 +467,11 @@ def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 2 hours = int(hours) # defensive — callers should pass int, but enforce it rows = conn.execute( """SELECT value as tag, COUNT(*) as cnt, - GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND json_extract(detail, '$.agent') = ? - AND timestamp > datetime('now', ? || ' hours') + GROUP_CONCAT(DISTINCT number) as pr_numbers + FROM prs, json_each(prs.eval_issues) + WHERE eval_issues IS NOT NULL AND eval_issues != '[]' + AND agent = ? + AND created_at > datetime('now', ? || ' hours') GROUP BY tag ORDER BY cnt DESC LIMIT 5""", (agent, f"-{hours}"), diff --git a/ops/diagnostics/dashboard_routes.py b/ops/diagnostics/dashboard_routes.py index f2a1df430..4b912c825 100644 --- a/ops/diagnostics/dashboard_routes.py +++ b/ops/diagnostics/dashboard_routes.py @@ -237,9 +237,9 @@ async def handle_extraction_yield_by_domain(request): # Sources per domain (approximate from PR source_path domain) source_counts = conn.execute( - """SELECT domain, COUNT(DISTINCT source_url) as sources + """SELECT domain, COUNT(DISTINCT path) as sources FROM sources s - JOIN prs p ON p.source_path LIKE '%' || s.url || '%' + JOIN prs p ON p.source_path LIKE '%' || s.path || '%' WHERE s.created_at > datetime('now', ? 
|| ' days') GROUP BY domain""", (f"-{days}",), @@ -444,6 +444,8 @@ async def handle_cascade_coverage(request): for r in triggered ] + insufficient_data = total_triggered < 5 + return web.json_response({ "days": days, "total_triggered": total_triggered, @@ -452,6 +454,7 @@ async def handle_cascade_coverage(request): "total_notifications": summaries["total_notifications"] if summaries else 0, "merges_with_cascade": summaries["total_merges_with_cascade"] if summaries else 0, "by_agent": by_agent, + "insufficient_data": insufficient_data, }) finally: conn.close() @@ -490,7 +493,7 @@ async def handle_review_summary(request): (f"-{days}",), ).fetchall() - # Rejection reasons + # Rejection reasons — try review_records first, fall back to prs.eval_issues reasons = conn.execute( """SELECT rejection_reason, COUNT(*) as cnt FROM review_records @@ -500,15 +503,17 @@ async def handle_review_summary(request): (f"-{days}",), ).fetchall() - # Disagreement types - disagreements = conn.execute( - """SELECT disagreement_type, COUNT(*) as cnt - FROM review_records - WHERE disagreement_type IS NOT NULL - AND reviewed_at > datetime('now', ? || ' days') - GROUP BY disagreement_type ORDER BY cnt DESC""", - (f"-{days}",), - ).fetchall() + rejection_source = "review_records" + if not reasons: + reasons = conn.execute( + """SELECT value AS rejection_reason, COUNT(*) as cnt + FROM prs, json_each(prs.eval_issues) + WHERE eval_issues IS NOT NULL AND eval_issues != '[]' + AND created_at > datetime('now', ? 
|| ' days') + GROUP BY value ORDER BY cnt DESC""", + (f"-{days}",), + ).fetchall() + rejection_source = "prs.eval_issues" # Per-reviewer breakdown reviewers = conn.execute( @@ -541,7 +546,7 @@ async def handle_review_summary(request): "total": total, "outcomes": {r["outcome"]: r["cnt"] for r in outcomes}, "rejection_reasons": [{"reason": r["rejection_reason"], "count": r["cnt"]} for r in reasons], - "disagreement_types": [{"type": r["disagreement_type"], "count": r["cnt"]} for r in disagreements], + "rejection_source": rejection_source, "reviewers": [ {"reviewer": r["reviewer"], "approved": r["approved"], "approved_with_changes": r["approved_with_changes"], "rejected": r["rejected"], "total": r["total"]} @@ -557,6 +562,124 @@ async def handle_review_summary(request): conn.close() +# ─── GET /api/agent-scorecard ────────────────────────────────────────────── + +async def handle_agent_scorecard(request): + """Per-agent scorecard: PRs submitted, review outcomes, rejection reasons. + + Data from review_records (structured reviews) + prs (submission counts). + Falls back to prs.eval_issues for rejection reasons when review_records + has no rejections yet. + """ + conn = request.app["_get_conn"]() + try: + try: + days = min(int(request.query.get("days", "30")), 90) + except ValueError: + days = 30 + day_filter = f"-{days}" + + # PRs submitted per agent + prs_by_agent = conn.execute( + """SELECT agent, COUNT(*) as cnt FROM prs + WHERE agent IS NOT NULL + AND created_at > datetime('now', ? || ' days') + GROUP BY agent""", + (day_filter,), + ).fetchall() + prs_map = {r["agent"]: r["cnt"] for r in prs_by_agent} + + # Review outcomes from review_records + review_data = {} + try: + reviews = conn.execute( + """SELECT reviewer as agent, outcome, COUNT(*) as cnt + FROM review_records + WHERE reviewed_at > datetime('now', ? 
|| ' days') + GROUP BY reviewer, outcome""", + (day_filter,), + ).fetchall() + for r in reviews: + agent = r["agent"] + if agent not in review_data: + review_data[agent] = {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0} + review_data[agent][r["outcome"].replace("-", "_")] = r["cnt"] + review_data[agent]["total"] += r["cnt"] + except sqlite3.OperationalError: + pass + + # If review_records is empty, fall back to audit_log eval events + if not review_data: + evals = conn.execute( + """SELECT + COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, + event, COUNT(*) as cnt + FROM audit_log + WHERE stage='evaluate' + AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', ? || ' days') + GROUP BY agent, event""", + (day_filter,), + ).fetchall() + for r in evals: + agent = r["agent"] + if not agent: + continue + if agent not in review_data: + review_data[agent] = {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0} + if r["event"] == "approved": + review_data[agent]["approved"] += r["cnt"] + elif r["event"] == "changes_requested": # fixer auto-remediated; equivalent in pre-review_records era + review_data[agent]["approved_with_changes"] += r["cnt"] + else: + review_data[agent]["rejected"] += r["cnt"] + review_data[agent]["total"] += r["cnt"] + + # Rejection reasons from prs.eval_issues (canonical source) + reason_rows = conn.execute( + """SELECT agent, value as reason, COUNT(*) as cnt + FROM prs, json_each(prs.eval_issues) + WHERE eval_issues IS NOT NULL AND eval_issues != '[]' + AND agent IS NOT NULL + AND created_at > datetime('now', ? 
|| ' days') + GROUP BY agent, reason ORDER BY agent, cnt DESC""", + (day_filter,), + ).fetchall() + reasons_map = {} + for r in reason_rows: + if r["agent"] not in reasons_map: + reasons_map[r["agent"]] = {} + reasons_map[r["agent"]][r["reason"]] = r["cnt"] + + # Build scorecards + all_agents = sorted(set(list(prs_map.keys()) + list(review_data.keys()))) + scorecards = [] + for agent in all_agents: + if agent in ("unknown", None): + continue + rd = review_data.get(agent, {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0}) + total_reviews = rd["total"] + approved = rd["approved"] + approved_wc = rd["approved_with_changes"] + rejected = rd["rejected"] + approval_rate = ((approved + approved_wc) / total_reviews * 100) if total_reviews else 0 + scorecards.append({ + "agent": agent, + "total_prs": prs_map.get(agent, 0), + "total_reviews": total_reviews, + "approved": approved, + "approved_with_changes": approved_wc, + "rejected": rejected, + "approval_rate": round(approval_rate, 1), + "rejection_reasons": reasons_map.get(agent, {}), + }) + + scorecards.sort(key=lambda x: x["total_reviews"], reverse=True) + return web.json_response({"days": days, "scorecards": scorecards}) + finally: + conn.close() + + # ─── Trace endpoint ──────────────────────────────────────────────────────── @@ -998,6 +1121,7 @@ def register_dashboard_routes(app: web.Application, get_conn): app.router.add_get("/api/agents-dashboard", handle_agents_dashboard) app.router.add_get("/api/cascade-coverage", handle_cascade_coverage) app.router.add_get("/api/review-summary", handle_review_summary) + app.router.add_get("/api/agent-scorecard", handle_agent_scorecard) app.router.add_get("/api/trace/{trace_id}", handle_trace) app.router.add_get("/api/growth", handle_growth) app.router.add_get("/api/pr-lifecycle", handle_pr_lifecycle) From d8a64d479f54ea6a41c268f4f060536724e5bb7f Mon Sep 17 00:00:00 2001 From: m3taversal Date: Tue, 14 Apr 2026 11:57:01 +0100 Subject: [PATCH 2/4] epimetheus: 
remove dead disagreement_types UI card MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ganymede review finding — the review-summary API no longer returns disagreement_types, so the card always showed "No disagreements." Removed the JS loop and HTML table. Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887> Co-Authored-By: Claude Opus 4.6 (1M context) --- ops/diagnostics/dashboard_epistemic.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/ops/diagnostics/dashboard_epistemic.py b/ops/diagnostics/dashboard_epistemic.py index cb3dd5ef7..6074f4243 100644 --- a/ops/diagnostics/dashboard_epistemic.py +++ b/ops/diagnostics/dashboard_epistemic.py @@ -194,12 +194,6 @@ fetch('/api/review-summary?days=30') reasonRows += '' + esc(r.reason) + '' + r.count + ''; }} - // Disagreement types - let disagreeRows = ''; - for (const d of (data.disagreement_types || [])) {{ - disagreeRows += '' + esc(d.type) + '' + d.count + ''; - }} - el.innerHTML = `
Total Reviews
${{data.total}}
@@ -215,13 +209,6 @@ fetch('/api/review-summary?days=30') ${{reasonRows || 'No rejections'}}
-
-
Disagreement Types
- - - ${{disagreeRows || ''}} -
TypeCount
No disagreements
-
`; }}).catch(() => {{ document.getElementById('review-container').innerHTML = From 154f36f2d3ea1fc36d5a935de45a8c091e667a65 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Tue, 14 Apr 2026 12:01:13 +0100 Subject: [PATCH 3/4] epimetheus: fix eval crash + wire per-PR cost tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three bugs fixed: 1. triage_pr() returns 3 values but line 611 unpacked 2 → ValueError on every non-deterministic PR (circuit breaker opened, 5 PRs stuck) 2. costs import was inside triage else-block → NameError on deterministic routes 3. pr_cost never written to prs.cost_usd → 0% cost tracking across 1,118 PRs Cost tracking now covers all 4 exit paths: domain failed, domain rejected, Leo failed, and normal completion. Uses additive UPDATE (cost_usd + ?) so re-evals accumulate correctly. Co-Authored-By: Claude Opus 4.6 (1M context) --- ops/pipeline-v2/lib/evaluate.py | 45 ++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/ops/pipeline-v2/lib/evaluate.py b/ops/pipeline-v2/lib/evaluate.py index ff6dab8a9..104635ec2 100644 --- a/ops/pipeline-v2/lib/evaluate.py +++ b/ops/pipeline-v2/lib/evaluate.py @@ -493,6 +493,9 @@ async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_iss async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: """Evaluate a single PR. Returns result dict.""" + from . import costs + pr_cost = 0.0 + # Check eval attempt budget before claiming row = conn.execute("SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)).fetchone() eval_attempts = (row["eval_attempts"] or 0) if row else 0 @@ -608,10 +611,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: json.dumps({"pr": pr_number, "tier": tier}), ) else: - tier, triage_usage = await triage_pr(diff) - # Record triage cost - from . 
import costs - costs.record_usage( + tier, triage_usage, _triage_reason = await triage_pr(diff) + pr_cost += costs.record_usage( conn, config.TRIAGE_MODEL, "eval_triage", input_tokens=triage_usage.get("prompt_tokens", 0), output_tokens=triage_usage.get("completion_tokens", 0), @@ -674,6 +675,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # OpenRouter failure (timeout, error) — revert to open for retry. # NOT a rate limit — don't trigger 15-min backoff, just skip this PR. conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,)) + if pr_cost > 0: + conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number)) return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"} domain_verdict = _parse_verdict(domain_review, agent) @@ -714,6 +717,15 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # Disposition: check if this PR should be terminated or kept open await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues) + if domain_verdict != "skipped": + pr_cost += costs.record_usage( + conn, config.EVAL_DOMAIN_MODEL, "eval_domain", + input_tokens=domain_usage.get("prompt_tokens", 0), + output_tokens=domain_usage.get("completion_tokens", 0), + backend="openrouter", + ) + if pr_cost > 0: + conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number)) return { "pr": pr_number, "domain_verdict": domain_verdict, @@ -731,6 +743,15 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: if leo_review is None: # DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle). 
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,)) + if domain_verdict != "skipped": + pr_cost += costs.record_usage( + conn, config.EVAL_DOMAIN_MODEL, "eval_domain", + input_tokens=domain_usage.get("prompt_tokens", 0), + output_tokens=domain_usage.get("completion_tokens", 0), + backend="openrouter", + ) + if pr_cost > 0: + conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number)) reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed" return {"pr": pr_number, "skipped": True, "reason": reason} @@ -834,10 +855,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues) # Record cost (only for reviews that actually ran) - from . import costs - if domain_verdict != "skipped": - costs.record_usage( + pr_cost += costs.record_usage( conn, config.EVAL_DOMAIN_MODEL, "eval_domain", input_tokens=domain_usage.get("prompt_tokens", 0), output_tokens=domain_usage.get("completion_tokens", 0), @@ -845,15 +864,23 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) if leo_verdict not in ("skipped",): if tier == "DEEP": - costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max") + pr_cost += costs.record_usage( + conn, config.EVAL_LEO_MODEL, "eval_leo", + input_tokens=leo_usage.get("prompt_tokens", 0), + output_tokens=leo_usage.get("completion_tokens", 0), + backend="max", + ) else: - costs.record_usage( + pr_cost += costs.record_usage( conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", input_tokens=leo_usage.get("prompt_tokens", 0), output_tokens=leo_usage.get("completion_tokens", 0), backend="openrouter", ) + if pr_cost > 0: + conn.execute("UPDATE prs SET cost_usd = cost_usd + ? 
WHERE number = ?", (pr_cost, pr_number)) + return { "pr": pr_number, "tier": tier, From 5b9ce01412c7978b5d32f391a0c6b28fc283e7f0 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Tue, 14 Apr 2026 12:01:21 +0100 Subject: [PATCH 4/4] epimetheus: wire LLM connections into typed frontmatter edges MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract.py was discarding LLM-provided connections — related_claims went into frontmatter as wiki-links but supports/challenges/depends_on from the connections field were ignored entirely. This is the primary driver of 50%+ orphan ratio. Now: connections[] → typed edge fields (supports/challenges/related) in YAML frontmatter. related_claims fall back to related edges. Post-write connect_new_claims() adds vector-search edges for claims the LLM missed. Co-Authored-By: Claude Opus 4.6 (1M context) --- ops/pipeline-v2/lib/extract.py | 43 ++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/ops/pipeline-v2/lib/extract.py b/ops/pipeline-v2/lib/extract.py index ab663c2d2..de6a8c995 100644 --- a/ops/pipeline-v2/lib/extract.py +++ b/ops/pipeline-v2/lib/extract.py @@ -37,6 +37,7 @@ from .domains import agent_for_domain from .extraction_prompt import build_extraction_prompt from .forgejo import api as forgejo_api from .llm import openrouter_call +from .connect import connect_new_claims from .post_extract import load_existing_claims_from_repo, validate_and_fix_claims from .worktree_lock import async_main_worktree_lock @@ -225,7 +226,29 @@ def _build_claim_content(claim: dict, agent: str) -> str: body = claim.get("body", "") scope = claim.get("scope", "") sourcer = claim.get("sourcer", "") - related = claim.get("related_claims", []) + related_claims = claim.get("related_claims", []) + connections = claim.get("connections", []) + + edge_fields = {"supports": [], "challenges": [], "related": []} + for conn in connections: + target = conn.get("target", "") + rel 
= conn.get("relationship", "related") + if target and rel in edge_fields: + target = target.replace(".md", "") + if target not in edge_fields[rel]: + edge_fields[rel].append(target) + for r in related_claims[:5]: + r_clean = r.replace(".md", "") + if r_clean not in edge_fields["related"]: + edge_fields["related"].append(r_clean) + + edge_lines = [] + for edge_type in ("supports", "challenges", "related"): + targets = edge_fields[edge_type] + if targets: + edge_lines.append(f"{edge_type}:") + for t in targets: + edge_lines.append(f" - {t}") lines = [ "---", @@ -242,10 +265,7 @@ def _build_claim_content(claim: dict, agent: str) -> str: lines.append(f"scope: {scope}") if sourcer: lines.append(f'sourcer: "{sourcer}"') - if related: - lines.append("related_claims:") - for r in related: - lines.append(f' - "[[{r}]]"') + lines.extend(edge_lines) lines.append("---") lines.append("") lines.append(f"# {title}") @@ -456,6 +476,19 @@ async def _extract_one_source( await _archive_source(source_path, domain, "null-result") return 0, 0 + # Post-write: connect new claims to existing KB via vector search (non-fatal) + claim_paths = [str(worktree / f) for f in files_written if f.startswith("domains/")] + if claim_paths: + try: + connect_stats = connect_new_claims(claim_paths) + if connect_stats["connected"] > 0: + logger.info( + "Extract-connect: %d/%d claims → %d edges", + connect_stats["connected"], len(claim_paths), connect_stats["edges_added"], + ) + except Exception: + logger.warning("Extract-connect failed (non-fatal)", exc_info=True) + # Stage and commit for f in files_written: await _git("add", f, cwd=str(EXTRACT_WORKTREE))