wire cascade, cross_domain, and review_records into pipeline

- merge.py: import + await cascade_after_merge and cross_domain_after_merge
  after reciprocal edges, before branch deletion. Both non-fatal.
  Added conn.commit() before slow branch deletion (Ganymede Q4).
- db.py: add record_review() helper + migration v18 (review_records table
  with indexes). Schema version 17→18.
- evaluate.py: call record_review() at all 3 verdict points:
  domain_rejected → outcome=rejected
  approved → outcome=approved
  changes_requested → outcome=approved-with-changes
  Notes field captures review text (capped 4000 chars).

Pentagon-Agent: Ship <E2A054E5-A6D6-4AE0-B0A3-F51A3B4DBCA5>
This commit is contained in:
m3taversal 2026-04-07 02:28:07 +01:00 committed by Teleo Agents
parent 8f6057686e
commit 0591c4c0df
3 changed files with 87 additions and 1 deletion

View file

@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 17
SCHEMA_VERSION = 18
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -492,6 +492,30 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v17: added prompt_version, pipeline_version to prs table")
if current < 18:
conn.executescript("""
CREATE TABLE IF NOT EXISTS review_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
agent TEXT,
reviewer TEXT,
reviewer_model TEXT,
outcome TEXT NOT NULL,
rejection_reason TEXT,
disagreement_type TEXT,
notes TEXT,
batch_id TEXT,
claims_in_batch INTEGER,
reviewed_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number);
CREATE INDEX IF NOT EXISTS idx_review_records_agent ON review_records(agent);
""")
conn.commit()
logger.info("Migration v18: created review_records table")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
@ -511,6 +535,36 @@ def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None):
)
def record_review(
    conn: sqlite3.Connection,
    pr_number: int,
    outcome: str,
    *,
    domain: str = None,
    agent: str = None,
    reviewer: str = None,
    reviewer_model: str = None,
    rejection_reason: str = None,
    disagreement_type: str = None,
    notes: str = None,
    claims_in_batch: int = None,
):
    """Persist one review-outcome row in review_records.

    Called at each eval verdict point. The batch_id column is derived
    from the PR number, and free-text notes are truncated to 4000
    characters. The caller owns the transaction — no commit happens here.
    """
    # Empty or missing notes collapse to NULL; otherwise cap at 4000 chars.
    clipped_notes = notes[:4000] if notes else None
    row = {
        "pr_number": pr_number,
        "domain": domain,
        "agent": agent,
        "reviewer": reviewer,
        "reviewer_model": reviewer_model,
        "outcome": outcome,
        "rejection_reason": rejection_reason,
        "disagreement_type": disagreement_type,
        "notes": clipped_notes,
        "batch_id": str(pr_number),  # batch_id = PR number
        "claims_in_batch": claims_in_batch,
    }
    conn.execute(
        """INSERT INTO review_records
               (pr_number, domain, agent, reviewer, reviewer_model, outcome,
                rejection_reason, disagreement_type, notes, batch_id, claims_in_batch)
           VALUES (:pr_number, :domain, :agent, :reviewer, :reviewer_model, :outcome,
                   :rejection_reason, :disagreement_type, :notes, :batch_id,
                   :claims_in_batch)""",
        row,
    )
def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
"""Append a priority assessment to a source's priority_log.

View file

@ -705,6 +705,11 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
db.audit(
conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues})
)
db.record_review(
conn, pr_number, "rejected",
domain=domain, agent=agent, reviewer=agent, reviewer_model="gpt-4o",
notes=(domain_review or "")[:4000],
)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
@ -776,6 +781,11 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent,
"auto_merge": is_agent_pr}),
)
db.record_review(
conn, pr_number, "approved",
domain=domain, agent=agent, reviewer="leo", reviewer_model="sonnet" if tier == "STANDARD" else "opus",
notes=(leo_review or "")[:4000] if leo_review else None,
)
if is_agent_pr:
logger.info("PR #%d: APPROVED + auto_merge (agent branch %s)", pr_number, branch_name)
else:
@ -806,6 +816,12 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
{"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}
),
)
db.record_review(
conn, pr_number, "approved-with-changes",
domain=domain, agent=agent, reviewer="leo",
reviewer_model="sonnet" if tier == "STANDARD" else "opus",
notes=(leo_review or domain_review or "")[:4000],
)
logger.info(
"PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)",
pr_number,

View file

@ -48,6 +48,8 @@ except ImportError:
import sys
sys.path.insert(0, os.path.dirname(__file__))
from worktree_lock import async_main_worktree_lock
from .cascade import cascade_after_merge
from .cross_domain import cross_domain_after_merge
from .forgejo import get_agent_token, get_pr_diff, repo_path
logger = logging.getLogger("pipeline.merge")
@ -1516,6 +1518,20 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
# New claim A with supports:[B] → add supports:[A] on B's frontmatter
await _reciprocal_edges(main_sha, branch_sha)
# Cascade: notify agents whose beliefs/positions depend on changed claims
try:
await cascade_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn)
except Exception:
logger.exception("PR #%d: cascade failed (non-fatal)", pr_num)
# Cross-domain citation index: log entity-based connections between domains
try:
await cross_domain_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn)
except Exception:
logger.exception("PR #%d: cross_domain failed (non-fatal)", pr_num)
conn.commit() # Commit DB writes before slow branch deletion
# Delete remote branch immediately (Ganymede Q4)
await _delete_remote_branch(branch)