Imports 67 files from VPS (/opt/teleo-eval/) into repo as the single source of truth. Previously only 8 of 67 files existed in repo — the rest were deployed directly to VPS via SCP, causing massive drift. Includes: - pipeline/lib/: 33 Python modules (daemon core, extraction, evaluation, merge, cascade, cross-domain, costs, attribution, etc.) - pipeline/: main daemon (teleo-pipeline.py), reweave.py, batch-extract-50.sh - diagnostics/: 19 files (4-page dashboard, alerting, daily digest, review queue, tier1 metrics) - agent-state/: bootstrap, lib-state, cascade inbox processor, schema - systemd/: service unit files for reference - deploy.sh: rsync-based deploy with --dry-run, syntax checks, dirty-tree gate - research-session.sh: updated with Step 8.5 digest + cascade inbox processing No new code written — all files are exact copies from VPS as of 2026-04-06. From this point forward: edit in repo, commit, then deploy.sh. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
283 lines
12 KiB
Bash
Executable file
283 lines
12 KiB
Bash
Executable file
#!/bin/bash
|
|
# Batch extract sources from inbox/queue/ — v3 with four-gate skip logic
|
|
#
|
|
# Uses separate extract/ worktree (not main/ — prevents daemon race condition).
|
|
# Skip logic uses repo/DB state checks instead of local marker files (Ganymede v3 review):
|
|
# Gate 1: Is source already in archive/{domain}/? → already processed, dedup
|
|
# Gate 2: Does extraction branch exist on Forgejo? → extraction in progress
|
|
# Gate 3: Does pipeline.db show ≥3 closed PRs for this source? → zombie, skip
|
|
# Gate 4: Does pipeline.db show active OR recently closed PR? → skip (4h cooldown)
|
|
# All gates pass → extract
|
|
#
|
|
# Architecture: Ganymede (two-gate design, since extended to four gates) + Rhea (separate worktrees)
|
|
|
|
REPO=/opt/teleo-eval/workspaces/extract
|
|
MAIN_REPO=/opt/teleo-eval/workspaces/main
|
|
EXTRACT=/opt/teleo-eval/openrouter-extract-v2.py
|
|
CLEANUP=/opt/teleo-eval/post-extract-cleanup.py
|
|
LOG=/opt/teleo-eval/logs/batch-extract-50.log
|
|
DB=/opt/teleo-eval/pipeline/pipeline.db
|
|
TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-leo-token)
|
|
FORGEJO_URL="http://localhost:3000"
|
|
MAX=50
|
|
MAX_CLOSED=3 # zombie retry limit: skip source after this many closed PRs
|
|
COUNT=0
|
|
SUCCESS=0
|
|
FAILED=0
|
|
SKIPPED=0
|
|
|
|
# Lockfile to prevent concurrent runs
|
|
LOCKFILE="/tmp/batch-extract.lock"
|
|
if [ -f "$LOCKFILE" ]; then
|
|
pid=$(cat "$LOCKFILE" 2>/dev/null)
|
|
if kill -0 "$pid" 2>/dev/null; then
|
|
echo "[$(date)] SKIP: batch extract already running (pid $pid)" >> $LOG
|
|
exit 0
|
|
fi
|
|
rm -f "$LOCKFILE"
|
|
fi
|
|
echo $$ > "$LOCKFILE"
|
|
trap 'rm -f "$LOCKFILE"' EXIT
|
|
|
|
echo "[$(date)] Starting batch extraction of $MAX sources" >> $LOG
|
|
|
|
cd $REPO || exit 1
|
|
|
|
# Bug fix: don't swallow errors on critical git commands (Ganymede review)
|
|
git fetch origin main >> $LOG 2>&1 || { echo "[$(date)] FATAL: fetch origin main failed" >> $LOG; exit 1; }
|
|
git checkout -f main >> $LOG 2>&1 || { echo "[$(date)] FATAL: checkout main failed" >> $LOG; exit 1; }
|
|
git reset --hard origin/main >> $LOG 2>&1 || { echo "[$(date)] FATAL: reset --hard failed" >> $LOG; exit 1; }
|
|
|
|
# SHA canary: verify extract worktree matches origin/main (Ganymede review)
|
|
LOCAL_SHA=$(git rev-parse HEAD)
|
|
REMOTE_SHA=$(git rev-parse origin/main)
|
|
if [ "$LOCAL_SHA" != "$REMOTE_SHA" ]; then
|
|
echo "[$(date)] FATAL: extract worktree diverged from main ($LOCAL_SHA vs $REMOTE_SHA)" >> $LOG
|
|
exit 1
|
|
fi
|
|
|
|
# Pre-extraction cleanup: remove queue files that already exist in archive
|
|
# This runs on the MAIN worktree (not extract/) so deletions are committed to git.
|
|
# Prevents the "queue duplicate reappears after reset --hard" problem.
|
|
CLEANED=0
|
|
for qfile in $MAIN_REPO/inbox/queue/*.md; do
|
|
[ -f "$qfile" ] || continue
|
|
qbase=$(basename "$qfile")
|
|
if find "$MAIN_REPO/inbox/archive" -name "$qbase" 2>/dev/null | grep -q .; then
|
|
rm -f "$qfile"
|
|
CLEANED=$((CLEANED + 1))
|
|
fi
|
|
done
|
|
if [ "$CLEANED" -gt 0 ]; then
|
|
echo "[$(date)] Cleaned $CLEANED stale queue duplicates" >> $LOG
|
|
cd $MAIN_REPO
|
|
git add -A inbox/queue/ 2>/dev/null
|
|
git commit -m "pipeline: clean $CLEANED stale queue duplicates
|
|
|
|
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>" 2>/dev/null
|
|
# Push with retry
|
|
for attempt in 1 2 3; do
|
|
git pull --rebase origin main 2>/dev/null
|
|
git push origin main 2>/dev/null && break
|
|
sleep 2
|
|
done
|
|
cd $REPO
|
|
git fetch origin main 2>/dev/null
|
|
git reset --hard origin/main 2>/dev/null
|
|
fi
|
|
|
|
# Get sources in queue
|
|
SOURCES=$(ls inbox/queue/*.md 2>/dev/null | head -$MAX)
|
|
|
|
# Batch fetch all remote branches once (Ganymede: 1 call instead of 84)
|
|
REMOTE_BRANCHES=$(git ls-remote --heads origin 2>/dev/null)
|
|
if [ $? -ne 0 ]; then
|
|
echo "[$(date)] ABORT: git ls-remote failed — remote unreachable, skipping cycle" >> $LOG
|
|
exit 0
|
|
fi
|
|
|
|
for SOURCE in $SOURCES; do
|
|
COUNT=$((COUNT + 1))
|
|
BASENAME=$(basename "$SOURCE" .md)
|
|
BRANCH="extract/$BASENAME"
|
|
|
|
# Skip conversation archives — valuable content enters through standalone sources,
|
|
# inline tags (SOURCE:/CLAIM:), and transcript review. Raw conversations produce
|
|
# low-quality claims with schema failures. (Epimetheus session 4)
|
|
if grep -q "^format: conversation" "$SOURCE" 2>/dev/null; then
|
|
# Move to archive instead of leaving in queue (prevents re-processing)
|
|
mv "$SOURCE" "$MAIN_REPO/inbox/archive/telegram/" 2>/dev/null
|
|
echo "[$(date)] [$COUNT/$MAX] ARCHIVE $BASENAME (conversation — skipped extraction)" >> $LOG
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
|
|
# Gate 1: Already in archive? Source was already processed — dedup (Ganymede)
|
|
if find "$MAIN_REPO/inbox/archive" -name "$BASENAME.md" 2>/dev/null | grep -q .; then
|
|
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (already in archive)" >> $LOG
|
|
# Delete the queue duplicate
|
|
rm -f "$MAIN_REPO/inbox/queue/$BASENAME.md" 2>/dev/null
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
|
|
# Gate 2: Branch exists on Forgejo? Extraction already in progress (cached lookup)
|
|
# Enhancement: 2-hour staleness check (Ganymede review) — if branch is >2h old
|
|
# and PR is unmergeable, close PR + delete branch and re-extract
|
|
if echo "$REMOTE_BRANCHES" | grep -q "refs/heads/$BRANCH$"; then
|
|
# Check branch age
|
|
BRANCH_SHA=$(echo "$REMOTE_BRANCHES" | grep "refs/heads/$BRANCH$" | awk '{print $1}')
|
|
BRANCH_AGE_EPOCH=$(git log -1 --format='%ct' "$BRANCH_SHA" 2>/dev/null || echo 0)
|
|
NOW_EPOCH=$(date +%s)
|
|
AGE_HOURS=$(( (NOW_EPOCH - BRANCH_AGE_EPOCH) / 3600 ))
|
|
|
|
if [ "$AGE_HOURS" -ge 2 ]; then
|
|
# Branch is stale — check if PR is mergeable
|
|
# Note: Forgejo head= filter is unreliable. Fetch all open PRs and filter locally.
|
|
PR_NUM=$(curl -sf "$FORGEJO_URL/api/v1/repos/teleo/teleo-codex/pulls?state=open&limit=50" \
|
|
-H "Authorization: token $TOKEN" | python3 -c "
|
|
import sys,json
|
|
prs=json.load(sys.stdin)
|
|
branch='$BRANCH'
|
|
matches=[p for p in prs if p['head']['ref']==branch]
|
|
print(matches[0]['number'] if matches else '')
|
|
" 2>/dev/null)
|
|
if [ -n "$PR_NUM" ]; then
|
|
PR_MERGEABLE=$(curl -sf "$FORGEJO_URL/api/v1/repos/teleo/teleo-codex/pulls/$PR_NUM" \
|
|
-H "Authorization: token $TOKEN" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("mergeable","true"))' 2>/dev/null)
|
|
if [ "$PR_MERGEABLE" = "False" ] || [ "$PR_MERGEABLE" = "false" ]; then
|
|
echo "[$(date)] [$COUNT/$MAX] STALE: $BASENAME (${AGE_HOURS}h old, unmergeable PR #$PR_NUM) — closing + re-extracting" >> $LOG
|
|
# Close PR with audit comment
|
|
curl -sf -X POST "$FORGEJO_URL/api/v1/repos/teleo/teleo-codex/issues/$PR_NUM/comments" \
|
|
-H "Authorization: token $TOKEN" -H "Content-Type: application/json" \
|
|
-d '{"body":"Auto-closed: extraction branch stale >2h, conflict unresolvable. Source will be re-extracted from current main."}' > /dev/null 2>&1
|
|
curl -sf -X PATCH "$FORGEJO_URL/api/v1/repos/teleo/teleo-codex/pulls/$PR_NUM" \
|
|
-H "Authorization: token $TOKEN" -H "Content-Type: application/json" \
|
|
-d '{"state":"closed"}' > /dev/null 2>&1
|
|
# Delete remote branch
|
|
git push origin --delete "$BRANCH" 2>/dev/null
|
|
# Fall through to extraction below
|
|
else
|
|
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (branch exists ${AGE_HOURS}h, PR #$PR_NUM mergeable — waiting)" >> $LOG
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
else
|
|
# No PR found but branch exists — orphan branch, clean up
|
|
echo "[$(date)] [$COUNT/$MAX] STALE: $BASENAME (orphan branch ${AGE_HOURS}h, no PR) — deleting" >> $LOG
|
|
git push origin --delete "$BRANCH" 2>/dev/null
|
|
# Fall through to extraction
|
|
fi
|
|
else
|
|
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (branch exists — in progress, ${AGE_HOURS}h old)" >> $LOG
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
fi
|
|
|
|
# Gate 3: Check pipeline.db for zombie sources — too many closed PRs means
|
|
# the source keeps failing eval. Skip after MAX_CLOSED rejections. (Epimetheus)
|
|
if [ -f "$DB" ]; then
|
|
CLOSED_COUNT=$(sqlite3 "$DB" "SELECT COUNT(*) FROM prs WHERE branch = 'extract/$BASENAME' AND status = 'closed'" 2>/dev/null || echo 0)
|
|
if [ "$CLOSED_COUNT" -ge "$MAX_CLOSED" ]; then
|
|
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (zombie: $CLOSED_COUNT closed PRs >= $MAX_CLOSED limit)" >> $LOG
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
fi
|
|
|
|
# Gate 4: Check pipeline.db for active or recently closed PRs — prevents
|
|
# re-extraction waste when eval closes a PR and batch-extract runs again
|
|
# before the source is manually reviewed. 4h cooldown after closure.
|
|
if [ -f "$DB" ]; then
|
|
ACTIVE_COUNT=$(sqlite3 "$DB" "SELECT COUNT(*) FROM prs WHERE branch = 'extract/$BASENAME' AND status IN ('extracting','approved','merging')" 2>/dev/null || echo 0)
|
|
if [ "$ACTIVE_COUNT" -ge 1 ]; then
|
|
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (active PR exists)" >> $LOG
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
RECENT_CLOSED=$(sqlite3 "$DB" "SELECT COUNT(*) FROM prs WHERE branch = 'extract/$BASENAME' AND status = 'closed' AND created_at > datetime('now', '-4 hours')" 2>/dev/null || echo 0)
|
|
if [ "$RECENT_CLOSED" -ge 1 ]; then
|
|
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (recently closed PR — 4h cooldown)" >> $LOG
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
fi
|
|
|
|
echo "[$(date)] [$COUNT/$MAX] Processing $BASENAME" >> $LOG
|
|
|
|
# Reset to main (log errors — don't swallow)
|
|
git checkout -f main >> $LOG 2>&1 || { echo " -> SKIP (checkout main failed)" >> $LOG; SKIPPED=$((SKIPPED + 1)); continue; }
|
|
git fetch origin main >> $LOG 2>&1
|
|
git reset --hard origin/main >> $LOG 2>&1 || { echo " -> SKIP (reset failed)" >> $LOG; SKIPPED=$((SKIPPED + 1)); continue; }
|
|
|
|
# Clean stale remote branch (Leo's catch — prevents checkout conflicts)
|
|
git push origin --delete "$BRANCH" 2>/dev/null
|
|
|
|
# Create fresh branch
|
|
git branch -D "$BRANCH" 2>/dev/null
|
|
git checkout -b "$BRANCH" 2>/dev/null
|
|
if [ $? -ne 0 ]; then
|
|
echo " -> SKIP (branch creation failed)" >> $LOG
|
|
SKIPPED=$((SKIPPED + 1))
|
|
continue
|
|
fi
|
|
|
|
# Run extraction
|
|
python3 $EXTRACT "$SOURCE" --no-review >> $LOG 2>&1
|
|
EXTRACT_RC=$?
|
|
|
|
|
|
|
|
if [ $EXTRACT_RC -ne 0 ]; then
|
|
FAILED=$((FAILED + 1))
|
|
echo " -> FAILED (extract rc=$EXTRACT_RC)" >> $LOG
|
|
continue
|
|
fi
|
|
|
|
# Post-extraction cleanup
|
|
python3 $CLEANUP $REPO >> $LOG 2>&1
|
|
|
|
# Check if any files were created/modified
|
|
CHANGED=$(git status --porcelain | wc -l | tr -d " ")
|
|
if [ "$CHANGED" -eq 0 ]; then
|
|
echo " -> No changes (enrichment/null-result only)" >> $LOG
|
|
continue
|
|
fi
|
|
|
|
# Commit
|
|
git add -A
|
|
git commit -m "extract: $BASENAME
|
|
|
|
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>" >> $LOG 2>&1
|
|
|
|
# Push
|
|
git push "http://leo:${TOKEN}@localhost:3000/teleo/teleo-codex.git" "$BRANCH" --force >> $LOG 2>&1
|
|
|
|
# Create PR (include prior art sidecar if available)
|
|
PRIOR_ART_FILE="${SOURCE}.prior-art"
|
|
PR_BODY=""
|
|
if [ -f "$PRIOR_ART_FILE" ]; then
|
|
# Escape JSON special chars in prior art content
|
|
PR_BODY=$(cat "$PRIOR_ART_FILE" | python3 -c 'import sys,json; print(json.dumps(sys.stdin.read()))')
|
|
PR_BODY=${PR_BODY:1:-1} # Strip outer quotes from json.dumps
|
|
fi
|
|
curl -sf -X POST "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls" \
|
|
-H "Authorization: token $TOKEN" \
|
|
-H "Content-Type: application/json" \
|
|
-d "{\"title\":\"extract: $BASENAME\",\"head\":\"$BRANCH\",\"base\":\"main\",\"body\":\"$PR_BODY\"}" >> /dev/null 2>&1
|
|
|
|
SUCCESS=$((SUCCESS + 1))
|
|
echo " -> SUCCESS ($CHANGED files)" >> $LOG
|
|
|
|
# Back to main
|
|
git checkout -f main >> $LOG 2>&1
|
|
|
|
# Rate limit
|
|
sleep 2
|
|
done
|
|
|
|
echo "[$(date)] Batch complete: $SUCCESS success, $FAILED failed, $SKIPPED skipped (already attempted)" >> $LOG
|
|
|
|
git checkout -f main >> $LOG 2>&1
|
|
git reset --hard origin/main >> $LOG 2>&1
|