Initial commit: Pipeline v2 daemon + infrastructure docs

- teleo-pipeline.py: async daemon with 4 stage loops (ingest/validate/evaluate/merge)
- lib/: config, db, evaluate, validate, merge, breaker, costs, health, log modules
- INFRASTRUCTURE.md: comprehensive deep-dive for onboarding
- teleo-pipeline.service: systemd unit file

Pentagon-Agent: Leo <294C3CA1-0205-4668-82FA-B984D54F48AD>
m3taversal 2026-03-12 14:11:18 +00:00
commit 799249d470
14 changed files with 3350 additions and 0 deletions

.gitignore (vendored, new file)

@@ -0,0 +1,19 @@
# Python
__pycache__/
*.pyc
*.pyo
# Database
*.db
*.db-wal
*.db-shm
# Secrets (never commit)
secrets/
# Logs
logs/
*.log
# OS
.DS_Store

INFRASTRUCTURE.md (new file)

@@ -0,0 +1,447 @@
# Teleo Infrastructure Deep Dive
## Overview
Teleo runs a **knowledge extraction and evaluation pipeline** on a single VPS. Six AI domain agents (Rio, Clay, Theseus, Vida, Astra, Leo) continuously extract claims from source material, evaluate them through a multi-stage review process, and merge approved claims into a shared knowledge base.
The system is mid-migration from **7 bash cron scripts** (v1) to a **single Python async daemon** (v2). Pipeline v2 handles validate, evaluate, and merge. Extraction still runs on v1 cron. Ingest (Phase 4) will complete the migration.
```
Source Material → Ingest → Validate → Evaluate → Merge → Knowledge Base
(cron v1) (stub) (v2) (v2) (v2) (git repo)
```
---
## VPS
- **Host**: `77.42.65.182` (Hetzner, Debian)
- **SSH**: `root@77.42.65.182` (key auth)
- **Disk**: 150GB, 19GB used (13%)
- **User**: `teleo` (pipeline runs as this user)
- **Base dir**: `/opt/teleo-eval/`
### Directory Layout
```
/opt/teleo-eval/
├── pipeline/ # Pipeline v2 daemon
│ ├── teleo-pipeline.py # Main entry point (4 async stage loops)
│ ├── pipeline.db # SQLite WAL state store (160KB)
│ ├── .venv/ # Python virtualenv (aiohttp)
│ └── lib/
│ ├── config.py # All constants, model assignments, overflow policies
│ ├── db.py # Schema, migrations, connection management
│ ├── validate.py # Tier 0 validation (schema, links, duplicates)
│ ├── evaluate.py # Triage + domain review + Leo review
│ ├── merge.py # Domain-serialized rebase + Forgejo API merge
│ ├── health.py # HTTP health API (localhost:8080)
│ ├── breaker.py # Circuit breaker per stage
│ ├── costs.py # API cost tracking with daily budgets
│ └── log.py # JSON structured logging
├── workspaces/
│ ├── teleo-codex.git/ # Bare repo (49MB) — pipeline's git backend
│ └── main/ # Main branch worktree (for validation checks)
├── mirror/
│ └── teleo-codex.git/ # Separate bare repo for GitHub↔Forgejo sync
├── secrets/
│ ├── forgejo-admin-token # Admin Forgejo API token
│ ├── forgejo-{agent}-token # Per-agent tokens (rio, clay, theseus, vida, astra, leo)
│ ├── github-pat # GitHub mirror push token
│ ├── openrouter-key # OpenRouter API key
│ ├── twitterapi-io-key # X/Twitter API key
│ └── x-bearer-token # X bearer token
├── logs/ # Log files for cron scripts and pipeline
├── *.sh # Legacy cron scripts (being replaced)
└── eval/ # Legacy eval scripts
```
---
## Services
### Forgejo (Git Forge)
- **Runs in**: Docker container (`codeberg.org/forgejo/forgejo:9`)
- **Port**: 3000 (HTTP), 2222 (SSH)
- **Public URL**: `https://git.livingip.xyz`
- **Repo**: `teleo/teleo-codex`
- **Purpose**: Hosts the knowledge base repo, manages PRs, stores review comments
- **Users**: Per-agent Forgejo accounts (`rio`, `clay`, `theseus`, `vida`, `astra`, `leo`, `teleo`)
### Pipeline v2 Daemon
- **Service**: `teleo-pipeline.service` (systemd)
- **Commands**: `systemctl {start|stop|restart|status} teleo-pipeline`
- **Logs**: `journalctl -u teleo-pipeline -f`
- **Health**: `curl localhost:8080/health`
- **Shutdown**: SIGTERM → 60s drain → force-cancel → kill subprocesses (180s total)
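
The daemon entry point (`teleo-pipeline.py`) is not excerpted in this commit view, so the exact shutdown code is not shown; the following is a minimal sketch of how that sequence could be wired, assuming a module-level `shutdown_event` and a list of stage tasks. The final kill-subprocesses step is omitted, and all names are illustrative.

```python
import asyncio
import signal

shutdown_event = asyncio.Event()

async def drain_and_cancel(stage_tasks: list[asyncio.Task], drain_s: int = 60) -> None:
    """Signal the stage loops to stop, give them a drain window, then cancel stragglers."""
    shutdown_event.set()                                    # loops exit at their next check
    done, pending = await asyncio.wait(stage_tasks, timeout=drain_s)
    for task in pending:                                    # still busy after the drain window
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

def install_handlers(loop: asyncio.AbstractEventLoop, stage_tasks: list[asyncio.Task]) -> None:
    """Hook SIGTERM/SIGINT so systemd's stop request triggers the drain."""
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda: asyncio.ensure_future(drain_and_cancel(stage_tasks)))
```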
### Active Cron Jobs (teleo user)
| Schedule | Script | Purpose |
|----------|--------|---------|
| `*/3 * * * *` | `extract-cron.sh` | Source extraction (v1, still active) |
| `*/2 * * * *` | `sync-mirror.sh` | Forgejo↔GitHub bidirectional sync |
| `*/2 * * * *` | `fetch-bare.sh` | Fetch latest into bare repo |
| `0 0 * * *` | `pipeline-health-check.sh` | Daily health metrics |
| `0 */2 * * *` | `pipeline-health-check.py` | 2-hourly health report |
### Disabled Cron Jobs (replaced by Pipeline v2)
- `fix-extraction-prs.py` — replaced by `validate.py`
- `eval-dispatcher.sh` — replaced by `evaluate.py`
- `merge-retry.sh` — replaced by `merge.py`
- Research sessions (rio, clay, theseus, vida, astra) — disabled during pipeline migration
### GitHub Mirror
- **Repo**: `github.com/user/teleo-codex` (public mirror)
- **Sync**: Bidirectional, Forgejo authoritative on conflict
- **Frequency**: Every 2 minutes via `sync-mirror.sh`
- **Security**: GitHub→Forgejo path never auto-processes branches. Only PRs trigger pipeline work.
---
## Pipeline v2 Architecture
### Stage Loop
Each stage runs as an async task with its own interval, circuit breaker, and shutdown check:
```python
async def stage_loop(name, interval, func, conn, breaker):
    while not shutdown_event.is_set():
        if breaker.allow_request():
            succeeded, failed = await func(conn, max_workers=breaker.max_workers())
            # Record success/failure on the breaker
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed, run the next cycle
```
| Stage | Interval | Function | Status |
|-------|----------|----------|--------|
| Ingest | 60s | `ingest_cycle()` | **Stub** — Phase 4 |
| Validate | 30s | `validate_cycle()` | **Live** |
| Evaluate | 30s | `evaluate_cycle()` | **Live** |
| Merge | 30s | `merge_cycle()` | **Live** |
### Crash Recovery
On startup, the daemon recovers interrupted state from prior crashes:
1. Sources stuck in `extracting` → increment retry counter → `unprocessed` (or `error` if budget exhausted)
2. PRs stuck in `merging` → `approved` (re-enter merge queue)
3. PRs stuck in `reviewing` → `open` (re-enter eval queue)
4. Orphan git worktrees (`/tmp/teleo-extract-*`, `/tmp/teleo-merge-*`) cleaned up
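
The recovery routine itself lives in `teleo-pipeline.py` and is not excerpted here; the sketch below illustrates the idea against the `sources`/`prs` schema from `lib/db.py`. The retry cap of 5 mirrors `TRANSIENT_RETRY_MAX` in `lib/config.py`, and the real code presumably also prunes git worktree metadata rather than just deleting directories.

```python
import glob
import shutil

TRANSIENT_RETRY_MAX = 5  # mirrors lib/config.py

def recover_interrupted_state(conn) -> None:
    """Reset rows left mid-flight by a crash, then sweep orphan worktrees."""
    # 1. Sources stuck mid-extraction: bump the retry counter, re-queue or error out
    conn.execute(
        """UPDATE sources
           SET transient_retries = transient_retries + 1,
               status = CASE WHEN transient_retries + 1 >= ? THEN 'error' ELSE 'unprocessed' END
           WHERE status = 'extracting'""",
        (TRANSIENT_RETRY_MAX,),
    )
    # 2/3. PRs caught mid-merge or mid-review re-enter their queues
    conn.execute("UPDATE prs SET status = 'approved' WHERE status = 'merging'")
    conn.execute("UPDATE prs SET status = 'open' WHERE status = 'reviewing'")
    # 4. Orphan worktrees from interrupted extract/merge runs
    for path in glob.glob("/tmp/teleo-extract-*") + glob.glob("/tmp/teleo-merge-*"):
        shutil.rmtree(path, ignore_errors=True)
```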
---
## Stage 1: Validate (`lib/validate.py`)
Runs Tier 0 structural validation on PRs with `status='open'` and `tier0_pass IS NULL`.
### Checks
1. **Schema validation** — YAML frontmatter has required fields (type, domain, description, confidence, source, created)
2. **Date format** — `created` field is valid YYYY-MM-DD
3. **Title format** — Prose proposition, not a label (heuristic: 8+ words, no bare noun phrases)
4. **Wiki link validity** — `[[links]]` resolve to real files in the repo
5. **Universal quantifier check** — Flags claims using "all", "always", "never", "every" without scoping
6. **Domain-directory match** — Claim's `domain` field matches its file path
7. **Description quality** — Description adds info beyond the title (not a substring)
8. **Near-duplicate detection** — Trigram similarity against existing claims
9. **Proposition heuristic** — Title passes the claim test ("This note argues that [title]" works)
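
`lib/validate.py` is not excerpted in this commit view. As an illustration of check 8, a trigram near-duplicate test could look like the sketch below; the 0.8 threshold is an assumption, not a value taken from the code.

```python
def _trigrams(text: str) -> set[str]:
    """Character trigrams of a whitespace-normalized, lowercased title."""
    t = " ".join(text.lower().split())
    return {t[i:i + 3] for i in range(len(t) - 2)}

def trigram_similarity(a: str, b: str) -> float:
    """Jaccard similarity over character trigrams, in [0.0, 1.0]."""
    ta, tb = _trigrams(a), _trigrams(b)
    if not ta or not tb:
        return 0.0
    return len(ta & tb) / len(ta | tb)

def is_near_duplicate(title: str, existing_titles: list[str], threshold: float = 0.8) -> bool:
    """Flag a claim title that is too close to any existing claim title."""
    return any(trigram_similarity(title, t) >= threshold for t in existing_titles)
```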
### Output
- Posts Tier 0 validation comment on Forgejo PR (with SHA-based idempotency marker)
- Sets `tier0_pass = 1` (pass) or `tier0_pass = 0` (fail)
- Failing PRs remain `status='open'` but are excluded from eval queue
---
## Stage 2: Evaluate (`lib/evaluate.py`)
The core intelligence stage. Domain-first, Leo-last architecture.
### PR Flow
```
PR (open, tier0_pass=1)
├─ Triage (Haiku/OpenRouter) → DEEP / STANDARD / LIGHT
├─ Domain Review (Sonnet/Claude Max → overflow GPT-4o/OpenRouter)
│ ├─ REJECT → status='open', feedback stored, Leo skipped
│ └─ APPROVE → continue to Leo
├─ Leo Review (Opus/Claude Max → overflow: queue only)
│ ├─ REJECT → status='open', feedback stored
│ └─ APPROVE → continue
├─ LIGHT tier: Leo skipped, domain-only gate
├─ Both approve → formal Forgejo approvals (2 agent tokens) → status='approved'
└─ Musings bypass: PRs touching only agents/*/musings/ auto-approve
```
### Model Routing
| Stage | Primary | Overflow | Policy |
|-------|---------|----------|--------|
| Triage | Haiku (OpenRouter) | — | Always API |
| Domain review | Sonnet (Claude Max) | GPT-4o (OpenRouter) | `overflow` |
| Leo review | Opus (Claude Max) | — | `queue` (never overflow) |
| DEEP cross-family | GPT-4o (OpenRouter) | — | Always API (not yet implemented) |
**Claude Max** is a subscription — free but rate-limited. When rate-limited, the CLI returns `"You've hit your limit"` on **stdout** (not stderr) with exit code 1. The pipeline detects this and applies the overflow policy.
**Key design principle**: Opus is the scarce resource. Domain review (Sonnet) filters first — high volume, catches most issues. Leo review (Opus) only sees pre-filtered PRs. This maximizes value per scarce Opus call.
### Domain Routing
Domain detection reads diff file paths (`domains/`, `entities/`, `core/`, `foundations/`) and maps to the responsible agent:
| Domain | Agent |
|--------|-------|
| internet-finance, mechanisms, living-capital, teleological-economics | Rio |
| entertainment, cultural-dynamics | Clay |
| ai-alignment, living-agents, critical-systems, collective-intelligence | Theseus |
| health | Vida |
| space-development | Astra |
| teleohumanity, grand-strategy | Leo |
### Backoff and Resume
- **10-minute backoff**: PRs attempted within the last 10 minutes are skipped (prevents retry storms during rate limits)
- **Domain review resume**: If domain review completed but Leo review was rate-limited, domain review is skipped on retry (no wasted OpenRouter calls)
- **`last_attempt` tracking**: Set at the start of `evaluate_pr`, persists through status revert
### Review Attribution
- Domain review comments post from the domain agent's Forgejo account (e.g., Rio posts Rio's review)
- Leo review comments post from Leo's Forgejo account
- Formal approvals come from 2 agent tokens (not the PR author)
### Verdict Parsing
Reviews end with HTML comment tags:
```
<!-- VERDICT:RIO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
<!-- ISSUES: broken_wiki_links, confidence_miscalibration -->
```
---
## Stage 3: Merge (`lib/merge.py`)
Domain-serialized priority queue with rebase-before-merge.
### Design
- **Domain serialization**: Same-domain merges are serial (prevents `_map.md` conflicts). Cross-domain merges are parallel.
- **Two-layer locking**: `asyncio.Lock` per domain (fast path, lost on crash) + `prs.status='merging'` in SQLite (durable, crash recovery)
- **NOT EXISTS subquery**: SQL defense-in-depth prevents two PRs in the same domain from merging simultaneously
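
`lib/merge.py` is not excerpted here; the sketch below shows how the atomic claim could combine the priority ordering with the NOT EXISTS guard. The SQL is an assumption based on the schema in `lib/db.py` (and `UPDATE ... RETURNING` requires SQLite 3.35+), not the actual query.

```python
def claim_next_pr(conn, domain: str):
    """Atomically claim the highest-priority approved PR in a domain,
    refusing if another PR in that domain is already merging (sketch)."""
    row = conn.execute(
        """
        UPDATE prs SET status = 'merging'
        WHERE number = (
            SELECT p.number FROM prs p
            LEFT JOIN sources s ON p.source_path = s.path
            WHERE p.status = 'approved' AND p.domain = :domain
              AND NOT EXISTS (
                  SELECT 1 FROM prs q
                  WHERE q.domain = :domain AND q.status = 'merging'
              )
            ORDER BY CASE COALESCE(p.priority, s.priority, 'medium')
                         WHEN 'critical' THEN 0 WHEN 'high' THEN 1
                         WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END,
                     p.created_at ASC
            LIMIT 1
        )
        RETURNING number, branch
        """,
        {"domain": domain},
    ).fetchone()
    return dict(row) if row else None
```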
### Merge Flow
```
1. Discover external PRs (pagination over Forgejo API)
- Detect origin: pipeline vs human (by author login)
- Human PRs: priority='high', ack comment posted
2. For each domain with approved PRs:
a. Claim next PR (atomic UPDATE...RETURNING with priority queue)
b. Create git worktree at /tmp/teleo-merge-{branch}
c. Capture expected SHA (pin for force-with-lease)
d. Fetch origin/main, check if rebase needed
e. Rebase onto main (abort on conflict → status='conflict')
f. Force-push with --force-with-lease={branch}:{expected_sha}
g. Merge via Forgejo API
h. Delete remote branch
i. Cleanup worktree
```
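
Steps d through f amount to a pinned rebase-and-push, using the SHA captured in step c as the lease. A minimal sketch with plain git commands (the actual invocation in `lib/merge.py` is not shown here):

```python
import subprocess

def rebase_and_push(worktree: str, branch: str, expected_sha: str) -> bool:
    """Rebase a PR branch onto origin/main, then force-push pinned to the SHA
    captured before rebasing, so a racing push fails the lease instead of
    being silently overwritten."""
    def git(*args, timeout=120):
        return subprocess.run(["git", "-C", worktree, *args],
                              capture_output=True, text=True, timeout=timeout)

    git("fetch", "origin", "main")
    if git("rebase", "origin/main").returncode != 0:
        git("rebase", "--abort")                 # conflict: caller marks status='conflict'
        return False
    push = git("push", f"--force-with-lease={branch}:{expected_sha}", "origin", branch, timeout=30)
    return push.returncode == 0
```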
### Priority Queue
```sql
COALESCE(p.priority, s.priority, 'medium')
-- PR-level priority > source-level priority > default 'medium'
-- NULL falls to ELSE 4 (intentionally below explicit medium)
```
| Priority | Value | Use |
|----------|-------|-----|
| critical | 0 | Reserved for explicit human override |
| high | 1 | Human-submitted PRs |
| medium | 2 | Standard pipeline PRs |
| low | 3 | Explicitly deprioritized |
| NULL | 4 | Unclassified (below medium) |
### Timeouts
- **Merge timeout**: 5 minutes per PR. Exceeding → `status='conflict'`
- **Rebase timeout**: 2 minutes
- **Push timeout**: 30 seconds
- **API merge failure**: Sets `status='conflict'` (not `approved` — prevents infinite retry)
---
## Database Schema
SQLite WAL mode. Schema version 2.
### Tables
**`sources`** — Source material pipeline
- `path` (PK), `status`, `priority`, `extraction_model`, `claims_count`, `pr_number`
- `transient_retries`, `substantive_retries`, `last_error`, `feedback`
**`prs`** — Pull request lifecycle
- `number` (PK), `source_path`, `branch`, `status`, `domain`, `tier`
- `tier0_pass`, `leo_verdict`, `domain_verdict`, `domain_agent`, `domain_model`
- `priority`, `origin` (pipeline/human), `last_attempt`
**`costs`** — API spend tracking
- `(date, model, stage)` (composite PK), `calls`, `input_tokens`, `output_tokens`, `cost_usd`
**`circuit_breakers`** — Per-stage health
- `name` (PK), `state` (closed/open/halfopen), `failures`, `successes`, `last_success_at`
**`audit_log`** — Event log
- `id`, `timestamp`, `stage`, `event`, `detail` (JSON)
### PR Status Lifecycle
```
open → validating → open (tier0_pass set)
→ reviewing → approved → merging → merged
→ open (rejected, feedback stored)
→ conflict (rebase/merge failed)
→ zombie (stuck, manual intervention)
```
---
## Health API
`GET localhost:8080/health` returns:
```json
{
"status": "healthy|degraded|stalled",
"breakers": {
"ingest": {"state": "closed", "failures": 0},
"validate": {"state": "closed", "failures": 0, "last_success_age_s": 30, "stalled": false},
"evaluate": {"state": "closed", "failures": 0, "last_success_age_s": 45, "stalled": false},
"merge": {"state": "closed", "failures": 0}
},
"sources": {"unprocessed": 10, "extracting": 2},
"prs": {"open": 117, "approved": 5, "merging": 1},
"merge_queue_by_domain": {"internet-finance": 3, "health": 2},
"budget": {"ok": true, "spend": 1.23, "budget": 20.0, "pct": 6.2},
"metabolic": {
"null_result_rate_24h": 0.05,
"domain_approval_rate_24h": 0.96,
"leo_approval_rate_24h": 0.85
}
}
```
**Stall detection**: If `now() - last_success_at > 2 * interval`, the stage is stalled.
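
The stall computation sits further down in `lib/health.py` than the excerpt below shows; a minimal sketch of the rule, with the interval map taken from `lib/config.py`:

```python
from datetime import datetime, timezone

STAGE_INTERVALS = {"ingest": 60, "validate": 30, "evaluate": 30, "merge": 30}  # from lib/config.py

def is_stalled(stage: str, last_success_at: str | None) -> bool:
    """A stage is stalled when it has gone more than 2x its interval without a success."""
    if last_success_at is None:
        return False  # never succeeded yet; the breaker state covers that case
    last = datetime.fromisoformat(last_success_at)
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    age_s = (datetime.now(timezone.utc) - last).total_seconds()
    return age_s > 2 * STAGE_INTERVALS.get(stage, 60)
```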
---
## Circuit Breakers
Each stage has an independent circuit breaker:
- **Closed** (normal): All requests pass
- **Open** (tripped): Requests blocked for `BREAKER_COOLDOWN` (15 min)
- **Half-open**: One test request allowed; success → closed, failure → open
Triggers: 5 consecutive failures trip the breaker. Worker count reduces under pressure.
---
## Cost Management
- **Daily budget**: $20 USD (OpenRouter)
- **Warning threshold**: 80% of budget
- **Claude Max**: Free (tracked for volume, cost = $0)
- **Budget check**: Health API reports spend, pipeline can pause extraction when budget exhausted
---
## Known Issues and Deferred Work
### Active Issues
1. **PR #702 in `conflict`**: Archive-only PR, Forgejo returned 500 on merge API. Likely needs manual merge or close.
2. **36 PRs failed Tier 0**: Will not enter eval. Need either re-extraction or closure.
3. **Domain-rejected PR limbo** (Ganymede warning #4): PRs rejected by domain review have `status='open'` but exit the eval queue. No path to re-extraction or closure. Needs `domain_rejected` status or auto-close mechanism.
4. **DEEP cross-family review not implemented** (Ganymede warning #5): Docstring promises GPT-4o adversarial review for DEEP PRs after both domain and Leo approve. Not in code.
5. **Sonnet leniency tracking**: 96% domain approval rate. Need to measure Opus disagreement rate when it comes online (Mar 13, 5pm UTC). If Opus rejects >15% of domain-approved PRs, domain prompt needs tightening.
### Deferred Nits
- `entity_diff` from `_filter_diff()` is returned but unused
- Formal approvals use hardcoded agent order instead of actual reviewers
- `aiohttp.ClientSession` created per API call (should be one per cycle)
### Phase 4: Ingest Module (`lib/ingest.py`)
Not yet built. Will port `extract-cron.sh` + `extract-worker.sh`. When complete, the remaining v1 cron scripts can be disabled.
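
As a purely hypothetical sketch of the shape this could take (every name below is an assumption, including `extract_source`; only the `(succeeded, failed)` return contract is fixed by `stage_loop`):

```python
async def ingest_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Hypothetical Phase 4 entry point; same (succeeded, failed) contract as the live stages."""
    rows = conn.execute(
        """SELECT path FROM sources
           WHERE status = 'unprocessed'
           ORDER BY CASE priority WHEN 'critical' THEN 0 WHEN 'high' THEN 1
                                  WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END
           LIMIT ?""",
        (max_workers or config.MAX_EXTRACT_WORKERS,),
    ).fetchall()
    succeeded, failed = 0, 0
    for row in rows:
        try:
            # extract_source() is hypothetical: it would port extract-worker.sh
            # (claim the source, run extraction, open a PR, record cost)
            await extract_source(conn, row["path"])
            succeeded += 1
        except Exception:
            logger.exception("Extraction failed for %s", row["path"])
            failed += 1
    return succeeded, failed
```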
### Phase 5: Integration + Cutover
Full pipeline test with all 4 stages. Disable remaining cron scripts. Re-enable research sessions.
---
## Operational Runbook
### Check pipeline health
```bash
ssh root@77.42.65.182 'curl -s localhost:8080/health | python3 -m json.tool'
```
### View logs
```bash
ssh root@77.42.65.182 'journalctl -u teleo-pipeline -f' # live
ssh root@77.42.65.182 'journalctl -u teleo-pipeline -n 50' # recent
ssh root@77.42.65.182 'journalctl -u teleo-pipeline --since "1 hour ago"'
```
### Restart pipeline
```bash
ssh root@77.42.65.182 'systemctl restart teleo-pipeline'
```
### Query database
```bash
ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "SELECT status, count(*) FROM prs GROUP BY status"'
```
### Deploy code changes
```bash
scp lib/evaluate.py root@77.42.65.182:/opt/teleo-eval/pipeline/lib/evaluate.py
ssh root@77.42.65.182 'chown teleo:teleo /opt/teleo-eval/pipeline/lib/evaluate.py && systemctl restart teleo-pipeline'
```
### Reset a stuck PR
```bash
ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "UPDATE prs SET status = \"open\", leo_verdict = \"pending\", domain_verdict = \"pending\" WHERE number = 702"'
```
### Check circuit breakers
```bash
ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "SELECT * FROM circuit_breakers"'
```
### View cost breakdown
```bash
ssh root@77.42.65.182 'sqlite3 /opt/teleo-eval/pipeline/pipeline.db "SELECT model, stage, calls, cost_usd FROM costs WHERE date = date(\"now\") ORDER BY cost_usd DESC"'
```

lib/__init__.py (new, empty file)

lib/breaker.py (new file)

@@ -0,0 +1,139 @@
"""Circuit breaker state machine — per-stage, backed by SQLite."""
import logging
from datetime import datetime, timezone
from . import config
logger = logging.getLogger("pipeline.breaker")
# States
CLOSED = "closed"
OPEN = "open"
HALFOPEN = "halfopen"
class CircuitBreaker:
"""Per-stage circuit breaker.
CLOSED: normal operation
OPEN: stage paused (threshold consecutive failures reached)
HALFOPEN: cooldown expired, try 1 worker to probe recovery
"""
def __init__(self, name: str, conn):
self.name = name
self.conn = conn
self._ensure_row()
def _ensure_row(self):
self.conn.execute(
"INSERT OR IGNORE INTO circuit_breakers (name) VALUES (?)",
(self.name,),
)
def _get_state(self) -> dict:
row = self.conn.execute(
"SELECT state, failures, successes, tripped_at, last_success_at FROM circuit_breakers WHERE name = ?",
(self.name,),
).fetchone()
return dict(row) if row else {"state": CLOSED, "failures": 0, "successes": 0, "tripped_at": None, "last_success_at": None}
def _set_state(self, state: str, failures: int = None, successes: int = None,
tripped_at: str = None, last_success_at: str = None):
updates = ["state = ?", "last_update = datetime('now')"]
params = [state]
if failures is not None:
updates.append("failures = ?")
params.append(failures)
if successes is not None:
updates.append("successes = ?")
params.append(successes)
if tripped_at is not None:
updates.append("tripped_at = ?")
params.append(tripped_at)
if last_success_at is not None:
updates.append("last_success_at = ?")
params.append(last_success_at)
params.append(self.name)
self.conn.execute(
f"UPDATE circuit_breakers SET {', '.join(updates)} WHERE name = ?",
params,
)
def allow_request(self) -> bool:
"""Check if requests are allowed. Returns True if CLOSED or HALFOPEN."""
s = self._get_state()
if s["state"] == CLOSED:
return True
if s["state"] == OPEN:
# Check cooldown
if s["tripped_at"]:
tripped = datetime.fromisoformat(s["tripped_at"])
if tripped.tzinfo is None:
tripped = tripped.replace(tzinfo=timezone.utc)
elapsed = (datetime.now(timezone.utc) - tripped).total_seconds()
if elapsed >= config.BREAKER_COOLDOWN:
logger.info("Breaker %s: cooldown expired, entering HALFOPEN", self.name)
self._set_state(HALFOPEN, successes=0)
return True
return False
# HALFOPEN — allow one probe
return True
def max_workers(self) -> int:
"""Return max workers allowed in current state."""
s = self._get_state()
if s["state"] == HALFOPEN:
return 1 # probe with single worker
return None # no restriction from breaker
def record_success(self):
"""Record a successful cycle. Updates last_success_at for stall detection (Vida)."""
s = self._get_state()
now = datetime.now(timezone.utc).isoformat()
if s["state"] == HALFOPEN:
logger.info("Breaker %s: HALFOPEN probe succeeded, closing", self.name)
self._set_state(CLOSED, failures=0, successes=0, last_success_at=now)
elif s["state"] == CLOSED:
if s["failures"] > 0:
self._set_state(CLOSED, failures=0, last_success_at=now)
else:
self._set_state(CLOSED, last_success_at=now)
def record_failure(self):
"""Record a failed cycle."""
s = self._get_state()
if s["state"] == HALFOPEN:
logger.warning("Breaker %s: HALFOPEN probe failed, reopening", self.name)
self._set_state(
OPEN,
failures=s["failures"] + 1,
tripped_at=datetime.now(timezone.utc).isoformat(),
)
elif s["state"] == CLOSED:
new_failures = s["failures"] + 1
if new_failures >= config.BREAKER_THRESHOLD:
logger.warning(
"Breaker %s: threshold reached (%d failures), opening",
self.name, new_failures,
)
self._set_state(
OPEN,
failures=new_failures,
tripped_at=datetime.now(timezone.utc).isoformat(),
)
else:
self._set_state(CLOSED, failures=new_failures)
elif s["state"] == OPEN:
self._set_state(OPEN, failures=s["failures"] + 1)
def reset(self):
"""Force reset to CLOSED."""
logger.info("Breaker %s: force reset to CLOSED", self.name)
self._set_state(CLOSED, failures=0, successes=0)

lib/config.py (new file)

@@ -0,0 +1,116 @@
"""Pipeline v2 configuration — all constants and thresholds."""
import os
from pathlib import Path
# --- Paths ---
BASE_DIR = Path(os.environ.get("PIPELINE_BASE", "/opt/teleo-eval"))
REPO_DIR = BASE_DIR / "workspaces" / "teleo-codex.git"
MAIN_WORKTREE = BASE_DIR / "workspaces" / "main"
SECRETS_DIR = BASE_DIR / "secrets"
LOG_DIR = BASE_DIR / "logs"
DB_PATH = BASE_DIR / "pipeline" / "pipeline.db"
INBOX_ARCHIVE = "inbox/archive"
# --- Forgejo ---
FORGEJO_URL = os.environ.get("FORGEJO_URL", "http://localhost:3000")
FORGEJO_OWNER = "teleo"
FORGEJO_REPO = "teleo-codex"
FORGEJO_TOKEN_FILE = SECRETS_DIR / "forgejo-admin-token"
FORGEJO_PIPELINE_USER = "teleo" # git user for pipeline commits
# --- Models ---
CLAUDE_CLI = os.environ.get("CLAUDE_CLI", "/home/teleo/.local/bin/claude")
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# Model IDs
MODEL_OPUS = "opus"
MODEL_SONNET = "sonnet"
MODEL_HAIKU = "anthropic/claude-3.5-haiku"
MODEL_GPT4O = "openai/gpt-4o"
# --- Model assignment per stage ---
# Principle: Opus is a scarce resource. Use it only where judgment quality matters.
# Sonnet handles volume. Haiku handles routing. Opus handles synthesis + critical eval.
#
# Pipeline eval ordering (domain-first, Leo-last):
# 1. Domain review → Sonnet (catches domain issues, evidence gaps — high volume filter)
# 2. Leo review → Opus (cross-domain synthesis, confidence calibration — only pre-filtered PRs)
# 3. DEEP cross-family → GPT-4o (adversarial blind-spot check — paid, highest-value claims only)
EXTRACT_MODEL = MODEL_SONNET # extraction: structured output, volume work
TRIAGE_MODEL = MODEL_HAIKU # triage: routing decision, cheapest
EVAL_DOMAIN_MODEL = MODEL_SONNET # domain review: high-volume filter
EVAL_LEO_MODEL = MODEL_OPUS # Leo review: scarce, high-value
EVAL_DEEP_MODEL = MODEL_GPT4O # DEEP cross-family: paid, adversarial
# --- Model backends ---
# Each model can run on Claude Max (subscription, base load) or API (overflow/spikes).
# Claude Max: free but rate-limited. API: paid but unlimited.
# When Claude Max is rate-limited, behavior per stage:
# "queue" — wait for capacity (preferred for non-urgent work)
# "overflow" — fall back to API (for time-sensitive work)
# "skip" — skip this cycle (for optional stages like sample audit)
OVERFLOW_POLICY = {
"extract": "queue", # extraction can wait
"triage": "overflow", # triage is cheap on API anyway
"eval_domain": "overflow", # domain review is the volume filter — don't let it bottleneck (Rhea)
"eval_leo": "queue", # Leo review is the bottleneck we protect
"eval_deep": "overflow", # DEEP is already on API
"sample_audit": "skip", # optional, skip if constrained
}
# OpenRouter cost rates per 1K tokens (only applies when using API, not Claude Max)
MODEL_COSTS = {
"opus": {"input": 0.015, "output": 0.075},
"sonnet": {"input": 0.003, "output": 0.015},
MODEL_HAIKU: {"input": 0.0008, "output": 0.004},
MODEL_GPT4O: {"input": 0.0025, "output": 0.01},
}
# --- Concurrency ---
MAX_EXTRACT_WORKERS = int(os.environ.get("MAX_EXTRACT_WORKERS", "5"))
MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7"))
MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain
# --- Timeouts (seconds) ---
EXTRACT_TIMEOUT = 600 # 10 min
EVAL_TIMEOUT = 300 # 5 min
MERGE_TIMEOUT = 300 # 5 min — force-reset to conflict if exceeded (Rhea)
CLAUDE_MAX_PROBE_TIMEOUT = 15
# --- Backpressure ---
BACKPRESSURE_HIGH = 40 # pause extraction above this
BACKPRESSURE_LOW = 20 # throttle extraction above this
BACKPRESSURE_THROTTLE_WORKERS = 2 # workers when throttled
# --- Retry budgets ---
TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits
SUBSTANTIVE_RETRY_STANDARD = 2 # reviewer request_changes
SUBSTANTIVE_RETRY_DEEP = 3
# --- Circuit breakers ---
BREAKER_THRESHOLD = 5
BREAKER_COOLDOWN = 900 # 15 min
# --- Cost budgets ---
OPENROUTER_DAILY_BUDGET = 20.0 # USD
OPENROUTER_WARN_THRESHOLD = 0.8 # 80% of budget
# --- Quality ---
SAMPLE_AUDIT_RATE = 0.10 # 10% of LIGHT merges
SAMPLE_AUDIT_DISAGREEMENT_THRESHOLD = 0.10 # 10% disagreement → tighten LIGHT criteria
# --- Polling intervals (seconds) ---
INGEST_INTERVAL = 60
VALIDATE_INTERVAL = 30
EVAL_INTERVAL = 30
MERGE_INTERVAL = 30
HEALTH_CHECK_INTERVAL = 60
# --- Health API ---
HEALTH_PORT = 8080
# --- Logging ---
LOG_FILE = LOG_DIR / "pipeline.jsonl"
LOG_ROTATION_MAX_BYTES = 50 * 1024 * 1024 # 50MB per file
LOG_ROTATION_BACKUP_COUNT = 7 # keep 7 days

lib/costs.py (new file)

@@ -0,0 +1,88 @@
"""Cost tracking — per-model per-day with budget enforcement."""
import logging
from datetime import date
from . import config
logger = logging.getLogger("pipeline.costs")
def record_usage(
conn,
model: str,
stage: str,
input_tokens: int = 0,
output_tokens: int = 0,
backend: str = "api",
):
"""Record usage and compute cost. Returns cost in USD.
backend: "max" (Claude Max subscription, free) or "api" (paid).
Claude Max calls are tracked for volume metrics but cost $0. (Ganymede)
"""
if backend == "max":
cost = 0.0
else:
rates = config.MODEL_COSTS.get(model)
if not rates:
logger.warning("No cost rates for model %s, recording zero cost", model)
cost = 0.0
else:
cost = (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1000
today = date.today().isoformat()
# Include backend in the stage key so max vs api are tracked separately
stage_key = f"{stage}:{backend}" if backend != "api" else stage
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, ?, 1, ?, ?, ?)
ON CONFLICT (date, model, stage) DO UPDATE SET
calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(today, model, stage_key, input_tokens, output_tokens, cost),
)
return cost
def get_daily_spend(conn, day: str = None) -> float:
"""Get total OpenRouter spend for a given day (default: today)."""
if day is None:
day = date.today().isoformat()
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0) as total FROM costs WHERE date = ?",
(day,),
).fetchone()
return row["total"]
def get_daily_breakdown(conn, day: str = None) -> list:
"""Get per-model per-stage breakdown for a day."""
if day is None:
day = date.today().isoformat()
rows = conn.execute(
"""SELECT model, stage, calls, input_tokens, output_tokens, cost_usd
FROM costs WHERE date = ? ORDER BY cost_usd DESC""",
(day,),
).fetchall()
return [dict(r) for r in rows]
def check_budget(conn) -> dict:
"""Check budget status. Returns {ok, spend, budget, pct}."""
spend = get_daily_spend(conn)
pct = spend / config.OPENROUTER_DAILY_BUDGET if config.OPENROUTER_DAILY_BUDGET > 0 else 0
return {
"ok": pct < 1.0,
"warn": pct >= config.OPENROUTER_WARN_THRESHOLD,
"spend": round(spend, 4),
"budget": config.OPENROUTER_DAILY_BUDGET,
"pct": round(pct * 100, 1),
}
def budget_allows(conn) -> bool:
"""Quick check: is spending under daily budget?"""
return check_budget(conn)["ok"]

lib/db.py (new file)

@@ -0,0 +1,228 @@
"""SQLite database — schema, migrations, connection management."""
import sqlite3
import json
import logging
from contextlib import contextmanager
from pathlib import Path
from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 2
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sources (
path TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'unprocessed',
-- unprocessed, triaging, extracting, extracted, null_result,
-- needs_reextraction, error
priority TEXT DEFAULT 'medium',
-- critical, high, medium, low, skip
priority_log TEXT DEFAULT '[]',
-- JSON array: [{stage, priority, reasoning, ts}]
extraction_model TEXT,
claims_count INTEGER DEFAULT 0,
pr_number INTEGER,
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS prs (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
-- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
-- conflict: rebase failed or merge timed out needs human intervention
domain TEXT,
agent TEXT,
tier TEXT,
-- LIGHT, STANDARD, DEEP
tier0_pass INTEGER,
-- 0/1
leo_verdict TEXT DEFAULT 'pending',
-- pending, approve, request_changes, skipped, failed
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
-- NULL = inherit from source. Set explicitly for human-submitted PRs.
-- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
    -- Human PRs: 'high' (detected via missing source_path or non-agent author)
origin TEXT DEFAULT 'pipeline',
-- pipeline | human | external
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
CREATE TABLE IF NOT EXISTS costs (
date TEXT,
model TEXT,
stage TEXT,
calls INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
PRIMARY KEY (date, model, stage)
);
CREATE TABLE IF NOT EXISTS circuit_breakers (
name TEXT PRIMARY KEY,
state TEXT DEFAULT 'closed',
-- closed, open, halfopen
failures INTEGER DEFAULT 0,
successes INTEGER DEFAULT 0,
tripped_at TEXT,
last_success_at TEXT,
-- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
last_update TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
stage TEXT,
event TEXT,
detail TEXT
);
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
"""
def get_connection(readonly: bool = False) -> sqlite3.Connection:
"""Create a SQLite connection with WAL mode and proper settings."""
config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(
str(config.DB_PATH),
timeout=30,
isolation_level=None, # autocommit — we manage transactions explicitly
)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=10000")
conn.execute("PRAGMA foreign_keys=ON")
if readonly:
conn.execute("PRAGMA query_only=ON")
return conn
@contextmanager
def transaction(conn: sqlite3.Connection):
"""Context manager for explicit transactions."""
conn.execute("BEGIN")
try:
yield conn
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
def migrate(conn: sqlite3.Connection):
"""Run schema migrations."""
conn.executescript(SCHEMA_SQL)
# Check current version
try:
row = conn.execute(
"SELECT MAX(version) as v FROM schema_version"
).fetchone()
current = row["v"] if row and row["v"] else 0
except sqlite3.OperationalError:
current = 0
# --- Incremental migrations ---
if current < 2:
# Phase 2: add multiplayer columns to prs table
for stmt in [
"ALTER TABLE prs ADD COLUMN priority TEXT",
"ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'",
"ALTER TABLE prs ADD COLUMN last_error TEXT",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v2: added priority, origin, last_error to prs")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
(SCHEMA_VERSION,),
)
logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
else:
logger.debug("Database at schema version %d", current)
def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None):
"""Write an audit log entry."""
conn.execute(
"INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
(stage, event, detail),
)
def append_priority_log(
conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str
):
"""Append a priority assessment to a source's priority_log.
NOTE: This does NOT update the source's priority column. The priority column
is the authoritative priority, set only by initial triage or human override.
The priority_log records each stage's opinion for offline calibration analysis.
    (Bug caught by Theseus: the original version overwrote priority with each stage's opinion.)
    (Race condition fix per Vida: the read-then-write is wrapped in a transaction.)
"""
conn.execute("BEGIN")
try:
row = conn.execute(
"SELECT priority_log FROM sources WHERE path = ?", (path,)
).fetchone()
if not row:
conn.execute("ROLLBACK")
return
log = json.loads(row["priority_log"] or "[]")
log.append(
{"stage": stage, "priority": priority, "reasoning": reasoning}
)
conn.execute(
"UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
(json.dumps(log), path),
)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
"""Set a source's authoritative priority. Used for human overrides and initial triage."""
conn.execute(
"UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
(priority, path),
)
append_priority_log(conn, path, "override", priority, reason)

lib/evaluate.py (new file)

@@ -0,0 +1,725 @@
"""Evaluate stage — triage + domain review + Leo review.
Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last.
Sonnet (domain review) filters before Opus (Leo review) to maximize value per
scarce Opus call.
Flow per PR:
1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT
2. Domain review → Sonnet (Claude Max, overflow: OpenRouter GPT-4o)
3. Leo review → Opus (Claude Max, overflow: queue) — skipped for LIGHT
4. DEEP cross-family → GPT-4o (OpenRouter) — only if domain + Leo both approve
5. Post reviews, submit formal Forgejo approvals, update SQLite
6. If both approve → status = 'approved' (merge module picks it up)
Design reviewed by Ganymede, Rhea, Vida, Theseus.
"""
import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from . import config, db
logger = logging.getLogger("pipeline.evaluate")
# ─── Constants ──────────────────────────────────────────────────────────────
DOMAIN_AGENT_MAP = {
"internet-finance": "Rio",
"entertainment": "Clay",
"health": "Vida",
"ai-alignment": "Theseus",
"space-development": "Astra",
"mechanisms": "Rio",
"living-capital": "Rio",
"living-agents": "Theseus",
"teleohumanity": "Leo",
"grand-strategy": "Leo",
"critical-systems": "Theseus",
"collective-intelligence": "Theseus",
"teleological-economics": "Rio",
"cultural-dynamics": "Clay",
}
def _agent_token(agent_name: str) -> str | None:
"""Read Forgejo token for a named agent. Returns token string or None."""
token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
if token_file.exists():
return token_file.read_text().strip()
return None
REVIEW_STYLE_GUIDE = (
"Be concise. Only mention what fails or is interesting. "
"Do not summarize what the PR does — the diff speaks for itself. "
"If everything passes, say so in one line and approve."
)
# ─── Prompt templates ──────────────────────────────────────────────────────
TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT.
DEEP — use when ANY of these apply:
- PR adds or modifies claims rated "likely" or higher confidence
- PR touches agent beliefs or creates cross-domain wiki links
- PR challenges an existing claim (has "challenged_by" or contradicts existing)
- PR modifies axiom-level beliefs
- PR is a cross-domain synthesis claim
STANDARD — use when:
- New claims in established domain areas
- Enrichments to existing claims (confirm/extend)
- New hypothesis-level beliefs
- Source archives with extraction results
LIGHT — use ONLY when ALL changes fit these categories:
- Entity attribute updates (factual corrections, new data points)
- Source archiving without extraction
- Formatting fixes, typo corrections
- Status field changes
IMPORTANT: When uncertain, classify UP, not down. Always err toward more review.
Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line.
--- PR DIFF ---
{diff}"""
DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
Review this PR from your domain expertise:
1. Technical accuracy — are the claims factually correct in your domain?
2. Domain duplicates — does your domain already have substantially similar claims?
3. Missing context — is important domain context absent that would change interpretation?
4. Confidence calibration — from your domain expertise, is the confidence level right?
5. Enrichment opportunities — should this connect to existing claims via wiki links?
{style_guide}
If you are requesting changes, tag the specific issues:
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by
End your review with exactly one of:
<!-- VERDICT:{agent_upper}:APPROVE -->
<!-- VERDICT:{agent_upper}:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR against the quality criteria:
1. Schema compliance — YAML frontmatter, prose-as-title, required fields
2. Duplicate check — does this claim already exist?
3. Confidence calibration — appropriate for the evidence?
4. Wiki link validity — references real claims?
5. Source quality — credible for the claim?
6. Domain assignment — correct domain?
7. Epistemic hygiene — specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check:
1. Cross-domain implications — does this claim affect beliefs in other domains?
2. Confidence calibration — is the confidence level justified by the evidence?
3. Contradiction check — does this contradict any existing claims without explicit argument?
4. Wiki link validity — do all wiki links reference real, existing claims?
5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary?
6. Source quality — is the source credible for the claim being made?
7. Duplicate check — does a substantially similar claim already exist?
8. Enrichment vs new claim — should this be an enrichment to an existing claim instead?
9. Domain assignment — is the claim in the correct domain?
10. Schema compliance — YAML frontmatter, prose-as-title format, required fields
11. Epistemic hygiene — is the claim specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
# ─── API helpers ───────────────────────────────────────────────────────────
async def _forgejo_api(method: str, path: str, body: dict = None, token: str = None):
"""Call Forgejo API."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1{path}"
if token is None:
token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
try:
async with aiohttp.ClientSession() as session:
async with session.request(method, url, headers=headers,
json=body, timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status >= 400:
text = await resp.text()
logger.error("Forgejo API %s %s%d: %s", method, path, resp.status, text[:200])
return None
if resp.status == 204:
return {}
return await resp.json()
except Exception as e:
logger.error("Forgejo API error: %s %s%s", method, path, e)
return None
async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
"""Call OpenRouter API. Returns response text or None on failure."""
import aiohttp
key_file = config.SECRETS_DIR / "openrouter-key"
if not key_file.exists():
logger.error("OpenRouter key file not found")
return None
key = key_file.read_text().strip()
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 4096,
"temperature": 0.2,
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
config.OPENROUTER_URL,
headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
json=payload,
timeout=aiohttp.ClientTimeout(total=timeout_sec),
) as resp:
if resp.status >= 400:
text = await resp.text()
logger.error("OpenRouter %s%d: %s", model, resp.status, text[:200])
return None
data = await resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content")
except Exception as e:
logger.error("OpenRouter error: %s%s", model, e)
return None
async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None:
"""Call Claude via CLI (Claude Max subscription). Returns response or None."""
proc = await asyncio.create_subprocess_exec(
str(config.CLAUDE_CLI), "-p", "--model", model, "--output-format", "text",
cwd=cwd or str(config.REPO_DIR),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=prompt.encode()),
timeout=timeout_sec,
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
logger.error("Claude CLI timed out after %ds", timeout_sec)
return None
if proc.returncode != 0:
err = (stderr or b"").decode()[:500]
out = (stdout or b"").decode()[:500]
# Check for rate limit — Claude CLI puts limit message on stdout, not stderr
combined = (err + out).lower()
if "hit your limit" in combined or "rate limit" in combined:
logger.warning("Claude Max rate limited (stdout: %s)", out[:200])
return "RATE_LIMITED"
logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err[:200], out[:200])
return None
return (stdout or b"").decode().strip()
# ─── Diff helpers ──────────────────────────────────────────────────────────
async def _get_pr_diff(pr_number: int) -> str:
"""Fetch PR diff via Forgejo API."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
url,
headers={"Authorization": f"token {token}", "Accept": "text/plain"},
timeout=aiohttp.ClientTimeout(total=60),
) as resp:
if resp.status >= 400:
return ""
diff = await resp.text()
if len(diff) > 2_000_000:
return ""
return diff
except Exception as e:
logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
return ""
def _filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
Strips: inbox/archive/, schemas/, skills/, agents/*/musings/
"""
sections = re.split(r'(?=^diff --git )', diff, flags=re.MULTILINE)
skip_patterns = [r'^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/']
core_domains = {'living-agents', 'living-capital', 'teleohumanity', 'mechanisms'}
claim_sections = []
entity_sections = []
for section in sections:
if not section.strip():
continue
if any(re.match(p, section) for p in skip_patterns):
continue
entity_match = re.match(r'^diff --git a/entities/([^/]+)/', section)
if entity_match and entity_match.group(1) not in core_domains:
entity_sections.append(section)
continue
claim_sections.append(section)
return ''.join(claim_sections), ''.join(entity_sections)
def _extract_changed_files(diff: str) -> str:
"""Extract changed file paths from diff."""
return "\n".join(
line.replace("diff --git a/", "").split(" b/")[0]
for line in diff.split("\n")
if line.startswith("diff --git")
)
def _detect_domain_from_diff(diff: str) -> str | None:
"""Detect primary domain from changed file paths.
Checks domains/, entities/, core/, foundations/ for domain classification.
"""
domain_counts: dict[str, int] = {}
for line in diff.split("\n"):
if line.startswith("diff --git"):
# Check domains/ and entities/ (both carry domain info)
match = re.search(r'(?:domains|entities)/([^/]+)/', line)
if match:
d = match.group(1)
domain_counts[d] = domain_counts.get(d, 0) + 1
continue
# Check core/ subdirectories
match = re.search(r'core/([^/]+)/', line)
if match:
d = match.group(1)
if d in DOMAIN_AGENT_MAP:
domain_counts[d] = domain_counts.get(d, 0) + 1
continue
# Check foundations/ subdirectories
match = re.search(r'foundations/([^/]+)/', line)
if match:
d = match.group(1)
if d in DOMAIN_AGENT_MAP:
domain_counts[d] = domain_counts.get(d, 0) + 1
if domain_counts:
return max(domain_counts, key=domain_counts.get)
return None
def _is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
has_other = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
if "agents/" in line and "/musings/" in line:
has_musings = True
else:
has_other = True
return has_musings and not has_other
# ─── Verdict parsing ──────────────────────────────────────────────────────
def _parse_verdict(review_text: str, reviewer: str) -> str:
"""Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
upper = reviewer.upper()
if f"VERDICT:{upper}:APPROVE" in review_text:
return "approve"
elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
return "request_changes"
else:
logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
return "request_changes"
def _parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review."""
match = re.search(r'<!-- ISSUES: ([^>]+) -->', review_text)
if not match:
return []
return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
# ─── Review execution ─────────────────────────────────────────────────────
async def _triage_pr(diff: str) -> str:
"""Triage PR via Haiku → DEEP/STANDARD/LIGHT."""
prompt = TRIAGE_PROMPT.format(diff=diff[:50000]) # Cap diff size for triage
result = await _openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
if not result:
logger.warning("Triage failed, defaulting to STANDARD")
return "STANDARD"
tier = result.split("\n")[0].strip().upper()
if tier in ("DEEP", "STANDARD", "LIGHT"):
reason = result.split("\n")[1].strip() if "\n" in result else ""
logger.info("Triage: %s%s", tier, reason[:100])
return tier
logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
return "STANDARD"
async def _run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
"""Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o."""
prompt = DOMAIN_PROMPT.format(
agent=agent, agent_upper=agent.upper(), domain=domain,
style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files,
)
# Try Claude Max Sonnet first
result = await _claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
if result == "RATE_LIMITED":
# Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck)
policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow")
if policy == "overflow":
logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
result = await _openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
else:
logger.info("Claude Max rate limited, queuing domain review")
return None
return result
async def _run_leo_review(diff: str, files: str, tier: str) -> str | None:
"""Run Leo review via Claude Max Opus. Returns None if rate limited (queue policy)."""
prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files)
result = await _claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
if result == "RATE_LIMITED":
# Leo review queues — don't waste Opus calls (never overflow)
logger.info("Claude Max Opus rate limited, queuing Leo review")
return None
return result
async def _post_formal_approvals(pr_number: int, pr_author: str):
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
if agent_name == pr_author:
continue
if approvals >= 2:
break
token_file = config.SECRETS_DIR / f"forgejo-{agent_name}-token"
if token_file.exists():
token = token_file.read_text().strip()
result = await _forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/reviews",
{"body": "Approved.", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
# ─── Single PR evaluation ─────────────────────────────────────────────────
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
"""Evaluate a single PR. Returns result dict."""
# Fetch diff
diff = await _get_pr_diff(pr_number)
if not diff:
return {"pr": pr_number, "skipped": True, "reason": "no_diff"}
# Musings bypass
if _is_musings_only(diff):
logger.info("PR #%d is musings-only — auto-approving", pr_number)
await _forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
{"body": "Auto-approved: musings bypass eval per collective policy."},
)
conn.execute(
"""UPDATE prs SET status = 'approved', leo_verdict = 'skipped',
domain_verdict = 'skipped' WHERE number = ?""",
(pr_number,),
)
return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}
# Filter diff
review_diff, entity_diff = _filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
# Detect domain
domain = _detect_domain_from_diff(diff)
agent = DOMAIN_AGENT_MAP.get(domain, "Leo") if domain else "Leo"
# Default NULL domain to 'general' (archive-only PRs have no domain files)
if domain is None:
domain = "general"
# Update PR domain if not set
conn.execute(
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?",
(domain, agent, pr_number),
)
# Step 1: Triage (if not already triaged)
if tier is None:
tier = await _triage_pr(diff)
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
# Mark as reviewing
conn.execute(
"UPDATE prs SET status = 'reviewing', last_attempt = datetime('now') WHERE number = ?",
(pr_number,),
)
# Check if domain review already completed (resuming after Leo rate limit)
existing = conn.execute(
"SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)
).fetchone()
existing_domain_verdict = existing["domain_verdict"] if existing else "pending"
existing_leo_verdict = existing["leo_verdict"] if existing else "pending"
# Step 2: Domain review FIRST (Sonnet — high volume filter)
# Skip if already completed from a previous attempt
if existing_domain_verdict not in ("pending", None):
domain_verdict = existing_domain_verdict
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
else:
logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
domain_review = await _run_domain_review(review_diff, files, domain or "general", agent)
if domain_review is None:
# Rate limited, couldn't overflow — revert to open for retry
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
return {"pr": pr_number, "skipped": True, "reason": "rate_limited"}
domain_verdict = _parse_verdict(domain_review, agent)
conn.execute(
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
)
# Post domain review as comment (from agent's Forgejo account)
agent_tok = _agent_token(agent)
await _forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
{"body": domain_review},
token=agent_tok,
)
# If domain review rejects, skip Leo review (save Opus)
if domain_verdict == "request_changes":
logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
conn.execute(
"""UPDATE prs SET status = 'open', leo_verdict = 'skipped',
last_error = 'domain review requested changes'
WHERE number = ?""",
(pr_number,),
)
db.audit(conn, "evaluate", "domain_rejected",
json.dumps({"pr": pr_number, "agent": agent}))
return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped"}
# Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT)
leo_verdict = "skipped"
if tier != "LIGHT":
logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
leo_review = await _run_leo_review(review_diff, files, tier)
if leo_review is None:
# Opus rate limited — revert to open for retry (keep domain verdict)
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
return {"pr": pr_number, "skipped": True, "reason": "opus_rate_limited"}
leo_verdict = _parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
# Post Leo review as comment (from Leo's Forgejo account)
leo_tok = _agent_token("Leo")
await _forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
{"body": leo_review},
token=leo_tok,
)
else:
# LIGHT tier: Leo is auto-skipped, domain verdict is the only gate
conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
# Step 4: Determine final verdict
both_approve = (
(leo_verdict == "approve" or leo_verdict == "skipped")
and domain_verdict == "approve"
)
if both_approve:
# Get PR author for formal approvals
pr_info = await _forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
)
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
# Submit formal Forgejo reviews (required for merge)
await _post_formal_approvals(pr_number, pr_author)
conn.execute(
"UPDATE prs SET status = 'approved' WHERE number = ?",
(pr_number,),
)
db.audit(conn, "evaluate", "approved",
json.dumps({"pr": pr_number, "tier": tier, "domain": domain,
"leo": leo_verdict, "domain_agent": agent}))
logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)",
pr_number, tier, leo_verdict, domain_verdict)
else:
conn.execute(
"UPDATE prs SET status = 'open' WHERE number = ?",
(pr_number,),
)
# Store feedback for re-extraction path
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
if domain_verdict == "request_changes":
feedback["domain_issues"] = _parse_issues(domain_review)
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
(json.dumps(feedback), pr_number),
)
db.audit(conn, "evaluate", "changes_requested",
json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict,
"domain": domain_verdict}))
logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)",
pr_number, leo_verdict, domain_verdict)
# Record cost (domain review)
from . import costs
costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="max")
if tier != "LIGHT":
costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
return {
"pr": pr_number, "tier": tier, "domain": domain,
"leo_verdict": leo_verdict, "domain_verdict": domain_verdict,
"approved": both_approve,
}
# ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one evaluation cycle.
Finds PRs with status='open', tier0_pass=1, and no pending verdicts.
Evaluates in priority order.
"""
# Find PRs ready for evaluation:
# - status = 'open'
# - tier0_pass = 1 (passed validation)
# - leo_verdict = 'pending' OR domain_verdict = 'pending'
# Skip PRs attempted within last 10 minutes (backoff during rate limits)
rows = conn.execute(
"""SELECT p.number, p.tier FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.tier0_pass = 1
AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')
AND (p.last_attempt IS NULL
OR p.last_attempt < datetime('now', '-10 minutes'))
ORDER BY
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END,
p.created_at ASC
LIMIT ?""",
(max_workers or config.MAX_EVAL_WORKERS,),
).fetchall()
if not rows:
return 0, 0
succeeded = 0
failed = 0
for row in rows:
try:
result = await evaluate_pr(conn, row["number"], tier=row["tier"])
if result.get("skipped"):
# Rate limited — don't count as failure, just skip
logger.debug("PR #%d skipped: %s", row["number"], result.get("reason"))
else:
succeeded += 1
except Exception:
logger.exception("Failed to evaluate PR #%d", row["number"])
failed += 1
# Revert to open on unhandled error
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],))
if succeeded or failed:
logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed)
return succeeded, failed

237
lib/health.py Normal file
View file

@ -0,0 +1,237 @@
"""Health API — HTTP server on configurable port for monitoring."""
import logging
from aiohttp import web
from datetime import date, datetime, timezone
from . import config, db, costs
logger = logging.getLogger("pipeline.health")
def _conn(request):
"""Get the persistent readonly connection from app state."""
return request.app["db"]
async def handle_health(request):
"""GET /health — overall pipeline health."""
conn = _conn(request)
# Stage status from circuit breakers
breakers = conn.execute(
"SELECT name, state, failures, last_success_at, last_update FROM circuit_breakers"
).fetchall()
# Queue depths
sources_by_status = conn.execute(
"SELECT status, COUNT(*) as n FROM sources GROUP BY status"
).fetchall()
prs_by_status = conn.execute(
"SELECT status, COUNT(*) as n FROM prs GROUP BY status"
).fetchall()
# Per-domain merge queue depth (Vida)
merge_queue = conn.execute(
"SELECT domain, COUNT(*) as n FROM prs WHERE status = 'approved' GROUP BY domain"
).fetchall()
# Cost
budget = costs.check_budget(conn)
# Metabolic metrics (Vida)
null_rate = conn.execute(
"""SELECT
CAST(SUM(CASE WHEN status = 'null_result' THEN 1 ELSE 0 END) AS REAL) /
NULLIF(COUNT(*), 0) as rate
FROM sources
WHERE updated_at > datetime('now', '-24 hours')
AND status IN ('extracted', 'null_result', 'error')"""
).fetchone()
approval_rate = conn.execute(
"""SELECT
CAST(SUM(CASE WHEN domain_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
NULLIF(COUNT(*), 0) as domain_rate,
CAST(SUM(CASE WHEN leo_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
NULLIF(COUNT(*), 0) as leo_rate
FROM prs
WHERE last_attempt > datetime('now', '-24 hours')
AND domain_verdict != 'pending'"""
).fetchone()
# Recent activity (last hour)
recent = conn.execute(
"""SELECT stage, event, COUNT(*) as n
FROM audit_log
WHERE timestamp > datetime('now', '-1 hour')
GROUP BY stage, event"""
).fetchall()
body = {
"status": "healthy",
"breakers": {},
"sources": {r["status"]: r["n"] for r in sources_by_status},
"prs": {r["status"]: r["n"] for r in prs_by_status},
"merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue},
"budget": budget,
"metabolic": {
"null_result_rate_24h": round(null_rate["rate"] or 0, 3),
"domain_approval_rate_24h": round(approval_rate["domain_rate"] or 0, 3) if approval_rate["domain_rate"] else None,
"leo_approval_rate_24h": round(approval_rate["leo_rate"] or 0, 3) if approval_rate["leo_rate"] else None,
},
"recent_activity": [
{"stage": r["stage"], "event": r["event"], "count": r["n"]}
for r in recent
],
}
# Breaker state + stall detection (Vida: last_success_at heartbeat)
for r in breakers:
breaker_info = {"state": r["state"], "failures": r["failures"]}
if r["last_success_at"]:
last = datetime.fromisoformat(r["last_success_at"])
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
age_s = (datetime.now(timezone.utc) - last).total_seconds()
breaker_info["last_success_age_s"] = round(age_s)
# Stall detection: no success in 2x the stage's interval
intervals = {"ingest": config.INGEST_INTERVAL, "validate": config.VALIDATE_INTERVAL,
"evaluate": config.EVAL_INTERVAL, "merge": config.MERGE_INTERVAL}
threshold = intervals.get(r["name"], 60) * 2
if age_s > threshold:
breaker_info["stalled"] = True
body["breakers"][r["name"]] = breaker_info
# Overall status
if any(b.get("stalled") for b in body["breakers"].values()):
body["status"] = "stalled"
if any(b["state"] == "open" for b in body["breakers"].values()):
body["status"] = "degraded"
if not budget["ok"]:
body["status"] = "budget_exhausted"
# Rubber-stamp warning (Vida)
if (approval_rate["domain_rate"] or 0) > 0.95:
body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping"
status_code = 200 if body["status"] == "healthy" else 503
return web.json_response(body, status=status_code)
async def handle_costs(request):
"""GET /costs — daily cost breakdown."""
conn = _conn(request)
day = request.query.get("date", date.today().isoformat())
breakdown = costs.get_daily_breakdown(conn, day)
budget = costs.check_budget(conn)
return web.json_response({"date": day, "budget": budget, "breakdown": breakdown})
async def handle_sources(request):
"""GET /sources — source pipeline status."""
conn = _conn(request)
status_filter = request.query.get("status")
if status_filter:
rows = conn.execute(
"SELECT path, status, priority, claims_count, transient_retries, substantive_retries, updated_at FROM sources WHERE status = ? ORDER BY updated_at DESC LIMIT 50",
(status_filter,),
).fetchall()
else:
rows = conn.execute(
"SELECT path, status, priority, claims_count, transient_retries, substantive_retries, updated_at FROM sources ORDER BY updated_at DESC LIMIT 50"
).fetchall()
return web.json_response({"sources": [dict(r) for r in rows]})
async def handle_prs(request):
"""GET /prs — PR pipeline status."""
conn = _conn(request)
status_filter = request.query.get("status")
if status_filter:
rows = conn.execute(
"SELECT number, source_path, status, domain, tier, leo_verdict, domain_verdict, transient_retries, substantive_retries FROM prs WHERE status = ? ORDER BY number DESC LIMIT 50",
(status_filter,),
).fetchall()
else:
rows = conn.execute(
"SELECT number, source_path, status, domain, tier, leo_verdict, domain_verdict, transient_retries, substantive_retries FROM prs ORDER BY number DESC LIMIT 50"
).fetchall()
return web.json_response({"prs": [dict(r) for r in rows]})
async def handle_breakers(request):
"""GET /breakers — circuit breaker states."""
conn = _conn(request)
rows = conn.execute("SELECT * FROM circuit_breakers").fetchall()
return web.json_response({"breakers": [dict(r) for r in rows]})
async def handle_calibration(request):
"""GET /calibration — priority calibration analysis (Vida)."""
conn = _conn(request)
# Find sources where eval disagreed with ingest priority
# Focus on upgrades (Theseus: upgrades are the learnable signal)
rows = conn.execute(
"""SELECT path, priority, priority_log FROM sources
WHERE json_array_length(priority_log) >= 2"""
).fetchall()
upgrades = []
downgrades = []
    import json
    for r in rows:
        log = json.loads(r["priority_log"] or "[]")
if len(log) < 2:
continue
first = log[0]["priority"]
last = log[-1]["priority"]
levels = {"critical": 4, "high": 3, "medium": 2, "low": 1, "skip": 0}
if levels.get(last, 2) > levels.get(first, 2):
upgrades.append({"path": r["path"], "from": first, "to": last})
elif levels.get(last, 2) < levels.get(first, 2):
downgrades.append({"path": r["path"], "from": first, "to": last})
return web.json_response({
"upgrades": upgrades[:20],
"downgrades_count": len(downgrades),
"upgrades_count": len(upgrades),
"note": "Focus on upgrades — downgrades are expected (downstream has more context)"
})
def create_app() -> web.Application:
"""Create the health API application."""
app = web.Application()
# Persistent readonly connection — one connection, no churn (Ganymede)
app["db"] = db.get_connection(readonly=True)
app.router.add_get("/health", handle_health)
app.router.add_get("/costs", handle_costs)
app.router.add_get("/sources", handle_sources)
app.router.add_get("/prs", handle_prs)
app.router.add_get("/breakers", handle_breakers)
app.router.add_get("/calibration", handle_calibration)
app.on_cleanup.append(_cleanup)
return app
async def _cleanup(app):
app["db"].close()
async def start_health_server(runner_ref: list):
"""Start the health HTTP server. Stores runner in runner_ref for shutdown."""
app = create_app()
runner = web.AppRunner(app)
await runner.setup()
# Bind to 127.0.0.1 only — use reverse proxy for external access (Ganymede)
site = web.TCPSite(runner, "127.0.0.1", config.HEALTH_PORT)
await site.start()
runner_ref.append(runner)
logger.info("Health API listening on 127.0.0.1:%d", config.HEALTH_PORT)
async def stop_health_server(runner_ref: list):
"""Stop the health HTTP server."""
for runner in runner_ref:
await runner.cleanup()
logger.info("Health API stopped")

48
lib/log.py Normal file
View file

@ -0,0 +1,48 @@
"""Structured JSON logging with rotation."""
import json
import logging
import logging.handlers
from datetime import datetime, timezone
from . import config
class JSONFormatter(logging.Formatter):
"""Format log records as JSON lines."""
def format(self, record):
entry = {
"ts": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
if record.exc_info and record.exc_info[0]:
entry["exception"] = self.formatException(record.exc_info)
# Include extra fields if present
for key in ("stage", "source", "pr", "model", "cost", "event"):
if hasattr(record, key):
entry[key] = getattr(record, key)
return json.dumps(entry)
def setup_logging():
"""Configure structured JSON logging with rotation."""
config.LOG_DIR.mkdir(parents=True, exist_ok=True)
handler = logging.handlers.RotatingFileHandler(
str(config.LOG_FILE),
maxBytes=config.LOG_ROTATION_MAX_BYTES,
backupCount=config.LOG_ROTATION_BACKUP_COUNT,
)
handler.setFormatter(JSONFormatter())
# Also log to stderr for systemd journal
console = logging.StreamHandler()
console.setFormatter(logging.Formatter("%(name)s [%(levelname)s] %(message)s"))
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(console)

436
lib/merge.py Normal file
View file

@ -0,0 +1,436 @@
"""Merge stage — domain-serialized priority queue with rebase-before-merge.
Design reviewed by Ganymede (round 2) and Rhea. Key decisions:
- Two-layer locking: asyncio.Lock per domain (fast path) + prs.status (crash recovery)
- Rebase-before-merge with pinned force-with-lease SHA (Ganymede)
- Priority queue: COALESCE(p.priority, s.priority, 'medium'); precedence is PR > source > default
- Human PRs default to 'high', not 'critical' (Ganymede: prevents DoS on the pipeline)
- 5-minute merge timeout force-reset to 'conflict' (Rhea)
- Ack comment on human PR discovery (Rhea)
- Pagination on all Forgejo list endpoints (Ganymede standing rule)
"""
import asyncio
import json
import logging
from collections import defaultdict
from datetime import datetime, timezone
from . import config, db
logger = logging.getLogger("pipeline.merge")
# In-memory domain locks — fast path, lost on crash (durable layer is prs.status)
_domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea)
MERGE_TIMEOUT_SECONDS = 300 # 5 minutes
# --- Git helpers ---
async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]:
"""Run a git command async. Returns (returncode, stdout+stderr)."""
proc = await asyncio.create_subprocess_exec(
"git", *args,
cwd=cwd or str(config.REPO_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return -1, f"git {args[0]} timed out after {timeout}s"
output = (stdout or b"").decode().strip()
if stderr:
output += "\n" + stderr.decode().strip()
return proc.returncode, output
async def _forgejo_api(method: str, path: str, body: dict = None) -> dict | list | None:
"""Call Forgejo API. Returns parsed JSON or None on error."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1{path}"
token_file = config.FORGEJO_TOKEN_FILE
token = token_file.read_text().strip() if token_file.exists() else ""
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
try:
async with aiohttp.ClientSession() as session:
async with session.request(method, url, headers=headers,
json=body, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status >= 400:
text = await resp.text()
logger.error("Forgejo API %s %s%d: %s", method, path, resp.status, text[:200])
return None
if resp.status == 204: # No content (DELETE)
return {}
return await resp.json()
except Exception as e:
logger.error("Forgejo API error: %s %s%s", method, path, e)
return None
# --- PR Discovery (Multiplayer v1) ---
async def discover_external_prs(conn) -> int:
"""Scan Forgejo for open PRs not tracked in SQLite.
Human PRs (non-pipeline author) get priority 'high' and origin 'human'.
Critical is reserved for explicit human override only. (Ganymede)
Pagination on all Forgejo list endpoints. (Ganymede standing rule #5)
"""
known = {r["number"] for r in conn.execute("SELECT number FROM prs").fetchall()}
discovered = 0
page = 1
while True:
prs = await _forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls"
f"?state=open&limit=50&page={page}",
)
if not prs:
break
for pr in prs:
if pr["number"] not in known:
# Detect origin: pipeline agents have per-agent Forgejo users
pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"}
author = pr.get("user", {}).get("login", "")
is_pipeline = author.lower() in pipeline_users
origin = "pipeline" if is_pipeline else "human"
priority = "high" if origin == "human" else None
domain = _detect_domain_from_files(pr) if not is_pipeline else _detect_domain_from_branch(pr["head"]["ref"])
conn.execute(
"""INSERT OR IGNORE INTO prs
(number, branch, status, origin, priority, domain)
VALUES (?, ?, 'open', ?, ?, ?)""",
(pr["number"], pr["head"]["ref"], origin, priority, domain),
)
db.audit(conn, "merge", "pr_discovered",
json.dumps({"pr": pr["number"], "origin": origin,
"author": pr.get("user", {}).get("login"),
"priority": priority or "inherited"}))
# Ack comment on human PRs so contributor feels acknowledged (Rhea)
if origin == "human":
await _post_ack_comment(pr["number"])
discovered += 1
if len(prs) < 50:
break # Last page
page += 1
if discovered:
logger.info("Discovered %d external PRs", discovered)
return discovered
def _detect_domain_from_branch(branch: str) -> str | None:
"""Extract domain from branch name like 'rio/claims-futarchy''internet-finance'.
Agent-to-domain mapping for pipeline branches.
"""
agent_domain = {
"rio": "internet-finance",
"clay": "entertainment",
"theseus": "ai-alignment",
"vida": "health",
"astra": "space-development",
"leo": "grand-strategy",
}
prefix = branch.split("/")[0].lower() if "/" in branch else ""
return agent_domain.get(prefix)
def _detect_domain_from_files(pr: dict) -> str | None:
"""Detect domain from PR's changed files for human-submitted PRs.
Humans may not follow agent branch naming. Fall back to inspecting
file paths. (Ganymede nit)
"""
# We'd need to fetch files from the API — do it lazily on first eval
# For now, return None. Domain gets set during evaluation.
return None
async def _post_ack_comment(pr_number: int):
"""Post acknowledgment comment on human-submitted PR. (Rhea)
Contributor should feel acknowledged immediately, not wonder if
their PR disappeared into a void.
"""
body = (
"Thanks for the contribution! Your PR is queued for evaluation "
"(priority: high). Expected review time: ~5 minutes.\n\n"
"_This is an automated message from the Teleo pipeline._"
)
await _forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
{"body": body},
)
# --- Merge operations ---
async def _claim_next_pr(conn, domain: str) -> dict | None:
"""Claim the next approved PR for a domain via atomic UPDATE.
Priority inheritance: COALESCE(p.priority, s.priority, 'medium')
- Explicit PR priority (human PRs) > source priority (pipeline) > default medium
- NULL priorities fall to ELSE 4, which ranks below explicit 'medium' (WHEN 2)
- This is intentional: unclassified PRs don't jump ahead of triaged ones
(Rhea: document the precedence for future maintainers)
    NOT EXISTS enforces domain serialization in SQL as defense-in-depth, even if the
    asyncio.Lock is bypassed. (Ganymede: approved)
"""
row = conn.execute(
"""UPDATE prs SET status = 'merging', last_attempt = datetime('now')
WHERE number = (
SELECT p.number FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'approved'
AND p.domain = ?
AND NOT EXISTS (
SELECT 1 FROM prs p2
WHERE p2.domain = p.domain
AND p2.status = 'merging'
)
ORDER BY
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END,
p.created_at ASC
LIMIT 1
)
RETURNING number, source_path, branch, domain""",
(domain,),
).fetchone()
return dict(row) if row else None
async def _rebase_and_push(branch: str) -> tuple[bool, str]:
"""Rebase branch onto main and force-push with pinned SHA.
    Always use --force-with-lease with a pinned SHA for ALL branches, pipeline and
    human. No split logic. (Ganymede)
"""
worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
# Create worktree for the branch
rc, out = await _git("worktree", "add", worktree_path, branch)
if rc != 0:
return False, f"worktree add failed: {out}"
try:
# Capture expected SHA before rebase (Ganymede: pin for force-with-lease)
rc, expected_sha = await _git("rev-parse", f"origin/{branch}", cwd=worktree_path)
if rc != 0:
return False, f"rev-parse failed: {expected_sha}"
expected_sha = expected_sha.strip().split("\n")[0] # First line only
# Fetch latest main
rc, out = await _git("fetch", "origin", "main", cwd=worktree_path)
if rc != 0:
return False, f"fetch failed: {out}"
# Check if rebase is needed
rc, merge_base = await _git("merge-base", "origin/main", "HEAD", cwd=worktree_path)
rc2, main_sha = await _git("rev-parse", "origin/main", cwd=worktree_path)
if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
# Already up to date, no rebase needed
return True, "already up to date"
# Rebase onto main
rc, out = await _git("rebase", "origin/main", cwd=worktree_path, timeout=120)
if rc != 0:
# Rebase conflict
await _git("rebase", "--abort", cwd=worktree_path)
return False, f"rebase conflict: {out}"
# Force-push with pinned SHA (Ganymede: defeats tracking-ref update race)
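        # --force-with-lease=<ref>:<sha> only updates the remote ref if it still points
        # at <sha>, so a branch updated since we read expected_sha is never clobbered.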
rc, out = await _git(
"push",
f"--force-with-lease={branch}:{expected_sha}",
"origin", f"HEAD:{branch}",
cwd=worktree_path,
timeout=30,
)
if rc != 0:
return False, f"push rejected: {out}"
return True, "rebased and pushed"
finally:
# Cleanup worktree
await _git("worktree", "remove", "--force", worktree_path)
async def _merge_pr(pr_number: int) -> tuple[bool, str]:
"""Merge PR via Forgejo API. Preserves PR metadata and reviewer attribution."""
result = await _forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/merge",
{"Do": "merge", "merge_message_field": ""},
)
if result is None:
return False, "Forgejo merge API failed"
return True, "merged"
async def _delete_remote_branch(branch: str):
"""Delete remote branch immediately after merge. (Ganymede Q4: immediate, not batch)
    If DELETE fails, log and move on: a stale branch is cosmetic, a stale merge
    is operational.
"""
result = await _forgejo_api(
"DELETE",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/branches/{branch}",
)
if result is None:
logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)
# --- Domain merge task ---
async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
"""Process the merge queue for a single domain. Returns (succeeded, failed)."""
succeeded = 0
failed = 0
while True:
async with _domain_locks[domain]:
pr = await _claim_next_pr(conn, domain)
if not pr:
break # No more approved PRs for this domain
pr_num = pr["number"]
branch = pr["branch"]
logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain)
try:
# Rebase with timeout (Rhea: 5 min max, then force-reset to conflict)
rebase_ok, rebase_msg = await asyncio.wait_for(
_rebase_and_push(branch),
timeout=MERGE_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
logger.error("PR #%d merge timed out after %ds — resetting to conflict (Rhea)",
pr_num, MERGE_TIMEOUT_SECONDS)
conn.execute(
"UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?",
(f"merge timed out after {MERGE_TIMEOUT_SECONDS}s", pr_num),
)
db.audit(conn, "merge", "timeout",
json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS}))
failed += 1
continue
if not rebase_ok:
logger.warning("PR #%d rebase failed: %s", pr_num, rebase_msg)
conn.execute(
"UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?",
(rebase_msg[:500], pr_num),
)
db.audit(conn, "merge", "rebase_failed",
json.dumps({"pr": pr_num, "error": rebase_msg[:200]}))
failed += 1
continue
# Merge via API
merge_ok, merge_msg = await _merge_pr(pr_num)
if not merge_ok:
logger.error("PR #%d API merge failed: %s", pr_num, merge_msg)
conn.execute(
"UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?",
(merge_msg[:500], pr_num),
)
db.audit(conn, "merge", "api_merge_failed",
json.dumps({"pr": pr_num, "error": merge_msg[:200]}))
failed += 1
continue
# Success — update status and cleanup
conn.execute(
"""UPDATE prs SET status = 'merged',
merged_at = datetime('now'),
last_error = NULL
WHERE number = ?""",
(pr_num,),
)
db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch}))
logger.info("PR #%d merged successfully", pr_num)
# Delete remote branch immediately (Ganymede Q4)
await _delete_remote_branch(branch)
# Prune local worktree metadata
await _git("worktree", "prune")
succeeded += 1
return succeeded, failed
# --- Main entry point ---
async def merge_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one merge cycle across all domains.
1. Discover external PRs (multiplayer v1)
2. Find all domains with approved PRs
3. Launch one async task per domain (cross-domain parallel, same-domain serial)
"""
# Step 1: Discover external PRs
await discover_external_prs(conn)
# Step 2: Find domains with approved work
rows = conn.execute(
"SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL"
).fetchall()
domains = [r["domain"] for r in rows]
# Also check for NULL-domain PRs (human PRs with undetected domain)
null_domain = conn.execute(
"SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL"
).fetchone()
if null_domain and null_domain["c"] > 0:
logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain",
null_domain["c"])
if not domains:
return 0, 0
# Step 3: Merge all domains concurrently
tasks = [_merge_domain_queue(conn, domain) for domain in domains]
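    # return_exceptions=True keeps one failing domain from cancelling the others.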
results = await asyncio.gather(*tasks, return_exceptions=True)
total_succeeded = 0
total_failed = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.exception("Domain %s merge failed with exception", domains[i])
total_failed += 1
else:
s, f = result
total_succeeded += s
total_failed += f
if total_succeeded or total_failed:
logger.info("Merge cycle: %d succeeded, %d failed across %d domains",
total_succeeded, total_failed, len(domains))
return total_succeeded, total_failed

591
lib/validate.py Normal file
View file

@ -0,0 +1,591 @@
"""Validate stage — Tier 0 deterministic validation gate.
Ported from tier0-gate.py + validate_claims.py. Pure Python, no LLM calls.
Validates claim frontmatter, title format, wiki links, domain-directory match,
proposition heuristic, universal quantifiers, near-duplicate detection.
Runs against PRs with status 'open' that have tier0_pass IS NULL.
Posts results as PR comments. In gate mode, sets tier0_pass = 0/1.
"""
import json
import logging
import re
from datetime import date, datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path
from . import config, db
logger = logging.getLogger("pipeline.validate")
# ─── Constants ──────────────────────────────────────────────────────────────
VALID_DOMAINS = frozenset({
"internet-finance", "entertainment", "health", "ai-alignment",
"space-development", "grand-strategy", "mechanisms", "living-capital",
"living-agents", "teleohumanity", "critical-systems",
"collective-intelligence", "teleological-economics", "cultural-dynamics",
})
VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"})
VALID_TYPES = frozenset({"claim", "framework"})
REQUIRED_FIELDS = ("type", "domain", "description", "confidence", "source", "created")
DATE_MIN = date(2020, 1, 1)
WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
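# SequenceMatcher ratio at or above which an existing title is flagged as a near-duplicate (warning, not a gate)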
DEDUP_THRESHOLD = 0.85
# Proposition heuristic patterns
_STRONG_SIGNALS = re.compile(
r"\b(because|therefore|however|although|despite|since|"
r"rather than|instead of|not just|more than|less than|"
r"by\b|through\b|via\b|without\b|"
r"when\b|where\b|while\b|if\b|unless\b|"
r"which\b|that\b|"
r"is\b|are\b|was\b|were\b|will\b|would\b|"
r"can\b|could\b|should\b|must\b|"
r"has\b|have\b|had\b|does\b|did\b)",
re.IGNORECASE,
)
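# Crude suffix heuristic: words ending like verbs ("creates", "reduced", "running")
# let a title pass the proposition check even without an explicit connective.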
_VERB_ENDINGS = re.compile(
r"\b\w{2,}(ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns|ps|ts|rs|ns|ds)\b",
re.IGNORECASE,
)
_UNIVERSAL_QUANTIFIERS = re.compile(
r"\b(all|every|always|never|no one|nobody|nothing|none of|"
r"the only|the fundamental|the sole|the single|"
r"universally|invariably|without exception|in every case)\b",
re.IGNORECASE,
)
_SCOPING_LANGUAGE = re.compile(
r"\b(when|if|under|given|assuming|provided|in cases where|"
r"for .+ that|among|within|across|during|between|"
r"approximately|roughly|nearly|most|many|often|typically|"
r"tends? to|generally|usually|frequently)\b",
re.IGNORECASE,
)
# ─── YAML frontmatter parser ───────────────────────────────────────────────
def parse_frontmatter(text: str) -> tuple[dict | None, str]:
"""Extract YAML frontmatter and body from markdown text."""
if not text.startswith("---"):
return None, text
end = text.find("---", 3)
if end == -1:
return None, text
raw = text[3:end]
body = text[end + 3:].strip()
try:
import yaml
fm = yaml.safe_load(raw)
if not isinstance(fm, dict):
return None, body
return fm, body
except ImportError:
pass
except Exception:
return None, body
# Fallback: simple key-value parser
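    # Handles flat "key: value" pairs and inline [a, b] lists only; nested YAML is not supported.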
fm = {}
for line in raw.strip().split("\n"):
line = line.strip()
if not line or line.startswith("#"):
continue
if ":" not in line:
continue
key, _, val = line.partition(":")
key = key.strip()
val = val.strip().strip('"').strip("'")
if val.lower() == "null" or val == "":
val = None
elif val.startswith("["):
val = [v.strip().strip('"').strip("'")
for v in val.strip("[]").split(",") if v.strip()]
fm[key] = val
return fm if fm else None, body
# ─── Validators ─────────────────────────────────────────────────────────────
def validate_schema(fm: dict) -> list[str]:
"""Check required fields and valid enums."""
violations = []
for field in REQUIRED_FIELDS:
if field not in fm or fm[field] is None:
violations.append(f"missing_field:{field}")
ftype = fm.get("type")
if ftype and ftype not in VALID_TYPES:
violations.append(f"invalid_type:{ftype}")
domain = fm.get("domain")
if domain and domain not in VALID_DOMAINS:
violations.append(f"invalid_domain:{domain}")
confidence = fm.get("confidence")
if confidence and confidence not in VALID_CONFIDENCE:
violations.append(f"invalid_confidence:{confidence}")
desc = fm.get("description")
if isinstance(desc, str) and len(desc.strip()) < 10:
violations.append("description_too_short")
source = fm.get("source")
if isinstance(source, str) and len(source.strip()) < 3:
violations.append("source_too_short")
return violations
def validate_date(date_val) -> list[str]:
"""Validate created date."""
violations = []
if date_val is None:
return ["missing_field:created"]
parsed = None
if isinstance(date_val, date):
parsed = date_val
elif isinstance(date_val, str):
try:
parsed = datetime.strptime(date_val, "%Y-%m-%d").date()
except ValueError:
return [f"invalid_date_format:{date_val}"]
else:
return [f"invalid_date_type:{type(date_val).__name__}"]
today = date.today()
if parsed > today:
violations.append(f"future_date:{parsed}")
if parsed < DATE_MIN:
violations.append(f"date_before_2020:{parsed}")
return violations
def validate_title(filepath: str) -> list[str]:
"""Check filename follows prose-as-claim convention."""
violations = []
name = Path(filepath).stem
normalized = name.replace("-", " ")
if len(normalized) < 20:
violations.append("title_too_short")
words = normalized.split()
if len(words) < 4:
violations.append("title_too_few_words")
cleaned = re.sub(r"[a-zA-Z0-9\s\-\.,'()%]", "", name)
if cleaned:
violations.append(f"title_special_chars:{cleaned[:20]}")
return violations
def validate_wiki_links(body: str, existing_claims: set[str]) -> list[str]:
"""Check that [[wiki links]] resolve to known claims."""
violations = []
for link in WIKI_LINK_RE.findall(body):
if link.strip() and link.strip() not in existing_claims:
violations.append(f"broken_wiki_link:{link.strip()[:80]}")
return violations
def validate_proposition(title: str) -> list[str]:
"""Check title reads as a proposition, not a label."""
normalized = title.replace("-", " ")
words = normalized.split()
n = len(words)
if n < 4:
return ["title_not_proposition:too short to be a disagreeable sentence"]
if _STRONG_SIGNALS.search(normalized):
return []
if _VERB_ENDINGS.search(normalized):
return []
if n >= 8:
return []
return ["title_not_proposition:no verb or connective found"]
def validate_universal_quantifiers(title: str) -> list[str]:
"""Flag unscoped universal quantifiers (warning, not gate)."""
universals = _UNIVERSAL_QUANTIFIERS.findall(title)
if universals and not _SCOPING_LANGUAGE.search(title):
return [f"unscoped_universal:{','.join(universals)}"]
return []
def validate_domain_directory_match(filepath: str, fm: dict) -> list[str]:
"""Check file's directory matches its domain field."""
domain = fm.get("domain")
if not domain:
return []
parts = Path(filepath).parts
for i, part in enumerate(parts):
if part == "domains" and i + 1 < len(parts):
dir_domain = parts[i + 1]
if dir_domain != domain:
secondary = fm.get("secondary_domains", [])
if isinstance(secondary, str):
secondary = [secondary]
if dir_domain not in (secondary or []):
return [f"domain_directory_mismatch:file in domains/{dir_domain}/ "
f"but domain field says '{domain}'"]
break
return []
def validate_description_not_title(title: str, description: str) -> list[str]:
"""Check description adds info beyond the title."""
if not description:
return []
title_lower = title.lower().strip()
desc_lower = description.lower().strip().rstrip(".")
if desc_lower in title_lower or title_lower in desc_lower:
return ["description_echoes_title"]
ratio = SequenceMatcher(None, title_lower, desc_lower).ratio()
if ratio > 0.75:
return [f"description_too_similar:{ratio:.0%}"]
return []
def find_near_duplicates(title: str, existing_claims: set[str]) -> list[str]:
"""Find near-duplicate titles using SequenceMatcher with word pre-filter."""
title_lower = title.lower()
title_words = set(title_lower.split()[:6])
warnings = []
for existing in existing_claims:
existing_lower = existing.lower()
if len(title_words & set(existing_lower.split()[:6])) < 2:
continue
ratio = SequenceMatcher(None, title_lower, existing_lower).ratio()
if ratio >= DEDUP_THRESHOLD:
warnings.append(f"near_duplicate:{existing[:80]} (similarity={ratio:.2f})")
return warnings
# ─── Full Tier 0 validation ────────────────────────────────────────────────
def tier0_validate_claim(filepath: str, content: str, existing_claims: set[str]) -> dict:
"""Run full Tier 0 validation. Returns {filepath, passes, violations, warnings}."""
violations = []
warnings = []
fm, body = parse_frontmatter(content)
if fm is None:
return {"filepath": filepath, "passes": False,
"violations": ["no_frontmatter"], "warnings": []}
violations.extend(validate_schema(fm))
violations.extend(validate_date(fm.get("created")))
violations.extend(validate_title(filepath))
violations.extend(validate_wiki_links(body, existing_claims))
title = Path(filepath).stem
violations.extend(validate_proposition(title))
warnings.extend(validate_universal_quantifiers(title))
violations.extend(validate_domain_directory_match(filepath, fm))
desc = fm.get("description", "")
if isinstance(desc, str):
warnings.extend(validate_description_not_title(title, desc))
warnings.extend(find_near_duplicates(title, existing_claims))
return {"filepath": filepath, "passes": len(violations) == 0,
"violations": violations, "warnings": warnings}
# ─── Diff parsing ──────────────────────────────────────────────────────────
def extract_claim_files_from_diff(diff: str) -> dict[str, str]:
"""Parse unified diff to extract new/modified claim file contents."""
claim_dirs = ("domains/", "core/", "foundations/")
files = {}
current_file = None
current_lines = []
is_deletion = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
if current_file and not is_deletion:
files[current_file] = "\n".join(current_lines)
current_file = None
current_lines = []
is_deletion = False
elif line.startswith("deleted file mode") or line.startswith("+++ /dev/null"):
is_deletion = True
current_file = None
elif line.startswith("+++ b/") and not is_deletion:
path = line[6:]
basename = path.rsplit("/", 1)[-1] if "/" in path else path
if (any(path.startswith(d) for d in claim_dirs)
and path.endswith(".md")
and not basename.startswith("_")):
current_file = path
elif current_file and line.startswith("+") and not line.startswith("+++"):
current_lines.append(line[1:])
if current_file and not is_deletion:
files[current_file] = "\n".join(current_lines)
return files
# ─── Forgejo API (using merge module's helper) ─────────────────────────────
async def _forgejo_api(method: str, path: str, body: dict = None):
"""Call Forgejo API. Reuses merge module pattern."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1{path}"
token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
try:
async with aiohttp.ClientSession() as session:
async with session.request(method, url, headers=headers,
json=body, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status >= 400:
text = await resp.text()
logger.error("Forgejo API %s %s%d: %s", method, path, resp.status, text[:200])
return None
if resp.status == 204:
return {}
return await resp.json()
except Exception as e:
logger.error("Forgejo API error: %s %s%s", method, path, e)
return None
async def _get_pr_diff(pr_number: int) -> str:
"""Fetch PR diff via Forgejo API."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
headers = {"Authorization": f"token {token}", "Accept": "text/plain"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers,
timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status >= 400:
return ""
diff = await resp.text()
if len(diff) > 2_000_000:
return "" # Too large
return diff
except Exception as e:
logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
return ""
async def _get_pr_head_sha(pr_number: int) -> str:
"""Get HEAD SHA of PR's branch."""
pr_info = await _forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
)
if pr_info:
return pr_info.get("head", {}).get("sha", "")
return ""
async def _has_tier0_comment(pr_number: int, head_sha: str) -> bool:
"""Check if we already validated this exact commit."""
if not head_sha:
return False
# Paginate comments (Ganymede standing rule)
page = 1
while True:
comments = await _forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments"
f"?limit=50&page={page}",
)
if not comments:
break
marker = f"<!-- TIER0-VALIDATION:{head_sha} -->"
for c in comments:
if marker in c.get("body", ""):
return True
if len(comments) < 50:
break
page += 1
return False
async def _post_validation_comment(pr_number: int, results: list[dict], head_sha: str):
"""Post Tier 0 validation results as PR comment."""
all_pass = all(r["passes"] for r in results)
total = len(results)
passing = sum(1 for r in results if r["passes"])
marker = f"<!-- TIER0-VALIDATION:{head_sha} -->" if head_sha else "<!-- TIER0-VALIDATION -->"
status = "PASS" if all_pass else "FAIL"
lines = [
marker,
f"**Tier 0 Validation: {status}** — {passing}/{total} claims pass\n",
]
for r in results:
icon = "pass" if r["passes"] else "FAIL"
short_path = r["filepath"].split("/", 1)[-1] if "/" in r["filepath"] else r["filepath"]
lines.append(f"**[{icon}]** `{short_path}`")
for v in r["violations"]:
lines.append(f" - {v}")
for w in r["warnings"]:
lines.append(f" - (warn) {w}")
lines.append("")
if not all_pass:
lines.append("---")
lines.append("Fix the violations above and push to trigger re-validation.")
lines.append(f"\n*tier0-gate v2 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")
await _forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
{"body": "\n".join(lines)},
)
# ─── Existing claims index ─────────────────────────────────────────────────
def load_existing_claims() -> set[str]:
"""Build set of known claim titles from the main worktree."""
claims: set[str] = set()
base = config.MAIN_WORKTREE
for subdir in ["domains", "core", "foundations", "maps", "agents", "schemas"]:
full = base / subdir
if not full.is_dir():
continue
for f in full.rglob("*.md"):
claims.add(f.stem)
return claims
# ─── Main entry point ──────────────────────────────────────────────────────
async def validate_pr(conn, pr_number: int) -> dict:
"""Run Tier 0 validation on a single PR.
Returns {pr, all_pass, total, passing, skipped, reason}.
"""
# Get HEAD SHA for idempotency
head_sha = await _get_pr_head_sha(pr_number)
# Skip if already validated for this commit
if await _has_tier0_comment(pr_number, head_sha):
logger.debug("PR #%d already validated at %s", pr_number, head_sha[:8])
return {"pr": pr_number, "skipped": True, "reason": "already_validated"}
# Fetch diff
diff = await _get_pr_diff(pr_number)
if not diff:
logger.debug("PR #%d: empty or oversized diff", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "no_diff"}
# Extract claim files
claim_files = extract_claim_files_from_diff(diff)
if not claim_files:
logger.debug("PR #%d: no claim files in diff", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "no_claims"}
# Load existing claims index
existing_claims = load_existing_claims()
# Validate each claim
results = []
for filepath, content in claim_files.items():
result = tier0_validate_claim(filepath, content, existing_claims)
results.append(result)
status = "PASS" if result["passes"] else "FAIL"
logger.debug("PR #%d: %s %s v=%s w=%s", pr_number, status, filepath,
result["violations"], result["warnings"])
all_pass = all(r["passes"] for r in results)
total = len(results)
passing = sum(1 for r in results if r["passes"])
logger.info("PR #%d: Tier 0 — %d/%d pass, all_pass=%s", pr_number, passing, total, all_pass)
# Post comment
await _post_validation_comment(pr_number, results, head_sha)
# Update PR record
conn.execute(
"UPDATE prs SET tier0_pass = ? WHERE number = ?",
(1 if all_pass else 0, pr_number),
)
db.audit(conn, "validate", "tier0_complete",
json.dumps({"pr": pr_number, "pass": all_pass, "passing": passing, "total": total}))
return {"pr": pr_number, "all_pass": all_pass, "total": total, "passing": passing}
async def validate_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one validation cycle.
Finds PRs with status='open' and tier0_pass IS NULL, validates them.
"""
# Find unvalidated PRs (priority ordered)
rows = conn.execute(
"""SELECT p.number FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.tier0_pass IS NULL
ORDER BY
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END,
p.created_at ASC
LIMIT ?""",
(max_workers or 10,),
).fetchall()
if not rows:
return 0, 0
succeeded = 0
failed = 0
for row in rows:
try:
result = await validate_pr(conn, row["number"])
if result.get("skipped"):
# Mark as validated even if skipped (no claims = pass)
conn.execute(
"UPDATE prs SET tier0_pass = 1 WHERE number = ? AND tier0_pass IS NULL",
(row["number"],),
)
succeeded += 1
elif result.get("all_pass"):
succeeded += 1
else:
succeeded += 1 # Validation ran successfully, even if claims failed
except Exception:
logger.exception("Failed to validate PR #%d", row["number"])
failed += 1
if succeeded or failed:
logger.info("Validate cycle: %d validated, %d errors", succeeded, failed)
return succeeded, failed

240
teleo-pipeline.py Normal file
View file

@ -0,0 +1,240 @@
#!/usr/bin/env python3
"""Teleo Pipeline v2 — single async daemon replacing 7 cron scripts.
Four stages: Ingest → Validate → Evaluate → Merge
SQLite WAL state store. systemd-managed. Graceful shutdown.
"""
import asyncio
import logging
import signal
import sys
from datetime import datetime, timezone
# Add parent dir to path so lib/ is importable
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from lib import config, db, log as logmod
from lib.health import start_health_server, stop_health_server
from lib.breaker import CircuitBreaker
from lib.merge import merge_cycle
from lib.validate import validate_cycle
from lib.evaluate import evaluate_cycle
logger = logging.getLogger("pipeline")
# Global shutdown event — stages check this between iterations
shutdown_event = asyncio.Event()
# Track active subprocesses for cleanup
active_subprocesses: set = set()
async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker):
"""Generic stage loop with interval, shutdown check, and circuit breaker."""
logger.info("Stage %s started (interval=%ds)", name, interval)
while not shutdown_event.is_set():
try:
if not breaker.allow_request():
logger.debug("Stage %s: breaker OPEN, skipping cycle", name)
else:
workers = breaker.max_workers()
succeeded, failed = await func(conn, max_workers=workers)
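                # A cycle that fails everything records a breaker failure; any success records a success instead.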
if failed > 0 and succeeded == 0:
breaker.record_failure()
elif succeeded > 0:
breaker.record_success()
except Exception:
logger.exception("Stage %s: unhandled error in cycle", name)
breaker.record_failure()
# Wait for interval or shutdown, whichever comes first
try:
await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
break # shutdown_event was set
except asyncio.TimeoutError:
pass # interval elapsed, continue loop
logger.info("Stage %s stopped", name)
# --- Stage stubs (Phase 1 — replaced in later phases) ---
async def ingest_cycle(conn, max_workers=None):
"""Stage 1: Scan inbox, extract claims. (stub)"""
return 0, 0
# validate_cycle imported from lib.validate
# evaluate_cycle imported from lib.evaluate
# merge_cycle imported from lib.merge
# --- Shutdown ---
def handle_signal(sig):
"""Signal handler — sets shutdown event."""
logger.info("Received %s, initiating graceful shutdown...", sig.name)
shutdown_event.set()
async def kill_subprocesses():
"""Kill any lingering Claude CLI subprocesses."""
for proc in list(active_subprocesses):
if proc.returncode is None:
logger.warning("Killing lingering subprocess PID %d", proc.pid)
try:
proc.kill()
await proc.wait()
except ProcessLookupError:
pass
active_subprocesses.clear()
async def cleanup_orphan_worktrees():
"""Remove any orphan worktrees from previous crashes."""
import glob
import shutil
# Use specific prefix to avoid colliding with other /tmp users (Ganymede)
orphans = glob.glob("/tmp/teleo-extract-*") + glob.glob("/tmp/teleo-merge-*")
for path in orphans:
logger.warning("Cleaning orphan worktree: %s", path)
try:
proc = await asyncio.create_subprocess_exec(
"git", "worktree", "remove", "--force", path,
cwd=str(config.REPO_DIR),
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await asyncio.wait_for(proc.wait(), timeout=10)
except Exception:
shutil.rmtree(path, ignore_errors=True)
# Prune stale worktree metadata entries from bare repo (Ganymede)
try:
proc = await asyncio.create_subprocess_exec(
"git", "worktree", "prune",
cwd=str(config.REPO_DIR),
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await asyncio.wait_for(proc.wait(), timeout=10)
except Exception:
logger.warning("git worktree prune failed, continuing")
# --- Main ---
async def main():
logmod.setup_logging()
logger.info("Teleo Pipeline v2 starting")
# Clean orphan worktrees from prior crashes (Ganymede's requirement)
await cleanup_orphan_worktrees()
# Initialize database
conn = db.get_connection()
db.migrate(conn)
logger.info("Database ready at %s", config.DB_PATH)
# Initialize circuit breakers
breakers = {
"ingest": CircuitBreaker("ingest", conn),
"validate": CircuitBreaker("validate", conn),
"evaluate": CircuitBreaker("evaluate", conn),
"merge": CircuitBreaker("merge", conn),
}
# Recover interrupted state from crashes
# Atomic recovery: all three resets in one transaction (Ganymede)
# Increment transient_retries on recovered sources to prevent infinite cycling (Vida)
with db.transaction(conn):
# Sources stuck in 'extracting' — increment retry counter, move to error if exhausted
c1 = conn.execute(
"""UPDATE sources SET
transient_retries = transient_retries + 1,
status = CASE
WHEN transient_retries + 1 >= ? THEN 'error'
ELSE 'unprocessed'
END,
last_error = CASE
WHEN transient_retries + 1 >= ? THEN 'crash recovery: retry budget exhausted'
ELSE last_error
END,
updated_at = datetime('now')
WHERE status = 'extracting'""",
(config.TRANSIENT_RETRY_MAX, config.TRANSIENT_RETRY_MAX),
)
# PRs stuck in 'merging' → approved (Ganymede's Q4 answer)
c2 = conn.execute(
"UPDATE prs SET status = 'approved' WHERE status = 'merging'"
)
# PRs stuck in 'reviewing' → open
c3 = conn.execute(
"UPDATE prs SET status = 'open' WHERE status = 'reviewing'"
)
recovered = c1.rowcount + c2.rowcount + c3.rowcount
if recovered:
logger.info("Recovered %d interrupted rows from prior crash", recovered)
# Register signal handlers
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, handle_signal, sig)
# Start health API
health_runners = []
await start_health_server(health_runners)
# Start stage loops
stages = [
asyncio.create_task(
stage_loop("ingest", config.INGEST_INTERVAL, ingest_cycle, conn, breakers["ingest"]),
name="ingest",
),
asyncio.create_task(
stage_loop("validate", config.VALIDATE_INTERVAL, validate_cycle, conn, breakers["validate"]),
name="validate",
),
asyncio.create_task(
stage_loop("evaluate", config.EVAL_INTERVAL, evaluate_cycle, conn, breakers["evaluate"]),
name="evaluate",
),
asyncio.create_task(
stage_loop("merge", config.MERGE_INTERVAL, merge_cycle, conn, breakers["merge"]),
name="merge",
),
]
logger.info("All stages running")
# Wait for shutdown signal
await shutdown_event.wait()
logger.info("Shutdown event received, waiting for stages to finish...")
# Give stages time to finish current work
try:
await asyncio.wait_for(asyncio.gather(*stages, return_exceptions=True), timeout=60)
except asyncio.TimeoutError:
logger.warning("Stages did not finish within 60s, force-cancelling")
for task in stages:
task.cancel()
await asyncio.gather(*stages, return_exceptions=True)
# Kill lingering subprocesses
await kill_subprocesses()
# Stop health API
await stop_health_server(health_runners)
# Close DB
conn.close()
logger.info("Teleo Pipeline v2 shut down cleanly")
if __name__ == "__main__":
asyncio.run(main())

36
teleo-pipeline.service Normal file
View file

@ -0,0 +1,36 @@
[Unit]
Description=Teleo Pipeline v2 — extraction/eval/merge daemon
After=network.target
Wants=network.target
[Service]
Type=simple
User=teleo
Group=teleo
WorkingDirectory=/opt/teleo-eval
ExecStart=/opt/teleo-eval/pipeline/.venv/bin/python3 /opt/teleo-eval/pipeline/teleo-pipeline.py
Restart=on-failure
RestartSec=30
# Graceful shutdown: SIGTERM → 60s drain → force-cancel → kill subprocesses
# 180s buffer handles in-flight extractions (up to 10 min each) (Ganymede)
KillSignal=SIGTERM
TimeoutStopSec=180
# Environment
Environment=PIPELINE_BASE=/opt/teleo-eval
EnvironmentFile=-/opt/teleo-eval/secrets/pipeline.env
# Logging goes to journal + pipeline.jsonl
StandardOutput=journal
StandardError=journal
# Security hardening
NoNewPrivileges=yes
ProtectSystem=strict
ReadWritePaths=/opt/teleo-eval /tmp
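# ProtectSystem=strict makes the rest of the filesystem read-only for the service;
# ReadWritePaths re-opens only what the daemon writes (state DB, logs, worktrees)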
# PrivateTmp=no: daemon uses /tmp/teleo-extract-* worktrees shared with git (Ganymede)
PrivateTmp=no
[Install]
WantedBy=multi-user.target