Add Phase 1+2 instrumentation: review records, cascade automation, cross-domain index, agent state

Phase 1 — Audit logging infrastructure:
- review_records table (migration v12) capturing every eval verdict with outcome, rejection reason, disagreement type
- Cascade automation: auto-flag dependent beliefs/positions when merged claims change
- Merge frontmatter stamps: last_review metadata on merged claim files

Phase 2 — Cross-domain and state tracking:
- Cross-domain citation index: entity overlap detection across domains on every merge
- Agent-state schema v1: file-backed state for VPS agents (memory, tasks, inbox, metrics)
- Cascade completion tracking: process-cascade-inbox.py logs review outcomes
- research-session.sh: state hooks + cascade processing integration

All changes are live on VPS. This commit brings the code under version control for review.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
m3taversal 2026-04-02 11:48:09 +01:00 committed by Teleo Agents
parent ea4085a553
commit 2c0d428dc0
10 changed files with 4884 additions and 4 deletions

ops/agent-state/SCHEMA.md (new file, +255 lines)

@@ -0,0 +1,255 @@
# Agent State Schema v1
File-backed durable state for teleo agents running headless on VPS.
Survives context truncation, crash recovery, and session handoffs.
## Design Principles
1. **Three formats** — JSON for structured fields, JSONL for append-only logs, Markdown for context-window-friendly content
2. **Many small files** — selective loading, crash isolation, no locks needed
3. **Write on events** — not timers. State updates happen when something meaningful changes.
4. **Shared-nothing writes** — each agent owns its directory. Communication via inbox files.
5. **State ≠ Git** — state is operational (how the agent functions). Git is output (what the agent produces).
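The write-on-events principle depends on writes being atomic, so a reader that wakes mid-write never sees a half-written file. A minimal Python sketch of the tmp+rename pattern the helpers use (`atomic_write_json` is an illustrative name; the production helpers live in `lib-state.sh`):

```python
import json
import os
import tempfile

def atomic_write_json(path: str, payload: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename.

    os.replace is atomic on POSIX, so concurrent readers see either
    the old file or the new one, never a partial write.
    """
    dirname = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=dirname, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp, path)
    except BaseException:
        os.unlink(tmp)  # don't leave .tmp debris on failure
        raise
```

The temp file must live in the same directory as the target: `os.replace` is only atomic within a filesystem.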
## Directory Layout
```
/opt/teleo-eval/agent-state/{agent}/
├── report.json # Current status — read every wake
├── tasks.json # Active task queue — read every wake
├── session.json # Current/last session metadata
├── memory.md # Accumulated cross-session knowledge (structured)
├── inbox/ # Messages from other agents/orchestrator
│ └── {uuid}.json # One file per message, atomic create
├── journal.jsonl # Append-only session log
└── metrics.json # Cumulative performance counters
```
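The many-small-files layout exists so a wake loads only the hot files, not the whole directory. A sketch of selective loading with crash-isolation defaults (`load_wake_state` is a hypothetical name, not part of the shipped tooling):

```python
import json
from pathlib import Path

def load_wake_state(state_root: str, agent: str) -> dict:
    """Load only the files an agent needs on wake (report + tasks).

    A missing or corrupt file degrades to an empty default instead of
    failing the wake: one corrupted file never blocks the agent.
    """
    base = Path(state_root) / agent
    state = {}
    for name, default in [("report.json", {}), ("tasks.json", {"tasks": []})]:
        key = name.split(".")[0]
        try:
            state[key] = json.loads((base / name).read_text())
        except (OSError, json.JSONDecodeError):
            state[key] = default
    return state
```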
## File Specifications
### report.json
Written: after each meaningful action (session start, key finding, session end)
Read: every wake, by orchestrator for monitoring
```json
{
"agent": "rio",
"updated_at": "2026-03-31T22:00:00Z",
"status": "idle | researching | extracting | evaluating | error",
"summary": "Completed research session — 8 sources archived on Solana launchpad mechanics",
"current_task": null,
"last_session": {
"id": "20260331-220000",
"started_at": "2026-03-31T20:30:00Z",
"ended_at": "2026-03-31T22:00:00Z",
"outcome": "completed | timeout | error",
"sources_archived": 8,
"branch": "rio/research-2026-03-31",
"pr_number": 247
},
"blocked_by": null,
"next_priority": "Follow up on conditional AMM thread from @0xfbifemboy"
}
```
### tasks.json
Written: when task status changes
Read: every wake
```json
{
"agent": "rio",
"updated_at": "2026-03-31T22:00:00Z",
"tasks": [
{
"id": "task-001",
"type": "research | extract | evaluate | follow-up | disconfirm",
"description": "Investigate conditional AMM mechanisms in MetaDAO v2",
"status": "pending | active | completed | dropped",
"priority": "high | medium | low",
"created_at": "2026-03-31T22:00:00Z",
"context": "Flagged in research session 2026-03-31 — @0xfbifemboy thread on conditional liquidity",
"follow_up_from": null,
"completed_at": null,
"outcome": null
}
]
}
```
### session.json
Written: at session start and session end
Read: every wake (for continuation), by orchestrator for scheduling
```json
{
"agent": "rio",
"session_id": "20260331-220000",
"started_at": "2026-03-31T20:30:00Z",
"ended_at": "2026-03-31T22:00:00Z",
"type": "research | extract | evaluate | ad-hoc",
"domain": "internet-finance",
"branch": "rio/research-2026-03-31",
"status": "running | completed | timeout | error",
"model": "sonnet",
"timeout_seconds": 5400,
"research_question": "How is conditional liquidity being implemented in Solana AMMs?",
"belief_targeted": "Markets aggregate information better than votes because skin-in-the-game creates selection pressure on beliefs",
"disconfirmation_target": "Cases where prediction markets failed to aggregate information despite financial incentives",
"sources_archived": 8,
"sources_expected": 10,
"tokens_used": null,
"cost_usd": null,
"errors": [],
"handoff_notes": "Found 3 sources on conditional AMM failures — needs extraction. Also flagged @metaproph3t thread for Theseus (AI governance angle)."
}
```
### memory.md
Written: at session end, when learning something critical
Read: every wake (included in research prompt context)
```markdown
# Rio — Operational Memory
## Cross-Session Patterns
- Conditional AMMs keep appearing across 3+ independent sources (sessions 03-28, 03-29, 03-31). This is likely a real trend, not cherry-picking.
- @0xfbifemboy consistently produces highest-signal threads in the DeFi mechanism design space.
## Dead Ends (don't re-investigate)
- Polymarket fee structure analysis (2026-03-25): fully documented in existing claims, no new angles.
- Jupiter governance token utility (2026-03-27): vaporware, no mechanism to analyze.
## Open Questions
- Is MetaDAO's conditional market maker manipulation-resistant at scale? No evidence either way yet.
- How does futarchy handle low-liquidity markets? This is the keystone weakness.
## Corrections
- Previously believed Drift protocol was pure order-book. Actually hybrid AMM+CLOB. Updated 2026-03-30.
## Cross-Agent Flags Received
- Theseus (2026-03-29): "Check if MetaDAO governance has AI agent participation — alignment implications"
- Leo (2026-03-28): "Your conditional AMM analysis connects to Astra's resource allocation claims"
```
### inbox/{uuid}.json
Written: by other agents or orchestrator
Read: checked on wake, deleted after processing
```json
{
"id": "msg-abc123",
"from": "theseus",
"to": "rio",
"created_at": "2026-03-31T18:00:00Z",
"type": "flag | task | question | cascade",
"priority": "high | normal",
"subject": "Check MetaDAO for AI agent participation",
"body": "Found evidence that AI agents are trading on Drift — check if any are participating in MetaDAO conditional markets. Alignment implications if automated agents are influencing futarchic governance.",
"source_ref": "theseus/research-2026-03-31",
"expires_at": null
}
```
### journal.jsonl
Written: append at session boundaries
Read: debug/audit only (never loaded into agent context by default)
```jsonl
{"ts":"2026-03-31T20:30:00Z","event":"session_start","session_id":"20260331-220000","type":"research"}
{"ts":"2026-03-31T20:35:00Z","event":"orient_complete","files_read":["identity.md","beliefs.md","reasoning.md","_map.md"]}
{"ts":"2026-03-31T21:30:00Z","event":"sources_archived","count":5,"domain":"internet-finance"}
{"ts":"2026-03-31T22:00:00Z","event":"session_end","outcome":"completed","sources_archived":8,"handoff":"conditional AMM failures need extraction"}
```
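Because the journal is append-only JSONL, a crash can corrupt at most the final line, and skipping it loses no committed events. A tolerant reader for debug/audit use might look like this (illustrative sketch, not part of the shipped tooling):

```python
import json

def read_journal(path):
    """Yield parsed journal events, skipping blank or truncated lines."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # At most the last line can be a partial write; skip it.
                continue
```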
### metrics.json
Written: at session end (cumulative counters)
Read: by CI scoring system, by orchestrator for scheduling decisions
```json
{
"agent": "rio",
"updated_at": "2026-03-31T22:00:00Z",
"lifetime": {
"sessions_total": 47,
"sessions_completed": 42,
"sessions_timeout": 3,
"sessions_error": 2,
"sources_archived": 312,
"claims_proposed": 89,
"claims_accepted": 71,
"claims_challenged": 12,
"claims_rejected": 6,
"disconfirmation_attempts": 47,
"disconfirmation_hits": 8,
"cross_agent_flags_sent": 23,
"cross_agent_flags_received": 15
},
"rolling_30d": {
"sessions": 12,
"sources_archived": 87,
"claims_proposed": 24,
"acceptance_rate": 0.83,
"avg_sources_per_session": 7.25
}
}
```
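How `rolling_30d` stays fresh is not specified above; one option is to recompute it from `journal.jsonl` at session end rather than decrement stale counters. A sketch under that assumption (field names follow the journal examples; `rolling_30d` here is a hypothetical helper):

```python
import json
from datetime import datetime, timedelta, timezone

def rolling_30d(journal_lines):
    """Recompute 30-day session counters from session_end journal events."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)
    sessions = 0
    sources = 0
    for line in journal_lines:
        try:
            ev = json.loads(line)
        except json.JSONDecodeError:
            continue
        if ev.get("event") != "session_end":
            continue
        ts = datetime.fromisoformat(ev["ts"].replace("Z", "+00:00"))
        if ts < cutoff:
            continue
        sessions += 1
        sources += ev.get("sources_archived", 0)
    return {
        "sessions": sessions,
        "sources_archived": sources,
        "avg_sources_per_session": round(sources / sessions, 2) if sessions else 0.0,
    }
```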
## Integration Points
### research-session.sh
Add these hooks:
1. **Pre-session** (after branch creation, before Claude launch):
- Write `session.json` with status "running"
- Write `report.json` with status "researching"
- Append session_start to `journal.jsonl`
- Include `memory.md` and `tasks.json` in the research prompt
2. **Post-session** (after commit, before/after PR):
- Update `session.json` with outcome, source count, branch, PR number
- Update `report.json` with summary and next_priority
- Update `metrics.json` counters
- Append session_end to `journal.jsonl`
- Process and clean `inbox/` (mark processed messages)
3. **On error/timeout**:
- Update `session.json` status to "error" or "timeout"
- Update `report.json` with error info
- Append error event to `journal.jsonl`
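Wired together with the `lib-state.sh` helpers, the three hooks reduce to a start call, an end call, and an EXIT trap for the error path. A self-contained sketch (the state functions are stubbed with `echo` so it runs standalone; the real script would `source ops/agent-state/lib-state.sh` instead, and argument order follows the definitions there):

```shell
#!/bin/bash
set -euo pipefail
AGENT="rio"
SESSION_ID="$(date -u +%Y%m%d-%H%M%S)"

# Stubs standing in for lib-state.sh functions.
state_start_session()  { echo "start $*"; }
state_end_session()    { echo "end $*"; }
state_journal_append() { echo "journal $*"; }

on_exit() {
  local rc=$?
  if [ "$rc" -ne 0 ]; then
    # 3. On error/timeout: record the failure before the process dies.
    state_end_session "$AGENT" "error"
    state_journal_append "$AGENT" "session_error" "rc=$rc"
  fi
}
trap on_exit EXIT

# 1. Pre-session hooks (after branch creation, before Claude launch).
state_start_session "$AGENT" "$SESSION_ID" "research" "internet-finance" "$AGENT/research-$SESSION_ID"
state_journal_append "$AGENT" "session_start" "session_id=$SESSION_ID"

# ... launch Claude here ...

# 2. Post-session hooks (after commit).
state_end_session "$AGENT" "completed" 8
state_journal_append "$AGENT" "session_end" "outcome=completed"
```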
### Pipeline daemon (teleo-pipeline.py)
- Read `report.json` for all agents to build dashboard
- Write to `inbox/` when cascade events need agent attention
- Read `metrics.json` for scheduling decisions (deprioritize agents with high error rates)
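The dashboard read reduces to a glob-and-parse loop over every agent's `report.json`, tolerating missing or corrupt files. A sketch (`build_dashboard` is a hypothetical name, not a function in teleo-pipeline.py):

```python
import json
from pathlib import Path

def build_dashboard(state_root: str) -> dict:
    """Summarize every agent's report.json into one dashboard dict."""
    rows = {}
    for report in Path(state_root).glob("*/report.json"):
        try:
            data = json.loads(report.read_text())
        except (OSError, json.JSONDecodeError):
            data = {}  # a broken report shows up as "unknown", not a crash
        rows[report.parent.name] = {
            "status": data.get("status", "unknown"),
            "summary": data.get("summary", ""),
            "updated_at": data.get("updated_at"),
        }
    return rows
```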
### Claude research prompt
Add to the prompt:
```
### Step 0: Load Operational State (1 min)
Read /opt/teleo-eval/agent-state/{agent}/memory.md — this is your cross-session operational memory.
Read /opt/teleo-eval/agent-state/{agent}/tasks.json — check for pending tasks.
Check /opt/teleo-eval/agent-state/{agent}/inbox/ for messages from other agents.
Process any high-priority inbox items before choosing your research direction.
```
## Bootstrap
Run `ops/agent-state/bootstrap.sh` to create directories and seed initial state for all agents.
## Migration from Existing State
- `research-journal.md` continues as-is (agent-written, in git). `memory.md` is the structured equivalent for operational state (not in git).
- `ops/sessions/*.json` continue for backward compat. `session.json` per agent is the richer replacement.
- `ops/queue.md` remains the human-visible task board. `tasks.json` per agent is the machine-readable equivalent.
- Workspace flags (`~/.pentagon/workspace/collective/flag-*`) migrate to `inbox/` messages over time.

ops/agent-state/bootstrap.sh (new executable file, +145 lines)

@@ -0,0 +1,145 @@
#!/bin/bash
# Bootstrap agent-state directories for all teleo agents.
# Run once on VPS: bash ops/agent-state/bootstrap.sh
# Safe to re-run — skips existing files, only creates missing ones.
set -euo pipefail
STATE_ROOT="${TELEO_STATE_ROOT:-/opt/teleo-eval/agent-state}"
AGENTS=("rio" "clay" "theseus" "vida" "astra" "leo")
DOMAINS=("internet-finance" "entertainment" "ai-alignment" "health" "space-development" "grand-strategy")
log() { echo "[$(date -Iseconds)] $*"; }
for i in "${!AGENTS[@]}"; do
AGENT="${AGENTS[$i]}"
DOMAIN="${DOMAINS[$i]}"
DIR="$STATE_ROOT/$AGENT"
log "Bootstrapping $AGENT..."
mkdir -p "$DIR/inbox"
# report.json — current status
if [ ! -f "$DIR/report.json" ]; then
cat > "$DIR/report.json" <<EOJSON
{
"agent": "$AGENT",
"updated_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"status": "idle",
"summary": "State initialized — no sessions recorded yet.",
"current_task": null,
"last_session": null,
"blocked_by": null,
"next_priority": null
}
EOJSON
log " Created report.json"
fi
# tasks.json — empty task queue
if [ ! -f "$DIR/tasks.json" ]; then
cat > "$DIR/tasks.json" <<EOJSON
{
"agent": "$AGENT",
"updated_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"tasks": []
}
EOJSON
log " Created tasks.json"
fi
# session.json — no session yet
if [ ! -f "$DIR/session.json" ]; then
cat > "$DIR/session.json" <<EOJSON
{
"agent": "$AGENT",
"session_id": null,
"started_at": null,
"ended_at": null,
"type": null,
"domain": "$DOMAIN",
"branch": null,
"status": "idle",
"model": null,
"timeout_seconds": null,
"research_question": null,
"belief_targeted": null,
"disconfirmation_target": null,
"sources_archived": 0,
"sources_expected": 0,
"tokens_used": null,
"cost_usd": null,
"errors": [],
"handoff_notes": null
}
EOJSON
log " Created session.json"
fi
# memory.md — empty operational memory
if [ ! -f "$DIR/memory.md" ]; then
cat > "$DIR/memory.md" <<EOMD
# ${AGENT^} — Operational Memory
## Cross-Session Patterns
(none yet)
## Dead Ends
(none yet)
## Open Questions
(none yet)
## Corrections
(none yet)
## Cross-Agent Flags Received
(none yet)
EOMD
log " Created memory.md"
fi
# metrics.json — zero counters
if [ ! -f "$DIR/metrics.json" ]; then
cat > "$DIR/metrics.json" <<EOJSON
{
"agent": "$AGENT",
"updated_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"lifetime": {
"sessions_total": 0,
"sessions_completed": 0,
"sessions_timeout": 0,
"sessions_error": 0,
"sources_archived": 0,
"claims_proposed": 0,
"claims_accepted": 0,
"claims_challenged": 0,
"claims_rejected": 0,
"disconfirmation_attempts": 0,
"disconfirmation_hits": 0,
"cross_agent_flags_sent": 0,
"cross_agent_flags_received": 0
},
"rolling_30d": {
"sessions": 0,
"sources_archived": 0,
"claims_proposed": 0,
"acceptance_rate": 0.0,
"avg_sources_per_session": 0.0
}
}
EOJSON
log " Created metrics.json"
fi
# journal.jsonl — empty log
if [ ! -f "$DIR/journal.jsonl" ]; then
echo "{\"ts\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\",\"event\":\"state_initialized\",\"schema_version\":\"1.0\"}" > "$DIR/journal.jsonl"
log " Created journal.jsonl"
fi
done
log "Bootstrap complete. State root: $STATE_ROOT"
log "Agents initialized: ${AGENTS[*]}"

ops/agent-state/lib-state.sh (new executable file, +258 lines)

@@ -0,0 +1,258 @@
#!/bin/bash
# lib-state.sh — Bash helpers for reading/writing agent state files.
# Source this in pipeline scripts: source ops/agent-state/lib-state.sh
#
# All writes use atomic rename (write to .tmp, then mv) to prevent corruption.
# All reads return valid JSON or empty string on missing/corrupt files.
STATE_ROOT="${TELEO_STATE_ROOT:-/opt/teleo-eval/agent-state}"
# --- Internal helpers ---
_state_dir() {
local agent="$1"
echo "$STATE_ROOT/$agent"
}
# Atomic write: write to tmp file, then rename. Prevents partial reads.
_atomic_write() {
local filepath="$1"
local content="$2"
local tmpfile="${filepath}.tmp.$$"
echo "$content" > "$tmpfile"
mv -f "$tmpfile" "$filepath"
}
# --- Report (current status) ---
state_read_report() {
local agent="$1"
local file="$(_state_dir "$agent")/report.json"
[ -f "$file" ] && cat "$file" || echo "{}"
}
state_update_report() {
local agent="$1"
local status="$2"
local summary="$3"
local file="$(_state_dir "$agent")/report.json"
# Read existing, merge with updates using python (available on VPS)
python3 -c "
import json, sys
try:
    with open('$file') as f:
        data = json.load(f)
except Exception:
    data = {'agent': '$agent'}
data['status'] = '$status'
data['summary'] = '''$summary'''
data['updated_at'] = '$(date -u +%Y-%m-%dT%H:%M:%SZ)'
print(json.dumps(data, indent=2))
" | _atomic_write_stdin "$file"
}
# Variant that takes full JSON from stdin
_atomic_write_stdin() {
local filepath="$1"
local tmpfile="${filepath}.tmp.$$"
cat > "$tmpfile"
mv -f "$tmpfile" "$filepath"
}
# Full report update with session info (called at session end)
state_finalize_report() {
local agent="$1"
local status="$2"
local summary="$3"
local session_id="$4"
local started_at="$5"
local ended_at="$6"
local outcome="$7"
local sources="$8"
local branch="$9"
local pr_number="${10}"
local next_priority="${11:-null}"
local file="$(_state_dir "$agent")/report.json"
python3 -c "
import json
data = {
    'agent': '$agent',
    'updated_at': '$ended_at',
    'status': '$status',
    'summary': '''$summary''',
    'current_task': None,
    'last_session': {
        'id': '$session_id',
        'started_at': '$started_at',
        'ended_at': '$ended_at',
        'outcome': '$outcome',
        'sources_archived': $sources,
        'branch': '$branch',
        'pr_number': $pr_number
    },
    'blocked_by': None,
    'next_priority': $([ "$next_priority" = "null" ] && echo "None" || echo "'$next_priority'")
}
print(json.dumps(data, indent=2))
" | _atomic_write_stdin "$file"
}
# --- Session ---
state_start_session() {
local agent="$1"
local session_id="$2"
local type="$3"
local domain="$4"
local branch="$5"
local model="${6:-sonnet}"
local timeout="${7:-5400}"
local started_at
started_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
local file="$(_state_dir "$agent")/session.json"
python3 -c "
import json
data = {
    'agent': '$agent',
    'session_id': '$session_id',
    'started_at': '$started_at',
    'ended_at': None,
    'type': '$type',
    'domain': '$domain',
    'branch': '$branch',
    'status': 'running',
    'model': '$model',
    'timeout_seconds': $timeout,
    'research_question': None,
    'belief_targeted': None,
    'disconfirmation_target': None,
    'sources_archived': 0,
    'sources_expected': 0,
    'tokens_used': None,
    'cost_usd': None,
    'errors': [],
    'handoff_notes': None
}
print(json.dumps(data, indent=2))
" | _atomic_write_stdin "$file"
echo "$started_at"
}
state_end_session() {
local agent="$1"
local outcome="$2"
local sources="${3:-0}"
local pr_number="${4:-null}"
local file="$(_state_dir "$agent")/session.json"
python3 -c "
import json
with open('$file') as f:
    data = json.load(f)
data['ended_at'] = '$(date -u +%Y-%m-%dT%H:%M:%SZ)'
data['status'] = '$outcome'
data['sources_archived'] = $sources
print(json.dumps(data, indent=2))
" | _atomic_write_stdin "$file"
}
# --- Journal (append-only JSONL) ---
state_journal_append() {
local agent="$1"
local event="$2"
shift 2
# Remaining args are key=value pairs for extra fields
local file="$(_state_dir "$agent")/journal.jsonl"
local extras=""
for kv in "$@"; do
local key="${kv%%=*}"
local val="${kv#*=}"
extras="$extras, \"$key\": \"$val\""
done
echo "{\"ts\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\",\"event\":\"$event\"$extras}" >> "$file"
}
# --- Metrics ---
state_update_metrics() {
local agent="$1"
local outcome="$2"
local sources="${3:-0}"
local file="$(_state_dir "$agent")/metrics.json"
python3 -c "
import json
try:
    with open('$file') as f:
        data = json.load(f)
except Exception:
    data = {'agent': '$agent', 'lifetime': {}, 'rolling_30d': {}}
lt = data.setdefault('lifetime', {})
lt['sessions_total'] = lt.get('sessions_total', 0) + 1
if '$outcome' == 'completed':
    lt['sessions_completed'] = lt.get('sessions_completed', 0) + 1
elif '$outcome' == 'timeout':
    lt['sessions_timeout'] = lt.get('sessions_timeout', 0) + 1
elif '$outcome' == 'error':
    lt['sessions_error'] = lt.get('sessions_error', 0) + 1
lt['sources_archived'] = lt.get('sources_archived', 0) + $sources
data['updated_at'] = '$(date -u +%Y-%m-%dT%H:%M:%SZ)'
print(json.dumps(data, indent=2))
" | _atomic_write_stdin "$file"
}
# --- Inbox ---
state_check_inbox() {
local agent="$1"
local inbox="$(_state_dir "$agent")/inbox"
[ -d "$inbox" ] && ls "$inbox"/*.json 2>/dev/null || true
}
state_send_message() {
local from="$1"
local to="$2"
local type="$3"
local subject="$4"
local body="$5"
local inbox="$(_state_dir "$to")/inbox"
local msg_id="msg-$(date +%s)-$$"
local file="$inbox/${msg_id}.json"
mkdir -p "$inbox"
python3 -c "
import json
data = {
    'id': '$msg_id',
    'from': '$from',
    'to': '$to',
    'created_at': '$(date -u +%Y-%m-%dT%H:%M:%SZ)',
    'type': '$type',
    'priority': 'normal',
    'subject': '''$subject''',
    'body': '''$body''',
    'source_ref': None,
    'expires_at': None
}
print(json.dumps(data, indent=2))
" | _atomic_write_stdin "$file"
echo "$msg_id"
}
# --- State directory check ---
state_ensure_dir() {
local agent="$1"
local dir="$(_state_dir "$agent")"
if [ ! -d "$dir" ]; then
echo "ERROR: Agent state not initialized for $agent. Run bootstrap.sh first." >&2
return 1
fi
}


@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""Process cascade inbox messages after a research session.
For each unread cascade-*.md in an agent's inbox:
1. Logs cascade_reviewed event to pipeline.db audit_log
2. Moves the file to inbox/processed/
Usage: python3 process-cascade-inbox.py <agent-name>
"""
import json
import os
import re
import shutil
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
AGENT_STATE_DIR = Path(os.environ.get("AGENT_STATE_DIR", "/opt/teleo-eval/agent-state"))
PIPELINE_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
def parse_frontmatter(text: str) -> dict:
    """Parse YAML-like frontmatter from markdown."""
    fm = {}
    match = re.match(r'^---\n(.*?)\n---', text, re.DOTALL)
    if not match:
        return fm
    for line in match.group(1).strip().splitlines():
        if ':' in line:
            key, val = line.split(':', 1)
            fm[key.strip()] = val.strip().strip('"')
    return fm

def process_agent_inbox(agent: str) -> int:
    """Process cascade messages in agent's inbox. Returns count processed."""
    inbox_dir = AGENT_STATE_DIR / agent / "inbox"
    if not inbox_dir.exists():
        return 0
    cascade_files = sorted(inbox_dir.glob("cascade-*.md"))
    if not cascade_files:
        return 0
    # Ensure processed dir exists
    processed_dir = inbox_dir / "processed"
    processed_dir.mkdir(exist_ok=True)
    processed = 0
    now = datetime.now(timezone.utc).isoformat()
    try:
        conn = sqlite3.connect(str(PIPELINE_DB), timeout=10)
        conn.execute("PRAGMA journal_mode=WAL")
    except sqlite3.Error as e:
        print(f"WARNING: Cannot connect to pipeline.db: {e}", file=sys.stderr)
        # Still move files even if DB is unavailable
        conn = None
    for cf in cascade_files:
        try:
            text = cf.read_text()
            fm = parse_frontmatter(text)
            # Skip already-processed files
            if fm.get("status") == "processed":
                continue
            # Log to audit_log
            if conn:
                detail = {
                    "agent": agent,
                    "cascade_file": cf.name,
                    "subject": fm.get("subject", "unknown"),
                    "original_created": fm.get("created", "unknown"),
                    "reviewed_at": now,
                }
                conn.execute(
                    "INSERT INTO audit_log (stage, event, detail, timestamp) VALUES (?, ?, ?, ?)",
                    ("cascade", "cascade_reviewed", json.dumps(detail), now),
                )
            # Move to processed
            dest = processed_dir / cf.name
            shutil.move(str(cf), str(dest))
            processed += 1
        except Exception as e:
            print(f"WARNING: Failed to process {cf.name}: {e}", file=sys.stderr)
    if conn:
        try:
            conn.commit()
            conn.close()
        except sqlite3.Error:
            pass
    return processed

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <agent-name>", file=sys.stderr)
        sys.exit(1)
    agent = sys.argv[1]
    count = process_agent_inbox(agent)
    if count > 0:
        print(f"Processed {count} cascade message(s) for {agent}")
    # Exit 0 regardless — non-fatal
    sys.exit(0)


@@ -0,0 +1,274 @@
"""Cascade automation — auto-flag dependent beliefs/positions when claims change.
Hook point: called from merge.py after _embed_merged_claims, before _delete_remote_branch.
Uses the same main_sha/branch_sha diff to detect changed claim files, then scans
all agent beliefs and positions for depends_on references to those claims.
Notifications are written to /opt/teleo-eval/agent-state/{agent}/inbox/ using
the same atomic-write pattern as lib-state.sh.
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger("pipeline.cascade")
AGENT_STATE_DIR = Path("/opt/teleo-eval/agent-state")
CLAIM_DIRS = {"domains/", "core/", "foundations/", "decisions/"}
AGENT_NAMES = ["rio", "leo", "clay", "astra", "vida", "theseus"]
def _extract_claim_titles_from_diff(diff_files: list[str]) -> set[str]:
    """Extract claim titles from changed file paths."""
    titles = set()
    for fpath in diff_files:
        if not fpath.endswith(".md"):
            continue
        if not any(fpath.startswith(d) for d in CLAIM_DIRS):
            continue
        basename = os.path.basename(fpath)
        if basename.startswith("_") or basename == "directory.md":
            continue
        title = basename.removesuffix(".md")
        titles.add(title)
    return titles

def _normalize_for_match(text: str) -> str:
    """Normalize for fuzzy matching: lowercase, hyphens to spaces, strip punctuation, collapse whitespace."""
    text = text.lower().strip()
    text = text.replace("-", " ")
    text = re.sub(r"[^\w\s]", "", text)
    text = re.sub(r"\s+", " ", text)
    return text

def _slug_to_words(slug: str) -> str:
    """Convert kebab-case slug to space-separated words."""
    return slug.replace("-", " ")

def _parse_depends_on(file_path: Path) -> tuple[str, list[str]]:
    """Parse a belief or position file's depends_on entries.

    Returns (agent_name, [dependency_titles]).
    """
    try:
        content = file_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return ("", [])
    agent = ""
    deps = []
    in_frontmatter = False
    in_depends = False
    for line in content.split("\n"):
        if line.strip() == "---":
            if not in_frontmatter:
                in_frontmatter = True
                continue
            else:
                break
        if in_frontmatter:
            if line.startswith("agent:"):
                agent = line.split(":", 1)[1].strip().strip('"').strip("'")
            elif line.startswith("depends_on:"):
                in_depends = True
                rest = line.split(":", 1)[1].strip()
                if rest.startswith("["):
                    items = re.findall(r'"([^"]+)"|\'([^\']+)\'', rest)
                    for item in items:
                        dep = item[0] or item[1]
                        dep = dep.strip("[]").replace("[[", "").replace("]]", "")
                        deps.append(dep)
                    in_depends = False
            elif in_depends:
                if line.startswith(" - "):
                    dep = line.strip().lstrip("- ").strip('"').strip("'")
                    dep = dep.replace("[[", "").replace("]]", "")
                    deps.append(dep)
                elif line.strip() and not line.startswith(" "):
                    in_depends = False
    # Also scan body for [[wiki-links]]
    body_links = re.findall(r"\[\[([^\]]+)\]\]", content)
    for link in body_links:
        if link not in deps:
            deps.append(link)
    return (agent, deps)
def _write_inbox_message(agent: str, subject: str, body: str) -> bool:
    """Write a cascade notification to an agent's inbox. Atomic tmp+rename."""
    inbox_dir = AGENT_STATE_DIR / agent / "inbox"
    if not inbox_dir.exists():
        logger.warning("cascade: no inbox dir for agent %s, skipping", agent)
        return False
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    file_hash = hashlib.md5(f"{agent}-{subject}-{body[:200]}".encode()).hexdigest()[:8]
    filename = f"cascade-{ts}-{subject[:60]}-{file_hash}.md"
    final_path = inbox_dir / filename
    try:
        fd, tmp_path = tempfile.mkstemp(dir=str(inbox_dir), suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            f.write("---\n")
            f.write("type: cascade\n")
            f.write("from: pipeline\n")
            f.write(f"to: {agent}\n")
            f.write(f"subject: \"{subject}\"\n")
            f.write(f"created: {datetime.now(timezone.utc).isoformat()}\n")
            f.write("status: unread\n")
            f.write("---\n\n")
            f.write(body)
        os.rename(tmp_path, str(final_path))
        return True
    except OSError:
        logger.exception("cascade: failed to write inbox message for %s", agent)
        return False
def _find_matches(deps: list[str], claim_lookup: dict[str, str]) -> list[str]:
    """Check if any dependency matches a changed claim.

    Uses exact normalized match first, then substring containment for longer
    strings only (min 15 chars) to avoid false positives on short generic names.
    """
    matched = []
    for dep in deps:
        norm = _normalize_for_match(dep)
        if norm in claim_lookup:
            matched.append(claim_lookup[norm])
        else:
            # Substring match only when both strings are sufficiently specific
            for claim_norm, claim_orig in claim_lookup.items():
                if (len(norm) >= 15 and len(claim_norm) >= 15
                        and (claim_norm in norm or norm in claim_norm)):
                    matched.append(claim_orig)
                    break
    return matched
def _format_cascade_body(
    file_name: str,
    file_type: str,
    matched_claims: list[str],
    pr_num: int,
) -> str:
    """Format the cascade notification body."""
    claims_list = "\n".join(f"- {c}" for c in matched_claims)
    return (
        f"# Cascade: upstream claims changed\n\n"
        f"Your {file_type} **{file_name}** depends on claims that were modified in PR #{pr_num}.\n\n"
        f"## Changed claims\n\n{claims_list}\n\n"
        f"## Action needed\n\n"
        f"Review whether your {file_type}'s confidence, description, or grounding "
        f"needs updating in light of these changes. If the evidence strengthened, "
        f"consider increasing confidence. If it weakened or contradicted, flag for "
        f"re-evaluation.\n"
    )
async def cascade_after_merge(
    main_sha: str,
    branch_sha: str,
    pr_num: int,
    main_worktree: Path,
    conn=None,
) -> int:
    """Scan for beliefs/positions affected by claims changed in this merge.

    Returns the number of cascade notifications sent.
    """
    # 1. Get changed files
    proc = await asyncio.create_subprocess_exec(
        "git", "diff", "--name-only", "--diff-filter=ACMR",
        main_sha, branch_sha,
        cwd=str(main_worktree),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        logger.warning("cascade: git diff timed out")
        return 0
    if proc.returncode != 0:
        logger.warning("cascade: git diff failed (rc=%d)", proc.returncode)
        return 0
    diff_files = [f for f in stdout.decode().strip().split("\n") if f]
    # 2. Extract claim titles from changed files
    changed_claims = _extract_claim_titles_from_diff(diff_files)
    if not changed_claims:
        return 0
    logger.info("cascade: %d claims changed in PR #%d: %s",
                len(changed_claims), pr_num, list(changed_claims)[:5])
    # Build normalized lookup for fuzzy matching
    claim_lookup = {}
    for claim in changed_claims:
        claim_lookup[_normalize_for_match(claim)] = claim
        claim_lookup[_normalize_for_match(_slug_to_words(claim))] = claim
    # 3. Scan all beliefs and positions
    notifications = 0
    agents_dir = main_worktree / "agents"
    if not agents_dir.exists():
        logger.warning("cascade: no agents/ dir in worktree")
        return 0
    for agent_name in AGENT_NAMES:
        agent_dir = agents_dir / agent_name
        if not agent_dir.exists():
            continue
        for subdir, file_type in [("beliefs", "belief"), ("positions", "position")]:
            target_dir = agent_dir / subdir
            if not target_dir.exists():
                continue
            for md_file in target_dir.glob("*.md"):
                _, deps = _parse_depends_on(md_file)
                matched = _find_matches(deps, claim_lookup)
                if matched:
                    body = _format_cascade_body(md_file.name, file_type, matched, pr_num)
                    if _write_inbox_message(agent_name, f"claim-changed-affects-{file_type}", body):
                        notifications += 1
                        logger.info("cascade: notified %s: %s '%s' affected by %s",
                                    agent_name, file_type, md_file.stem, matched)
    if notifications:
        logger.info("cascade: sent %d notifications for PR #%d", notifications, pr_num)
    # Write structured audit_log entry for cascade tracking (Page 4 data)
    if conn is not None:
        try:
            conn.execute(
                "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
                ("cascade", "cascade_triggered", json.dumps({
                    "pr": pr_num,
                    "claims_changed": list(changed_claims)[:20],
                    "notifications_sent": notifications,
                })),
            )
        except Exception:
            logger.exception("cascade: audit_log write failed (non-fatal)")
    return notifications


@@ -0,0 +1,230 @@
"""Cross-domain citation index — detect entity overlap across domains.
Hook point: called from merge.py after cascade_after_merge.
After a claim merges, checks if its referenced entities also appear in claims
from other domains. Logs connections to audit_log for silo detection.
Two detection methods:
1. Entity name matching: entity names appearing in claim body text (word-boundary)
2. Source overlap: claims citing the same source archive files
At ~600 claims and ~100 entities, full scan per merge takes <1 second.
"""
import asyncio
import json
import logging
import os
import re
from pathlib import Path
logger = logging.getLogger("pipeline.cross_domain")
# Minimum entity name length to avoid false positives (ORE, QCX, etc)
MIN_ENTITY_NAME_LEN = 4
# Entity names that are common English words — skip to avoid false positives
ENTITY_STOPLIST = {"versus", "island", "loyal", "saber", "nebula", "helium", "coal", "snapshot", "dropout"}
def _build_entity_names(worktree: Path) -> dict[str, str]:
"""Build mapping of entity_slug -> display_name from entity files."""
names = {}
entity_dir = worktree / "entities"
if not entity_dir.exists():
return names
for md_file in entity_dir.rglob("*.md"):
if md_file.name.startswith("_"):
continue
try:
content = md_file.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
continue
for line in content.split("\n"):
if line.startswith("name:"):
name = line.split(":", 1)[1].strip().strip('"').strip("'")
if len(name) >= MIN_ENTITY_NAME_LEN and name.lower() not in ENTITY_STOPLIST:
names[md_file.stem] = name
break
return names
def _compile_entity_patterns(entity_names: dict[str, str]) -> dict[str, re.Pattern]:
"""Pre-compile word-boundary regex for each entity name."""
patterns = {}
for slug, name in entity_names.items():
try:
patterns[slug] = re.compile(r'\b' + re.escape(name) + r'\b', re.IGNORECASE)
except re.error:
continue
return patterns
def _extract_source_refs(content: str) -> set[str]:
"""Extract source archive references ([[YYYY-MM-DD-...]]) from content."""
return set(re.findall(r"\[\[(20\d{2}-\d{2}-\d{2}-[^\]]+)\]\]", content))
def _find_entity_mentions(content: str, patterns: dict[str, re.Pattern]) -> set[str]:
"""Find entity slugs whose names appear in the content (word-boundary match)."""
found = set()
for slug, pat in patterns.items():
if pat.search(content):
found.add(slug)
return found
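A quick sketch of why the word-boundary anchors matter here, mirroring the two helpers above (entity names and text are illustrative):

```python
import re

def compile_entity_patterns(entity_names):
    # Mirrors _compile_entity_patterns: word-boundary, case-insensitive match
    return {slug: re.compile(r"\b" + re.escape(name) + r"\b", re.IGNORECASE)
            for slug, name in entity_names.items()}

patterns = compile_entity_patterns({"eigenlayer": "EigenLayer", "coal": "Coal"})
text = "eigenlayer restaking grew while charcoal demand fell"
found = {slug for slug, pat in patterns.items() if pat.search(text)}
print(found)  # {'eigenlayer'}  ("charcoal" does not match \bCoal\b)
```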
def _scan_domain_claims(worktree: Path, patterns: dict[str, re.Pattern]) -> dict[str, list[dict]]:
"""Build domain -> [claim_info] mapping for all claims."""
domain_claims = {}
domains_dir = worktree / "domains"
if not domains_dir.exists():
return domain_claims
for domain_dir in domains_dir.iterdir():
if not domain_dir.is_dir():
continue
claims = []
for claim_file in domain_dir.glob("*.md"):
if claim_file.name.startswith("_") or claim_file.name == "directory.md":
continue
try:
content = claim_file.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
continue
claims.append({
"slug": claim_file.stem,
"entities": _find_entity_mentions(content, patterns),
"sources": _extract_source_refs(content),
})
domain_claims[domain_dir.name] = claims
return domain_claims
async def cross_domain_after_merge(
main_sha: str,
branch_sha: str,
pr_num: int,
main_worktree: Path,
conn=None,
) -> int:
"""Detect cross-domain entity/source overlap for claims changed in this merge.
Returns the number of cross-domain connections found.
"""
# 1. Get changed files
proc = await asyncio.create_subprocess_exec(
"git", "diff", "--name-only", "--diff-filter=ACMR",
main_sha, branch_sha,
cwd=str(main_worktree),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
logger.warning("cross_domain: git diff timed out")
return 0
if proc.returncode != 0:
return 0
diff_files = [f for f in stdout.decode().strip().split("\n") if f]
# 2. Filter to claim files
changed_claims = []
for fpath in diff_files:
if not fpath.endswith(".md") or not fpath.startswith("domains/"):
continue
parts = fpath.split("/")
if len(parts) < 3:
continue
basename = os.path.basename(fpath)
if basename.startswith("_") or basename == "directory.md":
continue
changed_claims.append({"path": fpath, "domain": parts[1], "slug": Path(basename).stem})
if not changed_claims:
return 0
# 3. Build entity patterns and scan all claims
entity_names = _build_entity_names(main_worktree)
if not entity_names:
return 0
patterns = _compile_entity_patterns(entity_names)
domain_claims = _scan_domain_claims(main_worktree, patterns)
# 4. For each changed claim, find cross-domain connections
total_connections = 0
all_connections = []
for claim in changed_claims:
claim_path = main_worktree / claim["path"]
try:
content = claim_path.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
continue
my_entities = _find_entity_mentions(content, patterns)
my_sources = _extract_source_refs(content)
if not my_entities and not my_sources:
continue
connections = []
for other_domain, other_claims in domain_claims.items():
if other_domain == claim["domain"]:
continue
for other in other_claims:
shared_entities = my_entities & other["entities"]
shared_sources = my_sources & other["sources"]
# Threshold: >=2 shared entities, OR 1 entity + 1 source
entity_count = len(shared_entities)
source_count = len(shared_sources)
if entity_count >= 2 or (entity_count >= 1 and source_count >= 1):
connections.append({
"other_claim": other["slug"],
"other_domain": other_domain,
"shared_entities": sorted(shared_entities)[:5],
"shared_sources": sorted(shared_sources)[:3],
})
if connections:
total_connections += len(connections)
all_connections.append({
"claim": claim["slug"],
"domain": claim["domain"],
"connections": connections[:10],
})
logger.info(
"cross_domain: %s (%s) has %d cross-domain connections",
claim["slug"], claim["domain"], len(connections),
)
# 5. Log to audit_log
if all_connections and conn is not None:
try:
conn.execute(
"INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
("cross_domain", "connections_found", json.dumps({
"pr": pr_num,
"total_connections": total_connections,
"claims_with_connections": len(all_connections),
"details": all_connections[:10],
})),
)
except Exception:
logger.exception("cross_domain: audit_log write failed (non-fatal)")
if total_connections:
logger.info(
"cross_domain: PR #%d%d connections across %d claims",
pr_num, total_connections, len(all_connections),
)
return total_connections
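The connection threshold in step 4 reduces to a small predicate. A sketch, with illustrative entity slugs and source slugs:

```python
def is_connection(shared_entities, shared_sources):
    # Threshold from cross_domain_after_merge: two shared entities,
    # or one shared entity plus one shared source
    return len(shared_entities) >= 2 or (
        len(shared_entities) >= 1 and len(shared_sources) >= 1
    )

print(is_connection({"eigenlayer", "lido"}, set()))                   # True
print(is_connection({"eigenlayer"}, set()))                           # False
print(is_connection({"eigenlayer"}, {"2026-03-14-vitalik-rollups"}))  # True
print(is_connection(set(), {"a", "b"}))                               # False
```

Shared sources alone never trigger a connection; at least one shared entity is required.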

ops/pipeline-v2/lib/db.py Normal file

@ -0,0 +1,625 @@
"""SQLite database — schema, migrations, connection management."""
import json
import logging
import sqlite3
from contextlib import contextmanager
from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 12
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sources (
path TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'unprocessed',
-- unprocessed, triaging, extracting, extracted, null_result,
-- needs_reextraction, error
priority TEXT DEFAULT 'medium',
-- critical, high, medium, low, skip
priority_log TEXT DEFAULT '[]',
-- JSON array: [{stage, priority, reasoning, ts}]
extraction_model TEXT,
claims_count INTEGER DEFAULT 0,
pr_number INTEGER,
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS prs (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
-- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
-- conflict: rebase failed or merge timed out needs human intervention
domain TEXT,
agent TEXT,
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'challenge', 'enrich', 'synthesize', 'unknown')),
tier TEXT,
-- LIGHT, STANDARD, DEEP
tier0_pass INTEGER,
-- 0/1
leo_verdict TEXT DEFAULT 'pending',
-- pending, approve, request_changes, skipped, failed
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
-- NULL = inherit from source. Set explicitly for human-submitted PRs.
-- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
-- Human PRs: 'critical' (detected via missing source_path or non-agent author)
origin TEXT DEFAULT 'pipeline',
-- pipeline | human | external
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
CREATE TABLE IF NOT EXISTS costs (
date TEXT,
model TEXT,
stage TEXT,
calls INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
PRIMARY KEY (date, model, stage)
);
CREATE TABLE IF NOT EXISTS circuit_breakers (
name TEXT PRIMARY KEY,
state TEXT DEFAULT 'closed',
-- closed, open, halfopen
failures INTEGER DEFAULT 0,
successes INTEGER DEFAULT 0,
tripped_at TEXT,
last_success_at TEXT,
-- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
last_update TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
stage TEXT,
event TEXT,
detail TEXT
);
CREATE TABLE IF NOT EXISTS response_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
chat_id INTEGER,
user TEXT,
agent TEXT DEFAULT 'rio',
model TEXT,
query TEXT,
conversation_window TEXT,
-- JSON: prior N messages for context
-- NOTE: intentional duplication of transcript data for audit self-containment.
-- Transcripts live in /opt/teleo-eval/transcripts/ but audit rows need prompt
-- context inline for retrieval-quality diagnosis. Primary driver of row size;
-- target for cleanup when the 90-day retention policy lands.
entities_matched TEXT,
-- JSON: [{name, path, score, used_in_response}]
claims_matched TEXT,
-- JSON: [{path, title, score, source, used_in_response}]
retrieval_layers_hit TEXT,
-- JSON: ["keyword","qdrant","graph"]
retrieval_gap TEXT,
-- What the KB was missing (if anything)
market_data TEXT,
-- JSON: injected token prices
research_context TEXT,
-- Haiku pre-pass results if any
kb_context_text TEXT,
-- Full context string sent to model
tool_calls TEXT,
-- JSON: ordered array [{tool, input, output, duration_ms, ts}]
raw_response TEXT,
display_response TEXT,
confidence_score REAL,
-- Model self-rated retrieval quality 0.0-1.0
response_time_ms INTEGER,
-- Eval pipeline columns (v10)
prompt_tokens INTEGER,
completion_tokens INTEGER,
generation_cost REAL,
embedding_cost REAL,
total_cost REAL,
blocked INTEGER DEFAULT 0,
block_reason TEXT,
query_type TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
"""
def get_connection(readonly: bool = False) -> sqlite3.Connection:
"""Create a SQLite connection with WAL mode and proper settings."""
config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(
str(config.DB_PATH),
timeout=30,
isolation_level=None, # autocommit — we manage transactions explicitly
)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=10000")
conn.execute("PRAGMA foreign_keys=ON")
if readonly:
conn.execute("PRAGMA query_only=ON")
return conn
@contextmanager
def transaction(conn: sqlite3.Connection):
"""Context manager for explicit transactions."""
conn.execute("BEGIN")
try:
yield conn
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
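Because connections are opened with `isolation_level=None` (autocommit), this context manager is what gives callers all-or-nothing semantics. A self-contained sketch of the rollback behavior, using an in-memory database:

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    # Same shape as lib/db.py: explicit BEGIN/COMMIT, ROLLBACK on any exception
    conn.execute("BEGIN")
    try:
        yield conn
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit, like get_connection
conn.execute("CREATE TABLE t (x INTEGER)")
try:
    with transaction(conn):
        conn.execute("INSERT INTO t VALUES (1)")
        raise RuntimeError("boom")
except RuntimeError:
    pass
print(conn.execute("SELECT COUNT(*) FROM t").fetchone()[0])  # 0 (insert rolled back)
```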
# Branch prefix → (agent, commit_type) mapping.
# Single source of truth — used by merge.py at INSERT time and migration v7 backfill.
# Unknown prefixes → ('unknown', 'unknown') + warning log.
BRANCH_PREFIX_MAP = {
"extract": ("pipeline", "extract"),
"ingestion": ("pipeline", "extract"),
"epimetheus": ("epimetheus", "extract"),
"rio": ("rio", "research"),
"theseus": ("theseus", "research"),
"astra": ("astra", "research"),
"vida": ("vida", "research"),
"clay": ("clay", "research"),
"leo": ("leo", "entity"),
"reweave": ("pipeline", "reweave"),
"fix": ("pipeline", "fix"),
}
def classify_branch(branch: str) -> tuple[str, str]:
"""Derive (agent, commit_type) from branch prefix.
Returns ('unknown', 'unknown') and logs a warning for unrecognized prefixes.
"""
prefix = branch.split("/", 1)[0] if "/" in branch else branch
result = BRANCH_PREFIX_MAP.get(prefix)
if result is None:
logger.warning("Unknown branch prefix %r in branch %r — defaulting to ('unknown', 'unknown')", prefix, branch)
return ("unknown", "unknown")
return result
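A minimal sketch of the prefix lookup, with a two-entry excerpt of the map (the branch names are illustrative):

```python
BRANCH_PREFIX_MAP = {"rio": ("rio", "research"), "extract": ("pipeline", "extract")}  # excerpt

def classify_branch(branch):
    # Prefix is everything before the first "/" (or the whole branch name)
    prefix = branch.split("/", 1)[0] if "/" in branch else branch
    return BRANCH_PREFIX_MAP.get(prefix, ("unknown", "unknown"))

print(classify_branch("rio/defi-2026-04-01"))  # ('rio', 'research')
print(classify_branch("hotfix-typo"))          # ('unknown', 'unknown')
```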
def migrate(conn: sqlite3.Connection):
"""Run schema migrations."""
conn.executescript(SCHEMA_SQL)
# Check current version
try:
row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone()
current = row["v"] if row and row["v"] else 0
except sqlite3.OperationalError:
current = 0
# --- Incremental migrations ---
if current < 2:
# Phase 2: add multiplayer columns to prs table
for stmt in [
"ALTER TABLE prs ADD COLUMN priority TEXT",
"ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'",
"ALTER TABLE prs ADD COLUMN last_error TEXT",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v2: added priority, origin, last_error to prs")
if current < 3:
# Phase 3: retry budget — track eval attempts and issue tags per PR
for stmt in [
"ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0",
"ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v3: added eval_attempts, eval_issues to prs")
if current < 4:
# Phase 4: auto-fixer — track fix attempts per PR
for stmt in [
"ALTER TABLE prs ADD COLUMN fix_attempts INTEGER DEFAULT 0",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v4: added fix_attempts to prs")
if current < 5:
# Phase 5: contributor identity system — tracks who contributed what
# Aligned with schemas/attribution.md (5 roles) + Leo's tier system.
# CI is COMPUTED from raw counts × weights, never stored.
conn.executescript("""
CREATE TABLE IF NOT EXISTS contributors (
handle TEXT PRIMARY KEY,
display_name TEXT,
agent_id TEXT,
first_contribution TEXT,
last_contribution TEXT,
tier TEXT DEFAULT 'new',
-- new, contributor, veteran
sourcer_count INTEGER DEFAULT 0,
extractor_count INTEGER DEFAULT 0,
challenger_count INTEGER DEFAULT 0,
synthesizer_count INTEGER DEFAULT 0,
reviewer_count INTEGER DEFAULT 0,
claims_merged INTEGER DEFAULT 0,
challenges_survived INTEGER DEFAULT 0,
domains TEXT DEFAULT '[]',
highlights TEXT DEFAULT '[]',
identities TEXT DEFAULT '{}',
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_contributors_tier ON contributors(tier);
""")
logger.info("Migration v5: added contributors table")
if current < 6:
# Phase 6: analytics — time-series metrics snapshots for trending dashboard
conn.executescript("""
CREATE TABLE IF NOT EXISTS metrics_snapshots (
ts TEXT DEFAULT (datetime('now')),
throughput_1h INTEGER,
approval_rate REAL,
open_prs INTEGER,
merged_total INTEGER,
closed_total INTEGER,
conflict_total INTEGER,
evaluated_24h INTEGER,
fix_success_rate REAL,
rejection_broken_wiki_links INTEGER DEFAULT 0,
rejection_frontmatter_schema INTEGER DEFAULT 0,
rejection_near_duplicate INTEGER DEFAULT 0,
rejection_confidence INTEGER DEFAULT 0,
rejection_other INTEGER DEFAULT 0,
extraction_model TEXT,
eval_domain_model TEXT,
eval_leo_model TEXT,
prompt_version TEXT,
pipeline_version TEXT,
source_origin_agent INTEGER DEFAULT 0,
source_origin_human INTEGER DEFAULT 0,
source_origin_scraper INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON metrics_snapshots(ts);
""")
logger.info("Migration v6: added metrics_snapshots table for analytics dashboard")
if current < 7:
# Phase 7: agent attribution + commit_type for dashboard
# commit_type column + backfill agent/commit_type from branch prefix
try:
conn.execute("ALTER TABLE prs ADD COLUMN commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'unknown'))")
except sqlite3.OperationalError:
pass # column already exists from CREATE TABLE
# Backfill agent and commit_type from branch prefix
rows = conn.execute("SELECT number, branch FROM prs WHERE branch IS NOT NULL").fetchall()
for row in rows:
agent, commit_type = classify_branch(row["branch"])
conn.execute(
"UPDATE prs SET agent = ?, commit_type = ? WHERE number = ? AND (agent IS NULL OR commit_type IS NULL)",
(agent, commit_type, row["number"]),
)
backfilled = len(rows)
logger.info("Migration v7: added commit_type column, backfilled %d PRs with agent/commit_type", backfilled)
if current < 8:
# Phase 8: response audit — full-chain visibility for agent response quality
# Captures: query → tool calls → retrieval → context → response → confidence
# Approved by Ganymede (architecture), Rio (agent needs), Rhea (ops)
conn.executescript("""
CREATE TABLE IF NOT EXISTS response_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
chat_id INTEGER,
user TEXT,
agent TEXT DEFAULT 'rio',
model TEXT,
query TEXT,
conversation_window TEXT, -- intentional transcript duplication for audit self-containment
entities_matched TEXT,
claims_matched TEXT,
retrieval_layers_hit TEXT,
retrieval_gap TEXT,
market_data TEXT,
research_context TEXT,
kb_context_text TEXT,
tool_calls TEXT,
raw_response TEXT,
display_response TEXT,
confidence_score REAL,
response_time_ms INTEGER,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
""")
logger.info("Migration v8: added response_audit table for agent response auditing")
if current < 9:
# Phase 9: rebuild prs table to expand CHECK constraint on commit_type.
# SQLite cannot ALTER CHECK constraints in-place — must rebuild table.
# Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown
# New constraint: adds challenge,enrich,synthesize
# Also re-derive commit_type from branch prefix for rows with invalid/NULL values.
# Step 1: Get all column names from existing table
cols_info = conn.execute("PRAGMA table_info(prs)").fetchall()
col_names = [c["name"] for c in cols_info]
col_list = ", ".join(col_names)
# Step 2: Create new table with expanded CHECK constraint
conn.executescript(f"""
CREATE TABLE prs_new (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
domain TEXT,
agent TEXT,
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')),
tier TEXT,
tier0_pass INTEGER,
leo_verdict TEXT DEFAULT 'pending',
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
origin TEXT DEFAULT 'pipeline',
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs;
DROP TABLE prs;
ALTER TABLE prs_new RENAME TO prs;
""")
logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint")
# Step 3: Re-derive commit_type from branch prefix for invalid/NULL values
rows = conn.execute(
"""SELECT number, branch FROM prs
WHERE branch IS NOT NULL
AND (commit_type IS NULL
OR commit_type NOT IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown'))"""
).fetchall()
fixed = 0
for row in rows:
agent, commit_type = classify_branch(row["branch"])
conn.execute(
"UPDATE prs SET agent = COALESCE(agent, ?), commit_type = ? WHERE number = ?",
(agent, commit_type, row["number"]),
)
fixed += 1
conn.commit()
logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed)
if current < 10:
# Add eval pipeline columns to response_audit
# VPS may already be at v10/v11 from prior (incomplete) deploys — use IF NOT EXISTS pattern
for col_def in [
("prompt_tokens", "INTEGER"),
("completion_tokens", "INTEGER"),
("generation_cost", "REAL"),
("embedding_cost", "REAL"),
("total_cost", "REAL"),
("blocked", "INTEGER DEFAULT 0"),
("block_reason", "TEXT"),
("query_type", "TEXT"),
]:
try:
conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_def[0]} {col_def[1]}")
except sqlite3.OperationalError:
pass # Column already exists
conn.commit()
logger.info("Migration v10: added eval pipeline columns to response_audit")
if current < 11:
# Phase 11: compute tracking — extended costs table columns
# (May already exist on VPS from manual deploy — idempotent ALTERs)
for col_def in [
("duration_ms", "INTEGER DEFAULT 0"),
("cache_read_tokens", "INTEGER DEFAULT 0"),
("cache_write_tokens", "INTEGER DEFAULT 0"),
("cost_estimate_usd", "REAL DEFAULT 0"),
]:
try:
conn.execute(f"ALTER TABLE costs ADD COLUMN {col_def[0]} {col_def[1]}")
except sqlite3.OperationalError:
pass # Column already exists
conn.commit()
logger.info("Migration v11: added compute tracking columns to costs")
if current < 12:
# Phase 12: structured review records — captures all evaluation outcomes
# including rejections, disagreements, and approved-with-changes.
# Schema locked with Leo (2026-04-01).
conn.executescript("""
CREATE TABLE IF NOT EXISTS review_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
agent TEXT,
reviewer TEXT NOT NULL,
reviewer_model TEXT,
outcome TEXT NOT NULL
CHECK (outcome IN ('approved', 'approved-with-changes', 'rejected')),
rejection_reason TEXT
CHECK (rejection_reason IS NULL OR rejection_reason IN (
'fails-standalone-test', 'duplicate', 'scope-mismatch',
'evidence-insufficient', 'framing-poor', 'other'
)),
disagreement_type TEXT
CHECK (disagreement_type IS NULL OR disagreement_type IN (
'factual', 'scope', 'framing', 'evidence'
)),
notes TEXT,
batch_id TEXT,
claims_in_batch INTEGER DEFAULT 1,
reviewed_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number);
CREATE INDEX IF NOT EXISTS idx_review_records_outcome ON review_records(outcome);
CREATE INDEX IF NOT EXISTS idx_review_records_domain ON review_records(domain);
CREATE INDEX IF NOT EXISTS idx_review_records_reviewer ON review_records(reviewer);
""")
logger.info("Migration v12: created review_records table")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
(SCHEMA_VERSION,),
)
conn.commit() # Explicit commit — executescript auto-commits DDL but not subsequent DML
logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
else:
logger.debug("Database at schema version %d", current)
def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None):
"""Write an audit log entry."""
conn.execute(
"INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
(stage, event, detail),
)
def record_review(conn, pr_number: int, reviewer: str, outcome: str, *,
claim_path: str = None, domain: str = None, agent: str = None,
reviewer_model: str = None, rejection_reason: str = None,
disagreement_type: str = None, notes: str = None,
claims_in_batch: int = 1):
"""Record a structured review outcome.
Called from evaluate stage after Leo/domain reviewer returns a verdict.
outcome must be: approved, approved-with-changes, or rejected.
"""
batch_id = str(pr_number)
conn.execute(
"""INSERT INTO review_records
(pr_number, claim_path, domain, agent, reviewer, reviewer_model,
outcome, rejection_reason, disagreement_type, notes,
batch_id, claims_in_batch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(pr_number, claim_path, domain, agent, reviewer, reviewer_model,
outcome, rejection_reason, disagreement_type, notes,
batch_id, claims_in_batch),
)
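A self-contained sketch of the insert that `record_review` performs against a trimmed-down `review_records` table; the PR number and reviewer values are hypothetical, but the outcome and rejection_reason strings must satisfy the CHECK constraints above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE review_records (
    pr_number INTEGER, reviewer TEXT,
    outcome TEXT CHECK (outcome IN ('approved', 'approved-with-changes', 'rejected')),
    rejection_reason TEXT)""")

# Hypothetical rejected-claim review; values pass the CHECK constraints
conn.execute(
    "INSERT INTO review_records (pr_number, reviewer, outcome, rejection_reason)"
    " VALUES (?, ?, ?, ?)",
    (412, "leo", "rejected", "fails-standalone-test"),
)
row = conn.execute("SELECT outcome, rejection_reason FROM review_records").fetchone()
print(row)  # ('rejected', 'fails-standalone-test')
```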
def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
"""Append a priority assessment to a source's priority_log.
NOTE: This does NOT update the source's priority column. The priority column
is the authoritative priority, set only by initial triage or human override.
The priority_log records each stage's opinion for offline calibration analysis.
(Bug caught by Theseus: the original version overwrote priority with each stage's opinion.)
(Race condition fix per Vida: the read-then-write is wrapped in a transaction.)
"""
conn.execute("BEGIN")
try:
row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone()
if not row:
conn.execute("ROLLBACK")
return
log = json.loads(row["priority_log"] or "[]")
log.append({"stage": stage, "priority": priority, "reasoning": reasoning})
conn.execute(
"UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
(json.dumps(log), path),
)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
def insert_response_audit(conn: sqlite3.Connection, **kwargs):
"""Insert a response audit record. All fields optional except query."""
cols = [
"timestamp", "chat_id", "user", "agent", "model", "query",
"conversation_window", "entities_matched", "claims_matched",
"retrieval_layers_hit", "retrieval_gap", "market_data",
"research_context", "kb_context_text", "tool_calls",
"raw_response", "display_response", "confidence_score",
"response_time_ms",
# Eval pipeline columns (v10)
"prompt_tokens", "completion_tokens", "generation_cost",
"embedding_cost", "total_cost", "blocked", "block_reason",
"query_type",
]
present = {k: v for k, v in kwargs.items() if k in cols and v is not None}
if not present:
return
col_names = ", ".join(present.keys())
placeholders = ", ".join("?" for _ in present)
conn.execute(
f"INSERT INTO response_audit ({col_names}) VALUES ({placeholders})",
tuple(present.values()),
)
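The dynamic-INSERT construction above can be seen in isolation: drop `None` values, then build the column list and placeholders from whatever remains (the kwargs here are illustrative):

```python
# Sketch of insert_response_audit's SQL construction (sample kwargs are made up)
kwargs = {"query": "what is restaking?", "model": None, "agent": "rio"}
present = {k: v for k, v in kwargs.items() if v is not None}
col_names = ", ".join(present.keys())
placeholders = ", ".join("?" for _ in present)
sql = f"INSERT INTO response_audit ({col_names}) VALUES ({placeholders})"
print(sql)  # INSERT INTO response_audit (query, agent) VALUES (?, ?)
```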
def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
"""Set a source's authoritative priority. Used for human overrides and initial triage."""
conn.execute(
"UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
(priority, path),
)
append_priority_log(conn, path, "override", priority, reason)

File diff suppressed because it is too large

ops/pipeline-v2/lib/merge.py Normal file

File diff suppressed because it is too large


@ -31,6 +31,17 @@ RAW_DIR="/opt/teleo-eval/research-raw/${AGENT}"
log() { echo "[$(date -Iseconds)] $*" >> "$LOG"; }
# --- Agent State ---
STATE_LIB="/opt/teleo-eval/ops/agent-state/lib-state.sh"
if [ -f "$STATE_LIB" ]; then
source "$STATE_LIB"
HAS_STATE=true
SESSION_ID="${AGENT}-$(date +%Y%m%d-%H%M%S)"
else
HAS_STATE=false
log "WARN: agent-state lib not found, running without state"
fi
# --- Lock (prevent concurrent sessions for same agent) ---
if [ -f "$LOCKFILE" ]; then
pid=$(cat "$LOCKFILE" 2>/dev/null)
@ -178,6 +189,14 @@ git branch -D "$BRANCH" 2>/dev/null || true
git checkout -b "$BRANCH" >> "$LOG" 2>&1
log "On branch $BRANCH"
# --- Pre-session state ---
if [ "$HAS_STATE" = true ]; then
state_start_session "$AGENT" "$SESSION_ID" "research" "$DOMAIN" "$BRANCH" "sonnet" "5400" > /dev/null 2>&1 || true
state_update_report "$AGENT" "researching" "Starting research session ${DATE}" 2>/dev/null || true
state_journal_append "$AGENT" "session_start" "session_id=$SESSION_ID" "type=research" "branch=$BRANCH" 2>/dev/null || true
log "Agent state: session started ($SESSION_ID)"
fi
# --- Build the research prompt ---
# Write tweet data to a temp file so Claude can read it
echo "$TWEET_DATA" > "$TWEET_FILE"
@ -188,6 +207,11 @@ RESEARCH_PROMPT="You are ${AGENT}, a Teleo knowledge base agent. Domain: ${DOMAI
You have ~90 minutes of compute. Use it wisely.
### Step 0: Load Operational State (1 min)
Read /opt/teleo-eval/agent-state/${AGENT}/memory.md — this is your cross-session operational memory. It contains patterns, dead ends, open questions, and corrections from previous sessions.
Read /opt/teleo-eval/agent-state/${AGENT}/tasks.json — check for pending tasks assigned to you.
Check /opt/teleo-eval/agent-state/${AGENT}/inbox/ for messages from other agents. Process any high-priority inbox items before choosing your research direction.
### Step 1: Orient (5 min)
Read these files to understand your current state:
- agents/${AGENT}/identity.md (who you are)
@ -229,7 +253,7 @@ Include which belief you targeted for disconfirmation and what you searched for.
### Step 6: Archive Sources (60 min)
For each relevant tweet/thread, create an archive file:
Path: inbox/archive/YYYY-MM-DD-{author-handle}-{brief-slug}.md
Path: inbox/queue/YYYY-MM-DD-{author-handle}-{brief-slug}.md
Use this frontmatter:
---
@ -267,7 +291,7 @@ EXTRACTION HINT: [what the extractor should focus on — scopes attention]
- Set all sources to status: unprocessed (a DIFFERENT instance will extract)
- Flag cross-domain sources with flagged_for_{agent}: [\"reason\"]
- Do NOT extract claims yourself — write good notes so the extractor can
- Check inbox/archive/ for duplicates before creating new archives
- Check inbox/queue/ and inbox/archive/ for duplicates before creating new archives
- Aim for 5-15 source archives per session
### Step 7: Flag Follow-up Directions (5 min)
@ -303,6 +327,8 @@ The journal accumulates session over session. After 5+ sessions, review it for c
### Step 9: Stop
When you've finished archiving sources, updating your musing, and writing the research journal entry, STOP. Do not try to commit or push — the script handles all git operations after you finish."
CASCADE_PROCESSOR="/opt/teleo-eval/ops/agent-state/process-cascade-inbox.py"
# --- Run Claude research session ---
log "Starting Claude research session..."
timeout 5400 "$CLAUDE_BIN" -p "$RESEARCH_PROMPT" \
@ -311,31 +337,61 @@ timeout 5400 "$CLAUDE_BIN" -p "$RESEARCH_PROMPT" \
--permission-mode bypassPermissions \
>> "$LOG" 2>&1 || {
log "WARN: Research session failed or timed out for $AGENT"
# Process cascade inbox even on timeout (agent may have read them in Step 0)
if [ -f "$CASCADE_PROCESSOR" ]; then
python3 "$CASCADE_PROCESSOR" "$AGENT" 2>>"$LOG" || true
fi
if [ "$HAS_STATE" = true ]; then
state_end_session "$AGENT" "timeout" "0" "null" 2>/dev/null || true
state_update_report "$AGENT" "idle" "Research session timed out or failed on ${DATE}" 2>/dev/null || true
state_update_metrics "$AGENT" "timeout" "0" 2>/dev/null || true
state_journal_append "$AGENT" "session_end" "outcome=timeout" "session_id=$SESSION_ID" 2>/dev/null || true
log "Agent state: session recorded as timeout"
fi
git checkout main >> "$LOG" 2>&1
exit 1
}
log "Claude session complete"
# --- Process cascade inbox messages (log completion to pipeline.db) ---
if [ -f "$CASCADE_PROCESSOR" ]; then
CASCADE_RESULT=$(python3 "$CASCADE_PROCESSOR" "$AGENT" 2>>"$LOG")
[ -n "$CASCADE_RESULT" ] && log "Cascade: $CASCADE_RESULT"
fi
# --- Check for changes ---
CHANGED_FILES=$(git status --porcelain)
if [ -z "$CHANGED_FILES" ]; then
log "No sources archived by $AGENT"
if [ "$HAS_STATE" = true ]; then
state_end_session "$AGENT" "completed" "0" "null" 2>/dev/null || true
state_update_report "$AGENT" "idle" "Research session completed with no new sources on ${DATE}" 2>/dev/null || true
state_update_metrics "$AGENT" "completed" "0" 2>/dev/null || true
state_journal_append "$AGENT" "session_end" "outcome=no_sources" "session_id=$SESSION_ID" 2>/dev/null || true
log "Agent state: session recorded (no sources)"
fi
git checkout main >> "$LOG" 2>&1
exit 0
fi
# --- Stage and commit ---
git add inbox/queue/ agents/${AGENT}/musings/ agents/${AGENT}/research-journal.md 2>/dev/null || true
if git diff --cached --quiet; then
log "No valid changes to commit"
if [ "$HAS_STATE" = true ]; then
state_end_session "$AGENT" "completed" "0" "null" 2>/dev/null || true
state_update_report "$AGENT" "idle" "Research session completed with no valid changes on ${DATE}" 2>/dev/null || true
state_update_metrics "$AGENT" "completed" "0" 2>/dev/null || true
state_journal_append "$AGENT" "session_end" "outcome=no_valid_changes" "session_id=$SESSION_ID" 2>/dev/null || true
fi
git checkout main >> "$LOG" 2>&1
exit 0
fi
AGENT_UPPER=$(echo "$AGENT" | sed 's/./\U&/')
SOURCE_COUNT=$(git diff --cached --name-only | grep -c "^inbox/queue/" || echo "0")
git commit -m "${AGENT}: research session ${DATE} - ${SOURCE_COUNT} sources archived
Pentagon-Agent: ${AGENT_UPPER} <HEADLESS>" >> "$LOG" 2>&1
@@ -375,6 +431,16 @@ Researcher and extractor are different Claude instances to prevent motivated rea
log "PR #${PR_NUMBER} opened for ${AGENT}'s research session"
fi
# --- Post-session state (success) ---
if [ "$HAS_STATE" = true ]; then
FINAL_PR="${EXISTING_PR:-${PR_NUMBER:-unknown}}"
state_end_session "$AGENT" "completed" "$SOURCE_COUNT" "$FINAL_PR" 2>/dev/null || true
state_finalize_report "$AGENT" "idle" "Research session completed: ${SOURCE_COUNT} sources archived" "$SESSION_ID" "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "completed" "$SOURCE_COUNT" "$BRANCH" "${FINAL_PR}" 2>/dev/null || true
state_update_metrics "$AGENT" "completed" "$SOURCE_COUNT" 2>/dev/null || true
state_journal_append "$AGENT" "session_end" "outcome=completed" "sources=$SOURCE_COUNT" "branch=$BRANCH" "pr=$FINAL_PR" 2>/dev/null || true
log "Agent state: session finalized (${SOURCE_COUNT} sources, PR #${FINAL_PR})"
fi
# --- Back to main ---
git checkout main >> "$LOG" 2>&1
log "=== Research session complete for $AGENT ==="
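The `state_*` helpers called throughout this diff (`state_update_report`, `state_end_session`, `state_update_metrics`, `state_journal_append`) are sourced from elsewhere and not shown here. For review context only, a minimal sketch of what an event-driven, atomic `report.json` write could look like; the function body, JSON field names, and the `STATE_ROOT` default are assumptions, not the actual implementation:

```shell
#!/bin/sh
# Hypothetical sketch of a state_update_report-style helper.
# Defaults to the VPS state root from SCHEMA.md; override for testing.
STATE_ROOT="${STATE_ROOT:-/opt/teleo-eval/agent-state}"

state_update_report() {
    # $1 = agent name, $2 = status, $3 = one-line summary
    dir="$STATE_ROOT/$1"
    mkdir -p "$dir"
    # Write on events, atomically: render to a temp file, then
    # rename over report.json so readers never see a torn write.
    # (A real implementation would JSON-escape via jq or similar.)
    tmp="$dir/report.json.tmp"
    printf '{"agent":"%s","status":"%s","summary":"%s","updated_at":"%s"}\n' \
        "$1" "$2" "$3" "$(date -u +%Y-%m-%dT%H:%M:%SZ)" > "$tmp"
    mv "$tmp" "$dir/report.json"
}

# Demo against a throwaway root so nothing touches real agent state
STATE_ROOT=/tmp/agent-state-demo
state_update_report alpha idle "demo: research session completed"
```

The `mv` over the final path matters because other processes (cron wakes, the cascade processor) may read `report.json` at any time; a rename on the same filesystem is atomic, so they see either the old report or the new one, never a partial file.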