#!/usr/bin/env python3
"""Process cascade inbox messages after a research session.

For each unread cascade-*.md in an agent's inbox:

1. Logs cascade_reviewed event to pipeline.db audit_log
2. Moves the file to inbox/processed/

Usage: python3 process-cascade-inbox.py <agent-name>
"""

import contextlib
import json
import os
import re
import shutil
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path

# Root of per-agent state; each agent owns <AGENT_STATE_DIR>/<agent>/inbox/.
# Overridable via the AGENT_STATE_DIR environment variable.
AGENT_STATE_DIR = Path(os.environ.get("AGENT_STATE_DIR", "/opt/teleo-eval/agent-state"))

# SQLite database holding the audit_log table this script appends to.
# Overridable via the PIPELINE_DB environment variable.
PIPELINE_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))


def parse_frontmatter(text: str) -> dict:
    """Extract key/value pairs from a leading YAML-style frontmatter block.

    Only flat ``key: value`` lines are understood; values have surrounding
    whitespace and double quotes stripped. Returns an empty dict when the
    text does not start with a ``---`` delimited block.
    """
    block = re.match(r'^---\n(.*?)\n---', text, re.DOTALL)
    if block is None:
        return {}

    pairs = {}
    for raw_line in block.group(1).strip().splitlines():
        key, sep, value = raw_line.partition(':')
        if sep:  # lines without a colon are silently ignored
            pairs[key.strip()] = value.strip().strip('"')
    return pairs


def _connect_audit_db():
    """Open pipeline.db in WAL mode; return None (with a warning) on failure."""
    try:
        conn = sqlite3.connect(str(PIPELINE_DB), timeout=10)
        conn.execute("PRAGMA journal_mode=WAL")
        return conn
    except sqlite3.Error as e:
        print(f"WARNING: Cannot connect to pipeline.db: {e}", file=sys.stderr)
        # Still move files even if DB is unavailable
        return None


def process_agent_inbox(agent: str) -> int:
    """Process cascade messages in *agent*'s inbox. Returns count processed.

    For each unread cascade-*.md file: logs a cascade_reviewed event to the
    audit_log (best-effort — files are still moved when the DB is down),
    then moves the file into inbox/processed/. Files whose frontmatter
    already carries status: processed are skipped in place.

    Args:
        agent: Agent name; resolved under AGENT_STATE_DIR.

    Returns:
        Number of files successfully moved to processed/.
    """
    inbox_dir = AGENT_STATE_DIR / agent / "inbox"
    if not inbox_dir.exists():
        return 0

    cascade_files = sorted(inbox_dir.glob("cascade-*.md"))
    if not cascade_files:
        return 0

    # Ensure processed dir exists
    processed_dir = inbox_dir / "processed"
    processed_dir.mkdir(exist_ok=True)

    processed = 0
    # One timestamp for the whole batch keeps the audit rows correlated.
    now = datetime.now(timezone.utc).isoformat()

    conn = _connect_audit_db()

    for cf in cascade_files:
        try:
            # Explicit encoding: without it read_text() uses the locale
            # codec and can misdecode/fail on UTF-8 cascade files.
            text = cf.read_text(encoding="utf-8")
            fm = parse_frontmatter(text)

            # Skip already-processed files
            if fm.get("status") == "processed":
                continue

            # Log to audit_log before moving, so a move failure does not
            # leave an unlogged relocation.
            if conn:
                detail = {
                    "agent": agent,
                    "cascade_file": cf.name,
                    "subject": fm.get("subject", "unknown"),
                    "original_created": fm.get("created", "unknown"),
                    "reviewed_at": now,
                }
                conn.execute(
                    "INSERT INTO audit_log (stage, event, detail, timestamp) VALUES (?, ?, ?, ?)",
                    ("cascade", "cascade_reviewed", json.dumps(detail), now),
                )

            # Move to processed
            dest = processed_dir / cf.name
            shutil.move(str(cf), str(dest))
            processed += 1

        except Exception as e:
            # Best-effort per file: one bad message must not block the rest.
            print(f"WARNING: Failed to process {cf.name}: {e}", file=sys.stderr)

    if conn:
        # Audit logging is advisory — never let commit/close failures
        # turn into a crash after the files have already been moved.
        with contextlib.suppress(sqlite3.Error):
            conn.commit()
            conn.close()

    return processed


if __name__ == "__main__":
    # The agent name is the sole required positional argument.
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <agent-name>", file=sys.stderr)
        sys.exit(1)

    agent_name = sys.argv[1]
    n_done = process_agent_inbox(agent_name)
    if n_done:
        print(f"Processed {n_done} cascade message(s) for {agent_name}")
    # Exit 0 regardless — non-fatal
    sys.exit(0)