#!/usr/bin/env python3
"""
Teleo Codex — Knowledge Base Health Assessment

Computes Tier 1 (automated) and Tier 2 (semi-automated) health metrics for the
collective knowledge base. Outputs JSON snapshot + markdown report.

Usage:
    REPO_ROOT=/path/to/teleo-codex python3 ops/kb-health-check.py

    Optional env vars:
        REPO_ROOT    Path to repo checkout (default: current directory)
        OUTPUT_DIR   Where to write snapshots (default: stdout + agents/vida/musings/)
        METRICS_DIR  VPS metrics directory (default: none, for local runs)

Designed to run:
    - Manually by any agent during a session
    - Daily via VPS cron at /opt/teleo-eval/metrics/
    - claim-index.json is a runtime cache, regenerated each run

Infrastructure decisions (from collective design review):
    - Script lives in ops/ (shared infrastructure, not any agent's territory)
    - claim-index.json is a runtime cache, not git-tracked (derived artifact)
    - Daily snapshots go to VPS filesystem, not main branch (repo is for knowledge, not telemetry)
    - Weekly digests go IN repo via normal PR flow (agent-authored analysis = knowledge)

Design: Vida (domain health), Leo (cross-domain), Theseus (measurement theory), Ganymede (ops)
"""

import os
import re
import json
import sys
from collections import defaultdict
from datetime import datetime, date
from pathlib import Path

# NOTE(review): module-level REPO_ROOT is never used below — __main__ re-reads
# the env var itself. Kept so importers see the resolved default; confirm
# whether it can be dropped.
REPO_ROOT = os.environ.get("REPO_ROOT", ".")
CLAIM_DIRS = ["domains", "core", "foundations"]
AGENT_DIR = "agents"
TODAY = date.today().isoformat()


# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------

def parse_frontmatter(filepath):
    """Extract YAML frontmatter from a markdown file.

    Returns ``(frontmatter_dict, body)``. Unreadable files yield ``(None, "")``;
    files with no leading ``---`` block (or no closing ``---``) yield
    ``(None, full_content)``.

    Parsing is deliberately naive — flat ``key: value`` lines only, quotes
    stripped — so the script needs no third-party YAML dependency. Nested
    YAML structures are not supported.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
    except Exception:
        # Best-effort scan: callers simply skip unreadable files.
        return None, ""

    if not content.startswith("---"):
        return None, content

    end = content.find("---", 3)
    if end == -1:
        return None, content

    fm_text = content[3:end].strip()
    fm = {}
    for line in fm_text.split("\n"):
        if ":" in line:
            key, val = line.split(":", 1)
            fm[key.strip()] = val.strip().strip('"').strip("'")

    body = content[end + 3 :]
    return fm, body


def extract_wiki_links(text):
    """Extract all [[wiki links]] from text, ignoring pipe aliases.

    ``[[Target|shown text]]`` yields ``"Target"``.
    """
    return re.findall(r"\[\[([^\]|]+?)(?:\|[^\]]+?)?\]\]", text)


def extract_argumentative_links(body):
    """
    Split wiki links into argumentative (in prose paragraphs) vs structural
    (in 'Relevant Notes' / 'Topics' footer sections).

    Argumentative links carry more weight per Theseus's Goodhart mitigation.

    Returns ``(prose_links, footer_links)``. Note: because footer links are
    computed as "links not present in prose", a link that appears both in
    prose and in the footer is counted only as prose, and duplicate footer
    occurrences collapse.
    """
    # Split at the LAST occurrence of the first footer marker found,
    # checked in priority order.
    footer_markers = ["Relevant Notes:", "Topics:", "---"]
    prose_section = body
    for marker in footer_markers:
        idx = body.rfind(marker)
        if idx != -1:
            prose_section = body[:idx]
            break

    prose_links = extract_wiki_links(prose_section)
    all_links = extract_wiki_links(body)
    footer_links = [l for l in all_links if l not in prose_links]

    return prose_links, footer_links


def get_domain_from_path(filepath):
    """Determine domain from file path.

    ``domains/<d>/...`` and ``foundations/<d>/...`` map to ``<d>``;
    ``core/<sub>/...`` maps to the sub-category (bare ``core/file.md``
    maps to ``"core"``). Anything else is ``"unknown"``.
    """
    parts = Path(filepath).parts
    for i, p in enumerate(parts):
        if p == "domains" and i + 1 < len(parts):
            return parts[i + 1]
        if p == "core":
            # Sub-categorize core
            if i + 1 < len(parts):
                return parts[i + 1]
            return "core"
        if p == "foundations" and i + 1 < len(parts):
            return parts[i + 1]
    return "unknown"


# ---------------------------------------------------------------------------
# Claim index (runtime cache — the spine everything else computes from)
# ---------------------------------------------------------------------------

def build_claim_index(repo_root):
    """
    Build the claim index. Includes both outgoing and incoming links
    per Leo's feedback (incoming links = votes of relevance, PageRank intuition).

    Returns ``(claims, title_to_idx)`` where ``claims`` is a list of dicts
    (one per markdown file with frontmatter ``type: claim``) and
    ``title_to_idx`` maps lowercased filename-stems to list indices.
    """
    claims = []
    title_to_idx = {}

    # First pass: collect all claims with outgoing links
    for base_dir in CLAIM_DIRS:
        full_path = os.path.join(repo_root, base_dir)
        if not os.path.exists(full_path):
            continue
        for root, _dirs, files in os.walk(full_path):
            for f in files:
                # Skip underscore-prefixed templates and hidden files.
                if f.endswith(".md") and not f.startswith("_") and not f.startswith("."):
                    filepath = os.path.join(root, f)
                    fm, body = parse_frontmatter(filepath)
                    if fm and fm.get("type") == "claim":
                        rel_path = os.path.relpath(filepath, repo_root)
                        domain = get_domain_from_path(rel_path)
                        prose_links, footer_links = extract_argumentative_links(body)
                        all_links = extract_wiki_links(body)

                        idx = len(claims)
                        title = f[:-3]  # strip ".md"
                        title_to_idx[title.lower()] = idx

                        claims.append({
                            "title": title,
                            "path": rel_path,
                            "domain": domain,
                            "confidence": fm.get("confidence", "unknown"),
                            "source": fm.get("source", ""),
                            "created": fm.get("created", ""),
                            "outgoing_links": all_links,
                            "prose_links": prose_links,
                            "footer_links": footer_links,
                            "incoming_links": [],  # populated in second pass
                            "body": body,
                        })

    # Second pass: compute incoming links (case-insensitive title match)
    for i, claim in enumerate(claims):
        for link in claim["outgoing_links"]:
            target_idx = title_to_idx.get(link.lower())
            if target_idx is not None:
                claims[target_idx]["incoming_links"].append(claim["title"])

    return claims, title_to_idx


# ---------------------------------------------------------------------------
# Belief parsing
# ---------------------------------------------------------------------------

def parse_beliefs(repo_root):
    """Parse all agent belief files for grounding depth analysis.

    Scans ``agents/<name>/beliefs.md``; beliefs are counted by ``### N.``
    headings, grounding by wiki links anywhere in the file.
    """
    beliefs = {}
    agents_path = os.path.join(repo_root, AGENT_DIR)
    if not os.path.exists(agents_path):
        return beliefs

    for agent_name in os.listdir(agents_path):
        beliefs_file = os.path.join(agents_path, agent_name, "beliefs.md")
        if os.path.exists(beliefs_file):
            # encoding added for consistency with parse_frontmatter (was
            # locale-dependent before).
            with open(beliefs_file, "r", encoding="utf-8") as f:
                content = f.read()
            belief_headings = re.findall(r"### \d+\.", content)
            grounding_links = extract_wiki_links(content)
            beliefs[agent_name] = {
                "count": len(belief_headings),
                "total_grounding_links": len(grounding_links),
                "avg_grounding": round(
                    len(grounding_links) / max(len(belief_headings), 1), 1
                ),
            }
    return beliefs


# ---------------------------------------------------------------------------
# Metrics
# ---------------------------------------------------------------------------

def compute_metrics(claims, title_to_idx, beliefs):
    """Compute all Tier 1 and Tier 2 metrics.

    Args:
        claims: list of claim dicts from build_claim_index.
        title_to_idx: lowercased-title -> index map from build_claim_index.
        beliefs: per-agent grounding stats from parse_beliefs.

    Returns a JSON-serializable results dict keyed by metric name.
    """
    total = len(claims)
    results = {
        "generated": datetime.now().isoformat(),
        "date": TODAY,
    }

    # --- 1. Claim counts ---
    by_domain = defaultdict(int)
    for c in claims:
        by_domain[c["domain"]] += 1

    results["claims"] = {
        "total": total,
        "by_domain": dict(sorted(by_domain.items(), key=lambda x: -x[1])),
    }

    # --- 2. Confidence distribution ---
    conf_dist = defaultdict(int)
    conf_by_domain = defaultdict(lambda: defaultdict(int))
    for c in claims:
        conf_dist[c["confidence"]] += 1
        conf_by_domain[c["domain"]][c["confidence"]] += 1

    results["confidence_distribution"] = {
        "overall": dict(conf_dist),
        "by_domain": {d: dict(v) for d, v in conf_by_domain.items()},
    }

    # --- 3. Orphan ratio ---
    # Orphan = no incoming links (nothing cites this claim).
    orphans = []
    for c in claims:
        if len(c["incoming_links"]) == 0:
            orphans.append({
                "title": c["title"][:100],
                "domain": c["domain"],
                "outgoing_links": len(c["outgoing_links"]),
            })

    orphan_ratio = len(orphans) / max(total, 1)
    results["orphan_ratio"] = {
        "total_claims": total,
        "orphans": len(orphans),
        "ratio": round(orphan_ratio, 3),
        "status": (
            "healthy" if orphan_ratio < 0.10
            else "warning" if orphan_ratio < 0.20
            else "critical"
        ),
        "target": 0.10,
        "sample_orphans": orphans[:10],
    }

    # --- 4. Cross-domain linkage density ---
    total_links = 0
    cross_domain_links = 0
    unresolved_links = 0
    cross_by_domain = defaultdict(lambda: {"total": 0, "cross": 0})

    # Track reciprocal links (higher quality per Theseus)
    reciprocal_count = 0

    for c in claims:
        for link in c["outgoing_links"]:
            total_links += 1
            cross_by_domain[c["domain"]]["total"] += 1

            target_idx = title_to_idx.get(link.lower())
            if target_idx is None:
                unresolved_links += 1
            else:
                target = claims[target_idx]
                if target["domain"] != c["domain"]:
                    cross_domain_links += 1
                    cross_by_domain[c["domain"]]["cross"] += 1
                    # Check reciprocity (cross-domain pairs only)
                    if c["title"].lower() in [
                        l.lower() for l in target["outgoing_links"]
                    ]:
                        reciprocal_count += 1

    cross_ratio = cross_domain_links / max(total_links, 1)
    results["cross_domain_linkage"] = {
        "total_links": total_links,
        "cross_domain": cross_domain_links,
        "ratio": round(cross_ratio, 3),
        "reciprocal_links": reciprocal_count // 2,  # each pair counted twice
        "unresolved_links": unresolved_links,
        "status": "healthy" if cross_ratio >= 0.35 else "warning" if cross_ratio >= 0.15 else "critical",
        "target": 0.35,
        "by_domain": {
            d: {
                "total": v["total"],
                "cross": v["cross"],
                "ratio": round(v["cross"] / max(v["total"], 1), 3),
            }
            for d, v in cross_by_domain.items()
        },
    }

    # --- 5. Source diversity (Tier 1 per Leo) ---
    source_by_domain = defaultdict(set)
    for c in claims:
        if c["source"]:
            # Truncate to 100 chars so trivially-differing long citations dedupe.
            source_by_domain[c["domain"]].add(c["source"][:100].strip())

    source_diversity = {}
    for domain in by_domain:
        n_sources = len(source_by_domain.get(domain, set()))
        n_claims = by_domain[domain]
        ratio = round(n_sources / max(n_claims, 1), 3)
        source_diversity[domain] = {
            "unique_sources": n_sources,
            "total_claims": n_claims,
            "ratio": ratio,
            "status": "healthy" if ratio >= 0.3 else "warning",
        }

    results["source_diversity"] = source_diversity

    # --- 6. Evidence freshness ---
    ages = []
    stale = []
    # Fast-moving domains get a tighter staleness threshold (180d vs 365d).
    fast_domains = {"health", "ai-alignment", "internet-finance", "entertainment"}

    for c in claims:
        if c["created"]:
            try:
                created = datetime.strptime(c["created"], "%Y-%m-%d").date()
                age = (date.today() - created).days
                ages.append(age)
                threshold = 180 if c["domain"] in fast_domains else 365
                if age > threshold:
                    stale.append({
                        "title": c["title"][:80],
                        "domain": c["domain"],
                        "age_days": age,
                    })
            except ValueError:
                # Unparseable dates are simply excluded from the stats.
                pass

    results["evidence_freshness"] = {
        # Upper median (index len//2 of the sorted list) — cheap and stable.
        "median_age_days": sorted(ages)[len(ages) // 2] if ages else None,
        "mean_age_days": round(sum(ages) / len(ages), 1) if ages else None,
        "stale_count": len(stale),
        "total_with_dates": len(ages),
        "stale_claims": stale[:10],
    }

    # --- 7. Belief grounding depth ---
    results["belief_grounding"] = beliefs

    # --- 8. Challenge coverage ---
    # High-confidence claims should acknowledge counter-evidence somewhere.
    likely_proven = [c for c in claims if c["confidence"] in ("likely", "proven")]
    has_challenge = 0
    for c in likely_proven:
        body_lower = c["body"].lower()
        if any(
            marker in body_lower
            for marker in ["challenged_by", "counter-evidence", "counter:", "challenges considered"]
        ):
            has_challenge += 1

    challenge_ratio = has_challenge / max(len(likely_proven), 1)
    results["challenge_coverage"] = {
        "likely_proven_claims": len(likely_proven),
        "with_challenges": has_challenge,
        "ratio": round(challenge_ratio, 3),
        "status": "healthy" if challenge_ratio >= 0.25 else "warning",
        "target": 0.25,
    }

    # --- 9. Most-linked claims (centrality, from incoming links) ---
    centrality = sorted(claims, key=lambda c: len(c["incoming_links"]), reverse=True)
    results["most_central_claims"] = [
        {
            "title": c["title"][:100],
            "domain": c["domain"],
            "incoming_links": len(c["incoming_links"]),
        }
        for c in centrality[:10]
    ]

    return results


# ---------------------------------------------------------------------------
# Report formatting
# ---------------------------------------------------------------------------

def format_report(results):
    """Format results as readable markdown.

    Expects the full dict produced by compute_metrics; returns one
    markdown string.
    """
    lines = []
    lines.append("# Teleo Codex — Knowledge Base Health Assessment")
    lines.append(f"*Generated: {results['generated']}*")
    lines.append("")

    # Claims
    c = results["claims"]
    lines.append(f"## 1. Claim Inventory — {c['total']} total")
    lines.append("")
    lines.append("| Domain | Claims |")
    lines.append("|--------|--------|")
    for domain, count in c["by_domain"].items():
        lines.append(f"| {domain} | {count} |")
    lines.append("")

    # Confidence
    cd = results["confidence_distribution"]
    lines.append("## 2. Confidence Distribution")
    lines.append("")
    lines.append("| Domain | proven | likely | experimental | speculative |")
    # FIX: separator row previously had only 4 cells under a 5-column header,
    # which breaks markdown table rendering.
    lines.append("|--------|--------|--------|--------------|-------------|")
    for domain, dist in cd["by_domain"].items():
        lines.append(
            f"| {domain} | {dist.get('proven',0)} | {dist.get('likely',0)} "
            f"| {dist.get('experimental',0)} | {dist.get('speculative',0)} |"
        )
    lines.append("")

    # Orphans
    o = results["orphan_ratio"]
    lines.append(f"## 3. Orphan Ratio — {o['status'].upper()}")
    lines.append(
        f"**{o['orphans']}/{o['total_claims']} claims are orphans "
        f"({o['ratio']:.1%})** — target: <{o['target']:.0%}"
    )
    lines.append("")

    # Cross-domain
    cl = results["cross_domain_linkage"]
    lines.append(f"## 4. Cross-Domain Linkage — {cl['status'].upper()}")
    lines.append(
        f"**{cl['cross_domain']}/{cl['total_links']} links cross domain boundaries "
        f"({cl['ratio']:.1%})** — target: >{cl['target']:.0%}"
    )
    lines.append(f"Reciprocal link pairs: {cl['reciprocal_links']}")
    lines.append(f"Unresolved links: {cl['unresolved_links']}")
    lines.append("")
    lines.append("| Domain | Total links | Cross-domain | Ratio |")
    lines.append("|--------|------------|-------------|-------|")
    for domain, v in sorted(cl["by_domain"].items(), key=lambda x: -x[1]["total"]):
        lines.append(f"| {domain} | {v['total']} | {v['cross']} | {v['ratio']:.1%} |")
    lines.append("")

    # Source diversity
    sd = results["source_diversity"]
    lines.append("## 5. Source Diversity")
    lines.append("")
    lines.append("| Domain | Unique sources | Claims | Ratio | Status |")
    lines.append("|--------|---------------|--------|-------|--------|")
    for domain, v in sorted(sd.items(), key=lambda x: x[1]["ratio"]):
        lines.append(
            f"| {domain} | {v['unique_sources']} | {v['total_claims']} "
            f"| {v['ratio']:.2f} | {v['status']} |"
        )
    lines.append("")

    # Evidence freshness
    ef = results["evidence_freshness"]
    lines.append("## 6. Evidence Freshness")
    lines.append(
        f"**Median claim age: {ef['median_age_days']} days "
        f"| Mean: {ef['mean_age_days']} days**"
    )
    lines.append(f"Stale claims: {ef['stale_count']}")
    lines.append("")

    # Belief grounding
    bg = results["belief_grounding"]
    lines.append("## 7. Belief Grounding Depth")
    lines.append("")
    lines.append("| Agent | Beliefs | Total grounding links | Avg per belief |")
    lines.append("|-------|---------|---------------------|----------------|")
    for agent, v in sorted(bg.items()):
        lines.append(
            f"| {agent} | {v['count']} | {v['total_grounding_links']} "
            f"| {v['avg_grounding']} |"
        )
    lines.append("")

    # Challenge coverage
    cc = results["challenge_coverage"]
    lines.append(f"## 8. Challenge Coverage — {cc['status'].upper()}")
    lines.append(
        f"**{cc['with_challenges']}/{cc['likely_proven_claims']} likely/proven claims "
        f"acknowledge counter-evidence ({cc['ratio']:.1%})** — target: >{cc['target']:.0%}"
    )
    lines.append("")

    # Most central
    mc = results["most_central_claims"]
    lines.append("## 9. Most Central Claims (by incoming links)")
    lines.append("")
    lines.append("| Claim | Domain | Incoming |")
    lines.append("|-------|--------|----------|")
    for item in mc:
        # FIX: only append an ellipsis when the title was actually truncated
        # (previously every title, however short, got "..." appended).
        title = item["title"][:70] + ("..." if len(item["title"]) > 70 else "")
        lines.append(f"| {title} | {item['domain']} | {item['incoming_links']} |")
    lines.append("")

    # Automation note
    lines.append("---")
    lines.append("")
    lines.append("*Automate more of this over time: daily VPS cron, belief drift detection,")
    lines.append("reasoning chain depth, weekly digest template. See agents/vida/musings/kb-health-assessment-design.md.*")
    lines.append("")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    repo_root = os.environ.get("REPO_ROOT", ".")
    # NOTE(review): OUTPUT_DIR is read but never used — the markdown report
    # always goes to stdout and snapshots only go to METRICS_DIR. Kept for
    # forward compatibility with the documented interface; confirm intent.
    output_dir = os.environ.get("OUTPUT_DIR", os.path.join(repo_root, "agents", "vida", "musings"))
    metrics_dir = os.environ.get("METRICS_DIR", None)

    # Build index + compute
    claims, title_to_idx = build_claim_index(repo_root)
    beliefs = parse_beliefs(repo_root)
    results = compute_metrics(claims, title_to_idx, beliefs)

    # Strip body from claims before serializing (too large for JSON output)
    for c in claims:
        c.pop("body", None)
        c.pop("prose_links", None)
        c.pop("footer_links", None)

    # Write claim-index (runtime cache)
    index_output = {
        "generated": results["generated"],
        "total_claims": len(claims),
        "claims": claims,
    }

    # Write outputs
    report_md = format_report(results)

    if metrics_dir:
        # VPS mode: write to metrics directory
        os.makedirs(os.path.join(metrics_dir, "daily-evolution"), exist_ok=True)
        snapshot_path = os.path.join(metrics_dir, "daily-evolution", f"{TODAY}.json")
        index_path = os.path.join(metrics_dir, "claim-index.json")

        with open(snapshot_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        with open(index_path, "w", encoding="utf-8") as f:
            json.dump(index_output, f, indent=2)

        print(f"Snapshot written to {snapshot_path}", file=sys.stderr)
        print(f"Index written to {index_path}", file=sys.stderr)

    # Always write markdown report to stdout
    print(report_md)