From 0591c4c0df413c759a78b4525f31b2980ef4f0c9 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Tue, 7 Apr 2026 02:28:07 +0100 Subject: [PATCH] wire cascade, cross_domain, and review_records into pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - merge.py: import + await cascade_after_merge and cross_domain_after_merge after reciprocal edges, before branch deletion. Both non-fatal. Added conn.commit() before slow branch deletion (Ganymede Q4). - db.py: add record_review() helper + migration v18 (review_records table with indexes). Schema version 17→18. - evaluate.py: call record_review() at all 3 verdict points: domain_rejected → outcome=rejected approved → outcome=approved changes_requested → outcome=approved-with-changes Notes field captures review text (capped 4000 chars). Pentagon-Agent: Ship --- ops/pipeline-v2/lib/db.py | 56 ++++++++++++++++++++++++++++++++- ops/pipeline-v2/lib/evaluate.py | 16 ++++++++++ ops/pipeline-v2/lib/merge.py | 16 ++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/ops/pipeline-v2/lib/db.py b/ops/pipeline-v2/lib/db.py index 1bd2abe4e..1d15bc00b 100644 --- a/ops/pipeline-v2/lib/db.py +++ b/ops/pipeline-v2/lib/db.py @@ -9,7 +9,7 @@ from . import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 17 +SCHEMA_VERSION = 18 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -492,6 +492,30 @@ def migrate(conn: sqlite3.Connection): conn.commit() logger.info("Migration v17: added prompt_version, pipeline_version to prs table") + if current < 18: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS review_records ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pr_number INTEGER NOT NULL, + claim_path TEXT, + domain TEXT, + agent TEXT, + reviewer TEXT, + reviewer_model TEXT, + outcome TEXT NOT NULL, + rejection_reason TEXT, + disagreement_type TEXT, + notes TEXT, + batch_id TEXT, + claims_in_batch INTEGER, + reviewed_at TEXT DEFAULT (datetime('now')) + ); + CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number); + CREATE INDEX IF NOT EXISTS idx_review_records_agent ON review_records(agent); + """) + conn.commit() + logger.info("Migration v18: created review_records table") + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", @@ -511,6 +535,36 @@ def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None): ) +def record_review( + conn: sqlite3.Connection, + pr_number: int, + outcome: str, + *, + domain: str = None, + agent: str = None, + reviewer: str = None, + reviewer_model: str = None, + rejection_reason: str = None, + disagreement_type: str = None, + notes: str = None, + claims_in_batch: int = None, +): + """Write a review record. Called at each eval verdict point.""" + conn.execute( + """INSERT INTO review_records + (pr_number, domain, agent, reviewer, reviewer_model, outcome, + rejection_reason, disagreement_type, notes, batch_id, claims_in_batch) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + pr_number, domain, agent, reviewer, reviewer_model, outcome, + rejection_reason, disagreement_type, + notes[:4000] if notes else None, + str(pr_number), # batch_id = PR number + claims_in_batch, + ), + ) + + def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str): """Append a priority assessment to a source's priority_log. diff --git a/ops/pipeline-v2/lib/evaluate.py b/ops/pipeline-v2/lib/evaluate.py index 7dca3c3e3..ff6dab8a9 100644 --- a/ops/pipeline-v2/lib/evaluate.py +++ b/ops/pipeline-v2/lib/evaluate.py @@ -705,6 +705,11 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: db.audit( conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues}) ) + db.record_review( + conn, pr_number, "rejected", + domain=domain, agent=agent, reviewer=agent, reviewer_model="gpt-4o", + notes=(domain_review or "")[:4000], + ) # Disposition: check if this PR should be terminated or kept open await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues) @@ -776,6 +781,11 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent, "auto_merge": is_agent_pr}), ) + db.record_review( + conn, pr_number, "approved", + domain=domain, agent=agent, reviewer="leo", reviewer_model="sonnet" if tier == "STANDARD" else "opus", + notes=(leo_review or "")[:4000] if leo_review else None, + ) if is_agent_pr: logger.info("PR #%d: APPROVED + auto_merge (agent branch %s)", pr_number, branch_name) else: @@ -806,6 +816,12 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: {"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues} ), ) + db.record_review( + conn, pr_number, "approved-with-changes", + domain=domain, agent=agent, reviewer="leo", + reviewer_model="sonnet" if tier == "STANDARD" else "opus", + notes=(leo_review or domain_review or "")[:4000], + ) logger.info( "PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)", pr_number, diff --git a/ops/pipeline-v2/lib/merge.py b/ops/pipeline-v2/lib/merge.py index d6c8dfcf1..49a20c677 100644 --- a/ops/pipeline-v2/lib/merge.py +++ b/ops/pipeline-v2/lib/merge.py @@ -48,6 +48,8 @@ except ImportError: import sys sys.path.insert(0, os.path.dirname(__file__)) from worktree_lock import async_main_worktree_lock +from .cascade import cascade_after_merge +from .cross_domain import cross_domain_after_merge from .forgejo import get_agent_token, get_pr_diff, repo_path logger = logging.getLogger("pipeline.merge") @@ -1516,6 +1518,20 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: # New claim A with supports:[B] → add supports:[A] on B's frontmatter await _reciprocal_edges(main_sha, branch_sha) + # Cascade: notify agents whose beliefs/positions depend on changed claims + try: + await cascade_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn) + except Exception: + logger.exception("PR #%d: cascade failed (non-fatal)", pr_num) + + # Cross-domain citation index: log entity-based connections between domains + try: + await cross_domain_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn) + except Exception: + logger.exception("PR #%d: cross_domain failed (non-fatal)", pr_num) + + conn.commit() # Commit DB writes before slow branch deletion + # Delete remote branch immediately (Ganymede Q4) await _delete_remote_branch(branch)