teleo-codex/ops/extract-cron.sh

248 lines
8.7 KiB
Bash
Executable file

#!/bin/bash
# Extract claims from unprocessed sources in inbox/archive/
# Runs via cron on VPS every 15 minutes.
#
# Concurrency model:
# - Lockfile prevents overlapping runs
# - MAX_SOURCES=5 per cycle (works through backlog over multiple runs)
# - Sequential processing (one source at a time)
# - 50 sources landing at once = ~10 cron cycles to clear, not 50 parallel agents
#
# Domain routing:
# - Reads domain: field from source frontmatter
# - Maps to the domain agent (rio, clay, theseus, vida, astra, leo)
# - Runs extraction AS that agent — their territory, their extraction
# - Skips sources with status: processing (agent handling it themselves)
#
# Flow:
# 1. Pull latest main
# 2. Find sources with status: unprocessed (skip processing/processed/null-result)
# 3. For each: run Claude headless to extract claims as the domain agent
# 4. Mark source as processing on main (prevents re-processing next cycle)
# 5. Commit extractions on branch, push, open PR
# 6. Eval pipeline reviews the extraction PR separately
#
# The eval pipeline (webhook.py) handles review and merge separately.
set -euo pipefail
REPO_DIR="/opt/teleo-eval/workspaces/extract"
FORGEJO_URL="http://localhost:3000"
FORGEJO_ADMIN_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token)
CLAUDE_BIN="/home/teleo/.local/bin/claude"
LOG_DIR="/opt/teleo-eval/logs"
LOG="$LOG_DIR/extract-cron.log"
LOCKFILE="/tmp/extract-cron.lock"
PENDING_FILE="/opt/teleo-eval/extract-pending.txt"
MAX_SOURCES=5 # Process at most 5 sources per run
log() { echo "[$(date -Iseconds)] $*" >> "$LOG"; }
# --- Lock ---
if [ -f "$LOCKFILE" ]; then
pid=$(cat "$LOCKFILE" 2>/dev/null)
if kill -0 "$pid" 2>/dev/null; then
log "SKIP: already running (pid $pid)"
exit 0
fi
log "WARN: stale lockfile, removing"
rm -f "$LOCKFILE"
fi
echo $$ > "$LOCKFILE"
trap 'rm -f "$LOCKFILE"' EXIT
# --- Init pending file ---
touch "$PENDING_FILE"
# --- Ensure repo clone ---
if [ ! -d "$REPO_DIR/.git" ]; then
log "Cloning repo..."
git -c http.extraHeader="Authorization: token $FORGEJO_ADMIN_TOKEN" \
clone "${FORGEJO_URL}/teleo/teleo-codex.git" "$REPO_DIR" >> "$LOG" 2>&1
fi
cd "$REPO_DIR"
# Configure git auth via credential helper (keeps tokens out of logs)
git config credential.helper "!f() { echo username=m3taversal; echo password=$FORGEJO_ADMIN_TOKEN; }; f"
git remote set-url origin "${FORGEJO_URL}/teleo/teleo-codex.git" 2>/dev/null || true
# --- Pull latest main ---
git checkout main >> "$LOG" 2>&1
git pull --rebase >> "$LOG" 2>&1
# --- Find unprocessed sources ---
# Only match status: unprocessed within YAML frontmatter (between first two --- lines)
UNPROCESSED=$(awk '/^---$/{f++} f==1 && /^status: unprocessed/{print FILENAME; nextfile}' inbox/archive/*.md 2>/dev/null || true)
# Filter out sources already pending extraction
if [ -s "$PENDING_FILE" ]; then
UNPROCESSED=$(echo "$UNPROCESSED" | grep -vxFf "$PENDING_FILE" || true)
fi
# Limit to MAX_SOURCES
UNPROCESSED=$(echo "$UNPROCESSED" | head -n "$MAX_SOURCES")
if [ -z "$UNPROCESSED" ]; then
log "No unprocessed sources found"
exit 0
fi
COUNT=$(echo "$UNPROCESSED" | wc -l | tr -d ' ')
log "Found $COUNT unprocessed source(s)"
# --- Process each source ---
for SOURCE_FILE in $UNPROCESSED; do
SLUG=$(basename "$SOURCE_FILE" .md)
BRANCH="extract/$SLUG"
log "Processing: $SOURCE_FILE → branch $BRANCH"
# Mark as pending (prevents re-processing on next cron cycle)
echo "$SOURCE_FILE" >> "$PENDING_FILE"
# Create branch from main
git checkout main >> "$LOG" 2>&1
git branch -D "$BRANCH" 2>/dev/null || true
git checkout -b "$BRANCH" >> "$LOG" 2>&1
# Read domain from frontmatter
DOMAIN=$(awk '/^---$/{f++} f==1 && /^domain:/{sub(/^domain: */, ""); gsub(/["'"'"']/, ""); print; exit}' "$SOURCE_FILE")
# Map domain to agent
case "$DOMAIN" in
internet-finance) AGENT="rio" ;;
entertainment) AGENT="clay" ;;
ai-alignment) AGENT="theseus" ;;
health) AGENT="vida" ;;
space-development) AGENT="astra" ;;
*) AGENT="leo" ;;
esac
AGENT_TOKEN=$(cat "/opt/teleo-eval/secrets/forgejo-${AGENT}-token" 2>/dev/null || cat /opt/teleo-eval/secrets/forgejo-leo-token)
log "Domain: $DOMAIN, Agent: $AGENT"
# Run Claude headless to extract claims
EXTRACT_PROMPT="You are $AGENT, a Teleo knowledge base agent. Extract claims from this source.
READ these files first:
- skills/extract.md (extraction process)
- schemas/claim.md (claim format)
- $SOURCE_FILE (the source to extract from)
Then scan domains/$DOMAIN/ to check for duplicate claims.
EXTRACT claims following the process in skills/extract.md:
1. Read the source completely
2. Separate evidence from interpretation
3. Extract candidate claims (specific, disagreeable, evidence-backed)
4. Check for duplicates against existing claims in domains/$DOMAIN/
5. Write claim files to domains/$DOMAIN/ with proper YAML frontmatter
6. Update $SOURCE_FILE: set status to 'processed', add processed_by: $AGENT, processed_date: $(date +%Y-%m-%d), and claims_extracted list
If no claims can be extracted, update $SOURCE_FILE: set status to 'null-result' and add notes explaining why.
IMPORTANT: Use the Edit tool to update the source file status. Use the Write tool to create new claim files. Do not create claims that duplicate existing ones."
# Run extraction with timeout (10 minutes)
timeout 600 "$CLAUDE_BIN" -p "$EXTRACT_PROMPT" \
--allowedTools 'Read,Write,Edit,Glob,Grep' \
--model sonnet \
--permission-mode bypassPermissions \
>> "$LOG" 2>&1 || {
log "WARN: Claude extraction failed or timed out for $SOURCE_FILE"
git checkout main >> "$LOG" 2>&1
continue
}
# Check if any files were created/modified
CHANGED_FILES=$(git status --porcelain)
if [ -z "$CHANGED_FILES" ]; then
log "No changes produced for $SOURCE_FILE"
git checkout main >> "$LOG" 2>&1
continue
fi
# Stage only files in expected paths
git status --porcelain | awk '{print $2}' | while read -r f; do
case "$f" in
inbox/archive/*|domains/*)
git add "$f" >> "$LOG" 2>&1
;;
*)
log "WARN: Unexpected file change outside inbox/domains: $f — skipping"
;;
esac
done
# Check if anything was staged
if git diff --cached --quiet; then
log "No valid changes to commit for $SOURCE_FILE"
git checkout -- . >> "$LOG" 2>&1
git checkout main >> "$LOG" 2>&1
continue
fi
AGENT_UPPER=$(echo "$AGENT" | sed 's/./\U&/')
git commit -m "$AGENT: extract claims from $(basename "$SOURCE_FILE")
- Source: $SOURCE_FILE
- Domain: $DOMAIN
- Extracted by: headless extraction cron
Pentagon-Agent: $AGENT_UPPER <HEADLESS>" >> "$LOG" 2>&1
# Push branch
git push -u origin "$BRANCH" --force >> "$LOG" 2>&1
# Check if PR already exists for this branch
EXISTING_PR=$(curl -s "${FORGEJO_URL}/api/v1/repos/teleo/teleo-codex/pulls?state=open" \
-H "Authorization: token $AGENT_TOKEN" \
| jq -r ".[] | select(.head.ref == \"$BRANCH\") | .number" 2>/dev/null)
if [ -n "$EXISTING_PR" ]; then
log "PR already exists for $BRANCH (#$EXISTING_PR), skipping creation"
else
# Build PR JSON safely with jq
PR_JSON=$(jq -n \
--arg title "$AGENT: extract claims from $(basename "$SOURCE_FILE" .md)" \
--arg body "## Automated Extraction
Source: \`$SOURCE_FILE\`
Domain: $DOMAIN
Extracted by: headless cron on VPS
This PR was created automatically by the extraction cron job. Claims were extracted using \`skills/extract.md\` process via Claude headless." \
--arg base "main" \
--arg head "$BRANCH" \
'{title: $title, body: $body, base: $base, head: $head}')
curl -s -X POST "${FORGEJO_URL}/api/v1/repos/teleo/teleo-codex/pulls" \
-H "Authorization: token $AGENT_TOKEN" \
-H "Content-Type: application/json" \
-d "$PR_JSON" >> "$LOG" 2>&1
log "PR opened for $SOURCE_FILE"
fi
# Back to main for next source
git checkout main >> "$LOG" 2>&1
# Brief pause between extractions
sleep 5
done
# Clean up pending file — remove entries for sources that have been processed
# (their PRs exist or their status changed on main)
if [ -f "$PENDING_FILE" ]; then
TEMP_PENDING=$(mktemp)
while IFS= read -r pending_source; do
if [ -f "$pending_source" ] && grep -q '^status: unprocessed' "$pending_source" 2>/dev/null; then
echo "$pending_source" >> "$TEMP_PENDING"
fi
done < "$PENDING_FILE"
mv "$TEMP_PENDING" "$PENDING_FILE"
fi
log "Extraction run complete: processed $COUNT source(s)"