#!/bin/bash
# Batch extract sources from inbox/queue/ — v3 with gated skip logic
#
# Uses separate extract/ worktree (not main/ — prevents daemon race condition).
# Skip logic uses remote/DB checks instead of local marker files (Ganymede v3 review):
#   Pre-gate: conversation-format sources are archived, never extracted
#   Gate 1: Is source already in archive/{domain}/? → already processed, dedup
#   Gate 2: Does extraction branch exist on Forgejo? → extraction in progress
#           (a branch >= 2h old with an unmergeable or missing PR is removed
#           and the source is re-extracted)
#   Gate 3: Does pipeline.db show ≥3 closed PRs for this source? → zombie, skip
#   Gate 4: Does pipeline.db show active OR recently closed PR? → skip (4h cooldown)
#   All gates pass → extract
#
# Architecture: Ganymede (two-gate) + Rhea (separate worktrees)
#
# NOTE: `set -e` is deliberately not enabled. Many git/curl calls below are
# best-effort and are expected to fail during normal operation; the critical
# ones are checked explicitly.

REPO=/opt/teleo-eval/workspaces/extract
MAIN_REPO=/opt/teleo-eval/workspaces/main
EXTRACT=/opt/teleo-eval/openrouter-extract-v2.py
CLEANUP=/opt/teleo-eval/post-extract-cleanup.py
LOG=/opt/teleo-eval/logs/batch-extract-50.log
DB=/opt/teleo-eval/pipeline/pipeline.db
TOKEN_FILE=/opt/teleo-eval/secrets/forgejo-leo-token
FORGEJO_URL="http://localhost:3000"
API_BASE="$FORGEJO_URL/api/v1/repos/teleo/teleo-codex"
AGENT_TRAILER="Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>"
MAX=50
MAX_CLOSED=3  # zombie retry limit: skip source after this many closed PRs
COUNT=0
SUCCESS=0
FAILED=0
SKIPPED=0

# Append a timestamped line to the log.
log() { printf '[%s] %s\n' "$(date)" "$*" >> "$LOG"; }

# Append an un-timestamped detail line (per-source outcome) to the log.
note() { printf '%s\n' "$*" >> "$LOG"; }

# Print COUNT(*) of rows in prs for branch $1 that also satisfy SQL condition $2.
# $2 must be a trusted literal; $1 is escaped for use inside a SQL string.
# Prints 0 when sqlite3 fails or returns something that is not a number, so
# callers can always compare the result numerically.
count_prs() {
  local quote="'"
  local branch_sql=${1//$quote/$quote$quote}
  local n
  n=$(sqlite3 "$DB" "SELECT COUNT(*) FROM prs WHERE branch = '$branch_sql' AND $2" 2>/dev/null) || n=0
  [[ "$n" =~ ^[0-9]+$ ]] || n=0
  printf '%s\n' "$n"
}

# Print the number of the open PR whose head ref equals $1 (nothing if none).
# Returns non-zero when the API could not be queried/parsed or the page cap
# was hit, so the caller can tell "no PR" apart from "don't know".
# Note: Forgejo head= filter is unreliable. Fetch open PRs page by page and
# filter locally.
find_open_pr() {
  local branch=$1
  local page=1
  local body result
  while [ "$page" -le 20 ]; do
    body=$(curl -sf "$API_BASE/pulls?state=open&limit=50&page=$page" \
      -H "Authorization: token $TOKEN") || return 1
    # The branch name is passed as argv, never spliced into the Python source.
    result=$(printf '%s' "$body" | python3 -c '
import json, sys
prs = json.load(sys.stdin)
hits = [p for p in prs if p["head"]["ref"] == sys.argv[1]]
if hits:
    print(hits[0]["number"])
elif not prs:
    print("END")
' "$branch" 2>/dev/null) || return 1
    case "$result" in
      END) return 0 ;;                            # ran out of pages: no PR
      "") page=$((page + 1)) ;;                   # no match on this page
      *) printf '%s\n' "$result"; return 0 ;;
    esac
  done
  return 1
}

# An empty token would make every API call and push fail later; stop early.
TOKEN=$(cat "$TOKEN_FILE" 2>/dev/null)
if [ -z "$TOKEN" ]; then
  log "FATAL: cannot read Forgejo token from $TOKEN_FILE"
  exit 1
fi

# Authenticated push URL derived from FORGEJO_URL (scheme://leo:TOKEN@host/...).
PUSH_URL="${FORGEJO_URL%%://*}://leo:${TOKEN}@${FORGEJO_URL#*://}/teleo/teleo-codex.git"

# Lockfile to prevent concurrent runs
LOCKFILE="/tmp/batch-extract.lock"
if [ -f "$LOCKFILE" ]; then
  pid=$(cat "$LOCKFILE" 2>/dev/null)
  if kill -0 "$pid" 2>/dev/null; then
    log "SKIP: batch extract already running (pid $pid)"
    exit 0
  fi
  # Holder is gone — stale lock
  rm -f "$LOCKFILE"
fi
echo $$ > "$LOCKFILE"
trap 'rm -f "$LOCKFILE"' EXIT

log "Starting batch extraction of $MAX sources"

cd "$REPO" || { log "FATAL: cannot cd to $REPO"; exit 1; }

# Bug fix: don't swallow errors on critical git commands (Ganymede review)
git fetch origin main >> "$LOG" 2>&1 || { log "FATAL: fetch origin main failed"; exit 1; }
git checkout -f main >> "$LOG" 2>&1 || { log "FATAL: checkout main failed"; exit 1; }
git reset --hard origin/main >> "$LOG" 2>&1 || { log "FATAL: reset --hard failed"; exit 1; }

# SHA canary: verify extract worktree matches origin/main (Ganymede review)
LOCAL_SHA=$(git rev-parse HEAD)
REMOTE_SHA=$(git rev-parse origin/main)
if [ "$LOCAL_SHA" != "$REMOTE_SHA" ]; then
  log "FATAL: extract worktree diverged from main ($LOCAL_SHA vs $REMOTE_SHA)"
  exit 1
fi

# Pre-extraction cleanup: remove queue files that already exist in archive
# This runs on the MAIN worktree (not extract/) so deletions are committed to git.
# Prevents the "queue duplicate reappears after reset --hard" problem.
CLEANED=0
for qfile in "$MAIN_REPO"/inbox/queue/*.md; do
  [ -f "$qfile" ] || continue
  qbase=$(basename "$qfile")
  if find "$MAIN_REPO/inbox/archive" -name "$qbase" 2>/dev/null | grep -q .; then
    rm -f "$qfile"
    CLEANED=$((CLEANED + 1))
  fi
done

if [ "$CLEANED" -gt 0 ]; then
  log "Cleaned $CLEANED stale queue duplicates"
  # Only commit/push if we really are in the main worktree; otherwise the
  # commit would be made in (and pushed from) the extract worktree.
  if cd "$MAIN_REPO"; then
    git add -A inbox/queue/ 2>/dev/null
    git commit -m "pipeline: clean $CLEANED stale queue duplicates" -m "$AGENT_TRAILER" 2>/dev/null
    # Push with retry
    PUSHED=0
    for attempt in 1 2 3; do
      git pull --rebase origin main 2>/dev/null
      if git push origin main 2>/dev/null; then
        PUSHED=1
        break
      fi
      sleep 2
    done
    [ "$PUSHED" -eq 1 ] || log "WARN: could not push queue cleanup to main after 3 attempts"
  else
    log "WARN: cannot cd to $MAIN_REPO — queue cleanup not committed"
  fi
  cd "$REPO" || { log "FATAL: cannot cd to $REPO"; exit 1; }
  git fetch origin main 2>/dev/null
  git reset --hard origin/main 2>/dev/null
fi

# Get sources in queue (first $MAX, glob order). Paths are relative to $REPO.
shopt -s nullglob
QUEUE_FILES=(inbox/queue/*.md)
shopt -u nullglob
SOURCES=("${QUEUE_FILES[@]:0:$MAX}")

# Batch fetch all remote branches once (Ganymede: 1 call instead of 84)
if ! REMOTE_BRANCHES=$(git ls-remote --heads origin 2>/dev/null); then
  log "ABORT: git ls-remote failed — remote unreachable, skipping cycle"
  exit 0
fi

for SOURCE in "${SOURCES[@]}"; do
  COUNT=$((COUNT + 1))
  BASENAME=$(basename "$SOURCE" .md)
  BRANCH="extract/$BASENAME"

  # Skip conversation archives — valuable content enters through standalone sources,
  # inline tags (SOURCE:/CLAIM:), and transcript review. Raw conversations produce
  # low-quality claims with schema failures. (Epimetheus session 4)
  if grep -q "^format: conversation" "$SOURCE" 2>/dev/null; then
    # Move to archive instead of leaving in queue (prevents re-processing)
    mv "$SOURCE" "$MAIN_REPO/inbox/archive/telegram/" 2>/dev/null
    log "[$COUNT/$MAX] ARCHIVE $BASENAME (conversation — skipped extraction)"
    SKIPPED=$((SKIPPED + 1))
    continue
  fi

  # Gate 1: Already in archive? Source was already processed — dedup (Ganymede)
  if find "$MAIN_REPO/inbox/archive" -name "$BASENAME.md" 2>/dev/null | grep -q .; then
    log "[$COUNT/$MAX] SKIP $BASENAME (already in archive)"
    # Delete the queue duplicate
    rm -f "$MAIN_REPO/inbox/queue/$BASENAME.md" 2>/dev/null
    SKIPPED=$((SKIPPED + 1))
    continue
  fi

  # Gate 2: Branch exists on Forgejo? Extraction already in progress (cached lookup)
  # Enhancement: 2-hour staleness check (Ganymede review) — if branch is >2h old
  # and PR is unmergeable, close PR + delete branch and re-extract
  # Exact ref comparison (not a regex) against the cached ls-remote output.
  BRANCH_SHA=$(awk -v ref="refs/heads/$BRANCH" '$2 == ref { print $1; exit }' <<< "$REMOTE_BRANCHES")
  if [ -n "$BRANCH_SHA" ]; then
    # Check branch age. The commit may not be in the local object store (only
    # main is fetched above), so fetch the branch on demand before giving up.
    BRANCH_AGE_EPOCH=$(git log -1 --format='%ct' "$BRANCH_SHA" 2>/dev/null)
    if [[ ! "$BRANCH_AGE_EPOCH" =~ ^[0-9]+$ ]]; then
      git fetch -q origin "refs/heads/$BRANCH" >> "$LOG" 2>&1
      BRANCH_AGE_EPOCH=$(git log -1 --format='%ct' "$BRANCH_SHA" 2>/dev/null)
    fi
    if [[ ! "$BRANCH_AGE_EPOCH" =~ ^[0-9]+$ ]]; then
      # Unknown age must not be read as "infinitely stale" — that path deletes branches.
      log "[$COUNT/$MAX] SKIP $BASENAME (branch exists — age unknown, cannot read commit $BRANCH_SHA)"
      SKIPPED=$((SKIPPED + 1))
      continue
    fi
    NOW_EPOCH=$(date +%s)
    AGE_HOURS=$(( (NOW_EPOCH - BRANCH_AGE_EPOCH) / 3600 ))

    if [ "$AGE_HOURS" -ge 2 ]; then
      # Branch is stale — check if PR is mergeable
      if ! PR_NUM=$(find_open_pr "$BRANCH"); then
        # API failure is not evidence of an orphan branch; never delete on it.
        log "[$COUNT/$MAX] SKIP $BASENAME (branch exists ${AGE_HOURS}h, PR lookup failed — retrying next cycle)"
        SKIPPED=$((SKIPPED + 1))
        continue
      fi
      if [ -n "$PR_NUM" ]; then
        # Prints True/False for a JSON boolean; empty if the request failed,
        # which falls into the conservative "waiting" branch below.
        PR_MERGEABLE=$(curl -sf "$API_BASE/pulls/$PR_NUM" \
          -H "Authorization: token $TOKEN" \
          | python3 -c 'import sys,json; print(json.load(sys.stdin).get("mergeable","true"))' 2>/dev/null)
        if [ "$PR_MERGEABLE" = "False" ] || [ "$PR_MERGEABLE" = "false" ]; then
          log "[$COUNT/$MAX] STALE: $BASENAME (${AGE_HOURS}h old, unmergeable PR #$PR_NUM) — closing + re-extracting"
          # Close PR with audit comment
          curl -sf -X POST "$API_BASE/issues/$PR_NUM/comments" \
            -H "Authorization: token $TOKEN" -H "Content-Type: application/json" \
            -d '{"body":"Auto-closed: extraction branch stale >2h, conflict unresolvable. Source will be re-extracted from current main."}' > /dev/null 2>&1
          curl -sf -X PATCH "$API_BASE/pulls/$PR_NUM" \
            -H "Authorization: token $TOKEN" -H "Content-Type: application/json" \
            -d '{"state":"closed"}' > /dev/null 2>&1
          # Delete remote branch
          git push origin --delete "$BRANCH" 2>/dev/null
          # Fall through to extraction below
        else
          log "[$COUNT/$MAX] SKIP $BASENAME (branch exists ${AGE_HOURS}h, PR #$PR_NUM mergeable — waiting)"
          SKIPPED=$((SKIPPED + 1))
          continue
        fi
      else
        # No PR found but branch exists — orphan branch, clean up
        log "[$COUNT/$MAX] STALE: $BASENAME (orphan branch ${AGE_HOURS}h, no PR) — deleting"
        git push origin --delete "$BRANCH" 2>/dev/null
        # Fall through to extraction
      fi
    else
      log "[$COUNT/$MAX] SKIP $BASENAME (branch exists — in progress, ${AGE_HOURS}h old)"
      SKIPPED=$((SKIPPED + 1))
      continue
    fi
  fi

  if [ -f "$DB" ]; then
    # Gate 3: Check pipeline.db for zombie sources — too many closed PRs means
    # the source keeps failing eval. Skip after MAX_CLOSED rejections. (Epimetheus)
    CLOSED_COUNT=$(count_prs "$BRANCH" "status = 'closed'")
    if [ "$CLOSED_COUNT" -ge "$MAX_CLOSED" ]; then
      log "[$COUNT/$MAX] SKIP $BASENAME (zombie: $CLOSED_COUNT closed PRs >= $MAX_CLOSED limit)"
      SKIPPED=$((SKIPPED + 1))
      continue
    fi

    # Gate 4: Check pipeline.db for active or recently closed PRs — prevents
    # re-extraction waste when eval closes a PR and batch-extract runs again
    # before the source is manually reviewed. 4h cooldown after closure.
    ACTIVE_COUNT=$(count_prs "$BRANCH" "status IN ('extracting','approved','merging')")
    if [ "$ACTIVE_COUNT" -ge 1 ]; then
      log "[$COUNT/$MAX] SKIP $BASENAME (active PR exists)"
      SKIPPED=$((SKIPPED + 1))
      continue
    fi

    # NOTE(review): the window is measured from created_at, i.e. PR creation,
    # not the moment it was closed — confirm that is the intended cooldown.
    RECENT_CLOSED=$(count_prs "$BRANCH" "status = 'closed' AND created_at > datetime('now', '-4 hours')")
    if [ "$RECENT_CLOSED" -ge 1 ]; then
      log "[$COUNT/$MAX] SKIP $BASENAME (recently closed PR — 4h cooldown)"
      SKIPPED=$((SKIPPED + 1))
      continue
    fi
  fi

  log "[$COUNT/$MAX] Processing $BASENAME"

  # Reset to main (log errors — don't swallow)
  git checkout -f main >> "$LOG" 2>&1 || { note " -> SKIP (checkout main failed)"; SKIPPED=$((SKIPPED + 1)); continue; }
  git fetch origin main >> "$LOG" 2>&1
  git reset --hard origin/main >> "$LOG" 2>&1 || { note " -> SKIP (reset failed)"; SKIPPED=$((SKIPPED + 1)); continue; }

  # main may have moved since the queue was listed; the source can be gone.
  if [ ! -f "$SOURCE" ]; then
    note " -> SKIP (source no longer in queue on main)"
    SKIPPED=$((SKIPPED + 1))
    continue
  fi

  # Clean stale remote branch (Leo's catch — prevents checkout conflicts)
  git push origin --delete "$BRANCH" 2>/dev/null

  # Create fresh branch
  git branch -D "$BRANCH" 2>/dev/null
  if ! git checkout -b "$BRANCH" 2>/dev/null; then
    note " -> SKIP (branch creation failed)"
    SKIPPED=$((SKIPPED + 1))
    continue
  fi

  # Run extraction
  python3 "$EXTRACT" "$SOURCE" --no-review >> "$LOG" 2>&1
  EXTRACT_RC=$?
  if [ "$EXTRACT_RC" -ne 0 ]; then
    FAILED=$((FAILED + 1))
    note " -> FAILED (extract rc=$EXTRACT_RC)"
    continue
  fi

  # Post-extraction cleanup
  python3 "$CLEANUP" "$REPO" >> "$LOG" 2>&1

  # Check if any files were created/modified
  CHANGED=$(git status --porcelain | wc -l | tr -d " ")
  if [ "$CHANGED" -eq 0 ]; then
    note " -> No changes (enrichment/null-result only)"
    continue
  fi

  # Commit — a failure here means there is nothing valid to push.
  git add -A
  if ! git commit -m "extract: $BASENAME" -m "$AGENT_TRAILER" >> "$LOG" 2>&1; then
    FAILED=$((FAILED + 1))
    note " -> FAILED (commit failed)"
    continue
  fi

  # Push
  if ! git push "$PUSH_URL" "$BRANCH" --force >> "$LOG" 2>&1; then
    FAILED=$((FAILED + 1))
    note " -> FAILED (push failed)"
    continue
  fi

  # Create PR (include prior art sidecar if available). The whole payload is
  # serialized by json.dumps so quotes/newlines in any field cannot break it.
  PRIOR_ART_FILE="${SOURCE}.prior-art"
  [ -f "$PRIOR_ART_FILE" ] || PRIOR_ART_FILE=/dev/null
  PR_PAYLOAD=$(python3 -c '
import json, sys
body = sys.stdin.buffer.read().decode("utf-8", "replace")
print(json.dumps({"title": sys.argv[1], "head": sys.argv[2], "base": "main", "body": body}))
' "extract: $BASENAME" "$BRANCH" < "$PRIOR_ART_FILE")

  if curl -sf -X POST "$API_BASE/pulls" \
    -H "Authorization: token $TOKEN" \
    -H "Content-Type: application/json" \
    -d "$PR_PAYLOAD" > /dev/null 2>&1; then
    SUCCESS=$((SUCCESS + 1))
    note " -> SUCCESS ($CHANGED files)"
  else
    # Branch is on the remote without a PR; Gate 2 treats it as an orphan
    # once it is 2h old and the source is then re-extracted.
    FAILED=$((FAILED + 1))
    note " -> FAILED (branch pushed but PR creation failed)"
  fi

  # Back to main
  git checkout -f main >> "$LOG" 2>&1

  # Rate limit
  sleep 2
done

log "Batch complete: $SUCCESS success, $FAILED failed, $SKIPPED skipped (already attempted)"

git checkout -f main >> "$LOG" 2>&1
git reset --hard origin/main >> "$LOG" 2>&1