Compare commits
20 commits
ship/readm
...
main

| SHA1 |
|---|
| 73880e138d |
| 1bc541ac93 |
| 50b888a751 |
| 0eb26327fc |
| fc002354d4 |
| 5db6a0248c |
| 4b2b59b184 |
| ba234ec4b3 |
| e63d27d259 |
| 517e9884cc |
| 3f8666ee0c |
| 87f97eb4fa |
| ad1d82f5ee |
| 923454c9ea |
| ed4af4d72e |
| ed5f7ef6cc |
| 7741c1e6de |
| 992b4ee36f |
| de204db539 |
| 1eb259de8a |

7 changed files with 1045 additions and 89 deletions
@@ -204,7 +204,41 @@ sync_github_to_forgejo_with_prs() {
    local FORGEJO_TOKEN
    FORGEJO_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token 2>/dev/null)

    # Lazy schema for sync-mirror's auto-create tracker. Records (branch, sha)
    # pairs we've already auto-created PRs for, so the loop below can skip
    # redundant creates after pipeline merge → _delete_remote_branch →
    # GitHub-only re-discovery → re-push. Cheap CREATE IF NOT EXISTS on each
    # cycle; no migration needed because this table is private to sync-mirror.
    sqlite3 "$PIPELINE_DB" "CREATE TABLE IF NOT EXISTS sync_autocreate_tracker (branch TEXT NOT NULL, sha TEXT NOT NULL, pr_number INTEGER, created_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (branch, sha));" 2>/dev/null || true

    for branch in $GITHUB_ONLY; do
        # Already-tracked gate: if we've previously auto-created a PR for
        # this exact (branch, sha), skip the entire push+create sequence.
        # Closes the empty-PR loop (research and reweave both observed):
        # pipeline merges PR → _delete_remote_branch on Forgejo → next sync
        # sees branch GitHub-only (origin still has it) → re-pushes to
        # Forgejo → HAS_PR misses (Forgejo ?head= broken; closed PRs scroll
        # past 50-item paginated window) → auto-creates fresh PR → pipeline
        # merges (empty no-op via cherry-pick / reweave union) → repeat.
        # Tracker keys on SHA, so legitimate new commits on the same branch
        # produce a new SHA → tracker miss → auto-create proceeds normally.
        local BRANCH_SHA TRACKED_PR
        if [[ "$branch" == gh-pr-* ]]; then
            BRANCH_SHA=$(git rev-parse "refs/heads/$branch" 2>/dev/null || true)
        else
            BRANCH_SHA=$(git rev-parse "refs/remotes/origin/$branch" 2>/dev/null || true)
        fi
        if [ -n "$BRANCH_SHA" ]; then
            # stderr → $LOG so sustained sqlite3 contention surfaces in ops logs
            # rather than silently falling through to a redundant auto-create.
            TRACKED_PR=$(sqlite3 "$PIPELINE_DB" "SELECT pr_number FROM sync_autocreate_tracker WHERE branch=$(printf "'%s'" "${branch//\'/\'\'}") AND sha=$(printf "'%s'" "$BRANCH_SHA") LIMIT 1;" 2>>"$LOG" || echo "")
            if [ -n "$TRACKED_PR" ]; then
                log "Skip auto-create: $branch SHA $BRANCH_SHA already tracked (PR #$TRACKED_PR)"
                continue
            fi
        fi

        log "New from GitHub: $branch -> Forgejo"
        # Fork PR branches live as local refs (from Step 2.1), not on origin remote
        if [[ "$branch" == gh-pr-* ]]; then
@@ -275,6 +309,18 @@ print('no')
        fi
        log "Auto-created PR #$PR_NUM on Forgejo for $branch"

        # Record (branch, sha, pr_number) so the tracker gate above can short-
        # circuit the next time we see this exact (branch, sha) combination.
        # INSERT OR IGNORE: idempotent if a concurrent run already inserted.
        # WARN log on failure: silent INSERT failure under sustained sqlite3
        # contention would mask the loop reappearing on the next cycle (HAS_PR
        # only saves us while the closed PR is in the 50-item pagination window).
        if [ -n "$BRANCH_SHA" ] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
            if ! sqlite3 "$PIPELINE_DB" "INSERT OR IGNORE INTO sync_autocreate_tracker (branch, sha, pr_number) VALUES ($(printf "'%s'" "${branch//\'/\'\'}"), $(printf "'%s'" "$BRANCH_SHA"), $PR_NUM);" 2>>"$LOG"; then
                log "WARN: tracker insert failed for $branch SHA $BRANCH_SHA (PR #$PR_NUM) — duplicate auto-create possible next cycle"
            fi
        fi

        # Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
        if [[ "$branch" == gh-pr-* ]]; then
            GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
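For readers following the gate in the first hunk and the recording step in this one, here is a minimal Python sketch of the same dedup logic. The helper names (`already_tracked`, `record_autocreate`) are illustrative only; the real script stays in bash and shell-quotes the values instead of using parameter binding.

```python
import sqlite3

def already_tracked(db_path: str, branch: str, sha: str) -> bool:
    # Mirrors the tracker gate above, assuming the sync_autocreate_tracker
    # table created by the CREATE TABLE IF NOT EXISTS statement.
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute(
            "SELECT pr_number FROM sync_autocreate_tracker WHERE branch=? AND sha=? LIMIT 1",
            (branch, sha),
        ).fetchone()
        return row is not None
    finally:
        conn.close()

def record_autocreate(db_path: str, branch: str, sha: str, pr_number: int) -> None:
    conn = sqlite3.connect(db_path)
    try:
        # INSERT OR IGNORE keeps a concurrent second run idempotent.
        conn.execute(
            "INSERT OR IGNORE INTO sync_autocreate_tracker (branch, sha, pr_number) VALUES (?, ?, ?)",
            (branch, sha, pr_number),
        )
        conn.commit()
    finally:
        conn.close()
```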
@@ -367,6 +413,34 @@ print(json.dumps({'chat_id': sys.argv[4], 'text': msg, 'parse_mode': 'HTML'}))
REPO_TAG="main"
log "Starting sync cycle"

# Step 0: self-heal any gh-pr-* PR rows missing github_pr.
# Runs FIRST — before per-repo work (branch-mirror loop, auto-create-PR block).
# Recovers from races/transient failures in Step 4.5's one-shot link UPDATE.
# Idempotent: SELECT empty when clean, zero-cost path. Same SELECT/UPDATE
# heals historical orphans (PR 4066 picked up on first cron tick post-deploy)
# and future races on subsequent ticks. The branch name encodes the GitHub PR
# number deterministically (gh-pr-{N}/...) so no API call is required.
if [ -f "$PIPELINE_DB" ]; then
    sqlite3 -separator '|' "$PIPELINE_DB" \
        "SELECT number, branch FROM prs WHERE branch LIKE 'gh-pr-%' AND github_pr IS NULL;" \
        2>/dev/null | while IFS='|' read -r pr_num branch; do
        # Regex requires >=1 digit — empty/non-numeric branches fail to parse here,
        # not just at the empty-guard below. Keeps SQL-integer-safety load-bearing
        # on the regex alone. [0-9][0-9]* is the portable BRE form of [0-9]+,
        # works on both GNU sed (VPS) and BSD sed (dev macs).
        gh_pr_num=$(echo "$branch" | sed -n 's|^gh-pr-\([0-9][0-9]*\)/.*|\1|p')
        [ -z "$gh_pr_num" ] && continue
        # Both interpolated values are integer-validated upstream (pr_num from
        # INTEGER `number` column, gh_pr_num from regex above). No parametric
        # binding available in bash sqlite3 — safety relies on those invariants.
        if sqlite3 "$PIPELINE_DB" \
            "UPDATE prs SET github_pr = $gh_pr_num, source_channel = 'github' WHERE number = $pr_num;" \
            2>/dev/null; then
            log "self-heal: linked Forgejo PR #$pr_num -> GitHub PR #$gh_pr_num"
        fi
    done
fi

for entry in "${MIRROR_REPOS[@]}"; do
    # Read the 4 fields. `read` splits on $IFS (whitespace) by default.
    read -r forgejo_repo github_repo bare_path mode <<< "$entry"
@@ -1,29 +1,343 @@
"""Claims API — list endpoint + canonical claim detail page.

Owner: Argus
Routes:
    GET /api/claims — list/filter (frontmatter scan, lightweight)
    GET /api/claims/{slug} — full claim detail (Ship contract)
    GET /api/domains — domain rollups for sidebar

The detail endpoint is the canonical /claims/{slug} backend per Ship's
2026-04-29 brief. One round-trip, no N+1 cascade. Wikilinks resolved
server-side via title→slug index built from a tree walk.
"""
import json
import re
import sqlite3
import time
from pathlib import Path

import yaml
from aiohttp import web

# Codex tree roots — claims live in three places (Sourcer Apr 26 fix scope)
CODEX_BASE = Path("/opt/teleo-eval/workspaces/main")
CLAIM_TREES = [CODEX_BASE / "domains", CODEX_BASE / "foundations", CODEX_BASE / "core"]

# pipeline.db for joins (review_records, prs, sources)
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"

# In-process caches
_list_cache = {"data": None, "ts": 0}
_LIST_CACHE_TTL = 300  # 5 min — list view tolerates staleness

_index_cache = {"by_title": None, "by_stem": None, "ts": 0}
_INDEX_CACHE_TTL = 60  # 1 min — title→slug index for wikilink resolution

CORS_HEADERS = {"Access-Control-Allow-Origin": "*"}

# Wikilink pattern. [[text]] or [[text|alias]] — we keep the link text only.
_WIKILINK_RE = re.compile(r"\[\[([^\]|#]+?)(?:[#|][^\]]*)?\]\]")
# ─── Normalization ─────────────────────────────────────────────────────────

def _normalize_for_match(s):
    """Collapse a title or slug to a comparable form.

    Rules (from Ship's brief — match the link-fixer canonicalization):
    - lowercase
    - hyphen ↔ space tolerant (both → single space)
    - collapse runs of whitespace
    - strip leading/trailing whitespace
    - drop trailing punctuation that gets stripped from filenames
      (`.`, `?`, `!`, `:`, `--`)
    NOTE: lib/attribution.py exposes only normalize_handle today, not the
    title normalizer Ship referenced. Implementing inline; if a canonical
    helper lands later we point at it.
    """
    if not s:
        return ""
    s = str(s).lower().strip()
    # Treat hyphens as spaces, then collapse whitespace runs
    s = s.replace("-", " ").replace("_", " ")
    s = re.sub(r"\s+", " ", s)
    # Strip ASCII punctuation that filenames drop
    s = re.sub(r"[^\w\s]", "", s)
    return s.strip()

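A few input/output pairs make the matching rules concrete. The titles below are made up; they only illustrate the behavior of the function as written above.

```python
# Hyphens/underscores become spaces, punctuation drops, whitespace collapses.
assert _normalize_for_match("Sleep-Dependent Memory Consolidation?") == \
       _normalize_for_match("sleep dependent memory consolidation")
assert _normalize_for_match("working_memory--capacity") == "working memory capacity"
assert _normalize_for_match(None) == ""
```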
# ─── Frontmatter parse ─────────────────────────────────────────────────────

def _split_frontmatter(text):
    """Return (frontmatter_dict, body_str) or (None, None) if not a claim file."""
    if not text.startswith("---"):
        return None, None
    try:
        end = text.index("\n---", 3)
    except ValueError:
        return None, None
    try:
        fm = yaml.safe_load(text[3:end])
    except Exception:
        return None, None
    if not isinstance(fm, dict):
        return None, None
    body = text[end + 4:].lstrip()
    return fm, body


def _read_claim_file(filepath):
    """Read a claim file from disk. Returns (frontmatter, body) or (None, None)."""
    try:
        text = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return None, None
    return _split_frontmatter(text)

# ─── Tree walk + indexing ──────────────────────────────────────────────────

def _walk_claim_files():
    """Yield Path objects for every .md claim file in domains/, foundations/, core/."""
    for root in CLAIM_TREES:
        if not root.exists():
            continue
        for f in root.rglob("*.md"):
            if f.name == "_map.md":
                continue
            yield f

def _build_indexes():
    """Build (title→stem, stem→relpath) indexes for wikilink resolution.

    Cached for _INDEX_CACHE_TTL. Pulls from claim-index endpoint when
    possible (already cached upstream) and falls back to filesystem walk.
    """
    now = time.time()
    if _index_cache["by_title"] is not None and now - _index_cache["ts"] < _INDEX_CACHE_TTL:
        return _index_cache["by_title"], _index_cache["by_stem"]

    by_title = {}
    by_stem = {}
    for f in _walk_claim_files():
        stem = f.stem
        rel = str(f.relative_to(CODEX_BASE))
        by_stem[stem] = rel
        # Index by stem-as-normalized too (covers wikilinks that use the slug)
        by_title[_normalize_for_match(stem)] = stem
        # Also try parsing the title from frontmatter for higher-fidelity matches
        fm, _ = _read_claim_file(f)
        if fm:
            title = fm.get("title")
            if title:
                key = _normalize_for_match(title)
                if key and key not in by_title:
                    by_title[key] = stem

    _index_cache["by_title"] = by_title
    _index_cache["by_stem"] = by_stem
    _index_cache["ts"] = now
    return by_title, by_stem

def _resolve_wikilinks(body, by_title):
    """Extract [[link]] occurrences from body, return {link_text: slug_or_null}."""
    out = {}
    for match in _WIKILINK_RE.finditer(body or ""):
        link_text = match.group(1).strip()
        if not link_text or link_text in out:
            continue
        norm = _normalize_for_match(link_text)
        out[link_text] = by_title.get(norm)
    return out

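A quick sketch of the resolution path from body text to slug, using a fabricated index and claim titles:

```python
# Illustrative body and index — titles/slugs are made up.
by_title = {"predictive coding": "predictive-coding"}
body = "Builds on [[Predictive Coding]] and [[Free Energy Principle|FEP]]."
print(_resolve_wikilinks(body, by_title))
# {'Predictive Coding': 'predictive-coding', 'Free Energy Principle': None}
```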
# ─── Edge extraction from frontmatter ──────────────────────────────────────

_EDGE_FIELDS = {
    "supports": "supports",
    "challenges": "challenges",
    "challenged_by": "challenges",  # canonical: store as challenges direction
    "related": "related",
    "related_claims": "related",
    "depends_on": "depends_on",
}

def _extract_edges(fm, by_title, by_stem):
    """Return edges dict shaped per Ship's contract.

    Each edge is {slug, title, exists}. Slug resolved through title index.
    """
    edges = {"supports": [], "challenges": [], "related": [], "depends_on": []}

    for fm_key, edge_kind in _EDGE_FIELDS.items():
        raw = fm.get(fm_key)
        if not raw:
            continue
        items = raw if isinstance(raw, list) else [raw]
        for item in items:
            if not isinstance(item, str):
                continue
            text = item.strip()
            # Strip wikilink wrapping if present
            text = re.sub(r"^\[\[|\]\]$", "", text)
            # Strip pipe annotations: "[[link|alias]]" style or "claim | edge_type | date"
            text = text.split("|")[0].strip()
            if not text:
                continue
            # Try title match first, fall back to stem match
            slug = by_title.get(_normalize_for_match(text))
            if not slug and text in by_stem:
                slug = text
            edges[edge_kind].append({
                "slug": slug,
                "title": text,
                "exists": slug is not None,
            })

    return edges

# ─── Source provenance ─────────────────────────────────────────────────────

def _resolve_sourced_from(conn, claim_filepath, fm, title, stem):
    """Build sourced_from list for the claim.

    Strategy: find PRs that produced this claim (via prs.description LIKE
    or branch slug match), look at prs.source_path → inbox archive file →
    parse that source's frontmatter for title/url. Falls back to the raw
    `source` string from the claim's own frontmatter.

    Both `title` and `stem` must be non-empty — caller (handler) already
    falls back stem→title; passing empty values would leak `LIKE '%%'`
    and match unrelated PRs.
    """
    out = []
    seen_paths = set()
    pr_rows = []
    if (title or "").strip() and (stem or "").strip():
        try:
            pr_rows = conn.execute(
                """SELECT DISTINCT source_path
                   FROM prs
                   WHERE source_path IS NOT NULL AND source_path != ''
                     AND (description LIKE ? OR branch LIKE ?)
                   LIMIT 10""",
                (f"%{title}%", f"%{stem}%"),
            ).fetchall()
        except sqlite3.OperationalError:
            pr_rows = []

    for row in pr_rows:
        path = row["source_path"]
        if not path or path in seen_paths:
            continue
        seen_paths.add(path)
        out.append(_resolve_source_file(path))

    # 2. Fallback: parse raw source frontmatter field if no PR match
    if not out:
        raw = fm.get("source")
        if isinstance(raw, str) and raw.strip():
            out.append({"path": None, "title": raw.strip()[:200], "url": None})

    return out

def _resolve_source_file(rel_path):
    """Given inbox/archive/... path, parse frontmatter for title+url. Best-effort."""
    full = CODEX_BASE / rel_path
    entry = {"path": rel_path, "title": None, "url": None}
    if full.exists():
        fm, _ = _read_claim_file(full)
        if fm:
            entry["title"] = fm.get("title") or fm.get("source") or rel_path
            entry["url"] = fm.get("url")
    if not entry["title"]:
        # Last resort: derive from filename
        entry["title"] = Path(rel_path).stem.replace("-", " ")
    return entry

# ─── Reviews + PRs ─────────────────────────────────────────────────────────

def _load_pr_history(conn, title, stem):
    """Find PRs that touched this claim and their reviews.

    Both title and stem must be non-empty strings — empty leaks `LIKE '%%'`
    which matches every PR. Handler already populates a fallback so this
    is a defense-in-depth guard.
    """
    if not (title or "").strip() or not (stem or "").strip():
        return [], []

    try:
        pr_rows = conn.execute(
            """SELECT number, merged_at, commit_type, agent, branch, status
               FROM prs
               WHERE merged_at IS NOT NULL
                 AND (description LIKE ? OR branch LIKE ?)
               ORDER BY merged_at ASC
               LIMIT 50""",
            (f"%{title}%", f"%{stem}%"),
        ).fetchall()
    except sqlite3.OperationalError:
        return [], []

    prs = [
        {
            "number": r["number"],
            "merged_at": r["merged_at"],
            "kind": r["commit_type"] or "unknown",
            "agent": r["agent"],
            "branch": r["branch"],
        }
        for r in pr_rows
    ]

    pr_numbers = [p["number"] for p in prs]
    if not pr_numbers:
        return prs, []

    placeholders = ",".join("?" * len(pr_numbers))
    try:
        review_rows = conn.execute(
            f"""SELECT pr_number, reviewer, reviewer_model, outcome,
                       rejection_reason, notes, reviewed_at
                FROM review_records
                WHERE pr_number IN ({placeholders})
                ORDER BY reviewed_at ASC""",
            pr_numbers,
        ).fetchall()
    except sqlite3.OperationalError:
        review_rows = []

    reviews = [
        {
            "pr_number": r["pr_number"],
            "reviewer": r["reviewer"],
            "model": r["reviewer_model"],
            "outcome": r["outcome"],
            "rejection_reason": r["rejection_reason"],
            "notes": r["notes"],
            "reviewed_at": r["reviewed_at"],
        }
        for r in review_rows
    ]
    return prs, reviews

# ─── List view (preserved) ─────────────────────────────────────────────────

def _parse_list_entry(filepath):
    fm, body = _read_claim_file(filepath)
    if not fm or fm.get("type") != "claim":
        return None
    # Count wiki-links
    links = _WIKILINK_RE.findall(body or "")
    # Extract first paragraph as summary
    paragraphs = [p.strip() for p in (body or "").split("\n\n")
                  if p.strip() and not p.strip().startswith("#")]
    summary = paragraphs[0][:300] if paragraphs else ""
    return {
        "slug": filepath.stem,
@@ -40,40 +354,32 @@ def _parse_frontmatter(filepath):
        "challenged_by": fm.get("challenged_by"),
        "related_claims": fm.get("related_claims", []),
    }


def _load_all_claims_list():
    now = time.time()
    if _list_cache["data"] and now - _list_cache["ts"] < _LIST_CACHE_TTL:
        return _list_cache["data"]
    claims = []
    for f in _walk_claim_files():
        entry = _parse_list_entry(f)
        if entry:
            claims.append(entry)
    _list_cache["data"] = claims
    _list_cache["ts"] = now
    return claims


# ─── Handlers ──────────────────────────────────────────────────────────────

async def handle_claims(request):
    claims = _load_all_claims_list()

    # Filters
    domain = request.query.get("domain")
    search = request.query.get("q", "").lower()
    confidence = request.query.get("confidence")
    agent = request.query.get("agent")
    sort = request.query.get("sort", "recent")

    filtered = claims
    if domain:
@@ -83,9 +389,9 @@ async def handle_claims(request):
    if agent:
        filtered = [c for c in filtered if c["agent"] == agent]
    if search:
        filtered = [c for c in filtered
                    if search in c["title"].lower() or search in c["summary"].lower()]

    # Sort
    if sort == "recent":
        filtered.sort(key=lambda c: c["created"], reverse=True)
    elif sort == "alpha":
@@ -93,12 +399,10 @@ async def handle_claims(request):
    elif sort == "domain":
        filtered.sort(key=lambda c: (c["domain"], c["title"].lower()))

    # Pagination
    limit = min(int(request.query.get("limit", "50")), 200)
    offset = int(request.query.get("offset", "0"))
    page = filtered[offset:offset + limit]

    # Domain counts for sidebar
    domain_counts = {}
    for c in claims:
        domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1
@@ -111,31 +415,109 @@ async def handle_claims(request):
        "domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
        "confidence_levels": sorted(set(c["confidence"] for c in claims)),
        "agents": sorted(set(c["agent"] for c in claims if c["agent"])),
    }, headers=CORS_HEADERS)


async def handle_claim_detail(request):
    """GET /api/claims/{slug} — canonical claim detail page (Ship contract).

    One round-trip, all data resolved server-side. Wikilinks pre-resolved.
    """
    requested_slug = request.match_info["slug"]
    by_title, by_stem = _build_indexes()

    # Resolution order: exact stem → title-normalized (handles description-derived
    # slugs from /api/activity-feed that are longer than on-disk file stems) →
    # stem-as-prefix (handles description-derived slugs that are shorter than the
    # file stem because the description was truncated upstream).
    slug = requested_slug
    rel_path = by_stem.get(slug)
    if not rel_path:
        # Title fallback: requested slug = slugified frontmatter title
        norm = _normalize_for_match(requested_slug)
        resolved_stem = by_title.get(norm)
        if resolved_stem:
            slug = resolved_stem
            rel_path = by_stem.get(resolved_stem)
    if not rel_path:
        # Prefix fallback: walk stems sharing a common prefix with the request,
        # pick longest match. Anchored at 32 chars to avoid spurious hits.
        norm_req = _normalize_for_match(requested_slug)
        best_stem = None
        best_len = 0
        for stem in by_stem:
            norm_stem = _normalize_for_match(stem)
            common = 0
            for a, b in zip(norm_req, norm_stem):
                if a != b:
                    break
                common += 1
            if common >= 32 and common > best_len:
                best_stem = stem
                best_len = common
        if best_stem:
            slug = best_stem
            rel_path = by_stem.get(best_stem)
    if not rel_path:
        return web.json_response({"error": "claim not found", "slug": requested_slug},
                                 status=404, headers=CORS_HEADERS)

    filepath = CODEX_BASE / rel_path
    fm, body = _read_claim_file(filepath)
    if not fm:
        return web.json_response({"error": "frontmatter parse failed", "slug": slug},
                                 status=500, headers=CORS_HEADERS)

    # Open read-only DB connection for this request
    conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    try:
        title = fm.get("title") or slug.replace("-", " ")
        prs, reviews = _load_pr_history(conn, title, slug)
        sourced_from = _resolve_sourced_from(conn, filepath, fm, title, slug)
    finally:
        conn.close()

    last_review = None
    if reviews:
        latest = reviews[-1]
        last_review = {
            "outcome": latest["outcome"],
            "reviewer": latest["reviewer"],
            "date": (latest["reviewed_at"] or "")[:10],
        }

    # secondary_domains: explicit list, or empty
    secondary = fm.get("secondary_domains") or fm.get("cross_domain_links") or []
    if isinstance(secondary, str):
        secondary = [secondary]

    description = fm.get("description") or ""

    edges = _extract_edges(fm, by_title, by_stem)
    wikilinks = _resolve_wikilinks(body, by_title)

    response = {
        "slug": slug,
        "title": title,
        "domain": fm.get("domain", "unknown"),
        "secondary_domains": secondary,
        "confidence": fm.get("confidence", "unknown"),
        "description": description,
        "created": str(fm.get("created", "")),
        "last_review": last_review,
        "body": body or "",
        "sourced_from": sourced_from,
        "reviews": reviews,
        "prs": prs,
        "edges": edges,
        "wikilinks": wikilinks,
    }
    return web.json_response(response, headers=CORS_HEADERS)


async def handle_domains(request):
    claims = _load_all_claims_list()
    domains = {}
    for c in claims:
        d = c["domain"]
@@ -146,13 +528,11 @@ async def handle_domains(request):
        domains[d]["agents"].add(c["agent"])
        conf = c["confidence"]
        domains[d]["confidence_dist"][conf] = domains[d]["confidence_dist"].get(conf, 0) + 1

    result = []
    for d in sorted(domains.values(), key=lambda x: -x["count"]):
        d["agents"] = sorted(d["agents"])
        result.append(d)

    return web.json_response(result, headers=CORS_HEADERS)


def register_claims_routes(app):
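For orientation, a hedged usage sketch of the detail contract from a client's side. The host, port, and slug are illustrative; the field names mirror the response dict built in handle_claim_detail above.

```python
import asyncio
import aiohttp

async def fetch_claim(slug: str) -> dict:
    # Assumes the claims API is reachable locally; adjust the base URL as needed.
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://127.0.0.1:8080/api/claims/{slug}") as resp:
            resp.raise_for_status()
            return await resp.json()

claim = asyncio.run(fetch_claim("some-claim-slug"))
print(claim["title"], claim["confidence"], len(claim["edges"]["supports"]))
```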
@@ -84,6 +84,14 @@ MAX_EXTRACT_WORKERS = int(os.environ.get("MAX_EXTRACT_WORKERS", "5"))
MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7"))
MAX_MERGE_WORKERS = 1  # domain-serialized, but one merge at a time per domain

# --- External GitHub PR merge strategy ---
# When True, gh-pr-N/* branches merge with --no-ff (preserves contributor SHA in
# main's history → GitHub recognizes "merged" badge). When False, fall back to
# cherry-pick (the default for all other branches). Default True; flip to False
# as an emergency backout if the no-ff path destabilizes merge throughput.
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
EXTERNAL_PR_NO_FF_MERGE = True

# --- Timeouts (seconds) ---
EXTRACT_TIMEOUT = 600  # 10 min
EVAL_TIMEOUT = 120  # 2 min — routine Sonnet/Gemini Flash calls (was 600, caused 10-min stalls)
@@ -203,6 +211,14 @@ HEALTH_CHECK_INTERVAL = 60
# --- Extraction gates ---
EXTRACTION_COOLDOWN_HOURS = 4  # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter.

# --- Verdict-deadlock reaper ---
# Defaults safe (dry-run, 24h age, hourly throttle). Operator flips REAPER_DRY_RUN
# to "false" via systemctl edit teleo-pipeline → restart, no code change required.
REAPER_DRY_RUN = os.environ.get("REAPER_DRY_RUN", "true").lower() == "true"
REAPER_DEADLOCK_AGE_HOURS = int(os.environ.get("REAPER_DEADLOCK_AGE_HOURS", "24"))
REAPER_INTERVAL_SECONDS = int(os.environ.get("REAPER_INTERVAL_SECONDS", "3600"))
REAPER_MAX_PER_RUN = int(os.environ.get("REAPER_MAX_PER_RUN", "50"))

# --- Retrieval (Telegram bot) ---
RETRIEVAL_RRF_K = 20  # RRF smoothing constant — tuned for 5-10 results per source
RETRIEVAL_ENTITY_BOOST = 1.5  # RRF score multiplier for claims wiki-linked from matched entities
@@ -923,6 +923,36 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
            except Exception:
                logger.debug("Failed to read source %s", f, exc_info=True)

    # Archive-basename filter: skip queue files whose basename already exists in
    # inbox/archive/. Research-session commits on agent branches occasionally
    # re-introduce already-archived queue files when the branch is re-merged,
    # producing same-source re-extractions every cooldown cycle. The archive
    # copy is the source of truth — if a file with this basename is in archive,
    # the source is processed regardless of queue state. Single archive scan
    # per cycle, cheap (~1k files).
    #
    # Assumes basename uniqueness across queue+archive — current naming
    # convention (date-prefix + topic-slug) makes collisions vanishingly
    # rare. If short generic names like "notes.md" enter the queue, this
    # filter silently false-positives.
    if unprocessed:
        archive_dir = main / "inbox" / "archive"
        archived_basenames: set[str] = set()
        if archive_dir.exists():
            for af in archive_dir.rglob("*.md"):
                if af.name.startswith("_"):
                    continue
                archived_basenames.add(af.name)
        if archived_basenames:
            before = len(unprocessed)
            unprocessed = [
                (sp, c, f) for sp, c, f in unprocessed
                if Path(sp).name not in archived_basenames
            ]
            skipped = before - len(unprocessed)
            if skipped:
                logger.info("Skipped %d queue source(s) — basename already in inbox/archive/", skipped)

    # Don't early-return here — re-extraction sources may exist even when queue is empty
    # (the re-extraction check runs after open-PR filtering below)
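The archive-basename filter reduces to a set-membership check on filenames. A small sketch with made-up paths and queue tuples:

```python
from pathlib import Path

archived_basenames = {"2026-04-20-sleep-consolidation.md", "2026-04-22-rrf-tuning.md"}
unprocessed = [
    ("inbox/queue/2026-04-20-sleep-consolidation.md", "...", None),  # already archived → dropped
    ("inbox/queue/2026-04-30-new-topic.md", "...", None),            # kept
]
unprocessed = [(sp, c, f) for sp, c, f in unprocessed if Path(sp).name not in archived_basenames]
print([sp for sp, _, _ in unprocessed])  # ['inbox/queue/2026-04-30-new-topic.md']
```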
lib/merge.py (223)
@@ -429,6 +429,171 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
        await _git("branch", "-D", clean_branch)


_GH_PR_BRANCH_RE = re.compile(r"^gh-pr-(\d+)/(.+)$")


async def _merge_no_ff_external(branch: str) -> tuple[bool, str]:
    """Merge an external GitHub fork PR with --no-ff so contributor SHA lands in main.

    Why this differs from _cherry_pick_onto_main:
    - Cherry-pick rewrites the contributor's commit SHA → GitHub's "is PR head SHA
      an ancestor of main?" check returns false → "merged" badge never fires.
    - --no-ff preserves the contributor's commit SHA as a parent of the merge
      commit. After ff-push to main (the existing dispatch step), GitHub sees
      the SHA in ancestry and marks the PR merged.

    Mechanics:
    1. Fetch origin/main + origin/{branch}
    2. Worktree on local branch _merged-{slug} from origin/main
    3. git merge --no-ff origin/{branch} with verbose message:
       "Merge external GitHub PR #{N}: {branch_slug}"
    4. Push merge commit to origin/_merged/{branch} (synthetic audit ref)
    5. ff-push merge_sha → origin/main directly (function owns the push, NOT
       dispatch — see sentinel return below)

    The merge commit M has parents [main_sha, branch_sha]. M is a fast-forward
    descendant of main_sha (via first-parent chain), so the push to main
    works without --force.

    Synthetic branch (Ship review Apr 28): we deliberately do NOT force-push
    the contributor's gh-pr-N/* branch. Force-pushing it would rewrite the
    branch tip with a merge commit the contributor didn't author, showing as
    a confusing bot force-push in Forgejo's PR UI. The synthetic _merged/*
    audit ref lets us track the merge commit without touching the contributor's
    branch. Mirrors the _clean/* synthetic branch pattern in cherry-pick.

    Sentinel return: function pushes merge_sha → main itself (dispatch's ff-push
    can't, since origin/{branch} is unchanged and not a descendant of main).
    Returns a "merged --no-ff" sentinel string that dispatch detects to skip
    its ff-push step and route directly to PR-close + mark_merged + audit.
    The full 40-char merge SHA is in the return string for dispatch to extract.

    Conflict handling: same auto-resolve pattern as cherry-pick — entity-only
    conflicts take main's version (--ours = current worktree HEAD = main),
    other conflicts abort and return False with detail.

    Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
    """
    m = _GH_PR_BRANCH_RE.match(branch)
    if not m:
        return False, f"branch {branch} doesn't match gh-pr-N/* format"
    gh_pr_num = m.group(1)
    branch_slug = m.group(2)

    slug = branch.replace("/", "-")
    worktree_path = f"/tmp/teleo-merge-{slug}"
    local_branch = f"_merged-{slug}"   # local working branch in worktree
    audit_ref = f"_merged/{branch}"    # remote synthetic ref (preserves hierarchy)

    # Fetch latest state — separate calls (long branch names break combined refspec)
    rc, out = await _git("fetch", "origin", "main", timeout=15)
    if rc != 0:
        return False, f"fetch main failed: {out}"
    rc, out = await _git("fetch", "origin", branch, timeout=15)
    if rc != 0:
        return False, f"fetch branch failed: {out}"

    # Up-to-date check (mirrors cherry-pick path semantics)
    rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
    rc2, main_sha = await _git("rev-parse", "origin/main")
    if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
        rc_diff, diff_out = await _git(
            "diff", "--stat", f"origin/main..origin/{branch}", timeout=10,
        )
        if rc_diff != 0 or not diff_out.strip():
            return True, "already up to date"
        logger.info("External PR branch %s is descendant of main but has new content — proceeding", branch)

    async with _bare_repo_lock:
        # Clean up any stale local branch from a prior failed run
        await _git("branch", "-D", local_branch)
        rc, out = await _git("worktree", "add", "-b", local_branch, worktree_path, "origin/main")
        if rc != 0:
            return False, f"worktree add failed: {out}"

    try:
        merge_msg = f"Merge external GitHub PR #{gh_pr_num}: {branch_slug}"
        rc, out = await _git(
            "merge", "--no-ff", f"origin/{branch}",
            "-m", merge_msg,
            cwd=worktree_path, timeout=60,
        )

        if rc != 0:
            # Identify conflicts
            rc_ls, conflicting = await _git(
                "diff", "--name-only", "--diff-filter=U", cwd=worktree_path,
            )
            conflict_files = [
                f.strip() for f in conflicting.split("\n") if f.strip()
            ] if rc_ls == 0 else []

            if conflict_files and all(f.startswith("entities/") for f in conflict_files):
                # Entity-only conflicts: take main's version (entities are recoverable)
                # In merge: --ours = branch we're ON (worktree HEAD = main)
                #           --theirs = branch merging in (origin/{branch})
                for cf in conflict_files:
                    await _git("checkout", "--ours", cf, cwd=worktree_path)
                    await _git("add", cf, cwd=worktree_path)
                # Complete the merge using the prepared MERGE_MSG (no editor)
                rc_cont, cont_out = await _git(
                    "-c", "core.editor=true",
                    "commit", "--no-edit",
                    cwd=worktree_path, timeout=60,
                )
                if rc_cont != 0:
                    await _git("merge", "--abort", cwd=worktree_path)
                    return False, f"merge entity resolution failed for PR #{gh_pr_num}: {cont_out}"
                logger.info(
                    "External PR #%s merge: entity conflict auto-resolved (dropped %s)",
                    gh_pr_num, ", ".join(sorted(conflict_files)),
                )
            else:
                conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
                await _git("merge", "--abort", cwd=worktree_path)
                return False, f"merge conflict on PR #{gh_pr_num}: {conflict_detail}"

        # Capture the merge commit SHA before any pushes
        rc, merge_sha = await _git("rev-parse", "HEAD", cwd=worktree_path)
        if rc != 0:
            return False, f"rev-parse merge HEAD failed: {merge_sha}"
        merge_sha = merge_sha.strip().split("\n")[0]

        # Push to synthetic audit ref _merged/{branch} (does not touch contributor's
        # gh-pr-N/* branch). Plain --force: the audit ref is bot-owned and per-PR;
        # if a prior aborted attempt left a stale ref, overwriting it is the
        # intended behavior, and there's no concurrent writer to lease against.
        rc, out = await _git(
            "push", "--force", "origin", f"HEAD:refs/heads/{audit_ref}",
            cwd=worktree_path, timeout=30,
        )
        if rc != 0:
            return False, f"push to audit ref {audit_ref} failed: {out}"

        # ff-push the merge commit to main. This is a true fast-forward (M is a
        # descendant of origin/main via its first parent), so no --force needed.
        # Forgejo's branch protection allows ff-push to main from authorized users.
        rc, out = await _git(
            "push", "origin", f"{merge_sha}:main",
            cwd=worktree_path, timeout=30,
        )
        if rc != 0:
            # Roll back audit ref if main push failed — keeps state consistent.
            await _git("push", "--delete", "origin", f"refs/heads/{audit_ref}",
                       cwd=worktree_path, timeout=15)
            return False, f"ff-push to main failed: {out}"

        # Sentinel return: "merged --no-ff" prefix triggers dispatch's external-PR
        # close path (skips ff-push, does PR-close + mark_merged + audit).
        # Full 40-char merge SHA in the message so dispatch can parse it for audit.
        return True, f"merged --no-ff (external PR #{gh_pr_num}, M={merge_sha}, audit_ref={audit_ref})"

    finally:
        async with _bare_repo_lock:
            await _git("worktree", "remove", "--force", worktree_path)
            await _git("branch", "-D", local_branch)


from .frontmatter import (
    REWEAVE_EDGE_FIELDS,
    parse_yaml_frontmatter,
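The sentinel contract between _merge_no_ff_external and dispatch is a formatted string plus three regexes. A small standalone sketch with a fabricated SHA and PR number:

```python
import re

pick_msg = ("merged --no-ff (external PR #42, "
            "M=0123456789abcdef0123456789abcdef01234567, audit_ref=_merged/gh-pr-42/fix-typo)")

merge_sha = re.search(r"M=([a-f0-9]{40})", pick_msg).group(1)
audit_ref = re.search(r"audit_ref=(\S+?)\)", pick_msg).group(1)
gh_pr_num = re.search(r"external PR #(\d+)", pick_msg).group(1)
print(merge_sha[:8], audit_ref, gh_pr_num)  # 01234567 _merged/gh-pr-42/fix-typo 42
```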
@@ -733,6 +898,12 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
        # (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
        if branch.startswith("reweave/"):
            merge_fn = _merge_reweave_pr(branch)
        elif branch.startswith("gh-pr-") and config.EXTERNAL_PR_NO_FF_MERGE:
            # External GitHub fork PRs: --no-ff merge so contributor SHA lands
            # in main's history → GitHub recognizes "merged" badge.
            # Backout via config.EXTERNAL_PR_NO_FF_MERGE = False (falls back to cherry-pick).
            # Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
            merge_fn = _merge_no_ff_external(branch)
        else:
            # Extraction commits ADD new files — cherry-pick applies cleanly.
            merge_fn = _cherry_pick_onto_main(branch)
@@ -786,6 +957,58 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
            succeeded += 1
            continue

        # External GitHub PR (gh-pr-*): _merge_no_ff_external already pushed
        # the merge commit to origin/main + the synthetic _merged/{branch}
        # audit ref. Skip dispatch's ff-push (would fail — origin/{branch} is
        # the contributor's untouched branch, not a descendant of main).
        # Just close PR + mark_merged + audit, parsing merge SHA from sentinel.
        if pick_msg.startswith("merged --no-ff"):
            m = re.search(r"M=([a-f0-9]{40})", pick_msg)
            merge_sha = m.group(1) if m else None
            m_ref = re.search(r"audit_ref=(\S+?)\)", pick_msg)
            audit_ref = m_ref.group(1) if m_ref else None
            m_pr = re.search(r"external PR #(\d+)", pick_msg)
            gh_pr_num = m_pr.group(1) if m_pr else None
            # Surface drift between dispatch and _merge_no_ff_external if the
            # success-message contract changes. Merge already succeeded; this
            # is signal-only, not a gate on the close path.
            if not (m and m_ref and m_pr):
                logger.warning(
                    "PR #%d sentinel parse incomplete: M=%s, audit_ref=%s, gh_pr=%s, msg=%r",
                    pr_num, bool(m), bool(m_ref), bool(m_pr), pick_msg,
                )

            leo_token = get_agent_token("leo")
            comment_body = (
                f"Merged via --no-ff into main.\n"
                f"Merge commit: `{merge_sha}`\n"
                f"Audit ref: `{audit_ref}`\n"
                f"Branch: `{branch}` (preserved unchanged)"
            )
            await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
                              {"body": comment_body})
            result = await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"),
                                       {"state": "closed"}, token=leo_token)
            if result is None:
                logger.error("PR #%d: Forgejo close failed (no-ff path), skipping DB update", pr_num)
                failed += 1
                continue
            mark_merged(conn, pr_num)
            db.audit(conn, "merge", "merged", json.dumps({
                "pr": pr_num, "branch": branch, "method": "no-ff",
                "merge_commit_sha": merge_sha,
                "audit_ref": audit_ref,
                "github_pr": gh_pr_num,
            }))
            # NOTE: do NOT _delete_remote_branch(branch) here. The contributor's
            # gh-pr-N/* branch is the mirror of their fork PR head — leaving it
            # in place lets sync-mirror keep the GitHub PR <-> Forgejo PR link
            # observable. The synthetic _merged/{branch} ref carries the merge.
            logger.info("PR #%d merged via --no-ff (M=%s)", pr_num,
                        merge_sha[:8] if merge_sha else "?")
            succeeded += 1
            continue

        # Local ff-push: cherry-picked branch is a descendant of origin/main.
        # Regular push = fast-forward. Non-ff rejected by default (same safety).
        # --force-with-lease removed: Forgejo categorically blocks it on protected branches.
@@ -522,29 +522,52 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
    Finds PRs with substantive issue tags that haven't exceeded fix budget.
    Processes up to 3 per cycle (Rhea: 180s interval, don't overwhelm eval).
    """
    # Build the actionable-tag list from the routing constants so adding a new
    # tag to FIXABLE_TAGS / CONVERTIBLE_TAGS / UNFIXABLE_TAGS auto-updates the
    # SELECT filter — no two-place edit footgun.
    actionable_tags = sorted(FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS)
    placeholders = ",".join(["?"] * len(actionable_tags))

    # Push the actionable-tag filter into SQL (was a post-fetch Python loop).
    # The old shape selected the 3 oldest request_changes PRs and then dropped
    # ones without actionable tags, so empty-eval_issues rows occupied LIMIT-3
    # forever (head-of-line). Now LIMIT-3 always returns 3 actionable rows.
    # Reaper handles the empty-tag PRs after their 24h cooldown.
    rows = conn.execute(
        f"""SELECT number, eval_issues FROM prs
            WHERE status = 'open'
              AND tier0_pass = 1
              AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')
              AND COALESCE(fix_attempts, 0) < ?
              AND (last_attempt IS NULL OR last_attempt < datetime('now', '-3 minutes'))
              AND json_valid(eval_issues)
              AND EXISTS (
                  SELECT 1 FROM json_each(eval_issues)
                  WHERE value IN ({placeholders})
              )
            ORDER BY created_at ASC
            LIMIT 3""",
        (MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS, *actionable_tags),  # Total budget: mechanical + substantive
    ).fetchall()

    if not rows:
        return 0, 0

    # Defense-in-depth: json_valid(eval_issues) in the SELECT already filters
    # corrupt JSON before json_each runs, so this WARN should be unreachable.
    # Kept anyway: json_valid and json.loads use technically distinct parsers,
    # and the journal entry names the failure mode if SQLite ever surfaces a
    # row that passes json_valid + json_each but fails json.loads.
    substantive_rows = []
    for row in rows:
        try:
            json.loads(row["eval_issues"] or "[]")
        except (json.JSONDecodeError, TypeError):
            logger.warning(
                "PR #%d: corrupt eval_issues JSON — skipping in substantive fix cycle",
                row["number"],
            )
            continue
        substantive_rows.append(row)

    if not substantive_rows:
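The EXISTS/json_each shape can be checked in isolation. A small sqlite3 sketch with fabricated rows and made-up tag names, assuming SQLite's JSON1 functions are available (as the SELECT above already requires):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prs (number INTEGER, eval_issues TEXT)")
conn.executemany("INSERT INTO prs VALUES (?, ?)", [
    (1, '["confidence_inflated"]'),   # actionable tag → selected
    (2, '[]'),                        # no tags → filtered out in SQL, not in Python
    (3, 'not json'),                  # fails json_valid → filtered out
])
rows = conn.execute(
    """SELECT number FROM prs
       WHERE json_valid(eval_issues)
         AND EXISTS (SELECT 1 FROM json_each(eval_issues) WHERE value IN (?, ?))""",
    ("confidence_inflated", "scope_creep"),
).fetchall()
print([r[0] for r in rows])  # [1]
```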
@@ -559,7 +582,13 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
            if result.get("action"):
                fixed += 1
            elif result.get("skipped"):
                # Was DEBUG — promoted to INFO to make stuck-PR root cause
                # visible without enabling DEBUG fleet-wide. (Ship Apr 24+
                # silent skip diagnosis.)
                logger.info(
                    "PR #%d: substantive fix skipped: %s",
                    row["number"], result.get("reason"),
                )
        except Exception:
            logger.exception("PR #%d: substantive fix failed", row["number"])
            errors += 1
@@ -569,3 +598,191 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
    logger.info("Substantive fix cycle: %d fixed, %d errors", fixed, errors)

    return fixed, errors


# ─── Verdict-deadlock reaper ──────────────────────────────────────────────
#
# Defense-in-depth for PRs that substantive_fixer can't make progress on.
# Targets two stuck-verdict shapes empirically observed in production:
#
# 1. leo:request_changes + domain:approve
#    Leo asked for substantive fix; fixer either failed silently
#    (no_claim_files / no_review_comments / etc.) or the issue tag isn't
#    in FIXABLE | CONVERTIBLE | UNFIXABLE. PR sits forever.
#
# 2. leo:skipped + domain:request_changes
#    Eval bypassed Leo (eval_attempts >= MAX). Domain rejected with no
#    structured eval_issues. fixer can't classify → silent skip → forever.
#
# Both shapes need a clearance path. Reaper closes them after a 24h cooldown
# with audit_log breadcrumbs for forensics. First deploy runs in dry-run mode
# (audit "would_close" events only — no Forgejo writes, no DB closes).
#
# Reaper config (REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS, REAPER_INTERVAL_SECONDS,
# REAPER_MAX_PER_RUN) lives in lib/config.py with env-var overrides — operator
# flips dry-run to live via `systemctl edit teleo-pipeline.service`
# (Environment=REAPER_DRY_RUN=false) + restart. No code change, no commit, no
# redeploy required.


async def verdict_deadlock_reaper_cycle(conn) -> int:
    """Reap PRs stuck in conflicting-verdict deadlock for >24h.

    Returns count of PRs closed (or "would-close" in dry-run mode).
    Throttled to once per REAPER_INTERVAL_SECONDS via sentinel audit event.
    """
    # Throttle: skip if last reaper run was within REAPER_INTERVAL_SECONDS.
    # Uses audit_log as the rate-limit ledger so no schema/state needed.
    # stage='reaper' filter so the planner uses idx_audit_stage (avoids full scan).
    last_run = conn.execute(
        "SELECT MAX(timestamp) FROM audit_log "
        "WHERE stage = 'reaper' AND event = 'verdict_deadlock_reaper_run'"
    ).fetchone()[0]
    if last_run:
        cur = conn.execute(
            "SELECT (julianday('now') - julianday(?)) * 86400 < ?",
            (last_run, config.REAPER_INTERVAL_SECONDS),
        ).fetchone()[0]
        if cur:
            return 0

    # Two stuck-verdict shapes: leo:rc+domain:approve, leo:skipped+domain:rc.
    #
    # Branch allowlist invariant: the reaper closes ONLY disposable, pipeline-
    # generated branches — content the pipeline (or a daily cron) created and
    # can recreate. Four classes qualify:
    #
    #   extract/* — per-source extraction PRs, regenerated next ingest cycle
    #   reweave/* — nightly graph-edge maintenance, regenerated next reweave
    #   fix/*     — pipeline-internal fix branches
    #   */research-YYYY-MM-DD — daily {agent}/research-{date} cron sessions.
    #             Matched via SQLite `_` single-char wildcards as
    #             `research-20__-__-__` to literally enforce the date-
    #             suffix shape. Excludes hand-named research branches
    #             (rio/research-batch-agents-memory-harnesses,
    #             theseus/research-2nd-attempt-on-X, etc.) which are
    #             feature work owned by the agent. Pattern good through
    #             2099; revisit then.
    #
    # WIP agent feature branches (theseus/feature-foo, epimetheus/some-fix,
    # rio/research-thesis-name) are NEVER reaped — owners review their own PRs
    # on their own cadence. The date-shaped pattern threads the needle: picks
    # up daily synthesis output the agent regenerates tomorrow while leaving
    # manually-named research work alone.
    rows = conn.execute(
        """SELECT number, branch, eval_issues, leo_verdict, domain_verdict,
                  last_attempt, fix_attempts
           FROM prs
           WHERE status = 'open'
             AND tier0_pass = 1
             AND last_attempt IS NOT NULL
             AND last_attempt < datetime('now', ? || ' hours')
             AND (branch LIKE 'extract/%'
                  OR branch LIKE 'reweave/%'
                  OR branch LIKE 'fix/%'
                  OR branch LIKE '%/research-20__-__-__')
             AND (
                 (leo_verdict = 'request_changes' AND domain_verdict = 'approve')
                 OR (leo_verdict = 'skipped' AND domain_verdict = 'request_changes')
             )
           ORDER BY last_attempt ASC
           LIMIT ?""",
        (f"-{config.REAPER_DEADLOCK_AGE_HOURS}", config.REAPER_MAX_PER_RUN),
    ).fetchall()

    mode = "dryrun" if config.REAPER_DRY_RUN else "live"

    if not rows:
        # Heartbeat anyway so throttle ticks even when nothing to reap.
        db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
            "candidates": 0, "closed": 0, "mode": mode,
        }))
        return 0

    logger.info(
        "Verdict-deadlock reaper [%s]: %d candidate(s) in deadlock >%dh",
        mode, len(rows), config.REAPER_DEADLOCK_AGE_HOURS,
    )

    closed = 0
    would_close = 0
    errors = 0
    for row in rows:
        pr = row["number"]
        reason_detail = {
            "pr": pr,
            "branch": row["branch"],
            "leo_verdict": row["leo_verdict"],
            "domain_verdict": row["domain_verdict"],
            "eval_issues": row["eval_issues"],
            "last_attempt": row["last_attempt"],
            "fix_attempts": row["fix_attempts"],
        }

        if config.REAPER_DRY_RUN:
            # Audit only — do NOT touch DB row or Forgejo state.
            db.audit(conn, "reaper", "verdict_deadlock_would_close",
                     json.dumps(reason_detail))
            logger.info(
                "Reaper [dryrun]: would close PR #%d (leo=%s domain=%s issues=%s)",
                pr, row["leo_verdict"], row["domain_verdict"], row["eval_issues"],
            )
            would_close += 1
            continue

        try:
            comment_body = (
                "Closed by verdict-deadlock reaper.\n\n"
                f"This PR sat for >{config.REAPER_DEADLOCK_AGE_HOURS}h with conflicting "
                f"verdicts (leo={row['leo_verdict']}, domain={row['domain_verdict']}) "
                f"that the substantive fixer couldn't auto-resolve.\n\n"
                f"Eval issues: `{row['eval_issues']}`\n"
                f"Last attempt: {row['last_attempt']}\n\n"
                "_Automated message from the LivingIP pipeline._"
            )
            await forgejo_api(
                "POST", repo_path(f"issues/{pr}/comments"), {"body": comment_body},
            )
            patch_result = await forgejo_api(
                "PATCH", repo_path(f"pulls/{pr}"), {"state": "closed"},
                token=get_agent_token("leo"),
            )
            if patch_result is None:
                logger.warning(
                    "Reaper: PR #%d Forgejo close failed — skipping DB close to "
                    "avoid drift", pr,
                )
                errors += 1
                continue
            # Forgejo already closed at the PATCH above — pass close_on_forgejo=False
            # so close_pr() doesn't issue a redundant PATCH (which on transient
            # failure returns False and skips the DB close → status drift).
            await close_pr(
                conn, pr,
                last_error=(
                    f"verdict_deadlock_reaper: leo={row['leo_verdict']} "
                    f"domain={row['domain_verdict']} age>{config.REAPER_DEADLOCK_AGE_HOURS}h"
                ),
                close_on_forgejo=False,
            )
            db.audit(conn, "reaper", "verdict_deadlock_closed",
                     json.dumps(reason_detail))
            closed += 1
        except Exception:
            logger.exception("Reaper: PR #%d close failed", pr)
            errors += 1

    db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
        "candidates": len(rows), "closed": closed, "would_close": would_close,
        "errors": errors, "mode": mode,
    }))
    if errors:
        logger.warning(
            "Verdict-deadlock reaper [%s]: %d closed, %d would-close, %d errors",
            mode, closed, would_close, errors,
        )
    elif config.REAPER_DRY_RUN:
        logger.info("Verdict-deadlock reaper [dryrun]: %d would-close", would_close)
    else:
        logger.info("Verdict-deadlock reaper [live]: %d closed", closed)
    return closed + would_close
@@ -20,7 +20,7 @@ from lib import log as logmod
from lib.breaker import CircuitBreaker
from lib.evaluate import evaluate_cycle
from lib.fixer import fix_cycle as mechanical_fix_cycle
from lib.substantive_fixer import substantive_fix_cycle, verdict_deadlock_reaper_cycle
from lib.health import start_health_server, stop_health_server
from lib.llm import kill_active_subprocesses
from lib.merge import merge_cycle
@@ -91,14 +91,30 @@ async def ingest_cycle(conn, max_workers=None):


async def fix_cycle(conn, max_workers=None):
    """Combined fix stage: mechanical fixes first, then substantive fixes,
    finally the verdict-deadlock reaper.

    Mechanical (fixer.py): wiki link bracket stripping, $0
    Substantive (substantive_fixer.py): confidence/title/scope fixes via LLM, $0.001
    Reaper (substantive_fixer.verdict_deadlock_reaper_cycle): defense-in-depth
        for stuck-verdict PRs that the substantive fixer can't progress on.
        Hourly throttle, dry-run by default. Cost $0.
    """
    m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers)
    s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers)
    # Defense-in-depth: reaper exception must never block primary fix paths.
    # Same exception-isolation pattern as ingest_cycle's extract_cycle wrapper —
    # propagating would trip the fix breaker and lock out mechanical+substantive
    # for 15 min after 5 reaper failures.
    try:
        r_closed = await verdict_deadlock_reaper_cycle(conn)
    except Exception:
        import logging
        logging.getLogger("pipeline").exception(
            "Reaper cycle failed (non-fatal)"
        )
        r_closed = 0
    return m_fixed + s_fixed + r_closed, m_errors + s_errors


async def snapshot_cycle(conn, max_workers=None):