teleo-codex/ops/extract-graph-data.py
Leo 75f1709110
Some checks are pending
Sync Graph Data to teleo-app / sync (push) Waiting to run
leo: add ingest skill — full X-to-claims pipeline (#103)
2026-03-10 10:42:25 +00:00

520 lines
19 KiB
Python

#!/usr/bin/env python3
"""
extract-graph-data.py — Extract knowledge graph from teleo-codex markdown files.
Reads all .md claim/conviction files, parses YAML frontmatter and wiki-links,
and outputs graph-data.json matching the teleo-app GraphData interface.
Usage:
python3 ops/extract-graph-data.py [--output path/to/graph-data.json]
Must be run from the teleo-codex repo root.
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
SCAN_DIRS = ["core", "domains", "foundations", "convictions"]
# Only extract these content types (from frontmatter `type` field).
# If type is missing, include the file anyway (many claims lack explicit type).
INCLUDE_TYPES = {"claim", "conviction", "analysis", "belief", "position", None}
# Domain → default agent mapping (fallback when git attribution unavailable)
DOMAIN_AGENT_MAP = {
"internet-finance": "rio",
"entertainment": "clay",
"health": "vida",
"ai-alignment": "theseus",
"space-development": "astra",
"grand-strategy": "leo",
"mechanisms": "leo",
"living-capital": "leo",
"living-agents": "leo",
"teleohumanity": "leo",
"critical-systems": "leo",
"collective-intelligence": "leo",
"teleological-economics": "leo",
"cultural-dynamics": "clay",
}
DOMAIN_COLORS = {
"internet-finance": "#4A90D9",
"entertainment": "#9B59B6",
"health": "#2ECC71",
"ai-alignment": "#E74C3C",
"space-development": "#F39C12",
"grand-strategy": "#D4AF37",
"mechanisms": "#1ABC9C",
"living-capital": "#3498DB",
"living-agents": "#E67E22",
"teleohumanity": "#F1C40F",
"critical-systems": "#95A5A6",
"collective-intelligence": "#BDC3C7",
"teleological-economics": "#7F8C8D",
"cultural-dynamics": "#C0392B",
}
KNOWN_AGENTS = {"leo", "rio", "clay", "vida", "theseus", "astra"}
# Regex patterns
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---", re.DOTALL)
WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
YAML_FIELD_RE = re.compile(r"^(\w[\w_]*):\s*(.+)$", re.MULTILINE)
YAML_LIST_ITEM_RE = re.compile(r'^\s*-\s+"?(.+?)"?\s*$', re.MULTILINE)
COUNTER_EVIDENCE_RE = re.compile(r"^##\s+Counter[\s-]?evidence", re.MULTILINE | re.IGNORECASE)
COUNTERARGUMENT_RE = re.compile(r"^\*\*Counter\s*argument", re.MULTILINE | re.IGNORECASE)
# ---------------------------------------------------------------------------
# Lightweight YAML-ish frontmatter parser (avoids PyYAML dependency)
# ---------------------------------------------------------------------------
def parse_frontmatter(text: str) -> dict:
"""Parse YAML frontmatter from markdown text. Returns dict of fields."""
m = FRONTMATTER_RE.match(text)
if not m:
return {}
yaml_block = m.group(1)
result = {}
for field_match in YAML_FIELD_RE.finditer(yaml_block):
key = field_match.group(1)
val = field_match.group(2).strip().strip('"').strip("'")
# Handle list fields
if val.startswith("["):
# Inline YAML list: [item1, item2]
items = re.findall(r'"([^"]+)"', val)
if not items:
items = [x.strip().strip('"').strip("'")
for x in val.strip("[]").split(",") if x.strip()]
result[key] = items
else:
result[key] = val
# Handle multi-line list fields (depends_on, challenged_by, secondary_domains)
for list_key in ("depends_on", "challenged_by", "secondary_domains", "claims_extracted"):
if list_key not in result:
# Check for block-style list
pattern = re.compile(
rf"^{list_key}:\s*\n((?:\s+-\s+.+\n?)+)", re.MULTILINE
)
lm = pattern.search(yaml_block)
if lm:
items = YAML_LIST_ITEM_RE.findall(lm.group(1))
result[list_key] = [i.strip('"').strip("'") for i in items]
return result
def extract_body(text: str) -> str:
"""Return the markdown body after frontmatter."""
m = FRONTMATTER_RE.match(text)
if m:
return text[m.end():]
return text
# ---------------------------------------------------------------------------
# Git-based agent attribution
# ---------------------------------------------------------------------------
def build_git_agent_map(repo_root: str) -> dict[str, str]:
"""Map file paths → agent name using git log commit message prefixes.
Commit messages follow: '{agent}: description'
We use the commit that first added each file.
"""
file_agent = {}
try:
result = subprocess.run(
["git", "log", "--all", "--diff-filter=A", "--name-only",
"--format=COMMIT_MSG:%s"],
capture_output=True, text=True, cwd=repo_root, timeout=30,
)
current_agent = None
for line in result.stdout.splitlines():
line = line.strip()
if not line:
continue
if line.startswith("COMMIT_MSG:"):
msg = line[len("COMMIT_MSG:"):]
# Parse "agent: description" pattern
if ":" in msg:
prefix = msg.split(":")[0].strip().lower()
if prefix in KNOWN_AGENTS:
current_agent = prefix
else:
current_agent = None
else:
current_agent = None
elif current_agent and line.endswith(".md"):
# Only set if not already attributed (first add wins)
if line not in file_agent:
file_agent[line] = current_agent
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return file_agent
# ---------------------------------------------------------------------------
# Wiki-link resolution
# ---------------------------------------------------------------------------
def build_title_index(all_files: list[str], repo_root: str) -> dict[str, str]:
"""Map lowercase claim titles → file paths for wiki-link resolution."""
index = {}
for fpath in all_files:
# Title = filename without .md extension
fname = os.path.basename(fpath)
if fname.endswith(".md"):
title = fname[:-3].lower()
index[title] = fpath
# Also index by relative path
index[fpath.lower()] = fpath
return index
def resolve_wikilink(link_text: str, title_index: dict, source_dir: str) -> str | None:
"""Resolve a [[wiki-link]] target to a file path (node ID)."""
text = link_text.strip()
# Skip map links and non-claim references
if text.startswith("_") or text == "_map":
return None
# Direct path match (with or without .md)
for candidate in [text, text + ".md"]:
if candidate.lower() in title_index:
return title_index[candidate.lower()]
# Title-only match
title = text.lower()
if title in title_index:
return title_index[title]
# Fuzzy: try adding .md to the basename
basename = os.path.basename(text)
if basename.lower() in title_index:
return title_index[basename.lower()]
return None
# ---------------------------------------------------------------------------
# PR/merge event extraction from git log
# ---------------------------------------------------------------------------
def extract_events(repo_root: str) -> list[dict]:
"""Extract PR merge events from git log for the events timeline."""
events = []
try:
result = subprocess.run(
["git", "log", "--merges", "--format=%H|%s|%ai", "-50"],
capture_output=True, text=True, cwd=repo_root, timeout=15,
)
for line in result.stdout.strip().splitlines():
parts = line.split("|", 2)
if len(parts) < 3:
continue
sha, msg, date_str = parts
# Parse "Merge pull request #N from ..." or agent commit patterns
pr_match = re.search(r"#(\d+)", msg)
if not pr_match:
continue
pr_num = int(pr_match.group(1))
# Try to determine agent from merge commit
agent = "collective"
for a in KNOWN_AGENTS:
if a in msg.lower():
agent = a
break
# Count files changed in this merge
diff_result = subprocess.run(
["git", "diff", "--name-only", f"{sha}^..{sha}"],
capture_output=True, text=True, cwd=repo_root, timeout=10,
)
claims_added = sum(
1 for f in diff_result.stdout.splitlines()
if f.endswith(".md") and any(f.startswith(d) for d in SCAN_DIRS)
)
if claims_added > 0:
events.append({
"type": "pr-merge",
"number": pr_num,
"agent": agent,
"claims_added": claims_added,
"date": date_str[:10],
})
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return events
# ---------------------------------------------------------------------------
# Main extraction
# ---------------------------------------------------------------------------
def find_markdown_files(repo_root: str) -> list[str]:
"""Find all .md files in SCAN_DIRS, return relative paths."""
files = []
for scan_dir in SCAN_DIRS:
dirpath = os.path.join(repo_root, scan_dir)
if not os.path.isdir(dirpath):
continue
for root, _dirs, filenames in os.walk(dirpath):
for fname in filenames:
if fname.endswith(".md") and not fname.startswith("_"):
rel = os.path.relpath(os.path.join(root, fname), repo_root)
files.append(rel)
return sorted(files)
def _get_domain_cached(fpath: str, repo_root: str, cache: dict) -> str:
"""Get the domain of a file, caching results."""
if fpath in cache:
return cache[fpath]
abs_path = os.path.join(repo_root, fpath)
domain = ""
try:
text = open(abs_path, encoding="utf-8").read()
fm = parse_frontmatter(text)
domain = fm.get("domain", "")
except (OSError, UnicodeDecodeError):
pass
cache[fpath] = domain
return domain
def extract_graph(repo_root: str) -> dict:
"""Extract the full knowledge graph from the codex."""
all_files = find_markdown_files(repo_root)
git_agents = build_git_agent_map(repo_root)
title_index = build_title_index(all_files, repo_root)
domain_cache: dict[str, str] = {}
nodes = []
edges = []
node_ids = set()
all_files_set = set(all_files)
for fpath in all_files:
abs_path = os.path.join(repo_root, fpath)
try:
text = open(abs_path, encoding="utf-8").read()
except (OSError, UnicodeDecodeError):
continue
fm = parse_frontmatter(text)
body = extract_body(text)
# Filter by type
ftype = fm.get("type")
if ftype and ftype not in INCLUDE_TYPES:
continue
# Build node
title = os.path.basename(fpath)[:-3] # filename without .md
domain = fm.get("domain", "")
if not domain:
# Infer domain from directory path
parts = fpath.split(os.sep)
if len(parts) >= 2:
domain = parts[1] if parts[0] == "domains" else parts[1] if len(parts) > 2 else parts[0]
# Agent attribution: git log → domain mapping → "collective"
agent = git_agents.get(fpath, "")
if not agent:
agent = DOMAIN_AGENT_MAP.get(domain, "collective")
created = fm.get("created", "")
confidence = fm.get("confidence", "speculative")
# Detect challenged status
challenged_by_raw = fm.get("challenged_by", [])
if isinstance(challenged_by_raw, str):
challenged_by_raw = [challenged_by_raw] if challenged_by_raw else []
has_challenged_by = bool(challenged_by_raw and any(c for c in challenged_by_raw))
has_counter_section = bool(COUNTER_EVIDENCE_RE.search(body) or COUNTERARGUMENT_RE.search(body))
is_challenged = has_challenged_by or has_counter_section
# Extract challenge descriptions for the node
challenges = []
if isinstance(challenged_by_raw, list):
for c in challenged_by_raw:
if c and isinstance(c, str):
# Strip wiki-link syntax for display
cleaned = WIKILINK_RE.sub(lambda m: m.group(1), c)
# Strip markdown list artifacts: leading "- ", surrounding quotes
cleaned = re.sub(r'^-\s*', '', cleaned).strip()
cleaned = cleaned.strip('"').strip("'").strip()
if cleaned:
challenges.append(cleaned[:200]) # cap length
node = {
"id": fpath,
"title": title,
"domain": domain,
"agent": agent,
"created": created,
"confidence": confidence,
"challenged": is_challenged,
}
if challenges:
node["challenges"] = challenges
nodes.append(node)
node_ids.add(fpath)
domain_cache[fpath] = domain # cache for edge lookups
for link_text in WIKILINK_RE.findall(body):
target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath))
if target and target != fpath and target in all_files_set:
target_domain = _get_domain_cached(target, repo_root, domain_cache)
edges.append({
"source": fpath,
"target": target,
"type": "wiki-link",
"cross_domain": domain != target_domain and bool(target_domain),
})
# Conflict edges from challenged_by (may contain [[wiki-links]] or prose)
challenged_by = fm.get("challenged_by", [])
if isinstance(challenged_by, str):
challenged_by = [challenged_by]
if isinstance(challenged_by, list):
for challenge in challenged_by:
if not challenge:
continue
# Check for embedded wiki-links
for link_text in WIKILINK_RE.findall(challenge):
target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath))
if target and target != fpath and target in all_files_set:
target_domain = _get_domain_cached(target, repo_root, domain_cache)
edges.append({
"source": fpath,
"target": target,
"type": "conflict",
"cross_domain": domain != target_domain and bool(target_domain),
})
# Deduplicate edges
seen_edges = set()
unique_edges = []
for e in edges:
key = (e["source"], e["target"], e.get("type", ""))
if key not in seen_edges:
seen_edges.add(key)
unique_edges.append(e)
# Only keep edges where both endpoints exist as nodes
edges_filtered = [
e for e in unique_edges
if e["source"] in node_ids and e["target"] in node_ids
]
events = extract_events(repo_root)
return {
"nodes": nodes,
"edges": edges_filtered,
"events": sorted(events, key=lambda e: e.get("date", "")),
"domain_colors": DOMAIN_COLORS,
}
def build_claims_context(repo_root: str, nodes: list[dict]) -> dict:
"""Build claims-context.json for chat system prompt injection.
Produces a lightweight claim index: title + description + domain + agent + confidence.
Sorted by domain, then alphabetically within domain.
Target: ~37KB for ~370 claims. Truncates descriptions at 100 chars if total > 100KB.
"""
claims = []
for node in nodes:
fpath = node["id"]
abs_path = os.path.join(repo_root, fpath)
description = ""
try:
text = open(abs_path, encoding="utf-8").read()
fm = parse_frontmatter(text)
description = fm.get("description", "")
except (OSError, UnicodeDecodeError):
pass
claims.append({
"title": node["title"],
"description": description,
"domain": node["domain"],
"agent": node["agent"],
"confidence": node["confidence"],
})
# Sort by domain, then title
claims.sort(key=lambda c: (c["domain"], c["title"]))
context = {
"generated": datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"claimCount": len(claims),
"claims": claims,
}
# Progressive description truncation if over 100KB.
# Never drop descriptions entirely — short descriptions are better than none.
for max_desc in (120, 100, 80, 60):
test_json = json.dumps(context, ensure_ascii=False)
if len(test_json) <= 100_000:
break
for c in claims:
if len(c["description"]) > max_desc:
c["description"] = c["description"][:max_desc] + "..."
return context
def main():
parser = argparse.ArgumentParser(description="Extract graph data from teleo-codex")
parser.add_argument("--output", "-o", default="graph-data.json",
help="Output file path (default: graph-data.json)")
parser.add_argument("--context-output", "-c", default=None,
help="Output claims-context.json path (default: same dir as --output)")
parser.add_argument("--repo", "-r", default=".",
help="Path to teleo-codex repo root (default: current dir)")
args = parser.parse_args()
repo_root = os.path.abspath(args.repo)
if not os.path.isdir(os.path.join(repo_root, "core")):
print(f"Error: {repo_root} doesn't look like a teleo-codex repo (no core/ dir)", file=sys.stderr)
sys.exit(1)
print(f"Scanning {repo_root}...")
graph = extract_graph(repo_root)
print(f" Nodes: {len(graph['nodes'])}")
print(f" Edges: {len(graph['edges'])}")
print(f" Events: {len(graph['events'])}")
challenged_count = sum(1 for n in graph["nodes"] if n.get("challenged"))
print(f" Challenged: {challenged_count}")
# Write graph-data.json
output_path = os.path.abspath(args.output)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(graph, f, indent=2, ensure_ascii=False)
size_kb = os.path.getsize(output_path) / 1024
print(f" graph-data.json: {output_path} ({size_kb:.1f} KB)")
# Write claims-context.json
context_path = args.context_output
if not context_path:
context_path = os.path.join(os.path.dirname(output_path), "claims-context.json")
context_path = os.path.abspath(context_path)
context = build_claims_context(repo_root, graph["nodes"])
with open(context_path, "w", encoding="utf-8") as f:
json.dump(context, f, indent=2, ensure_ascii=False)
ctx_kb = os.path.getsize(context_path) / 1024
print(f" claims-context.json: {context_path} ({ctx_kb:.1f} KB)")
if __name__ == "__main__":
main()