Compare commits


2 commits

Author SHA1 Message Date
b57cb41e31 fix(mirror): Step 0b self-heals stuck submitted_by via cron retry
Step 4.5's submitted_by write inherits the same one-shot failure mode
that Phase 1's Step 0 sweep was designed to retire. On transient
GitHub API failure, link-only fallback writes github_pr and permanently
closes the retry window — submitted_by stays stuck on 'm3taversal'
(bot identity), and contribution_events never gets the author event.

Step 0b adds a second sweep alongside Step 0:
  SELECT ... WHERE github_pr IS NOT NULL
              AND (submitted_by IS NULL OR submitted_by = 'm3taversal')

Same idempotent cron-retry shape: SELECT empty when clean, per-row
GitHub API call + UPDATE only when stuck. Targets bidirectional
repos only (gh-pr-* branches don't exist for main_only mirrors).
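
A quick way to spot-check the clean state by hand — same predicate as the
sweep, DB path as configured on the VPS, count expected to be 0 when nothing
is stuck (illustrative probe, not part of the change):

  sqlite3 /opt/teleo-eval/pipeline/pipeline.db \
    "SELECT count(*) FROM prs
      WHERE branch LIKE 'gh-pr-%'
        AND github_pr IS NOT NULL
        AND (submitted_by IS NULL OR submitted_by = 'm3taversal');"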

Derives the bidirectional GitHub repo from MIRROR_REPOS at sweep time,
since the Step 0/0b sweeps run before sync_repo() sets GITHUB_REPO scope.

Doubles as future-proof safety net: any external PR landing during
the deploy window with submitted_by stuck on bot identity gets
self-healed on the next cron tick. No backfill script needed.

Per Ganymede line-level review of 1decf09.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 18:21:04 +01:00
1decf09598 fix(mirror): Step 4.5 writes submitted_by from GitHub user.login
Closes the systematic external-PR attribution gap diagnosed on FwazB PR #4066.
The Forgejo PR was being created via the admin token (m3taversal), so
prs.submitted_by ended up as the bot identity. record_contributor_attribution
treats m3taversal as a bot and finds no other signal for fork PRs (commit
authors are bot-rewritten by sync-mirror), so no author events are emitted.

Mechanism — for gh-pr-* branches in the auto-create path:
1. After deriving GH_PR_NUM from branch name, GET /pulls/{N} from GitHub API
2. Extract user.login (e.g. "FwazB") from the PR's head author
3. Validate against GitHub username regex: ^[a-zA-Z0-9]([a-zA-Z0-9-]{0,37}[a-zA-Z0-9])?$
4. Lowercase to match contributor.py canonical handle storage
5. Include submitted_by in the same UPDATE that sets github_pr + source_channel
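
Condensed, the lookup-and-normalize path looks like this (branch name
illustrative; PAT and GITHUB_REPO as set by the surrounding script):

  branch="gh-pr-4066/example-branch"                              # illustrative
  GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')   # -> 4066
  GH_USER=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$GH_PR_NUM" \
      -H "Authorization: token $PAT" | \
    python3 -c "import sys,json; print((json.load(sys.stdin).get('user') or {}).get('login',''))")
  GH_USER_LC=$(echo "$GH_USER" | tr '[:upper:]' '[:lower:]')      # "FwazB" -> "fwazb"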

The regex doubles as the SQL-injection safety boundary (no parameter binding
in bash sqlite3). 14 cases tested locally, including SQL-injection probes —
all probes rejected, all real handles pass.
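
A minimal local harness for that boundary (probe strings illustrative, not
the original 14-case set):

  re='^[a-zA-Z0-9]([a-zA-Z0-9-]{0,37}[a-zA-Z0-9])?$'
  for u in FwazB octocat a-b-c "x'; DROP TABLE prs;--" -leading trailing- \
           "$(printf 'a%.0s' {1..40})"; do
    [[ "$u" =~ $re ]] && echo "PASS    $u" || echo "REJECT  $u"
  done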

Failure modes:
- API call fails or returns no user.login → fall back to link-only UPDATE
  (existing behavior). Better than failing the whole step.
- Regex rejects a malformed login → same fallback. Preserves the audit trail.

Smoke-tested against real FwazB PR #90 on the VPS: extracts "FwazB", lowercases
to "fwazb", regex passes, would write submitted_by='fwazb'. Once deployed,
record_contributor_attribution will trust submitted_by as a fallback when no
agent trailer is found and emit an author event with weight 0.30 → FwazB-class
contributors auto-attributed end-to-end with zero manual backfill.

Architectural decisions per Ship's Apr 28 sign-off:
- (a) Sweep stays zero-API: no per-row API calls in Step 0 self-heal.
  This Step 4.5 fix is create-time only. Existing rows (FwazB) were already
  manually backfilled — no remaining rows to backfill.
- (b) Skip _BOT_AUTHORS exception (#2 in original ticket): once submitted_by
  is correct, the bot filter doesn't fire on external PRs anymore.
- (c) Defer frontmatter rewriting: convenience, not load-bearing.
2026-04-28 18:15:13 +01:00
7 changed files with 186 additions and 920 deletions

View file

@ -204,41 +204,7 @@ sync_github_to_forgejo_with_prs() {
local FORGEJO_TOKEN
FORGEJO_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token 2>/dev/null)
# Lazy schema for sync-mirror's auto-create tracker. Records (branch, sha)
# pairs we've already auto-created PRs for, so the loop below can skip
# redundant creates after pipeline merge → _delete_remote_branch →
# GitHub-only re-discovery → re-push. Cheap CREATE IF NOT EXISTS on each
# cycle; no migration needed because this table is private to sync-mirror.
sqlite3 "$PIPELINE_DB" "CREATE TABLE IF NOT EXISTS sync_autocreate_tracker (branch TEXT NOT NULL, sha TEXT NOT NULL, pr_number INTEGER, created_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (branch, sha));" 2>/dev/null || true
for branch in $GITHUB_ONLY; do
# Already-tracked gate: if we've previously auto-created a PR for
# this exact (branch, sha), skip the entire push+create sequence.
# Closes the empty-PR loop (research and reweave both observed):
# pipeline merges PR → _delete_remote_branch on Forgejo → next sync
# sees branch GitHub-only (origin still has it) → re-pushes to
# Forgejo → HAS_PR misses (Forgejo ?head= broken; closed PRs scroll
# past 50-item paginated window) → auto-creates fresh PR → pipeline
# merges (empty no-op via cherry-pick / reweave union) → repeat.
# Tracker keys on SHA, so legitimate new commits on the same branch
# produce a new SHA → tracker miss → auto-create proceeds normally.
local BRANCH_SHA TRACKED_PR
if [[ "$branch" == gh-pr-* ]]; then
BRANCH_SHA=$(git rev-parse "refs/heads/$branch" 2>/dev/null || true)
else
BRANCH_SHA=$(git rev-parse "refs/remotes/origin/$branch" 2>/dev/null || true)
fi
if [ -n "$BRANCH_SHA" ]; then
# stderr → $LOG so sustained sqlite3 contention surfaces in ops logs
# rather than silently falling through to a redundant auto-create.
TRACKED_PR=$(sqlite3 "$PIPELINE_DB" "SELECT pr_number FROM sync_autocreate_tracker WHERE branch=$(printf "'%s'" "${branch//\'/\'\'}") AND sha=$(printf "'%s'" "$BRANCH_SHA") LIMIT 1;" 2>>"$LOG" || echo "")
if [ -n "$TRACKED_PR" ]; then
log "Skip auto-create: $branch SHA $BRANCH_SHA already tracked (PR #$TRACKED_PR)"
continue
fi
fi
log "New from GitHub: $branch -> Forgejo"
# Fork PR branches live as local refs (from Step 2.1), not on origin remote
if [[ "$branch" == gh-pr-* ]]; then
@ -309,21 +275,26 @@ print('no')
fi
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
# Record (branch, sha, pr_number) so the tracker gate above can short-
# circuit the next time we see this exact (branch, sha) combination.
# INSERT OR IGNORE: idempotent if a concurrent run already inserted.
# WARN log on failure: silent INSERT failure under sustained sqlite3
# contention would mask the loop reappearing on the next cycle (HAS_PR
# only saves us while the closed PR is in the 50-item pagination window).
if [ -n "$BRANCH_SHA" ] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
if ! sqlite3 "$PIPELINE_DB" "INSERT OR IGNORE INTO sync_autocreate_tracker (branch, sha, pr_number) VALUES ($(printf "'%s'" "${branch//\'/\'\'}"), $(printf "'%s'" "$BRANCH_SHA"), $PR_NUM);" 2>>"$LOG"; then
log "WARN: tracker insert failed for $branch SHA $BRANCH_SHA (PR #$PR_NUM) — duplicate auto-create possible next cycle"
fi
fi
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
# Step 4.5: Link GitHub PR to Forgejo PR + populate submitted_by from GitHub user.login.
#
# Why submitted_by here: sync-mirror creates the Forgejo PR using the admin
# token (user=m3taversal), so the Forgejo PR's submitter is bot-identity, not
# the GitHub author (FwazB, etc). Without this fix, contribution_events emits
# zero author events for external PRs because record_contributor_attribution
# treats m3taversal as a bot and finds no other signal. Architectural fix per
# Ship + Cory after the FwazB PR #4066 attribution gap (Apr 28).
local GH_USER
GH_USER=""
if [[ "$branch" == gh-pr-* ]]; then
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
# Look up GitHub PR author for submitted_by attribution.
local PAT_GHU
PAT_GHU=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
if [ -n "$PAT_GHU" ] && [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]]; then
GH_USER=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$GH_PR_NUM" \
-H "Authorization: token $PAT_GHU" 2>/dev/null | \
python3 -c "import sys,json; print((json.load(sys.stdin).get('user') or {}).get('login',''))" 2>/dev/null || true)
fi
else
local PAT
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
@ -335,9 +306,24 @@ print('no')
fi
fi
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM, source_channel = 'github' WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
# GitHub usernames: 1-39 chars, alphanumeric + hyphen, can't start/end with hyphen.
# Strict regex is the SQL-injection safety boundary (no bash sqlite3 parametric binding).
if [[ "$GH_USER" =~ ^[a-zA-Z0-9]([a-zA-Z0-9-]{0,37}[a-zA-Z0-9])?$ ]]; then
# Lowercase for canonical handle storage (matches contributor.py normalize)
local GH_USER_LC
GH_USER_LC=$(echo "$GH_USER" | tr '[:upper:]' '[:lower:]')
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM, source_channel = 'github', submitted_by = '$GH_USER_LC' WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM (author: $GH_USER_LC)" || \
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
else
# Fall back to link-only when user.login lookup failed or was malformed.
# Still better than no link; record_contributor_attribution can use
# other signals (commit author trailer, etc) — author event may still
# not emit, but submitted_by stays NULL (or whatever discovery already set it to).
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM, source_channel = 'github' WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM (author lookup failed)" || \
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
fi
fi
done
}
@ -439,6 +425,50 @@ if [ -f "$PIPELINE_DB" ]; then
log "self-heal: linked Forgejo PR #$pr_num -> GitHub PR #$gh_pr_num"
fi
done
# Step 0b: self-heal stuck submitted_by on linked gh-pr-* PRs.
# Step 4.5 fall-through writes github_pr but leaves submitted_by as 'm3taversal'
# (bot identity) on transient GitHub API failures. Without retry, contribution_events
# never gets the author event — same one-shot failure mode Step 0 was designed to
# retire. Same idempotent cron-retry shape: SELECT empty when clean, per-row API
# call only when stuck. Targets bidirectional repos only (gh-pr-* branches don't
# exist for main_only mirrors).
GH_REPO_BIDIR=""
for entry in "${MIRROR_REPOS[@]}"; do
read -r _f gh_repo _bare mode <<< "$entry"
if [ "$mode" = "bidirectional" ]; then
GH_REPO_BIDIR="$gh_repo"
break
fi
done
if [ -n "$GH_REPO_BIDIR" ] && [ -f "$GITHUB_PAT_FILE" ]; then
sqlite3 -separator '|' "$PIPELINE_DB" \
"SELECT number, branch FROM prs
WHERE branch LIKE 'gh-pr-%'
AND github_pr IS NOT NULL
AND (submitted_by IS NULL OR submitted_by = 'm3taversal');" \
2>/dev/null | while IFS='|' read -r pr_num branch; do
gh_pr_num=$(echo "$branch" | sed -n 's|^gh-pr-\([0-9][0-9]*\)/.*|\1|p')
[ -z "$gh_pr_num" ] && continue
PAT_S0=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
[ -z "$PAT_S0" ] && continue
gh_user=$(curl -sf "https://api.github.com/repos/$GH_REPO_BIDIR/pulls/$gh_pr_num" \
-H "Authorization: token $PAT_S0" 2>/dev/null | \
python3 -c "import sys,json; print((json.load(sys.stdin).get('user') or {}).get('login',''))" 2>/dev/null || true)
# Regex matches Step 4.5: GitHub username spec (anchored, alnum + hyphen,
# no consecutive-hyphen check). SQL-injection boundary: char class excludes
# quotes/semicolons/backslashes, so single-quoted literal is safe.
if [[ "$gh_user" =~ ^[a-zA-Z0-9]([a-zA-Z0-9-]{0,37}[a-zA-Z0-9])?$ ]]; then
gh_user_lc=$(echo "$gh_user" | tr '[:upper:]' '[:lower:]')
# SQL-integer-safe: pr_num from INTEGER column, gh_user_lc regex-validated.
if sqlite3 "$PIPELINE_DB" \
"UPDATE prs SET submitted_by = '$gh_user_lc' WHERE number = $pr_num;" \
2>/dev/null; then
log "self-heal: set submitted_by=$gh_user_lc on Forgejo PR #$pr_num (GitHub PR #$gh_pr_num)"
fi
fi
done
fi
fi
for entry in "${MIRROR_REPOS[@]}"; do

View file

@ -32,12 +32,13 @@ def _is_source_slug(slug):
def _classify_event(branch, description, commit_type, candidate_slug=None):
"""Return one of: create | enrich | challenge | source | session_digest | None.
"""Return one of: create | enrich | challenge | source | None.
Source-archive PRs are extract/* branches that filed a source into
inbox/archive/ but didn't produce a claim. Session-digest PRs are
agent research/entity commits with no per-claim description — they
represent session-level rollups, not specific knowledge artifacts.
inbox/archive/ but didn't produce a claim. Two signals classify them
as 'source' (defense in depth):
1. extract/* branch with empty description (no claim title produced)
2. candidate_slug matches YYYY-MM-DD-...-HASH4 (inbox filename pattern)
"""
commit_type_l = (commit_type or "").lower()
branch = branch or ""
@ -59,12 +60,6 @@ def _classify_event(branch, description, commit_type, candidate_slug=None):
or branch.startswith("reweave/")):
return "enrich"
# Research and entity commits with no description are session-level
# rollups (e.g. astra/research-2026-05-11). They have no claim to
# link to — surface as session_digest, not as a phantom create.
if commit_type_l in ("research", "entity") and not has_desc:
return "session_digest"
# Source-only: extract/* with no claim description means inbox archive
# landed but no domain claim was written.
if branch.startswith("extract/") and not has_desc:
@ -81,54 +76,6 @@ def _classify_event(branch, description, commit_type, candidate_slug=None):
return "create"
# Internal classifier value -> canonical `kind` enum returned to frontend.
_KIND_MAP = {
"create": "claim_merged",
"enrich": "claim_enriched",
"challenge": "claim_challenged",
"source": "source_archived",
"session_digest": "session_digest",
}
def _archive_slug_from_branch(branch):
"""For extract/YYYY-MM-DD-...-HASH4, return YYYY-MM-DD-... (keep date,
drop the 4-hex hash suffix). Matches inbox/archive filename convention.
"""
if not branch or "/" not in branch:
return ""
slug = branch.split("/", 1)[1]
return re.sub(r"-[a-f0-9]{4}$", "", slug)
def _source_target_url(domain, archive_slug):
"""Forgejo blob URL for an archived source file. Falls back to the
repo-wide inbox/archive directory when domain is unknown so the link
still resolves to something useful instead of a 404.
"""
if not archive_slug:
return None
domain = (domain or "").strip()
if not domain or domain == "unknown":
return "https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive"
return (
"https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive/"
f"{domain}/{archive_slug}.md"
)
def _claim_target_url(claim_slug):
if not claim_slug:
return None
return f"/claims/{claim_slug}"
def _github_pr_url(github_pr):
if not github_pr:
return None
return f"https://github.com/living-ip/teleo-codex/pull/{github_pr}"
def _normalize_contributor(submitted_by, agent):
if submitted_by and submitted_by.strip():
name = submitted_by.strip().lstrip("@")
@ -181,7 +128,7 @@ def _build_events():
rows = conn.execute(f"""
SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
p.merged_at, p.description, p.commit_type, p.cost_usd,
p.source_channel, p.source_path, p.github_pr
p.source_channel, p.source_path
FROM prs p
WHERE p.status = 'merged'
AND p.commit_type IN ({placeholders})
@ -205,57 +152,28 @@ def _build_events():
contributor = _normalize_contributor(row["submitted_by"], row["agent"])
merged_at = row["merged_at"] or ""
domain = row["domain"] or "unknown"
kind = _KIND_MAP.get(event_type, event_type)
ci_map = {
"create": 0.35, "enrich": 0.25, "challenge": 0.40,
"source": 0.15, "session_digest": 0.05,
}
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15}
ci_earned = ci_map.get(event_type, 0)
# Source events never carry a claim_slug — no claim was written.
# target_url points at the archived file on Forgejo instead.
# Source events never carry a claim_slug — no claim was written —
# so the frontend can't produce a 404-ing claim link.
if event_type == "source":
archive_slug = _archive_slug_from_branch(row["branch"])
summary_text = _summary_from_branch(row["branch"])
source_display_slug = (
summary_text.lower().replace(" ", "-") or row["branch"]
source_slug = (
_summary_from_branch(row["branch"]).lower().replace(" ", "-")
or row["branch"]
)
events.append({
"kind": kind,
"type": "source",
"target_url": _source_target_url(domain, archive_slug),
"claim_slug": "",
"source_slug": source_display_slug,
"domain": domain,
"source_slug": source_slug,
"domain": row["domain"] or "unknown",
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"pr_url": _github_pr_url(row["github_pr"]),
"source_channel": row["source_channel"] or "unknown",
})
continue
# Session digests have no clickthrough surface yet (per-agent
# session pages not built). target_url=null so frontend renders
# plain text instead of a broken /claims/research-... link.
if event_type == "session_digest":
summary_text = _summary_from_branch(row["branch"]) or "Research session"
events.append({
"kind": kind,
"type": "session_digest",
"target_url": None,
"claim_slug": "",
"domain": domain,
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"pr_url": _github_pr_url(row["github_pr"]),
"source_channel": row["source_channel"] or "unknown",
})
continue
@ -284,17 +202,14 @@ def _build_events():
for slug in (slugs[:1] if slugs else [""]):
events.append({
"kind": kind,
"type": event_type,
"target_url": _claim_target_url(slug),
"claim_slug": slug,
"domain": domain,
"domain": row["domain"] or "unknown",
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"pr_url": _github_pr_url(row["github_pr"]),
"source_channel": row["source_channel"] or "unknown",
})
@ -319,11 +234,8 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
events.sort(key=hot_key, reverse=True)
elif sort_mode == "important":
type_rank = {
"challenge": 0, "enrich": 1, "create": 2,
"source": 3, "session_digest": 4,
}
events.sort(key=lambda e: (type_rank.get(e["type"], 5), -len(e["summary"])))
type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3}
events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"])))
return events
@ -357,13 +269,7 @@ async def handle_activity_feed(request):
if contributor:
filtered = [e for e in filtered if e["contributor"] == contributor]
if type_filter:
# Accept both legacy `type` values (create/enrich/challenge/source/
# session_digest) and canonical `kind` values (claim_merged/etc.) so
# callers can migrate at their own pace.
filtered = [
e for e in filtered
if e["type"] in type_filter or e.get("kind") in type_filter
]
filtered = [e for e in filtered if e["type"] in type_filter]
sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
total = len(sorted_events)

View file

@ -1,357 +1,29 @@
"""Claims API — list endpoint + canonical claim detail page.
Owner: Argus
Routes:
GET /api/claims list/filter (frontmatter scan, lightweight)
GET /api/claims/{slug} full claim detail (Ship contract)
GET /api/domains domain rollups for sidebar
The detail endpoint is the canonical /claims/{slug} backend per Ship's
2026-04-29 brief. One round-trip, no N+1 cascade. Wikilinks resolved
server-side via a title→slug index built from a tree walk.
"""
import json
"""Claims API endpoint — serves claim data from the codex filesystem."""
import os
import re
import sqlite3
import time
from pathlib import Path
import yaml
from pathlib import Path
from aiohttp import web
# Codex tree roots — claims live in three places (Sourcer Apr 26 fix scope)
CODEX_BASE = Path("/opt/teleo-eval/workspaces/main")
CLAIM_TREES = [CODEX_BASE / "domains", CODEX_BASE / "foundations", CODEX_BASE / "core"]
CODEX_ROOT = Path("/opt/teleo-eval/workspaces/main/domains")
_cache = {"data": None, "ts": 0}
CACHE_TTL = 300 # 5 minutes
# pipeline.db for joins (review_records, prs, sources)
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
# In-process caches
_list_cache = {"data": None, "ts": 0}
_LIST_CACHE_TTL = 300 # 5 min — list view tolerates staleness
_index_cache = {"by_title": None, "by_stem": None, "ts": 0}
_INDEX_CACHE_TTL = 60 # 1 min — title→slug index for wikilink resolution
CORS_HEADERS = {"Access-Control-Allow-Origin": "*"}
# Wikilink pattern. [[text]] or [[text|alias]] — we keep the link text only.
_WIKILINK_RE = re.compile(r"\[\[([^\]|#]+?)(?:[#|][^\]]*)?\]\]")
# ─── Normalization ─────────────────────────────────────────────────────────
def _normalize_for_match(s):
"""Collapse a title or slug to a comparable form.
Rules (from Ship's brief — match the link-fixer canonicalization):
- lowercase
- hyphen ↔ space tolerant (both → single space)
- collapse runs of whitespace
- strip leading/trailing whitespace
- drop trailing punctuation that gets stripped from filenames
(`.`, `?`, `!`, `:`, `--`)
NOTE: lib/attribution.py exposes only normalize_handle today, not the
title normalizer Ship referenced. Implementing inline; if a canonical
helper lands later, we can point at it instead.
"""
if not s:
return ""
s = str(s).lower().strip()
# Treat hyphens as spaces, then collapse whitespace runs
s = s.replace("-", " ").replace("_", " ")
s = re.sub(r"\s+", " ", s)
# Strip ASCII punctuation that filenames drop
s = re.sub(r"[^\w\s]", "", s)
return s.strip()
# ─── Frontmatter parse ─────────────────────────────────────────────────────
_CODE_FENCE_WRAPPER_RE = re.compile(r"^\s*```(?:markdown|md)?\s*\n(.*?)\n```\s*$", re.DOTALL)
def _split_frontmatter(text):
"""Return (frontmatter_dict, body_str) or (None, None) if not a claim file.
Tolerates files wrapped in a top-level ```markdown ... ``` code fence
some agents have produced these (e.g. Montreal Protocol claim from Astra,
2024-12-09). Unwrap once before frontmatter detection.
"""
if not text:
return None, None
m = _CODE_FENCE_WRAPPER_RE.match(text)
if m:
text = m.group(1)
text = text.lstrip()
if not text.startswith("---"):
return None, None
try:
end = text.index("\n---", 3)
except ValueError:
return None, None
try:
fm = yaml.safe_load(text[3:end])
except Exception:
return None, None
if not isinstance(fm, dict):
return None, None
body = text[end + 4:].lstrip()
return fm, body
def _read_claim_file(filepath):
"""Read a claim file from disk. Returns (frontmatter, body) or (None, None)."""
def _parse_frontmatter(filepath):
try:
text = filepath.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
return None, None
return _split_frontmatter(text)
# ─── Tree walk + indexing ──────────────────────────────────────────────────
def _walk_claim_files():
"""Yield Path objects for every .md claim file in domains/, foundations/, core/."""
for root in CLAIM_TREES:
if not root.exists():
continue
for f in root.rglob("*.md"):
if f.name == "_map.md":
continue
yield f
def _build_indexes():
"""Build (title→stem, stem→relpath) indexes for wikilink resolution.
Cached for _INDEX_CACHE_TTL. Pulls from claim-index endpoint when
possible (already cached upstream) and falls back to filesystem walk.
"""
now = time.time()
if _index_cache["by_title"] is not None and now - _index_cache["ts"] < _INDEX_CACHE_TTL:
return _index_cache["by_title"], _index_cache["by_stem"]
by_title = {}
by_stem = {}
for f in _walk_claim_files():
stem = f.stem
rel = str(f.relative_to(CODEX_BASE))
by_stem[stem] = rel
# Index by stem-as-normalized too (covers wikilinks that use the slug)
by_title[_normalize_for_match(stem)] = stem
# Also try parsing the title from frontmatter for higher-fidelity matches
fm, _ = _read_claim_file(f)
if fm:
title = fm.get("title")
if title:
key = _normalize_for_match(title)
if key and key not in by_title:
by_title[key] = stem
_index_cache["by_title"] = by_title
_index_cache["by_stem"] = by_stem
_index_cache["ts"] = now
return by_title, by_stem
def _resolve_wikilinks(body, by_title):
"""Extract [[link]] occurrences from body, return {link_text: slug_or_null}."""
out = {}
for match in _WIKILINK_RE.finditer(body or ""):
link_text = match.group(1).strip()
if not link_text or link_text in out:
continue
norm = _normalize_for_match(link_text)
out[link_text] = by_title.get(norm)
return out
# ─── Edge extraction from frontmatter ──────────────────────────────────────
_EDGE_FIELDS = {
"supports": "supports",
"challenges": "challenges",
"challenged_by": "challenges", # canonical: store as challenges direction
"related": "related",
"related_claims": "related",
"depends_on": "depends_on",
}
def _extract_edges(fm, by_title, by_stem):
"""Return edges dict shaped per Ship's contract.
Each edge is {slug, title, exists}. Slug resolved through title index.
"""
edges = {"supports": [], "challenges": [], "related": [], "depends_on": []}
for fm_key, edge_kind in _EDGE_FIELDS.items():
raw = fm.get(fm_key)
if not raw:
continue
items = raw if isinstance(raw, list) else [raw]
for item in items:
if not isinstance(item, str):
continue
text = item.strip()
# Strip wikilink wrapping if present
text = re.sub(r"^\[\[|\]\]$", "", text)
# Strip pipe annotations: "[[link|alias]]" style or "claim | edge_type | date"
text = text.split("|")[0].strip()
if not text:
continue
# Try title match first, fall back to stem match
slug = by_title.get(_normalize_for_match(text))
if not slug and text in by_stem:
slug = text
edges[edge_kind].append({
"slug": slug,
"title": text,
"exists": slug is not None,
})
return edges
# ─── Source provenance ─────────────────────────────────────────────────────
def _resolve_sourced_from(conn, claim_filepath, fm, title, stem):
"""Build sourced_from list for the claim.
Strategy: find PRs that produced this claim (via prs.description LIKE
or branch slug match), look at prs.source_path → inbox archive file →
parse that source's frontmatter for title/url. Falls back to the raw
`source` string from the claim's own frontmatter.
Both `title` and `stem` must be non-empty — caller (handler) already
falls back stem→title; passing empty values would leak `LIKE '%%'`
and match unrelated PRs.
"""
out = []
seen_paths = set()
pr_rows = []
if (title or "").strip() and (stem or "").strip():
try:
pr_rows = conn.execute(
"""SELECT DISTINCT source_path
FROM prs
WHERE source_path IS NOT NULL AND source_path != ''
AND (description LIKE ? OR branch LIKE ?)
LIMIT 10""",
(f"%{title}%", f"%{stem}%"),
).fetchall()
except sqlite3.OperationalError:
pr_rows = []
for row in pr_rows:
path = row["source_path"]
if not path or path in seen_paths:
continue
seen_paths.add(path)
out.append(_resolve_source_file(path))
# 2. Fallback: parse raw source frontmatter field if no PR match
if not out:
raw = fm.get("source")
if isinstance(raw, str) and raw.strip():
out.append({"path": None, "title": raw.strip()[:200], "url": None})
return out
def _resolve_source_file(rel_path):
"""Given inbox/archive/... path, parse frontmatter for title+url. Best-effort."""
full = CODEX_BASE / rel_path
entry = {"path": rel_path, "title": None, "url": None}
if full.exists():
fm, _ = _read_claim_file(full)
if fm:
entry["title"] = fm.get("title") or fm.get("source") or rel_path
entry["url"] = fm.get("url")
if not entry["title"]:
# Last resort: derive from filename
entry["title"] = Path(rel_path).stem.replace("-", " ")
return entry
# ─── Reviews + PRs ─────────────────────────────────────────────────────────
def _load_pr_history(conn, title, stem):
"""Find PRs that touched this claim and their reviews.
Both title and stem must be non-empty strings — empty leaks `LIKE '%%'`
which matches every PR. Handler already populates a fallback so this
is a defense-in-depth guard.
"""
if not (title or "").strip() or not (stem or "").strip():
return [], []
try:
pr_rows = conn.execute(
"""SELECT number, merged_at, commit_type, agent, branch, status
FROM prs
WHERE merged_at IS NOT NULL
AND (description LIKE ? OR branch LIKE ?)
ORDER BY merged_at ASC
LIMIT 50""",
(f"%{title}%", f"%{stem}%"),
).fetchall()
except sqlite3.OperationalError:
return [], []
prs = [
{
"number": r["number"],
"merged_at": r["merged_at"],
"kind": r["commit_type"] or "unknown",
"agent": r["agent"],
"branch": r["branch"],
}
for r in pr_rows
]
pr_numbers = [p["number"] for p in prs]
if not pr_numbers:
return prs, []
placeholders = ",".join("?" * len(pr_numbers))
try:
review_rows = conn.execute(
f"""SELECT pr_number, reviewer, reviewer_model, outcome,
rejection_reason, notes, reviewed_at
FROM review_records
WHERE pr_number IN ({placeholders})
ORDER BY reviewed_at ASC""",
pr_numbers,
).fetchall()
except sqlite3.OperationalError:
review_rows = []
reviews = [
{
"pr_number": r["pr_number"],
"reviewer": r["reviewer"],
"model": r["reviewer_model"],
"outcome": r["outcome"],
"rejection_reason": r["rejection_reason"],
"notes": r["notes"],
"reviewed_at": r["reviewed_at"],
}
for r in review_rows
]
return prs, reviews
# ─── List view (preserved) ─────────────────────────────────────────────────
def _parse_list_entry(filepath):
fm, body = _read_claim_file(filepath)
if not text.startswith("---"):
return None
end = text.index("---", 3)
fm = yaml.safe_load(text[3:end])
if not fm or fm.get("type") != "claim":
return None
links = _WIKILINK_RE.findall(body or "")
paragraphs = [p.strip() for p in (body or "").split("\n\n")
if p.strip() and not p.strip().startswith("#")]
body = text[end+3:].strip()
# Count wiki-links
links = re.findall(r"\[\[([^\]]+)\]\]", body)
# Extract first paragraph as summary
paragraphs = [p.strip() for p in body.split("\n\n") if p.strip() and not p.strip().startswith("#")]
summary = paragraphs[0][:300] if paragraphs else ""
return {
"slug": filepath.stem,
@ -368,32 +40,40 @@ def _parse_list_entry(filepath):
"challenged_by": fm.get("challenged_by"),
"related_claims": fm.get("related_claims", []),
}
except Exception:
return None
def _load_all_claims_list():
def _load_all_claims():
now = time.time()
if _list_cache["data"] and now - _list_cache["ts"] < _LIST_CACHE_TTL:
return _list_cache["data"]
if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
return _cache["data"]
claims = []
for f in _walk_claim_files():
entry = _parse_list_entry(f)
if entry:
claims.append(entry)
_list_cache["data"] = claims
_list_cache["ts"] = now
for domain_dir in sorted(CODEX_ROOT.iterdir()):
if not domain_dir.is_dir():
continue
for f in sorted(domain_dir.glob("*.md")):
if f.name == "_map.md":
continue
c = _parse_frontmatter(f)
if c:
claims.append(c)
_cache["data"] = claims
_cache["ts"] = now
return claims
# ─── Handlers ──────────────────────────────────────────────────────────────
async def handle_claims(request):
claims = _load_all_claims_list()
claims = _load_all_claims()
# Filters
domain = request.query.get("domain")
search = request.query.get("q", "").lower()
confidence = request.query.get("confidence")
agent = request.query.get("agent")
sort = request.query.get("sort", "recent")
sort = request.query.get("sort", "recent") # recent, alpha, domain
filtered = claims
if domain:
@ -403,9 +83,9 @@ async def handle_claims(request):
if agent:
filtered = [c for c in filtered if c["agent"] == agent]
if search:
filtered = [c for c in filtered
if search in c["title"].lower() or search in c["summary"].lower()]
filtered = [c for c in filtered if search in c["title"].lower() or search in c["summary"].lower()]
# Sort
if sort == "recent":
filtered.sort(key=lambda c: c["created"], reverse=True)
elif sort == "alpha":
@ -413,10 +93,12 @@ async def handle_claims(request):
elif sort == "domain":
filtered.sort(key=lambda c: (c["domain"], c["title"].lower()))
# Pagination
limit = min(int(request.query.get("limit", "50")), 200)
offset = int(request.query.get("offset", "0"))
page = filtered[offset:offset+limit]
# Domain counts for sidebar
domain_counts = {}
for c in claims:
domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1
@ -429,114 +111,31 @@ async def handle_claims(request):
"domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
"confidence_levels": sorted(set(c["confidence"] for c in claims)),
"agents": sorted(set(c["agent"] for c in claims if c["agent"])),
}, headers=CORS_HEADERS)
}, headers={"Access-Control-Allow-Origin": "*"})
async def handle_claim_detail(request):
"""GET /api/claims/{slug} — canonical claim detail page (Ship contract).
One round-trip, all data resolved server-side. Wikilinks pre-resolved.
"""
requested_slug = request.match_info["slug"]
by_title, by_stem = _build_indexes()
# Resolution order: exact stem → title-normalized (handles description-derived
# slugs from /api/activity-feed that are longer than on-disk file stems) →
# stem-as-prefix (handles description-derived slugs that are shorter than the
# file stem because the description was truncated upstream).
slug = requested_slug
rel_path = by_stem.get(slug)
if not rel_path:
# Title fallback: requested slug = slugified frontmatter title
norm = _normalize_for_match(requested_slug)
resolved_stem = by_title.get(norm)
if resolved_stem:
slug = resolved_stem
rel_path = by_stem.get(resolved_stem)
if not rel_path:
# Prefix fallback: walk stems sharing a common prefix with the request,
# pick longest match. Anchored at 32 chars to avoid spurious hits.
norm_req = _normalize_for_match(requested_slug)
best_stem = None
best_len = 0
for stem in by_stem:
norm_stem = _normalize_for_match(stem)
common = 0
for a, b in zip(norm_req, norm_stem):
if a != b:
slug = request.match_info["slug"]
claims = _load_all_claims()
for c in claims:
if c["slug"] == slug:
# Read full body for detail view
for domain_dir in CODEX_ROOT.iterdir():
if not domain_dir.is_dir():
continue
f = domain_dir / f"{slug}.md"
if f.exists():
text = f.read_text(encoding="utf-8")
end = text.index("---", 3)
body = text[end+3:].strip()
c["body"] = body
break
common += 1
if common >= 32 and common > best_len:
best_stem = stem
best_len = common
if best_stem:
slug = best_stem
rel_path = by_stem.get(best_stem)
if not rel_path:
return web.json_response({"error": "claim not found", "slug": requested_slug},
status=404, headers=CORS_HEADERS)
filepath = CODEX_BASE / rel_path
fm, body = _read_claim_file(filepath)
if not fm:
# File exists at this stem but has no parseable frontmatter — almost
# always a stray enrichment fragment that landed in domains/ without
# being merged into a parent claim. Surfacing as 404 (no claim here)
# not 500: the caller can't act on it differently anyway.
return web.json_response({"error": "claim not found", "slug": slug,
"reason": "file_no_frontmatter"},
status=404, headers=CORS_HEADERS)
# Open read-only DB connection for this request
conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
try:
title = fm.get("title") or slug.replace("-", " ")
prs, reviews = _load_pr_history(conn, title, slug)
sourced_from = _resolve_sourced_from(conn, filepath, fm, title, slug)
finally:
conn.close()
last_review = None
if reviews:
latest = reviews[-1]
last_review = {
"outcome": latest["outcome"],
"reviewer": latest["reviewer"],
"date": (latest["reviewed_at"] or "")[:10],
}
# secondary_domains: explicit list, or empty
secondary = fm.get("secondary_domains") or fm.get("cross_domain_links") or []
if isinstance(secondary, str):
secondary = [secondary]
description = fm.get("description") or ""
edges = _extract_edges(fm, by_title, by_stem)
wikilinks = _resolve_wikilinks(body, by_title)
response = {
"slug": slug,
"title": title,
"domain": fm.get("domain", "unknown"),
"secondary_domains": secondary,
"confidence": fm.get("confidence", "unknown"),
"description": description,
"created": str(fm.get("created", "")),
"last_review": last_review,
"body": body or "",
"sourced_from": sourced_from,
"reviews": reviews,
"prs": prs,
"edges": edges,
"wikilinks": wikilinks,
}
return web.json_response(response, headers=CORS_HEADERS)
return web.json_response(c, headers={"Access-Control-Allow-Origin": "*"})
return web.json_response({"error": "claim not found"}, status=404)
async def handle_domains(request):
claims = _load_all_claims_list()
claims = _load_all_claims()
domains = {}
for c in claims:
d = c["domain"]
@ -547,11 +146,13 @@ async def handle_domains(request):
domains[d]["agents"].add(c["agent"])
conf = c["confidence"]
domains[d]["confidence_dist"][conf] = domains[d]["confidence_dist"].get(conf, 0) + 1
result = []
for d in sorted(domains.values(), key=lambda x: -x["count"]):
d["agents"] = sorted(d["agents"])
result.append(d)
return web.json_response(result, headers=CORS_HEADERS)
return web.json_response(result, headers={"Access-Control-Allow-Origin": "*"})
def register_claims_routes(app):

View file

@ -211,14 +211,6 @@ HEALTH_CHECK_INTERVAL = 60
# --- Extraction gates ---
EXTRACTION_COOLDOWN_HOURS = 4 # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter.
# --- Verdict-deadlock reaper ---
# Defaults safe (dry-run, 24h age, hourly throttle). Operator flips REAPER_DRY_RUN
# to "false" via systemctl edit teleo-pipeline → restart, no code change required.
REAPER_DRY_RUN = os.environ.get("REAPER_DRY_RUN", "true").lower() == "true"
REAPER_DEADLOCK_AGE_HOURS = int(os.environ.get("REAPER_DEADLOCK_AGE_HOURS", "24"))
REAPER_INTERVAL_SECONDS = int(os.environ.get("REAPER_INTERVAL_SECONDS", "3600"))
REAPER_MAX_PER_RUN = int(os.environ.get("REAPER_MAX_PER_RUN", "50"))
# --- Retrieval (Telegram bot) ---
RETRIEVAL_RRF_K = 20 # RRF smoothing constant — tuned for 5-10 results per source
RETRIEVAL_ENTITY_BOOST = 1.5 # RRF score multiplier for claims wiki-linked from matched entities

View file

@ -923,36 +923,6 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
except Exception:
logger.debug("Failed to read source %s", f, exc_info=True)
# Archive-basename filter: skip queue files whose basename already exists in
# inbox/archive/. Research-session commits on agent branches occasionally
# re-introduce already-archived queue files when the branch is re-merged,
# producing same-source re-extractions every cooldown cycle. The archive
# copy is the source of truth — if a file with this basename is in archive,
# the source is processed regardless of queue state. Single archive scan
# per cycle, cheap (~1k files).
#
# Assumes basename uniqueness across queue+archive — current naming
# convention (date-prefix + topic-slug) makes collisions vanishingly
# rare. If short generic names like "notes.md" enter the queue, this
# filter silently false-positives.
if unprocessed:
archive_dir = main / "inbox" / "archive"
archived_basenames: set[str] = set()
if archive_dir.exists():
for af in archive_dir.rglob("*.md"):
if af.name.startswith("_"):
continue
archived_basenames.add(af.name)
if archived_basenames:
before = len(unprocessed)
unprocessed = [
(sp, c, f) for sp, c, f in unprocessed
if Path(sp).name not in archived_basenames
]
skipped = before - len(unprocessed)
if skipped:
logger.info("Skipped %d queue source(s) — basename already in inbox/archive/", skipped)
# Don't early-return here — re-extraction sources may exist even when queue is empty
# (the re-extraction check runs after open-PR filtering below)

View file

@ -522,52 +522,29 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
Finds PRs with substantive issue tags that haven't exceeded fix budget.
Processes up to 3 per cycle (Rhea: 180s interval, don't overwhelm eval).
"""
# Build the actionable-tag list from the routing constants so adding a new
# tag to FIXABLE_TAGS / CONVERTIBLE_TAGS / UNFIXABLE_TAGS auto-updates the
# SELECT filter — no two-place edit footgun.
actionable_tags = sorted(FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS)
placeholders = ",".join(["?"] * len(actionable_tags))
# Push the actionable-tag filter into SQL (was a post-fetch Python loop).
# The old shape selected the 3 oldest request_changes PRs and then dropped
# ones without actionable tags, so empty-eval_issues rows occupied LIMIT-3
# forever (head-of-line). Now LIMIT-3 always returns 3 actionable rows.
# Reaper handles the empty-tag PRs after their 24h cooldown.
rows = conn.execute(
f"""SELECT number, eval_issues FROM prs
"""SELECT number, eval_issues FROM prs
WHERE status = 'open'
AND tier0_pass = 1
AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')
AND COALESCE(fix_attempts, 0) < ?
AND (last_attempt IS NULL OR last_attempt < datetime('now', '-3 minutes'))
AND json_valid(eval_issues)
AND EXISTS (
SELECT 1 FROM json_each(eval_issues)
WHERE value IN ({placeholders})
)
ORDER BY created_at ASC
LIMIT 3""",
(MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS, *actionable_tags),
(MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS,), # Total budget: mechanical + substantive
).fetchall()
if not rows:
return 0, 0
# Defense-in-depth: json_valid(eval_issues) in the SELECT already filters
# corrupt JSON before json_each runs, so this WARN should be unreachable.
# Kept anyway: json_valid and json.loads use technically distinct parsers,
# and the journal entry names the failure mode if SQLite ever surfaces a
# row that passes json_valid + json_each but fails json.loads.
# Filter to only PRs with substantive issues (not just mechanical)
substantive_rows = []
for row in rows:
try:
json.loads(row["eval_issues"] or "[]")
issues = json.loads(row["eval_issues"] or "[]")
except (json.JSONDecodeError, TypeError):
logger.warning(
"PR #%d: corrupt eval_issues JSON — skipping in substantive fix cycle",
row["number"],
)
continue
if set(issues) & (FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS):
substantive_rows.append(row)
if not substantive_rows:
@ -582,13 +559,7 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
if result.get("action"):
fixed += 1
elif result.get("skipped"):
# Was DEBUG — promoted to INFO to make stuck-PR root cause
# visible without enabling DEBUG fleet-wide. (Ship Apr 24+
# silent skip diagnosis.)
logger.info(
"PR #%d: substantive fix skipped: %s",
row["number"], result.get("reason"),
)
logger.debug("PR #%d: substantive fix skipped: %s", row["number"], result.get("reason"))
except Exception:
logger.exception("PR #%d: substantive fix failed", row["number"])
errors += 1
@ -598,191 +569,3 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
logger.info("Substantive fix cycle: %d fixed, %d errors", fixed, errors)
return fixed, errors
# ─── Verdict-deadlock reaper ──────────────────────────────────────────────
#
# Defense-in-depth for PRs that substantive_fixer can't make progress on.
# Targets two stuck-verdict shapes empirically observed in production:
#
# 1. leo:request_changes + domain:approve
# Leo asked for substantive fix; fixer either failed silently
# (no_claim_files / no_review_comments / etc.) or the issue tag isn't
# in FIXABLE | CONVERTIBLE | UNFIXABLE. PR sits forever.
#
# 2. leo:skipped + domain:request_changes
# Eval bypassed Leo (eval_attempts >= MAX). Domain rejected with no
# structured eval_issues. fixer can't classify → silent skip → forever.
#
# Both shapes need a clearance path. Reaper closes them after a 24h cooldown
# with audit_log breadcrumbs for forensics. First deploy runs in dry-run mode
# (audit "would_close" events only — no Forgejo writes, no DB closes).
#
# Reaper config (REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS, REAPER_INTERVAL_SECONDS,
# REAPER_MAX_PER_RUN) lives in lib/config.py with env-var overrides — operator
# flips dry-run to live via `systemctl edit teleo-pipeline.service`
# (Environment=REAPER_DRY_RUN=false) + restart. No code change, no commit, no
# redeploy required.
async def verdict_deadlock_reaper_cycle(conn) -> int:
"""Reap PRs stuck in conflicting-verdict deadlock for >24h.
Returns count of PRs closed (or "would-close" in dry-run mode).
Throttled to once per REAPER_INTERVAL_SECONDS via sentinel audit event.
"""
# Throttle: skip if last reaper run was within REAPER_INTERVAL_SECONDS.
# Uses audit_log as the rate-limit ledger so no schema/state needed.
# stage='reaper' filter so the planner uses idx_audit_stage (avoids full scan).
last_run = conn.execute(
"SELECT MAX(timestamp) FROM audit_log "
"WHERE stage = 'reaper' AND event = 'verdict_deadlock_reaper_run'"
).fetchone()[0]
if last_run:
cur = conn.execute(
"SELECT (julianday('now') - julianday(?)) * 86400 < ?",
(last_run, config.REAPER_INTERVAL_SECONDS),
).fetchone()[0]
if cur:
return 0
# Two stuck-verdict shapes: leo:rc+domain:approve, leo:skipped+domain:rc.
#
# Branch allowlist invariant: the reaper closes ONLY disposable, pipeline-
# generated branches — content the pipeline (or a daily cron) created and
# can recreate. Four classes qualify:
#
# extract/* — per-source extraction PRs, regenerated next ingest cycle
# reweave/* — nightly graph-edge maintenance, regenerated next reweave
# fix/* — pipeline-internal fix branches
# */research-YYYY-MM-DD — daily {agent}/research-{date} cron sessions.
# Matched via SQLite `_` single-char wildcards as
# `research-20__-__-__` to literally enforce the date-
# suffix shape. Excludes hand-named research branches
# (rio/research-batch-agents-memory-harnesses,
# theseus/research-2nd-attempt-on-X, etc.) which are
# feature work owned by the agent. Pattern good through
# 2099; revisit then.
#
# WIP agent feature branches (theseus/feature-foo, epimetheus/some-fix,
# rio/research-thesis-name) are NEVER reaped — owners review their own PRs
# on their own cadence. The date-shaped pattern threads the needle: picks
# up daily synthesis output the agent regenerates tomorrow while leaving
# manually-named research work alone.
rows = conn.execute(
"""SELECT number, branch, eval_issues, leo_verdict, domain_verdict,
last_attempt, fix_attempts
FROM prs
WHERE status = 'open'
AND tier0_pass = 1
AND last_attempt IS NOT NULL
AND last_attempt < datetime('now', ? || ' hours')
AND (branch LIKE 'extract/%'
OR branch LIKE 'reweave/%'
OR branch LIKE 'fix/%'
OR branch LIKE '%/research-20__-__-__')
AND (
(leo_verdict = 'request_changes' AND domain_verdict = 'approve')
OR (leo_verdict = 'skipped' AND domain_verdict = 'request_changes')
)
ORDER BY last_attempt ASC
LIMIT ?""",
(f"-{config.REAPER_DEADLOCK_AGE_HOURS}", config.REAPER_MAX_PER_RUN),
).fetchall()
mode = "dryrun" if config.REAPER_DRY_RUN else "live"
if not rows:
# Heartbeat anyway so throttle ticks even when nothing to reap.
db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
"candidates": 0, "closed": 0, "mode": mode,
}))
return 0
logger.info(
"Verdict-deadlock reaper [%s]: %d candidate(s) in deadlock >%dh",
mode, len(rows), config.REAPER_DEADLOCK_AGE_HOURS,
)
closed = 0
would_close = 0
errors = 0
for row in rows:
pr = row["number"]
reason_detail = {
"pr": pr,
"branch": row["branch"],
"leo_verdict": row["leo_verdict"],
"domain_verdict": row["domain_verdict"],
"eval_issues": row["eval_issues"],
"last_attempt": row["last_attempt"],
"fix_attempts": row["fix_attempts"],
}
if config.REAPER_DRY_RUN:
# Audit only — do NOT touch DB row or Forgejo state.
db.audit(conn, "reaper", "verdict_deadlock_would_close",
json.dumps(reason_detail))
logger.info(
"Reaper [dryrun]: would close PR #%d (leo=%s domain=%s issues=%s)",
pr, row["leo_verdict"], row["domain_verdict"], row["eval_issues"],
)
would_close += 1
continue
try:
comment_body = (
"Closed by verdict-deadlock reaper.\n\n"
f"This PR sat for >{config.REAPER_DEADLOCK_AGE_HOURS}h with conflicting "
f"verdicts (leo={row['leo_verdict']}, domain={row['domain_verdict']}) "
f"that the substantive fixer couldn't auto-resolve.\n\n"
f"Eval issues: `{row['eval_issues']}`\n"
f"Last attempt: {row['last_attempt']}\n\n"
"_Automated message from the LivingIP pipeline._"
)
await forgejo_api(
"POST", repo_path(f"issues/{pr}/comments"), {"body": comment_body},
)
patch_result = await forgejo_api(
"PATCH", repo_path(f"pulls/{pr}"), {"state": "closed"},
token=get_agent_token("leo"),
)
if patch_result is None:
logger.warning(
"Reaper: PR #%d Forgejo close failed — skipping DB close to "
"avoid drift", pr,
)
errors += 1
continue
# Forgejo already closed at the PATCH above — pass close_on_forgejo=False
# so close_pr() doesn't issue a redundant PATCH (which on transient
# failure returns False and skips the DB close → status drift).
await close_pr(
conn, pr,
last_error=(
f"verdict_deadlock_reaper: leo={row['leo_verdict']} "
f"domain={row['domain_verdict']} age>{config.REAPER_DEADLOCK_AGE_HOURS}h"
),
close_on_forgejo=False,
)
db.audit(conn, "reaper", "verdict_deadlock_closed",
json.dumps(reason_detail))
closed += 1
except Exception:
logger.exception("Reaper: PR #%d close failed", pr)
errors += 1
db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
"candidates": len(rows), "closed": closed, "would_close": would_close,
"errors": errors, "mode": mode,
}))
if errors:
logger.warning(
"Verdict-deadlock reaper [%s]: %d closed, %d would-close, %d errors",
mode, closed, would_close, errors,
)
elif config.REAPER_DRY_RUN:
logger.info("Verdict-deadlock reaper [dryrun]: %d would-close", would_close)
else:
logger.info("Verdict-deadlock reaper [live]: %d closed", closed)
return closed + would_close

View file

@ -20,7 +20,7 @@ from lib import log as logmod
from lib.breaker import CircuitBreaker
from lib.evaluate import evaluate_cycle
from lib.fixer import fix_cycle as mechanical_fix_cycle
from lib.substantive_fixer import substantive_fix_cycle, verdict_deadlock_reaper_cycle
from lib.substantive_fixer import substantive_fix_cycle
from lib.health import start_health_server, stop_health_server
from lib.llm import kill_active_subprocesses
from lib.merge import merge_cycle
@ -91,30 +91,14 @@ async def ingest_cycle(conn, max_workers=None):
async def fix_cycle(conn, max_workers=None):
"""Combined fix stage: mechanical fixes first, then substantive fixes,
finally the verdict-deadlock reaper.
"""Combined fix stage: mechanical fixes first, then substantive fixes.
Mechanical (fixer.py): wiki link bracket stripping, $0
Substantive (substantive_fixer.py): confidence/title/scope fixes via LLM, $0.001
Reaper (substantive_fixer.verdict_deadlock_reaper_cycle): defense-in-depth
for stuck-verdict PRs that the substantive fixer can't progress on.
Hourly throttle, dry-run by default. Cost $0.
"""
m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers)
s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers)
# Defense-in-depth: reaper exception must never block primary fix paths.
# Same exception-isolation pattern as ingest_cycle's extract_cycle wrapper —
# propagating would trip the fix breaker and lock out mechanical+substantive
# for 15 min after 5 reaper failures.
try:
r_closed = await verdict_deadlock_reaper_cycle(conn)
except Exception:
import logging
logging.getLogger("pipeline").exception(
"Reaper cycle failed (non-fatal)"
)
r_closed = 0
return m_fixed + s_fixed + r_closed, m_errors + s_errors
return m_fixed + s_fixed, m_errors + s_errors
async def snapshot_cycle(conn, max_workers=None):