teleo-infrastructure/lib/claim_index.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
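Fix 1's transient/permanent split with jittered backoff can be sketched as follows (the names `merge_with_recovery` and `PermanentMergeError` are illustrative, not the deployed API):

```python
import random
import time


class PermanentMergeError(Exception):
    """Merge failures that retrying cannot fix (e.g. 405, already-closed PR)."""


def merge_with_recovery(attempt_merge, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Retry a merge call, backing off with jitter on transient errors.

    `attempt_merge` is any zero-arg callable; transient failures raise
    ordinary exceptions, permanent ones raise PermanentMergeError.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return attempt_merge()
        except PermanentMergeError:
            raise  # no point retrying permanent failures
        except Exception:
            if attempt == max_attempts:
                raise
            # Exponential backoff with jitter so parallel workers desynchronize
            sleep(base_delay * (2 ** (attempt - 1)) * (0.5 + random.random()))
```

Injecting a no-op `sleep` makes the retry path easy to exercise in tests.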

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors
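The conflict-retry budget with a permanent-conflict handoff reduces to a small decision function (the `state` shape and return strings here are hypothetical, not the pipeline's actual schema):

```python
def handle_conflict(state: dict, max_retries: int = 3) -> str:
    """Decide what to do with a PR whose merge hit a conflict.

    `state` is a mutable per-PR record; each call consumes one retry
    from the budget, and exhausting it hands off to the permanent
    conflict handler instead of cycling forever.
    """
    state["conflict_retries"] = state.get("conflict_retries", 0) + 1
    if state["conflict_retries"] > max_retries:
        return "permanent_conflict"
    return "rebase_and_retry"
```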

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
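The per-user-per-chat conversation window (5-message unanswered counter) can be sketched like this — a minimal counter, not the bot's actual class:

```python
from collections import defaultdict

WINDOW = 5  # messages the bot keeps listening for after being engaged


class ConversationWindows:
    """Track, per (chat, user), how many messages remain in an open window."""

    def __init__(self):
        self._remaining = defaultdict(int)

    def open(self, chat_id, user_id):
        # A tag or reply (re)opens the window at full size
        self._remaining[(chat_id, user_id)] = WINDOW

    def should_listen(self, chat_id, user_id) -> bool:
        # Each unanswered message consumes one slot until the window closes
        key = (chat_id, user_id)
        if self._remaining[key] <= 0:
            return False
        self._remaining[key] -= 1
        return True
```

Keying on `(chat_id, user_id)` keeps one user's window from leaking into another chat.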

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
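The flock-based coordination that worktree_lock.py provides looks roughly like this (path and function name are illustrative, not the deployed module's API):

```python
import fcntl
import os
from contextlib import contextmanager


@contextmanager
def worktree_lock(lock_path="/tmp/worktree.lock"):
    """Serialize writers on the main worktree via an advisory flock.

    Blocks until the lock is free; released on exit, and automatically
    if the process dies, since flocks die with their file descriptor.
    """
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks while another holder exists
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)
```

Advisory flocks only coordinate cooperating processes, which is exactly the archive-writer case: every writer must enter through the same context manager.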

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00


"""Claim index generator — structured index of all KB claims.
Produces claim-index.json: every claim with title, domain, confidence,
wiki links (outgoing + incoming counts), created date, word count,
challenged_by status. Consumed by:
- Argus (diagnostics dashboard — charts, vital signs)
- Vida (KB health diagnostics — orphan ratio, linkage density, freshness)
- Extraction prompt (KB index for dedup — could replace /tmp/kb-indexes/)
Generated after each merge (post-merge hook) or on demand.
Served via GET /claim-index on the health API.
Epimetheus owns this module.
"""
import json
import logging
import re
from datetime import date, datetime
from pathlib import Path
from . import config
logger = logging.getLogger("pipeline.claim_index")
WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]")

def _parse_frontmatter(text: str) -> dict | None:
    """Quick YAML frontmatter parser."""
    if not text.startswith("---"):
        return None
    end = text.find("---", 3)
    if end == -1:
        return None
    raw = text[3:end]
    try:
        import yaml
        fm = yaml.safe_load(raw)
        return fm if isinstance(fm, dict) else None
    except ImportError:
        pass
    except Exception:
        return None
    # Fallback parser: flat "key: value" lines only, no nesting
    fm = {}
    for line in raw.strip().split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if ":" not in line:
            continue
        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if val.lower() == "null" or val == "":
            val = None
        fm[key] = val
    return fm if fm else None

def build_claim_index(repo_root: str | None = None) -> dict:
    """Build the full claim index from the repo.

    Returns {generated_at, total_claims, claims: [...], domains: {...}}
    """
    base = Path(repo_root) if repo_root else config.MAIN_WORKTREE
    claims = []
    all_stems: dict[str, str] = {}  # stem → filepath (for incoming link counting)

    # Phase 1: Collect all claims with outgoing links
    for subdir in ["domains", "core", "foundations", "decisions"]:
        full = base / subdir
        if not full.is_dir():
            continue
        for f in full.rglob("*.md"):
            if f.name.startswith("_"):
                continue
            try:
                content = f.read_text()
            except Exception:
                continue
            fm = _parse_frontmatter(content)
            if fm is None:
                continue
            ftype = fm.get("type")
            if ftype not in ("claim", "framework", None):
                continue  # Skip entities, sources, etc.
            # Extract wiki links from the body (after the closing frontmatter ---)
            body_start = content.find("---", 3)
            body = content[body_start + 3:] if body_start > 0 else content
            outgoing_links = [link.strip() for link in WIKI_LINK_RE.findall(body) if link.strip()]
            # Relative path from repo root
            rel_path = str(f.relative_to(base))
            # Word count (body only, not frontmatter); strip first so the
            # leading "# Title" heading actually anchors at ^ and is removed
            body_text = re.sub(r"^# .+\n", "", body.strip())
            body_text = re.split(r"\n---\n", body_text)[0]  # Before Relevant Notes
            word_count = len(body_text.split())
            # Check for challenged_by
            has_challenged_by = bool(fm.get("challenged_by"))
            # Created date
            created = fm.get("created")
            if isinstance(created, date):
                created = created.isoformat()
            claim = {
                "file": rel_path,
                "stem": f.stem,
                "title": f.stem.replace("-", " "),
                "domain": fm.get("domain", subdir),
                "confidence": fm.get("confidence"),
                "created": created,
                "outgoing_links": outgoing_links,
                "outgoing_count": len(outgoing_links),
                "incoming_count": 0,  # Computed in phase 2
                "has_challenged_by": has_challenged_by,
                "word_count": word_count,
                "type": ftype or "claim",
            }
            claims.append(claim)
            all_stems[f.stem] = rel_path

    # Phase 2: Count incoming links
    incoming_counts: dict[str, int] = {}
    for claim in claims:
        for link in claim["outgoing_links"]:
            if link in all_stems:
                incoming_counts[link] = incoming_counts.get(link, 0) + 1
    for claim in claims:
        claim["incoming_count"] = incoming_counts.get(claim["stem"], 0)

    # Domain summary
    domain_counts: dict[str, int] = {}
    for claim in claims:
        d = claim["domain"]
        domain_counts[d] = domain_counts.get(d, 0) + 1

    # Orphan detection (0 incoming links)
    orphans = sum(1 for c in claims if c["incoming_count"] == 0)

    # Cross-domain links: stem → domain lookup instead of an O(n²) inner scan
    stem_domain = {c["stem"]: c["domain"] for c in claims}
    cross_domain_links = sum(
        1
        for claim in claims
        for link in claim["outgoing_links"]
        if link in stem_domain and stem_domain[link] != claim["domain"]
    )

    index = {
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "total_claims": len(claims),
        "domains": domain_counts,
        "orphan_count": orphans,
        "orphan_ratio": round(orphans / len(claims), 3) if claims else 0,
        "cross_domain_links": cross_domain_links,
        "claims": claims,
    }
    return index

def write_claim_index(repo_root: str | None = None, output_path: str | None = None) -> str:
    """Build and write claim-index.json. Returns the output path."""
    import os

    index = build_claim_index(repo_root)
    if output_path is None:
        output_path = str(Path.home() / ".pentagon" / "workspace" / "collective" / "claim-index.json")
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    # Atomic write: dump to a temp file, then rename over the target
    tmp = output_path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(index, f, indent=2)
    os.replace(tmp, output_path)  # atomic on POSIX; unlike rename, also overwrites on Windows
    logger.info("Wrote claim-index.json: %d claims, %d orphans, %d cross-domain links",
                index["total_claims"], index["orphan_count"], index["cross_domain_links"])
    return output_path