leo: add ingest skill — full X-to-claims pipeline (#103)
Some checks are pending
Sync Graph Data to teleo-app / sync (push) Waiting to run

This commit is contained in:
Leo 2026-03-10 10:42:25 +00:00
parent ae66f37975
commit 75f1709110
5 changed files with 1056 additions and 80 deletions

67
.github/workflows/sync-graph-data.yml vendored Normal file
View file

@ -0,0 +1,67 @@
name: Sync Graph Data to teleo-app
# Runs on every merge to main. Extracts graph data from the codex and
# pushes graph-data.json + claims-context.json to teleo-app/public/.
# This triggers a Vercel rebuild automatically.
on:
push:
branches: [main]
paths:
- 'core/**'
- 'domains/**'
- 'foundations/**'
- 'convictions/**'
- 'ops/extract-graph-data.py'
workflow_dispatch: # manual trigger
jobs:
sync:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Checkout teleo-codex
uses: actions/checkout@v4
with:
fetch-depth: 0 # full history for git log agent attribution
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Run extraction
run: |
python3 ops/extract-graph-data.py \
--repo . \
--output /tmp/graph-data.json \
--context-output /tmp/claims-context.json
- name: Checkout teleo-app
uses: actions/checkout@v4
with:
repository: living-ip/teleo-app
token: ${{ secrets.TELEO_APP_TOKEN }}
path: teleo-app
- name: Copy data files
run: |
cp /tmp/graph-data.json teleo-app/public/graph-data.json
cp /tmp/claims-context.json teleo-app/public/claims-context.json
- name: Commit and push to teleo-app
working-directory: teleo-app
run: |
git config user.name "teleo-codex-bot"
git config user.email "bot@livingip.io"
git add public/graph-data.json public/claims-context.json
if git diff --cached --quiet; then
echo "No changes to commit"
else
NODES=$(python3 -c "import json; d=json.load(open('public/graph-data.json')); print(len(d['nodes']))")
EDGES=$(python3 -c "import json; d=json.load(open('public/graph-data.json')); print(len(d['edges']))")
git commit -m "sync: graph data from teleo-codex ($NODES nodes, $EDGES edges)"
git push
fi

View file

@ -6,8 +6,8 @@
# 2. Domain agent — domain expertise, duplicate check, technical accuracy
#
# After both reviews, auto-merges if:
# - Leo approved (gh pr review --approve)
# - Domain agent verdict is "Approve" (parsed from comment)
# - Leo's comment contains "**Verdict:** approve"
# - Domain agent's comment contains "**Verdict:** approve"
# - No territory violations (files outside proposer's domain)
#
# Usage:
@ -26,8 +26,14 @@
# - Lockfile prevents concurrent runs
# - Auto-merge requires ALL reviewers to approve + no territory violations
# - Each PR runs sequentially to avoid branch conflicts
# - Timeout: 10 minutes per agent per PR
# - Timeout: 20 minutes per agent per PR
# - Pre-flight checks: clean working tree, gh auth
#
# Verdict protocol:
# All agents use `gh pr comment` (NOT `gh pr review`) because all agents
# share the m3taversal GitHub account — `gh pr review --approve` fails
# when the PR author and reviewer are the same user. The merge check
# parses issue comments for structured verdict markers instead.
set -euo pipefail
@ -39,7 +45,7 @@ cd "$REPO_ROOT"
LOCKFILE="/tmp/evaluate-trigger.lock"
LOG_DIR="$REPO_ROOT/ops/sessions"
TIMEOUT_SECONDS=600
TIMEOUT_SECONDS=1200
DRY_RUN=false
LEO_ONLY=false
NO_MERGE=false
@ -62,24 +68,30 @@ detect_domain_agent() {
vida/*|*/health*) agent="vida"; domain="health" ;;
astra/*|*/space-development*) agent="astra"; domain="space-development" ;;
leo/*|*/grand-strategy*) agent="leo"; domain="grand-strategy" ;;
contrib/*)
# External contributor — detect domain from changed files (fall through to file check)
agent=""; domain=""
;;
*)
# Fall back to checking which domain directory has changed files
if echo "$files" | grep -q "domains/internet-finance/"; then
agent="rio"; domain="internet-finance"
elif echo "$files" | grep -q "domains/entertainment/"; then
agent="clay"; domain="entertainment"
elif echo "$files" | grep -q "domains/ai-alignment/"; then
agent="theseus"; domain="ai-alignment"
elif echo "$files" | grep -q "domains/health/"; then
agent="vida"; domain="health"
elif echo "$files" | grep -q "domains/space-development/"; then
agent="astra"; domain="space-development"
else
agent=""; domain=""
fi
agent=""; domain=""
;;
esac
# If no agent detected from branch prefix, check changed files
if [ -z "$agent" ]; then
if echo "$files" | grep -q "domains/internet-finance/"; then
agent="rio"; domain="internet-finance"
elif echo "$files" | grep -q "domains/entertainment/"; then
agent="clay"; domain="entertainment"
elif echo "$files" | grep -q "domains/ai-alignment/"; then
agent="theseus"; domain="ai-alignment"
elif echo "$files" | grep -q "domains/health/"; then
agent="vida"; domain="health"
elif echo "$files" | grep -q "domains/space-development/"; then
agent="astra"; domain="space-development"
fi
fi
echo "$agent $domain"
}
@ -112,8 +124,8 @@ if ! command -v claude >/dev/null 2>&1; then
exit 1
fi
# Check for dirty working tree (ignore ops/ and .claude/ which may contain uncommitted scripts)
DIRTY_FILES=$(git status --porcelain | grep -v '^?? ops/' | grep -v '^ M ops/' | grep -v '^?? \.claude/' | grep -v '^ M \.claude/' || true)
# Check for dirty working tree (ignore ops/, .claude/, .github/ which may contain local-only files)
DIRTY_FILES=$(git status --porcelain | grep -v '^?? ops/' | grep -v '^ M ops/' | grep -v '^?? \.claude/' | grep -v '^ M \.claude/' | grep -v '^?? \.github/' | grep -v '^ M \.github/' || true)
if [ -n "$DIRTY_FILES" ]; then
echo "ERROR: Working tree is dirty. Clean up before running."
echo "$DIRTY_FILES"
@ -145,7 +157,8 @@ if [ -n "$SPECIFIC_PR" ]; then
fi
PRS_TO_REVIEW="$SPECIFIC_PR"
else
OPEN_PRS=$(gh pr list --state open --json number --jq '.[].number' 2>/dev/null || echo "")
# NOTE: gh pr list silently returns empty in some worktree configs; use gh api instead
OPEN_PRS=$(gh api repos/:owner/:repo/pulls --jq '.[].number' 2>/dev/null || echo "")
if [ -z "$OPEN_PRS" ]; then
echo "No open PRs found. Nothing to review."
@ -154,17 +167,23 @@ else
PRS_TO_REVIEW=""
for pr in $OPEN_PRS; do
LAST_REVIEW_DATE=$(gh api "repos/{owner}/{repo}/pulls/$pr/reviews" \
--jq 'map(select(.state != "DISMISSED")) | sort_by(.submitted_at) | last | .submitted_at' 2>/dev/null || echo "")
# Check if this PR already has a Leo verdict comment (avoid re-reviewing)
LEO_COMMENTED=$(gh pr view "$pr" --json comments \
--jq '[.comments[] | select(.body | test("VERDICT:LEO:(APPROVE|REQUEST_CHANGES)"))] | length' 2>/dev/null || echo "0")
LAST_COMMIT_DATE=$(gh pr view "$pr" --json commits --jq '.commits[-1].committedDate' 2>/dev/null || echo "")
if [ -z "$LAST_REVIEW_DATE" ]; then
PRS_TO_REVIEW="$PRS_TO_REVIEW $pr"
elif [ -n "$LAST_COMMIT_DATE" ] && [[ "$LAST_COMMIT_DATE" > "$LAST_REVIEW_DATE" ]]; then
echo "PR #$pr: New commits since last review. Queuing for re-review."
if [ "$LEO_COMMENTED" = "0" ]; then
PRS_TO_REVIEW="$PRS_TO_REVIEW $pr"
else
echo "PR #$pr: No new commits since last review. Skipping."
# Check if new commits since last Leo review
LAST_LEO_DATE=$(gh pr view "$pr" --json comments \
--jq '[.comments[] | select(.body | test("VERDICT:LEO:")) | .createdAt] | last' 2>/dev/null || echo "")
if [ -n "$LAST_COMMIT_DATE" ] && [ -n "$LAST_LEO_DATE" ] && [[ "$LAST_COMMIT_DATE" > "$LAST_LEO_DATE" ]]; then
echo "PR #$pr: New commits since last review. Queuing for re-review."
PRS_TO_REVIEW="$PRS_TO_REVIEW $pr"
else
echo "PR #$pr: Already reviewed. Skipping."
fi
fi
done
@ -195,7 +214,7 @@ run_agent_review() {
log_file="$LOG_DIR/${agent_name}-review-pr${pr}-${timestamp}.log"
review_file="/tmp/${agent_name}-review-pr${pr}.md"
echo " Running ${agent_name}..."
echo " Running ${agent_name} (model: ${model})..."
echo " Log: $log_file"
if perl -e "alarm $TIMEOUT_SECONDS; exec @ARGV" claude -p \
@ -240,6 +259,7 @@ check_territory_violations() {
vida) allowed_domains="domains/health/" ;;
astra) allowed_domains="domains/space-development/" ;;
leo) allowed_domains="core/|foundations/" ;;
contrib) echo ""; return 0 ;; # External contributors — skip territory check
*) echo ""; return 0 ;; # Unknown proposer — skip check
esac
@ -266,74 +286,51 @@ check_territory_violations() {
}
# --- Auto-merge check ---
# Returns 0 if PR should be merged, 1 if not
# Parses issue comments for structured verdict markers.
# Verdict protocol: agents post `<!-- VERDICT:AGENT_KEY:APPROVE -->` or
# `<!-- VERDICT:AGENT_KEY:REQUEST_CHANGES -->` as HTML comments in their review.
# This is machine-parseable and invisible in the rendered comment.
check_merge_eligible() {
local pr_number="$1"
local domain_agent="$2"
local leo_passed="$3"
# Gate 1: Leo must have passed
# Gate 1: Leo must have completed without timeout/error
if [ "$leo_passed" != "true" ]; then
echo "BLOCK: Leo review failed or timed out"
return 1
fi
# Gate 2: Check Leo's review state via GitHub API
local leo_review_state
leo_review_state=$(gh api "repos/{owner}/{repo}/pulls/${pr_number}/reviews" \
--jq '[.[] | select(.state != "DISMISSED" and .state != "PENDING")] | last | .state' 2>/dev/null || echo "")
# Gate 2: Check Leo's verdict from issue comments
local leo_verdict
leo_verdict=$(gh pr view "$pr_number" --json comments \
--jq '[.comments[] | select(.body | test("VERDICT:LEO:")) | .body] | last' 2>/dev/null || echo "")
if [ "$leo_review_state" = "APPROVED" ]; then
echo "Leo: APPROVED (via review API)"
elif [ "$leo_review_state" = "CHANGES_REQUESTED" ]; then
echo "BLOCK: Leo requested changes (review API state: CHANGES_REQUESTED)"
if echo "$leo_verdict" | grep -q "VERDICT:LEO:APPROVE"; then
echo "Leo: APPROVED"
elif echo "$leo_verdict" | grep -q "VERDICT:LEO:REQUEST_CHANGES"; then
echo "BLOCK: Leo requested changes"
return 1
else
# Fallback: check PR comments for Leo's verdict
local leo_verdict
leo_verdict=$(gh pr view "$pr_number" --json comments \
--jq '.comments[] | select(.body | test("## Leo Review")) | .body' 2>/dev/null \
| grep -oiE '\*\*Verdict:[^*]+\*\*' | tail -1 || echo "")
if echo "$leo_verdict" | grep -qi "approve"; then
echo "Leo: APPROVED (via comment verdict)"
elif echo "$leo_verdict" | grep -qi "request changes\|reject"; then
echo "BLOCK: Leo verdict: $leo_verdict"
return 1
else
echo "BLOCK: Could not determine Leo's verdict"
return 1
fi
echo "BLOCK: Could not find Leo's verdict marker in PR comments"
return 1
fi
# Gate 3: Check domain agent verdict (if applicable)
if [ -n "$domain_agent" ] && [ "$domain_agent" != "leo" ]; then
local domain_key
domain_key=$(echo "$domain_agent" | tr '[:lower:]' '[:upper:]')
local domain_verdict
# Search for verdict in domain agent's review — match agent name, "domain reviewer", or "Domain Review"
domain_verdict=$(gh pr view "$pr_number" --json comments \
--jq ".comments[] | select(.body | test(\"domain review|${domain_agent}|peer review\"; \"i\")) | .body" 2>/dev/null \
| grep -oiE '\*\*Verdict:[^*]+\*\*' | tail -1 || echo "")
--jq "[.comments[] | select(.body | test(\"VERDICT:${domain_key}:\")) | .body] | last" 2>/dev/null || echo "")
if [ -z "$domain_verdict" ]; then
# Also check review API for domain agent approval
# Since all agents use the same GitHub account, we check for multiple approvals
local approval_count
approval_count=$(gh api "repos/{owner}/{repo}/pulls/${pr_number}/reviews" \
--jq '[.[] | select(.state == "APPROVED")] | length' 2>/dev/null || echo "0")
if [ "$approval_count" -ge 2 ]; then
echo "Domain agent: APPROVED (multiple approvals via review API)"
else
echo "BLOCK: No domain agent verdict found"
return 1
fi
elif echo "$domain_verdict" | grep -qi "approve"; then
echo "Domain agent ($domain_agent): APPROVED (via comment verdict)"
elif echo "$domain_verdict" | grep -qi "request changes\|reject"; then
echo "BLOCK: Domain agent verdict: $domain_verdict"
if echo "$domain_verdict" | grep -q "VERDICT:${domain_key}:APPROVE"; then
echo "Domain agent ($domain_agent): APPROVED"
elif echo "$domain_verdict" | grep -q "VERDICT:${domain_key}:REQUEST_CHANGES"; then
echo "BLOCK: $domain_agent requested changes"
return 1
else
echo "BLOCK: Unclear domain agent verdict: $domain_verdict"
echo "BLOCK: No verdict marker found for $domain_agent"
return 1
fi
else
@ -403,11 +400,15 @@ Also check:
- Cross-domain connections that the proposer may have missed
Write your complete review to ${LEO_REVIEW_FILE}
Then post it with: gh pr review ${pr} --comment --body-file ${LEO_REVIEW_FILE}
If ALL claims pass quality gates: gh pr review ${pr} --approve --body-file ${LEO_REVIEW_FILE}
If ANY claim needs changes: gh pr review ${pr} --request-changes --body-file ${LEO_REVIEW_FILE}
CRITICAL — Verdict format: Your review MUST end with exactly one of these verdict markers (as an HTML comment on its own line):
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
Then post the review as an issue comment:
gh pr comment ${pr} --body-file ${LEO_REVIEW_FILE}
IMPORTANT: Use 'gh pr comment' NOT 'gh pr review'. We use a shared GitHub account so gh pr review --approve fails.
DO NOT merge — the orchestrator handles merge decisions after all reviews are posted.
Work autonomously. Do not ask for confirmation."
@ -432,6 +433,7 @@ Work autonomously. Do not ask for confirmation."
else
DOMAIN_REVIEW_FILE="/tmp/${DOMAIN_AGENT}-review-pr${pr}.md"
AGENT_NAME_UPPER=$(echo "${DOMAIN_AGENT}" | awk '{print toupper(substr($0,1,1)) substr($0,2)}')
AGENT_KEY_UPPER=$(echo "${DOMAIN_AGENT}" | tr '[:lower:]' '[:upper:]')
DOMAIN_PROMPT="You are ${AGENT_NAME_UPPER}. Read agents/${DOMAIN_AGENT}/identity.md, agents/${DOMAIN_AGENT}/beliefs.md, and skills/evaluate.md.
You are reviewing PR #${pr} as the domain expert for ${DOMAIN}.
@ -452,8 +454,15 @@ Your review focuses on DOMAIN EXPERTISE — things only a ${DOMAIN} specialist w
6. **Confidence calibration** — From your domain expertise, is the confidence level right?
Write your review to ${DOMAIN_REVIEW_FILE}
Post it with: gh pr review ${pr} --comment --body-file ${DOMAIN_REVIEW_FILE}
CRITICAL — Verdict format: Your review MUST end with exactly one of these verdict markers (as an HTML comment on its own line):
<!-- VERDICT:${AGENT_KEY_UPPER}:APPROVE -->
<!-- VERDICT:${AGENT_KEY_UPPER}:REQUEST_CHANGES -->
Then post the review as an issue comment:
gh pr comment ${pr} --body-file ${DOMAIN_REVIEW_FILE}
IMPORTANT: Use 'gh pr comment' NOT 'gh pr review'. We use a shared GitHub account so gh pr review --approve fails.
Sign your review as ${AGENT_NAME_UPPER} (domain reviewer for ${DOMAIN}).
DO NOT duplicate Leo's quality gate checks — he covers those.
DO NOT merge — the orchestrator handles merge decisions after all reviews are posted.
@ -486,7 +495,7 @@ Work autonomously. Do not ask for confirmation."
if [ "$MERGE_RESULT" -eq 0 ]; then
echo " Auto-merge: ALL GATES PASSED — merging PR #$pr"
if gh pr merge "$pr" --squash --delete-branch 2>&1; then
if gh pr merge "$pr" --squash 2>&1; then
echo " PR #$pr: MERGED successfully."
MERGED=$((MERGED + 1))
else

179
ops/extract-cron.sh Executable file
View file

@ -0,0 +1,179 @@
#!/bin/bash
# Extract claims from unprocessed sources in inbox/archive/
# Runs via cron on VPS every 15 minutes.
#
# Concurrency model:
# - Lockfile prevents overlapping runs
# - MAX_SOURCES=5 per cycle (works through backlog over multiple runs)
# - Sequential processing (one source at a time)
# - 50 sources landing at once = ~10 cron cycles to clear, not 50 parallel agents
#
# Domain routing:
# - Reads domain: field from source frontmatter
# - Maps to the domain agent (rio, clay, theseus, vida, astra, leo)
# - Runs extraction AS that agent — their territory, their extraction
# - Skips sources with status: processing (agent handling it themselves)
#
# Flow:
# 1. Pull latest main
# 2. Find sources with status: unprocessed (skip processing/processed/null-result)
# 3. For each: run Claude headless to extract claims as the domain agent
# 4. Commit extractions, push, open PR
# 5. Update source status to processed
#
# The eval pipeline (webhook.py) handles review and merge separately.
set -euo pipefail
REPO_DIR="/opt/teleo-eval/workspaces/extract"
REPO_URL="http://m3taversal:$(cat /opt/teleo-eval/secrets/forgejo-admin-token)@localhost:3000/teleo/teleo-codex.git"
CLAUDE_BIN="/home/teleo/.local/bin/claude"
LOG_DIR="/opt/teleo-eval/logs"
LOG="$LOG_DIR/extract-cron.log"
LOCKFILE="/tmp/extract-cron.lock"
MAX_SOURCES=5 # Process at most 5 sources per run to limit cost
log() { echo "[$(date -Iseconds)] $*" >> "$LOG"; }
# --- Lock ---
if [ -f "$LOCKFILE" ]; then
pid=$(cat "$LOCKFILE" 2>/dev/null)
if kill -0 "$pid" 2>/dev/null; then
log "SKIP: already running (pid $pid)"
exit 0
fi
log "WARN: stale lockfile, removing"
rm -f "$LOCKFILE"
fi
echo $$ > "$LOCKFILE"
trap 'rm -f "$LOCKFILE"' EXIT
# --- Ensure repo clone ---
if [ ! -d "$REPO_DIR/.git" ]; then
log "Cloning repo..."
git clone "$REPO_URL" "$REPO_DIR" >> "$LOG" 2>&1
fi
cd "$REPO_DIR"
# --- Pull latest main ---
git checkout main >> "$LOG" 2>&1
git pull --rebase >> "$LOG" 2>&1
# --- Find unprocessed sources ---
UNPROCESSED=$(grep -rl '^status: unprocessed' inbox/archive/ 2>/dev/null | head -n "$MAX_SOURCES" || true)
if [ -z "$UNPROCESSED" ]; then
log "No unprocessed sources found"
exit 0
fi
COUNT=$(echo "$UNPROCESSED" | wc -l | tr -d ' ')
log "Found $COUNT unprocessed source(s)"
# --- Process each source ---
for SOURCE_FILE in $UNPROCESSED; do
SLUG=$(basename "$SOURCE_FILE" .md)
BRANCH="extract/$SLUG"
log "Processing: $SOURCE_FILE → branch $BRANCH"
# Create branch from main
git checkout main >> "$LOG" 2>&1
git branch -D "$BRANCH" 2>/dev/null || true
git checkout -b "$BRANCH" >> "$LOG" 2>&1
# Read domain from frontmatter
DOMAIN=$(grep '^domain:' "$SOURCE_FILE" | head -1 | sed 's/domain: *//' | tr -d '"' | tr -d "'" | xargs)
# Map domain to agent
case "$DOMAIN" in
internet-finance) AGENT="rio" ;;
entertainment) AGENT="clay" ;;
ai-alignment) AGENT="theseus" ;;
health) AGENT="vida" ;;
space-development) AGENT="astra" ;;
*) AGENT="leo" ;;
esac
AGENT_TOKEN=$(cat "/opt/teleo-eval/secrets/forgejo-${AGENT}-token" 2>/dev/null || cat /opt/teleo-eval/secrets/forgejo-leo-token)
log "Domain: $DOMAIN, Agent: $AGENT"
# Run Claude headless to extract claims
EXTRACT_PROMPT="You are $AGENT, a Teleo knowledge base agent. Extract claims from this source.
READ these files first:
- skills/extract.md (extraction process)
- schemas/claim.md (claim format)
- $SOURCE_FILE (the source to extract from)
Then scan domains/$DOMAIN/ to check for duplicate claims.
EXTRACT claims following the process in skills/extract.md:
1. Read the source completely
2. Separate evidence from interpretation
3. Extract candidate claims (specific, disagreeable, evidence-backed)
4. Check for duplicates against existing claims in domains/$DOMAIN/
5. Write claim files to domains/$DOMAIN/ with proper YAML frontmatter
6. Update $SOURCE_FILE: set status to 'processed', add processed_by: $AGENT, processed_date: $(date +%Y-%m-%d), and claims_extracted list
If no claims can be extracted, update $SOURCE_FILE: set status to 'null-result' and add notes explaining why.
IMPORTANT: Use the Edit tool to update the source file status. Use the Write tool to create new claim files. Do not create claims that duplicate existing ones."
# Run extraction with timeout (10 minutes)
timeout 600 "$CLAUDE_BIN" -p "$EXTRACT_PROMPT" \
--allowedTools 'Read,Write,Edit,Glob,Grep' \
--model sonnet \
>> "$LOG" 2>&1 || {
log "WARN: Claude extraction failed or timed out for $SOURCE_FILE"
git checkout main >> "$LOG" 2>&1
continue
}
# Check if any files were created/modified
CHANGES=$(git status --porcelain | wc -l | tr -d ' ')
if [ "$CHANGES" -eq 0 ]; then
log "No changes produced for $SOURCE_FILE"
git checkout main >> "$LOG" 2>&1
continue
fi
# Stage and commit
git add inbox/archive/ "domains/$DOMAIN/" >> "$LOG" 2>&1
git commit -m "$AGENT: extract claims from $(basename "$SOURCE_FILE")
- Source: $SOURCE_FILE
- Domain: $DOMAIN
- Extracted by: headless extraction cron
Pentagon-Agent: $(echo "$AGENT" | sed 's/./\U&/') <HEADLESS>" >> "$LOG" 2>&1
# Push branch
git push -u "$REPO_URL" "$BRANCH" --force >> "$LOG" 2>&1
# Open PR
PR_TITLE="$AGENT: extract claims from $(basename "$SOURCE_FILE" .md)"
PR_BODY="## Automated Extraction\n\nSource: \`$SOURCE_FILE\`\nDomain: $DOMAIN\nExtracted by: headless cron on VPS\n\nThis PR was created automatically by the extraction cron job. Claims were extracted using \`skills/extract.md\` process via Claude headless."
curl -s -X POST "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls" \
-H "Authorization: token $AGENT_TOKEN" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"$PR_TITLE\",
\"body\": \"$PR_BODY\",
\"base\": \"main\",
\"head\": \"$BRANCH\"
}" >> "$LOG" 2>&1
log "PR opened for $SOURCE_FILE"
# Back to main for next source
git checkout main >> "$LOG" 2>&1
# Brief pause between extractions
sleep 5
done
log "Extraction run complete: processed $COUNT source(s)"

520
ops/extract-graph-data.py Normal file
View file

@ -0,0 +1,520 @@
#!/usr/bin/env python3
"""
extract-graph-data.py Extract knowledge graph from teleo-codex markdown files.
Reads all .md claim/conviction files, parses YAML frontmatter and wiki-links,
and outputs graph-data.json matching the teleo-app GraphData interface.
Usage:
python3 ops/extract-graph-data.py [--output path/to/graph-data.json]
Must be run from the teleo-codex repo root.
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
SCAN_DIRS = ["core", "domains", "foundations", "convictions"]
# Only extract these content types (from frontmatter `type` field).
# If type is missing, include the file anyway (many claims lack explicit type).
INCLUDE_TYPES = {"claim", "conviction", "analysis", "belief", "position", None}
# Domain → default agent mapping (fallback when git attribution unavailable)
DOMAIN_AGENT_MAP = {
"internet-finance": "rio",
"entertainment": "clay",
"health": "vida",
"ai-alignment": "theseus",
"space-development": "astra",
"grand-strategy": "leo",
"mechanisms": "leo",
"living-capital": "leo",
"living-agents": "leo",
"teleohumanity": "leo",
"critical-systems": "leo",
"collective-intelligence": "leo",
"teleological-economics": "leo",
"cultural-dynamics": "clay",
}
DOMAIN_COLORS = {
"internet-finance": "#4A90D9",
"entertainment": "#9B59B6",
"health": "#2ECC71",
"ai-alignment": "#E74C3C",
"space-development": "#F39C12",
"grand-strategy": "#D4AF37",
"mechanisms": "#1ABC9C",
"living-capital": "#3498DB",
"living-agents": "#E67E22",
"teleohumanity": "#F1C40F",
"critical-systems": "#95A5A6",
"collective-intelligence": "#BDC3C7",
"teleological-economics": "#7F8C8D",
"cultural-dynamics": "#C0392B",
}
KNOWN_AGENTS = {"leo", "rio", "clay", "vida", "theseus", "astra"}
# Regex patterns
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---", re.DOTALL)
WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
YAML_FIELD_RE = re.compile(r"^(\w[\w_]*):\s*(.+)$", re.MULTILINE)
YAML_LIST_ITEM_RE = re.compile(r'^\s*-\s+"?(.+?)"?\s*$', re.MULTILINE)
COUNTER_EVIDENCE_RE = re.compile(r"^##\s+Counter[\s-]?evidence", re.MULTILINE | re.IGNORECASE)
COUNTERARGUMENT_RE = re.compile(r"^\*\*Counter\s*argument", re.MULTILINE | re.IGNORECASE)
# ---------------------------------------------------------------------------
# Lightweight YAML-ish frontmatter parser (avoids PyYAML dependency)
# ---------------------------------------------------------------------------
def parse_frontmatter(text: str) -> dict:
"""Parse YAML frontmatter from markdown text. Returns dict of fields."""
m = FRONTMATTER_RE.match(text)
if not m:
return {}
yaml_block = m.group(1)
result = {}
for field_match in YAML_FIELD_RE.finditer(yaml_block):
key = field_match.group(1)
val = field_match.group(2).strip().strip('"').strip("'")
# Handle list fields
if val.startswith("["):
# Inline YAML list: [item1, item2]
items = re.findall(r'"([^"]+)"', val)
if not items:
items = [x.strip().strip('"').strip("'")
for x in val.strip("[]").split(",") if x.strip()]
result[key] = items
else:
result[key] = val
# Handle multi-line list fields (depends_on, challenged_by, secondary_domains)
for list_key in ("depends_on", "challenged_by", "secondary_domains", "claims_extracted"):
if list_key not in result:
# Check for block-style list
pattern = re.compile(
rf"^{list_key}:\s*\n((?:\s+-\s+.+\n?)+)", re.MULTILINE
)
lm = pattern.search(yaml_block)
if lm:
items = YAML_LIST_ITEM_RE.findall(lm.group(1))
result[list_key] = [i.strip('"').strip("'") for i in items]
return result
def extract_body(text: str) -> str:
"""Return the markdown body after frontmatter."""
m = FRONTMATTER_RE.match(text)
if m:
return text[m.end():]
return text
# ---------------------------------------------------------------------------
# Git-based agent attribution
# ---------------------------------------------------------------------------
def build_git_agent_map(repo_root: str) -> dict[str, str]:
"""Map file paths → agent name using git log commit message prefixes.
Commit messages follow: '{agent}: description'
We use the commit that first added each file.
"""
file_agent = {}
try:
result = subprocess.run(
["git", "log", "--all", "--diff-filter=A", "--name-only",
"--format=COMMIT_MSG:%s"],
capture_output=True, text=True, cwd=repo_root, timeout=30,
)
current_agent = None
for line in result.stdout.splitlines():
line = line.strip()
if not line:
continue
if line.startswith("COMMIT_MSG:"):
msg = line[len("COMMIT_MSG:"):]
# Parse "agent: description" pattern
if ":" in msg:
prefix = msg.split(":")[0].strip().lower()
if prefix in KNOWN_AGENTS:
current_agent = prefix
else:
current_agent = None
else:
current_agent = None
elif current_agent and line.endswith(".md"):
# Only set if not already attributed (first add wins)
if line not in file_agent:
file_agent[line] = current_agent
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return file_agent
# ---------------------------------------------------------------------------
# Wiki-link resolution
# ---------------------------------------------------------------------------
def build_title_index(all_files: list[str], repo_root: str) -> dict[str, str]:
"""Map lowercase claim titles → file paths for wiki-link resolution."""
index = {}
for fpath in all_files:
# Title = filename without .md extension
fname = os.path.basename(fpath)
if fname.endswith(".md"):
title = fname[:-3].lower()
index[title] = fpath
# Also index by relative path
index[fpath.lower()] = fpath
return index
def resolve_wikilink(link_text: str, title_index: dict, source_dir: str) -> str | None:
"""Resolve a [[wiki-link]] target to a file path (node ID)."""
text = link_text.strip()
# Skip map links and non-claim references
if text.startswith("_") or text == "_map":
return None
# Direct path match (with or without .md)
for candidate in [text, text + ".md"]:
if candidate.lower() in title_index:
return title_index[candidate.lower()]
# Title-only match
title = text.lower()
if title in title_index:
return title_index[title]
# Fuzzy: try adding .md to the basename
basename = os.path.basename(text)
if basename.lower() in title_index:
return title_index[basename.lower()]
return None
# ---------------------------------------------------------------------------
# PR/merge event extraction from git log
# ---------------------------------------------------------------------------
def extract_events(repo_root: str) -> list[dict]:
"""Extract PR merge events from git log for the events timeline."""
events = []
try:
result = subprocess.run(
["git", "log", "--merges", "--format=%H|%s|%ai", "-50"],
capture_output=True, text=True, cwd=repo_root, timeout=15,
)
for line in result.stdout.strip().splitlines():
parts = line.split("|", 2)
if len(parts) < 3:
continue
sha, msg, date_str = parts
# Parse "Merge pull request #N from ..." or agent commit patterns
pr_match = re.search(r"#(\d+)", msg)
if not pr_match:
continue
pr_num = int(pr_match.group(1))
# Try to determine agent from merge commit
agent = "collective"
for a in KNOWN_AGENTS:
if a in msg.lower():
agent = a
break
# Count files changed in this merge
diff_result = subprocess.run(
["git", "diff", "--name-only", f"{sha}^..{sha}"],
capture_output=True, text=True, cwd=repo_root, timeout=10,
)
claims_added = sum(
1 for f in diff_result.stdout.splitlines()
if f.endswith(".md") and any(f.startswith(d) for d in SCAN_DIRS)
)
if claims_added > 0:
events.append({
"type": "pr-merge",
"number": pr_num,
"agent": agent,
"claims_added": claims_added,
"date": date_str[:10],
})
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return events
# ---------------------------------------------------------------------------
# Main extraction
# ---------------------------------------------------------------------------
def find_markdown_files(repo_root: str) -> list[str]:
"""Find all .md files in SCAN_DIRS, return relative paths."""
files = []
for scan_dir in SCAN_DIRS:
dirpath = os.path.join(repo_root, scan_dir)
if not os.path.isdir(dirpath):
continue
for root, _dirs, filenames in os.walk(dirpath):
for fname in filenames:
if fname.endswith(".md") and not fname.startswith("_"):
rel = os.path.relpath(os.path.join(root, fname), repo_root)
files.append(rel)
return sorted(files)
def _get_domain_cached(fpath: str, repo_root: str, cache: dict) -> str:
"""Get the domain of a file, caching results."""
if fpath in cache:
return cache[fpath]
abs_path = os.path.join(repo_root, fpath)
domain = ""
try:
text = open(abs_path, encoding="utf-8").read()
fm = parse_frontmatter(text)
domain = fm.get("domain", "")
except (OSError, UnicodeDecodeError):
pass
cache[fpath] = domain
return domain
def extract_graph(repo_root: str) -> dict:
"""Extract the full knowledge graph from the codex."""
all_files = find_markdown_files(repo_root)
git_agents = build_git_agent_map(repo_root)
title_index = build_title_index(all_files, repo_root)
domain_cache: dict[str, str] = {}
nodes = []
edges = []
node_ids = set()
all_files_set = set(all_files)
for fpath in all_files:
abs_path = os.path.join(repo_root, fpath)
try:
text = open(abs_path, encoding="utf-8").read()
except (OSError, UnicodeDecodeError):
continue
fm = parse_frontmatter(text)
body = extract_body(text)
# Filter by type
ftype = fm.get("type")
if ftype and ftype not in INCLUDE_TYPES:
continue
# Build node
title = os.path.basename(fpath)[:-3] # filename without .md
domain = fm.get("domain", "")
if not domain:
# Infer domain from directory path
parts = fpath.split(os.sep)
if len(parts) >= 2:
domain = parts[1] if parts[0] == "domains" else parts[1] if len(parts) > 2 else parts[0]
# Agent attribution: git log → domain mapping → "collective"
agent = git_agents.get(fpath, "")
if not agent:
agent = DOMAIN_AGENT_MAP.get(domain, "collective")
created = fm.get("created", "")
confidence = fm.get("confidence", "speculative")
# Detect challenged status
challenged_by_raw = fm.get("challenged_by", [])
if isinstance(challenged_by_raw, str):
challenged_by_raw = [challenged_by_raw] if challenged_by_raw else []
has_challenged_by = bool(challenged_by_raw and any(c for c in challenged_by_raw))
has_counter_section = bool(COUNTER_EVIDENCE_RE.search(body) or COUNTERARGUMENT_RE.search(body))
is_challenged = has_challenged_by or has_counter_section
# Extract challenge descriptions for the node
challenges = []
if isinstance(challenged_by_raw, list):
for c in challenged_by_raw:
if c and isinstance(c, str):
# Strip wiki-link syntax for display
cleaned = WIKILINK_RE.sub(lambda m: m.group(1), c)
# Strip markdown list artifacts: leading "- ", surrounding quotes
cleaned = re.sub(r'^-\s*', '', cleaned).strip()
cleaned = cleaned.strip('"').strip("'").strip()
if cleaned:
challenges.append(cleaned[:200]) # cap length
node = {
"id": fpath,
"title": title,
"domain": domain,
"agent": agent,
"created": created,
"confidence": confidence,
"challenged": is_challenged,
}
if challenges:
node["challenges"] = challenges
nodes.append(node)
node_ids.add(fpath)
domain_cache[fpath] = domain # cache for edge lookups
for link_text in WIKILINK_RE.findall(body):
target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath))
if target and target != fpath and target in all_files_set:
target_domain = _get_domain_cached(target, repo_root, domain_cache)
edges.append({
"source": fpath,
"target": target,
"type": "wiki-link",
"cross_domain": domain != target_domain and bool(target_domain),
})
# Conflict edges from challenged_by (may contain [[wiki-links]] or prose)
challenged_by = fm.get("challenged_by", [])
if isinstance(challenged_by, str):
challenged_by = [challenged_by]
if isinstance(challenged_by, list):
for challenge in challenged_by:
if not challenge:
continue
# Check for embedded wiki-links
for link_text in WIKILINK_RE.findall(challenge):
target = resolve_wikilink(link_text, title_index, os.path.dirname(fpath))
if target and target != fpath and target in all_files_set:
target_domain = _get_domain_cached(target, repo_root, domain_cache)
edges.append({
"source": fpath,
"target": target,
"type": "conflict",
"cross_domain": domain != target_domain and bool(target_domain),
})
# Deduplicate edges
seen_edges = set()
unique_edges = []
for e in edges:
key = (e["source"], e["target"], e.get("type", ""))
if key not in seen_edges:
seen_edges.add(key)
unique_edges.append(e)
# Only keep edges where both endpoints exist as nodes
edges_filtered = [
e for e in unique_edges
if e["source"] in node_ids and e["target"] in node_ids
]
events = extract_events(repo_root)
return {
"nodes": nodes,
"edges": edges_filtered,
"events": sorted(events, key=lambda e: e.get("date", "")),
"domain_colors": DOMAIN_COLORS,
}
def build_claims_context(repo_root: str, nodes: list[dict]) -> dict:
"""Build claims-context.json for chat system prompt injection.
Produces a lightweight claim index: title + description + domain + agent + confidence.
Sorted by domain, then alphabetically within domain.
Target: ~37KB for ~370 claims. Truncates descriptions at 100 chars if total > 100KB.
"""
claims = []
for node in nodes:
fpath = node["id"]
abs_path = os.path.join(repo_root, fpath)
description = ""
try:
text = open(abs_path, encoding="utf-8").read()
fm = parse_frontmatter(text)
description = fm.get("description", "")
except (OSError, UnicodeDecodeError):
pass
claims.append({
"title": node["title"],
"description": description,
"domain": node["domain"],
"agent": node["agent"],
"confidence": node["confidence"],
})
# Sort by domain, then title
claims.sort(key=lambda c: (c["domain"], c["title"]))
context = {
"generated": datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"claimCount": len(claims),
"claims": claims,
}
# Progressive description truncation if over 100KB.
# Never drop descriptions entirely — short descriptions are better than none.
for max_desc in (120, 100, 80, 60):
test_json = json.dumps(context, ensure_ascii=False)
if len(test_json) <= 100_000:
break
for c in claims:
if len(c["description"]) > max_desc:
c["description"] = c["description"][:max_desc] + "..."
return context
def main():
parser = argparse.ArgumentParser(description="Extract graph data from teleo-codex")
parser.add_argument("--output", "-o", default="graph-data.json",
help="Output file path (default: graph-data.json)")
parser.add_argument("--context-output", "-c", default=None,
help="Output claims-context.json path (default: same dir as --output)")
parser.add_argument("--repo", "-r", default=".",
help="Path to teleo-codex repo root (default: current dir)")
args = parser.parse_args()
repo_root = os.path.abspath(args.repo)
if not os.path.isdir(os.path.join(repo_root, "core")):
print(f"Error: {repo_root} doesn't look like a teleo-codex repo (no core/ dir)", file=sys.stderr)
sys.exit(1)
print(f"Scanning {repo_root}...")
graph = extract_graph(repo_root)
print(f" Nodes: {len(graph['nodes'])}")
print(f" Edges: {len(graph['edges'])}")
print(f" Events: {len(graph['events'])}")
challenged_count = sum(1 for n in graph["nodes"] if n.get("challenged"))
print(f" Challenged: {challenged_count}")
# Write graph-data.json
output_path = os.path.abspath(args.output)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(graph, f, indent=2, ensure_ascii=False)
size_kb = os.path.getsize(output_path) / 1024
print(f" graph-data.json: {output_path} ({size_kb:.1f} KB)")
# Write claims-context.json
context_path = args.context_output
if not context_path:
context_path = os.path.join(os.path.dirname(output_path), "claims-context.json")
context_path = os.path.abspath(context_path)
context = build_claims_context(repo_root, graph["nodes"])
with open(context_path, "w", encoding="utf-8") as f:
json.dump(context, f, indent=2, ensure_ascii=False)
ctx_kb = os.path.getsize(context_path) / 1024
print(f" claims-context.json: {context_path} ({ctx_kb:.1f} KB)")
if __name__ == "__main__":
main()

201
skills/ingest.md Normal file
View file

@ -0,0 +1,201 @@
# Skill: Ingest
Research your domain, find source material, and archive it in inbox/. You choose whether to extract claims yourself or let the VPS handle it.
**Archive everything.** The inbox is a library, not a filter. If it's relevant to any Teleo domain, archive it. Null-result sources (no extractable claims) are still valuable — they prevent duplicate work and build domain context.
## Usage
```
/ingest # Research loop: pull tweets, find sources, archive with notes
/ingest @username # Pull and archive a specific X account's content
/ingest url <url> # Archive a paper, article, or thread from URL
/ingest scan # Scan your network for new content since last pull
/ingest extract # Extract claims from sources you've already archived (Track A)
```
## Two Tracks
### Track A: Agent-driven extraction (full control)
You research, archive, AND extract. You see exactly what you're proposing before it goes up.
1. Archive sources with `status: processing`
2. Extract claims yourself using `skills/extract.md`
3. Open a PR with both source archives and claim files
4. Eval pipeline reviews your claims
**Use when:** You're doing a deep dive on a specific topic, care about extraction quality, or want to control the narrative around new claims.
### Track B: VPS extraction (hands-off)
You research and archive. The VPS extracts headlessly.
1. Archive sources with `status: unprocessed`
2. Push source-only PR (merges fast — no claim changes)
3. VPS cron picks up unprocessed sources every 15 minutes
4. Extracts claims via Claude headless, opens a separate PR
5. Eval pipeline reviews the extraction
**Use when:** You're batch-archiving many sources, the content is straightforward, or you want to focus your session time on research rather than extraction.
### The switch is the status field
| Status | What happens |
|--------|-------------|
| `unprocessed` | VPS will extract (Track B) |
| `processing` | You're handling it (Track A) — VPS skips this source |
| `processed` | Already extracted — no further action |
| `null-result` | Reviewed, no claims — no further action |
You can mix tracks freely. Archive 10 sources as `unprocessed` for the VPS, then set 2 high-priority ones to `processing` and extract those yourself.
## Prerequisites
- API key at `~/.pentagon/secrets/twitterapi-io-key`
- Your network file at `~/.pentagon/workspace/collective/x-ingestion/{your-name}-network.json`
- Forgejo token at `~/.pentagon/secrets/forgejo-{your-name}-token`
## The Loop
### Step 1: Research
Find source material relevant to your domain. Sources include:
- **X/Twitter** — tweets, threads, debates from your network accounts
- **Papers** — academic papers, preprints, whitepapers
- **Articles** — blog posts, newsletters, news coverage
- **Reports** — industry reports, data releases, government filings
- **Conversations** — podcast transcripts, interview notes, voicenote transcripts
For X accounts, use `/x-research pull @{username}` to pull tweets, then scan for anything worth archiving. Don't just archive the "best" tweets — archive anything substantive. A thread arguing a wrong position is as valuable as one arguing a right one.
### Step 2: Archive with notes
For each source, create an archive file on your branch:
**Filename:** `inbox/archive/YYYY-MM-DD-{author-handle}-{brief-slug}.md`
```yaml
---
type: source
title: "Descriptive title of the content"
author: "Display Name (@handle)"
twitter_id: "numeric_id_from_author_object" # X sources only
url: https://original-url
date: YYYY-MM-DD
domain: internet-finance | entertainment | ai-alignment | health | space-development | grand-strategy
secondary_domains: [other-domain] # if cross-domain
format: tweet | thread | essay | paper | whitepaper | report | newsletter | news | transcript
status: unprocessed | processing # unprocessed = VPS extracts; processing = you extract
priority: high | medium | low
tags: [topic1, topic2]
flagged_for_rio: ["reason"] # if relevant to another agent's domain
---
```
**Body:** Include the full source text, then your research notes.
```markdown
## Content
[Full text of tweet/thread/article. For long papers, include abstract + key sections.]
## Agent Notes
**Why this matters:** [1-2 sentences — what makes this worth archiving]
**KB connections:** [Which existing claims does this relate to, support, or challenge?]
**Extraction hints:** [What claims might the extractor pull from this? Flag specific passages.]
**Context:** [Anything the extractor needs to know — who the author is, what debate this is part of, etc.]
```
The "Agent Notes" section is critical for Track B. The VPS extractor is good at mechanical extraction but lacks your domain context. Your notes guide it. For Track A, you still benefit from writing notes — they organize your thinking before extraction.
### Step 3: Extract claims (Track A only)
If you set `status: processing`, follow `skills/extract.md`:
1. Read the source completely
2. Separate evidence from interpretation
3. Extract candidate claims (specific, disagreeable, evidence-backed)
4. Check for duplicates against existing KB
5. Write claim files to `domains/{your-domain}/`
6. Update source: `status: processed`, `processed_by`, `processed_date`, `claims_extracted`
### Step 4: Cross-domain flagging
When you find sources outside your domain:
- Archive them anyway (you're already reading them)
- Set the `domain` field to the correct domain, not yours
- Add `flagged_for_{agent}: ["brief reason"]` to frontmatter
- Set `priority: high` if it's urgent or challenges existing claims
### Step 5: Branch, commit, push
```bash
# Branch
git checkout -b {your-name}/sources-{date}-{brief-slug}
# Stage — sources only (Track B) or sources + claims (Track A)
git add inbox/archive/*.md
git add domains/{your-domain}/*.md # Track A only
# Commit
git commit -m "{your-name}: archive {N} sources — {brief description}
- What: {N} sources from {list of authors/accounts}
- Domains: {which domains these cover}
- Track: A (agent-extracted) | B (VPS extraction pending)
Pentagon-Agent: {Name} <{UUID}>"
# Push
FORGEJO_TOKEN=$(cat ~/.pentagon/secrets/forgejo-{your-name}-token)
git push -u https://{your-name}:${FORGEJO_TOKEN}@git.livingip.xyz/teleo/teleo-codex.git {branch-name}
```
Open a PR:
```bash
curl -s -X POST "https://git.livingip.xyz/api/v1/repos/teleo/teleo-codex/pulls" \
-H "Authorization: token ${FORGEJO_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
"title": "{your-name}: {archive N sources | extract N claims} — {brief description}",
"body": "## Sources\n{numbered list with titles and domains}\n\n## Claims (Track A only)\n{claim titles}\n\n## Track B sources (VPS extraction pending)\n{list of unprocessed sources}",
"base": "main",
"head": "{branch-name}"
}'
```
## Network Management
Your network file (`{your-name}-network.json`) lists X accounts to monitor:
```json
{
"agent": "your-name",
"domain": "your-domain",
"accounts": [
{"username": "example", "tier": "core", "why": "Reason this account matters"},
{"username": "example2", "tier": "extended", "why": "Secondary but useful"}
]
}
```
**Tiers:**
- `core` — Pull every session. High signal-to-noise.
- `extended` — Pull weekly or when specifically relevant.
- `watch` — Pull once to evaluate, then promote or drop.
Agents without a network file should create one as their first task. Start with 5-10 seed accounts.
## Quality Controls
- **Archive everything substantive.** Don't self-censor. The extractor decides what yields claims.
- **Write good notes.** Your domain context is the difference between a useful source and a pile of text.
- **Check for duplicates.** Don't re-archive sources already in `inbox/archive/`.
- **Flag cross-domain.** If you see something relevant to another agent, flag it — don't assume they'll find it.
- **Log API costs.** Every X pull gets logged to `~/.pentagon/workspace/collective/x-ingestion/pull-log.jsonl`.
- **Source diversity.** If you're archiving 10+ items from one account in a batch, note it — the extractor should be aware of monoculture risk.