#!/usr/bin/env python3
"""
Teleo Codex — Knowledge Base Health Assessment

Computes Tier 1 (automated) and Tier 2 (semi-automated) health metrics for
the collective knowledge base. Outputs JSON snapshot + markdown report.

Usage:
    REPO_ROOT=/path/to/teleo-codex python3 ops/kb-health-check.py

Optional env vars:
    REPO_ROOT    Path to repo checkout (default: current directory)
    OUTPUT_DIR   Where to write snapshots (default: stdout + agents/vida/musings/)
    METRICS_DIR  VPS metrics directory (default: none, for local runs)

Designed to run:
- Manually by any agent during a session
- Daily via VPS cron at /opt/teleo-eval/metrics/
- claim-index.json is a runtime cache, regenerated each run

Infrastructure decisions (from collective design review):
- Script lives in ops/ (shared infrastructure, not any agent's territory)
- claim-index.json is a runtime cache, not git-tracked (derived artifact)
- Daily snapshots go to VPS filesystem, not main branch (repo is for
  knowledge, not telemetry)
- Weekly digests go IN repo via normal PR flow (agent-authored analysis
  = knowledge)

Design: Vida (domain health), Leo (cross-domain), Theseus (measurement
theory), Ganymede (ops)
"""

import os
import re
import json
import sys
from collections import defaultdict
from datetime import datetime, date
from pathlib import Path

REPO_ROOT = os.environ.get("REPO_ROOT", ".")
CLAIM_DIRS = ["domains", "core", "foundations"]
AGENT_DIR = "agents"
TODAY = date.today().isoformat()


# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------

def parse_frontmatter(filepath):
    """Extract YAML frontmatter from a markdown file.

    Returns (frontmatter_dict, body). On read failure returns (None, "");
    if no frontmatter block is present returns (None, full_content).
    Parsing is deliberately naive: one "key: value" pair per line, quotes
    stripped — nested YAML is not supported.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
    except Exception:
        # Unreadable file: treat as having no frontmatter and no body.
        return None, ""
    if not content.startswith("---"):
        return None, content
    end = content.find("---", 3)
    if end == -1:
        # Opening delimiter with no closing one: treat whole file as body.
        return None, content
    fm_text = content[3:end].strip()
    fm = {}
    for line in fm_text.split("\n"):
        if ":" in line:
            key, val = line.split(":", 1)
            fm[key.strip()] = val.strip().strip('"').strip("'")
    body = content[end + 3:]
    return fm, body


def extract_wiki_links(text):
    """Extract all [[wiki links]] from text, ignoring pipe aliases."""
    # Captures the target before an optional "|alias" inside [[...]].
    return re.findall(r"\[\[([^\]|]+?)(?:\|[^\]]+?)?\]\]", text)


def extract_argumentative_links(body):
    """
    Split wiki links into argumentative (in prose paragraphs) vs structural
    (in 'Relevant Notes' / 'Topics' footer sections).

    Argumentative links carry more weight per Theseus's Goodhart mitigation.
    Returns (prose_links, footer_links).
    """
    # Split at common footer markers. Markers are tried in priority order;
    # the LAST occurrence of the first marker found delimits the footer.
    footer_markers = ["Relevant Notes:", "Topics:", "---"]
    prose_section = body
    for marker in footer_markers:
        idx = body.rfind(marker)
        if idx != -1:
            prose_section = body[:idx]
            break
    prose_links = extract_wiki_links(prose_section)
    all_links = extract_wiki_links(body)
    # NOTE: a link that appears both in prose and footer counts only as prose.
    footer_links = [l for l in all_links if l not in prose_links]
    return prose_links, footer_links


def get_domain_from_path(filepath):
    """Determine domain from file path.

    domains/<d>/... -> <d>; core/<sub>/... -> <sub> (or "core" for files
    directly under core/); foundations/<f>/... -> <f>; otherwise "unknown".
    """
    parts = Path(filepath).parts
    for i, p in enumerate(parts):
        if p == "domains" and i + 1 < len(parts):
            return parts[i + 1]
        if p == "core":
            # Sub-categorize core
            if i + 1 < len(parts):
                return parts[i + 1]
            return "core"
        if p == "foundations" and i + 1 < len(parts):
            return parts[i + 1]
    return "unknown"


# ---------------------------------------------------------------------------
# Claim index (runtime cache — the spine everything else computes from)
# ---------------------------------------------------------------------------

def build_claim_index(repo_root):
    """
    Build the claim index. Includes both outgoing and incoming links per
    Leo's feedback (incoming links = votes of relevance, PageRank intuition).

    Returns (claims, title_to_idx) where claims is a list of dicts and
    title_to_idx maps lowercased claim title -> index into claims.
    NOTE: duplicate titles across directories overwrite each other in
    title_to_idx (last one wins).
    """
    claims = []
    title_to_idx = {}

    # First pass: collect all claims with outgoing links
    for base_dir in CLAIM_DIRS:
        full_path = os.path.join(repo_root, base_dir)
        if not os.path.exists(full_path):
            continue
        for root, _dirs, files in os.walk(full_path):
            for f in files:
                # Skip templates/hidden files (leading "_" or ".").
                if f.endswith(".md") and not f.startswith("_") and not f.startswith("."):
                    filepath = os.path.join(root, f)
                    fm, body = parse_frontmatter(filepath)
                    if fm and fm.get("type") == "claim":
                        rel_path = os.path.relpath(filepath, repo_root)
                        domain = get_domain_from_path(rel_path)
                        prose_links, footer_links = extract_argumentative_links(body)
                        all_links = extract_wiki_links(body)
                        idx = len(claims)
                        title = f[:-3]  # strip ".md"
                        title_to_idx[title.lower()] = idx
                        claims.append({
                            "title": title,
                            "path": rel_path,
                            "domain": domain,
                            "confidence": fm.get("confidence", "unknown"),
                            "source": fm.get("source", ""),
                            "created": fm.get("created", ""),
                            "outgoing_links": all_links,
                            "prose_links": prose_links,
                            "footer_links": footer_links,
                            "incoming_links": [],  # populated in second pass
                            "body": body,
                        })

    # Second pass: compute incoming links
    for i, claim in enumerate(claims):
        for link in claim["outgoing_links"]:
            target_idx = title_to_idx.get(link.lower())
            if target_idx is not None:
                claims[target_idx]["incoming_links"].append(claim["title"])

    return claims, title_to_idx


# ---------------------------------------------------------------------------
# Belief parsing
# ---------------------------------------------------------------------------

def parse_beliefs(repo_root):
    """Parse all agent belief files for grounding depth analysis.

    Returns {agent_name: {count, total_grounding_links, avg_grounding}}.
    Beliefs are counted by "### <number>." headings; grounding links are
    all wiki links anywhere in the file.
    """
    beliefs = {}
    agents_path = os.path.join(repo_root, AGENT_DIR)
    if not os.path.exists(agents_path):
        return beliefs
    for agent_name in os.listdir(agents_path):
        beliefs_file = os.path.join(agents_path, agent_name, "beliefs.md")
        if os.path.exists(beliefs_file):
            # FIX: explicit encoding (was platform-default, inconsistent
            # with parse_frontmatter which reads utf-8).
            with open(beliefs_file, "r", encoding="utf-8") as f:
                content = f.read()
            belief_headings = re.findall(r"### \d+\.", content)
            grounding_links = extract_wiki_links(content)
            beliefs[agent_name] = {
                "count": len(belief_headings),
                "total_grounding_links": len(grounding_links),
                "avg_grounding": round(
                    len(grounding_links) / max(len(belief_headings), 1), 1
                ),
            }
    return beliefs


# ---------------------------------------------------------------------------
# Metrics
# ---------------------------------------------------------------------------

def compute_metrics(claims, title_to_idx, beliefs):
    """Compute all Tier 1 and Tier 2 metrics.

    Args:
        claims: list of claim dicts from build_claim_index().
        title_to_idx: lowercased title -> index into claims.
        beliefs: output of parse_beliefs().

    Returns a JSON-serializable results dict keyed by metric name.
    """
    total = len(claims)
    results = {
        "generated": datetime.now().isoformat(),
        "date": TODAY,
    }

    # --- 1. Claim counts ---
    by_domain = defaultdict(int)
    for c in claims:
        by_domain[c["domain"]] += 1
    results["claims"] = {
        "total": total,
        "by_domain": dict(sorted(by_domain.items(), key=lambda x: -x[1])),
    }

    # --- 2. Confidence distribution ---
    conf_dist = defaultdict(int)
    conf_by_domain = defaultdict(lambda: defaultdict(int))
    for c in claims:
        conf_dist[c["confidence"]] += 1
        conf_by_domain[c["domain"]][c["confidence"]] += 1
    results["confidence_distribution"] = {
        "overall": dict(conf_dist),
        "by_domain": {d: dict(v) for d, v in conf_by_domain.items()},
    }

    # --- 3. Orphan ratio ---
    # An orphan is a claim no other claim links to.
    orphans = []
    for c in claims:
        if len(c["incoming_links"]) == 0:
            orphans.append({
                "title": c["title"][:100],
                "domain": c["domain"],
                "outgoing_links": len(c["outgoing_links"]),
            })
    orphan_ratio = len(orphans) / max(total, 1)
    results["orphan_ratio"] = {
        "total_claims": total,
        "orphans": len(orphans),
        "ratio": round(orphan_ratio, 3),
        "status": (
            "healthy" if orphan_ratio < 0.10
            else "warning" if orphan_ratio < 0.20
            else "critical"
        ),
        "target": 0.10,
        "sample_orphans": orphans[:10],
    }

    # --- 4. Cross-domain linkage density ---
    total_links = 0
    cross_domain_links = 0
    unresolved_links = 0
    cross_by_domain = defaultdict(lambda: {"total": 0, "cross": 0})
    # Track reciprocal links (higher quality per Theseus)
    reciprocal_count = 0
    for c in claims:
        for link in c["outgoing_links"]:
            total_links += 1
            cross_by_domain[c["domain"]]["total"] += 1
            target_idx = title_to_idx.get(link.lower())
            if target_idx is None:
                unresolved_links += 1
            else:
                target = claims[target_idx]
                if target["domain"] != c["domain"]:
                    cross_domain_links += 1
                    cross_by_domain[c["domain"]]["cross"] += 1
                    # Check reciprocity
                    if c["title"].lower() in [
                        l.lower() for l in target["outgoing_links"]
                    ]:
                        reciprocal_count += 1
    cross_ratio = cross_domain_links / max(total_links, 1)
    results["cross_domain_linkage"] = {
        "total_links": total_links,
        "cross_domain": cross_domain_links,
        "ratio": round(cross_ratio, 3),
        "reciprocal_links": reciprocal_count // 2,  # each pair counted twice
        "unresolved_links": unresolved_links,
        "status": "healthy" if cross_ratio >= 0.35
        else "warning" if cross_ratio >= 0.15
        else "critical",
        "target": 0.35,
        "by_domain": {
            d: {
                "total": v["total"],
                "cross": v["cross"],
                "ratio": round(v["cross"] / max(v["total"], 1), 3),
            }
            for d, v in cross_by_domain.items()
        },
    }

    # --- 5. Source diversity (Tier 1 per Leo) ---
    source_by_domain = defaultdict(set)
    for c in claims:
        if c["source"]:
            source_by_domain[c["domain"]].add(c["source"][:100].strip())
    source_diversity = {}
    for domain in by_domain:
        n_sources = len(source_by_domain.get(domain, set()))
        n_claims = by_domain[domain]
        ratio = round(n_sources / max(n_claims, 1), 3)
        source_diversity[domain] = {
            "unique_sources": n_sources,
            "total_claims": n_claims,
            "ratio": ratio,
            "status": "healthy" if ratio >= 0.3 else "warning",
        }
    results["source_diversity"] = source_diversity

    # --- 6. Evidence freshness ---
    ages = []
    stale = []
    # Fast-moving domains go stale at 180 days; others at 365.
    fast_domains = {"health", "ai-alignment", "internet-finance", "entertainment"}
    for c in claims:
        if c["created"]:
            try:
                created = datetime.strptime(c["created"], "%Y-%m-%d").date()
                age = (date.today() - created).days
                ages.append(age)
                threshold = 180 if c["domain"] in fast_domains else 365
                if age > threshold:
                    stale.append({
                        "title": c["title"][:80],
                        "domain": c["domain"],
                        "age_days": age,
                    })
            except ValueError:
                # Malformed date: skip silently (claim still counts elsewhere).
                pass
    results["evidence_freshness"] = {
        # Upper median for even-length lists (good enough for a health report).
        "median_age_days": sorted(ages)[len(ages) // 2] if ages else None,
        "mean_age_days": round(sum(ages) / len(ages), 1) if ages else None,
        "stale_count": len(stale),
        "total_with_dates": len(ages),
        "stale_claims": stale[:10],
    }

    # --- 7. Belief grounding depth ---
    results["belief_grounding"] = beliefs

    # --- 8. Challenge coverage ---
    # High-confidence claims should acknowledge counter-evidence somewhere.
    likely_proven = [c for c in claims if c["confidence"] in ("likely", "proven")]
    has_challenge = 0
    for c in likely_proven:
        body_lower = c["body"].lower()
        if any(
            marker in body_lower
            for marker in ["challenged_by", "counter-evidence", "counter:", "challenges considered"]
        ):
            has_challenge += 1
    challenge_ratio = has_challenge / max(len(likely_proven), 1)
    results["challenge_coverage"] = {
        "likely_proven_claims": len(likely_proven),
        "with_challenges": has_challenge,
        "ratio": round(challenge_ratio, 3),
        "status": "healthy" if challenge_ratio >= 0.25 else "warning",
        "target": 0.25,
    }

    # --- 9. Most-linked claims (centrality, from incoming links) ---
    centrality = sorted(claims, key=lambda c: len(c["incoming_links"]), reverse=True)
    results["most_central_claims"] = [
        {
            "title": c["title"][:100],
            "domain": c["domain"],
            "incoming_links": len(c["incoming_links"]),
        }
        for c in centrality[:10]
    ]

    return results


# ---------------------------------------------------------------------------
# Report formatting
# ---------------------------------------------------------------------------

def format_report(results):
    """Format results as readable markdown."""
    lines = []
    lines.append("# Teleo Codex — Knowledge Base Health Assessment")
    lines.append(f"*Generated: {results['generated']}*")
    lines.append("")

    # Claims
    c = results["claims"]
    lines.append(f"## 1. Claim Inventory — {c['total']} total")
    lines.append("")
    lines.append("| Domain | Claims |")
    lines.append("|--------|--------|")
    for domain, count in c["by_domain"].items():
        lines.append(f"| {domain} | {count} |")
    lines.append("")

    # Confidence
    cd = results["confidence_distribution"]
    lines.append("## 2. Confidence Distribution")
    lines.append("")
    lines.append("| Domain | proven | likely | experimental | speculative |")
    lines.append("|--------|--------|--------|-------------|-------------|")
    for domain, dist in cd["by_domain"].items():
        lines.append(
            f"| {domain} | {dist.get('proven',0)} | {dist.get('likely',0)} "
            f"| {dist.get('experimental',0)} | {dist.get('speculative',0)} |"
        )
    lines.append("")

    # Orphans
    o = results["orphan_ratio"]
    lines.append(f"## 3. Orphan Ratio — {o['status'].upper()}")
    lines.append(
        f"**{o['orphans']}/{o['total_claims']} claims are orphans "
        f"({o['ratio']:.1%})** — target: <{o['target']:.0%}"
    )
    lines.append("")

    # Cross-domain
    cl = results["cross_domain_linkage"]
    lines.append(f"## 4. Cross-Domain Linkage — {cl['status'].upper()}")
    lines.append(
        f"**{cl['cross_domain']}/{cl['total_links']} links cross domain boundaries "
        f"({cl['ratio']:.1%})** — target: >{cl['target']:.0%}"
    )
    lines.append(f"Reciprocal link pairs: {cl['reciprocal_links']}")
    lines.append(f"Unresolved links: {cl['unresolved_links']}")
    lines.append("")
    lines.append("| Domain | Total links | Cross-domain | Ratio |")
    lines.append("|--------|------------|-------------|-------|")
    for domain, v in sorted(cl["by_domain"].items(), key=lambda x: -x[1]["total"]):
        lines.append(f"| {domain} | {v['total']} | {v['cross']} | {v['ratio']:.1%} |")
    lines.append("")

    # Source diversity
    sd = results["source_diversity"]
    lines.append("## 5. Source Diversity")
    lines.append("")
    lines.append("| Domain | Unique sources | Claims | Ratio | Status |")
    lines.append("|--------|---------------|--------|-------|--------|")
    for domain, v in sorted(sd.items(), key=lambda x: x[1]["ratio"]):
        lines.append(
            f"| {domain} | {v['unique_sources']} | {v['total_claims']} "
            f"| {v['ratio']:.2f} | {v['status']} |"
        )
    lines.append("")

    # Evidence freshness
    ef = results["evidence_freshness"]
    lines.append("## 6. Evidence Freshness")
    lines.append(
        f"**Median claim age: {ef['median_age_days']} days "
        f"| Mean: {ef['mean_age_days']} days**"
    )
    lines.append(f"Stale claims: {ef['stale_count']}")
    lines.append("")

    # Belief grounding
    bg = results["belief_grounding"]
    lines.append("## 7. Belief Grounding Depth")
    lines.append("")
    lines.append("| Agent | Beliefs | Total grounding links | Avg per belief |")
    lines.append("|-------|---------|---------------------|----------------|")
    for agent, v in sorted(bg.items()):
        lines.append(
            f"| {agent} | {v['count']} | {v['total_grounding_links']} "
            f"| {v['avg_grounding']} |"
        )
    lines.append("")

    # Challenge coverage
    cc = results["challenge_coverage"]
    lines.append(f"## 8. Challenge Coverage — {cc['status'].upper()}")
    lines.append(
        f"**{cc['with_challenges']}/{cc['likely_proven_claims']} likely/proven claims "
        f"acknowledge counter-evidence ({cc['ratio']:.1%})** — target: >{cc['target']:.0%}"
    )
    lines.append("")

    # Most central
    mc = results["most_central_claims"]
    lines.append("## 9. Most Central Claims (by incoming links)")
    lines.append("")
    lines.append("| Claim | Domain | Incoming |")
    lines.append("|-------|--------|----------|")
    for item in mc:
        lines.append(f"| {item['title'][:70]}... | {item['domain']} | {item['incoming_links']} |")
    lines.append("")

    # Automation note
    lines.append("---")
    lines.append("")
    lines.append("*Automate more of this over time: daily VPS cron, belief drift detection,")
    lines.append("reasoning chain depth, weekly digest template. See agents/vida/musings/kb-health-assessment-design.md.*")
    lines.append("")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    repo_root = os.environ.get("REPO_ROOT", ".")
    # NOTE(review): output_dir is read but never written to below — the
    # OUTPUT_DIR path appears unimplemented; confirm intent before removing.
    output_dir = os.environ.get("OUTPUT_DIR", os.path.join(repo_root, "agents", "vida", "musings"))
    metrics_dir = os.environ.get("METRICS_DIR", None)

    # Build index + compute
    claims, title_to_idx = build_claim_index(repo_root)
    beliefs = parse_beliefs(repo_root)
    results = compute_metrics(claims, title_to_idx, beliefs)

    # Strip body from claims before serializing (too large for JSON output)
    for c in claims:
        c.pop("body", None)
        c.pop("prose_links", None)
        c.pop("footer_links", None)

    # Write claim-index (runtime cache)
    index_output = {
        "generated": results["generated"],
        "total_claims": len(claims),
        "claims": claims,
    }

    # Write outputs
    report_md = format_report(results)

    if metrics_dir:
        # VPS mode: write to metrics directory
        os.makedirs(os.path.join(metrics_dir, "daily-evolution"), exist_ok=True)
        snapshot_path = os.path.join(metrics_dir, "daily-evolution", f"{TODAY}.json")
        index_path = os.path.join(metrics_dir, "claim-index.json")
        # FIX: explicit utf-8 so output is stable across cron/locale setups.
        with open(snapshot_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        with open(index_path, "w", encoding="utf-8") as f:
            json.dump(index_output, f, indent=2)
        print(f"Snapshot written to {snapshot_path}", file=sys.stderr)
        print(f"Index written to {index_path}", file=sys.stderr)

    # Always write markdown report to stdout
    print(report_md)