Prove phase 1b local e2e

This commit is contained in:
twentyOne2x 2026-05-29 15:08:09 +02:00
parent cdb0b1498d
commit 59951346b2
9 changed files with 1500 additions and 70 deletions

158
lib/db.py
View file

@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 26
SCHEMA_VERSION = 27
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -93,6 +93,10 @@ CREATE TABLE IF NOT EXISTS costs (
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
duration_ms INTEGER DEFAULT 0,
cache_read_tokens INTEGER DEFAULT 0,
cache_write_tokens INTEGER DEFAULT 0,
cost_estimate_usd REAL DEFAULT 0,
PRIMARY KEY (date, model, stage)
);
@ -403,7 +407,7 @@ def migrate(conn: sqlite3.Connection):
if current < 5:
# Phase 5: contributor identity system — tracks who contributed what
# Aligned with schemas/attribution.md (5 roles) + Leo's tier system.
# CI is COMPUTED from raw counts × weights, never stored.
# CI is COMPUTED from raw counts x weights, never stored.
conn.executescript("""
CREATE TABLE IF NOT EXISTS contributors (
handle TEXT PRIMARY KEY,
@ -522,43 +526,105 @@ def migrate(conn: sqlite3.Connection):
# Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown
# New constraint: adds challenge,enrich,synthesize
# Also re-derive commit_type from branch prefix for rows with invalid/NULL values.
prs_sql_row = conn.execute(
"SELECT sql FROM sqlite_master WHERE type = 'table' AND name = 'prs'"
).fetchone()
prs_sql = (prs_sql_row["sql"] or "") if prs_sql_row else ""
# Step 1: Get all column names from existing table
cols_info = conn.execute("PRAGMA table_info(prs)").fetchall()
col_names = [c["name"] for c in cols_info]
col_list = ", ".join(col_names)
if all(kind in prs_sql for kind in ("challenge", "enrich", "synthesize")):
logger.info("Migration v9: prs commit_type CHECK already expanded, rebuild skipped")
else:
# Step 1: Get all column names from existing table.
cols_info = conn.execute("PRAGMA table_info(prs)").fetchall()
col_names = [c["name"] for c in cols_info]
# Step 2: Create new table with expanded CHECK constraint
conn.executescript(f"""
CREATE TABLE prs_new (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
domain TEXT,
agent TEXT,
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')),
tier TEXT,
tier0_pass INTEGER,
leo_verdict TEXT DEFAULT 'pending',
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
origin TEXT DEFAULT 'pipeline',
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs;
DROP TABLE prs;
ALTER TABLE prs_new RENAME TO prs;
""")
logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint")
# Step 2: Create new table with the expanded CHECK constraint.
# Keep columns introduced before and after v9 when present. This keeps
# fresh DB bootstrap and partially manually-migrated VPS DBs idempotent.
target_cols = [
"number",
"source_path",
"branch",
"status",
"domain",
"agent",
"commit_type",
"tier",
"tier0_pass",
"leo_verdict",
"domain_verdict",
"domain_agent",
"domain_model",
"priority",
"origin",
"eval_attempts",
"eval_issues",
"fix_attempts",
"transient_retries",
"substantive_retries",
"last_error",
"last_attempt",
"cost_usd",
"auto_merge",
"github_pr",
"source_channel",
"prompt_version",
"pipeline_version",
"submitted_by",
"conflict_rebase_attempts",
"merge_failures",
"merge_cycled",
"created_at",
"merged_at",
]
insert_cols = [col for col in target_cols if col in col_names]
col_list = ", ".join(insert_cols)
conn.executescript("""
CREATE TABLE prs_new (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
domain TEXT,
agent TEXT,
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')),
tier TEXT,
tier0_pass INTEGER,
leo_verdict TEXT DEFAULT 'pending',
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
origin TEXT DEFAULT 'pipeline',
eval_attempts INTEGER DEFAULT 0,
eval_issues TEXT DEFAULT '[]',
fix_attempts INTEGER DEFAULT 0,
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
auto_merge INTEGER DEFAULT 0,
github_pr INTEGER,
source_channel TEXT,
prompt_version TEXT,
pipeline_version TEXT,
submitted_by TEXT,
conflict_rebase_attempts INTEGER DEFAULT 0,
merge_failures INTEGER DEFAULT 0,
merge_cycled INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
""")
if insert_cols:
conn.execute(f"INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs")
conn.executescript("""
DROP TABLE prs;
ALTER TABLE prs_new RENAME TO prs;
""")
logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint")
# Step 3: Re-derive commit_type from branch prefix for invalid/NULL values
rows = conn.execute(
@ -613,7 +679,7 @@ def migrate(conn: sqlite3.Connection):
if current < 17:
# Add prompt/pipeline version tracking per PR
for col, default in [
for col, _default in [
("prompt_version", None),
("pipeline_version", None),
]:
@ -804,7 +870,7 @@ def migrate(conn: sqlite3.Connection):
# Add publishers + contributor_identities. Non-breaking — new tables only.
# No existing data moved. Classification into publishers happens via a
# separate script (scripts/reclassify-contributors.py) with Cory-reviewed
# seed list. CHECK constraint on contributors.kind deferred to v27 after
# seed list. CHECK constraint on contributors.kind deferred until after
# classification completes. (Apr 24 Cory directive: "fix schema, don't
# filter output" — separate contributors from publishers at the data layer.)
conn.executescript("""
@ -845,6 +911,20 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v26: added publishers + contributor_identities tables + sources provenance columns")
if current < 27:
for col, definition in [
("duration_ms", "INTEGER DEFAULT 0"),
("cache_read_tokens", "INTEGER DEFAULT 0"),
("cache_write_tokens", "INTEGER DEFAULT 0"),
("cost_estimate_usd", "REAL DEFAULT 0"),
]:
try:
conn.execute(f"ALTER TABLE costs ADD COLUMN {col} {definition}")
except sqlite3.OperationalError:
pass
conn.commit()
logger.info("Migration v27: added detailed cost accounting columns")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",

View file

@ -19,7 +19,6 @@ Epimetheus owns this module. Leo reviews changes.
import json
import logging
import os
import re
from datetime import date, datetime
from difflib import SequenceMatcher
@ -67,6 +66,9 @@ def parse_frontmatter(text: str) -> tuple[dict | None, str]:
fm = yaml.safe_load(raw)
if not isinstance(fm, dict):
return None, body
for key, value in list(fm.items()):
if isinstance(value, date | datetime):
fm[key] = value.isoformat()
return fm, body
except ImportError:
pass
@ -142,8 +144,13 @@ def fix_frontmatter(content: str, domain: str, agent: str) -> tuple[str, list[st
# Fix 5: description field
if "description" not in fm or not fm["description"]:
# Try to derive from body's first sentence
first_sentence = body.split(".")[0].strip().lstrip("# ") if body else ""
# Try to derive from the first non-empty body line.
first_sentence = ""
for line in body.splitlines():
first_sentence = line.strip().lstrip("# ")
if first_sentence:
first_sentence = first_sentence.split(".")[0].strip()
break
if first_sentence and len(first_sentence) > 10:
fm["description"] = first_sentence[:200]
fixes.append("derived_description_from_body")
@ -429,7 +436,7 @@ def validate_and_fix_entities(
issues = []
if action == "create" and content:
fm, body = parse_frontmatter(content)
fm, _body = parse_frontmatter(content)
if fm is None:
issues.append("no_frontmatter")
else:

View file

@ -0,0 +1,930 @@
{
"agent_review_calls": [
{
"agent": "Leo",
"files": [
"domains/grand-strategy/strategy.md"
],
"route": {
"evidence": [
{
"agent": "Leo",
"signal": "path",
"value": "domains/grand-strategy/strategy.md",
"weight": 8
}
],
"fallback": false,
"primary_agent": "Leo",
"required_agents": [
"Leo"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 8,
"Rio": 0,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"grand-strategy"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Theseus",
"files": [
"domains/ai-alignment/systems.md"
],
"route": {
"evidence": [
{
"agent": "Theseus",
"signal": "path",
"value": "domains/ai-alignment/systems.md",
"weight": 8
}
],
"fallback": false,
"primary_agent": "Theseus",
"required_agents": [
"Theseus"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 8,
"Vida": 0
},
"touched_domains": [
"ai-alignment"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Rio",
"files": [
"domains/internet-finance/x402.md"
],
"route": {
"evidence": [
{
"agent": "Rio",
"signal": "path",
"value": "domains/internet-finance/x402.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "keyword",
"value": "x402",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Rio",
"required_agents": [
"Rio"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 10,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"internet-finance"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Vida",
"files": [
"domains/health/clinical.md"
],
"route": {
"evidence": [
{
"agent": "Vida",
"signal": "path",
"value": "domains/health/clinical.md",
"weight": 8
},
{
"agent": "Vida",
"signal": "keyword",
"value": "health",
"weight": 2
},
{
"agent": "Vida",
"signal": "keyword",
"value": "clinical",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Vida",
"required_agents": [
"Vida"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 12
},
"touched_domains": [
"health"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Clay",
"files": [
"domains/entertainment/games.md"
],
"route": {
"evidence": [
{
"agent": "Clay",
"signal": "path",
"value": "domains/entertainment/games.md",
"weight": 8
},
{
"agent": "Clay",
"signal": "keyword",
"value": "entertainment",
"weight": 2
},
{
"agent": "Clay",
"signal": "keyword",
"value": "games",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Clay",
"required_agents": [
"Clay"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 12,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"entertainment"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Astra",
"files": [
"domains/space-development/robotics.md"
],
"route": {
"evidence": [
{
"agent": "Astra",
"signal": "path",
"value": "domains/space-development/robotics.md",
"weight": 8
},
{
"agent": "Astra",
"signal": "keyword",
"value": "space",
"weight": 2
},
{
"agent": "Astra",
"signal": "keyword",
"value": "robotics",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Astra",
"required_agents": [
"Astra"
],
"route_kind": "single",
"scores": {
"Astra": 12,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"space-development"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Rio",
"files": [
"domains/ai-systems/agent-wallets.md",
"domains/internet-finance/x402.md"
],
"route": {
"evidence": [
{
"agent": "Theseus",
"signal": "path",
"value": "domains/ai-systems/agent-wallets.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "path",
"value": "domains/internet-finance/x402.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "keyword",
"value": "x402",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Rio",
"required_agents": [
"Rio",
"Theseus"
],
"route_kind": "multi",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 10,
"Theseus": 8,
"Vida": 0
},
"touched_domains": [
"ai-systems",
"internet-finance"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Theseus",
"files": [
"domains/ai-systems/agent-wallets.md",
"domains/internet-finance/x402.md"
],
"route": {
"evidence": [
{
"agent": "Theseus",
"signal": "path",
"value": "domains/ai-systems/agent-wallets.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "path",
"value": "domains/internet-finance/x402.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "keyword",
"value": "x402",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Rio",
"required_agents": [
"Rio",
"Theseus"
],
"route_kind": "multi",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 10,
"Theseus": 8,
"Vida": 0
},
"touched_domains": [
"ai-systems",
"internet-finance"
]
},
"tier": "STANDARD",
"verdict": "APPROVE"
},
{
"agent": "Vida",
"files": [
"domains/health/incorrect-health-claim.md"
],
"route": {
"evidence": [
{
"agent": "Vida",
"signal": "path",
"value": "domains/health/incorrect-health-claim.md",
"weight": 8
},
{
"agent": "Vida",
"signal": "keyword",
"value": "health",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Vida",
"required_agents": [
"Vida"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 10
},
"touched_domains": [
"health"
]
},
"tier": "STANDARD",
"verdict": "REQUEST_CHANGES"
}
],
"agents_seen": [
"Astra",
"Clay",
"Leo",
"Rio",
"Theseus",
"Vida"
],
"case_results": [
{
"comments": 1,
"domain": "grand-strategy",
"domain_agent": "Leo",
"domain_verdict": "skipped",
"expected_agents": [
"Leo"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=101:AGENT=LEO -->"
],
"number": 101,
"reviewers": [
"Leo"
],
"status": "approved"
},
{
"comments": 1,
"domain": "ai-alignment",
"domain_agent": "Theseus",
"domain_verdict": "approve",
"expected_agents": [
"Theseus"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=102:AGENT=THESEUS -->"
],
"number": 102,
"reviewers": [
"Theseus"
],
"status": "approved"
},
{
"comments": 1,
"domain": "internet-finance",
"domain_agent": "Rio",
"domain_verdict": "approve",
"expected_agents": [
"Rio"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=103:AGENT=RIO -->"
],
"number": 103,
"reviewers": [
"Rio"
],
"status": "approved"
},
{
"comments": 1,
"domain": "health",
"domain_agent": "Vida",
"domain_verdict": "approve",
"expected_agents": [
"Vida"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=104:AGENT=VIDA -->"
],
"number": 104,
"reviewers": [
"Vida"
],
"status": "approved"
},
{
"comments": 1,
"domain": "entertainment",
"domain_agent": "Clay",
"domain_verdict": "approve",
"expected_agents": [
"Clay"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=105:AGENT=CLAY -->"
],
"number": 105,
"reviewers": [
"Clay"
],
"status": "approved"
},
{
"comments": 1,
"domain": "space-development",
"domain_agent": "Astra",
"domain_verdict": "approve",
"expected_agents": [
"Astra"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=106:AGENT=ASTRA -->"
],
"number": 106,
"reviewers": [
"Astra"
],
"status": "approved"
},
{
"comments": 2,
"domain": "cross-ai-finance",
"domain_agent": "Rio",
"domain_verdict": "approve",
"expected_agents": [
"Rio",
"Theseus"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=107:AGENT=RIO -->",
"<!-- PHASE1B_REVIEW:PR=107:AGENT=THESEUS -->"
],
"number": 107,
"reviewers": [
"Rio",
"Theseus"
],
"status": "approved"
},
{
"comments": 1,
"domain": "health-feedback",
"domain_agent": "Vida",
"domain_verdict": "request_changes",
"expected_agents": [
"Vida"
],
"markers": [
"<!-- PHASE1B_REVIEW:PR=108:AGENT=VIDA -->"
],
"number": 108,
"reviewers": [
"Vida"
],
"status": "open"
}
],
"cases_total": 8,
"eval_feedback": [
{
"issues": [],
"outcome": "approved",
"pr": 101
},
{
"issues": [],
"outcome": "approved",
"pr": 102
},
{
"issues": [],
"outcome": "approved",
"pr": 103
},
{
"issues": [],
"outcome": "approved",
"pr": 104
},
{
"issues": [],
"outcome": "approved",
"pr": 105
},
{
"issues": [],
"outcome": "approved",
"pr": 106
},
{
"issues": [],
"outcome": "approved",
"pr": 107
},
{
"issues": [
"factual_discrepancy"
],
"outcome": "rejected",
"pr": 108
}
],
"failed": 0,
"feature_flag": "PHASE1B_AGENT_ROUTING_ENABLED",
"formal_approvals": [
101,
102,
103,
104,
105,
106,
107
],
"ok": true,
"rejection_dispositions": [
{
"eval_attempts": 1,
"issues": [
"factual_discrepancy"
],
"pr": 108
}
],
"route_events": [
{
"pr": 101,
"route": {
"evidence": [
{
"agent": "Leo",
"signal": "path",
"value": "domains/grand-strategy/strategy.md",
"weight": 8
}
],
"fallback": false,
"primary_agent": "Leo",
"required_agents": [
"Leo"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 8,
"Rio": 0,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"grand-strategy"
]
},
"tier": "STANDARD"
},
{
"pr": 102,
"route": {
"evidence": [
{
"agent": "Theseus",
"signal": "path",
"value": "domains/ai-alignment/systems.md",
"weight": 8
}
],
"fallback": false,
"primary_agent": "Theseus",
"required_agents": [
"Theseus"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 8,
"Vida": 0
},
"touched_domains": [
"ai-alignment"
]
},
"tier": "STANDARD"
},
{
"pr": 103,
"route": {
"evidence": [
{
"agent": "Rio",
"signal": "path",
"value": "domains/internet-finance/x402.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "keyword",
"value": "x402",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Rio",
"required_agents": [
"Rio"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 10,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"internet-finance"
]
},
"tier": "STANDARD"
},
{
"pr": 104,
"route": {
"evidence": [
{
"agent": "Vida",
"signal": "path",
"value": "domains/health/clinical.md",
"weight": 8
},
{
"agent": "Vida",
"signal": "keyword",
"value": "health",
"weight": 2
},
{
"agent": "Vida",
"signal": "keyword",
"value": "clinical",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Vida",
"required_agents": [
"Vida"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 12
},
"touched_domains": [
"health"
]
},
"tier": "STANDARD"
},
{
"pr": 105,
"route": {
"evidence": [
{
"agent": "Clay",
"signal": "path",
"value": "domains/entertainment/games.md",
"weight": 8
},
{
"agent": "Clay",
"signal": "keyword",
"value": "entertainment",
"weight": 2
},
{
"agent": "Clay",
"signal": "keyword",
"value": "games",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Clay",
"required_agents": [
"Clay"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 12,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"entertainment"
]
},
"tier": "STANDARD"
},
{
"pr": 106,
"route": {
"evidence": [
{
"agent": "Astra",
"signal": "path",
"value": "domains/space-development/robotics.md",
"weight": 8
},
{
"agent": "Astra",
"signal": "keyword",
"value": "space",
"weight": 2
},
{
"agent": "Astra",
"signal": "keyword",
"value": "robotics",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Astra",
"required_agents": [
"Astra"
],
"route_kind": "single",
"scores": {
"Astra": 12,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 0
},
"touched_domains": [
"space-development"
]
},
"tier": "STANDARD"
},
{
"pr": 107,
"route": {
"evidence": [
{
"agent": "Theseus",
"signal": "path",
"value": "domains/ai-systems/agent-wallets.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "path",
"value": "domains/internet-finance/x402.md",
"weight": 8
},
{
"agent": "Rio",
"signal": "keyword",
"value": "x402",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Rio",
"required_agents": [
"Rio",
"Theseus"
],
"route_kind": "multi",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 10,
"Theseus": 8,
"Vida": 0
},
"touched_domains": [
"ai-systems",
"internet-finance"
]
},
"tier": "STANDARD"
},
{
"pr": 108,
"route": {
"evidence": [
{
"agent": "Vida",
"signal": "path",
"value": "domains/health/incorrect-health-claim.md",
"weight": 8
},
{
"agent": "Vida",
"signal": "keyword",
"value": "health",
"weight": 2
}
],
"fallback": false,
"primary_agent": "Vida",
"required_agents": [
"Vida"
],
"route_kind": "single",
"scores": {
"Astra": 0,
"Clay": 0,
"Leo": 0,
"Rio": 0,
"Theseus": 0,
"Vida": 10
},
"touched_domains": [
"health"
]
},
"tier": "STANDARD"
}
],
"schema_version": 27,
"scope": "local_no_network_phase1b_eval_cycle",
"source_feedback_paths": [
"inbox/archive/phase1b-108.md"
],
"succeeded": 8
}

View file

@ -0,0 +1,346 @@
#!/usr/bin/env python3
"""No-network local proof for Phase 1b agent routing.
This script exercises the real evaluate cycle against an in-memory migrated DB
while replacing only external network/LLM edges with deterministic fakes.
"""
# ruff: noqa: E402,I001
from __future__ import annotations
import argparse
import asyncio
import json
import re
import sqlite3
import sys
from pathlib import Path
from typing import Any
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
from lib import config, db
from lib import evaluate as evaluate_mod
# Proof cases seeded into the in-memory DB. Each case describes one PR:
#   number          - PR number, also used to build the source path in _insert_pr
#   domain          - label echoed into the proof's case_results
#   branch          - branch name stored on the prs row
#   paths           - files named in the synthetic diff produced by _diff_for
#   expected_agents - reviewers the proof expects to see recorded for the PR

# Single-domain cases: each PR touches exactly one file and expects exactly
# one reviewing agent.
SINGLE_DOMAIN_CASES = [
    {
        "number": 101,
        "domain": "grand-strategy",
        "branch": "leo/grand-strategy",
        "paths": ["domains/grand-strategy/strategy.md"],
        "expected_agents": ["Leo"],
    },
    {
        "number": 102,
        "domain": "ai-alignment",
        "branch": "theseus/alignment",
        "paths": ["domains/ai-alignment/systems.md"],
        "expected_agents": ["Theseus"],
    },
    {
        "number": 103,
        "domain": "internet-finance",
        "branch": "rio/x402",
        "paths": ["domains/internet-finance/x402.md"],
        "expected_agents": ["Rio"],
    },
    {
        "number": 104,
        "domain": "health",
        "branch": "vida/health",
        "paths": ["domains/health/clinical.md"],
        "expected_agents": ["Vida"],
    },
    {
        "number": 105,
        "domain": "entertainment",
        "branch": "clay/games",
        "paths": ["domains/entertainment/games.md"],
        "expected_agents": ["Clay"],
    },
    {
        "number": 106,
        "domain": "space-development",
        "branch": "astra/robotics",
        "paths": ["domains/space-development/robotics.md"],
        "expected_agents": ["Astra"],
    },
]
# Cross-domain case: one PR touching files under two different domain
# directories, expected to be reviewed by both Theseus and Rio.
CROSS_DOMAIN_CASE = {
    "number": 107,
    "domain": "cross-ai-finance",
    "branch": "rio/ai-x402",
    "paths": ["domains/ai-systems/agent-wallets.md", "domains/internet-finance/x402.md"],
    "expected_agents": ["Theseus", "Rio"],
}
# Rejection case: the fake reviewer in run_phase1b_local_proof answers
# REQUEST_CHANGES for Vida when the diff mentions incorrect-health-claim.md,
# so this PR is expected to stay open with a request_changes verdict.
FEEDBACK_CASE = {
    "number": 108,
    "domain": "health-feedback",
    "branch": "vida/reject-health",
    "paths": ["domains/health/incorrect-health-claim.md"],
    "expected_agents": ["Vida"],
}
def _diff_for(paths: list[str]) -> str:
    """Build a synthetic unified-diff text with one five-line chunk per path.

    Only the ``diff --git`` header line carries the real path; the ``---`` /
    ``+++`` lines and the two added lines are fixed text shared by every chunk.
    Chunks are joined with a single newline; an empty *paths* yields ``""``.
    """
    fixed_tail = (
        "--- a/file.md",
        "+++ b/file.md",
        "+type: claim",
        "+description: local phase 1b proof claim",
    )
    return "\n".join(
        "\n".join((f"diff --git a/{entry} b/{entry}", *fixed_tail))
        for entry in paths
    )
def _insert_pr(conn: sqlite3.Connection, case: dict[str, Any]) -> None:
    """Seed one proof case: a ``sources`` row plus the open ``prs`` row that points at it.

    The source path is derived from ``case["number"]``; the PR row is created
    open, tier STANDARD, tier0 passed, both verdicts pending, zero eval
    attempts and medium priority. Only ``number`` and ``branch`` are read
    from *case*. No commit is issued here.
    """
    pr_number = case["number"]
    archive_path = f"inbox/archive/phase1b-{pr_number}.md"
    source_stmt = "INSERT INTO sources (path, status, priority) VALUES (?, 'extracted', 'medium')"
    pr_stmt = (
        "INSERT INTO prs\n"
        "(number, source_path, branch, status, tier, tier0_pass,\n"
        "leo_verdict, domain_verdict, eval_attempts, priority)\n"
        "VALUES (?, ?, ?, 'open', 'STANDARD', 1, 'pending', 'pending', 0, 'medium')"
    )
    conn.execute(source_stmt, (archive_path,))
    conn.execute(pr_stmt, (pr_number, archive_path, case["branch"]))
def _pr_number_from_path(path: str) -> int | None:
    """Return the number following ``issues/`` or ``pulls/`` in *path*, or None if absent."""
    found = re.search(r"(?:issues|pulls)/(\d+)", path)
    if found is None:
        return None
    return int(found.group(1))
async def run_phase1b_local_proof() -> dict[str, Any]:
    """Run one evaluate cycle over the proof cases and return the proof record.

    An in-memory SQLite database is migrated with ``db.migrate`` and seeded
    with one open PR per case. While ``evaluate_mod.evaluate_cycle`` runs,
    the module's diff fetch, agent review, Forgejo API, formal-approval,
    eval-feedback and rejection-disposal functions are replaced by in-process
    fakes, ``_rate_limit_backoff_until`` is cleared and
    ``config.PHASE1B_AGENT_ROUTING_ENABLED`` is forced on; all of these are
    restored afterwards, even on failure. The connection is always closed.

    Returns:
        A JSON-serialisable dict summarising per-case results, the recorded
        fake calls, and the ``phase1b_route`` audit events.

    Raises:
        AssertionError: if ``_assert_phase1b_proof`` rejects the collected proof.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.row_factory = sqlite3.Row
        db.migrate(conn)

        cases = [*SINGLE_DOMAIN_CASES, CROSS_DOMAIN_CASE, FEEDBACK_CASE]
        diffs = {case["number"]: _diff_for(case["paths"]) for case in cases}
        for case in cases:
            _insert_pr(conn, case)

        # Recorders filled in by the fakes below.
        comments: dict[int, list[str]] = {}
        formal_approvals: list[int] = []
        eval_feedback: list[dict[str, Any]] = []
        dispositions: list[dict[str, Any]] = []
        agent_review_calls: list[dict[str, Any]] = []

        async def fake_get_pr_diff(pr_number: int) -> str:
            return diffs[pr_number]

        async def fake_run_agent_review(
            diff: str,
            files: str,
            agent: str,
            route_context: str = "",
            tier: str = "STANDARD",
        ) -> tuple[str, dict[str, int]]:
            # Only Vida reviewing the deliberately wrong health claim rejects.
            rejects = "incorrect-health-claim.md" in diff and agent == "Vida"
            verdict = "REQUEST_CHANGES" if rejects else "APPROVE"
            issues = "\n<!-- ISSUES: factual_discrepancy -->" if rejects else ""
            agent_review_calls.append(
                {
                    "agent": agent,
                    "tier": tier,
                    "files": files.splitlines(),
                    # Intentionally strict: an empty/non-JSON route context
                    # raises here, which surfaces a missing routing payload.
                    "route": json.loads(route_context),
                    "verdict": verdict,
                }
            )
            return (
                f"{agent} local Phase 1b review{issues}\n<!-- VERDICT:{agent.upper()}:{verdict} -->",
                {"prompt_tokens": 10, "completion_tokens": 5},
            )

        async def fake_forgejo_api(method: str, path: str, body: dict | None = None, token: str | None = None):
            # Paths without a PR/issue number share the -1 bucket.
            key = _pr_number_from_path(path) or -1
            if method == "GET" and "comments" in path:
                return [{"body": body_text} for body_text in comments.get(key, [])]
            if method == "POST" and "comments" in path:
                comments.setdefault(key, []).append((body or {}).get("body", ""))
                return {"id": len(comments[key])}
            if method == "GET" and "pulls/" in path:
                return {"user": {"login": "phase1b-local-proof"}}
            return {"ok": True, "token": bool(token)}

        async def fake_post_formal_approvals(pr_number: int, pr_author: str) -> None:
            formal_approvals.append(pr_number)

        async def fake_on_eval_complete(
            conn: sqlite3.Connection,
            pr_number: int,
            *,
            outcome: str,
            review_text: str,
            issues: list[str] | None = None,
        ) -> None:
            eval_feedback.append({"pr": pr_number, "outcome": outcome, "issues": issues or []})

        async def fake_dispose_rejected_pr(
            conn: sqlite3.Connection,
            pr_number: int,
            eval_attempts: int,
            issues: list[str],
        ) -> None:
            dispositions.append({"pr": pr_number, "eval_attempts": eval_attempts, "issues": issues})

        # One mapping drives both patching and restoring, so the two cannot drift.
        replacements: dict[str, Any] = {
            "_rate_limit_backoff_until": None,
            "get_pr_diff": fake_get_pr_diff,
            "run_agent_review": fake_run_agent_review,
            "forgejo_api": fake_forgejo_api,
            "post_formal_approvals": fake_post_formal_approvals,
            "on_eval_complete": fake_on_eval_complete,
            "dispose_rejected_pr": fake_dispose_rejected_pr,
        }
        originals = {name: getattr(evaluate_mod, name) for name in replacements}
        original_flag = config.PHASE1B_AGENT_ROUTING_ENABLED
        try:
            config.PHASE1B_AGENT_ROUTING_ENABLED = True
            for name, fake in replacements.items():
                setattr(evaluate_mod, name, fake)
            succeeded, failed = await evaluate_mod.evaluate_cycle(conn, max_workers=len(cases))
        finally:
            config.PHASE1B_AGENT_ROUTING_ENABLED = original_flag
            for name, original in originals.items():
                setattr(evaluate_mod, name, original)

        pr_rows = {
            row["number"]: dict(row)
            for row in conn.execute(
                """SELECT number, status, branch, domain, domain_agent, leo_verdict,
                          domain_verdict, auto_merge, eval_issues
                   FROM prs
                   ORDER BY number"""
            ).fetchall()
        }
        review_rows = [
            dict(row) for row in conn.execute("SELECT * FROM review_records ORDER BY pr_number, agent")
        ]
        route_events = [
            json.loads(row["detail"])
            for row in conn.execute(
                "SELECT detail FROM audit_log WHERE stage = 'evaluate' AND event = 'phase1b_route' ORDER BY id"
            ).fetchall()
        ]
        source_feedback = {
            row["path"]: row["feedback"]
            for row in conn.execute("SELECT path, feedback FROM sources WHERE feedback IS NOT NULL ORDER BY path")
        }

        case_results = []
        for case in cases:
            number = case["number"]
            reviewers = sorted(row["agent"] for row in review_rows if row["pr_number"] == number)
            posted = comments.get(number, [])
            case_results.append(
                {
                    "number": number,
                    "domain": case["domain"],
                    "expected_agents": sorted(case["expected_agents"]),
                    "reviewers": reviewers,
                    "status": pr_rows[number]["status"],
                    "domain_agent": pr_rows[number]["domain_agent"],
                    "domain_verdict": pr_rows[number]["domain_verdict"],
                    "comments": len(posted),
                    "markers": [
                        marker
                        for body in posted
                        for marker in re.findall(r"<!-- PHASE1B_REVIEW:PR=\d+:AGENT=[A-Z]+ -->", body)
                    ],
                }
            )

        proof = {
            "ok": True,
            "scope": "local_no_network_phase1b_eval_cycle",
            "schema_version": db.SCHEMA_VERSION,
            "feature_flag": "PHASE1B_AGENT_ROUTING_ENABLED",
            "succeeded": succeeded,
            "failed": failed,
            "cases_total": len(cases),
            "case_results": case_results,
            "agents_seen": sorted({call["agent"] for call in agent_review_calls}),
            "agent_review_calls": agent_review_calls,
            "formal_approvals": sorted(formal_approvals),
            "eval_feedback": sorted(eval_feedback, key=lambda item: item["pr"]),
            "rejection_dispositions": dispositions,
            "route_events": route_events,
            "source_feedback_paths": sorted(source_feedback),
        }
        _assert_phase1b_proof(proof)
        return proof
    finally:
        # Release the in-memory database on success and on every failure path.
        conn.close()
def _require(condition: object, message: str) -> None:
    """Raise AssertionError carrying *message* unless *condition* is truthy."""
    if not condition:
        raise AssertionError(message)


def _assert_phase1b_proof(proof: dict[str, Any]) -> None:
    """Validate the collected proof record against the expected Phase 1b outcome.

    Checks are explicit raises rather than ``assert`` statements so that they
    still run when Python is started with ``-O`` (which strips ``assert``).

    Args:
        proof: the dict assembled by ``run_phase1b_local_proof``.

    Raises:
        AssertionError: on the first expectation that does not hold.
    """
    expected_agents = ["Astra", "Clay", "Leo", "Rio", "Theseus", "Vida"]
    _require(
        proof["succeeded"] == proof["cases_total"],
        f"succeeded={proof['succeeded']!r} != cases_total={proof['cases_total']!r}",
    )
    _require(proof["failed"] == 0, f"failed={proof['failed']!r}, expected 0")
    _require(
        proof["agents_seen"] == expected_agents,
        f"agents_seen={proof['agents_seen']!r}, expected {expected_agents!r}",
    )
    _require(
        len(proof["route_events"]) == proof["cases_total"],
        f"{len(proof['route_events'])} route events for {proof['cases_total']!r} cases",
    )

    by_number = {case["number"]: case for case in proof["case_results"]}

    # Every single-domain PR is approved by exactly its expected reviewer(s),
    # with one posted comment per reviewer.
    for case in SINGLE_DOMAIN_CASES:
        number = case["number"]
        result = by_number[number]
        _require(result["status"] == "approved", f"PR {number}: status={result['status']!r}")
        _require(
            result["reviewers"] == sorted(case["expected_agents"]),
            f"PR {number}: reviewers={result['reviewers']!r}",
        )
        _require(
            result["comments"] == len(case["expected_agents"]),
            f"PR {number}: comments={result['comments']!r}",
        )

    cross = by_number[CROSS_DOMAIN_CASE["number"]]
    _require(cross["status"] == "approved", f"cross-domain PR: status={cross['status']!r}")
    _require(
        cross["reviewers"] == sorted(CROSS_DOMAIN_CASE["expected_agents"]),
        f"cross-domain PR: reviewers={cross['reviewers']!r}",
    )
    _require(cross["comments"] == 2, f"cross-domain PR: comments={cross['comments']!r}")

    # The rejection case must stay open with the change request recorded.
    feedback = by_number[FEEDBACK_CASE["number"]]
    _require(feedback["status"] == "open", f"feedback PR: status={feedback['status']!r}")
    _require(feedback["reviewers"] == ["Vida"], f"feedback PR: reviewers={feedback['reviewers']!r}")
    _require(
        feedback["domain_verdict"] == "request_changes",
        f"feedback PR: domain_verdict={feedback['domain_verdict']!r}",
    )
    _require(
        proof["rejection_dispositions"]
        == [{"pr": FEEDBACK_CASE["number"], "eval_attempts": 1, "issues": ["factual_discrepancy"]}],
        f"rejection_dispositions={proof['rejection_dispositions']!r}",
    )
    _require(
        len(proof["formal_approvals"]) == len(SINGLE_DOMAIN_CASES) + 1,
        f"formal_approvals={proof['formal_approvals']!r}",
    )
    _require(
        any(item["outcome"] == "rejected" for item in proof["eval_feedback"]),
        "no eval_feedback entry with outcome 'rejected'",
    )
def main() -> None:
    """CLI entry point: run the proof, write it as JSON, print a one-line summary.

    ``--output`` selects the destination file; missing parent directories
    are created.
    """
    cli = argparse.ArgumentParser(description="Run local no-network Phase 1b proof")
    cli.add_argument(
        "--output",
        default="proof/phase1b-local-e2e-proof.json",
        help="JSON proof output path",
    )
    options = cli.parse_args()

    proof = asyncio.run(run_phase1b_local_proof())

    destination = Path(options.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(proof, indent=2, sort_keys=True)
    destination.write_text(serialized + "\n")

    summary = {"ok": True, "output": str(destination), "cases_total": proof["cases_total"]}
    print(json.dumps(summary, sort_keys=True))


if __name__ == "__main__":
    main()

View file

@ -14,14 +14,22 @@ No deal terms, no dollar amounts, no private investment details in approval requ
Epimetheus owns this module.
"""
# ruff: noqa: I001
import logging
import re
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import CallbackQueryHandler, ContextTypes
try:
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import CallbackQueryHandler, ContextTypes
except ImportError: # Optional in local unit tests that only exercise OPSEC logic.
InlineKeyboardButton = None
InlineKeyboardMarkup = None
Update = None
CallbackQueryHandler = None
ContextTypes = None
logger = logging.getLogger("telegram.approvals")
@ -110,8 +118,8 @@ def format_approval_message(row: sqlite3.Row) -> str:
content = content[:3000] + "\n\n[... truncated]"
parts = [
f"APPROVAL REQUEST",
f"",
"APPROVAL REQUEST",
"",
f"Type: {type_label}",
f"From: {agent}",
]
@ -134,6 +142,8 @@ def format_approval_message(row: sqlite3.Row) -> str:
def build_keyboard(request_id: int) -> InlineKeyboardMarkup:
"""Build inline keyboard with Approve/Reject buttons."""
if InlineKeyboardMarkup is None or InlineKeyboardButton is None:
raise ImportError("python-telegram-bot is required to build approval keyboards")
return InlineKeyboardMarkup([
[
InlineKeyboardButton("Approve", callback_data=f"approve:{request_id}"),
@ -225,8 +235,6 @@ async def handle_approval_callback(update: Update, context: ContextTypes.DEFAULT
return
if action == "reject":
# Check if user sent a reply with rejection reason
rejection_reason = None
# For rejection, edit the message to ask for reason
row = conn.execute(
"SELECT * FROM approval_queue WHERE id = ?", (request_id,)

View file

@ -1,9 +1,11 @@
"""Tests for lib/contributor.py — contributor attribution functions."""
import sqlite3
# ruff: noqa: E402,I001
import asyncio
import sys
import os
import sqlite3
import sys
from unittest.mock import AsyncMock, MagicMock, patch
sys.modules.setdefault("aiohttp", MagicMock())
@ -176,9 +178,16 @@ def _make_attribution_db():
conn.execute("""CREATE TABLE prs (
number INTEGER PRIMARY KEY,
commit_type TEXT,
agent TEXT
agent TEXT,
submitted_by TEXT,
domain TEXT,
source_channel TEXT,
leo_verdict TEXT,
domain_verdict TEXT,
domain_agent TEXT,
merged_at TEXT
)""")
conn.execute("INSERT INTO prs VALUES (100, 'extract', 'rio')")
conn.execute("INSERT INTO prs (number, commit_type, agent) VALUES (100, 'extract', 'rio')")
return conn
def test_record_skips_pipeline_only():
@ -196,12 +205,19 @@ def test_record_skips_pipeline_only():
def test_record_fallback_to_pr_agent():
conn = _make_attribution_db()
mock_diff = "+++ b/domains/crypto/claim.md\n+some content\n"
mock_diff = "diff --git a/x b/domains/crypto/claim.md\nnew file\n+++ b/domains/crypto/claim.md\n+some content\n"
async def run():
with patch("lib.contributor.get_pr_diff", new_callable=AsyncMock, return_value=mock_diff):
# First call: trailer log (no trailers), Second call: author log (bot name → skipped)
git_fn = AsyncMock(side_effect=[(0, "no trailers here"), (0, "m3taversal")])
git_fn = AsyncMock(
side_effect=[
(0, "no trailers here"),
(0, "domains/crypto/claim.md"),
(0, ""),
(0, "m3taversal"),
]
)
with patch("lib.contributor.config") as mock_config:
mock_config.CONTRIBUTOR_TIER_RULES = {
"veteran": {"claims_merged": 50, "min_days_since_first": 90, "challenges_survived": 5},
@ -218,13 +234,23 @@ def test_record_fallback_to_pr_agent():
def test_record_fallback_to_git_author():
"""External contributors get credited via git commit author."""
conn = _make_attribution_db()
conn.execute("INSERT INTO prs VALUES (200, 'contrib', 'external')")
mock_diff = "+++ b/domains/ai-alignment/claim.md\n+new content\n"
conn.execute("INSERT INTO prs (number, commit_type, agent) VALUES (200, 'contrib', 'external')")
mock_diff = (
"diff --git a/x b/domains/ai-alignment/claim.md\nnew file\n"
"+++ b/domains/ai-alignment/claim.md\n+new content\n"
)
async def run():
with patch("lib.contributor.get_pr_diff", new_callable=AsyncMock, return_value=mock_diff):
# First call: trailer log (no trailers), Second call: author log (external name)
git_fn = AsyncMock(side_effect=[(0, "no trailers"), (0, "Cameron-S1")])
git_fn = AsyncMock(
side_effect=[
(0, "no trailers"),
(0, "domains/ai-alignment/claim.md"),
(0, ""),
(0, "Cameron-S1"),
]
)
with patch("lib.contributor.config") as mock_config:
mock_config.CONTRIBUTOR_TIER_RULES = {
"veteran": {"claims_merged": 50, "min_days_since_first": 90, "challenges_survived": 5},

View file

@ -1,7 +1,9 @@
"""Tests for lib/eval_parse.py — pure parsing functions extracted from evaluate.py."""
import sys
# ruff: noqa: E402,I001
import os
import sys
from unittest.mock import MagicMock
import pytest
@ -12,7 +14,6 @@ sys.modules.setdefault("aiohttp", MagicMock())
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.eval_parse import (
VALID_ISSUE_TAGS,
classify_issues,
deterministic_tier,
diff_contains_claim_type,
@ -40,7 +41,7 @@ class TestFilterDiff:
"diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n"
"+real content\n"
)
review_diff, entity_diff = filter_diff(diff)
review_diff, _entity_diff = filter_diff(diff)
assert "inbox" not in review_diff
assert "claim.md" in review_diff

View file

@ -0,0 +1,31 @@
"""End-to-end local proof for Phase 1b agent routing."""
import pytest
from scripts.prove_phase1b_local import CROSS_DOMAIN_CASE, FEEDBACK_CASE, SINGLE_DOMAIN_CASES, run_phase1b_local_proof
@pytest.mark.asyncio
async def test_phase1b_local_eval_cycle_routes_reviews_approves_and_feedbacks():
    """Run the local Phase 1b proof once and verify routing, approvals and feedback.

    Checks the proof's summary fields, then the per-case results: every
    single-domain case and the cross-domain case must be approved by exactly
    the expected reviewers, while the feedback case must remain open with a
    ``request_changes`` domain verdict and a recorded source feedback path.
    """
    proof = await run_phase1b_local_proof()

    # Summary fields.
    assert proof["scope"] == "local_no_network_phase1b_eval_cycle"
    # NOTE(review): the "+ 2" presumably accounts for the cross-domain and
    # feedback cases on top of the single-domain ones — confirm in the script.
    expected_successes = len(SINGLE_DOMAIN_CASES) + 2
    assert proof["succeeded"] == expected_successes
    assert proof["failed"] == 0
    assert proof["agents_seen"] == ["Astra", "Clay", "Leo", "Rio", "Theseus", "Vida"]

    # Index case results by PR number for direct lookup below.
    by_number = {}
    for entry in proof["case_results"]:
        by_number[entry["number"]] = entry

    # Each single-domain case is approved by its expected reviewers.
    for expected in SINGLE_DOMAIN_CASES:
        outcome = by_number[expected["number"]]
        assert outcome["status"] == "approved"
        assert outcome["reviewers"] == sorted(expected["expected_agents"])

    # The cross-domain case is approved by all of its expected reviewers.
    cross_outcome = by_number[CROSS_DOMAIN_CASE["number"]]
    assert cross_outcome["status"] == "approved"
    assert cross_outcome["reviewers"] == sorted(CROSS_DOMAIN_CASE["expected_agents"])

    # The feedback case stays open with a change request from its reviewer.
    feedback_outcome = by_number[FEEDBACK_CASE["number"]]
    assert feedback_outcome["status"] == "open"
    assert feedback_outcome["reviewers"] == ["Vida"]
    assert feedback_outcome["domain_verdict"] == "request_changes"
    assert proof["source_feedback_paths"] == [f"inbox/archive/phase1b-{FEEDBACK_CASE['number']}.md"]

View file

@ -1,21 +1,20 @@
"""Tests for lib/search.py — vector search and graph expansion."""
import json
from pathlib import Path
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock, patch
import pytest
from lib.search import (
PASS1_THRESHOLD,
WIKI_LINK_RE,
_parse_frontmatter_edges,
_resolve_claim_path,
graph_expand,
search,
search_qdrant,
WIKI_LINK_RE,
)
# ─── Fixtures ──────────────────────────────────────────────────────────────
@ -513,17 +512,19 @@ class TestTwoPassRetrieval:
@patch("lib.search.search_qdrant")
@patch("lib.search.embed_query")
def test_pass1_only_default(self, mock_embed, mock_qdrant, mock_expand):
"""Default search (expand=False) only calls Qdrant once with high threshold."""
"""Default search (expand=False) only calls Qdrant once with the pass-1 threshold."""
mock_embed.return_value = [0.1] * 1536
mock_qdrant.return_value = [
{"score": 0.85, "payload": {"claim_title": "Hit", "claim_path": "d/a.md"}},
]
result = search("query")
mock_qdrant.assert_called_once()
# Should use PASS1_THRESHOLD (0.70)
# Should use the production pass-1 threshold.
call_kwargs = mock_qdrant.call_args
assert call_kwargs.kwargs.get("score_threshold") == 0.70 \
or call_kwargs[1].get("score_threshold") == 0.70
assert (
call_kwargs.kwargs.get("score_threshold") == PASS1_THRESHOLD
or call_kwargs[1].get("score_threshold") == PASS1_THRESHOLD
)
mock_expand.assert_not_called()
assert len(result["direct_results"]) == 1