#!/usr/bin/env python3 """ extract-graph-data.py — Extract knowledge graph from teleo-codex markdown files. Reads all .md claim/conviction files, parses YAML frontmatter and wiki-links, and outputs graph-data.json matching the teleo-app GraphData interface. Usage: python3 ops/extract-graph-data.py [--output path/to/graph-data.json] Must be run from the teleo-codex repo root. """ import argparse import json import os import re import subprocess import sys from datetime import datetime, timezone from pathlib import Path # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- SCAN_DIRS = ["core", "domains", "foundations", "convictions"] # Only extract these content types (from frontmatter `type` field). # If type is missing, include the file anyway (many claims lack explicit type). INCLUDE_TYPES = {"claim", "conviction", "analysis", "belief", "position", None} # Domain → default agent mapping (fallback when git attribution unavailable) DOMAIN_AGENT_MAP = { "internet-finance": "rio", "entertainment": "clay", "health": "vida", "ai-alignment": "theseus", "space-development": "astra", "grand-strategy": "leo", "mechanisms": "leo", "living-capital": "leo", "living-agents": "leo", "teleohumanity": "leo", "critical-systems": "leo", "collective-intelligence": "leo", "teleological-economics": "leo", "cultural-dynamics": "clay", } DOMAIN_COLORS = { "internet-finance": "#4A90D9", "entertainment": "#9B59B6", "health": "#2ECC71", "ai-alignment": "#E74C3C", "space-development": "#F39C12", "grand-strategy": "#D4AF37", "mechanisms": "#1ABC9C", "living-capital": "#3498DB", "living-agents": "#E67E22", "teleohumanity": "#F1C40F", "critical-systems": "#95A5A6", "collective-intelligence": "#BDC3C7", "teleological-economics": "#7F8C8D", "cultural-dynamics": "#C0392B", } KNOWN_AGENTS = {"leo", "rio", "clay", "vida", "theseus", "astra"} # Regex patterns FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---", re.DOTALL) WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]") YAML_FIELD_RE = re.compile(r"^(\w[\w_]*):\s*(.+)$", re.MULTILINE) YAML_LIST_ITEM_RE = re.compile(r'^\s*-\s+"?(.+?)"?\s*$', re.MULTILINE) COUNTER_EVIDENCE_RE = re.compile(r"^##\s+Counter[\s-]?evidence", re.MULTILINE | re.IGNORECASE) COUNTERARGUMENT_RE = re.compile(r"^\*\*Counter\s*argument", re.MULTILINE | re.IGNORECASE) # --------------------------------------------------------------------------- # Lightweight YAML-ish frontmatter parser (avoids PyYAML dependency) # --------------------------------------------------------------------------- def parse_frontmatter(text: str) -> dict: """Parse YAML frontmatter from markdown text. Returns dict of fields.""" m = FRONTMATTER_RE.match(text) if not m: return {} yaml_block = m.group(1) result = {} for field_match in YAML_FIELD_RE.finditer(yaml_block): key = field_match.group(1) val = field_match.group(2).strip().strip('"').strip("'") # Handle list fields if val.startswith("["): # Inline YAML list: [item1, item2] items = re.findall(r'"([^"]+)"', val) if not items: items = [x.strip().strip('"').strip("'") for x in val.strip("[]").split(",") if x.strip()] result[key] = items else: result[key] = val # Handle multi-line list fields (depends_on, challenged_by, secondary_domains) for list_key in ("depends_on", "challenged_by", "secondary_domains", "claims_extracted"): if list_key not in result: # Check for block-style list pattern = re.compile( rf"^{list_key}:\s*\n((?:\s+-\s+.+\n?)+)", re.MULTILINE ) lm = pattern.search(yaml_block) if lm: items = YAML_LIST_ITEM_RE.findall(lm.group(1)) result[list_key] = [i.strip('"').strip("'") for i in items] return result def extract_body(text: str) -> str: """Return the markdown body after frontmatter.""" m = FRONTMATTER_RE.match(text) if m: return text[m.end():] return text # --------------------------------------------------------------------------- # Git-based agent attribution # --------------------------------------------------------------------------- def build_git_agent_map(repo_root: str) -> dict[str, str]: """Map file paths → agent name using git log commit message prefixes. Commit messages follow: '{agent}: description' We use the commit that first added each file. """ file_agent = {} try: result = subprocess.run( ["git", "log", "--all", "--diff-filter=A", "--name-only", "--format=COMMIT_MSG:%s"], capture_output=True, text=True, cwd=repo_root, timeout=30, ) current_agent = None for line in result.stdout.splitlines(): line = line.strip() if not line: continue if line.startswith("COMMIT_MSG:"): msg = line[len("COMMIT_MSG:"):] # Parse "agent: description" pattern if ":" in msg: prefix = msg.split(":")[0].strip().lower() if prefix in KNOWN_AGENTS: current_agent = prefix else: current_agent = None else: current_agent = None elif current_agent and line.endswith(".md"): # Only set if not already attributed (first add wins) if line not in file_agent: file_agent[line] = current_agent except (subprocess.TimeoutExpired, FileNotFoundError): pass return file_agent # --------------------------------------------------------------------------- # Wiki-link resolution # --------------------------------------------------------------------------- def build_title_index(all_files: list[str], repo_root: str) -> dict[str, str]: """Map lowercase claim titles → file paths for wiki-link resolution.""" index = {} for fpath in all_files: # Title = filename without .md extension fname = os.path.basename(fpath) if fname.endswith(".md"): title = fname[:-3].lower() index[title] = fpath # Also index by relative path index[fpath.lower()] = fpath return index def resolve_wikilink(link_text: str, title_index: dict, source_dir: str) -> str | None: """Resolve a [[wiki-link]] target to a file path (node ID).""" text = link_text.strip() # Skip map links and non-claim references if text.startswith("_") or text == "_map": return None # Direct path match (with or without .md) for candidate in [text, text + ".md"]: if candidate.lower() in title_index: return title_index[candidate.lower()] # Title-only match title = text.lower() if title in title_index: return title_index[title] # Fuzzy: try adding .md to the basename basename = os.path.basename(text) if basename.lower() in title_index: return title_index[basename.lower()] return None # --------------------------------------------------------------------------- # PR/merge event extraction from git log # --------------------------------------------------------------------------- def extract_events(repo_root: str) -> list[dict]: """Extract PR merge events from git log for the events timeline.""" events = [] try: result = subprocess.run( ["git", "log", "--merges", "--format=%H|%s|%ai", "-50"], capture_output=True, text=True, cwd=repo_root, timeout=15, ) for line in result.stdout.strip().splitlines(): parts = line.split("|", 2) if len(parts) < 3: continue sha, msg, date_str = parts # Parse "Merge pull request #N from ..." or agent commit patterns pr_match = re.search(r"#(\d+)", msg) if not pr_match: continue pr_num = int(pr_match.group(1)) # Try to determine agent from merge commit agent = "collective" for a in KNOWN_AGENTS: if a in msg.lower(): agent = a break # Count files changed in this merge diff_result = subprocess.run( ["git", "diff", "--name-only", f"{sha}^..{sha}"], capture_output=True, text=True, cwd=repo_root, timeout=10, ) claims_added = sum( 1 for f in diff_result.stdout.splitlines() if f.endswith(".md") and any(f.startswith(d) for d in SCAN_DIRS) ) if claims_added > 0: events.append({ "type": "pr-merge", "number": pr_num, "agent": agent, "claims_added": claims_added, "date": date_str[:10], }) except (subprocess.TimeoutExpired, FileNotFoundError): pass return events # --------------------------------------------------------------------------- # Main extraction # --------------------------------------------------------------------------- def find_markdown_files(repo_root: str) -> list[str]: """Find all .md files in SCAN_DIRS, return relative paths.""" files = [] for scan_dir in SCAN_DIRS: dirpath = os.path.join(repo_root, scan_dir) if not os.path.isdir(dirpath): continue for root, _dirs, filenames in os.walk(dirpath): for fname in filenames: if fname.endswith(".md") and not fname.startswith("_"): rel = os.path.relpath(os.path.join(root, fname), repo_root) files.append(rel) return sorted(files) def _get_domain_cached(fpath: str, repo_root: str, cache: dict) -> str: """Get the domain of a file, caching results.""" if fpath in cache: return cache[fpath] abs_path = os.path.join(repo_root, fpath) domain = "" try: text = open(abs_path, encoding="utf-8").read() fm = parse_frontmatter(text) domain = fm.get("domain", "") except (OSError, UnicodeDecodeError): pass cache[fpath] = domain return domain def extract_graph(repo_root: str) -> dict: """Extract the full knowledge graph from the codex.""" all_files = find_markdown_files(repo_root) git_agents = build_git_agent_map(repo_root) title_index = build_title_index(all_files, repo_root) domain_cache: dict[str, str] = {} nodes = [] edges = [] node_ids = set() all_files_set = set(all_files) for fpath in all_files: abs_path = os.path.join(repo_root, fpath) try: text = open(abs_path, encoding="utf-8").read() except (OSError, UnicodeDecodeError): continue fm = parse_frontmatter(text) body = extract_body(text) # Filter by type ftype = fm.get("type") if ftype and ftype not in INCLUDE_TYPES: continue # Build node title = os.path.basename(fpath)[:-3] # filename without .md domain = fm.get("domain", "") if not domain: # Infer domain from directory path parts = fpath.split(os.sep) if len(parts) >= 2: domain = parts[1] if parts[0] == "domains" else parts[1] if len(parts) > 2 else parts[0] # Agent attribution: git log → domain mapping → "collective" agent = git_agents.get(fpath, "") if not agent: agent = DOMAIN_AGENT_MAP.get(domain, "collective") created = fm.get("created", "") confidence = fm.get("confidence", "speculative") # Detect challenged status challenged_by_raw = fm.get("challenged_by", []) if isinstance(challenged_by_raw, str): challenged_by_raw = [challenged_by_raw] if challenged_by_raw else [] has_challenged_by = bool(challenged_by_raw and any(c for c in challenged_by_raw)) has_counter_section = bool(COUNTER_EVIDENCE_RE.search(body) or COUNTERARGUMENT_RE.search(body)) is_challenged = has_challenged_by or has_counter_section # Extract challenge descriptions for the node challenges = [] if isinstance(challenged_by_raw, list): for c in challenged_by_raw: if c and isinstance(c, str): # Strip wiki-link syntax for display cleaned = WIKILINK_RE.sub(lambda m: m.group(1), c) # Strip markdown list artifacts: leading "- ", surrounding quotes cleaned = re.sub(r'^-\s*', '', cleaned).strip() cleaned = cleaned.strip('"').strip("'").strip() if cleaned: challenges.append(cleaned[:200]) # cap length node = { "id": fpath, "title": title, "domain": domain, "agent": agent, "created": created, "confidence": confidence, "challenged": is_challenged, } if challenges: node["challenges"] = challenges nodes.append(node) node_ids.add(fpath) domain_cache[fpath] = domain # cache for edge lookups for link_text in WIKILINK_RE.findall(body): target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath)) if target and target != fpath and target in all_files_set: target_domain = _get_domain_cached(target, repo_root, domain_cache) edges.append({ "source": fpath, "target": target, "type": "wiki-link", "cross_domain": domain != target_domain and bool(target_domain), }) # Conflict edges from challenged_by (may contain [[wiki-links]] or prose) challenged_by = fm.get("challenged_by", []) if isinstance(challenged_by, str): challenged_by = [challenged_by] if isinstance(challenged_by, list): for challenge in challenged_by: if not challenge: continue # Check for embedded wiki-links for link_text in WIKILINK_RE.findall(challenge): target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath)) if target and target != fpath and target in all_files_set: target_domain = _get_domain_cached(target, repo_root, domain_cache) edges.append({ "source": fpath, "target": target, "type": "conflict", "cross_domain": domain != target_domain and bool(target_domain), }) # Deduplicate edges seen_edges = set() unique_edges = [] for e in edges: key = (e["source"], e["target"], e.get("type", "")) if key not in seen_edges: seen_edges.add(key) unique_edges.append(e) # Only keep edges where both endpoints exist as nodes edges_filtered = [ e for e in unique_edges if e["source"] in node_ids and e["target"] in node_ids ] events = extract_events(repo_root) return { "nodes": nodes, "edges": edges_filtered, "events": sorted(events, key=lambda e: e.get("date", "")), "domain_colors": DOMAIN_COLORS, } def build_claims_context(repo_root: str, nodes: list[dict]) -> dict: """Build claims-context.json for chat system prompt injection. Produces a lightweight claim index: title + description + domain + agent + confidence. Sorted by domain, then alphabetically within domain. Target: ~37KB for ~370 claims. Truncates descriptions at 100 chars if total > 100KB. """ claims = [] for node in nodes: fpath = node["id"] abs_path = os.path.join(repo_root, fpath) description = "" try: text = open(abs_path, encoding="utf-8").read() fm = parse_frontmatter(text) description = fm.get("description", "") except (OSError, UnicodeDecodeError): pass claims.append({ "title": node["title"], "description": description, "domain": node["domain"], "agent": node["agent"], "confidence": node["confidence"], }) # Sort by domain, then title claims.sort(key=lambda c: (c["domain"], c["title"])) context = { "generated": datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "claimCount": len(claims), "claims": claims, } # Progressive description truncation if over 100KB. # Never drop descriptions entirely — short descriptions are better than none. for max_desc in (120, 100, 80, 60): test_json = json.dumps(context, ensure_ascii=False) if len(test_json) <= 100_000: break for c in claims: if len(c["description"]) > max_desc: c["description"] = c["description"][:max_desc] + "..." return context def main(): parser = argparse.ArgumentParser(description="Extract graph data from teleo-codex") parser.add_argument("--output", "-o", default="graph-data.json", help="Output file path (default: graph-data.json)") parser.add_argument("--context-output", "-c", default=None, help="Output claims-context.json path (default: same dir as --output)") parser.add_argument("--repo", "-r", default=".", help="Path to teleo-codex repo root (default: current dir)") args = parser.parse_args() repo_root = os.path.abspath(args.repo) if not os.path.isdir(os.path.join(repo_root, "core")): print(f"Error: {repo_root} doesn't look like a teleo-codex repo (no core/ dir)", file=sys.stderr) sys.exit(1) print(f"Scanning {repo_root}...") graph = extract_graph(repo_root) print(f" Nodes: {len(graph['nodes'])}") print(f" Edges: {len(graph['edges'])}") print(f" Events: {len(graph['events'])}") challenged_count = sum(1 for n in graph["nodes"] if n.get("challenged")) print(f" Challenged: {challenged_count}") # Write graph-data.json output_path = os.path.abspath(args.output) with open(output_path, "w", encoding="utf-8") as f: json.dump(graph, f, indent=2, ensure_ascii=False) size_kb = os.path.getsize(output_path) / 1024 print(f" graph-data.json: {output_path} ({size_kb:.1f} KB)") # Write claims-context.json context_path = args.context_output if not context_path: context_path = os.path.join(os.path.dirname(output_path), "claims-context.json") context_path = os.path.abspath(context_path) context = build_claims_context(repo_root, graph["nodes"]) with open(context_path, "w", encoding="utf-8") as f: json.dump(context, f, indent=2, ensure_ascii=False) ctx_kb = os.path.getsize(context_path) / 1024 print(f" claims-context.json: {context_path} ({ctx_kb:.1f} KB)") if __name__ == "__main__": main()