- What: ops/kb-health-check.py — computes Tier 1 health metrics for the collective knowledge base. Claim inventory, confidence distribution, orphan ratio, cross-domain linkage density (with reciprocal link tracking), source diversity, evidence freshness, belief grounding depth, challenge coverage, most central claims. Generates claim-index as runtime cache. Outputs markdown report to stdout, JSON snapshots to configurable dir. - Why: First collective infrastructure for measuring KB health. Designed by Vida (domain health), Leo (cross-domain coordination), Theseus (measurement theory / Goodhart mitigation), Ganymede (ops architecture). Design spec at agents/vida/musings/kb-health-assessment-design.md. - Connections: Implements the vital signs from [[collective knowledge health is measurable through five vital signs]]. Second-order metrics (link reciprocity, argumentative vs footer links) resist Goodhart's Law per Theseus. Ganymede's ops pattern: script in repo (tracked), output on VPS (not tracked). Pentagon-Agent: Vida <3B5A4B2A-DE12-4C05-8006-D63942F19807>
562 lines
20 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Teleo Codex — Knowledge Base Health Assessment
|
|
|
|
Computes Tier 1 (automated) and Tier 2 (semi-automated) health metrics for the
|
|
collective knowledge base. Outputs JSON snapshot + markdown report.
|
|
|
|
Usage:
|
|
REPO_ROOT=/path/to/teleo-codex python3 ops/kb-health-check.py
|
|
|
|
Optional env vars:
|
|
REPO_ROOT Path to repo checkout (default: current directory)
|
|
OUTPUT_DIR Where to write snapshots (default: stdout + agents/vida/musings/)
|
|
METRICS_DIR VPS metrics directory (default: none, for local runs)
|
|
|
|
Designed to run:
|
|
- Manually by any agent during a session
|
|
- Daily via VPS cron at /opt/teleo-eval/metrics/
|
|
- claim-index.json is a runtime cache, regenerated each run
|
|
|
|
Infrastructure decisions (from collective design review):
|
|
- Script lives in ops/ (shared infrastructure, not any agent's territory)
|
|
- claim-index.json is a runtime cache, not git-tracked (derived artifact)
|
|
- Daily snapshots go to VPS filesystem, not main branch (repo is for knowledge, not telemetry)
|
|
- Weekly digests go IN repo via normal PR flow (agent-authored analysis = knowledge)
|
|
|
|
Design: Vida (domain health), Leo (cross-domain), Theseus (measurement theory), Ganymede (ops)
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, date
|
|
from pathlib import Path
|
|
|
|
# Repo checkout to scan; env-overridable. NOTE(review): the __main__ block
# re-reads REPO_ROOT itself, so this module-level value is only a default.
REPO_ROOT = os.environ.get("REPO_ROOT", ".")
# Top-level directories that may contain claim markdown files.
CLAIM_DIRS = ["domains", "core", "foundations"]
# Directory holding per-agent subdirectories, each with a beliefs.md.
AGENT_DIR = "agents"
# ISO date stamp (YYYY-MM-DD) used for daily snapshot filenames.
TODAY = date.today().isoformat()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_frontmatter(filepath):
    """Parse a markdown file into (frontmatter_dict, body).

    Returns (None, "") when the file cannot be read, and (None, full_text)
    when no ``---`` frontmatter delimiter pair is present. Frontmatter is
    parsed line-by-line as flat ``key: value`` pairs (not full YAML); quotes
    around values are stripped.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as fh:
            text = fh.read()
    except Exception:
        # Unreadable file: report "no frontmatter, no body" rather than raise.
        return None, ""

    if not text.startswith("---"):
        return None, text

    closing = text.find("---", 3)
    if closing == -1:
        # Opening delimiter without a closing one: treat as plain content.
        return None, text

    meta = {}
    for raw_line in text[3:closing].strip().split("\n"):
        key, sep, value = raw_line.partition(":")
        if sep:
            meta[key.strip()] = value.strip().strip('"').strip("'")

    return meta, text[closing + 3 :]
|
|
|
|
|
|
def extract_wiki_links(text):
    """Return the target of every [[wiki link]] in *text*.

    Pipe aliases are discarded: ``[[target|Display]]`` yields ``target``.
    """
    wiki_link = re.compile(r"\[\[([^\]|]+?)(?:\|[^\]]+?)?\]\]")
    return wiki_link.findall(text)
|
|
|
|
|
|
def extract_argumentative_links(body):
    """
    Split wiki links into argumentative (in prose paragraphs) vs structural
    (in 'Relevant Notes' / 'Topics' footer sections).

    Argumentative links carry more weight per Theseus's Goodhart mitigation.

    Returns:
        (prose_links, footer_links) — link targets found before and after
        the footer marker, respectively. FIX: footer links are now extracted
        from the footer section directly; previously they were computed by
        set-difference against prose links, which silently dropped any
        footer link that also appeared in prose (and mishandled duplicates).
    """
    # Split at the last occurrence of the first footer marker found.
    # NOTE(review): "---" also matches a horizontal rule — accepted trade-off.
    footer_markers = ["Relevant Notes:", "Topics:", "---"]
    prose_section = body
    footer_section = ""
    for marker in footer_markers:
        idx = body.rfind(marker)
        if idx != -1:
            prose_section = body[:idx]
            footer_section = body[idx:]
            break

    # Extract each section's links independently so duplicates and links
    # present in both sections are counted correctly.
    prose_links = extract_wiki_links(prose_section)
    footer_links = extract_wiki_links(footer_section)

    return prose_links, footer_links
|
|
|
|
|
|
def get_domain_from_path(filepath):
    """Map a repo-relative path to its domain.

    The domain is the path segment immediately following ``domains``,
    ``core``, or ``foundations``; a bare ``core`` maps to ``"core"``;
    anything else maps to ``"unknown"``.
    """
    segments = Path(filepath).parts
    for position, segment in enumerate(segments):
        has_next = position + 1 < len(segments)
        if segment in ("domains", "foundations"):
            if has_next:
                return segments[position + 1]
        elif segment == "core":
            # core/<sub> is sub-categorized; a trailing "core" stands alone.
            return segments[position + 1] if has_next else "core"
    return "unknown"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Claim index (runtime cache — the spine everything else computes from)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_claim_index(repo_root):
    """
    Build the claim index (runtime cache — the spine everything else uses).

    Walks CLAIM_DIRS under *repo_root* for markdown files whose frontmatter
    declares ``type: claim``, recording outgoing/prose/footer links, then
    back-fills incoming links per Leo's feedback (incoming links = votes of
    relevance, PageRank intuition).

    Returns:
        (claims, title_to_idx) — a list of claim dicts and a lowercase
        title -> index lookup into that list.
    """
    claims = []
    title_to_idx = {}

    # First pass: collect every claim file with its outgoing links.
    for base_dir in CLAIM_DIRS:
        base_path = os.path.join(repo_root, base_dir)
        if not os.path.exists(base_path):
            continue
        for dirpath, _dirs, filenames in os.walk(base_path):
            for name in filenames:
                # Skip non-markdown, underscore-prefixed, and hidden files.
                if not name.endswith(".md") or name.startswith(("_", ".")):
                    continue
                filepath = os.path.join(dirpath, name)
                fm, body = parse_frontmatter(filepath)
                if not fm or fm.get("type") != "claim":
                    continue

                rel_path = os.path.relpath(filepath, repo_root)
                prose_links, footer_links = extract_argumentative_links(body)
                title = name[:-3]  # strip ".md"
                title_to_idx[title.lower()] = len(claims)

                claims.append({
                    "title": title,
                    "path": rel_path,
                    "domain": get_domain_from_path(rel_path),
                    "confidence": fm.get("confidence", "unknown"),
                    "source": fm.get("source", ""),
                    "created": fm.get("created", ""),
                    "outgoing_links": extract_wiki_links(body),
                    "prose_links": prose_links,
                    "footer_links": footer_links,
                    "incoming_links": [],  # populated in second pass
                    "body": body,
                })

    # Second pass: invert outgoing links into incoming links.
    for claim in claims:
        for link in claim["outgoing_links"]:
            target = title_to_idx.get(link.lower())
            if target is not None:
                claims[target]["incoming_links"].append(claim["title"])

    return claims, title_to_idx
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Belief parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_beliefs(repo_root):
    """
    Parse all agent belief files for grounding depth analysis.

    Scans ``agents/<name>/beliefs.md`` under *repo_root*, counting numbered
    belief headings (``### 1.`` style) and [[wiki links]] used as grounding.

    Returns:
        dict mapping agent name -> {count, total_grounding_links,
        avg_grounding}. Empty when the agents directory is missing.
    """
    beliefs = {}
    agents_path = os.path.join(repo_root, AGENT_DIR)
    if not os.path.exists(agents_path):
        return beliefs

    for agent_name in os.listdir(agents_path):
        beliefs_file = os.path.join(agents_path, agent_name, "beliefs.md")
        if not os.path.exists(beliefs_file):
            continue
        try:
            # FIX: encoding pinned to utf-8 for consistency with
            # parse_frontmatter (previously platform-default).
            with open(beliefs_file, "r", encoding="utf-8") as f:
                content = f.read()
        except OSError:
            # FIX: one unreadable beliefs file no longer aborts the run.
            continue
        belief_headings = re.findall(r"### \d+\.", content)
        grounding_links = extract_wiki_links(content)
        beliefs[agent_name] = {
            "count": len(belief_headings),
            "total_grounding_links": len(grounding_links),
            # max(..., 1) avoids division by zero for files with no headings
            "avg_grounding": round(
                len(grounding_links) / max(len(belief_headings), 1), 1
            ),
        }
    return beliefs
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Metrics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def compute_metrics(claims, title_to_idx, beliefs):
    """Compute all Tier 1 and Tier 2 metrics.

    Args:
        claims: claim dicts from build_claim_index, with "body" still attached
            (section 8 scans the body text for challenge markers).
        title_to_idx: lowercase claim title -> index into *claims*.
        beliefs: per-agent grounding stats from parse_beliefs (section 7).

    Returns:
        JSON-serializable dict with one key per metric section.
    """
    total = len(claims)
    results = {
        "generated": datetime.now().isoformat(),
        "date": TODAY,
    }

    # --- 1. Claim counts ---
    by_domain = defaultdict(int)
    for c in claims:
        by_domain[c["domain"]] += 1

    results["claims"] = {
        "total": total,
        # Sorted descending by count so the report table is ranked.
        "by_domain": dict(sorted(by_domain.items(), key=lambda x: -x[1])),
    }

    # --- 2. Confidence distribution ---
    conf_dist = defaultdict(int)
    conf_by_domain = defaultdict(lambda: defaultdict(int))
    for c in claims:
        conf_dist[c["confidence"]] += 1
        conf_by_domain[c["domain"]][c["confidence"]] += 1

    results["confidence_distribution"] = {
        "overall": dict(conf_dist),
        "by_domain": {d: dict(v) for d, v in conf_by_domain.items()},
    }

    # --- 3. Orphan ratio ---
    # An orphan is a claim nothing else links to (zero incoming links).
    orphans = []
    for c in claims:
        if len(c["incoming_links"]) == 0:
            orphans.append({
                "title": c["title"][:100],
                "domain": c["domain"],
                "outgoing_links": len(c["outgoing_links"]),
            })

    # max(total, 1) guards the empty-KB case.
    orphan_ratio = len(orphans) / max(total, 1)
    results["orphan_ratio"] = {
        "total_claims": total,
        "orphans": len(orphans),
        "ratio": round(orphan_ratio, 3),
        # Thresholds: <10% healthy, <20% warning, else critical.
        "status": (
            "healthy" if orphan_ratio < 0.10
            else "warning" if orphan_ratio < 0.20
            else "critical"
        ),
        "target": 0.10,
        "sample_orphans": orphans[:10],
    }

    # --- 4. Cross-domain linkage density ---
    total_links = 0
    cross_domain_links = 0
    unresolved_links = 0
    cross_by_domain = defaultdict(lambda: {"total": 0, "cross": 0})

    # Track reciprocal links (higher quality per Theseus)
    reciprocal_count = 0

    for c in claims:
        for link in c["outgoing_links"]:
            total_links += 1
            cross_by_domain[c["domain"]]["total"] += 1

            target_idx = title_to_idx.get(link.lower())
            if target_idx is None:
                # Link points at a title with no claim file behind it.
                unresolved_links += 1
            else:
                target = claims[target_idx]
                if target["domain"] != c["domain"]:
                    cross_domain_links += 1
                    cross_by_domain[c["domain"]]["cross"] += 1
                    # Check reciprocity
                    # (target links back; this fires from both sides of a pair)
                    if c["title"].lower() in [
                        l.lower() for l in target["outgoing_links"]
                    ]:
                        reciprocal_count += 1

    cross_ratio = cross_domain_links / max(total_links, 1)
    results["cross_domain_linkage"] = {
        "total_links": total_links,
        "cross_domain": cross_domain_links,
        "ratio": round(cross_ratio, 3),
        "reciprocal_links": reciprocal_count // 2,  # each pair counted twice
        "unresolved_links": unresolved_links,
        "status": "healthy" if cross_ratio >= 0.35 else "warning" if cross_ratio >= 0.15 else "critical",
        "target": 0.35,
        "by_domain": {
            d: {
                "total": v["total"],
                "cross": v["cross"],
                "ratio": round(v["cross"] / max(v["total"], 1), 3),
            }
            for d, v in cross_by_domain.items()
        },
    }

    # --- 5. Source diversity (Tier 1 per Leo) ---
    # Sources are truncated to 100 chars so trivial suffix differences
    # don't inflate the unique count.
    source_by_domain = defaultdict(set)
    for c in claims:
        if c["source"]:
            source_by_domain[c["domain"]].add(c["source"][:100].strip())

    source_diversity = {}
    for domain in by_domain:
        n_sources = len(source_by_domain.get(domain, set()))
        n_claims = by_domain[domain]
        ratio = round(n_sources / max(n_claims, 1), 3)
        source_diversity[domain] = {
            "unique_sources": n_sources,
            "total_claims": n_claims,
            "ratio": ratio,
            "status": "healthy" if ratio >= 0.3 else "warning",
        }

    results["source_diversity"] = source_diversity

    # --- 6. Evidence freshness ---
    ages = []
    stale = []
    # Fast-moving domains get a tighter staleness threshold (180 vs 365 days).
    fast_domains = {"health", "ai-alignment", "internet-finance", "entertainment"}

    for c in claims:
        if c["created"]:
            try:
                created = datetime.strptime(c["created"], "%Y-%m-%d").date()
                age = (date.today() - created).days
                ages.append(age)
                threshold = 180 if c["domain"] in fast_domains else 365
                if age > threshold:
                    stale.append({
                        "title": c["title"][:80],
                        "domain": c["domain"],
                        "age_days": age,
                    })
            except ValueError:
                # Malformed created: date — skip rather than fail the run.
                pass

    results["evidence_freshness"] = {
        # NOTE(review): for even-length lists this is the upper median,
        # not the interpolated median — confirm acceptable.
        "median_age_days": sorted(ages)[len(ages) // 2] if ages else None,
        "mean_age_days": round(sum(ages) / len(ages), 1) if ages else None,
        "stale_count": len(stale),
        "total_with_dates": len(ages),
        "stale_claims": stale[:10],
    }

    # --- 7. Belief grounding depth ---
    # Passed through as-is from parse_beliefs.
    results["belief_grounding"] = beliefs

    # --- 8. Challenge coverage ---
    # Only high-confidence claims are expected to acknowledge counter-evidence.
    likely_proven = [c for c in claims if c["confidence"] in ("likely", "proven")]
    has_challenge = 0
    for c in likely_proven:
        body_lower = c["body"].lower()
        if any(
            marker in body_lower
            for marker in ["challenged_by", "counter-evidence", "counter:", "challenges considered"]
        ):
            has_challenge += 1

    challenge_ratio = has_challenge / max(len(likely_proven), 1)
    results["challenge_coverage"] = {
        "likely_proven_claims": len(likely_proven),
        "with_challenges": has_challenge,
        "ratio": round(challenge_ratio, 3),
        "status": "healthy" if challenge_ratio >= 0.25 else "warning",
        "target": 0.25,
    }

    # --- 9. Most-linked claims (centrality, from incoming links) ---
    centrality = sorted(claims, key=lambda c: len(c["incoming_links"]), reverse=True)
    results["most_central_claims"] = [
        {
            "title": c["title"][:100],
            "domain": c["domain"],
            "incoming_links": len(c["incoming_links"]),
        }
        for c in centrality[:10]
    ]

    return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Report formatting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def format_report(results):
    """Format results as readable markdown.

    Args:
        results: the metrics dict produced by compute_metrics.

    Returns:
        A markdown report string with one numbered section per metric.
    """
    lines = []
    lines.append("# Teleo Codex — Knowledge Base Health Assessment")
    lines.append(f"*Generated: {results['generated']}*")
    lines.append("")

    # Claims
    c = results["claims"]
    lines.append(f"## 1. Claim Inventory — {c['total']} total")
    lines.append("")
    lines.append("| Domain | Claims |")
    lines.append("|--------|--------|")
    # by_domain is already sorted descending by count in compute_metrics.
    for domain, count in c["by_domain"].items():
        lines.append(f"| {domain} | {count} |")
    lines.append("")

    # Confidence
    cd = results["confidence_distribution"]
    lines.append("## 2. Confidence Distribution")
    lines.append("")
    lines.append("| Domain | proven | likely | experimental | speculative |")
    lines.append("|--------|--------|--------|-------------|-------------|")
    # .get(..., 0): domains that lack a confidence tier print 0.
    for domain, dist in cd["by_domain"].items():
        lines.append(
            f"| {domain} | {dist.get('proven',0)} | {dist.get('likely',0)} "
            f"| {dist.get('experimental',0)} | {dist.get('speculative',0)} |"
        )
    lines.append("")

    # Orphans
    o = results["orphan_ratio"]
    lines.append(f"## 3. Orphan Ratio — {o['status'].upper()}")
    lines.append(
        f"**{o['orphans']}/{o['total_claims']} claims are orphans "
        f"({o['ratio']:.1%})** — target: <{o['target']:.0%}"
    )
    lines.append("")

    # Cross-domain
    cl = results["cross_domain_linkage"]
    lines.append(f"## 4. Cross-Domain Linkage — {cl['status'].upper()}")
    lines.append(
        f"**{cl['cross_domain']}/{cl['total_links']} links cross domain boundaries "
        f"({cl['ratio']:.1%})** — target: >{cl['target']:.0%}"
    )
    lines.append(f"Reciprocal link pairs: {cl['reciprocal_links']}")
    lines.append(f"Unresolved links: {cl['unresolved_links']}")
    lines.append("")
    lines.append("| Domain | Total links | Cross-domain | Ratio |")
    lines.append("|--------|------------|-------------|-------|")
    # Rows ordered by total links, busiest domain first.
    for domain, v in sorted(cl["by_domain"].items(), key=lambda x: -x[1]["total"]):
        lines.append(f"| {domain} | {v['total']} | {v['cross']} | {v['ratio']:.1%} |")
    lines.append("")

    # Source diversity
    sd = results["source_diversity"]
    lines.append("## 5. Source Diversity")
    lines.append("")
    lines.append("| Domain | Unique sources | Claims | Ratio | Status |")
    lines.append("|--------|---------------|--------|-------|--------|")
    # Ascending by ratio so the weakest domains surface at the top.
    for domain, v in sorted(sd.items(), key=lambda x: x[1]["ratio"]):
        lines.append(
            f"| {domain} | {v['unique_sources']} | {v['total_claims']} "
            f"| {v['ratio']:.2f} | {v['status']} |"
        )
    lines.append("")

    # Evidence freshness
    ef = results["evidence_freshness"]
    lines.append("## 6. Evidence Freshness")
    # NOTE(review): when no claims carry dates these values are None and the
    # line renders as "None days" — confirm acceptable.
    lines.append(
        f"**Median claim age: {ef['median_age_days']} days "
        f"| Mean: {ef['mean_age_days']} days**"
    )
    lines.append(f"Stale claims: {ef['stale_count']}")
    lines.append("")

    # Belief grounding
    bg = results["belief_grounding"]
    lines.append("## 7. Belief Grounding Depth")
    lines.append("")
    lines.append("| Agent | Beliefs | Total grounding links | Avg per belief |")
    lines.append("|-------|---------|---------------------|----------------|")
    for agent, v in sorted(bg.items()):
        lines.append(
            f"| {agent} | {v['count']} | {v['total_grounding_links']} "
            f"| {v['avg_grounding']} |"
        )
    lines.append("")

    # Challenge coverage
    cc = results["challenge_coverage"]
    lines.append(f"## 8. Challenge Coverage — {cc['status'].upper()}")
    lines.append(
        f"**{cc['with_challenges']}/{cc['likely_proven_claims']} likely/proven claims "
        f"acknowledge counter-evidence ({cc['ratio']:.1%})** — target: >{cc['target']:.0%}"
    )
    lines.append("")

    # Most central
    mc = results["most_central_claims"]
    lines.append("## 9. Most Central Claims (by incoming links)")
    lines.append("")
    lines.append("| Claim | Domain | Incoming |")
    lines.append("|-------|--------|----------|")
    # NOTE(review): "..." is appended even when the title is shorter than
    # 70 chars — cosmetic, confirm intended.
    for item in mc:
        lines.append(f"| {item['title'][:70]}... | {item['domain']} | {item['incoming_links']} |")
    lines.append("")

    # Automation note
    lines.append("---")
    lines.append("")
    lines.append("*Automate more of this over time: daily VPS cron, belief drift detection,")
    lines.append("reasoning chain depth, weekly digest template. See agents/vida/musings/kb-health-assessment-design.md.*")
    lines.append("")

    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    repo_root = os.environ.get("REPO_ROOT", ".")
    # NOTE(review): output_dir is read but never used below — snapshots go to
    # METRICS_DIR and the report to stdout; confirm OUTPUT_DIR is still needed.
    output_dir = os.environ.get("OUTPUT_DIR", os.path.join(repo_root, "agents", "vida", "musings"))
    metrics_dir = os.environ.get("METRICS_DIR", None)

    # Build index + compute
    claims, title_to_idx = build_claim_index(repo_root)
    beliefs = parse_beliefs(repo_root)
    # Must run before the body-strip below: challenge coverage reads "body".
    results = compute_metrics(claims, title_to_idx, beliefs)

    # Strip body from claims before serializing (too large for JSON output)
    for c in claims:
        c.pop("body", None)
        c.pop("prose_links", None)
        c.pop("footer_links", None)

    # Write claim-index (runtime cache)
    index_output = {
        "generated": results["generated"],
        "total_claims": len(claims),
        "claims": claims,
    }

    # Write outputs
    report_md = format_report(results)

    if metrics_dir:
        # VPS mode: write to metrics directory
        os.makedirs(os.path.join(metrics_dir, "daily-evolution"), exist_ok=True)
        snapshot_path = os.path.join(metrics_dir, "daily-evolution", f"{TODAY}.json")
        index_path = os.path.join(metrics_dir, "claim-index.json")

        with open(snapshot_path, "w") as f:
            json.dump(results, f, indent=2)
        with open(index_path, "w") as f:
            json.dump(index_output, f, indent=2)

        # Status messages go to stderr so stdout stays a clean markdown report.
        print(f"Snapshot written to {snapshot_path}", file=sys.stderr)
        print(f"Index written to {index_path}", file=sys.stderr)

    # Always write markdown report to stdout
    print(report_md)
|