Phase 1 — Audit logging infrastructure: - review_records table (migration v12) capturing every eval verdict with outcome, rejection reason, disagreement type - Cascade automation: auto-flag dependent beliefs/positions when merged claims change - Merge frontmatter stamps: last_review metadata on merged claim files Phase 2 — Cross-domain and state tracking: - Cross-domain citation index: entity overlap detection across domains on every merge - Agent-state schema v1: file-backed state for VPS agents (memory, tasks, inbox, metrics) - Cascade completion tracking: process-cascade-inbox.py logs review outcomes - research-session.sh: state hooks + cascade processing integration All changes are live on VPS. This commit brings the code under version control for review. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
113 lines
3.3 KiB
Python
113 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
"""Process cascade inbox messages after a research session.
|
|
|
|
For each unread cascade-*.md in an agent's inbox:
|
|
1. Logs cascade_reviewed event to pipeline.db audit_log
|
|
2. Moves the file to inbox/processed/
|
|
|
|
Usage: python3 process-cascade-inbox.py <agent-name>
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
AGENT_STATE_DIR = Path(os.environ.get("AGENT_STATE_DIR", "/opt/teleo-eval/agent-state"))
|
|
PIPELINE_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
|
|
|
|
|
|
def parse_frontmatter(text: str) -> dict:
|
|
"""Parse YAML-like frontmatter from markdown."""
|
|
fm = {}
|
|
match = re.match(r'^---\n(.*?)\n---', text, re.DOTALL)
|
|
if not match:
|
|
return fm
|
|
for line in match.group(1).strip().splitlines():
|
|
if ':' in line:
|
|
key, val = line.split(':', 1)
|
|
fm[key.strip()] = val.strip().strip('"')
|
|
return fm
|
|
|
|
|
|
def process_agent_inbox(agent: str) -> int:
|
|
"""Process cascade messages in agent's inbox. Returns count processed."""
|
|
inbox_dir = AGENT_STATE_DIR / agent / "inbox"
|
|
if not inbox_dir.exists():
|
|
return 0
|
|
|
|
cascade_files = sorted(inbox_dir.glob("cascade-*.md"))
|
|
if not cascade_files:
|
|
return 0
|
|
|
|
# Ensure processed dir exists
|
|
processed_dir = inbox_dir / "processed"
|
|
processed_dir.mkdir(exist_ok=True)
|
|
|
|
processed = 0
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(PIPELINE_DB), timeout=10)
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
except sqlite3.Error as e:
|
|
print(f"WARNING: Cannot connect to pipeline.db: {e}", file=sys.stderr)
|
|
# Still move files even if DB is unavailable
|
|
conn = None
|
|
|
|
for cf in cascade_files:
|
|
try:
|
|
text = cf.read_text()
|
|
fm = parse_frontmatter(text)
|
|
|
|
# Skip already-processed files
|
|
if fm.get("status") == "processed":
|
|
continue
|
|
|
|
# Log to audit_log
|
|
if conn:
|
|
detail = {
|
|
"agent": agent,
|
|
"cascade_file": cf.name,
|
|
"subject": fm.get("subject", "unknown"),
|
|
"original_created": fm.get("created", "unknown"),
|
|
"reviewed_at": now,
|
|
}
|
|
conn.execute(
|
|
"INSERT INTO audit_log (stage, event, detail, timestamp) VALUES (?, ?, ?, ?)",
|
|
("cascade", "cascade_reviewed", json.dumps(detail), now),
|
|
)
|
|
|
|
# Move to processed
|
|
dest = processed_dir / cf.name
|
|
shutil.move(str(cf), str(dest))
|
|
processed += 1
|
|
|
|
except Exception as e:
|
|
print(f"WARNING: Failed to process {cf.name}: {e}", file=sys.stderr)
|
|
|
|
if conn:
|
|
try:
|
|
conn.commit()
|
|
conn.close()
|
|
except sqlite3.Error:
|
|
pass
|
|
|
|
return processed
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) < 2:
|
|
print(f"Usage: {sys.argv[0]} <agent-name>", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
agent = sys.argv[1]
|
|
count = process_agent_inbox(agent)
|
|
if count > 0:
|
|
print(f"Processed {count} cascade message(s) for {agent}")
|
|
# Exit 0 regardless — non-fatal
|
|
sys.exit(0)
|