Auto: 3 files | 3 files changed, 677 insertions(+), 81 deletions(-)

This commit is contained in:
m3taversal 2026-03-09 22:26:36 +00:00
parent 876a01a4da
commit c9e2970cfb
3 changed files with 676 additions and 80 deletions

67
.github/workflows/sync-graph-data.yml vendored Normal file
View file

@ -0,0 +1,67 @@
name: Sync Graph Data to teleo-app
# Runs on every merge to main. Extracts graph data from the codex and
# pushes graph-data.json + claims-context.json to teleo-app/public/.
# This triggers a Vercel rebuild automatically.
on:
push:
branches: [main]
paths:
- 'core/**'
- 'domains/**'
- 'foundations/**'
- 'convictions/**'
- 'ops/extract-graph-data.py'
workflow_dispatch: # manual trigger
jobs:
sync:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Checkout teleo-codex
uses: actions/checkout@v4
with:
fetch-depth: 0 # full history for git log agent attribution
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Run extraction
run: |
python3 ops/extract-graph-data.py \
--repo . \
--output /tmp/graph-data.json \
--context-output /tmp/claims-context.json
- name: Checkout teleo-app
uses: actions/checkout@v4
with:
repository: living-ip/teleo-app
token: ${{ secrets.TELEO_APP_TOKEN }}
path: teleo-app
- name: Copy data files
run: |
cp /tmp/graph-data.json teleo-app/public/graph-data.json
cp /tmp/claims-context.json teleo-app/public/claims-context.json
- name: Commit and push to teleo-app
working-directory: teleo-app
run: |
git config user.name "teleo-codex-bot"
git config user.email "bot@livingip.io"
git add public/graph-data.json public/claims-context.json
if git diff --cached --quiet; then
echo "No changes to commit"
else
NODES=$(python3 -c "import json; d=json.load(open('public/graph-data.json')); print(len(d['nodes']))")
EDGES=$(python3 -c "import json; d=json.load(open('public/graph-data.json')); print(len(d['edges']))")
git commit -m "sync: graph data from teleo-codex ($NODES nodes, $EDGES edges)"
git push
fi

View file

@ -6,8 +6,8 @@
# 2. Domain agent — domain expertise, duplicate check, technical accuracy
#
# After both reviews, auto-merges if:
# - Leo approved (gh pr review --approve)
# - Domain agent verdict is "Approve" (parsed from comment)
# - Leo's comment contains "**Verdict:** approve"
# - Domain agent's comment contains "**Verdict:** approve"
# - No territory violations (files outside proposer's domain)
#
# Usage:
@ -26,8 +26,14 @@
# - Lockfile prevents concurrent runs
# - Auto-merge requires ALL reviewers to approve + no territory violations
# - Each PR runs sequentially to avoid branch conflicts
# - Timeout: 10 minutes per agent per PR
# - Timeout: 20 minutes per agent per PR
# - Pre-flight checks: clean working tree, gh auth
#
# Verdict protocol:
# All agents use `gh pr comment` (NOT `gh pr review`) because all agents
# share the m3taversal GitHub account — `gh pr review --approve` fails
# when the PR author and reviewer are the same user. The merge check
# parses issue comments for structured verdict markers instead.
set -euo pipefail
@ -39,7 +45,7 @@ cd "$REPO_ROOT"
LOCKFILE="/tmp/evaluate-trigger.lock"
LOG_DIR="$REPO_ROOT/ops/sessions"
TIMEOUT_SECONDS=600
TIMEOUT_SECONDS=1200
DRY_RUN=false
LEO_ONLY=false
NO_MERGE=false
@ -62,24 +68,30 @@ detect_domain_agent() {
vida/*|*/health*) agent="vida"; domain="health" ;;
astra/*|*/space-development*) agent="astra"; domain="space-development" ;;
leo/*|*/grand-strategy*) agent="leo"; domain="grand-strategy" ;;
contrib/*)
# External contributor — detect domain from changed files (fall through to file check)
agent=""; domain=""
;;
*)
# Fall back to checking which domain directory has changed files
if echo "$files" | grep -q "domains/internet-finance/"; then
agent="rio"; domain="internet-finance"
elif echo "$files" | grep -q "domains/entertainment/"; then
agent="clay"; domain="entertainment"
elif echo "$files" | grep -q "domains/ai-alignment/"; then
agent="theseus"; domain="ai-alignment"
elif echo "$files" | grep -q "domains/health/"; then
agent="vida"; domain="health"
elif echo "$files" | grep -q "domains/space-development/"; then
agent="astra"; domain="space-development"
else
agent=""; domain=""
fi
agent=""; domain=""
;;
esac
# If no agent detected from branch prefix, check changed files
if [ -z "$agent" ]; then
if echo "$files" | grep -q "domains/internet-finance/"; then
agent="rio"; domain="internet-finance"
elif echo "$files" | grep -q "domains/entertainment/"; then
agent="clay"; domain="entertainment"
elif echo "$files" | grep -q "domains/ai-alignment/"; then
agent="theseus"; domain="ai-alignment"
elif echo "$files" | grep -q "domains/health/"; then
agent="vida"; domain="health"
elif echo "$files" | grep -q "domains/space-development/"; then
agent="astra"; domain="space-development"
fi
fi
echo "$agent $domain"
}
@ -112,8 +124,8 @@ if ! command -v claude >/dev/null 2>&1; then
exit 1
fi
# Check for dirty working tree (ignore ops/ and .claude/ which may contain uncommitted scripts)
DIRTY_FILES=$(git status --porcelain | grep -v '^?? ops/' | grep -v '^ M ops/' | grep -v '^?? \.claude/' | grep -v '^ M \.claude/' || true)
# Check for dirty working tree (ignore ops/, .claude/, .github/ which may contain local-only files)
DIRTY_FILES=$(git status --porcelain | grep -v '^?? ops/' | grep -v '^ M ops/' | grep -v '^?? \.claude/' | grep -v '^ M \.claude/' | grep -v '^?? \.github/' | grep -v '^ M \.github/' || true)
if [ -n "$DIRTY_FILES" ]; then
echo "ERROR: Working tree is dirty. Clean up before running."
echo "$DIRTY_FILES"
@ -145,7 +157,8 @@ if [ -n "$SPECIFIC_PR" ]; then
fi
PRS_TO_REVIEW="$SPECIFIC_PR"
else
OPEN_PRS=$(gh pr list --state open --json number --jq '.[].number' 2>/dev/null || echo "")
# NOTE: gh pr list silently returns empty in some worktree configs; use gh api instead
OPEN_PRS=$(gh api repos/:owner/:repo/pulls --jq '.[].number' 2>/dev/null || echo "")
if [ -z "$OPEN_PRS" ]; then
echo "No open PRs found. Nothing to review."
@ -154,17 +167,23 @@ else
PRS_TO_REVIEW=""
for pr in $OPEN_PRS; do
LAST_REVIEW_DATE=$(gh api "repos/{owner}/{repo}/pulls/$pr/reviews" \
--jq 'map(select(.state != "DISMISSED")) | sort_by(.submitted_at) | last | .submitted_at' 2>/dev/null || echo "")
# Check if this PR already has a Leo verdict comment (avoid re-reviewing)
LEO_COMMENTED=$(gh pr view "$pr" --json comments \
--jq '[.comments[] | select(.body | test("VERDICT:LEO:(APPROVE|REQUEST_CHANGES)"))] | length' 2>/dev/null || echo "0")
LAST_COMMIT_DATE=$(gh pr view "$pr" --json commits --jq '.commits[-1].committedDate' 2>/dev/null || echo "")
if [ -z "$LAST_REVIEW_DATE" ]; then
PRS_TO_REVIEW="$PRS_TO_REVIEW $pr"
elif [ -n "$LAST_COMMIT_DATE" ] && [[ "$LAST_COMMIT_DATE" > "$LAST_REVIEW_DATE" ]]; then
echo "PR #$pr: New commits since last review. Queuing for re-review."
if [ "$LEO_COMMENTED" = "0" ]; then
PRS_TO_REVIEW="$PRS_TO_REVIEW $pr"
else
echo "PR #$pr: No new commits since last review. Skipping."
# Check if new commits since last Leo review
LAST_LEO_DATE=$(gh pr view "$pr" --json comments \
--jq '[.comments[] | select(.body | test("VERDICT:LEO:")) | .createdAt] | last' 2>/dev/null || echo "")
if [ -n "$LAST_COMMIT_DATE" ] && [ -n "$LAST_LEO_DATE" ] && [[ "$LAST_COMMIT_DATE" > "$LAST_LEO_DATE" ]]; then
echo "PR #$pr: New commits since last review. Queuing for re-review."
PRS_TO_REVIEW="$PRS_TO_REVIEW $pr"
else
echo "PR #$pr: Already reviewed. Skipping."
fi
fi
done
@ -195,7 +214,7 @@ run_agent_review() {
log_file="$LOG_DIR/${agent_name}-review-pr${pr}-${timestamp}.log"
review_file="/tmp/${agent_name}-review-pr${pr}.md"
echo " Running ${agent_name}..."
echo " Running ${agent_name} (model: ${model})..."
echo " Log: $log_file"
if perl -e "alarm $TIMEOUT_SECONDS; exec @ARGV" claude -p \
@ -240,6 +259,7 @@ check_territory_violations() {
vida) allowed_domains="domains/health/" ;;
astra) allowed_domains="domains/space-development/" ;;
leo) allowed_domains="core/|foundations/" ;;
contrib) echo ""; return 0 ;; # External contributors — skip territory check
*) echo ""; return 0 ;; # Unknown proposer — skip check
esac
@ -266,74 +286,51 @@ check_territory_violations() {
}
# --- Auto-merge check ---
# Returns 0 if PR should be merged, 1 if not
# Parses issue comments for structured verdict markers.
# Verdict protocol: agents post `<!-- VERDICT:AGENT_KEY:APPROVE -->` or
# `<!-- VERDICT:AGENT_KEY:REQUEST_CHANGES -->` as HTML comments in their review.
# This is machine-parseable and invisible in the rendered comment.
check_merge_eligible() {
local pr_number="$1"
local domain_agent="$2"
local leo_passed="$3"
# Gate 1: Leo must have passed
# Gate 1: Leo must have completed without timeout/error
if [ "$leo_passed" != "true" ]; then
echo "BLOCK: Leo review failed or timed out"
return 1
fi
# Gate 2: Check Leo's review state via GitHub API
local leo_review_state
leo_review_state=$(gh api "repos/{owner}/{repo}/pulls/${pr_number}/reviews" \
--jq '[.[] | select(.state != "DISMISSED" and .state != "PENDING")] | last | .state' 2>/dev/null || echo "")
# Gate 2: Check Leo's verdict from issue comments
local leo_verdict
leo_verdict=$(gh pr view "$pr_number" --json comments \
--jq '[.comments[] | select(.body | test("VERDICT:LEO:")) | .body] | last' 2>/dev/null || echo "")
if [ "$leo_review_state" = "APPROVED" ]; then
echo "Leo: APPROVED (via review API)"
elif [ "$leo_review_state" = "CHANGES_REQUESTED" ]; then
echo "BLOCK: Leo requested changes (review API state: CHANGES_REQUESTED)"
if echo "$leo_verdict" | grep -q "VERDICT:LEO:APPROVE"; then
echo "Leo: APPROVED"
elif echo "$leo_verdict" | grep -q "VERDICT:LEO:REQUEST_CHANGES"; then
echo "BLOCK: Leo requested changes"
return 1
else
# Fallback: check PR comments for Leo's verdict
local leo_verdict
leo_verdict=$(gh pr view "$pr_number" --json comments \
--jq '.comments[] | select(.body | test("## Leo Review")) | .body' 2>/dev/null \
| grep -oiE '\*\*Verdict:[^*]+\*\*' | tail -1 || echo "")
if echo "$leo_verdict" | grep -qi "approve"; then
echo "Leo: APPROVED (via comment verdict)"
elif echo "$leo_verdict" | grep -qi "request changes\|reject"; then
echo "BLOCK: Leo verdict: $leo_verdict"
return 1
else
echo "BLOCK: Could not determine Leo's verdict"
return 1
fi
echo "BLOCK: Could not find Leo's verdict marker in PR comments"
return 1
fi
# Gate 3: Check domain agent verdict (if applicable)
if [ -n "$domain_agent" ] && [ "$domain_agent" != "leo" ]; then
local domain_key
domain_key=$(echo "$domain_agent" | tr '[:lower:]' '[:upper:]')
local domain_verdict
# Search for verdict in domain agent's review — match agent name, "domain reviewer", or "Domain Review"
domain_verdict=$(gh pr view "$pr_number" --json comments \
--jq ".comments[] | select(.body | test(\"domain review|${domain_agent}|peer review\"; \"i\")) | .body" 2>/dev/null \
| grep -oiE '\*\*Verdict:[^*]+\*\*' | tail -1 || echo "")
--jq "[.comments[] | select(.body | test(\"VERDICT:${domain_key}:\")) | .body] | last" 2>/dev/null || echo "")
if [ -z "$domain_verdict" ]; then
# Also check review API for domain agent approval
# Since all agents use the same GitHub account, we check for multiple approvals
local approval_count
approval_count=$(gh api "repos/{owner}/{repo}/pulls/${pr_number}/reviews" \
--jq '[.[] | select(.state == "APPROVED")] | length' 2>/dev/null || echo "0")
if [ "$approval_count" -ge 2 ]; then
echo "Domain agent: APPROVED (multiple approvals via review API)"
else
echo "BLOCK: No domain agent verdict found"
return 1
fi
elif echo "$domain_verdict" | grep -qi "approve"; then
echo "Domain agent ($domain_agent): APPROVED (via comment verdict)"
elif echo "$domain_verdict" | grep -qi "request changes\|reject"; then
echo "BLOCK: Domain agent verdict: $domain_verdict"
if echo "$domain_verdict" | grep -q "VERDICT:${domain_key}:APPROVE"; then
echo "Domain agent ($domain_agent): APPROVED"
elif echo "$domain_verdict" | grep -q "VERDICT:${domain_key}:REQUEST_CHANGES"; then
echo "BLOCK: $domain_agent requested changes"
return 1
else
echo "BLOCK: Unclear domain agent verdict: $domain_verdict"
echo "BLOCK: No verdict marker found for $domain_agent"
return 1
fi
else
@ -403,11 +400,15 @@ Also check:
- Cross-domain connections that the proposer may have missed
Write your complete review to ${LEO_REVIEW_FILE}
Then post it with: gh pr review ${pr} --comment --body-file ${LEO_REVIEW_FILE}
If ALL claims pass quality gates: gh pr review ${pr} --approve --body-file ${LEO_REVIEW_FILE}
If ANY claim needs changes: gh pr review ${pr} --request-changes --body-file ${LEO_REVIEW_FILE}
CRITICAL — Verdict format: Your review MUST end with exactly one of these verdict markers (as an HTML comment on its own line):
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
Then post the review as an issue comment:
gh pr comment ${pr} --body-file ${LEO_REVIEW_FILE}
IMPORTANT: Use 'gh pr comment' NOT 'gh pr review'. We use a shared GitHub account so gh pr review --approve fails.
DO NOT merge — the orchestrator handles merge decisions after all reviews are posted.
Work autonomously. Do not ask for confirmation."
@ -432,6 +433,7 @@ Work autonomously. Do not ask for confirmation."
else
DOMAIN_REVIEW_FILE="/tmp/${DOMAIN_AGENT}-review-pr${pr}.md"
AGENT_NAME_UPPER=$(echo "${DOMAIN_AGENT}" | awk '{print toupper(substr($0,1,1)) substr($0,2)}')
AGENT_KEY_UPPER=$(echo "${DOMAIN_AGENT}" | tr '[:lower:]' '[:upper:]')
DOMAIN_PROMPT="You are ${AGENT_NAME_UPPER}. Read agents/${DOMAIN_AGENT}/identity.md, agents/${DOMAIN_AGENT}/beliefs.md, and skills/evaluate.md.
You are reviewing PR #${pr} as the domain expert for ${DOMAIN}.
@ -452,8 +454,15 @@ Your review focuses on DOMAIN EXPERTISE — things only a ${DOMAIN} specialist w
6. **Confidence calibration** — From your domain expertise, is the confidence level right?
Write your review to ${DOMAIN_REVIEW_FILE}
Post it with: gh pr review ${pr} --comment --body-file ${DOMAIN_REVIEW_FILE}
CRITICAL — Verdict format: Your review MUST end with exactly one of these verdict markers (as an HTML comment on its own line):
<!-- VERDICT:${AGENT_KEY_UPPER}:APPROVE -->
<!-- VERDICT:${AGENT_KEY_UPPER}:REQUEST_CHANGES -->
Then post the review as an issue comment:
gh pr comment ${pr} --body-file ${DOMAIN_REVIEW_FILE}
IMPORTANT: Use 'gh pr comment' NOT 'gh pr review'. We use a shared GitHub account so gh pr review --approve fails.
Sign your review as ${AGENT_NAME_UPPER} (domain reviewer for ${DOMAIN}).
DO NOT duplicate Leo's quality gate checks — he covers those.
DO NOT merge — the orchestrator handles merge decisions after all reviews are posted.
@ -486,7 +495,7 @@ Work autonomously. Do not ask for confirmation."
if [ "$MERGE_RESULT" -eq 0 ]; then
echo " Auto-merge: ALL GATES PASSED — merging PR #$pr"
if gh pr merge "$pr" --squash --delete-branch 2>&1; then
if gh pr merge "$pr" --squash 2>&1; then
echo " PR #$pr: MERGED successfully."
MERGED=$((MERGED + 1))
else

520
ops/extract-graph-data.py Normal file
View file

@ -0,0 +1,520 @@
#!/usr/bin/env python3
"""
extract-graph-data.py Extract knowledge graph from teleo-codex markdown files.
Reads all .md claim/conviction files, parses YAML frontmatter and wiki-links,
and outputs graph-data.json matching the teleo-app GraphData interface.
Usage:
python3 ops/extract-graph-data.py [--output path/to/graph-data.json]
Must be run from the teleo-codex repo root.
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
SCAN_DIRS = ["core", "domains", "foundations", "convictions"]
# Only extract these content types (from frontmatter `type` field).
# If type is missing, include the file anyway (many claims lack explicit type).
INCLUDE_TYPES = {"claim", "conviction", "analysis", "belief", "position", None}
# Domain → default agent mapping (fallback when git attribution unavailable)
DOMAIN_AGENT_MAP = {
"internet-finance": "rio",
"entertainment": "clay",
"health": "vida",
"ai-alignment": "theseus",
"space-development": "astra",
"grand-strategy": "leo",
"mechanisms": "leo",
"living-capital": "leo",
"living-agents": "leo",
"teleohumanity": "leo",
"critical-systems": "leo",
"collective-intelligence": "leo",
"teleological-economics": "leo",
"cultural-dynamics": "clay",
}
DOMAIN_COLORS = {
"internet-finance": "#4A90D9",
"entertainment": "#9B59B6",
"health": "#2ECC71",
"ai-alignment": "#E74C3C",
"space-development": "#F39C12",
"grand-strategy": "#D4AF37",
"mechanisms": "#1ABC9C",
"living-capital": "#3498DB",
"living-agents": "#E67E22",
"teleohumanity": "#F1C40F",
"critical-systems": "#95A5A6",
"collective-intelligence": "#BDC3C7",
"teleological-economics": "#7F8C8D",
"cultural-dynamics": "#C0392B",
}
KNOWN_AGENTS = {"leo", "rio", "clay", "vida", "theseus", "astra"}
# Regex patterns
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---", re.DOTALL)
WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
YAML_FIELD_RE = re.compile(r"^(\w[\w_]*):\s*(.+)$", re.MULTILINE)
YAML_LIST_ITEM_RE = re.compile(r'^\s*-\s+"?(.+?)"?\s*$', re.MULTILINE)
COUNTER_EVIDENCE_RE = re.compile(r"^##\s+Counter[\s-]?evidence", re.MULTILINE | re.IGNORECASE)
COUNTERARGUMENT_RE = re.compile(r"^\*\*Counter\s*argument", re.MULTILINE | re.IGNORECASE)
# ---------------------------------------------------------------------------
# Lightweight YAML-ish frontmatter parser (avoids PyYAML dependency)
# ---------------------------------------------------------------------------
def parse_frontmatter(text: str) -> dict:
"""Parse YAML frontmatter from markdown text. Returns dict of fields."""
m = FRONTMATTER_RE.match(text)
if not m:
return {}
yaml_block = m.group(1)
result = {}
for field_match in YAML_FIELD_RE.finditer(yaml_block):
key = field_match.group(1)
val = field_match.group(2).strip().strip('"').strip("'")
# Handle list fields
if val.startswith("["):
# Inline YAML list: [item1, item2]
items = re.findall(r'"([^"]+)"', val)
if not items:
items = [x.strip().strip('"').strip("'")
for x in val.strip("[]").split(",") if x.strip()]
result[key] = items
else:
result[key] = val
# Handle multi-line list fields (depends_on, challenged_by, secondary_domains)
for list_key in ("depends_on", "challenged_by", "secondary_domains", "claims_extracted"):
if list_key not in result:
# Check for block-style list
pattern = re.compile(
rf"^{list_key}:\s*\n((?:\s+-\s+.+\n?)+)", re.MULTILINE
)
lm = pattern.search(yaml_block)
if lm:
items = YAML_LIST_ITEM_RE.findall(lm.group(1))
result[list_key] = [i.strip('"').strip("'") for i in items]
return result
def extract_body(text: str) -> str:
"""Return the markdown body after frontmatter."""
m = FRONTMATTER_RE.match(text)
if m:
return text[m.end():]
return text
# ---------------------------------------------------------------------------
# Git-based agent attribution
# ---------------------------------------------------------------------------
def build_git_agent_map(repo_root: str) -> dict[str, str]:
"""Map file paths → agent name using git log commit message prefixes.
Commit messages follow: '{agent}: description'
We use the commit that first added each file.
"""
file_agent = {}
try:
result = subprocess.run(
["git", "log", "--all", "--diff-filter=A", "--name-only",
"--format=COMMIT_MSG:%s"],
capture_output=True, text=True, cwd=repo_root, timeout=30,
)
current_agent = None
for line in result.stdout.splitlines():
line = line.strip()
if not line:
continue
if line.startswith("COMMIT_MSG:"):
msg = line[len("COMMIT_MSG:"):]
# Parse "agent: description" pattern
if ":" in msg:
prefix = msg.split(":")[0].strip().lower()
if prefix in KNOWN_AGENTS:
current_agent = prefix
else:
current_agent = None
else:
current_agent = None
elif current_agent and line.endswith(".md"):
# Only set if not already attributed (first add wins)
if line not in file_agent:
file_agent[line] = current_agent
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return file_agent
# ---------------------------------------------------------------------------
# Wiki-link resolution
# ---------------------------------------------------------------------------
def build_title_index(all_files: list[str], repo_root: str) -> dict[str, str]:
"""Map lowercase claim titles → file paths for wiki-link resolution."""
index = {}
for fpath in all_files:
# Title = filename without .md extension
fname = os.path.basename(fpath)
if fname.endswith(".md"):
title = fname[:-3].lower()
index[title] = fpath
# Also index by relative path
index[fpath.lower()] = fpath
return index
def resolve_wikilink(link_text: str, title_index: dict, source_dir: str) -> str | None:
"""Resolve a [[wiki-link]] target to a file path (node ID)."""
text = link_text.strip()
# Skip map links and non-claim references
if text.startswith("_") or text == "_map":
return None
# Direct path match (with or without .md)
for candidate in [text, text + ".md"]:
if candidate.lower() in title_index:
return title_index[candidate.lower()]
# Title-only match
title = text.lower()
if title in title_index:
return title_index[title]
# Fuzzy: try adding .md to the basename
basename = os.path.basename(text)
if basename.lower() in title_index:
return title_index[basename.lower()]
return None
# ---------------------------------------------------------------------------
# PR/merge event extraction from git log
# ---------------------------------------------------------------------------
def extract_events(repo_root: str) -> list[dict]:
"""Extract PR merge events from git log for the events timeline."""
events = []
try:
result = subprocess.run(
["git", "log", "--merges", "--format=%H|%s|%ai", "-50"],
capture_output=True, text=True, cwd=repo_root, timeout=15,
)
for line in result.stdout.strip().splitlines():
parts = line.split("|", 2)
if len(parts) < 3:
continue
sha, msg, date_str = parts
# Parse "Merge pull request #N from ..." or agent commit patterns
pr_match = re.search(r"#(\d+)", msg)
if not pr_match:
continue
pr_num = int(pr_match.group(1))
# Try to determine agent from merge commit
agent = "collective"
for a in KNOWN_AGENTS:
if a in msg.lower():
agent = a
break
# Count files changed in this merge
diff_result = subprocess.run(
["git", "diff", "--name-only", f"{sha}^..{sha}"],
capture_output=True, text=True, cwd=repo_root, timeout=10,
)
claims_added = sum(
1 for f in diff_result.stdout.splitlines()
if f.endswith(".md") and any(f.startswith(d) for d in SCAN_DIRS)
)
if claims_added > 0:
events.append({
"type": "pr-merge",
"number": pr_num,
"agent": agent,
"claims_added": claims_added,
"date": date_str[:10],
})
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return events
# ---------------------------------------------------------------------------
# Main extraction
# ---------------------------------------------------------------------------
def find_markdown_files(repo_root: str) -> list[str]:
"""Find all .md files in SCAN_DIRS, return relative paths."""
files = []
for scan_dir in SCAN_DIRS:
dirpath = os.path.join(repo_root, scan_dir)
if not os.path.isdir(dirpath):
continue
for root, _dirs, filenames in os.walk(dirpath):
for fname in filenames:
if fname.endswith(".md") and not fname.startswith("_"):
rel = os.path.relpath(os.path.join(root, fname), repo_root)
files.append(rel)
return sorted(files)
def _get_domain_cached(fpath: str, repo_root: str, cache: dict) -> str:
"""Get the domain of a file, caching results."""
if fpath in cache:
return cache[fpath]
abs_path = os.path.join(repo_root, fpath)
domain = ""
try:
text = open(abs_path, encoding="utf-8").read()
fm = parse_frontmatter(text)
domain = fm.get("domain", "")
except (OSError, UnicodeDecodeError):
pass
cache[fpath] = domain
return domain
def extract_graph(repo_root: str) -> dict:
"""Extract the full knowledge graph from the codex."""
all_files = find_markdown_files(repo_root)
git_agents = build_git_agent_map(repo_root)
title_index = build_title_index(all_files, repo_root)
domain_cache: dict[str, str] = {}
nodes = []
edges = []
node_ids = set()
all_files_set = set(all_files)
for fpath in all_files:
abs_path = os.path.join(repo_root, fpath)
try:
text = open(abs_path, encoding="utf-8").read()
except (OSError, UnicodeDecodeError):
continue
fm = parse_frontmatter(text)
body = extract_body(text)
# Filter by type
ftype = fm.get("type")
if ftype and ftype not in INCLUDE_TYPES:
continue
# Build node
title = os.path.basename(fpath)[:-3] # filename without .md
domain = fm.get("domain", "")
if not domain:
# Infer domain from directory path
parts = fpath.split(os.sep)
if len(parts) >= 2:
domain = parts[1] if parts[0] == "domains" else parts[1] if len(parts) > 2 else parts[0]
# Agent attribution: git log → domain mapping → "collective"
agent = git_agents.get(fpath, "")
if not agent:
agent = DOMAIN_AGENT_MAP.get(domain, "collective")
created = fm.get("created", "")
confidence = fm.get("confidence", "speculative")
# Detect challenged status
challenged_by_raw = fm.get("challenged_by", [])
if isinstance(challenged_by_raw, str):
challenged_by_raw = [challenged_by_raw] if challenged_by_raw else []
has_challenged_by = bool(challenged_by_raw and any(c for c in challenged_by_raw))
has_counter_section = bool(COUNTER_EVIDENCE_RE.search(body) or COUNTERARGUMENT_RE.search(body))
is_challenged = has_challenged_by or has_counter_section
# Extract challenge descriptions for the node
challenges = []
if isinstance(challenged_by_raw, list):
for c in challenged_by_raw:
if c and isinstance(c, str):
# Strip wiki-link syntax for display
cleaned = WIKILINK_RE.sub(lambda m: m.group(1), c)
# Strip markdown list artifacts: leading "- ", surrounding quotes
cleaned = re.sub(r'^-\s*', '', cleaned).strip()
cleaned = cleaned.strip('"').strip("'").strip()
if cleaned:
challenges.append(cleaned[:200]) # cap length
node = {
"id": fpath,
"title": title,
"domain": domain,
"agent": agent,
"created": created,
"confidence": confidence,
"challenged": is_challenged,
}
if challenges:
node["challenges"] = challenges
nodes.append(node)
node_ids.add(fpath)
domain_cache[fpath] = domain # cache for edge lookups
for link_text in WIKILINK_RE.findall(body):
target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath))
if target and target != fpath and target in all_files_set:
target_domain = _get_domain_cached(target, repo_root, domain_cache)
edges.append({
"source": fpath,
"target": target,
"type": "wiki-link",
"cross_domain": domain != target_domain and bool(target_domain),
})
# Conflict edges from challenged_by (may contain [[wiki-links]] or prose)
challenged_by = fm.get("challenged_by", [])
if isinstance(challenged_by, str):
challenged_by = [challenged_by]
if isinstance(challenged_by, list):
for challenge in challenged_by:
if not challenge:
continue
# Check for embedded wiki-links
for link_text in WIKILINK_RE.findall(challenge):
target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath))
if target and target != fpath and target in all_files_set:
target_domain = _get_domain_cached(target, repo_root, domain_cache)
edges.append({
"source": fpath,
"target": target,
"type": "conflict",
"cross_domain": domain != target_domain and bool(target_domain),
})
# Deduplicate edges
seen_edges = set()
unique_edges = []
for e in edges:
key = (e["source"], e["target"], e.get("type", ""))
if key not in seen_edges:
seen_edges.add(key)
unique_edges.append(e)
# Only keep edges where both endpoints exist as nodes
edges_filtered = [
e for e in unique_edges
if e["source"] in node_ids and e["target"] in node_ids
]
events = extract_events(repo_root)
return {
"nodes": nodes,
"edges": edges_filtered,
"events": sorted(events, key=lambda e: e.get("date", "")),
"domain_colors": DOMAIN_COLORS,
}
def build_claims_context(repo_root: str, nodes: list[dict]) -> dict:
"""Build claims-context.json for chat system prompt injection.
Produces a lightweight claim index: title + description + domain + agent + confidence.
Sorted by domain, then alphabetically within domain.
Target: ~37KB for ~370 claims. Truncates descriptions at 100 chars if total > 100KB.
"""
claims = []
for node in nodes:
fpath = node["id"]
abs_path = os.path.join(repo_root, fpath)
description = ""
try:
text = open(abs_path, encoding="utf-8").read()
fm = parse_frontmatter(text)
description = fm.get("description", "")
except (OSError, UnicodeDecodeError):
pass
claims.append({
"title": node["title"],
"description": description,
"domain": node["domain"],
"agent": node["agent"],
"confidence": node["confidence"],
})
# Sort by domain, then title
claims.sort(key=lambda c: (c["domain"], c["title"]))
context = {
"generated": datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"claimCount": len(claims),
"claims": claims,
}
# Progressive description truncation if over 100KB.
# Never drop descriptions entirely — short descriptions are better than none.
for max_desc in (120, 100, 80, 60):
test_json = json.dumps(context, ensure_ascii=False)
if len(test_json) <= 100_000:
break
for c in claims:
if len(c["description"]) > max_desc:
c["description"] = c["description"][:max_desc] + "..."
return context
def main():
parser = argparse.ArgumentParser(description="Extract graph data from teleo-codex")
parser.add_argument("--output", "-o", default="graph-data.json",
help="Output file path (default: graph-data.json)")
parser.add_argument("--context-output", "-c", default=None,
help="Output claims-context.json path (default: same dir as --output)")
parser.add_argument("--repo", "-r", default=".",
help="Path to teleo-codex repo root (default: current dir)")
args = parser.parse_args()
repo_root = os.path.abspath(args.repo)
if not os.path.isdir(os.path.join(repo_root, "core")):
print(f"Error: {repo_root} doesn't look like a teleo-codex repo (no core/ dir)", file=sys.stderr)
sys.exit(1)
print(f"Scanning {repo_root}...")
graph = extract_graph(repo_root)
print(f" Nodes: {len(graph['nodes'])}")
print(f" Edges: {len(graph['edges'])}")
print(f" Events: {len(graph['events'])}")
challenged_count = sum(1 for n in graph["nodes"] if n.get("challenged"))
print(f" Challenged: {challenged_count}")
# Write graph-data.json
output_path = os.path.abspath(args.output)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(graph, f, indent=2, ensure_ascii=False)
size_kb = os.path.getsize(output_path) / 1024
print(f" graph-data.json: {output_path} ({size_kb:.1f} KB)")
# Write claims-context.json
context_path = args.context_output
if not context_path:
context_path = os.path.join(os.path.dirname(output_path), "claims-context.json")
context_path = os.path.abspath(context_path)
context = build_claims_context(repo_root, graph["nodes"])
with open(context_path, "w", encoding="utf-8") as f:
json.dump(context, f, indent=2, ensure_ascii=False)
ctx_kb = os.path.getsize(context_path) / 1024
print(f" claims-context.json: {context_path} ({ctx_kb:.1f} KB)")
if __name__ == "__main__":
main()