From a7251d752972fc84df9b2482998e2ecf2e7a19d2 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Fri, 13 Mar 2026 14:24:27 +0000 Subject: [PATCH] =?UTF-8?q?ganymede:=20add=20dev=20infrastructure=20?= =?UTF-8?q?=E2=80=94=20pyproject.toml,=20CI,=20deploy=20script?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of pipeline refactoring: - pyproject.toml: Python >=3.11, aiohttp dep, dev extras (pytest, pytest-asyncio, ruff). Ruff configured with sane defaults + ignore rules for existing code patterns (implicit Optional, timezone.utc). - .forgejo/workflows/ci.yml: Forgejo Actions CI — syntax check, ruff lint, ruff format, pytest on every PR and push to main. - deploy.sh: Pull + venv update + syntax check + optional restart. Replaces ad-hoc scp workflow. - tests/conftest.py: Shared fixture for in-memory SQLite with full schema. Ready for Phase 4 test suite. - .gitignore: Added venv, pytest cache, coverage, build artifacts. - Ruff auto-fixes: import sorting, unused imports removed across all modules. All files pass ruff check + ruff format. Pentagon-Agent: Ganymede --- .forgejo/workflows/ci.yml | 51 +++++++++++++++++++ .gitignore | 13 +++++ deploy.sh | 56 +++++++++++++++++++++ lib/breaker.py | 19 +++++-- lib/config.py | 28 +++++------ lib/db.py | 19 ++----- lib/evaluate.py | 103 ++++++++++++++++++++++---------------- lib/health.py | 53 +++++++++++--------- lib/merge.py | 72 +++++++++++++++----------- lib/validate.py | 74 ++++++++++++++++----------- pyproject.toml | 52 +++++++++++++++++++ teleo-pipeline.py | 31 +++++++----- tests/__init__.py | 0 tests/conftest.py | 20 ++++++++ 14 files changed, 424 insertions(+), 167 deletions(-) create mode 100644 .forgejo/workflows/ci.yml create mode 100755 deploy.sh create mode 100644 pyproject.toml create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py diff --git a/.forgejo/workflows/ci.yml b/.forgejo/workflows/ci.yml new file mode 100644 index 0000000..e0df0c3 --- /dev/null +++ b/.forgejo/workflows/ci.yml @@ -0,0 +1,51 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + lint-and-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + + - name: Syntax check + run: | + python -c " + import ast, pathlib, sys + errors = [] + for f in pathlib.Path('.').rglob('*.py'): + if '.venv' in str(f) or '.forgejo' in str(f): + continue + try: + ast.parse(f.read_text()) + except SyntaxError as e: + errors.append(f'{f}: {e}') + if errors: + for e in errors: + print(f'SYNTAX ERROR: {e}', file=sys.stderr) + sys.exit(1) + print('All Python files pass syntax check') + " + + - name: Ruff lint + run: ruff check . + + - name: Ruff format check + run: ruff format --check . + + - name: Run tests + run: pytest -v --tb=short + continue-on-error: true # Tests don't exist yet — remove this line after Phase 4 diff --git a/.gitignore b/.gitignore index c1d7dfb..96cc2ae 100644 --- a/.gitignore +++ b/.gitignore @@ -15,5 +15,18 @@ secrets/ logs/ *.log +# Virtual environment +.venv/ + +# Test artifacts +.pytest_cache/ +htmlcov/ +.coverage + +# Build +*.egg-info/ +dist/ +build/ + # OS .DS_Store diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..db2a710 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# Deploy teleo-pipeline to VPS. 
+# Usage: ./deploy.sh [--restart] +# +# Pulls latest from current branch, updates venv, optionally restarts service. +# Run from the VPS as the teleo user, or via SSH: +# ssh teleo@77.42.65.182 'cd /opt/teleo-eval/pipeline && ./deploy.sh --restart' + +set -euo pipefail + +DEPLOY_DIR="/opt/teleo-eval/pipeline" +VENV_DIR="${DEPLOY_DIR}/.venv" +SERVICE="teleo-pipeline" + +cd "$DEPLOY_DIR" + +echo "=== Pulling latest ===" +git pull --ff-only + +echo "=== Updating venv ===" +"${VENV_DIR}/bin/pip" install -q -e ".[dev]" 2>/dev/null || \ + "${VENV_DIR}/bin/pip" install -q -e . + +echo "=== Syntax check ===" +"${VENV_DIR}/bin/python3" -c " +import ast, pathlib, sys +errors = [] +for f in pathlib.Path('.').rglob('*.py'): + if '.venv' in str(f): + continue + try: + ast.parse(f.read_text()) + except SyntaxError as e: + errors.append(f'{f}: {e}') +if errors: + for e in errors: + print(f'SYNTAX ERROR: {e}', file=sys.stderr) + sys.exit(1) +print('All Python files pass syntax check') +" + +if [[ "${1:-}" == "--restart" ]]; then + echo "=== Restarting ${SERVICE} ===" + sudo systemctl restart "$SERVICE" + sleep 2 + if systemctl is-active --quiet "$SERVICE"; then + echo "=== ${SERVICE} is running ===" + systemctl status "$SERVICE" --no-pager -l | head -15 + else + echo "ERROR: ${SERVICE} failed to start" >&2 + journalctl -u "$SERVICE" --no-pager -n 20 + exit 1 + fi +else + echo "=== Deploy complete (service not restarted — use --restart to restart) ===" +fi diff --git a/lib/breaker.py b/lib/breaker.py index 6a7b4ca..bd62ac5 100644 --- a/lib/breaker.py +++ b/lib/breaker.py @@ -37,10 +37,20 @@ class CircuitBreaker: "SELECT state, failures, successes, tripped_at, last_success_at FROM circuit_breakers WHERE name = ?", (self.name,), ).fetchone() - return dict(row) if row else {"state": CLOSED, "failures": 0, "successes": 0, "tripped_at": None, "last_success_at": None} + return ( + dict(row) + if row + else {"state": CLOSED, "failures": 0, "successes": 0, "tripped_at": None, "last_success_at": None} + ) - def _set_state(self, state: str, failures: int = None, successes: int = None, - tripped_at: str = None, last_success_at: str = None): + def _set_state( + self, + state: str, + failures: int = None, + successes: int = None, + tripped_at: str = None, + last_success_at: str = None, + ): updates = ["state = ?", "last_update = datetime('now')"] params = [state] if failures is not None: @@ -121,7 +131,8 @@ class CircuitBreaker: if new_failures >= config.BREAKER_THRESHOLD: logger.warning( "Breaker %s: threshold reached (%d failures), opening", - self.name, new_failures, + self.name, + new_failures, ) self._set_state( OPEN, diff --git a/lib/config.py b/lib/config.py index 6791d20..c24d65c 100644 --- a/lib/config.py +++ b/lib/config.py @@ -37,11 +37,11 @@ MODEL_GPT4O = "openai/gpt-4o" # 1. Domain review → Sonnet (catches domain issues, evidence gaps — high volume filter) # 2. Leo review → Opus (cross-domain synthesis, confidence calibration — only pre-filtered PRs) # 3. 
DEEP cross-family → GPT-4o (adversarial blind-spot check — paid, highest-value claims only) -EXTRACT_MODEL = MODEL_SONNET # extraction: structured output, volume work -TRIAGE_MODEL = MODEL_HAIKU # triage: routing decision, cheapest -EVAL_DOMAIN_MODEL = MODEL_SONNET # domain review: high-volume filter -EVAL_LEO_MODEL = MODEL_OPUS # Leo review: scarce, high-value -EVAL_DEEP_MODEL = MODEL_GPT4O # DEEP cross-family: paid, adversarial +EXTRACT_MODEL = MODEL_SONNET # extraction: structured output, volume work +TRIAGE_MODEL = MODEL_HAIKU # triage: routing decision, cheapest +EVAL_DOMAIN_MODEL = MODEL_SONNET # domain review: high-volume filter +EVAL_LEO_MODEL = MODEL_OPUS # Leo review: scarce, high-value +EVAL_DEEP_MODEL = MODEL_GPT4O # DEEP cross-family: paid, adversarial # --- Model backends --- # Each model can run on Claude Max (subscription, base load) or API (overflow/spikes). @@ -51,11 +51,11 @@ EVAL_DEEP_MODEL = MODEL_GPT4O # DEEP cross-family: paid, adversarial # "overflow" — fall back to API (for time-sensitive work) # "skip" — skip this cycle (for optional stages like sample audit) OVERFLOW_POLICY = { - "extract": "queue", # extraction can wait - "triage": "overflow", # triage is cheap on API anyway + "extract": "queue", # extraction can wait + "triage": "overflow", # triage is cheap on API anyway "eval_domain": "overflow", # domain review is the volume filter — don't let it bottleneck (Rhea) - "eval_leo": "queue", # Leo review is the bottleneck we protect - "eval_deep": "overflow", # DEEP is already on API + "eval_leo": "queue", # Leo review is the bottleneck we protect + "eval_deep": "overflow", # DEEP is already on API "sample_audit": "skip", # optional, skip if constrained } @@ -73,18 +73,18 @@ MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7")) MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain # --- Timeouts (seconds) --- -EXTRACT_TIMEOUT = 600 # 10 min -EVAL_TIMEOUT = 300 # 5 min -MERGE_TIMEOUT = 300 # 5 min — force-reset to conflict if exceeded (Rhea) +EXTRACT_TIMEOUT = 600 # 10 min +EVAL_TIMEOUT = 300 # 5 min +MERGE_TIMEOUT = 300 # 5 min — force-reset to conflict if exceeded (Rhea) CLAUDE_MAX_PROBE_TIMEOUT = 15 # --- Backpressure --- BACKPRESSURE_HIGH = 40 # pause extraction above this -BACKPRESSURE_LOW = 20 # throttle extraction above this +BACKPRESSURE_LOW = 20 # throttle extraction above this BACKPRESSURE_THROTTLE_WORKERS = 2 # workers when throttled # --- Retry budgets --- -TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits +TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits SUBSTANTIVE_RETRY_STANDARD = 2 # reviewer request_changes SUBSTANTIVE_RETRY_DEEP = 3 diff --git a/lib/db.py b/lib/db.py index 9552b47..9828a4c 100644 --- a/lib/db.py +++ b/lib/db.py @@ -1,10 +1,9 @@ """SQLite database — schema, migrations, connection management.""" -import sqlite3 import json import logging +import sqlite3 from contextlib import contextmanager -from pathlib import Path from . 
import config @@ -147,9 +146,7 @@ def migrate(conn: sqlite3.Connection): # Check current version try: - row = conn.execute( - "SELECT MAX(version) as v FROM schema_version" - ).fetchone() + row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone() current = row["v"] if row and row["v"] else 0 except sqlite3.OperationalError: current = 0 @@ -186,9 +183,7 @@ def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None): ) -def append_priority_log( - conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str -): +def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str): """Append a priority assessment to a source's priority_log. NOTE: This does NOT update the source's priority column. The priority column @@ -199,16 +194,12 @@ def append_priority_log( """ conn.execute("BEGIN") try: - row = conn.execute( - "SELECT priority_log FROM sources WHERE path = ?", (path,) - ).fetchone() + row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone() if not row: conn.execute("ROLLBACK") return log = json.loads(row["priority_log"] or "[]") - log.append( - {"stage": stage, "priority": priority, "reasoning": reasoning} - ) + log.append({"stage": stage, "priority": priority, "reasoning": reasoning}) conn.execute( "UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?", (json.dumps(log), path), diff --git a/lib/evaluate.py b/lib/evaluate.py index 033f59a..197befd 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -191,9 +191,11 @@ End your review with exactly one of: # ─── API helpers ─────────────────────────────────────────────────────────── + async def _forgejo_api(method: str, path: str, body: dict = None, token: str = None): """Call Forgejo API.""" import aiohttp + url = f"{config.FORGEJO_URL}/api/v1{path}" if token is None: token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else "" @@ -201,8 +203,9 @@ async def _forgejo_api(method: str, path: str, body: dict = None, token: str = N try: async with aiohttp.ClientSession() as session: - async with session.request(method, url, headers=headers, - json=body, timeout=aiohttp.ClientTimeout(total=60)) as resp: + async with session.request( + method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=60) + ) as resp: if resp.status >= 400: text = await resp.text() logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200]) @@ -218,6 +221,7 @@ async def _forgejo_api(method: str, path: str, body: dict = None, token: str = N async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None: """Call OpenRouter API. Returns response text or None on failure.""" import aiohttp + key_file = config.SECRETS_DIR / "openrouter-key" if not key_file.exists(): logger.error("OpenRouter key file not found") @@ -253,7 +257,12 @@ async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> s async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None: """Call Claude via CLI (Claude Max subscription). 
Returns response or None.""" proc = await asyncio.create_subprocess_exec( - str(config.CLAUDE_CLI), "-p", "--model", model, "--output-format", "text", + str(config.CLAUDE_CLI), + "-p", + "--model", + model, + "--output-format", + "text", cwd=cwd or str(config.REPO_DIR), stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, @@ -291,9 +300,11 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: # ─── Diff helpers ────────────────────────────────────────────────────────── + async def _get_pr_diff(pr_number: int) -> str: """Fetch PR diff via Forgejo API.""" import aiohttp + url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff" token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else "" @@ -321,9 +332,9 @@ def _filter_diff(diff: str) -> tuple[str, str]: Returns (review_diff, entity_diff). Strips: inbox/archive/, schemas/, skills/, agents/*/musings/ """ - sections = re.split(r'(?=^diff --git )', diff, flags=re.MULTILINE) - skip_patterns = [r'^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/'] - core_domains = {'living-agents', 'living-capital', 'teleohumanity', 'mechanisms'} + sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE) + skip_patterns = [r"^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/"] + core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"} claim_sections = [] entity_sections = [] @@ -333,21 +344,19 @@ def _filter_diff(diff: str) -> tuple[str, str]: continue if any(re.match(p, section) for p in skip_patterns): continue - entity_match = re.match(r'^diff --git a/entities/([^/]+)/', section) + entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section) if entity_match and entity_match.group(1) not in core_domains: entity_sections.append(section) continue claim_sections.append(section) - return ''.join(claim_sections), ''.join(entity_sections) + return "".join(claim_sections), "".join(entity_sections) def _extract_changed_files(diff: str) -> str: """Extract changed file paths from diff.""" return "\n".join( - line.replace("diff --git a/", "").split(" b/")[0] - for line in diff.split("\n") - if line.startswith("diff --git") + line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git") ) @@ -360,20 +369,20 @@ def _detect_domain_from_diff(diff: str) -> str | None: for line in diff.split("\n"): if line.startswith("diff --git"): # Check domains/ and entities/ (both carry domain info) - match = re.search(r'(?:domains|entities)/([^/]+)/', line) + match = re.search(r"(?:domains|entities)/([^/]+)/", line) if match: d = match.group(1) domain_counts[d] = domain_counts.get(d, 0) + 1 continue # Check core/ subdirectories - match = re.search(r'core/([^/]+)/', line) + match = re.search(r"core/([^/]+)/", line) if match: d = match.group(1) if d in DOMAIN_AGENT_MAP: domain_counts[d] = domain_counts.get(d, 0) + 1 continue # Check foundations/ subdirectories - match = re.search(r'foundations/([^/]+)/', line) + match = re.search(r"foundations/([^/]+)/", line) if match: d = match.group(1) if d in DOMAIN_AGENT_MAP: @@ -398,6 +407,7 @@ def _is_musings_only(diff: str) -> bool: # ─── Verdict parsing ────────────────────────────────────────────────────── + def _parse_verdict(review_text: str, reviewer: str) -> str: """Parse VERDICT tag from review. 
Returns 'approve' or 'request_changes'.""" upper = reviewer.upper() @@ -412,7 +422,7 @@ def _parse_verdict(review_text: str, reviewer: str) -> str: def _parse_issues(review_text: str) -> list[str]: """Extract issue tags from review.""" - match = re.search(r'', review_text) + match = re.search(r"", review_text) if not match: return [] return [tag.strip() for tag in match.group(1).split(",") if tag.strip()] @@ -420,6 +430,7 @@ def _parse_issues(review_text: str) -> list[str]: # ─── Review execution ───────────────────────────────────────────────────── + async def _triage_pr(diff: str) -> str: """Triage PR via Haiku → DEEP/STANDARD/LIGHT.""" prompt = TRIAGE_PROMPT.format(diff=diff[:50000]) # Cap diff size for triage @@ -441,8 +452,12 @@ async def _triage_pr(diff: str) -> str: async def _run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None: """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o.""" prompt = DOMAIN_PROMPT.format( - agent=agent, agent_upper=agent.upper(), domain=domain, - style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files, + agent=agent, + agent_upper=agent.upper(), + domain=domain, + style_guide=REVIEW_STYLE_GUIDE, + diff=diff, + files=files, ) # Try Claude Max Sonnet first @@ -500,6 +515,7 @@ async def _post_formal_approvals(pr_number: int, pr_author: str): # ─── Single PR evaluation ───────────────────────────────────────────────── + async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: """Evaluate a single PR. Returns result dict.""" # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11) @@ -532,7 +548,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"} # Filter diff - review_diff, entity_diff = _filter_diff(diff) + review_diff, _entity_diff = _filter_diff(diff) if not review_diff: review_diff = diff files = _extract_changed_files(diff) @@ -563,11 +579,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) # Check if domain review already completed (resuming after Leo rate limit) - existing = conn.execute( - "SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,) - ).fetchone() + existing = conn.execute("SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)).fetchone() existing_domain_verdict = existing["domain_verdict"] if existing else "pending" - existing_leo_verdict = existing["leo_verdict"] if existing else "pending" + _existing_leo_verdict = existing["leo_verdict"] if existing else "pending" # Step 2: Domain review FIRST (Sonnet — high volume filter) # Skip if already completed from a previous attempt @@ -608,8 +622,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: WHERE number = ?""", (pr_number,), ) - db.audit(conn, "evaluate", "domain_rejected", - json.dumps({"pr": pr_number, "agent": agent})) + db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent})) return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped"} # Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT) @@ -639,10 +652,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,)) # Step 4: Determine final verdict - both_approve = ( - (leo_verdict == "approve" or leo_verdict == "skipped") - and domain_verdict == "approve" - ) + 
both_approve = (leo_verdict == "approve" or leo_verdict == "skipped") and domain_verdict == "approve" if both_approve: # Get PR author for formal approvals @@ -659,11 +669,13 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: "UPDATE prs SET status = 'approved' WHERE number = ?", (pr_number,), ) - db.audit(conn, "evaluate", "approved", - json.dumps({"pr": pr_number, "tier": tier, "domain": domain, - "leo": leo_verdict, "domain_agent": agent})) - logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", - pr_number, tier, leo_verdict, domain_verdict) + db.audit( + conn, + "evaluate", + "approved", + json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent}), + ) + logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict) else: conn.execute( "UPDATE prs SET status = 'open' WHERE number = ?", @@ -677,21 +689,27 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)", (json.dumps(feedback), pr_number), ) - db.audit(conn, "evaluate", "changes_requested", - json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, - "domain": domain_verdict})) - logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)", - pr_number, leo_verdict, domain_verdict) + db.audit( + conn, + "evaluate", + "changes_requested", + json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict}), + ) + logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)", pr_number, leo_verdict, domain_verdict) # Record cost (domain review) from . import costs + costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="max") if tier != "LIGHT": costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max") return { - "pr": pr_number, "tier": tier, "domain": domain, - "leo_verdict": leo_verdict, "domain_verdict": domain_verdict, + "pr": pr_number, + "tier": tier, + "domain": domain, + "leo_verdict": leo_verdict, + "domain_verdict": domain_verdict, "approved": both_approve, } @@ -706,6 +724,7 @@ _RATE_LIMIT_BACKOFF_MINUTES = 15 # ─── Main entry point ────────────────────────────────────────────────────── + async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: """Run one evaluation cycle. @@ -769,11 +788,11 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: # cycling through OTHER PRs that will also hit the same limit. if "rate_limited" in reason: from datetime import timedelta + _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( minutes=_RATE_LIMIT_BACKOFF_MINUTES ) - logger.info("Rate limited (%s) — backing off for %d minutes", - reason, _RATE_LIMIT_BACKOFF_MINUTES) + logger.info("Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES) break else: succeeded += 1 diff --git a/lib/health.py b/lib/health.py index 3ef09dd..e3d2b4b 100644 --- a/lib/health.py +++ b/lib/health.py @@ -1,10 +1,11 @@ """Health API — HTTP server on configurable port for monitoring.""" import logging -from aiohttp import web from datetime import date, datetime, timezone -from . import config, db, costs +from aiohttp import web + +from . 
import config, costs, db logger = logging.getLogger("pipeline.health") @@ -24,12 +25,8 @@ async def handle_health(request): ).fetchall() # Queue depths - sources_by_status = conn.execute( - "SELECT status, COUNT(*) as n FROM sources GROUP BY status" - ).fetchall() - prs_by_status = conn.execute( - "SELECT status, COUNT(*) as n FROM prs GROUP BY status" - ).fetchall() + sources_by_status = conn.execute("SELECT status, COUNT(*) as n FROM sources GROUP BY status").fetchall() + prs_by_status = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall() # Per-domain merge queue depth (Vida) merge_queue = conn.execute( @@ -76,14 +73,17 @@ async def handle_health(request): "merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue}, "budget": budget, "metabolic": { - "null_result_rate_24h": round(null_rate["rate"], 3) if null_rate and null_rate["rate"] is not None else None, - "domain_approval_rate_24h": round(approval_rate["domain_rate"], 3) if approval_rate and approval_rate["domain_rate"] is not None else None, - "leo_approval_rate_24h": round(approval_rate["leo_rate"], 3) if approval_rate and approval_rate["leo_rate"] is not None else None, + "null_result_rate_24h": round(null_rate["rate"], 3) + if null_rate and null_rate["rate"] is not None + else None, + "domain_approval_rate_24h": round(approval_rate["domain_rate"], 3) + if approval_rate and approval_rate["domain_rate"] is not None + else None, + "leo_approval_rate_24h": round(approval_rate["leo_rate"], 3) + if approval_rate and approval_rate["leo_rate"] is not None + else None, }, - "recent_activity": [ - {"stage": r["stage"], "event": r["event"], "count": r["n"]} - for r in recent - ], + "recent_activity": [{"stage": r["stage"], "event": r["event"], "count": r["n"]} for r in recent], } # Breaker state + stall detection (Vida: last_success_at heartbeat) @@ -96,8 +96,12 @@ async def handle_health(request): age_s = (datetime.now(timezone.utc) - last).total_seconds() breaker_info["last_success_age_s"] = round(age_s) # Stall detection: no success in 2x the stage's interval - intervals = {"ingest": config.INGEST_INTERVAL, "validate": config.VALIDATE_INTERVAL, - "evaluate": config.EVAL_INTERVAL, "merge": config.MERGE_INTERVAL} + intervals = { + "ingest": config.INGEST_INTERVAL, + "validate": config.VALIDATE_INTERVAL, + "evaluate": config.EVAL_INTERVAL, + "merge": config.MERGE_INTERVAL, + } threshold = intervals.get(r["name"], 60) * 2 if age_s > threshold: breaker_info["stalled"] = True @@ -180,6 +184,7 @@ async def handle_calibration(request): downgrades = [] for r in rows: import json + log = json.loads(r["priority_log"] or "[]") if len(log) < 2: continue @@ -191,12 +196,14 @@ async def handle_calibration(request): elif levels.get(last, 2) < levels.get(first, 2): downgrades.append({"path": r["path"], "from": first, "to": last}) - return web.json_response({ - "upgrades": upgrades[:20], - "downgrades_count": len(downgrades), - "upgrades_count": len(upgrades), - "note": "Focus on upgrades — downgrades are expected (downstream has more context)" - }) + return web.json_response( + { + "upgrades": upgrades[:20], + "downgrades_count": len(downgrades), + "upgrades_count": len(upgrades), + "note": "Focus on upgrades — downgrades are expected (downstream has more context)", + } + ) def create_app() -> web.Application: diff --git a/lib/merge.py b/lib/merge.py index e56451b..a134e05 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -14,7 +14,6 @@ import asyncio import json import logging from collections import defaultdict -from 
datetime import datetime, timezone from . import config, db @@ -29,10 +28,12 @@ MERGE_TIMEOUT_SECONDS = 300 # 5 minutes # --- Git helpers --- + async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]: """Run a git command async. Returns (returncode, stdout+stderr).""" proc = await asyncio.create_subprocess_exec( - "git", *args, + "git", + *args, cwd=cwd or str(config.REPO_DIR), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, @@ -52,6 +53,7 @@ async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]: async def _forgejo_api(method: str, path: str, body: dict = None) -> dict | list | None: """Call Forgejo API. Returns parsed JSON or None on error.""" import aiohttp + url = f"{config.FORGEJO_URL}/api/v1{path}" token_file = config.FORGEJO_TOKEN_FILE token = token_file.read_text().strip() if token_file.exists() else "" @@ -59,8 +61,9 @@ async def _forgejo_api(method: str, path: str, body: dict = None) -> dict | list try: async with aiohttp.ClientSession() as session: - async with session.request(method, url, headers=headers, - json=body, timeout=aiohttp.ClientTimeout(total=30)) as resp: + async with session.request( + method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=30) + ) as resp: if resp.status >= 400: text = await resp.text() logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200]) @@ -75,6 +78,7 @@ async def _forgejo_api(method: str, path: str, body: dict = None) -> dict | list # --- PR Discovery (Multiplayer v1) --- + async def discover_external_prs(conn) -> int: """Scan Forgejo for open PRs not tracked in SQLite. @@ -90,8 +94,7 @@ async def discover_external_prs(conn) -> int: while True: prs = await _forgejo_api( "GET", - f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls" - f"?state=open&limit=50&page={page}", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls?state=open&limit=50&page={page}", ) if not prs: break @@ -104,7 +107,9 @@ async def discover_external_prs(conn) -> int: is_pipeline = author.lower() in pipeline_users origin = "pipeline" if is_pipeline else "human" priority = "high" if origin == "human" else None - domain = _detect_domain_from_files(pr) if not is_pipeline else _detect_domain_from_branch(pr["head"]["ref"]) + domain = ( + _detect_domain_from_files(pr) if not is_pipeline else _detect_domain_from_branch(pr["head"]["ref"]) + ) conn.execute( """INSERT OR IGNORE INTO prs @@ -112,10 +117,19 @@ async def discover_external_prs(conn) -> int: VALUES (?, ?, 'open', ?, ?, ?)""", (pr["number"], pr["head"]["ref"], origin, priority, domain), ) - db.audit(conn, "merge", "pr_discovered", - json.dumps({"pr": pr["number"], "origin": origin, - "author": pr.get("user", {}).get("login"), - "priority": priority or "inherited"})) + db.audit( + conn, + "merge", + "pr_discovered", + json.dumps( + { + "pr": pr["number"], + "origin": origin, + "author": pr.get("user", {}).get("login"), + "priority": priority or "inherited", + } + ), + ) # Ack comment on human PRs so contributor feels acknowledged (Rhea) if origin == "human": @@ -180,6 +194,7 @@ async def _post_ack_comment(pr_number: int): # --- Merge operations --- + async def _claim_next_pr(conn, domain: str) -> dict | None: """Claim the next approved PR for a domain via atomic UPDATE. 
@@ -264,7 +279,8 @@ async def _rebase_and_push(branch: str) -> tuple[bool, str]: rc, out = await _git( "push", f"--force-with-lease={branch}:{expected_sha}", - "origin", f"HEAD:{branch}", + "origin", + f"HEAD:{branch}", cwd=worktree_path, timeout=30, ) @@ -306,6 +322,7 @@ async def _delete_remote_branch(branch: str): # --- Domain merge task --- + async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: """Process the merge queue for a single domain. Returns (succeeded, failed).""" succeeded = 0 @@ -328,14 +345,14 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: timeout=MERGE_TIMEOUT_SECONDS, ) except asyncio.TimeoutError: - logger.error("PR #%d merge timed out after %ds — resetting to conflict (Rhea)", - pr_num, MERGE_TIMEOUT_SECONDS) + logger.error( + "PR #%d merge timed out after %ds — resetting to conflict (Rhea)", pr_num, MERGE_TIMEOUT_SECONDS + ) conn.execute( "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", (f"merge timed out after {MERGE_TIMEOUT_SECONDS}s", pr_num), ) - db.audit(conn, "merge", "timeout", - json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS})) + db.audit(conn, "merge", "timeout", json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS})) failed += 1 continue @@ -345,8 +362,7 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", (rebase_msg[:500], pr_num), ) - db.audit(conn, "merge", "rebase_failed", - json.dumps({"pr": pr_num, "error": rebase_msg[:200]})) + db.audit(conn, "merge", "rebase_failed", json.dumps({"pr": pr_num, "error": rebase_msg[:200]})) failed += 1 continue @@ -358,8 +374,7 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", (merge_msg[:500], pr_num), ) - db.audit(conn, "merge", "api_merge_failed", - json.dumps({"pr": pr_num, "error": merge_msg[:200]})) + db.audit(conn, "merge", "api_merge_failed", json.dumps({"pr": pr_num, "error": merge_msg[:200]})) failed += 1 continue @@ -387,6 +402,7 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: # --- Main entry point --- + async def merge_cycle(conn, max_workers=None) -> tuple[int, int]: """Run one merge cycle across all domains. 
@@ -398,18 +414,13 @@ async def merge_cycle(conn, max_workers=None) -> tuple[int, int]: await discover_external_prs(conn) # Step 2: Find domains with approved work - rows = conn.execute( - "SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL" - ).fetchall() + rows = conn.execute("SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL").fetchall() domains = [r["domain"] for r in rows] # Also check for NULL-domain PRs (human PRs with undetected domain) - null_domain = conn.execute( - "SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL" - ).fetchone() + null_domain = conn.execute("SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL").fetchone() if null_domain and null_domain["c"] > 0: - logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", - null_domain["c"]) + logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", null_domain["c"]) if not domains: return 0, 0 @@ -430,7 +441,8 @@ async def merge_cycle(conn, max_workers=None) -> tuple[int, int]: total_failed += f if total_succeeded or total_failed: - logger.info("Merge cycle: %d succeeded, %d failed across %d domains", - total_succeeded, total_failed, len(domains)) + logger.info( + "Merge cycle: %d succeeded, %d failed across %d domains", total_succeeded, total_failed, len(domains) + ) return total_succeeded, total_failed diff --git a/lib/validate.py b/lib/validate.py index 10569f8..4a3bbec 100644 --- a/lib/validate.py +++ b/lib/validate.py @@ -21,12 +21,24 @@ logger = logging.getLogger("pipeline.validate") # ─── Constants ────────────────────────────────────────────────────────────── -VALID_DOMAINS = frozenset({ - "internet-finance", "entertainment", "health", "ai-alignment", - "space-development", "grand-strategy", "mechanisms", "living-capital", - "living-agents", "teleohumanity", "critical-systems", - "collective-intelligence", "teleological-economics", "cultural-dynamics", -}) +VALID_DOMAINS = frozenset( + { + "internet-finance", + "entertainment", + "health", + "ai-alignment", + "space-development", + "grand-strategy", + "mechanisms", + "living-capital", + "living-agents", + "teleohumanity", + "critical-systems", + "collective-intelligence", + "teleological-economics", + "cultural-dynamics", + } +) VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"}) VALID_TYPES = frozenset({"claim", "framework"}) @@ -71,6 +83,7 @@ _SCOPING_LANGUAGE = re.compile( # ─── YAML frontmatter parser ─────────────────────────────────────────────── + def parse_frontmatter(text: str) -> tuple[dict | None, str]: """Extract YAML frontmatter and body from markdown text.""" if not text.startswith("---"): @@ -79,10 +92,11 @@ def parse_frontmatter(text: str) -> tuple[dict | None, str]: if end == -1: return None, text raw = text[3:end] - body = text[end + 3:].strip() + body = text[end + 3 :].strip() try: import yaml + fm = yaml.safe_load(raw) if not isinstance(fm, dict): return None, body @@ -106,14 +120,14 @@ def parse_frontmatter(text: str) -> tuple[dict | None, str]: if val.lower() == "null" or val == "": val = None elif val.startswith("["): - val = [v.strip().strip('"').strip("'") - for v in val.strip("[]").split(",") if v.strip()] + val = [v.strip().strip('"').strip("'") for v in val.strip("[]").split(",") if v.strip()] fm[key] = val return fm if fm else None, body # ─── Validators ───────────────────────────────────────────────────────────── + def validate_schema(fm: 
dict) -> list[str]: """Check required fields and valid enums.""" violations = [] @@ -240,8 +254,7 @@ def validate_domain_directory_match(filepath: str, fm: dict) -> list[str]: if isinstance(secondary, str): secondary = [secondary] if dir_domain not in (secondary or []): - return [f"domain_directory_mismatch:file in domains/{dir_domain}/ " - f"but domain field says '{domain}'"] + return [f"domain_directory_mismatch:file in domains/{dir_domain}/ but domain field says '{domain}'"] break return [] @@ -279,6 +292,7 @@ def find_near_duplicates(title: str, existing_claims: set[str]) -> list[str]: # ─── Full Tier 0 validation ──────────────────────────────────────────────── + def tier0_validate_claim(filepath: str, content: str, existing_claims: set[str]) -> dict: """Run full Tier 0 validation. Returns {filepath, passes, violations, warnings}.""" violations = [] @@ -286,8 +300,7 @@ def tier0_validate_claim(filepath: str, content: str, existing_claims: set[str]) fm, body = parse_frontmatter(content) if fm is None: - return {"filepath": filepath, "passes": False, - "violations": ["no_frontmatter"], "warnings": []} + return {"filepath": filepath, "passes": False, "violations": ["no_frontmatter"], "warnings": []} violations.extend(validate_schema(fm)) violations.extend(validate_date(fm.get("created"))) @@ -305,12 +318,12 @@ def tier0_validate_claim(filepath: str, content: str, existing_claims: set[str]) warnings.extend(find_near_duplicates(title, existing_claims)) - return {"filepath": filepath, "passes": len(violations) == 0, - "violations": violations, "warnings": warnings} + return {"filepath": filepath, "passes": len(violations) == 0, "violations": violations, "warnings": warnings} # ─── Diff parsing ────────────────────────────────────────────────────────── + def extract_claim_files_from_diff(diff: str) -> dict[str, str]: """Parse unified diff to extract new/modified claim file contents.""" claim_dirs = ("domains/", "core/", "foundations/") @@ -332,9 +345,7 @@ def extract_claim_files_from_diff(diff: str) -> dict[str, str]: elif line.startswith("+++ b/") and not is_deletion: path = line[6:] basename = path.rsplit("/", 1)[-1] if "/" in path else path - if (any(path.startswith(d) for d in claim_dirs) - and path.endswith(".md") - and not basename.startswith("_")): + if any(path.startswith(d) for d in claim_dirs) and path.endswith(".md") and not basename.startswith("_"): current_file = path elif current_file and line.startswith("+") and not line.startswith("+++"): current_lines.append(line[1:]) @@ -347,17 +358,20 @@ def extract_claim_files_from_diff(diff: str) -> dict[str, str]: # ─── Forgejo API (using merge module's helper) ───────────────────────────── + async def _forgejo_api(method: str, path: str, body: dict = None): """Call Forgejo API. 
Reuses merge module pattern.""" import aiohttp + url = f"{config.FORGEJO_URL}/api/v1{path}" token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else "" headers = {"Authorization": f"token {token}", "Content-Type": "application/json"} try: async with aiohttp.ClientSession() as session: - async with session.request(method, url, headers=headers, - json=body, timeout=aiohttp.ClientTimeout(total=30)) as resp: + async with session.request( + method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=30) + ) as resp: if resp.status >= 400: text = await resp.text() logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200]) @@ -373,14 +387,14 @@ async def _forgejo_api(method: str, path: str, body: dict = None): async def _get_pr_diff(pr_number: int) -> str: """Fetch PR diff via Forgejo API.""" import aiohttp + url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff" token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else "" headers = {"Authorization": f"token {token}", "Accept": "text/plain"} try: async with aiohttp.ClientSession() as session: - async with session.get(url, headers=headers, - timeout=aiohttp.ClientTimeout(total=60)) as resp: + async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp: if resp.status >= 400: return "" diff = await resp.text() @@ -412,8 +426,7 @@ async def _has_tier0_comment(pr_number: int, head_sha: str) -> bool: while True: comments = await _forgejo_api( "GET", - f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments" - f"?limit=50&page={page}", + f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments?limit=50&page={page}", ) if not comments: break @@ -465,6 +478,7 @@ async def _post_validation_comment(pr_number: int, results: list[dict], head_sha # ─── Existing claims index ───────────────────────────────────────────────── + def load_existing_claims() -> set[str]: """Build set of known claim titles from the main worktree.""" claims: set[str] = set() @@ -480,6 +494,7 @@ def load_existing_claims() -> set[str]: # ─── Main entry point ────────────────────────────────────────────────────── + async def validate_pr(conn, pr_number: int) -> dict: """Run Tier 0 validation on a single PR. @@ -514,8 +529,7 @@ async def validate_pr(conn, pr_number: int) -> dict: result = tier0_validate_claim(filepath, content, existing_claims) results.append(result) status = "PASS" if result["passes"] else "FAIL" - logger.debug("PR #%d: %s %s v=%s w=%s", pr_number, status, filepath, - result["violations"], result["warnings"]) + logger.debug("PR #%d: %s %s v=%s w=%s", pr_number, status, filepath, result["violations"], result["warnings"]) all_pass = all(r["passes"] for r in results) total = len(results) @@ -531,8 +545,12 @@ async def validate_pr(conn, pr_number: int) -> dict: "UPDATE prs SET tier0_pass = ? 
WHERE number = ?", (1 if all_pass else 0, pr_number), ) - db.audit(conn, "validate", "tier0_complete", - json.dumps({"pr": pr_number, "pass": all_pass, "passing": passing, "total": total})) + db.audit( + conn, + "validate", + "tier0_complete", + json.dumps({"pr": pr_number, "pass": all_pass, "passing": passing, "total": total}), + ) return {"pr": pr_number, "all_pass": all_pass, "total": total, "passing": passing} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..dc964b6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,52 @@ +[project] +name = "teleo-pipeline" +version = "2.0.0" +description = "Teleo Pipeline v2 — async daemon for claim extraction, validation, evaluation, and merge" +requires-python = ">=3.11" +dependencies = [ + "aiohttp>=3.9,<4", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8", + "pytest-asyncio>=0.23", + "ruff>=0.3", +] + +[tool.ruff] +target-version = "py311" +line-length = 120 + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "F", # pyflakes (undefined names, unused imports) + "W", # pycodestyle warnings + "I", # isort + "UP", # pyupgrade + "B", # flake8-bugbear + "SIM", # flake8-simplify + "RUF", # ruff-specific rules +] +ignore = [ + "E501", # line length (handled by formatter) + "B008", # function call in default argument (common in aiohttp) + "RUF013", # implicit Optional — existing code uses = None pattern throughout + "UP017", # datetime.UTC alias — keep timezone.utc for consistency with existing code + "UP041", # PEP 604 union syntax — existing code mixes styles, fix in Phase 3 + "SIM105", # contextlib.suppress — try/except/pass is clearer in migration code + "SIM117", # merge with blocks — readability judgment call, not a bug +] + +[tool.ruff.format] +quote-style = "double" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +python_files = "test_*.py" +python_functions = "test_*" + +[tool.ruff.lint.isort] +known-first-party = ["lib"] diff --git a/teleo-pipeline.py b/teleo-pipeline.py index ad66b97..5c3da0e 100644 --- a/teleo-pipeline.py +++ b/teleo-pipeline.py @@ -9,18 +9,19 @@ import asyncio import logging import signal import sys -from datetime import datetime, timezone # Add parent dir to path so lib/ is importable from pathlib import Path + sys.path.insert(0, str(Path(__file__).parent)) -from lib import config, db, log as logmod -from lib.health import start_health_server, stop_health_server +from lib import config, db +from lib import log as logmod from lib.breaker import CircuitBreaker +from lib.evaluate import evaluate_cycle, kill_active_subprocesses +from lib.health import start_health_server, stop_health_server from lib.merge import merge_cycle from lib.validate import validate_cycle -from lib.evaluate import evaluate_cycle, kill_active_subprocesses logger = logging.getLogger("pipeline") @@ -58,6 +59,7 @@ async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreak # --- Stage stubs (Phase 1 — replaced in later phases) --- + async def ingest_cycle(conn, max_workers=None): """Stage 1: Scan inbox, extract claims. 
(stub)""" return 0, 0 @@ -74,6 +76,7 @@ async def ingest_cycle(conn, max_workers=None): # --- Shutdown --- + def handle_signal(sig): """Signal handler — sets shutdown event.""" logger.info("Received %s, initiating graceful shutdown...", sig.name) @@ -89,13 +92,18 @@ async def cleanup_orphan_worktrees(): """Remove any orphan worktrees from previous crashes.""" import glob import shutil + # Use specific prefix to avoid colliding with other /tmp users (Ganymede) orphans = glob.glob("/tmp/teleo-extract-*") + glob.glob("/tmp/teleo-merge-*") for path in orphans: logger.warning("Cleaning orphan worktree: %s", path) try: proc = await asyncio.create_subprocess_exec( - "git", "worktree", "remove", "--force", path, + "git", + "worktree", + "remove", + "--force", + path, cwd=str(config.REPO_DIR), stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, @@ -106,7 +114,9 @@ async def cleanup_orphan_worktrees(): # Prune stale worktree metadata entries from bare repo (Ganymede) try: proc = await asyncio.create_subprocess_exec( - "git", "worktree", "prune", + "git", + "worktree", + "prune", cwd=str(config.REPO_DIR), stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, @@ -118,6 +128,7 @@ async def cleanup_orphan_worktrees(): # --- Main --- + async def main(): logmod.setup_logging() logger.info("Teleo Pipeline v2 starting") @@ -159,13 +170,9 @@ async def main(): (config.TRANSIENT_RETRY_MAX, config.TRANSIENT_RETRY_MAX), ) # PRs stuck in 'merging' → approved (Ganymede's Q4 answer) - c2 = conn.execute( - "UPDATE prs SET status = 'approved' WHERE status = 'merging'" - ) + c2 = conn.execute("UPDATE prs SET status = 'approved' WHERE status = 'merging'") # PRs stuck in 'reviewing' → open - c3 = conn.execute( - "UPDATE prs SET status = 'open' WHERE status = 'reviewing'" - ) + c3 = conn.execute("UPDATE prs SET status = 'open' WHERE status = 'reviewing'") recovered = c1.rowcount + c2.rowcount + c3.rowcount if recovered: logger.info("Recovered %d interrupted rows from prior crash", recovered) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..fc1e5a7 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,20 @@ +"""Shared test fixtures for teleo-pipeline tests.""" + +import sqlite3 + +import pytest + +from lib import db + + +@pytest.fixture +def conn(): + """In-memory SQLite connection with full schema. Fresh per test.""" + connection = sqlite3.connect(":memory:") + connection.row_factory = sqlite3.Row + connection.execute("PRAGMA journal_mode=WAL") + connection.execute("PRAGMA busy_timeout=10000") + # Run migrations to create schema + db.migrate(connection) + yield connection + connection.close()