From 799249d4703fee7056af40e151e9c08948077ba8 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Thu, 12 Mar 2026 14:11:18 +0000 Subject: [PATCH] Initial commit: Pipeline v2 daemon + infrastructure docs - teleo-pipeline.py: async daemon with 4 stage loops (ingest/validate/evaluate/merge) - lib/: config, db, evaluate, validate, merge, breaker, costs, health, log modules - INFRASTRUCTURE.md: comprehensive deep-dive for onboarding - teleo-pipeline.service: systemd unit file Pentagon-Agent: Leo <294C3CA1-0205-4668-82FA-B984D54F48AD> --- .gitignore | 19 ++ INFRASTRUCTURE.md | 447 +++++++++++++++++++++++++ lib/__init__.py | 0 lib/breaker.py | 139 ++++++++ lib/config.py | 116 +++++++ lib/costs.py | 88 +++++ lib/db.py | 228 +++++++++++++ lib/evaluate.py | 725 +++++++++++++++++++++++++++++++++++++++++ lib/health.py | 237 ++++++++++++++ lib/log.py | 48 +++ lib/merge.py | 436 +++++++++++++++++++++++++ lib/validate.py | 591 +++++++++++++++++++++++++++++++++ teleo-pipeline.py | 240 ++++++++++++++ teleo-pipeline.service | 36 ++ 14 files changed, 3350 insertions(+) create mode 100644 .gitignore create mode 100644 INFRASTRUCTURE.md create mode 100644 lib/__init__.py create mode 100644 lib/breaker.py create mode 100644 lib/config.py create mode 100644 lib/costs.py create mode 100644 lib/db.py create mode 100644 lib/evaluate.py create mode 100644 lib/health.py create mode 100644 lib/log.py create mode 100644 lib/merge.py create mode 100644 lib/validate.py create mode 100644 teleo-pipeline.py create mode 100644 teleo-pipeline.service diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c1d7dfb --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +# Python +__pycache__/ +*.pyc +*.pyo + +# Database +*.db +*.db-wal +*.db-shm + +# Secrets (never commit) +secrets/ + +# Logs +logs/ +*.log + +# OS +.DS_Store diff --git a/INFRASTRUCTURE.md b/INFRASTRUCTURE.md new file mode 100644 index 0000000..abfe780 --- /dev/null +++ b/INFRASTRUCTURE.md @@ -0,0 +1,447 @@ +# Teleo Infrastructure Deep Dive + +## Overview + +Teleo runs a **knowledge extraction and evaluation pipeline** on a single VPS. Six AI domain agents (Rio, Clay, Theseus, Vida, Astra, Leo) continuously extract claims from source material, evaluate them through a multi-stage review process, and merge approved claims into a shared knowledge base. + +The system is mid-migration from **7 bash cron scripts** (v1) to a **single Python async daemon** (v2). Pipeline v2 handles validate, evaluate, and merge. Extraction still runs on v1 cron. Ingest (Phase 4) will complete the migration. 
+ +``` +Source Material → Ingest → Validate → Evaluate → Merge → Knowledge Base + (cron v1) (stub) (v2) (v2) (v2) (git repo) +``` + +--- + +## VPS + +- **Host**: `77.42.65.182` (Hetzner, Debian) +- **SSH**: `root@77.42.65.182` (key auth) +- **Disk**: 150GB, 19GB used (13%) +- **User**: `teleo` (pipeline runs as this user) +- **Base dir**: `/opt/teleo-eval/` + +### Directory Layout + +``` +/opt/teleo-eval/ +├── pipeline/ # Pipeline v2 daemon +│ ├── teleo-pipeline.py # Main entry point (4 async stage loops) +│ ├── pipeline.db # SQLite WAL state store (160KB) +│ ├── .venv/ # Python virtualenv (aiohttp) +│ └── lib/ +│ ├── config.py # All constants, model assignments, overflow policies +│ ├── db.py # Schema, migrations, connection management +│ ├── validate.py # Tier 0 validation (schema, links, duplicates) +│ ├── evaluate.py # Triage + domain review + Leo review +│ ├── merge.py # Domain-serialized rebase + Forgejo API merge +│ ├── health.py # HTTP health API (localhost:8080) +│ ├── breaker.py # Circuit breaker per stage +│ ├── costs.py # API cost tracking with daily budgets +│ └── log.py # JSON structured logging +├── workspaces/ +│ ├── teleo-codex.git/ # Bare repo (49MB) — pipeline's git backend +│ └── main/ # Main branch worktree (for validation checks) +├── mirror/ +│ └── teleo-codex.git/ # Separate bare repo for GitHub↔Forgejo sync +├── secrets/ +│ ├── forgejo-admin-token # Admin Forgejo API token +│ ├── forgejo-{agent}-token # Per-agent tokens (rio, clay, theseus, vida, astra, leo) +│ ├── github-pat # GitHub mirror push token +│ ├── openrouter-key # OpenRouter API key +│ ├── twitterapi-io-key # X/Twitter API key +│ └── x-bearer-token # X bearer token +├── logs/ # Log files for cron scripts and pipeline +├── *.sh # Legacy cron scripts (being replaced) +└── eval/ # Legacy eval scripts +``` + +--- + +## Services + +### Forgejo (Git Forge) + +- **Runs in**: Docker container (`codeberg.org/forgejo/forgejo:9`) +- **Port**: 3000 (HTTP), 2222 (SSH) +- **Public URL**: `https://git.livingip.xyz` +- **Repo**: `teleo/teleo-codex` +- **Purpose**: Hosts the knowledge base repo, manages PRs, stores review comments +- **Users**: Per-agent Forgejo accounts (`rio`, `clay`, `theseus`, `vida`, `astra`, `leo`, `teleo`) + +### Pipeline v2 Daemon + +- **Service**: `teleo-pipeline.service` (systemd) +- **Commands**: `systemctl {start|stop|restart|status} teleo-pipeline` +- **Logs**: `journalctl -u teleo-pipeline -f` +- **Health**: `curl localhost:8080/health` +- **Shutdown**: SIGTERM → 60s drain → force-cancel → kill subprocesses (180s total) + +### Active Cron Jobs (teleo user) + +| Schedule | Script | Purpose | +|----------|--------|---------| +| `*/3 * * *` | `extract-cron.sh` | Source extraction (v1, still active) | +| `*/2 * * *` | `sync-mirror.sh` | Forgejo↔GitHub bidirectional sync | +| `*/2 * * *` | `fetch-bare.sh` | Fetch latest into bare repo | +| `0 0 * * *` | `pipeline-health-check.sh` | Daily health metrics | +| `0 */2 * * *` | `pipeline-health-check.py` | 2-hourly health report | + +### Disabled Cron Jobs (replaced by Pipeline v2) + +- `fix-extraction-prs.py` — replaced by `validate.py` +- `eval-dispatcher.sh` — replaced by `evaluate.py` +- `merge-retry.sh` — replaced by `merge.py` +- Research sessions (rio, clay, theseus, vida, astra) — disabled during pipeline migration + +### GitHub Mirror + +- **Repo**: `github.com/user/teleo-codex` (public mirror) +- **Sync**: Bidirectional, Forgejo authoritative on conflict +- **Frequency**: Every 2 minutes via `sync-mirror.sh` +- **Security**: GitHub→Forgejo 
path never auto-processes branches. Only PRs trigger pipeline work. + +--- + +## Pipeline v2 Architecture + +### Stage Loop + +Each stage runs as an async task with its own interval, circuit breaker, and shutdown check: + +```python +async def stage_loop(name, interval, func, conn, breaker): + while not shutdown_event.is_set(): + if breaker.allow_request(): + succeeded, failed = await func(conn, max_workers=breaker.max_workers()) + # Record success/failure for breaker + await asyncio.wait_for(shutdown_event.wait(), timeout=interval) +``` + +| Stage | Interval | Function | Status | +|-------|----------|----------|--------| +| Ingest | 60s | `ingest_cycle()` | **Stub** — Phase 4 | +| Validate | 30s | `validate_cycle()` | **Live** | +| Evaluate | 30s | `evaluate_cycle()` | **Live** | +| Merge | 30s | `merge_cycle()` | **Live** | + +### Crash Recovery + +On startup, the daemon recovers interrupted state from prior crashes: + +1. Sources stuck in `extracting` → increment retry counter → `unprocessed` (or `error` if budget exhausted) +2. PRs stuck in `merging` → `approved` (re-enter merge queue) +3. PRs stuck in `reviewing` → `open` (re-enter eval queue) +4. Orphan git worktrees (`/tmp/teleo-extract-*`, `/tmp/teleo-merge-*`) cleaned up + +--- + +## Stage 1: Validate (`lib/validate.py`) + +Runs Tier 0 structural validation on PRs with `status='open'` and `tier0_pass IS NULL`. + +### Checks + +1. **Schema validation** — YAML frontmatter has required fields (type, domain, description, confidence, source, created) +2. **Date format** — `created` field is valid YYYY-MM-DD +3. **Title format** — Prose proposition, not a label (heuristic: 8+ words, no bare noun phrases) +4. **Wiki link validity** — `[[links]]` resolve to real files in the repo +5. **Universal quantifier check** — Flags claims using "all", "always", "never", "every" without scoping +6. **Domain-directory match** — Claim's `domain` field matches its file path +7. **Description quality** — Description adds info beyond the title (not a substring) +8. **Near-duplicate detection** — Trigram similarity against existing claims +9. **Proposition heuristic** — Title passes the claim test ("This note argues that [title]" works) + +### Output + +- Posts Tier 0 validation comment on Forgejo PR (with SHA-based idempotency marker) +- Sets `tier0_pass = 1` (pass) or `tier0_pass = 0` (fail) +- Failing PRs remain `status='open'` but are excluded from eval queue + +--- + +## Stage 2: Evaluate (`lib/evaluate.py`) + +The core intelligence stage. Domain-first, Leo-last architecture. 
+ +### PR Flow + +``` +PR (open, tier0_pass=1) + │ + ├─ Triage (Haiku/OpenRouter) → DEEP / STANDARD / LIGHT + │ + ├─ Domain Review (Sonnet/Claude Max → overflow GPT-4o/OpenRouter) + │ ├─ REJECT → status='open', feedback stored, Leo skipped + │ └─ APPROVE → continue to Leo + │ + ├─ Leo Review (Opus/Claude Max → overflow: queue only) + │ ├─ REJECT → status='open', feedback stored + │ └─ APPROVE → continue + │ + ├─ LIGHT tier: Leo skipped, domain-only gate + │ + ├─ Both approve → formal Forgejo approvals (2 agent tokens) → status='approved' + │ + └─ Musings bypass: PRs touching only agents/*/musings/ auto-approve +``` + +### Model Routing + +| Stage | Primary | Overflow | Policy | +|-------|---------|----------|--------| +| Triage | Haiku (OpenRouter) | — | Always API | +| Domain review | Sonnet (Claude Max) | GPT-4o (OpenRouter) | `overflow` | +| Leo review | Opus (Claude Max) | — | `queue` (never overflow) | +| DEEP cross-family | GPT-4o (OpenRouter) | — | Always API (not yet implemented) | + +**Claude Max** is a subscription — free but rate-limited. When rate-limited, the CLI returns `"You've hit your limit"` on **stdout** (not stderr) with exit code 1. The pipeline detects this and applies the overflow policy. + +**Key design principle**: Opus is the scarce resource. Domain review (Sonnet) filters first — high volume, catches most issues. Leo review (Opus) only sees pre-filtered PRs. This maximizes value per scarce Opus call. + +### Domain Routing + +Domain detection reads diff file paths (`domains/`, `entities/`, `core/`, `foundations/`) and maps to the responsible agent: + +| Domain | Agent | +|--------|-------| +| internet-finance, mechanisms, living-capital, teleological-economics | Rio | +| entertainment, cultural-dynamics | Clay | +| ai-alignment, living-agents, critical-systems, collective-intelligence | Theseus | +| health | Vida | +| space-development | Astra | +| teleohumanity, grand-strategy | Leo | + +### Backoff and Resume + +- **10-minute backoff**: PRs attempted within the last 10 minutes are skipped (prevents retry storms during rate limits) +- **Domain review resume**: If domain review completed but Leo review was rate-limited, domain review is skipped on retry (no wasted OpenRouter calls) +- **`last_attempt` tracking**: Set at the start of `evaluate_pr`, persists through status revert + +### Review Attribution + +- Domain review comments post from the domain agent's Forgejo account (e.g., Rio posts Rio's review) +- Leo review comments post from Leo's Forgejo account +- Formal approvals come from 2 agent tokens (not the PR author) + +### Verdict Parsing + +Reviews end with HTML comment tags: +``` + + + +``` + +--- + +## Stage 3: Merge (`lib/merge.py`) + +Domain-serialized priority queue with rebase-before-merge. + +### Design + +- **Domain serialization**: Same-domain merges are serial (prevents `_map.md` conflicts). Cross-domain merges are parallel. +- **Two-layer locking**: `asyncio.Lock` per domain (fast path, lost on crash) + `prs.status='merging'` in SQLite (durable, crash recovery) +- **NOT EXISTS subquery**: SQL defense-in-depth prevents two PRs in the same domain from merging simultaneously + +### Merge Flow + +``` +1. Discover external PRs (pagination over Forgejo API) + - Detect origin: pipeline vs human (by author login) + - Human PRs: priority='high', ack comment posted + +2. For each domain with approved PRs: + a. Claim next PR (atomic UPDATE...RETURNING with priority queue) + b. Create git worktree at /tmp/teleo-merge-{branch} + c. 
Capture expected SHA (pin for force-with-lease) + d. Fetch origin/main, check if rebase needed + e. Rebase onto main (abort on conflict → status='conflict') + f. Force-push with --force-with-lease={branch}:{expected_sha} + g. Merge via Forgejo API + h. Delete remote branch + i. Cleanup worktree +``` + +### Priority Queue + +```sql +COALESCE(p.priority, s.priority, 'medium') +-- PR-level priority > source-level priority > default 'medium' +-- NULL falls to ELSE 4 (intentionally below explicit medium) +``` + +| Priority | Value | Use | +|----------|-------|-----| +| critical | 0 | Reserved for explicit human override | +| high | 1 | Human-submitted PRs | +| medium | 2 | Standard pipeline PRs | +| low | 3 | Explicitly deprioritized | +| NULL | 4 | Unclassified (below medium) | + +### Timeouts + +- **Merge timeout**: 5 minutes per PR. Exceeding → `status='conflict'` +- **Rebase timeout**: 2 minutes +- **Push timeout**: 30 seconds +- **API merge failure**: Sets `status='conflict'` (not `approved` — prevents infinite retry) + +--- + +## Database Schema + +SQLite WAL mode. Schema version 2. + +### Tables + +**`sources`** — Source material pipeline +- `path` (PK), `status`, `priority`, `extraction_model`, `claims_count`, `pr_number` +- `transient_retries`, `substantive_retries`, `last_error`, `feedback` + +**`prs`** — Pull request lifecycle +- `number` (PK), `source_path`, `branch`, `status`, `domain`, `tier` +- `tier0_pass`, `leo_verdict`, `domain_verdict`, `domain_agent`, `domain_model` +- `priority`, `origin` (pipeline/human), `last_attempt` + +**`costs`** — API spend tracking +- `(date, model, stage)` (composite PK), `calls`, `input_tokens`, `output_tokens`, `cost_usd` + +**`circuit_breakers`** — Per-stage health +- `name` (PK), `state` (closed/open/halfopen), `failures`, `successes`, `last_success_at` + +**`audit_log`** — Event log +- `id`, `timestamp`, `stage`, `event`, `detail` (JSON) + +### PR Status Lifecycle + +``` +open → validating → open (tier0_pass set) + → reviewing → approved → merging → merged + → open (rejected, feedback stored) + → conflict (rebase/merge failed) + → zombie (stuck, manual intervention) +``` + +--- + +## Health API + +`GET localhost:8080/health` returns: + +```json +{ + "status": "healthy|degraded|stalled", + "breakers": { + "ingest": {"state": "closed", "failures": 0}, + "validate": {"state": "closed", "failures": 0, "last_success_age_s": 30, "stalled": false}, + "evaluate": {"state": "closed", "failures": 0, "last_success_age_s": 45, "stalled": false}, + "merge": {"state": "closed", "failures": 0} + }, + "sources": {"unprocessed": 10, "extracting": 2}, + "prs": {"open": 117, "approved": 5, "merging": 1}, + "merge_queue_by_domain": {"internet-finance": 3, "health": 2}, + "budget": {"ok": true, "spend": 1.23, "budget": 20.0, "pct": 6.2}, + "metabolic": { + "null_result_rate_24h": 0.05, + "domain_approval_rate_24h": 0.96, + "leo_approval_rate_24h": 0.85 + } +} +``` + +**Stall detection**: If `now() - last_success_at > 2 * interval`, the stage is stalled. + +--- + +## Circuit Breakers + +Each stage has an independent circuit breaker: + +- **Closed** (normal): All requests pass +- **Open** (tripped): Requests blocked for `BREAKER_COOLDOWN` (15 min) +- **Half-open**: One test request allowed; success → closed, failure → open + +Triggers: 5 consecutive failures trip the breaker. Worker count reduces under pressure. 
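
A quick way to sanity-check the stall rule from outside the daemon is to read `circuit_breakers` directly. The snippet below is an illustrative sketch, not the `health.py` implementation; the stage intervals are copied from `lib/config.py` and the DB path from the runbook.

```python
#!/usr/bin/env python3
"""Standalone stall check against pipeline.db (read-only)."""
import sqlite3
from datetime import datetime, timezone

DB = "/opt/teleo-eval/pipeline/pipeline.db"
INTERVALS = {"ingest": 60, "validate": 30, "evaluate": 30, "merge": 30}  # seconds

conn = sqlite3.connect(f"file:{DB}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
now = datetime.now(timezone.utc)

for row in conn.execute("SELECT name, state, failures, last_success_at FROM circuit_breakers"):
    age = None
    if row["last_success_at"]:
        ts = datetime.fromisoformat(row["last_success_at"])
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)  # stored as UTC
        age = (now - ts).total_seconds()
    interval = INTERVALS.get(row["name"], 30)
    stalled = age is not None and age > 2 * interval  # the 2x-interval heuristic
    age_s = "n/a" if age is None else f"{age:.0f}s"
    print(f"{row['name']}: state={row['state']} failures={row['failures']} last_success={age_s} stalled={stalled}")
```

The output should agree with `curl localhost:8080/health`; if it does not, suspect the daemon rather than the database.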
+ +--- + +## Cost Management + +- **Daily budget**: $20 USD (OpenRouter) +- **Warning threshold**: 80% of budget +- **Claude Max**: Free (tracked for volume, cost = $0) +- **Budget check**: Health API reports spend, pipeline can pause extraction when budget exhausted + +--- + +## Known Issues and Deferred Work + +### Active Issues + +1. **PR #702 in `conflict`**: Archive-only PR, Forgejo returned 500 on merge API. Likely needs manual merge or close. +2. **36 PRs failed Tier 0**: Will not enter eval. Need either re-extraction or closure. +3. **Domain-rejected PR limbo** (Ganymede warning #4): PRs rejected by domain review have `status='open'` but exit the eval queue. No path to re-extraction or closure. Needs `domain_rejected` status or auto-close mechanism. +4. **DEEP cross-family review not implemented** (Ganymede warning #5): Docstring promises GPT-4o adversarial review for DEEP PRs after both domain and Leo approve. Not in code. +5. **Sonnet leniency tracking**: 96% domain approval rate. Need to measure Opus disagreement rate when it comes online (Mar 13, 5pm UTC). If Opus rejects >15% of domain-approved PRs, domain prompt needs tightening. + +### Deferred Nits + +- `entity_diff` from `_filter_diff()` is returned but unused +- Formal approvals use hardcoded agent order instead of actual reviewers +- `aiohttp.ClientSession` created per API call (should be one per cycle) + +### Phase 4: Ingest Module (`lib/ingest.py`) + +Not yet built. Will port `extract-cron.sh` + `extract-worker.sh`. When complete, the remaining v1 cron scripts can be disabled. + +### Phase 5: Integration + Cutover + +Full pipeline test with all 4 stages. Disable remaining cron scripts. Re-enable research sessions. + +--- + +## Operational Runbook + +### Check pipeline health +```bash +ssh root@77.42.65.182 'curl -s localhost:8080/health | python3 -m json.tool' +``` + +### View logs +```bash +ssh root@77.42.65.182 'journalctl -u teleo-pipeline -f' # live +ssh root@77.42.65.182 'journalctl -u teleo-pipeline -n 50' # recent +ssh root@77.42.65.182 'journalctl -u teleo-pipeline --since "1 hour ago"' +``` + +### Restart pipeline +```bash +ssh root@77.42.65.182 'systemctl restart teleo-pipeline' +``` + +### Query database +```bash +ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "SELECT status, count(*) FROM prs GROUP BY status"' +``` + +### Deploy code changes +```bash +scp lib/evaluate.py root@77.42.65.182:/opt/teleo-eval/pipeline/lib/evaluate.py +ssh root@77.42.65.182 'chown teleo:teleo /opt/teleo-eval/pipeline/lib/evaluate.py && systemctl restart teleo-pipeline' +``` + +### Reset a stuck PR +```bash +ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "UPDATE prs SET status = \"open\", leo_verdict = \"pending\", domain_verdict = \"pending\" WHERE number = 702"' +``` + +### Check circuit breakers +```bash +ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "SELECT * FROM circuit_breakers"' +``` + +### View cost breakdown +```bash +ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "SELECT model, stage, calls, cost_usd FROM costs WHERE date = date(\"now\") ORDER BY cost_usd DESC"' +``` diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/breaker.py b/lib/breaker.py new file mode 100644 index 0000000..6a7b4ca --- /dev/null +++ b/lib/breaker.py @@ -0,0 +1,139 @@ +"""Circuit breaker state machine — per-stage, backed by SQLite.""" + +import logging +from datetime import datetime, timezone + +from . 
import config + +logger = logging.getLogger("pipeline.breaker") + +# States +CLOSED = "closed" +OPEN = "open" +HALFOPEN = "halfopen" + + +class CircuitBreaker: + """Per-stage circuit breaker. + + CLOSED: normal operation + OPEN: stage paused (threshold consecutive failures reached) + HALFOPEN: cooldown expired, try 1 worker to probe recovery + """ + + def __init__(self, name: str, conn): + self.name = name + self.conn = conn + self._ensure_row() + + def _ensure_row(self): + self.conn.execute( + "INSERT OR IGNORE INTO circuit_breakers (name) VALUES (?)", + (self.name,), + ) + + def _get_state(self) -> dict: + row = self.conn.execute( + "SELECT state, failures, successes, tripped_at, last_success_at FROM circuit_breakers WHERE name = ?", + (self.name,), + ).fetchone() + return dict(row) if row else {"state": CLOSED, "failures": 0, "successes": 0, "tripped_at": None, "last_success_at": None} + + def _set_state(self, state: str, failures: int = None, successes: int = None, + tripped_at: str = None, last_success_at: str = None): + updates = ["state = ?", "last_update = datetime('now')"] + params = [state] + if failures is not None: + updates.append("failures = ?") + params.append(failures) + if successes is not None: + updates.append("successes = ?") + params.append(successes) + if tripped_at is not None: + updates.append("tripped_at = ?") + params.append(tripped_at) + if last_success_at is not None: + updates.append("last_success_at = ?") + params.append(last_success_at) + params.append(self.name) + self.conn.execute( + f"UPDATE circuit_breakers SET {', '.join(updates)} WHERE name = ?", + params, + ) + + def allow_request(self) -> bool: + """Check if requests are allowed. Returns True if CLOSED or HALFOPEN.""" + s = self._get_state() + + if s["state"] == CLOSED: + return True + + if s["state"] == OPEN: + # Check cooldown + if s["tripped_at"]: + tripped = datetime.fromisoformat(s["tripped_at"]) + if tripped.tzinfo is None: + tripped = tripped.replace(tzinfo=timezone.utc) + elapsed = (datetime.now(timezone.utc) - tripped).total_seconds() + if elapsed >= config.BREAKER_COOLDOWN: + logger.info("Breaker %s: cooldown expired, entering HALFOPEN", self.name) + self._set_state(HALFOPEN, successes=0) + return True + return False + + # HALFOPEN — allow one probe + return True + + def max_workers(self) -> int: + """Return max workers allowed in current state.""" + s = self._get_state() + if s["state"] == HALFOPEN: + return 1 # probe with single worker + return None # no restriction from breaker + + def record_success(self): + """Record a successful cycle. 
Updates last_success_at for stall detection (Vida).""" + s = self._get_state() + now = datetime.now(timezone.utc).isoformat() + + if s["state"] == HALFOPEN: + logger.info("Breaker %s: HALFOPEN probe succeeded, closing", self.name) + self._set_state(CLOSED, failures=0, successes=0, last_success_at=now) + elif s["state"] == CLOSED: + if s["failures"] > 0: + self._set_state(CLOSED, failures=0, last_success_at=now) + else: + self._set_state(CLOSED, last_success_at=now) + + def record_failure(self): + """Record a failed cycle.""" + s = self._get_state() + + if s["state"] == HALFOPEN: + logger.warning("Breaker %s: HALFOPEN probe failed, reopening", self.name) + self._set_state( + OPEN, + failures=s["failures"] + 1, + tripped_at=datetime.now(timezone.utc).isoformat(), + ) + elif s["state"] == CLOSED: + new_failures = s["failures"] + 1 + if new_failures >= config.BREAKER_THRESHOLD: + logger.warning( + "Breaker %s: threshold reached (%d failures), opening", + self.name, new_failures, + ) + self._set_state( + OPEN, + failures=new_failures, + tripped_at=datetime.now(timezone.utc).isoformat(), + ) + else: + self._set_state(CLOSED, failures=new_failures) + elif s["state"] == OPEN: + self._set_state(OPEN, failures=s["failures"] + 1) + + def reset(self): + """Force reset to CLOSED.""" + logger.info("Breaker %s: force reset to CLOSED", self.name) + self._set_state(CLOSED, failures=0, successes=0) diff --git a/lib/config.py b/lib/config.py new file mode 100644 index 0000000..6791d20 --- /dev/null +++ b/lib/config.py @@ -0,0 +1,116 @@ +"""Pipeline v2 configuration — all constants and thresholds.""" + +import os +from pathlib import Path + +# --- Paths --- +BASE_DIR = Path(os.environ.get("PIPELINE_BASE", "/opt/teleo-eval")) +REPO_DIR = BASE_DIR / "workspaces" / "teleo-codex.git" +MAIN_WORKTREE = BASE_DIR / "workspaces" / "main" +SECRETS_DIR = BASE_DIR / "secrets" +LOG_DIR = BASE_DIR / "logs" +DB_PATH = BASE_DIR / "pipeline" / "pipeline.db" +INBOX_ARCHIVE = "inbox/archive" + +# --- Forgejo --- +FORGEJO_URL = os.environ.get("FORGEJO_URL", "http://localhost:3000") +FORGEJO_OWNER = "teleo" +FORGEJO_REPO = "teleo-codex" +FORGEJO_TOKEN_FILE = SECRETS_DIR / "forgejo-admin-token" +FORGEJO_PIPELINE_USER = "teleo" # git user for pipeline commits + +# --- Models --- +CLAUDE_CLI = os.environ.get("CLAUDE_CLI", "/home/teleo/.local/bin/claude") +OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" + +# Model IDs +MODEL_OPUS = "opus" +MODEL_SONNET = "sonnet" +MODEL_HAIKU = "anthropic/claude-3.5-haiku" +MODEL_GPT4O = "openai/gpt-4o" + +# --- Model assignment per stage --- +# Principle: Opus is a scarce resource. Use it only where judgment quality matters. +# Sonnet handles volume. Haiku handles routing. Opus handles synthesis + critical eval. +# +# Pipeline eval ordering (domain-first, Leo-last): +# 1. Domain review → Sonnet (catches domain issues, evidence gaps — high volume filter) +# 2. Leo review → Opus (cross-domain synthesis, confidence calibration — only pre-filtered PRs) +# 3. 
DEEP cross-family → GPT-4o (adversarial blind-spot check — paid, highest-value claims only) +EXTRACT_MODEL = MODEL_SONNET # extraction: structured output, volume work +TRIAGE_MODEL = MODEL_HAIKU # triage: routing decision, cheapest +EVAL_DOMAIN_MODEL = MODEL_SONNET # domain review: high-volume filter +EVAL_LEO_MODEL = MODEL_OPUS # Leo review: scarce, high-value +EVAL_DEEP_MODEL = MODEL_GPT4O # DEEP cross-family: paid, adversarial + +# --- Model backends --- +# Each model can run on Claude Max (subscription, base load) or API (overflow/spikes). +# Claude Max: free but rate-limited. API: paid but unlimited. +# When Claude Max is rate-limited, behavior per stage: +# "queue" — wait for capacity (preferred for non-urgent work) +# "overflow" — fall back to API (for time-sensitive work) +# "skip" — skip this cycle (for optional stages like sample audit) +OVERFLOW_POLICY = { + "extract": "queue", # extraction can wait + "triage": "overflow", # triage is cheap on API anyway + "eval_domain": "overflow", # domain review is the volume filter — don't let it bottleneck (Rhea) + "eval_leo": "queue", # Leo review is the bottleneck we protect + "eval_deep": "overflow", # DEEP is already on API + "sample_audit": "skip", # optional, skip if constrained +} + +# OpenRouter cost rates per 1K tokens (only applies when using API, not Claude Max) +MODEL_COSTS = { + "opus": {"input": 0.015, "output": 0.075}, + "sonnet": {"input": 0.003, "output": 0.015}, + MODEL_HAIKU: {"input": 0.0008, "output": 0.004}, + MODEL_GPT4O: {"input": 0.0025, "output": 0.01}, +} + +# --- Concurrency --- +MAX_EXTRACT_WORKERS = int(os.environ.get("MAX_EXTRACT_WORKERS", "5")) +MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7")) +MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain + +# --- Timeouts (seconds) --- +EXTRACT_TIMEOUT = 600 # 10 min +EVAL_TIMEOUT = 300 # 5 min +MERGE_TIMEOUT = 300 # 5 min — force-reset to conflict if exceeded (Rhea) +CLAUDE_MAX_PROBE_TIMEOUT = 15 + +# --- Backpressure --- +BACKPRESSURE_HIGH = 40 # pause extraction above this +BACKPRESSURE_LOW = 20 # throttle extraction above this +BACKPRESSURE_THROTTLE_WORKERS = 2 # workers when throttled + +# --- Retry budgets --- +TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits +SUBSTANTIVE_RETRY_STANDARD = 2 # reviewer request_changes +SUBSTANTIVE_RETRY_DEEP = 3 + +# --- Circuit breakers --- +BREAKER_THRESHOLD = 5 +BREAKER_COOLDOWN = 900 # 15 min + +# --- Cost budgets --- +OPENROUTER_DAILY_BUDGET = 20.0 # USD +OPENROUTER_WARN_THRESHOLD = 0.8 # 80% of budget + +# --- Quality --- +SAMPLE_AUDIT_RATE = 0.10 # 10% of LIGHT merges +SAMPLE_AUDIT_DISAGREEMENT_THRESHOLD = 0.10 # 10% disagreement → tighten LIGHT criteria + +# --- Polling intervals (seconds) --- +INGEST_INTERVAL = 60 +VALIDATE_INTERVAL = 30 +EVAL_INTERVAL = 30 +MERGE_INTERVAL = 30 +HEALTH_CHECK_INTERVAL = 60 + +# --- Health API --- +HEALTH_PORT = 8080 + +# --- Logging --- +LOG_FILE = LOG_DIR / "pipeline.jsonl" +LOG_ROTATION_MAX_BYTES = 50 * 1024 * 1024 # 50MB per file +LOG_ROTATION_BACKUP_COUNT = 7 # keep 7 days diff --git a/lib/costs.py b/lib/costs.py new file mode 100644 index 0000000..5a0cc37 --- /dev/null +++ b/lib/costs.py @@ -0,0 +1,88 @@ +"""Cost tracking — per-model per-day with budget enforcement.""" + +import logging +from datetime import date + +from . 
import config + +logger = logging.getLogger("pipeline.costs") + + +def record_usage( + conn, + model: str, + stage: str, + input_tokens: int = 0, + output_tokens: int = 0, + backend: str = "api", +): + """Record usage and compute cost. Returns cost in USD. + + backend: "max" (Claude Max subscription, free) or "api" (paid). + Claude Max calls are tracked for volume metrics but cost $0. (Ganymede) + """ + if backend == "max": + cost = 0.0 + else: + rates = config.MODEL_COSTS.get(model) + if not rates: + logger.warning("No cost rates for model %s, recording zero cost", model) + cost = 0.0 + else: + cost = (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1000 + + today = date.today().isoformat() + # Include backend in the stage key so max vs api are tracked separately + stage_key = f"{stage}:{backend}" if backend != "api" else stage + conn.execute( + """INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd) + VALUES (?, ?, ?, 1, ?, ?, ?) + ON CONFLICT (date, model, stage) DO UPDATE SET + calls = calls + 1, + input_tokens = input_tokens + excluded.input_tokens, + output_tokens = output_tokens + excluded.output_tokens, + cost_usd = cost_usd + excluded.cost_usd""", + (today, model, stage_key, input_tokens, output_tokens, cost), + ) + return cost + + +def get_daily_spend(conn, day: str = None) -> float: + """Get total OpenRouter spend for a given day (default: today).""" + if day is None: + day = date.today().isoformat() + row = conn.execute( + "SELECT COALESCE(SUM(cost_usd), 0) as total FROM costs WHERE date = ?", + (day,), + ).fetchone() + return row["total"] + + +def get_daily_breakdown(conn, day: str = None) -> list: + """Get per-model per-stage breakdown for a day.""" + if day is None: + day = date.today().isoformat() + rows = conn.execute( + """SELECT model, stage, calls, input_tokens, output_tokens, cost_usd + FROM costs WHERE date = ? ORDER BY cost_usd DESC""", + (day,), + ).fetchall() + return [dict(r) for r in rows] + + +def check_budget(conn) -> dict: + """Check budget status. Returns {ok, spend, budget, pct}.""" + spend = get_daily_spend(conn) + pct = spend / config.OPENROUTER_DAILY_BUDGET if config.OPENROUTER_DAILY_BUDGET > 0 else 0 + return { + "ok": pct < 1.0, + "warn": pct >= config.OPENROUTER_WARN_THRESHOLD, + "spend": round(spend, 4), + "budget": config.OPENROUTER_DAILY_BUDGET, + "pct": round(pct * 100, 1), + } + + +def budget_allows(conn) -> bool: + """Quick check: is spending under daily budget?""" + return check_budget(conn)["ok"] diff --git a/lib/db.py b/lib/db.py new file mode 100644 index 0000000..9552b47 --- /dev/null +++ b/lib/db.py @@ -0,0 +1,228 @@ +"""SQLite database — schema, migrations, connection management.""" + +import sqlite3 +import json +import logging +from contextlib import contextmanager +from pathlib import Path + +from . 
import config + +logger = logging.getLogger("pipeline.db") + +SCHEMA_VERSION = 2 + +SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY, + applied_at TEXT DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS sources ( + path TEXT PRIMARY KEY, + status TEXT NOT NULL DEFAULT 'unprocessed', + -- unprocessed, triaging, extracting, extracted, null_result, + -- needs_reextraction, error + priority TEXT DEFAULT 'medium', + -- critical, high, medium, low, skip + priority_log TEXT DEFAULT '[]', + -- JSON array: [{stage, priority, reasoning, ts}] + extraction_model TEXT, + claims_count INTEGER DEFAULT 0, + pr_number INTEGER, + transient_retries INTEGER DEFAULT 0, + substantive_retries INTEGER DEFAULT 0, + last_error TEXT, + feedback TEXT, + -- eval feedback for re-extraction (JSON) + cost_usd REAL DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS prs ( + number INTEGER PRIMARY KEY, + source_path TEXT REFERENCES sources(path), + branch TEXT, + status TEXT NOT NULL DEFAULT 'open', + -- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict + -- conflict: rebase failed or merge timed out — needs human intervention + domain TEXT, + agent TEXT, + tier TEXT, + -- LIGHT, STANDARD, DEEP + tier0_pass INTEGER, + -- 0/1 + leo_verdict TEXT DEFAULT 'pending', + -- pending, approve, request_changes, skipped, failed + domain_verdict TEXT DEFAULT 'pending', + domain_agent TEXT, + domain_model TEXT, + priority TEXT, + -- NULL = inherit from source. Set explicitly for human-submitted PRs. + -- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium') + -- Human PRs: 'critical' (detected via missing source_path or non-agent author) + origin TEXT DEFAULT 'pipeline', + -- pipeline | human | external + transient_retries INTEGER DEFAULT 0, + substantive_retries INTEGER DEFAULT 0, + last_error TEXT, + last_attempt TEXT, + cost_usd REAL DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')), + merged_at TEXT +); + +CREATE TABLE IF NOT EXISTS costs ( + date TEXT, + model TEXT, + stage TEXT, + calls INTEGER DEFAULT 0, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + cost_usd REAL DEFAULT 0, + PRIMARY KEY (date, model, stage) +); + +CREATE TABLE IF NOT EXISTS circuit_breakers ( + name TEXT PRIMARY KEY, + state TEXT DEFAULT 'closed', + -- closed, open, halfopen + failures INTEGER DEFAULT 0, + successes INTEGER DEFAULT 0, + tripped_at TEXT, + last_success_at TEXT, + -- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida) + last_update TEXT DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT DEFAULT (datetime('now')), + stage TEXT, + event TEXT, + detail TEXT +); + +CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status); +CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status); +CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain); +CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date); +CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage); +""" + + +def get_connection(readonly: bool = False) -> sqlite3.Connection: + """Create a SQLite connection with WAL mode and proper settings.""" + config.DB_PATH.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect( + str(config.DB_PATH), + timeout=30, + isolation_level=None, # autocommit — we manage transactions explicitly + ) + conn.row_factory = sqlite3.Row + 
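    # sqlite3.Row gives callers name-based column access; the PRAGMAs below
    # enable WAL (readers proceed while a writer holds the lock) and a 10s busy wait.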
conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=10000") + conn.execute("PRAGMA foreign_keys=ON") + if readonly: + conn.execute("PRAGMA query_only=ON") + return conn + + +@contextmanager +def transaction(conn: sqlite3.Connection): + """Context manager for explicit transactions.""" + conn.execute("BEGIN") + try: + yield conn + conn.execute("COMMIT") + except Exception: + conn.execute("ROLLBACK") + raise + + +def migrate(conn: sqlite3.Connection): + """Run schema migrations.""" + conn.executescript(SCHEMA_SQL) + + # Check current version + try: + row = conn.execute( + "SELECT MAX(version) as v FROM schema_version" + ).fetchone() + current = row["v"] if row and row["v"] else 0 + except sqlite3.OperationalError: + current = 0 + + # --- Incremental migrations --- + if current < 2: + # Phase 2: add multiplayer columns to prs table + for stmt in [ + "ALTER TABLE prs ADD COLUMN priority TEXT", + "ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'", + "ALTER TABLE prs ADD COLUMN last_error TEXT", + ]: + try: + conn.execute(stmt) + except sqlite3.OperationalError: + pass # Column already exists (idempotent) + logger.info("Migration v2: added priority, origin, last_error to prs") + + if current < SCHEMA_VERSION: + conn.execute( + "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", + (SCHEMA_VERSION,), + ) + logger.info("Database migrated to schema version %d", SCHEMA_VERSION) + else: + logger.debug("Database at schema version %d", current) + + +def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None): + """Write an audit log entry.""" + conn.execute( + "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)", + (stage, event, detail), + ) + + +def append_priority_log( + conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str +): + """Append a priority assessment to a source's priority_log. + + NOTE: This does NOT update the source's priority column. The priority column + is the authoritative priority, set only by initial triage or human override. + The priority_log records each stage's opinion for offline calibration analysis. + (Bug caught by Theseus — original version overwrote priority with each stage's opinion.) + (Race condition fix per Vida — read-then-write wrapped in transaction.) + """ + conn.execute("BEGIN") + try: + row = conn.execute( + "SELECT priority_log FROM sources WHERE path = ?", (path,) + ).fetchone() + if not row: + conn.execute("ROLLBACK") + return + log = json.loads(row["priority_log"] or "[]") + log.append( + {"stage": stage, "priority": priority, "reasoning": reasoning} + ) + conn.execute( + "UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?", + (json.dumps(log), path), + ) + conn.execute("COMMIT") + except Exception: + conn.execute("ROLLBACK") + raise + + +def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"): + """Set a source's authoritative priority. Used for human overrides and initial triage.""" + conn.execute( + "UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?", + (priority, path), + ) + append_priority_log(conn, path, "override", priority, reason) diff --git a/lib/evaluate.py b/lib/evaluate.py new file mode 100644 index 0000000..a11e4e2 --- /dev/null +++ b/lib/evaluate.py @@ -0,0 +1,725 @@ +"""Evaluate stage — triage + domain review + Leo review. + +Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last. 
+Sonnet (domain review) filters before Opus (Leo review) to maximize value per +scarce Opus call. + +Flow per PR: + 1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT + 2. Domain review → Sonnet (Claude Max, overflow: OpenRouter GPT-4o) + 3. Leo review → Opus (Claude Max, overflow: queue) — skipped for LIGHT + 4. DEEP cross-family → GPT-4o (OpenRouter) — only if domain + Leo both approve + 5. Post reviews, submit formal Forgejo approvals, update SQLite + 6. If both approve → status = 'approved' (merge module picks it up) + +Design reviewed by Ganymede, Rhea, Vida, Theseus. +""" + +import asyncio +import json +import logging +import re +from datetime import datetime, timezone + +from . import config, db + +logger = logging.getLogger("pipeline.evaluate") + +# ─── Constants ────────────────────────────────────────────────────────────── + +DOMAIN_AGENT_MAP = { + "internet-finance": "Rio", + "entertainment": "Clay", + "health": "Vida", + "ai-alignment": "Theseus", + "space-development": "Astra", + "mechanisms": "Rio", + "living-capital": "Rio", + "living-agents": "Theseus", + "teleohumanity": "Leo", + "grand-strategy": "Leo", + "critical-systems": "Theseus", + "collective-intelligence": "Theseus", + "teleological-economics": "Rio", + "cultural-dynamics": "Clay", +} + + +def _agent_token(agent_name: str) -> str | None: + """Read Forgejo token for a named agent. Returns token string or None.""" + token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token" + if token_file.exists(): + return token_file.read_text().strip() + return None + + +REVIEW_STYLE_GUIDE = ( + "Be concise. Only mention what fails or is interesting. " + "Do not summarize what the PR does — the diff speaks for itself. " + "If everything passes, say so in one line and approve." +) + + +# ─── Prompt templates ────────────────────────────────────────────────────── + +TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT. + +DEEP — use when ANY of these apply: +- PR adds or modifies claims rated "likely" or higher confidence +- PR touches agent beliefs or creates cross-domain wiki links +- PR challenges an existing claim (has "challenged_by" or contradicts existing) +- PR modifies axiom-level beliefs +- PR is a cross-domain synthesis claim + +STANDARD — use when: +- New claims in established domain areas +- Enrichments to existing claims (confirm/extend) +- New hypothesis-level beliefs +- Source archives with extraction results + +LIGHT — use ONLY when ALL changes fit these categories: +- Entity attribute updates (factual corrections, new data points) +- Source archiving without extraction +- Formatting fixes, typo corrections +- Status field changes + +IMPORTANT: When uncertain, classify UP, not down. Always err toward more review. + +Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line. + +--- PR DIFF --- +{diff}""" + +DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base. + +Review this PR from your domain expertise: +1. Technical accuracy — are the claims factually correct in your domain? +2. Domain duplicates — does your domain already have substantially similar claims? +3. Missing context — is important domain context absent that would change interpretation? +4. Confidence calibration — from your domain expertise, is the confidence level right? +5. Enrichment opportunities — should this connect to existing claims via wiki links? 
+ +{style_guide} + +If you are requesting changes, tag the specific issues: + + +Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by + +End your review with exactly one of: + + + +--- PR DIFF --- +{diff} + +--- CHANGED FILES --- +{files}""" + +LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base. + +Review this PR against the quality criteria: +1. Schema compliance — YAML frontmatter, prose-as-title, required fields +2. Duplicate check — does this claim already exist? +3. Confidence calibration — appropriate for the evidence? +4. Wiki link validity — references real claims? +5. Source quality — credible for the claim? +6. Domain assignment — correct domain? +7. Epistemic hygiene — specific enough to be wrong? + +{style_guide} + +If requesting changes, tag the issues: + + +End your review with exactly one of: + + + +--- PR DIFF --- +{diff} + +--- CHANGED FILES --- +{files}""" + +LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base. + +Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check: +1. Cross-domain implications — does this claim affect beliefs in other domains? +2. Confidence calibration — is the confidence level justified by the evidence? +3. Contradiction check — does this contradict any existing claims without explicit argument? +4. Wiki link validity — do all wiki links reference real, existing claims? +5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary? +6. Source quality — is the source credible for the claim being made? +7. Duplicate check — does a substantially similar claim already exist? +8. Enrichment vs new claim — should this be an enrichment to an existing claim instead? +9. Domain assignment — is the claim in the correct domain? +10. Schema compliance — YAML frontmatter, prose-as-title format, required fields +11. Epistemic hygiene — is the claim specific enough to be wrong? + +{style_guide} + +If requesting changes, tag the issues: + + +End your review with exactly one of: + + + +--- PR DIFF --- +{diff} + +--- CHANGED FILES --- +{files}""" + + +# ─── API helpers ─────────────────────────────────────────────────────────── + +async def _forgejo_api(method: str, path: str, body: dict = None, token: str = None): + """Call Forgejo API.""" + import aiohttp + url = f"{config.FORGEJO_URL}/api/v1{path}" + if token is None: + token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else "" + headers = {"Authorization": f"token {token}", "Content-Type": "application/json"} + + try: + async with aiohttp.ClientSession() as session: + async with session.request(method, url, headers=headers, + json=body, timeout=aiohttp.ClientTimeout(total=60)) as resp: + if resp.status >= 400: + text = await resp.text() + logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200]) + return None + if resp.status == 204: + return {} + return await resp.json() + except Exception as e: + logger.error("Forgejo API error: %s %s → %s", method, path, e) + return None + + +async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None: + """Call OpenRouter API. 
Returns response text or None on failure.""" + import aiohttp + key_file = config.SECRETS_DIR / "openrouter-key" + if not key_file.exists(): + logger.error("OpenRouter key file not found") + return None + key = key_file.read_text().strip() + + payload = { + "model": model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 4096, + "temperature": 0.2, + } + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + config.OPENROUTER_URL, + headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"}, + json=payload, + timeout=aiohttp.ClientTimeout(total=timeout_sec), + ) as resp: + if resp.status >= 400: + text = await resp.text() + logger.error("OpenRouter %s → %d: %s", model, resp.status, text[:200]) + return None + data = await resp.json() + return data.get("choices", [{}])[0].get("message", {}).get("content") + except Exception as e: + logger.error("OpenRouter error: %s → %s", model, e) + return None + + +async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None: + """Call Claude via CLI (Claude Max subscription). Returns response or None.""" + proc = await asyncio.create_subprocess_exec( + str(config.CLAUDE_CLI), "-p", "--model", model, "--output-format", "text", + cwd=cwd or str(config.REPO_DIR), + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(input=prompt.encode()), + timeout=timeout_sec, + ) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + logger.error("Claude CLI timed out after %ds", timeout_sec) + return None + + if proc.returncode != 0: + err = (stderr or b"").decode()[:500] + out = (stdout or b"").decode()[:500] + # Check for rate limit — Claude CLI puts limit message on stdout, not stderr + combined = (err + out).lower() + if "hit your limit" in combined or "rate limit" in combined: + logger.warning("Claude Max rate limited (stdout: %s)", out[:200]) + return "RATE_LIMITED" + logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err[:200], out[:200]) + return None + + return (stdout or b"").decode().strip() + + +# ─── Diff helpers ────────────────────────────────────────────────────────── + +async def _get_pr_diff(pr_number: int) -> str: + """Fetch PR diff via Forgejo API.""" + import aiohttp + url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff" + token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else "" + + try: + async with aiohttp.ClientSession() as session: + async with session.get( + url, + headers={"Authorization": f"token {token}", "Accept": "text/plain"}, + timeout=aiohttp.ClientTimeout(total=60), + ) as resp: + if resp.status >= 400: + return "" + diff = await resp.text() + if len(diff) > 2_000_000: + return "" + return diff + except Exception as e: + logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e) + return "" + + +def _filter_diff(diff: str) -> tuple[str, str]: + """Filter diff to only review-relevant files. + + Returns (review_diff, entity_diff). 
+ Strips: inbox/archive/, schemas/, skills/, agents/*/musings/ + """ + sections = re.split(r'(?=^diff --git )', diff, flags=re.MULTILINE) + skip_patterns = [r'^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/'] + core_domains = {'living-agents', 'living-capital', 'teleohumanity', 'mechanisms'} + + claim_sections = [] + entity_sections = [] + + for section in sections: + if not section.strip(): + continue + if any(re.match(p, section) for p in skip_patterns): + continue + entity_match = re.match(r'^diff --git a/entities/([^/]+)/', section) + if entity_match and entity_match.group(1) not in core_domains: + entity_sections.append(section) + continue + claim_sections.append(section) + + return ''.join(claim_sections), ''.join(entity_sections) + + +def _extract_changed_files(diff: str) -> str: + """Extract changed file paths from diff.""" + return "\n".join( + line.replace("diff --git a/", "").split(" b/")[0] + for line in diff.split("\n") + if line.startswith("diff --git") + ) + + +def _detect_domain_from_diff(diff: str) -> str | None: + """Detect primary domain from changed file paths. + + Checks domains/, entities/, core/, foundations/ for domain classification. + """ + domain_counts: dict[str, int] = {} + for line in diff.split("\n"): + if line.startswith("diff --git"): + # Check domains/ and entities/ (both carry domain info) + match = re.search(r'(?:domains|entities)/([^/]+)/', line) + if match: + d = match.group(1) + domain_counts[d] = domain_counts.get(d, 0) + 1 + continue + # Check core/ subdirectories + match = re.search(r'core/([^/]+)/', line) + if match: + d = match.group(1) + if d in DOMAIN_AGENT_MAP: + domain_counts[d] = domain_counts.get(d, 0) + 1 + continue + # Check foundations/ subdirectories + match = re.search(r'foundations/([^/]+)/', line) + if match: + d = match.group(1) + if d in DOMAIN_AGENT_MAP: + domain_counts[d] = domain_counts.get(d, 0) + 1 + if domain_counts: + return max(domain_counts, key=domain_counts.get) + return None + + +def _is_musings_only(diff: str) -> bool: + """Check if PR only modifies musing files.""" + has_musings = False + has_other = False + for line in diff.split("\n"): + if line.startswith("diff --git"): + if "agents/" in line and "/musings/" in line: + has_musings = True + else: + has_other = True + return has_musings and not has_other + + +# ─── Verdict parsing ────────────────────────────────────────────────────── + +def _parse_verdict(review_text: str, reviewer: str) -> str: + """Parse VERDICT tag from review. 
    Returns 'approve' or 'request_changes'."""
    upper = reviewer.upper()
    if f"VERDICT:{upper}:APPROVE" in review_text:
        return "approve"
    elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
        return "request_changes"
    else:
        logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
        return "request_changes"


def _parse_issues(review_text: str) -> list[str]:
    """Extract issue tags from review."""
    # Issue tags arrive as an HTML comment, e.g. <!-- ISSUES: near_duplicate, scope_error -->
    match = re.search(r'<!--\s*ISSUES:\s*(.*?)\s*-->', review_text)
    if not match:
        return []
    return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]


# ─── Review execution ─────────────────────────────────────────────────────

async def _triage_pr(diff: str) -> str:
    """Triage PR via Haiku → DEEP/STANDARD/LIGHT."""
    prompt = TRIAGE_PROMPT.format(diff=diff[:50000])  # Cap diff size for triage
    result = await _openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
    if not result:
        logger.warning("Triage failed, defaulting to STANDARD")
        return "STANDARD"

    tier = result.split("\n")[0].strip().upper()
    if tier in ("DEEP", "STANDARD", "LIGHT"):
        reason = result.split("\n")[1].strip() if "\n" in result else ""
        logger.info("Triage: %s — %s", tier, reason[:100])
        return tier

    logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
    return "STANDARD"


async def _run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
    """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o."""
    prompt = DOMAIN_PROMPT.format(
        agent=agent, agent_upper=agent.upper(), domain=domain,
        style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files,
    )

    # Try Claude Max Sonnet first
    result = await _claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)

    if result == "RATE_LIMITED":
        # Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck)
        policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow")
        if policy == "overflow":
            logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
            result = await _openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
        else:
            logger.info("Claude Max rate limited, queuing domain review")
            return None

    return result


async def _run_leo_review(diff: str, files: str, tier: str) -> str | None:
    """Run Leo review via Claude Max Opus.
Returns None if rate limited (queue policy).""" + prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD + prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files) + + result = await _claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) + + if result == "RATE_LIMITED": + # Leo review queues — don't waste Opus calls (never overflow) + logger.info("Claude Max Opus rate limited, queuing Leo review") + return None + + return result + + +async def _post_formal_approvals(pr_number: int, pr_author: str): + """Submit formal Forgejo reviews from 2 agents (not the PR author).""" + approvals = 0 + for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]: + if agent_name == pr_author: + continue + if approvals >= 2: + break + token_file = config.SECRETS_DIR / f"forgejo-{agent_name}-token" + if token_file.exists(): + token = token_file.read_text().strip() + result = await _forgejo_api( + "POST", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/reviews", + {"body": "Approved.", "event": "APPROVED"}, + token=token, + ) + if result is not None: + approvals += 1 + logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals) + + +# ─── Single PR evaluation ───────────────────────────────────────────────── + +async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: + """Evaluate a single PR. Returns result dict.""" + # Fetch diff + diff = await _get_pr_diff(pr_number) + if not diff: + return {"pr": pr_number, "skipped": True, "reason": "no_diff"} + + # Musings bypass + if _is_musings_only(diff): + logger.info("PR #%d is musings-only — auto-approving", pr_number) + await _forgejo_api( + "POST", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments", + {"body": "Auto-approved: musings bypass eval per collective policy."}, + ) + conn.execute( + """UPDATE prs SET status = 'approved', leo_verdict = 'skipped', + domain_verdict = 'skipped' WHERE number = ?""", + (pr_number,), + ) + return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"} + + # Filter diff + review_diff, entity_diff = _filter_diff(diff) + if not review_diff: + review_diff = diff + files = _extract_changed_files(diff) + + # Detect domain + domain = _detect_domain_from_diff(diff) + agent = DOMAIN_AGENT_MAP.get(domain, "Leo") if domain else "Leo" + + # Default NULL domain to 'general' (archive-only PRs have no domain files) + if domain is None: + domain = "general" + + # Update PR domain if not set + conn.execute( + "UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?", + (domain, agent, pr_number), + ) + + # Step 1: Triage (if not already triaged) + if tier is None: + tier = await _triage_pr(diff) + conn.execute("UPDATE prs SET tier = ? 
WHERE number = ?", (tier, pr_number)) + + # Mark as reviewing + conn.execute( + "UPDATE prs SET status = 'reviewing', last_attempt = datetime('now') WHERE number = ?", + (pr_number,), + ) + + # Check if domain review already completed (resuming after Leo rate limit) + existing = conn.execute( + "SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,) + ).fetchone() + existing_domain_verdict = existing["domain_verdict"] if existing else "pending" + existing_leo_verdict = existing["leo_verdict"] if existing else "pending" + + # Step 2: Domain review FIRST (Sonnet — high volume filter) + # Skip if already completed from a previous attempt + if existing_domain_verdict not in ("pending", None): + domain_verdict = existing_domain_verdict + logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict) + else: + logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier) + domain_review = await _run_domain_review(review_diff, files, domain or "general", agent) + + if domain_review is None: + # Rate limited, couldn't overflow — revert to open for retry + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,)) + return {"pr": pr_number, "skipped": True, "reason": "rate_limited"} + + domain_verdict = _parse_verdict(domain_review, agent) + conn.execute( + "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?", + (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number), + ) + + # Post domain review as comment (from agent's Forgejo account) + agent_tok = _agent_token(agent) + await _forgejo_api( + "POST", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments", + {"body": domain_review}, + token=agent_tok, + ) + + # If domain review rejects, skip Leo review (save Opus) + if domain_verdict == "request_changes": + logger.info("PR #%d: domain rejected, skipping Leo review", pr_number) + conn.execute( + """UPDATE prs SET status = 'open', leo_verdict = 'skipped', + last_error = 'domain review requested changes' + WHERE number = ?""", + (pr_number,), + ) + db.audit(conn, "evaluate", "domain_rejected", + json.dumps({"pr": pr_number, "agent": agent})) + return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped"} + + # Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT) + leo_verdict = "skipped" + if tier != "LIGHT": + logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier) + leo_review = await _run_leo_review(review_diff, files, tier) + + if leo_review is None: + # Opus rate limited — revert to open for retry (keep domain verdict) + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,)) + return {"pr": pr_number, "skipped": True, "reason": "opus_rate_limited"} + + leo_verdict = _parse_verdict(leo_review, "LEO") + conn.execute("UPDATE prs SET leo_verdict = ? 
WHERE number = ?", (leo_verdict, pr_number))
+
+        # Post Leo review as comment (from Leo's Forgejo account)
+        leo_tok = _agent_token("Leo")
+        await _forgejo_api(
+            "POST",
+            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
+            {"body": leo_review},
+            token=leo_tok,
+        )
+    else:
+        # LIGHT tier: Leo is auto-skipped, domain verdict is the only gate
+        conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
+
+    # Step 4: Determine final verdict
+    both_approve = (
+        (leo_verdict == "approve" or leo_verdict == "skipped")
+        and domain_verdict == "approve"
+    )
+
+    if both_approve:
+        # Get PR author for formal approvals
+        pr_info = await _forgejo_api(
+            "GET",
+            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
+        )
+        pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
+
+        # Submit formal Forgejo reviews (required for merge)
+        await _post_formal_approvals(pr_number, pr_author)
+
+        conn.execute(
+            "UPDATE prs SET status = 'approved' WHERE number = ?",
+            (pr_number,),
+        )
+        db.audit(conn, "evaluate", "approved",
+                 json.dumps({"pr": pr_number, "tier": tier, "domain": domain,
+                             "leo": leo_verdict, "domain_agent": agent}))
+        logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)",
+                    pr_number, tier, leo_verdict, domain_verdict)
+    else:
+        conn.execute(
+            "UPDATE prs SET status = 'open' WHERE number = ?",
+            (pr_number,),
+        )
+        # Store feedback for re-extraction path
+        feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
+        if domain_verdict == "request_changes":
+            feedback["domain_issues"] = _parse_issues(domain_review)
+        conn.execute(
+            "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
+            (json.dumps(feedback), pr_number),
+        )
+        db.audit(conn, "evaluate", "changes_requested",
+                 json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict,
+                             "domain": domain_verdict}))
+        logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)",
+                    pr_number, leo_verdict, domain_verdict)
+
+    # Record API usage cost (domain review; Leo too unless LIGHT tier skipped it)
+    from . import costs
+    costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="max")
+    if tier != "LIGHT":
+        costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
+
+    return {
+        "pr": pr_number, "tier": tier, "domain": domain,
+        "leo_verdict": leo_verdict, "domain_verdict": domain_verdict,
+        "approved": both_approve,
+    }
+
+
+# ─── Main entry point ──────────────────────────────────────────────────────
+
+async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
+    """Run one evaluation cycle.
+
+    Finds PRs with status='open', tier0_pass=1, and at least one pending
+    verdict (Leo or domain). Evaluates in priority order.
+ """ + # Find PRs ready for evaluation: + # - status = 'open' + # - tier0_pass = 1 (passed validation) + # - leo_verdict = 'pending' OR domain_verdict = 'pending' + # Skip PRs attempted within last 10 minutes (backoff during rate limits) + rows = conn.execute( + """SELECT p.number, p.tier FROM prs p + LEFT JOIN sources s ON p.source_path = s.path + WHERE p.status = 'open' + AND p.tier0_pass = 1 + AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending') + AND (p.last_attempt IS NULL + OR p.last_attempt < datetime('now', '-10 minutes')) + ORDER BY + CASE COALESCE(p.priority, s.priority, 'medium') + WHEN 'critical' THEN 0 + WHEN 'high' THEN 1 + WHEN 'medium' THEN 2 + WHEN 'low' THEN 3 + ELSE 4 + END, + p.created_at ASC + LIMIT ?""", + (max_workers or config.MAX_EVAL_WORKERS,), + ).fetchall() + + if not rows: + return 0, 0 + + succeeded = 0 + failed = 0 + + for row in rows: + try: + result = await evaluate_pr(conn, row["number"], tier=row["tier"]) + if result.get("skipped"): + # Rate limited — don't count as failure, just skip + logger.debug("PR #%d skipped: %s", row["number"], result.get("reason")) + else: + succeeded += 1 + except Exception: + logger.exception("Failed to evaluate PR #%d", row["number"]) + failed += 1 + # Revert to open on unhandled error + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],)) + + if succeeded or failed: + logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed) + + return succeeded, failed diff --git a/lib/health.py b/lib/health.py new file mode 100644 index 0000000..00d4045 --- /dev/null +++ b/lib/health.py @@ -0,0 +1,237 @@ +"""Health API — HTTP server on configurable port for monitoring.""" + +import logging +from aiohttp import web +from datetime import date, datetime, timezone + +from . 
import config, db, costs + +logger = logging.getLogger("pipeline.health") + + +def _conn(request): + """Get the persistent readonly connection from app state.""" + return request.app["db"] + + +async def handle_health(request): + """GET /health — overall pipeline health.""" + conn = _conn(request) + + # Stage status from circuit breakers + breakers = conn.execute( + "SELECT name, state, failures, last_success_at, last_update FROM circuit_breakers" + ).fetchall() + + # Queue depths + sources_by_status = conn.execute( + "SELECT status, COUNT(*) as n FROM sources GROUP BY status" + ).fetchall() + prs_by_status = conn.execute( + "SELECT status, COUNT(*) as n FROM prs GROUP BY status" + ).fetchall() + + # Per-domain merge queue depth (Vida) + merge_queue = conn.execute( + "SELECT domain, COUNT(*) as n FROM prs WHERE status = 'approved' GROUP BY domain" + ).fetchall() + + # Cost + budget = costs.check_budget(conn) + + # Metabolic metrics (Vida) + null_rate = conn.execute( + """SELECT + CAST(SUM(CASE WHEN status = 'null_result' THEN 1 ELSE 0 END) AS REAL) / + NULLIF(COUNT(*), 0) as rate + FROM sources + WHERE updated_at > datetime('now', '-24 hours') + AND status IN ('extracted', 'null_result', 'error')""" + ).fetchone() + + approval_rate = conn.execute( + """SELECT + CAST(SUM(CASE WHEN domain_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) / + NULLIF(COUNT(*), 0) as domain_rate, + CAST(SUM(CASE WHEN leo_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) / + NULLIF(COUNT(*), 0) as leo_rate + FROM prs + WHERE last_attempt > datetime('now', '-24 hours') + AND domain_verdict != 'pending'""" + ).fetchone() + + # Recent activity (last hour) + recent = conn.execute( + """SELECT stage, event, COUNT(*) as n + FROM audit_log + WHERE timestamp > datetime('now', '-1 hour') + GROUP BY stage, event""" + ).fetchall() + + body = { + "status": "healthy", + "breakers": {}, + "sources": {r["status"]: r["n"] for r in sources_by_status}, + "prs": {r["status"]: r["n"] for r in prs_by_status}, + "merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue}, + "budget": budget, + "metabolic": { + "null_result_rate_24h": round(null_rate["rate"] or 0, 3), + "domain_approval_rate_24h": round(approval_rate["domain_rate"] or 0, 3) if approval_rate["domain_rate"] else None, + "leo_approval_rate_24h": round(approval_rate["leo_rate"] or 0, 3) if approval_rate["leo_rate"] else None, + }, + "recent_activity": [ + {"stage": r["stage"], "event": r["event"], "count": r["n"]} + for r in recent + ], + } + + # Breaker state + stall detection (Vida: last_success_at heartbeat) + for r in breakers: + breaker_info = {"state": r["state"], "failures": r["failures"]} + if r["last_success_at"]: + last = datetime.fromisoformat(r["last_success_at"]) + if last.tzinfo is None: + last = last.replace(tzinfo=timezone.utc) + age_s = (datetime.now(timezone.utc) - last).total_seconds() + breaker_info["last_success_age_s"] = round(age_s) + # Stall detection: no success in 2x the stage's interval + intervals = {"ingest": config.INGEST_INTERVAL, "validate": config.VALIDATE_INTERVAL, + "evaluate": config.EVAL_INTERVAL, "merge": config.MERGE_INTERVAL} + threshold = intervals.get(r["name"], 60) * 2 + if age_s > threshold: + breaker_info["stalled"] = True + body["breakers"][r["name"]] = breaker_info + + # Overall status + if any(b.get("stalled") for b in body["breakers"].values()): + body["status"] = "stalled" + if any(b["state"] == "open" for b in body["breakers"].values()): + body["status"] = "degraded" + if not budget["ok"]: + body["status"] = 
"budget_exhausted" + # Rubber-stamp warning (Vida) + if (approval_rate["domain_rate"] or 0) > 0.95: + body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping" + + status_code = 200 if body["status"] == "healthy" else 503 + return web.json_response(body, status=status_code) + + +async def handle_costs(request): + """GET /costs — daily cost breakdown.""" + conn = _conn(request) + day = request.query.get("date", date.today().isoformat()) + breakdown = costs.get_daily_breakdown(conn, day) + budget = costs.check_budget(conn) + return web.json_response({"date": day, "budget": budget, "breakdown": breakdown}) + + +async def handle_sources(request): + """GET /sources — source pipeline status.""" + conn = _conn(request) + status_filter = request.query.get("status") + if status_filter: + rows = conn.execute( + "SELECT path, status, priority, claims_count, transient_retries, substantive_retries, updated_at FROM sources WHERE status = ? ORDER BY updated_at DESC LIMIT 50", + (status_filter,), + ).fetchall() + else: + rows = conn.execute( + "SELECT path, status, priority, claims_count, transient_retries, substantive_retries, updated_at FROM sources ORDER BY updated_at DESC LIMIT 50" + ).fetchall() + return web.json_response({"sources": [dict(r) for r in rows]}) + + +async def handle_prs(request): + """GET /prs — PR pipeline status.""" + conn = _conn(request) + status_filter = request.query.get("status") + if status_filter: + rows = conn.execute( + "SELECT number, source_path, status, domain, tier, leo_verdict, domain_verdict, transient_retries, substantive_retries FROM prs WHERE status = ? ORDER BY number DESC LIMIT 50", + (status_filter,), + ).fetchall() + else: + rows = conn.execute( + "SELECT number, source_path, status, domain, tier, leo_verdict, domain_verdict, transient_retries, substantive_retries FROM prs ORDER BY number DESC LIMIT 50" + ).fetchall() + return web.json_response({"prs": [dict(r) for r in rows]}) + + +async def handle_breakers(request): + """GET /breakers — circuit breaker states.""" + conn = _conn(request) + rows = conn.execute("SELECT * FROM circuit_breakers").fetchall() + return web.json_response({"breakers": [dict(r) for r in rows]}) + + +async def handle_calibration(request): + """GET /calibration — priority calibration analysis (Vida).""" + conn = _conn(request) + # Find sources where eval disagreed with ingest priority + # Focus on upgrades (Theseus: upgrades are the learnable signal) + rows = conn.execute( + """SELECT path, priority, priority_log FROM sources + WHERE json_array_length(priority_log) >= 2""" + ).fetchall() + + upgrades = [] + downgrades = [] + for r in rows: + import json + log = json.loads(r["priority_log"] or "[]") + if len(log) < 2: + continue + first = log[0]["priority"] + last = log[-1]["priority"] + levels = {"critical": 4, "high": 3, "medium": 2, "low": 1, "skip": 0} + if levels.get(last, 2) > levels.get(first, 2): + upgrades.append({"path": r["path"], "from": first, "to": last}) + elif levels.get(last, 2) < levels.get(first, 2): + downgrades.append({"path": r["path"], "from": first, "to": last}) + + return web.json_response({ + "upgrades": upgrades[:20], + "downgrades_count": len(downgrades), + "upgrades_count": len(upgrades), + "note": "Focus on upgrades — downgrades are expected (downstream has more context)" + }) + + +def create_app() -> web.Application: + """Create the health API application.""" + app = web.Application() + # Persistent readonly connection — one connection, no churn (Ganymede) + app["db"] = 
db.get_connection(readonly=True) + app.router.add_get("/health", handle_health) + app.router.add_get("/costs", handle_costs) + app.router.add_get("/sources", handle_sources) + app.router.add_get("/prs", handle_prs) + app.router.add_get("/breakers", handle_breakers) + app.router.add_get("/calibration", handle_calibration) + app.on_cleanup.append(_cleanup) + return app + + +async def _cleanup(app): + app["db"].close() + + +async def start_health_server(runner_ref: list): + """Start the health HTTP server. Stores runner in runner_ref for shutdown.""" + app = create_app() + runner = web.AppRunner(app) + await runner.setup() + # Bind to 127.0.0.1 only — use reverse proxy for external access (Ganymede) + site = web.TCPSite(runner, "127.0.0.1", config.HEALTH_PORT) + await site.start() + runner_ref.append(runner) + logger.info("Health API listening on 127.0.0.1:%d", config.HEALTH_PORT) + + +async def stop_health_server(runner_ref: list): + """Stop the health HTTP server.""" + for runner in runner_ref: + await runner.cleanup() + logger.info("Health API stopped") diff --git a/lib/log.py b/lib/log.py new file mode 100644 index 0000000..a34a3b5 --- /dev/null +++ b/lib/log.py @@ -0,0 +1,48 @@ +"""Structured JSON logging with rotation.""" + +import json +import logging +import logging.handlers +from datetime import datetime, timezone + +from . import config + + +class JSONFormatter(logging.Formatter): + """Format log records as JSON lines.""" + + def format(self, record): + entry = { + "ts": datetime.now(timezone.utc).isoformat(), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + } + if record.exc_info and record.exc_info[0]: + entry["exception"] = self.formatException(record.exc_info) + # Include extra fields if present + for key in ("stage", "source", "pr", "model", "cost", "event"): + if hasattr(record, key): + entry[key] = getattr(record, key) + return json.dumps(entry) + + +def setup_logging(): + """Configure structured JSON logging with rotation.""" + config.LOG_DIR.mkdir(parents=True, exist_ok=True) + + handler = logging.handlers.RotatingFileHandler( + str(config.LOG_FILE), + maxBytes=config.LOG_ROTATION_MAX_BYTES, + backupCount=config.LOG_ROTATION_BACKUP_COUNT, + ) + handler.setFormatter(JSONFormatter()) + + # Also log to stderr for systemd journal + console = logging.StreamHandler() + console.setFormatter(logging.Formatter("%(name)s [%(levelname)s] %(message)s")) + + root = logging.getLogger() + root.setLevel(logging.INFO) + root.addHandler(handler) + root.addHandler(console) diff --git a/lib/merge.py b/lib/merge.py new file mode 100644 index 0000000..e56451b --- /dev/null +++ b/lib/merge.py @@ -0,0 +1,436 @@ +"""Merge stage — domain-serialized priority queue with rebase-before-merge. + +Design reviewed by Ganymede (round 2) and Rhea. Key decisions: +- Two-layer locking: asyncio.Lock per domain (fast path) + prs.status (crash recovery) +- Rebase-before-merge with pinned force-with-lease SHA (Ganymede) +- Priority queue: COALESCE(p.priority, s.priority, 'medium') — PR > source > default +- Human PRs default to 'high', not 'critical' (Ganymede — prevents DoS on pipeline) +- 5-minute merge timeout — force-reset to 'conflict' (Rhea) +- Ack comment on human PR discovery (Rhea) +- Pagination on all Forgejo list endpoints (Ganymede standing rule) +""" + +import asyncio +import json +import logging +from collections import defaultdict +from datetime import datetime, timezone + +from . 
import config, db + +logger = logging.getLogger("pipeline.merge") + +# In-memory domain locks — fast path, lost on crash (durable layer is prs.status) +_domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + +# Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea) +MERGE_TIMEOUT_SECONDS = 300 # 5 minutes + + +# --- Git helpers --- + +async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]: + """Run a git command async. Returns (returncode, stdout+stderr).""" + proc = await asyncio.create_subprocess_exec( + "git", *args, + cwd=cwd or str(config.REPO_DIR), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + return -1, f"git {args[0]} timed out after {timeout}s" + output = (stdout or b"").decode().strip() + if stderr: + output += "\n" + stderr.decode().strip() + return proc.returncode, output + + +async def _forgejo_api(method: str, path: str, body: dict = None) -> dict | list | None: + """Call Forgejo API. Returns parsed JSON or None on error.""" + import aiohttp + url = f"{config.FORGEJO_URL}/api/v1{path}" + token_file = config.FORGEJO_TOKEN_FILE + token = token_file.read_text().strip() if token_file.exists() else "" + headers = {"Authorization": f"token {token}", "Content-Type": "application/json"} + + try: + async with aiohttp.ClientSession() as session: + async with session.request(method, url, headers=headers, + json=body, timeout=aiohttp.ClientTimeout(total=30)) as resp: + if resp.status >= 400: + text = await resp.text() + logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200]) + return None + if resp.status == 204: # No content (DELETE) + return {} + return await resp.json() + except Exception as e: + logger.error("Forgejo API error: %s %s → %s", method, path, e) + return None + + +# --- PR Discovery (Multiplayer v1) --- + +async def discover_external_prs(conn) -> int: + """Scan Forgejo for open PRs not tracked in SQLite. + + Human PRs (non-pipeline author) get priority 'high' and origin 'human'. + Critical is reserved for explicit human override only. (Ganymede) + + Pagination on all Forgejo list endpoints. 
(Ganymede standing rule #5) + """ + known = {r["number"] for r in conn.execute("SELECT number FROM prs").fetchall()} + discovered = 0 + page = 1 + + while True: + prs = await _forgejo_api( + "GET", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls" + f"?state=open&limit=50&page={page}", + ) + if not prs: + break + + for pr in prs: + if pr["number"] not in known: + # Detect origin: pipeline agents have per-agent Forgejo users + pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"} + author = pr.get("user", {}).get("login", "") + is_pipeline = author.lower() in pipeline_users + origin = "pipeline" if is_pipeline else "human" + priority = "high" if origin == "human" else None + domain = _detect_domain_from_files(pr) if not is_pipeline else _detect_domain_from_branch(pr["head"]["ref"]) + + conn.execute( + """INSERT OR IGNORE INTO prs + (number, branch, status, origin, priority, domain) + VALUES (?, ?, 'open', ?, ?, ?)""", + (pr["number"], pr["head"]["ref"], origin, priority, domain), + ) + db.audit(conn, "merge", "pr_discovered", + json.dumps({"pr": pr["number"], "origin": origin, + "author": pr.get("user", {}).get("login"), + "priority": priority or "inherited"})) + + # Ack comment on human PRs so contributor feels acknowledged (Rhea) + if origin == "human": + await _post_ack_comment(pr["number"]) + + discovered += 1 + + if len(prs) < 50: + break # Last page + page += 1 + + if discovered: + logger.info("Discovered %d external PRs", discovered) + return discovered + + +def _detect_domain_from_branch(branch: str) -> str | None: + """Extract domain from branch name like 'rio/claims-futarchy' → 'internet-finance'. + + Agent-to-domain mapping for pipeline branches. + """ + agent_domain = { + "rio": "internet-finance", + "clay": "entertainment", + "theseus": "ai-alignment", + "vida": "health", + "astra": "space-development", + "leo": "grand-strategy", + } + prefix = branch.split("/")[0].lower() if "/" in branch else "" + return agent_domain.get(prefix) + + +def _detect_domain_from_files(pr: dict) -> str | None: + """Detect domain from PR's changed files for human-submitted PRs. + + Humans may not follow agent branch naming. Fall back to inspecting + file paths. (Ganymede nit) + """ + # We'd need to fetch files from the API — do it lazily on first eval + # For now, return None. Domain gets set during evaluation. + return None + + +async def _post_ack_comment(pr_number: int): + """Post acknowledgment comment on human-submitted PR. (Rhea) + + Contributor should feel acknowledged immediately, not wonder if + their PR disappeared into a void. + """ + body = ( + "Thanks for the contribution! Your PR is queued for evaluation " + "(priority: high). Expected review time: ~5 minutes.\n\n" + "_This is an automated message from the Teleo pipeline._" + ) + await _forgejo_api( + "POST", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments", + {"body": body}, + ) + + +# --- Merge operations --- + +async def _claim_next_pr(conn, domain: str) -> dict | None: + """Claim the next approved PR for a domain via atomic UPDATE. 
+ + Priority inheritance: COALESCE(p.priority, s.priority, 'medium') + - Explicit PR priority (human PRs) > source priority (pipeline) > default medium + - NULL priorities fall to ELSE 4, which ranks below explicit 'medium' (WHEN 2) + - This is intentional: unclassified PRs don't jump ahead of triaged ones + (Rhea: document the precedence for future maintainers) + + NOT EXISTS enforces domain serialization in SQL — defense-in-depth even if + asyncio.Lock is bypassed. (Ganymede: approved) + """ + row = conn.execute( + """UPDATE prs SET status = 'merging', last_attempt = datetime('now') + WHERE number = ( + SELECT p.number FROM prs p + LEFT JOIN sources s ON p.source_path = s.path + WHERE p.status = 'approved' + AND p.domain = ? + AND NOT EXISTS ( + SELECT 1 FROM prs p2 + WHERE p2.domain = p.domain + AND p2.status = 'merging' + ) + ORDER BY + CASE COALESCE(p.priority, s.priority, 'medium') + WHEN 'critical' THEN 0 + WHEN 'high' THEN 1 + WHEN 'medium' THEN 2 + WHEN 'low' THEN 3 + ELSE 4 + END, + p.created_at ASC + LIMIT 1 + ) + RETURNING number, source_path, branch, domain""", + (domain,), + ).fetchone() + return dict(row) if row else None + + +async def _rebase_and_push(branch: str) -> tuple[bool, str]: + """Rebase branch onto main and force-push with pinned SHA. + + Always use --force-with-lease with pinned SHA for ALL branches — + pipeline and human. No split logic. (Ganymede) + """ + worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}" + + # Create worktree for the branch + rc, out = await _git("worktree", "add", worktree_path, branch) + if rc != 0: + return False, f"worktree add failed: {out}" + + try: + # Capture expected SHA before rebase (Ganymede: pin for force-with-lease) + rc, expected_sha = await _git("rev-parse", f"origin/{branch}", cwd=worktree_path) + if rc != 0: + return False, f"rev-parse failed: {expected_sha}" + expected_sha = expected_sha.strip().split("\n")[0] # First line only + + # Fetch latest main + rc, out = await _git("fetch", "origin", "main", cwd=worktree_path) + if rc != 0: + return False, f"fetch failed: {out}" + + # Check if rebase is needed + rc, merge_base = await _git("merge-base", "origin/main", "HEAD", cwd=worktree_path) + rc2, main_sha = await _git("rev-parse", "origin/main", cwd=worktree_path) + if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip(): + # Already up to date, no rebase needed + return True, "already up to date" + + # Rebase onto main + rc, out = await _git("rebase", "origin/main", cwd=worktree_path, timeout=120) + if rc != 0: + # Rebase conflict + await _git("rebase", "--abort", cwd=worktree_path) + return False, f"rebase conflict: {out}" + + # Force-push with pinned SHA (Ganymede: defeats tracking-ref update race) + rc, out = await _git( + "push", + f"--force-with-lease={branch}:{expected_sha}", + "origin", f"HEAD:{branch}", + cwd=worktree_path, + timeout=30, + ) + if rc != 0: + return False, f"push rejected: {out}" + + return True, "rebased and pushed" + + finally: + # Cleanup worktree + await _git("worktree", "remove", "--force", worktree_path) + + +async def _merge_pr(pr_number: int) -> tuple[bool, str]: + """Merge PR via Forgejo API. 
Preserves PR metadata and reviewer attribution.""" + result = await _forgejo_api( + "POST", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/merge", + {"Do": "merge", "merge_message_field": ""}, + ) + if result is None: + return False, "Forgejo merge API failed" + return True, "merged" + + +async def _delete_remote_branch(branch: str): + """Delete remote branch immediately after merge. (Ganymede Q4: immediate, not batch) + + If DELETE fails, log and move on — stale branch is cosmetic, + stale merge is operational. + """ + result = await _forgejo_api( + "DELETE", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/branches/{branch}", + ) + if result is None: + logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch) + + +# --- Domain merge task --- + +async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: + """Process the merge queue for a single domain. Returns (succeeded, failed).""" + succeeded = 0 + failed = 0 + + while True: + async with _domain_locks[domain]: + pr = await _claim_next_pr(conn, domain) + if not pr: + break # No more approved PRs for this domain + + pr_num = pr["number"] + branch = pr["branch"] + logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain) + + try: + # Rebase with timeout (Rhea: 5 min max, then force-reset to conflict) + rebase_ok, rebase_msg = await asyncio.wait_for( + _rebase_and_push(branch), + timeout=MERGE_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + logger.error("PR #%d merge timed out after %ds — resetting to conflict (Rhea)", + pr_num, MERGE_TIMEOUT_SECONDS) + conn.execute( + "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", + (f"merge timed out after {MERGE_TIMEOUT_SECONDS}s", pr_num), + ) + db.audit(conn, "merge", "timeout", + json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS})) + failed += 1 + continue + + if not rebase_ok: + logger.warning("PR #%d rebase failed: %s", pr_num, rebase_msg) + conn.execute( + "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", + (rebase_msg[:500], pr_num), + ) + db.audit(conn, "merge", "rebase_failed", + json.dumps({"pr": pr_num, "error": rebase_msg[:200]})) + failed += 1 + continue + + # Merge via API + merge_ok, merge_msg = await _merge_pr(pr_num) + if not merge_ok: + logger.error("PR #%d API merge failed: %s", pr_num, merge_msg) + conn.execute( + "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", + (merge_msg[:500], pr_num), + ) + db.audit(conn, "merge", "api_merge_failed", + json.dumps({"pr": pr_num, "error": merge_msg[:200]})) + failed += 1 + continue + + # Success — update status and cleanup + conn.execute( + """UPDATE prs SET status = 'merged', + merged_at = datetime('now'), + last_error = NULL + WHERE number = ?""", + (pr_num,), + ) + db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch})) + logger.info("PR #%d merged successfully", pr_num) + + # Delete remote branch immediately (Ganymede Q4) + await _delete_remote_branch(branch) + + # Prune local worktree metadata + await _git("worktree", "prune") + + succeeded += 1 + + return succeeded, failed + + +# --- Main entry point --- + +async def merge_cycle(conn, max_workers=None) -> tuple[int, int]: + """Run one merge cycle across all domains. + + 1. Discover external PRs (multiplayer v1) + 2. Find all domains with approved PRs + 3. 
Launch one async task per domain (cross-domain parallel, same-domain serial) + """ + # Step 1: Discover external PRs + await discover_external_prs(conn) + + # Step 2: Find domains with approved work + rows = conn.execute( + "SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL" + ).fetchall() + domains = [r["domain"] for r in rows] + + # Also check for NULL-domain PRs (human PRs with undetected domain) + null_domain = conn.execute( + "SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL" + ).fetchone() + if null_domain and null_domain["c"] > 0: + logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", + null_domain["c"]) + + if not domains: + return 0, 0 + + # Step 3: Merge all domains concurrently + tasks = [_merge_domain_queue(conn, domain) for domain in domains] + results = await asyncio.gather(*tasks, return_exceptions=True) + + total_succeeded = 0 + total_failed = 0 + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.exception("Domain %s merge failed with exception", domains[i]) + total_failed += 1 + else: + s, f = result + total_succeeded += s + total_failed += f + + if total_succeeded or total_failed: + logger.info("Merge cycle: %d succeeded, %d failed across %d domains", + total_succeeded, total_failed, len(domains)) + + return total_succeeded, total_failed diff --git a/lib/validate.py b/lib/validate.py new file mode 100644 index 0000000..10569f8 --- /dev/null +++ b/lib/validate.py @@ -0,0 +1,591 @@ +"""Validate stage — Tier 0 deterministic validation gate. + +Ported from tier0-gate.py + validate_claims.py. Pure Python, no LLM calls. +Validates claim frontmatter, title format, wiki links, domain-directory match, +proposition heuristic, universal quantifiers, near-duplicate detection. + +Runs against PRs with status 'open' that have tier0_pass IS NULL. +Posts results as PR comments. In gate mode, sets tier0_pass = 0/1. +""" + +import json +import logging +import re +from datetime import date, datetime, timezone +from difflib import SequenceMatcher +from pathlib import Path + +from . 
import config, db + +logger = logging.getLogger("pipeline.validate") + +# ─── Constants ────────────────────────────────────────────────────────────── + +VALID_DOMAINS = frozenset({ + "internet-finance", "entertainment", "health", "ai-alignment", + "space-development", "grand-strategy", "mechanisms", "living-capital", + "living-agents", "teleohumanity", "critical-systems", + "collective-intelligence", "teleological-economics", "cultural-dynamics", +}) + +VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"}) +VALID_TYPES = frozenset({"claim", "framework"}) +REQUIRED_FIELDS = ("type", "domain", "description", "confidence", "source", "created") +DATE_MIN = date(2020, 1, 1) +WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]") +DEDUP_THRESHOLD = 0.85 + +# Proposition heuristic patterns +_STRONG_SIGNALS = re.compile( + r"\b(because|therefore|however|although|despite|since|" + r"rather than|instead of|not just|more than|less than|" + r"by\b|through\b|via\b|without\b|" + r"when\b|where\b|while\b|if\b|unless\b|" + r"which\b|that\b|" + r"is\b|are\b|was\b|were\b|will\b|would\b|" + r"can\b|could\b|should\b|must\b|" + r"has\b|have\b|had\b|does\b|did\b)", + re.IGNORECASE, +) + +_VERB_ENDINGS = re.compile( + r"\b\w{2,}(ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns|ps|ts|rs|ns|ds)\b", + re.IGNORECASE, +) + +_UNIVERSAL_QUANTIFIERS = re.compile( + r"\b(all|every|always|never|no one|nobody|nothing|none of|" + r"the only|the fundamental|the sole|the single|" + r"universally|invariably|without exception|in every case)\b", + re.IGNORECASE, +) + +_SCOPING_LANGUAGE = re.compile( + r"\b(when|if|under|given|assuming|provided|in cases where|" + r"for .+ that|among|within|across|during|between|" + r"approximately|roughly|nearly|most|many|often|typically|" + r"tends? 
to|generally|usually|frequently)\b", + re.IGNORECASE, +) + + +# ─── YAML frontmatter parser ─────────────────────────────────────────────── + +def parse_frontmatter(text: str) -> tuple[dict | None, str]: + """Extract YAML frontmatter and body from markdown text.""" + if not text.startswith("---"): + return None, text + end = text.find("---", 3) + if end == -1: + return None, text + raw = text[3:end] + body = text[end + 3:].strip() + + try: + import yaml + fm = yaml.safe_load(raw) + if not isinstance(fm, dict): + return None, body + return fm, body + except ImportError: + pass + except Exception: + return None, body + + # Fallback: simple key-value parser + fm = {} + for line in raw.strip().split("\n"): + line = line.strip() + if not line or line.startswith("#"): + continue + if ":" not in line: + continue + key, _, val = line.partition(":") + key = key.strip() + val = val.strip().strip('"').strip("'") + if val.lower() == "null" or val == "": + val = None + elif val.startswith("["): + val = [v.strip().strip('"').strip("'") + for v in val.strip("[]").split(",") if v.strip()] + fm[key] = val + return fm if fm else None, body + + +# ─── Validators ───────────────────────────────────────────────────────────── + +def validate_schema(fm: dict) -> list[str]: + """Check required fields and valid enums.""" + violations = [] + for field in REQUIRED_FIELDS: + if field not in fm or fm[field] is None: + violations.append(f"missing_field:{field}") + + ftype = fm.get("type") + if ftype and ftype not in VALID_TYPES: + violations.append(f"invalid_type:{ftype}") + + domain = fm.get("domain") + if domain and domain not in VALID_DOMAINS: + violations.append(f"invalid_domain:{domain}") + + confidence = fm.get("confidence") + if confidence and confidence not in VALID_CONFIDENCE: + violations.append(f"invalid_confidence:{confidence}") + + desc = fm.get("description") + if isinstance(desc, str) and len(desc.strip()) < 10: + violations.append("description_too_short") + + source = fm.get("source") + if isinstance(source, str) and len(source.strip()) < 3: + violations.append("source_too_short") + + return violations + + +def validate_date(date_val) -> list[str]: + """Validate created date.""" + violations = [] + if date_val is None: + return ["missing_field:created"] + + parsed = None + if isinstance(date_val, date): + parsed = date_val + elif isinstance(date_val, str): + try: + parsed = datetime.strptime(date_val, "%Y-%m-%d").date() + except ValueError: + return [f"invalid_date_format:{date_val}"] + else: + return [f"invalid_date_type:{type(date_val).__name__}"] + + today = date.today() + if parsed > today: + violations.append(f"future_date:{parsed}") + if parsed < DATE_MIN: + violations.append(f"date_before_2020:{parsed}") + return violations + + +def validate_title(filepath: str) -> list[str]: + """Check filename follows prose-as-claim convention.""" + violations = [] + name = Path(filepath).stem + normalized = name.replace("-", " ") + + if len(normalized) < 20: + violations.append("title_too_short") + + words = normalized.split() + if len(words) < 4: + violations.append("title_too_few_words") + + cleaned = re.sub(r"[a-zA-Z0-9\s\-\.,'()%]", "", name) + if cleaned: + violations.append(f"title_special_chars:{cleaned[:20]}") + + return violations + + +def validate_wiki_links(body: str, existing_claims: set[str]) -> list[str]: + """Check that [[wiki links]] resolve to known claims.""" + violations = [] + for link in WIKI_LINK_RE.findall(body): + if link.strip() and link.strip() not in existing_claims: + 
violations.append(f"broken_wiki_link:{link.strip()[:80]}") + return violations + + +def validate_proposition(title: str) -> list[str]: + """Check title reads as a proposition, not a label.""" + normalized = title.replace("-", " ") + words = normalized.split() + n = len(words) + + if n < 4: + return ["title_not_proposition:too short to be a disagreeable sentence"] + + if _STRONG_SIGNALS.search(normalized): + return [] + if _VERB_ENDINGS.search(normalized): + return [] + if n >= 8: + return [] + + return ["title_not_proposition:no verb or connective found"] + + +def validate_universal_quantifiers(title: str) -> list[str]: + """Flag unscoped universal quantifiers (warning, not gate).""" + universals = _UNIVERSAL_QUANTIFIERS.findall(title) + if universals and not _SCOPING_LANGUAGE.search(title): + return [f"unscoped_universal:{','.join(universals)}"] + return [] + + +def validate_domain_directory_match(filepath: str, fm: dict) -> list[str]: + """Check file's directory matches its domain field.""" + domain = fm.get("domain") + if not domain: + return [] + + parts = Path(filepath).parts + for i, part in enumerate(parts): + if part == "domains" and i + 1 < len(parts): + dir_domain = parts[i + 1] + if dir_domain != domain: + secondary = fm.get("secondary_domains", []) + if isinstance(secondary, str): + secondary = [secondary] + if dir_domain not in (secondary or []): + return [f"domain_directory_mismatch:file in domains/{dir_domain}/ " + f"but domain field says '{domain}'"] + break + return [] + + +def validate_description_not_title(title: str, description: str) -> list[str]: + """Check description adds info beyond the title.""" + if not description: + return [] + title_lower = title.lower().strip() + desc_lower = description.lower().strip().rstrip(".") + + if desc_lower in title_lower or title_lower in desc_lower: + return ["description_echoes_title"] + + ratio = SequenceMatcher(None, title_lower, desc_lower).ratio() + if ratio > 0.75: + return [f"description_too_similar:{ratio:.0%}"] + return [] + + +def find_near_duplicates(title: str, existing_claims: set[str]) -> list[str]: + """Find near-duplicate titles using SequenceMatcher with word pre-filter.""" + title_lower = title.lower() + title_words = set(title_lower.split()[:6]) + warnings = [] + for existing in existing_claims: + existing_lower = existing.lower() + if len(title_words & set(existing_lower.split()[:6])) < 2: + continue + ratio = SequenceMatcher(None, title_lower, existing_lower).ratio() + if ratio >= DEDUP_THRESHOLD: + warnings.append(f"near_duplicate:{existing[:80]} (similarity={ratio:.2f})") + return warnings + + +# ─── Full Tier 0 validation ──────────────────────────────────────────────── + +def tier0_validate_claim(filepath: str, content: str, existing_claims: set[str]) -> dict: + """Run full Tier 0 validation. 
Returns {filepath, passes, violations, warnings}.""" + violations = [] + warnings = [] + + fm, body = parse_frontmatter(content) + if fm is None: + return {"filepath": filepath, "passes": False, + "violations": ["no_frontmatter"], "warnings": []} + + violations.extend(validate_schema(fm)) + violations.extend(validate_date(fm.get("created"))) + violations.extend(validate_title(filepath)) + violations.extend(validate_wiki_links(body, existing_claims)) + + title = Path(filepath).stem + violations.extend(validate_proposition(title)) + warnings.extend(validate_universal_quantifiers(title)) + violations.extend(validate_domain_directory_match(filepath, fm)) + + desc = fm.get("description", "") + if isinstance(desc, str): + warnings.extend(validate_description_not_title(title, desc)) + + warnings.extend(find_near_duplicates(title, existing_claims)) + + return {"filepath": filepath, "passes": len(violations) == 0, + "violations": violations, "warnings": warnings} + + +# ─── Diff parsing ────────────────────────────────────────────────────────── + +def extract_claim_files_from_diff(diff: str) -> dict[str, str]: + """Parse unified diff to extract new/modified claim file contents.""" + claim_dirs = ("domains/", "core/", "foundations/") + files = {} + current_file = None + current_lines = [] + is_deletion = False + + for line in diff.split("\n"): + if line.startswith("diff --git"): + if current_file and not is_deletion: + files[current_file] = "\n".join(current_lines) + current_file = None + current_lines = [] + is_deletion = False + elif line.startswith("deleted file mode") or line.startswith("+++ /dev/null"): + is_deletion = True + current_file = None + elif line.startswith("+++ b/") and not is_deletion: + path = line[6:] + basename = path.rsplit("/", 1)[-1] if "/" in path else path + if (any(path.startswith(d) for d in claim_dirs) + and path.endswith(".md") + and not basename.startswith("_")): + current_file = path + elif current_file and line.startswith("+") and not line.startswith("+++"): + current_lines.append(line[1:]) + + if current_file and not is_deletion: + files[current_file] = "\n".join(current_lines) + + return files + + +# ─── Forgejo API (using merge module's helper) ───────────────────────────── + +async def _forgejo_api(method: str, path: str, body: dict = None): + """Call Forgejo API. 
Reuses merge module pattern."""
+    import aiohttp
+    url = f"{config.FORGEJO_URL}/api/v1{path}"
+    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
+    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.request(method, url, headers=headers,
+                                        json=body, timeout=aiohttp.ClientTimeout(total=30)) as resp:
+                if resp.status >= 400:
+                    text = await resp.text()
+                    logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200])
+                    return None
+                if resp.status == 204:
+                    return {}
+                return await resp.json()
+    except Exception as e:
+        logger.error("Forgejo API error: %s %s → %s", method, path, e)
+        return None
+
+
+async def _get_pr_diff(pr_number: int) -> str:
+    """Fetch PR diff via Forgejo API."""
+    import aiohttp
+    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
+    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
+    headers = {"Authorization": f"token {token}", "Accept": "text/plain"}
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url, headers=headers,
+                                   timeout=aiohttp.ClientTimeout(total=60)) as resp:
+                if resp.status >= 400:
+                    return ""
+                diff = await resp.text()
+                if len(diff) > 2_000_000:
+                    return ""  # Too large
+                return diff
+    except Exception as e:
+        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
+        return ""
+
+
+async def _get_pr_head_sha(pr_number: int) -> str:
+    """Get HEAD SHA of PR's branch."""
+    pr_info = await _forgejo_api(
+        "GET",
+        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
+    )
+    if pr_info:
+        return pr_info.get("head", {}).get("sha", "")
+    return ""
+
+
+async def _has_tier0_comment(pr_number: int, head_sha: str) -> bool:
+    """Check if we already validated this exact commit."""
+    if not head_sha:
+        return False
+    # Paginate comments (Ganymede standing rule)
+    page = 1
+    while True:
+        comments = await _forgejo_api(
+            "GET",
+            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments"
+            f"?limit=50&page={page}",
+        )
+        if not comments:
+            break
+        # Idempotency marker: hidden HTML comment embedding the validated head SHA
+        # (must match the marker written by _post_validation_comment)
+        marker = f"<!-- tier0-validated:{head_sha} -->"
+        for c in comments:
+            if marker in c.get("body", ""):
+                return True
+        if len(comments) < 50:
+            break
+        page += 1
+    return False
+
+
+async def _post_validation_comment(pr_number: int, results: list[dict], head_sha: str):
+    """Post Tier 0 validation results as PR comment."""
+    all_pass = all(r["passes"] for r in results)
+    total = len(results)
+    passing = sum(1 for r in results if r["passes"])
+
+    # Hidden marker ties this comment to the validated commit (see _has_tier0_comment)
+    marker = f"<!-- tier0-validated:{head_sha} -->" if head_sha else ""
+    status = "PASS" if all_pass else "FAIL"
+    lines = [
+        marker,
+        f"**Tier 0 Validation: {status}** — {passing}/{total} claims pass\n",
+    ]
+
+    for r in results:
+        icon = "PASS" if r["passes"] else "FAIL"
+        short_path = r["filepath"].split("/", 1)[-1] if "/" in r["filepath"] else r["filepath"]
+        lines.append(f"**[{icon}]** `{short_path}`")
+        for v in r["violations"]:
+            lines.append(f" - {v}")
+        for w in r["warnings"]:
+            lines.append(f" - (warn) {w}")
+        lines.append("")
+
+    if not all_pass:
+        lines.append("---")
+        lines.append("Fix the violations above and push to trigger re-validation.")
+
+    lines.append(f"\n*tier0-gate v2 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")
+
+    await _forgejo_api(
+        "POST",
+        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
+        {"body":
"\n".join(lines)}, + ) + + +# ─── Existing claims index ───────────────────────────────────────────────── + +def load_existing_claims() -> set[str]: + """Build set of known claim titles from the main worktree.""" + claims: set[str] = set() + base = config.MAIN_WORKTREE + for subdir in ["domains", "core", "foundations", "maps", "agents", "schemas"]: + full = base / subdir + if not full.is_dir(): + continue + for f in full.rglob("*.md"): + claims.add(f.stem) + return claims + + +# ─── Main entry point ────────────────────────────────────────────────────── + +async def validate_pr(conn, pr_number: int) -> dict: + """Run Tier 0 validation on a single PR. + + Returns {pr, all_pass, total, passing, skipped, reason}. + """ + # Get HEAD SHA for idempotency + head_sha = await _get_pr_head_sha(pr_number) + + # Skip if already validated for this commit + if await _has_tier0_comment(pr_number, head_sha): + logger.debug("PR #%d already validated at %s", pr_number, head_sha[:8]) + return {"pr": pr_number, "skipped": True, "reason": "already_validated"} + + # Fetch diff + diff = await _get_pr_diff(pr_number) + if not diff: + logger.debug("PR #%d: empty or oversized diff", pr_number) + return {"pr": pr_number, "skipped": True, "reason": "no_diff"} + + # Extract claim files + claim_files = extract_claim_files_from_diff(diff) + if not claim_files: + logger.debug("PR #%d: no claim files in diff", pr_number) + return {"pr": pr_number, "skipped": True, "reason": "no_claims"} + + # Load existing claims index + existing_claims = load_existing_claims() + + # Validate each claim + results = [] + for filepath, content in claim_files.items(): + result = tier0_validate_claim(filepath, content, existing_claims) + results.append(result) + status = "PASS" if result["passes"] else "FAIL" + logger.debug("PR #%d: %s %s v=%s w=%s", pr_number, status, filepath, + result["violations"], result["warnings"]) + + all_pass = all(r["passes"] for r in results) + total = len(results) + passing = sum(1 for r in results if r["passes"]) + + logger.info("PR #%d: Tier 0 — %d/%d pass, all_pass=%s", pr_number, passing, total, all_pass) + + # Post comment + await _post_validation_comment(pr_number, results, head_sha) + + # Update PR record + conn.execute( + "UPDATE prs SET tier0_pass = ? WHERE number = ?", + (1 if all_pass else 0, pr_number), + ) + db.audit(conn, "validate", "tier0_complete", + json.dumps({"pr": pr_number, "pass": all_pass, "passing": passing, "total": total})) + + return {"pr": pr_number, "all_pass": all_pass, "total": total, "passing": passing} + + +async def validate_cycle(conn, max_workers=None) -> tuple[int, int]: + """Run one validation cycle. + + Finds PRs with status='open' and tier0_pass IS NULL, validates them. + """ + # Find unvalidated PRs (priority ordered) + rows = conn.execute( + """SELECT p.number FROM prs p + LEFT JOIN sources s ON p.source_path = s.path + WHERE p.status = 'open' + AND p.tier0_pass IS NULL + ORDER BY + CASE COALESCE(p.priority, s.priority, 'medium') + WHEN 'critical' THEN 0 + WHEN 'high' THEN 1 + WHEN 'medium' THEN 2 + WHEN 'low' THEN 3 + ELSE 4 + END, + p.created_at ASC + LIMIT ?""", + (max_workers or 10,), + ).fetchall() + + if not rows: + return 0, 0 + + succeeded = 0 + failed = 0 + + for row in rows: + try: + result = await validate_pr(conn, row["number"]) + if result.get("skipped"): + # Mark as validated even if skipped (no claims = pass) + conn.execute( + "UPDATE prs SET tier0_pass = 1 WHERE number = ? 
AND tier0_pass IS NULL", + (row["number"],), + ) + succeeded += 1 + elif result.get("all_pass"): + succeeded += 1 + else: + succeeded += 1 # Validation ran successfully, even if claims failed + except Exception: + logger.exception("Failed to validate PR #%d", row["number"]) + failed += 1 + + if succeeded or failed: + logger.info("Validate cycle: %d validated, %d errors", succeeded, failed) + + return succeeded, failed diff --git a/teleo-pipeline.py b/teleo-pipeline.py new file mode 100644 index 0000000..002412e --- /dev/null +++ b/teleo-pipeline.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +"""Teleo Pipeline v2 — single async daemon replacing 7 cron scripts. + +Four stages: Ingest → Validate → Evaluate → Merge +SQLite WAL state store. systemd-managed. Graceful shutdown. +""" + +import asyncio +import logging +import signal +import sys +from datetime import datetime, timezone + +# Add parent dir to path so lib/ is importable +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from lib import config, db, log as logmod +from lib.health import start_health_server, stop_health_server +from lib.breaker import CircuitBreaker +from lib.merge import merge_cycle +from lib.validate import validate_cycle +from lib.evaluate import evaluate_cycle + +logger = logging.getLogger("pipeline") + +# Global shutdown event — stages check this between iterations +shutdown_event = asyncio.Event() + +# Track active subprocesses for cleanup +active_subprocesses: set = set() + + +async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker): + """Generic stage loop with interval, shutdown check, and circuit breaker.""" + logger.info("Stage %s started (interval=%ds)", name, interval) + while not shutdown_event.is_set(): + try: + if not breaker.allow_request(): + logger.debug("Stage %s: breaker OPEN, skipping cycle", name) + else: + workers = breaker.max_workers() + succeeded, failed = await func(conn, max_workers=workers) + if failed > 0 and succeeded == 0: + breaker.record_failure() + elif succeeded > 0: + breaker.record_success() + except Exception: + logger.exception("Stage %s: unhandled error in cycle", name) + breaker.record_failure() + + # Wait for interval or shutdown, whichever comes first + try: + await asyncio.wait_for(shutdown_event.wait(), timeout=interval) + break # shutdown_event was set + except asyncio.TimeoutError: + pass # interval elapsed, continue loop + + logger.info("Stage %s stopped", name) + + +# --- Stage stubs (Phase 1 — replaced in later phases) --- + +async def ingest_cycle(conn, max_workers=None): + """Stage 1: Scan inbox, extract claims. 
(stub)""" + return 0, 0 + + +# validate_cycle imported from lib.validate + + +# evaluate_cycle imported from lib.evaluate + + +# merge_cycle imported from lib.merge + + +# --- Shutdown --- + +def handle_signal(sig): + """Signal handler — sets shutdown event.""" + logger.info("Received %s, initiating graceful shutdown...", sig.name) + shutdown_event.set() + + +async def kill_subprocesses(): + """Kill any lingering Claude CLI subprocesses.""" + for proc in list(active_subprocesses): + if proc.returncode is None: + logger.warning("Killing lingering subprocess PID %d", proc.pid) + try: + proc.kill() + await proc.wait() + except ProcessLookupError: + pass + active_subprocesses.clear() + + +async def cleanup_orphan_worktrees(): + """Remove any orphan worktrees from previous crashes.""" + import glob + import shutil + # Use specific prefix to avoid colliding with other /tmp users (Ganymede) + orphans = glob.glob("/tmp/teleo-extract-*") + glob.glob("/tmp/teleo-merge-*") + for path in orphans: + logger.warning("Cleaning orphan worktree: %s", path) + try: + proc = await asyncio.create_subprocess_exec( + "git", "worktree", "remove", "--force", path, + cwd=str(config.REPO_DIR), + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + await asyncio.wait_for(proc.wait(), timeout=10) + except Exception: + shutil.rmtree(path, ignore_errors=True) + # Prune stale worktree metadata entries from bare repo (Ganymede) + try: + proc = await asyncio.create_subprocess_exec( + "git", "worktree", "prune", + cwd=str(config.REPO_DIR), + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + await asyncio.wait_for(proc.wait(), timeout=10) + except Exception: + logger.warning("git worktree prune failed, continuing") + + +# --- Main --- + +async def main(): + logmod.setup_logging() + logger.info("Teleo Pipeline v2 starting") + + # Clean orphan worktrees from prior crashes (Ganymede's requirement) + await cleanup_orphan_worktrees() + + # Initialize database + conn = db.get_connection() + db.migrate(conn) + logger.info("Database ready at %s", config.DB_PATH) + + # Initialize circuit breakers + breakers = { + "ingest": CircuitBreaker("ingest", conn), + "validate": CircuitBreaker("validate", conn), + "evaluate": CircuitBreaker("evaluate", conn), + "merge": CircuitBreaker("merge", conn), + } + + # Recover interrupted state from crashes + # Atomic recovery: all three resets in one transaction (Ganymede) + # Increment transient_retries on recovered sources to prevent infinite cycling (Vida) + with db.transaction(conn): + # Sources stuck in 'extracting' — increment retry counter, move to error if exhausted + c1 = conn.execute( + """UPDATE sources SET + transient_retries = transient_retries + 1, + status = CASE + WHEN transient_retries + 1 >= ? THEN 'error' + ELSE 'unprocessed' + END, + last_error = CASE + WHEN transient_retries + 1 >= ? 
THEN 'crash recovery: retry budget exhausted' + ELSE last_error + END, + updated_at = datetime('now') + WHERE status = 'extracting'""", + (config.TRANSIENT_RETRY_MAX, config.TRANSIENT_RETRY_MAX), + ) + # PRs stuck in 'merging' → approved (Ganymede's Q4 answer) + c2 = conn.execute( + "UPDATE prs SET status = 'approved' WHERE status = 'merging'" + ) + # PRs stuck in 'reviewing' → open + c3 = conn.execute( + "UPDATE prs SET status = 'open' WHERE status = 'reviewing'" + ) + recovered = c1.rowcount + c2.rowcount + c3.rowcount + if recovered: + logger.info("Recovered %d interrupted rows from prior crash", recovered) + + # Register signal handlers + loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, handle_signal, sig) + + # Start health API + health_runners = [] + await start_health_server(health_runners) + + # Start stage loops + stages = [ + asyncio.create_task( + stage_loop("ingest", config.INGEST_INTERVAL, ingest_cycle, conn, breakers["ingest"]), + name="ingest", + ), + asyncio.create_task( + stage_loop("validate", config.VALIDATE_INTERVAL, validate_cycle, conn, breakers["validate"]), + name="validate", + ), + asyncio.create_task( + stage_loop("evaluate", config.EVAL_INTERVAL, evaluate_cycle, conn, breakers["evaluate"]), + name="evaluate", + ), + asyncio.create_task( + stage_loop("merge", config.MERGE_INTERVAL, merge_cycle, conn, breakers["merge"]), + name="merge", + ), + ] + + logger.info("All stages running") + + # Wait for shutdown signal + await shutdown_event.wait() + logger.info("Shutdown event received, waiting for stages to finish...") + + # Give stages time to finish current work + try: + await asyncio.wait_for(asyncio.gather(*stages, return_exceptions=True), timeout=60) + except asyncio.TimeoutError: + logger.warning("Stages did not finish within 60s, force-cancelling") + for task in stages: + task.cancel() + await asyncio.gather(*stages, return_exceptions=True) + + # Kill lingering subprocesses + await kill_subprocesses() + + # Stop health API + await stop_health_server(health_runners) + + # Close DB + conn.close() + logger.info("Teleo Pipeline v2 shut down cleanly") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/teleo-pipeline.service b/teleo-pipeline.service new file mode 100644 index 0000000..8b0b379 --- /dev/null +++ b/teleo-pipeline.service @@ -0,0 +1,36 @@ +[Unit] +Description=Teleo Pipeline v2 — extraction/eval/merge daemon +After=network.target +Wants=network.target + +[Service] +Type=simple +User=teleo +Group=teleo +WorkingDirectory=/opt/teleo-eval +ExecStart=/opt/teleo-eval/pipeline/.venv/bin/python3 /opt/teleo-eval/pipeline/teleo-pipeline.py +Restart=on-failure +RestartSec=30 + +# Graceful shutdown: SIGTERM → 60s drain → force-cancel → kill subprocesses +# 180s buffer handles in-flight extractions (up to 10 min each) (Ganymede) +KillSignal=SIGTERM +TimeoutStopSec=180 + +# Environment +Environment=PIPELINE_BASE=/opt/teleo-eval +EnvironmentFile=-/opt/teleo-eval/secrets/pipeline.env + +# Logging goes to journal + pipeline.jsonl +StandardOutput=journal +StandardError=journal + +# Security hardening +NoNewPrivileges=yes +ProtectSystem=strict +ReadWritePaths=/opt/teleo-eval /tmp +# PrivateTmp=no: daemon uses /tmp/teleo-extract-* worktrees shared with git (Ganymede) +PrivateTmp=no + +[Install] +WantedBy=multi-user.target