Compare commits

..

No commits in common. "main" and "epimetheus/synthetic-recovery-prs" have entirely different histories.

17 changed files with 223 additions and 2723 deletions

View file

@ -1,120 +0,0 @@
#!/bin/bash
# One-time setup: prepare the bare mirror repo for teleo-infrastructure.
#
# Prerequisites (must happen BEFORE running this):
# 1. GitHub repo `living-ip/teleo-infrastructure` created (manual via web or
# `gh repo create` — the deploy PAT is fine-grained to teleo-codex only
# and cannot create new repos in the org).
# 2. GitHub PAT updated to include push access on the new repo (or rotate
# to a classic PAT with `repo` scope covering both).
#
# This script is idempotent — safe to re-run.
set -euo pipefail
MIRROR_BASE="/opt/teleo-eval/mirror"
REPO_DIR="$MIRROR_BASE/teleo-infrastructure.git"
FORGEJO_URL="http://localhost:3000/teleo/teleo-infrastructure.git"
GITHUB_REPO="living-ip/teleo-infrastructure"
FORGEJO_TOKEN_FILE="/opt/teleo-eval/secrets/forgejo-admin-token"
GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
if [ ! -f "$FORGEJO_TOKEN_FILE" ]; then
echo "ERROR: missing $FORGEJO_TOKEN_FILE" >&2
exit 1
fi
if [ ! -f "$GITHUB_PAT_FILE" ]; then
echo "ERROR: missing $GITHUB_PAT_FILE" >&2
exit 1
fi
FORGEJO_TOKEN=$(cat "$FORGEJO_TOKEN_FILE" | tr -d '[:space:]')
GITHUB_PAT=$(cat "$GITHUB_PAT_FILE" | tr -d '[:space:]')
# Sanity check: GitHub repo must exist before we point a remote at it.
echo "Verifying GitHub repo $GITHUB_REPO exists..."
GH_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
-H "Authorization: Bearer $GITHUB_PAT" \
"https://api.github.com/repos/$GITHUB_REPO")
if [ "$GH_STATUS" != "200" ]; then
echo "ERROR: GitHub repo $GITHUB_REPO not accessible (HTTP $GH_STATUS)" >&2
echo "Create it first: gh repo create $GITHUB_REPO --public --description 'Pipeline + diagnostics infra for the LivingIP collective'" >&2
exit 2
fi
echo " OK — $GITHUB_REPO accessible"
# Sanity check: Forgejo repo must exist.
echo "Verifying Forgejo repo teleo/teleo-infrastructure exists..."
FG_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
-H "Authorization: token $FORGEJO_TOKEN" \
"http://localhost:3000/api/v1/repos/teleo/teleo-infrastructure")
if [ "$FG_STATUS" != "200" ]; then
echo "ERROR: Forgejo repo teleo/teleo-infrastructure not accessible (HTTP $FG_STATUS)" >&2
exit 3
fi
echo " OK — Forgejo repo accessible"
# Init bare mirror if missing
if [ -d "$REPO_DIR" ]; then
echo "Bare repo already exists at $REPO_DIR — skipping init"
else
echo "Creating bare repo at $REPO_DIR..."
mkdir -p "$REPO_DIR"
cd "$REPO_DIR"
git init --bare >/dev/null
chown -R teleo:teleo "$REPO_DIR"
echo " OK — bare repo initialized"
fi
cd "$REPO_DIR"
# Configure remotes (idempotent: set-url succeeds whether remote exists or not)
# Forgejo remote (origin convention is reversed in this codebase: origin=GitHub,
# forgejo=Forgejo, matching the existing teleo-codex.git layout).
FORGEJO_REMOTE_URL="http://github-mirror:${FORGEJO_TOKEN}@localhost:3000/teleo/teleo-infrastructure.git"
# NOTE: "m3taversal" is a placeholder username — for fine-grained PATs the
# username field is decorative; the token does the auth. Matches the existing
# teleo-codex.git remote for consistency. (Ganymede review nit #4.)
GITHUB_REMOTE_URL="https://m3taversal:${GITHUB_PAT}@github.com/${GITHUB_REPO}.git"
if git remote get-url forgejo >/dev/null 2>&1; then
git remote set-url forgejo "$FORGEJO_REMOTE_URL"
echo " Updated forgejo remote URL"
else
git remote add forgejo "$FORGEJO_REMOTE_URL"
echo " Added forgejo remote"
fi
if git remote get-url origin >/dev/null 2>&1; then
git remote set-url origin "$GITHUB_REMOTE_URL"
echo " Updated origin remote URL"
else
git remote add origin "$GITHUB_REMOTE_URL"
echo " Added origin remote"
fi
# Initial fetch from Forgejo
echo "Fetching from Forgejo..."
git fetch forgejo --prune 2>&1 | sed 's/^/ /'
# Initial push to GitHub (will populate the empty repo)
# main_only mode: push ONLY refs/heads/main + tags, mirroring what sync-mirror.sh
# does for this repo on the recurring path. Agent review branches stay Forgejo-only.
echo "Pushing initial main + tags to GitHub..."
git update-ref refs/heads/main refs/remotes/forgejo/main 2>/dev/null || {
echo "ERROR: forgejo/main ref missing — fetch may have failed" >&2
exit 1
}
git push origin "refs/heads/main:refs/heads/main" 2>&1 | sed 's/^/ /' || {
echo "WARN: initial push failed — you may need to authorize the PAT for $GITHUB_REPO" >&2
}
git push origin --tags 2>&1 | sed 's/^/ /' || true
# Final permissions sweep
chown -R teleo:teleo "$REPO_DIR"
echo
echo "Setup complete. Verify with:"
echo " ssh teleo@77.42.65.182 ls -la $REPO_DIR/refs/heads"
echo " /opt/teleo-eval/sync-mirror.sh && tail -50 /opt/teleo-eval/logs/sync.log"

View file

@ -2,35 +2,22 @@
# Bidirectional sync: Forgejo (authoritative) <-> GitHub (public mirror)
# Forgejo wins on conflict. Runs every 2 minutes via cron.
#
# Repos handled (see MIRROR_REPOS below):
# - teleo-codex (mode=bidirectional): full PR roundtrip — fork PR refs from
# GitHub, auto-create Forgejo PR mirrors, link github_pr in pipeline.db.
# - teleo-infrastructure (mode=main_only): one-way sync of branches+tags from
# Forgejo to GitHub. No PR roundtrip — pipeline doesn't process infra PRs;
# external infra PRs land on GitHub for visibility, get reviewed manually.
#
# Security note: GitHub->Forgejo path is for external contributor convenience.
# Never auto-process branches arriving via this path without a PR.
# Eval pipeline and extract cron only act on PRs, not raw branches.
set -euo pipefail
REPO_DIR="/opt/teleo-eval/mirror/teleo-codex.git"
LOG="/opt/teleo-eval/logs/sync.log"
LOCKFILE="/tmp/sync-mirror.lock"
PIPELINE_DB="/opt/teleo-eval/pipeline/pipeline.db"
GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
GITHUB_REPO="living-ip/teleo-codex"
# (forgejo_owner_repo, github_owner_repo, bare_path, mode)
# mode: bidirectional | main_only
MIRROR_REPOS=(
"teleo/teleo-codex living-ip/teleo-codex /opt/teleo-eval/mirror/teleo-codex.git bidirectional"
"teleo/teleo-infrastructure living-ip/teleo-infrastructure /opt/teleo-eval/mirror/teleo-infrastructure.git main_only"
)
log() { echo "[$(date -Iseconds)] $1" >> "$LOG"; }
REPO_TAG="main"
log() { echo "[$(date -Iseconds)] [$REPO_TAG] $1" >> "$LOG"; }
# Lockfile — prevent concurrent runs (single lock for whole script)
# Lockfile — prevent concurrent runs
if [ -f "$LOCKFILE" ]; then
pid=$(cat "$LOCKFILE" 2>/dev/null)
if kill -0 "$pid" 2>/dev/null; then
@ -41,168 +28,114 @@ fi
echo $$ > "$LOCKFILE"
trap 'rm -f "$LOCKFILE"' EXIT
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true)
if [ -n "$BAD_PERMS" ]; then
log "Fixing mirror permissions (found: $BAD_PERMS)"
chown -R teleo:teleo "$REPO_DIR" 2>/dev/null
fi
cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; exit 1; }
# ─────────────────────────────────────────────────────────────────────────────
# sync_repo: process one mirror entry. Sets module-level FORGEJO_REPO,
# GITHUB_REPO, REPO_DIR, MODE, REPO_TAG used by inner steps.
# ─────────────────────────────────────────────────────────────────────────────
sync_repo() {
FORGEJO_REPO="$1" # e.g. teleo/teleo-codex (path on Forgejo)
GITHUB_REPO="$2" # e.g. living-ip/teleo-codex (path on GitHub)
REPO_DIR="$3" # bare mirror dir
MODE="$4" # bidirectional | main_only
REPO_TAG="${FORGEJO_REPO##*/}" # short name for log prefix
# Step 1: Fetch from Forgejo (must succeed — it's authoritative)
log "Fetching from Forgejo..."
if ! git fetch forgejo --prune >> "$LOG" 2>&1; then
log "ERROR: Forgejo fetch failed — aborting"
exit 1
fi
# Pre-flight: bare repo must exist
if [ ! -d "$REPO_DIR" ]; then
log "ERROR: bare repo missing at $REPO_DIR — skipping"
return 0
fi
# Step 2: Fetch from GitHub (warn on failure, don't abort)
log "Fetching from GitHub..."
git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed"
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true)
if [ -n "$BAD_PERMS" ]; then
log "Fixing mirror permissions (found: $BAD_PERMS)"
chown -R teleo:teleo "$REPO_DIR" 2>/dev/null || true
fi
cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; return 0; }
# Step 1: Fetch from Forgejo (must succeed — it's authoritative)
log "Fetching from Forgejo..."
if ! git fetch forgejo --prune >> "$LOG" 2>&1; then
log "ERROR: Forgejo fetch failed — skipping this repo"
return 0
fi
# Step 2: Fetch from GitHub (warn on failure, don't abort)
log "Fetching from GitHub..."
git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed"
# Step 2.1: Fetch GitHub fork PR refs (bidirectional only)
# Fork-based PRs don't create branches on origin — they create refs/pull/N/head.
# main_only repos don't accept fork PRs through the mirror path.
if [ "$MODE" = "bidirectional" ]; then
local PAT
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
if [ -n "$PAT" ]; then
local OPEN_PRS
OPEN_PRS=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?state=open&per_page=100" \
-H "Authorization: token $PAT" 2>/dev/null || echo "[]")
echo "$OPEN_PRS" | python3 -c "
# Step 2.1: Fetch GitHub fork PR refs
# Fork-based PRs don't create branches on origin — they create refs/pull/N/head
# Fetch these so we can push them to Forgejo for evaluation
GITHUB_PAT_STEP2=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
if [ -n "$GITHUB_PAT_STEP2" ]; then
OPEN_PRS=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?state=open&per_page=100" \
-H "Authorization: token $GITHUB_PAT_STEP2" 2>/dev/null || echo "[]")
echo "$OPEN_PRS" | python3 -c "
import sys, json
prs = json.load(sys.stdin)
for pr in prs:
head = pr.get('head', {})
# Only process fork PRs (repo differs from base repo)
base_repo = pr.get('base', {}).get('repo', {}).get('full_name', '')
head_repo = head.get('repo', {}) or {}
head_full = head_repo.get('full_name', '')
if head_full and head_full != base_repo:
print(f\"{pr['number']} {head.get('ref', '')} {head.get('sha', '')}\")
" 2>/dev/null | while read pr_num branch_name head_sha; do
if [ -z "$pr_num" ] || [ -z "$branch_name" ]; then continue; fi
local PR_BRANCH="gh-pr-${pr_num}/${branch_name}"
local EXISTING
EXISTING=$(git rev-parse "refs/heads/$PR_BRANCH" 2>/dev/null || true)
if [ "$EXISTING" = "$head_sha" ]; then continue; fi
git fetch origin "refs/pull/${pr_num}/head:refs/heads/$PR_BRANCH" >> "$LOG" 2>&1 && \
log "Fetched fork PR #$pr_num -> $PR_BRANCH" || \
log "WARN: Failed to fetch fork PR #$pr_num"
done
if [ -z "$pr_num" ] || [ -z "$branch_name" ]; then continue; fi
PR_BRANCH="gh-pr-${pr_num}/${branch_name}"
# Check if we already have this ref at the right SHA
EXISTING=$(git rev-parse "refs/heads/$PR_BRANCH" 2>/dev/null || true)
if [ "$EXISTING" = "$head_sha" ]; then continue; fi
# Fetch the PR ref and create a local branch
git fetch origin "refs/pull/${pr_num}/head:refs/heads/$PR_BRANCH" >> "$LOG" 2>&1 && \
log "Fetched fork PR #$pr_num -> $PR_BRANCH" || \
log "WARN: Failed to fetch fork PR #$pr_num"
done
fi
# Step 2.5: GitHub main -> Forgejo main (ff-only)
# If a PR was merged on GitHub, GitHub main is ahead of Forgejo main.
# Fast-forward Forgejo main to match — safe because ff-only guarantees no divergence.
GITHUB_MAIN_FF=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN_FF=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
if [ "$GITHUB_MAIN_FF" != "$FORGEJO_MAIN_FF" ]; then
if git merge-base --is-ancestor "$FORGEJO_MAIN_FF" "$GITHUB_MAIN_FF"; then
log "GitHub main ($GITHUB_MAIN_FF) ahead of Forgejo main ($FORGEJO_MAIN_FF) — fast-forwarding"
git push forgejo "refs/remotes/origin/main:refs/heads/main" >> "$LOG" 2>&1 && \
log "Forgejo main fast-forwarded to $GITHUB_MAIN_FF" || \
log "WARN: Failed to fast-forward Forgejo main"
fi
fi
fi
# Step 2.5: GitHub main -> Forgejo main (ff-only)
# If a PR was merged on GitHub, GitHub main is ahead of Forgejo main.
# Fast-forward Forgejo main to match — safe because ff-only guarantees no divergence.
local GITHUB_MAIN_FF FORGEJO_MAIN_FF
GITHUB_MAIN_FF=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN_FF=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
if [ "$GITHUB_MAIN_FF" != "$FORGEJO_MAIN_FF" ]; then
if git merge-base --is-ancestor "$FORGEJO_MAIN_FF" "$GITHUB_MAIN_FF"; then
log "GitHub main ($GITHUB_MAIN_FF) ahead of Forgejo main ($FORGEJO_MAIN_FF) — fast-forwarding"
git push forgejo "refs/remotes/origin/main:refs/heads/main" >> "$LOG" 2>&1 && \
log "Forgejo main fast-forwarded to $GITHUB_MAIN_FF" || \
log "WARN: Failed to fast-forward Forgejo main"
fi
fi
# Step 3: Forgejo -> GitHub (primary direction)
# Update local refs from Forgejo remote refs using process substitution (avoids subshell)
log "Syncing Forgejo -> GitHub..."
while read branch; do
[ "$branch" = "HEAD" ] && continue
git update-ref "refs/heads/$branch" "refs/remotes/forgejo/$branch" 2>/dev/null || \
log "WARN: Failed to update ref $branch"
done < <(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/)
# Safety: verify Forgejo main descends from GitHub main before force-pushing
GITHUB_MAIN=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
PUSH_MAIN=true
if [ -n "$GITHUB_MAIN" ] && [ -n "$FORGEJO_MAIN" ]; then
if ! git merge-base --is-ancestor "$GITHUB_MAIN" "$FORGEJO_MAIN"; then
log "CRITICAL: Forgejo main is NOT a descendant of GitHub main — skipping main push"
log "CRITICAL: GitHub main: $GITHUB_MAIN, Forgejo main: $FORGEJO_MAIN"
PUSH_MAIN=false
fi
fi
# Step 3: Forgejo -> GitHub (primary direction)
log "Syncing Forgejo -> GitHub..."
if [ "$PUSH_MAIN" = true ]; then
git push origin --all --force >> "$LOG" 2>&1 || log "WARN: Push to GitHub failed"
else
# Push all branches except main
while read branch; do
[ "$branch" = "main" ] && continue
[ "$branch" = "HEAD" ] && continue
git update-ref "refs/heads/$branch" "refs/remotes/forgejo/$branch" 2>/dev/null || \
log "WARN: Failed to update ref $branch"
done < <(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/)
git push origin --force "refs/heads/$branch:refs/heads/$branch" >> "$LOG" 2>&1 || \
log "WARN: Failed to push $branch to GitHub"
done < <(git for-each-ref --format="%(refname:lstrip=2)" refs/heads/)
fi
git push origin --tags --force >> "$LOG" 2>&1 || log "WARN: Tag push to GitHub failed"
# Safety: verify Forgejo main descends from GitHub main before force-pushing
local GITHUB_MAIN FORGEJO_MAIN PUSH_MAIN
GITHUB_MAIN=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
PUSH_MAIN=true
if [ -n "$GITHUB_MAIN" ] && [ -n "$FORGEJO_MAIN" ]; then
if ! git merge-base --is-ancestor "$GITHUB_MAIN" "$FORGEJO_MAIN"; then
log "CRITICAL: Forgejo main is NOT a descendant of GitHub main — skipping main push"
log "CRITICAL: GitHub main: $GITHUB_MAIN, Forgejo main: $FORGEJO_MAIN"
PUSH_MAIN=false
fi
fi
# Step 4: GitHub -> Forgejo (external contributions only)
# Only push branches that exist on GitHub but NOT on Forgejo
log "Checking GitHub-only branches..."
GITHUB_ONLY=$(comm -23 \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/origin/ | grep -v HEAD | sort) \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/ | grep -v HEAD | sort))
if [ "$MODE" = "main_only" ]; then
# Infra-style mirror: push main + tags ONLY. Pre-review agent branches
# (epimetheus/*, ganymede/*, etc.) carry internal context — agent UUIDs,
# in-flight discussion, WIP — and must not land in the public GitHub
# history. (Ganymede review, finding #1.)
if [ "$PUSH_MAIN" = true ]; then
git push origin --force "refs/heads/main:refs/heads/main" >> "$LOG" 2>&1 || \
log "WARN: main push to GitHub failed"
fi
else
# Bidirectional mirror (codex): push all branches so external
# contributors can fork from any branch, not just main.
if [ "$PUSH_MAIN" = true ]; then
git push origin --all --force >> "$LOG" 2>&1 || log "WARN: Push to GitHub failed"
else
# Push all branches except main when main is divergent
while read branch; do
[ "$branch" = "main" ] && continue
[ "$branch" = "HEAD" ] && continue
git push origin --force "refs/heads/$branch:refs/heads/$branch" >> "$LOG" 2>&1 || \
log "WARN: Failed to push $branch to GitHub"
done < <(git for-each-ref --format="%(refname:lstrip=2)" refs/heads/)
fi
fi
git push origin --tags --force >> "$LOG" 2>&1 || log "WARN: Tag push to GitHub failed"
# Step 4: GitHub -> Forgejo + Forgejo PR auto-create (bidirectional only)
if [ "$MODE" = "bidirectional" ]; then
sync_github_to_forgejo_with_prs
fi
# Step 6: Divergence alerting (applies to both modes)
check_divergence
}
# ─────────────────────────────────────────────────────────────────────────────
# Step 4 split out: codex-specific GitHub→Forgejo branch push + PR auto-create.
# Reads FORGEJO_REPO, GITHUB_REPO, PIPELINE_DB, REPO_TAG from sync_repo scope.
# ─────────────────────────────────────────────────────────────────────────────
sync_github_to_forgejo_with_prs() {
log "Checking GitHub-only branches..."
local FORGEJO_HOST="http://localhost:3000/api/v1/repos/$FORGEJO_REPO"
local GITHUB_ONLY
GITHUB_ONLY=$(comm -23 \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/origin/ | grep -v HEAD | sort) \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/ | grep -v HEAD | sort))
if [ -z "$GITHUB_ONLY" ]; then
log "No new GitHub-only branches"
return 0
fi
local FORGEJO_TOKEN
if [ -n "$GITHUB_ONLY" ]; then
FORGEJO_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token 2>/dev/null)
for branch in $GITHUB_ONLY; do
log "New from GitHub: $branch -> Forgejo"
@ -218,23 +151,22 @@ sync_github_to_forgejo_with_prs() {
continue
}
fi
# Skip pipeline-internal branch prefixes (no PR creation)
# Auto-create PR on Forgejo for mirrored branches (external contributor path)
# Skip pipeline-internal branches
case "$branch" in
extract/*|ingestion/*) continue ;;
esac
if [ -z "$FORGEJO_TOKEN" ]; then continue; fi
# Check if PR already exists for this branch (open or closed)
# NOTE: Forgejo ?head= filter is broken (ignores head value, returns all PRs).
# Workaround: fetch open+closed PRs, pipe to Python, check head.ref.
local HAS_PR
HAS_PR=$( {
curl -sf "$FORGEJO_HOST/pulls?state=open&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
echo ""
curl -sf "$FORGEJO_HOST/pulls?state=closed&sort=created&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
} | python3 -c "
if [ -n "$FORGEJO_TOKEN" ]; then
# Check if PR already exists for this branch (open or closed)
# NOTE: Forgejo ?head= filter is broken (ignores head value, returns all PRs).
# Workaround: fetch open+closed PRs, pipe to Python, check head.ref.
HAS_PR=$( {
curl -sf "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=open&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
echo ""
curl -sf "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=closed&sort=created&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
} | python3 -c "
import sys, json
branch = sys.argv[1]
for line in sys.stdin:
@ -247,159 +179,104 @@ for line in sys.stdin:
except: pass
print('no')
" "$branch" 2>/dev/null || echo "no")
if [ "$HAS_PR" = "yes" ]; then continue; fi
# Build PR title — for fork PRs, use the GitHub PR title
local PR_TITLE PAYLOAD RESULT PR_NUM GH_PR_NUM
if [[ "$branch" == gh-pr-* ]]; then
local FORK_GH_NUM PAT_T
FORK_GH_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
PAT_T=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
PR_TITLE=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$FORK_GH_NUM" \
-H "Authorization: token $PAT_T" 2>/dev/null | \
python3 -c "import sys,json; print(json.load(sys.stdin).get('title',''))" 2>/dev/null || true)
[ -z "$PR_TITLE" ] && PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
else
PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
fi
PAYLOAD=$(python3 -c "import sys,json; print(json.dumps({'title':sys.argv[1],'head':sys.argv[2],'base':'main'}))" "$PR_TITLE" "$branch")
RESULT=$(curl -sf -X POST "$FORGEJO_HOST/pulls" \
-H "Authorization: token $FORGEJO_TOKEN" \
-H "Content-Type: application/json" \
-d "$PAYLOAD" 2>/dev/null || echo "")
PR_NUM=$(echo "$RESULT" | grep -o '"number":[0-9]*' | head -1 | grep -o "[0-9]*" || true)
if [ -z "$PR_NUM" ]; then
log "WARN: Failed to auto-create PR for $branch"
continue
fi
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
if [[ "$branch" == gh-pr-* ]]; then
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
else
local PAT
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
GH_PR_NUM=""
if [ -n "$PAT" ]; then
GH_PR_NUM=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?head=living-ip:$branch&state=all" \
-H "Authorization: token $PAT" 2>/dev/null | \
python3 -c "import sys,json; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')" 2>/dev/null || true)
if [ "$HAS_PR" = "no" ]; then
# Build PR title — for fork PRs, use the GitHub PR title
if [[ "$branch" == gh-pr-* ]]; then
FORK_GH_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
GITHUB_PAT_T=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
PR_TITLE=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$FORK_GH_NUM" \
-H "Authorization: token $GITHUB_PAT_T" 2>/dev/null | \
python3 -c "import sys,json; print(json.load(sys.stdin).get('title',''))" 2>/dev/null || true)
[ -z "$PR_TITLE" ] && PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
else
PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
fi
PAYLOAD=$(python3 -c "import sys,json; print(json.dumps({'title':sys.argv[1],'head':sys.argv[2],'base':'main'}))" "$PR_TITLE" "$branch")
RESULT=$(curl -sf -X POST "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls" \
-H "Authorization: token $FORGEJO_TOKEN" \
-H "Content-Type: application/json" \
-d "$PAYLOAD" 2>/dev/null || echo "")
PR_NUM=$(echo "$RESULT" | grep -o '"number":[0-9]*' | head -1 | grep -o "[0-9]*" || true)
if [ -n "$PR_NUM" ]; then
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
if [[ "$branch" == gh-pr-* ]]; then
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
else
GITHUB_PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
GH_PR_NUM=""
if [ -n "$GITHUB_PAT" ]; then
GH_PR_NUM=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?head=living-ip:$branch&state=all" \
-H "Authorization: token $GITHUB_PAT" 2>/dev/null | \
python3 -c "import sys,json; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')" 2>/dev/null || true)
fi
fi
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
fi
else
log "WARN: Failed to auto-create PR for $branch"
fi
fi
fi
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM, source_channel = 'github' WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
fi
done
}
else
log "No new GitHub-only branches"
fi
# Step 6: Divergence alerting
# After all sync steps, check if GitHub and Forgejo main still differ.
# 2 consecutive divergent cycles (4 min) triggers a one-shot Telegram alert.
DIVERGENCE_FILE="/opt/teleo-eval/logs/.divergence-count"
git fetch forgejo main --quiet 2>/dev/null || true
git fetch origin main --quiet 2>/dev/null || true
GH_MAIN_FINAL=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FG_MAIN_FINAL=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
# ─────────────────────────────────────────────────────────────────────────────
# Step 6 split out: divergence alerting. Per-repo state file so each repo
# has its own divergence counter and alert state.
# ─────────────────────────────────────────────────────────────────────────────
check_divergence() {
local DIVERGENCE_FILE="/opt/teleo-eval/logs/.divergence-count.${REPO_TAG}"
git fetch forgejo main --quiet 2>/dev/null || true
git fetch origin main --quiet 2>/dev/null || true
local GH_MAIN_FINAL FG_MAIN_FINAL
GH_MAIN_FINAL=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FG_MAIN_FINAL=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GH_MAIN_FINAL" ] && [ -n "$FG_MAIN_FINAL" ] && [ "$GH_MAIN_FINAL" != "$FG_MAIN_FINAL" ]; then
local PREV
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" = "alerted" ]; then
log "DIVERGENCE: still diverged (already alerted)"
else
local COUNT=$((PREV + 1))
echo "$COUNT" > "$DIVERGENCE_FILE"
log "DIVERGENCE: cycle $COUNT — GitHub=$GH_MAIN_FINAL Forgejo=$FG_MAIN_FINAL"
if [ "$COUNT" -ge 2 ]; then
local BOT_TOKEN ADMIN_CHAT
BOT_TOKEN=$(cat /opt/teleo-eval/secrets/telegram-bot-token 2>/dev/null || true)
ADMIN_CHAT=$(cat /opt/teleo-eval/secrets/admin-chat-id 2>/dev/null || true)
if [ -n "$BOT_TOKEN" ] && [ -n "$ADMIN_CHAT" ]; then
local ALERT_MSG
ALERT_MSG=$(python3 -c "
if [ -n "$GH_MAIN_FINAL" ] && [ -n "$FG_MAIN_FINAL" ] && [ "$GH_MAIN_FINAL" != "$FG_MAIN_FINAL" ]; then
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" = "alerted" ]; then
log "DIVERGENCE: still diverged (already alerted)"
else
COUNT=$((PREV + 1))
echo "$COUNT" > "$DIVERGENCE_FILE"
log "DIVERGENCE: cycle $COUNT — GitHub=$GH_MAIN_FINAL Forgejo=$FG_MAIN_FINAL"
if [ "$COUNT" -ge 2 ]; then
BOT_TOKEN=$(cat /opt/teleo-eval/secrets/telegram-bot-token 2>/dev/null || true)
ADMIN_CHAT=$(cat /opt/teleo-eval/secrets/admin-chat-id 2>/dev/null || true)
if [ -n "$BOT_TOKEN" ] && [ -n "$ADMIN_CHAT" ]; then
ALERT_MSG=$(python3 -c "
import json, sys
msg = '⚠️ Mirror divergence detected (' + sys.argv[5] + ')\\n\\n'
msg = '⚠️ Mirror divergence detected\\n\\n'
msg += f'GitHub main: {sys.argv[1][:8]}\\n'
msg += f'Forgejo main: {sys.argv[2][:8]}\\n'
msg += f'Diverged for {sys.argv[3]} consecutive cycles ({int(sys.argv[3])*2} min)\\n\\n'
msg += 'Check sync-mirror.sh logs: /opt/teleo-eval/logs/sync.log'
print(json.dumps({'chat_id': sys.argv[4], 'text': msg, 'parse_mode': 'HTML'}))
" "$GH_MAIN_FINAL" "$FG_MAIN_FINAL" "$COUNT" "$ADMIN_CHAT" "$REPO_TAG")
if curl -sf -X POST "https://api.telegram.org/bot${BOT_TOKEN}/sendMessage" \
-H "Content-Type: application/json" \
-d "$ALERT_MSG" >> "$LOG" 2>&1; then
log "DIVERGENCE: alert sent to admin"
echo "alerted" > "$DIVERGENCE_FILE"
else
log "WARN: Failed to send divergence alert (will retry next cycle)"
fi
" "$GH_MAIN_FINAL" "$FG_MAIN_FINAL" "$COUNT" "$ADMIN_CHAT")
if curl -sf -X POST "https://api.telegram.org/bot${BOT_TOKEN}/sendMessage" \
-H "Content-Type: application/json" \
-d "$ALERT_MSG" >> "$LOG" 2>&1; then
log "DIVERGENCE: alert sent to admin"
echo "alerted" > "$DIVERGENCE_FILE"
else
log "WARN: Cannot send divergence alert — missing bot token or admin chat ID"
log "WARN: Failed to send divergence alert (will retry next cycle)"
fi
else
log "WARN: Cannot send divergence alert — missing bot token or admin chat ID"
fi
fi
else
if [ -f "$DIVERGENCE_FILE" ]; then
local PREV
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" != "0" ]; then
log "DIVERGENCE: resolved — repos back in sync"
fi
rm -f "$DIVERGENCE_FILE"
fi
fi
}
# ─────────────────────────────────────────────────────────────────────────────
# Main: process each configured mirror in sequence.
# A failure on one repo doesn't block subsequent repos — sync_repo returns 0
# on most error paths to keep the loop going.
# ─────────────────────────────────────────────────────────────────────────────
REPO_TAG="main"
log "Starting sync cycle"
# Step 0: self-heal any gh-pr-* PR rows missing github_pr.
# Runs FIRST — before per-repo work (branch-mirror loop, auto-create-PR block).
# Recovers from races/transient failures in Step 4.5's one-shot link UPDATE.
# Idempotent: SELECT empty when clean, zero-cost path. Same SELECT/UPDATE
# heals historical orphans (PR 4066 picked up on first cron tick post-deploy)
# and future races on subsequent ticks. The branch name encodes the GitHub PR
# number deterministically (gh-pr-{N}/...) so no API call is required.
if [ -f "$PIPELINE_DB" ]; then
sqlite3 -separator '|' "$PIPELINE_DB" \
"SELECT number, branch FROM prs WHERE branch LIKE 'gh-pr-%' AND github_pr IS NULL;" \
2>/dev/null | while IFS='|' read -r pr_num branch; do
# Regex requires >=1 digit — empty/non-numeric branches fail to parse here,
# not just at the empty-guard below. Keeps SQL-integer-safety load-bearing
# on the regex alone. [0-9][0-9]* is the portable BRE form of [0-9]+,
# works on both GNU sed (VPS) and BSD sed (dev macs).
gh_pr_num=$(echo "$branch" | sed -n 's|^gh-pr-\([0-9][0-9]*\)/.*|\1|p')
[ -z "$gh_pr_num" ] && continue
# Both interpolated values are integer-validated upstream (pr_num from
# INTEGER `number` column, gh_pr_num from regex above). No parametric
# binding available in bash sqlite3 — safety relies on those invariants.
if sqlite3 "$PIPELINE_DB" \
"UPDATE prs SET github_pr = $gh_pr_num, source_channel = 'github' WHERE number = $pr_num;" \
2>/dev/null; then
log "self-heal: linked Forgejo PR #$pr_num -> GitHub PR #$gh_pr_num"
else
if [ -f "$DIVERGENCE_FILE" ]; then
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" != "0" ]; then
log "DIVERGENCE: resolved — repos back in sync"
fi
done
rm -f "$DIVERGENCE_FILE"
fi
fi
for entry in "${MIRROR_REPOS[@]}"; do
# Read the 4 fields. `read` splits on $IFS (whitespace) by default.
read -r forgejo_repo github_repo bare_path mode <<< "$entry"
sync_repo "$forgejo_repo" "$github_repo" "$bare_path" "$mode"
done
REPO_TAG="main"
log "Sync cycle complete"
log "Sync complete"

View file

@ -9,16 +9,6 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
_cache = {"data": None, "ts": 0}
CACHE_TTL = 60 # 1 minute — activity should feel fresh
# commit_types we surface in the activity feed. `pipeline` is system
# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
# inbox archive filenames, not claim slugs. Used as a fallback signal when
# branch/description heuristics miss (e.g. populated descriptions that
# happen to be source titles, not claim insights).
_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
def _get_conn():
conn = sqlite3.connect(DB_PATH)
@ -27,52 +17,19 @@ def _get_conn():
return conn
def _is_source_slug(slug):
return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
def _classify_event(branch, description, commit_type, candidate_slug=None):
"""Return one of: create | enrich | challenge | source | None.
Source-archive PRs are extract/* branches that filed a source into
inbox/archive/ but didn't produce a claim. Two signals classify them
as 'source' (defense in depth):
1. extract/* branch with empty description (no claim title produced)
2. candidate_slug matches YYYY-MM-DD-...-HASH4 (inbox filename pattern)
"""
commit_type_l = (commit_type or "").lower()
branch = branch or ""
description_lower = (description or "").lower()
has_desc = bool(description and description.strip())
if commit_type_l not in _FEED_COMMIT_TYPES:
def _classify_event(branch, description, commit_type):
if commit_type != "knowledge":
return None
# Explicit challenge signals win first.
if (commit_type_l == "challenge"
or branch.startswith("challenge/")
or "challenged_by" in description_lower):
return "challenge"
# Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
if (commit_type_l == "enrich"
or branch.startswith("enrich/")
or branch.startswith("reweave/")):
if branch and branch.startswith("extract/"):
return "create"
if branch and branch.startswith("reweave/"):
return "enrich"
if branch and branch.startswith("challenge/"):
return "challenge"
if description and "challenged_by" in description.lower():
return "challenge"
if branch and branch.startswith("enrich/"):
return "enrich"
# Source-only: extract/* with no claim description means inbox archive
# landed but no domain claim was written.
if branch.startswith("extract/") and not has_desc:
return "source"
# Belt-and-suspenders: if the slug we'd surface to the frontend looks
# like an inbox archive filename (date-prefix-hash), treat as source
# regardless of branch/commit_type/description state. Catches cases
# where description leaked but is just a source title, not a claim.
if _is_source_slug(candidate_slug):
return "source"
# Everything else with a description is a new claim.
return "create"
@ -102,7 +59,7 @@ def _extract_claim_slugs(description, branch=None):
if branch:
parts = branch.split("/", 1)
if len(parts) > 1:
return [parts[1]]
return [parts[1][:120]]
return []
titles = [t.strip() for t in description.split("|") if t.strip()]
slugs = []
@ -111,7 +68,7 @@ def _extract_claim_slugs(description, branch=None):
slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
slug = slug.replace(" ", "-").strip("-")
if len(slug) > 10:
slugs.append(slug)
slugs.append(slug[:120])
return slugs
@ -124,60 +81,33 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
def _build_events():
conn = _get_conn()
try:
placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
rows = conn.execute(f"""
rows = conn.execute("""
SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
p.merged_at, p.description, p.commit_type, p.cost_usd,
p.source_channel, p.source_path
p.source_channel
FROM prs p
WHERE p.status = 'merged'
AND p.commit_type IN ({placeholders})
AND p.commit_type = 'knowledge'
AND p.merged_at IS NOT NULL
ORDER BY p.merged_at DESC
LIMIT 2000
""", _FEED_COMMIT_TYPES).fetchall()
""").fetchall()
events = []
claim_activity = {} # slug -> {challenges, enriches, signals, first_seen}
for row in rows:
slugs = _extract_claim_slugs(row["description"], row["branch"])
candidate_slug = slugs[0] if slugs else ""
event_type = _classify_event(
row["branch"], row["description"], row["commit_type"],
candidate_slug=candidate_slug,
)
event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
if not event_type:
continue
contributor = _normalize_contributor(row["submitted_by"], row["agent"])
slugs = _extract_claim_slugs(row["description"], row["branch"])
merged_at = row["merged_at"] or ""
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15}
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}
ci_earned = ci_map.get(event_type, 0)
# Source events never carry a claim_slug — no claim was written —
# so the frontend can't produce a 404-ing claim link.
if event_type == "source":
summary_text = _summary_from_branch(row["branch"])
source_slug = (
_summary_from_branch(row["branch"]).lower().replace(" ", "-")
or row["branch"]
)
events.append({
"type": "source",
"claim_slug": "",
"source_slug": source_slug,
"domain": row["domain"] or "unknown",
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"source_channel": row["source_channel"] or "unknown",
})
continue
for slug in slugs:
if slug not in claim_activity:
claim_activity[slug] = {
@ -234,8 +164,8 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
events.sort(key=hot_key, reverse=True)
elif sort_mode == "important":
type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3}
events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"])))
type_rank = {"challenge": 0, "enrich": 1, "create": 2}
events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
return events
@ -245,8 +175,6 @@ async def handle_activity_feed(request):
sort_mode = "recent"
domain = request.query.get("domain", "")
contributor = request.query.get("contributor", "")
type_param = request.query.get("type", "")
type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
try:
limit = min(int(request.query.get("limit", "20")), 100)
except ValueError:
@ -268,8 +196,6 @@ async def handle_activity_feed(request):
filtered = [e for e in filtered if e["domain"] == domain]
if contributor:
filtered = [e for e in filtered if e["contributor"] == contributor]
if type_filter:
filtered = [e for e in filtered if e["type"] in type_filter]
sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
total = len(sorted_events)

View file

@ -25,7 +25,6 @@ from aiohttp import web
from review_queue_routes import register_review_queue_routes
from daily_digest_routes import register_daily_digest_routes
from response_audit_routes import register_response_audit_routes, RESPONSE_AUDIT_PUBLIC_PATHS
from leaderboard_routes import register_leaderboard_routes, LEADERBOARD_PUBLIC_PATHS
from lib.search import search as kb_search, embed_query, search_qdrant
logger = logging.getLogger("argus")
@ -43,7 +42,7 @@ API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secret
# Endpoints that skip auth (dashboard is public for now, can lock later)
_PUBLIC_PATHS = frozenset({"/", "/prs", "/ops", "/health", "/agents", "/epistemic", "/legacy", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest", "/api/search"})
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest"})
def _get_db() -> sqlite3.Connection:
@ -509,7 +508,7 @@ def _load_secret(path: Path) -> str | None:
@web.middleware
async def auth_middleware(request, handler):
"""API key check. Public paths skip auth. Protected paths require X-Api-Key header."""
if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path in LEADERBOARD_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
return await handler(request)
expected = request.app.get("api_key")
if not expected:
@ -664,115 +663,38 @@ async def handle_api_domains(request):
return web.json_response({"domains": breakdown})
def _qdrant_hits_to_results(hits, include_expanded=False):
"""Shape raw Qdrant hits into Ship's chat-API contract."""
results = []
for h in hits:
payload = h.get("payload", {}) or {}
path = payload.get("claim_path", "") or ""
slug = path.rsplit("/", 1)[-1]
if slug.endswith(".md"):
slug = slug[:-3]
results.append({
"slug": slug,
"path": path,
"title": payload.get("claim_title", ""),
"domain": payload.get("domain"),
"confidence": payload.get("confidence"),
"score": round(float(h.get("score", 0.0) or 0.0), 4),
"body_excerpt": payload.get("snippet", "") or "",
})
return results
async def handle_api_search(request):
"""Semantic search over claims via Qdrant.
"""GET /api/search — semantic search over claims via Qdrant + graph expansion.
POST contract (Ship's chat API):
body: {"query": str, "limit": int, "min_score": float?, "domain": str?, "confidence": str?, "exclude": [str]?}
response: {"query": str, "results": [{"slug","path","title","domain","confidence","score","body_excerpt"}], "total": int}
GET (legacy + hackathon debug):
q: search query (required)
limit, domain, confidence, exclude, expand
min_score: if set, bypasses two-pass lib threshold (default lib behavior otherwise)
Query params:
q: search query (required)
domain: filter by domain (optional)
confidence: filter by confidence level (optional)
limit: max results, default 10 (optional)
exclude: comma-separated claim paths to exclude (optional)
expand: enable graph expansion, default true (optional)
"""
if request.method == "POST":
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON body"}, status=400)
query = (body.get("query") or "").strip()
if not query:
return web.json_response({"error": "query required"}, status=400)
try:
limit = min(int(body.get("limit") or 5), 50)
except (TypeError, ValueError):
return web.json_response({"error": "limit must be int"}, status=400)
try:
min_score = float(body.get("min_score") if body.get("min_score") is not None else 0.25)
except (TypeError, ValueError):
return web.json_response({"error": "min_score must be float"}, status=400)
domain = body.get("domain")
confidence = body.get("confidence")
exclude = body.get("exclude") or None
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
results = _qdrant_hits_to_results(hits)
return web.json_response({"query": query, "results": results, "total": len(results)})
# GET path
query = request.query.get("q", "").strip()
if not query:
return web.json_response({"error": "q parameter required"}, status=400)
domain = request.query.get("domain")
confidence = request.query.get("confidence")
try:
limit = min(int(request.query.get("limit", "10")), 50)
except ValueError:
return web.json_response({"error": "limit must be int"}, status=400)
limit = min(int(request.query.get("limit", "10")), 50)
exclude_raw = request.query.get("exclude", "")
exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None
expand = request.query.get("expand", "true").lower() != "false"
min_score_raw = request.query.get("min_score")
if min_score_raw is not None:
try:
min_score = float(min_score_raw)
except ValueError:
return web.json_response({"error": "min_score must be float"}, status=400)
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
direct = _qdrant_hits_to_results(hits)
return web.json_response({
"query": query,
"direct_results": direct,
"expanded_results": [],
"total": len(direct),
})
# Default GET: Layer 1 + Layer 2 via lib
# Use shared search library (Layer 1 + Layer 2)
result = kb_search(query, expand=expand,
domain=domain, confidence=confidence, exclude=exclude)
if "error" in result:
error = result["error"]
if error == "embedding_failed":
return web.json_response({"error": "embedding failed"}, status=502)
return web.json_response({"error": error}, status=500)
return web.json_response(result)
@ -2346,7 +2268,6 @@ def create_app() -> web.Application:
app.router.add_get("/api/contributors", handle_api_contributors)
app.router.add_get("/api/domains", handle_api_domains)
app.router.add_get("/api/search", handle_api_search)
app.router.add_post("/api/search", handle_api_search)
app.router.add_get("/api/audit", handle_api_audit)
app.router.add_get("/audit", handle_audit_page)
app.router.add_post("/api/usage", handle_api_usage)
@ -2362,8 +2283,6 @@ def create_app() -> web.Application:
# Response audit - cost tracking + reasoning traces
app["db_path"] = str(DB_PATH)
register_response_audit_routes(app)
# Event-sourced leaderboard (Phase B — reads contribution_events directly)
register_leaderboard_routes(app)
# Timeline activity feed (per-PR + audit_log events for dashboard v2)
from activity_endpoint import handle_activity
app.router.add_get("/api/activity", handle_activity)

View file

@ -1,166 +0,0 @@
"""Leaderboard endpoint reading from event-sourced contribution_events.
Owner: Argus
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
joins contributors for kind, returns sorted leaderboard with role breakdown.
Roles + weights (Phase A):
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
Endpoints:
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
"""
import logging
import re
import sqlite3
from aiohttp import web
logger = logging.getLogger("argus.leaderboard_routes")
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
KIND_VALUES = ("person", "agent", "org", "all")
# Public path set so auth middleware lets it through
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
def _parse_window(raw):
"""Parse window param. Returns (sql_clause, params_tuple, label).
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
Caps N at 365d / 8760h to prevent abuse.
"""
if not raw or raw == "all_time":
return ("", (), "all_time")
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
if not m:
return ("", (), "all_time")
n = int(m.group(1))
unit = m.group(2)
# Note: WHERE clause is composed via " AND ".join(...) — do NOT prefix with "AND ".
if unit == "d":
n = min(n, 365)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
n = min(n, 8760)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
async def handle_leaderboard(request):
"""GET /api/leaderboard.
Query params:
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
domain: filter by domain (optional)
kind: 'person' (default) | 'agent' | 'org' | 'all'
limit: max entries (default 100, max 500)
"""
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
domain = request.query.get("domain")
kind = request.query.get("kind", "person")
if kind not in KIND_VALUES:
kind = "person"
try:
limit = min(int(request.query.get("limit", "100")), 500)
except (ValueError, TypeError):
limit = 100
where = ["1=1", window_clause] if window_clause else ["1=1"]
params = list(window_params)
if domain:
where.append("ce.domain = ?")
params.append(domain)
if kind != "all":
where.append("COALESCE(c.kind, 'person') = ?")
params.append(kind)
where_sql = " AND ".join([w for w in where if w])
conn = _conn(request.app)
try:
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
# LEFT JOIN contributors so handles in events but not in contributors still appear
# (defaults to kind='person' via COALESCE).
rows = conn.execute(f"""
SELECT
ce.handle,
COALESCE(c.kind, 'person') AS kind,
ROUND(SUM(ce.weight), 4) AS ci,
COUNT(*) AS events_count,
MIN(ce.timestamp) AS first_contribution,
MAX(ce.timestamp) AS last_contribution,
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
COUNT(DISTINCT ce.domain) AS domain_count,
COUNT(DISTINCT ce.pr_number) AS pr_count
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
GROUP BY ce.handle, COALESCE(c.kind, 'person')
ORDER BY ci DESC, last_contribution DESC
LIMIT ?
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
has_more = len(rows) > limit
rows = rows[:limit]
# Total count of distinct handles matching filters (without limit)
total_row = conn.execute(f"""
SELECT COUNT(DISTINCT ce.handle) AS total
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
""", params).fetchone()
total = total_row["total"] if total_row else 0
leaderboard = []
for r in rows:
leaderboard.append({
"handle": r["handle"],
"kind": r["kind"],
"ci": r["ci"],
"ci_breakdown": {
"author": round(r["ci_author"] or 0, 4),
"challenger": round(r["ci_challenger"] or 0, 4),
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
"originator": round(r["ci_originator"] or 0, 4),
"evaluator": round(r["ci_evaluator"] or 0, 4),
},
"events_count": r["events_count"],
"domain_count": r["domain_count"],
"pr_count": r["pr_count"],
"first_contribution": r["first_contribution"],
"last_contribution": r["last_contribution"],
})
return web.json_response({
"window": window_label,
"domain": domain,
"kind_filter": kind,
"total": total,
"shown": len(leaderboard),
"has_more": has_more,
"source": "contribution_events", # explicit so consumers know the data origin
"leaderboard": leaderboard,
})
finally:
conn.close()
def register_leaderboard_routes(app: web.Application):
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
app.router.add_get("/api/leaderboard", handle_leaderboard)

View file

@ -15,7 +15,6 @@ Epimetheus owns this module. Leo reviews changes.
import logging
import re
import sqlite3
from pathlib import Path
logger = logging.getLogger("pipeline.attribution")
@ -82,7 +81,6 @@ def normalize_handle(handle: str, conn=None) -> str:
if not handle:
return ""
h = handle.strip().lower().lstrip("@")
h = re.sub(r"\s*\(self-directed\)\s*$", "", h)
if conn is None:
return h
try:
@ -110,36 +108,6 @@ def classify_kind(handle: str) -> str:
return "person"
def is_publisher_handle(handle: str, conn) -> int | None:
"""Return publisher.id if the handle exists as a publisher name, else None.
Schema v26 split orgs/citations into the publishers table. Writer code
(upsert_contributor, insert_contribution_event) calls this to gate creating
contributor rows or events for handles that belong to publishers.
Without this gate, every merged PR with `sourcer: cnbc` (for example) would
re-create CNBC as a contributor and undo the v26 classifier cleanup.
Falls back gracefully on pre-v26 DBs: returns None if publishers table
doesn't exist yet (writer behaves like before, no regression).
"""
if not handle or conn is None:
return None
h = handle.strip().lower().lstrip("@")
try:
row = conn.execute(
"SELECT id FROM publishers WHERE name = ?", (h,),
).fetchone()
if row:
return row["id"] if hasattr(row, "keys") else row[0]
except sqlite3.OperationalError:
# Pre-v26 DB: publishers table doesn't exist yet. Fall through to None
# so writer behaves as before. Any other exception class is real signal
# (programming error, lock contention, corruption) — let it propagate.
logger.debug("is_publisher_handle: publishers table not present (pre-v26?)", exc_info=True)
return None
# ─── Parse attribution from claim content ──────────────────────────────────

View file

@ -84,14 +84,6 @@ MAX_EXTRACT_WORKERS = int(os.environ.get("MAX_EXTRACT_WORKERS", "5"))
MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7"))
MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain
# --- External GitHub PR merge strategy ---
# When True, gh-pr-N/* branches merge with --no-ff (preserves contributor SHA in
# main's history → GitHub recognizes "merged" badge). When False, fall back to
# cherry-pick (the default for all other branches). Default True; flip to False
# as an emergency backout if the no-ff path destabilizes merge throughput.
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
EXTERNAL_PR_NO_FF_MERGE = True
# --- Timeouts (seconds) ---
EXTRACT_TIMEOUT = 600 # 10 min
EVAL_TIMEOUT = 120 # 2 min — routine Sonnet/Gemini Flash calls (was 600, caused 10-min stalls)

View file

@ -14,7 +14,7 @@ import logging
import re
from . import config, db
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, is_publisher_handle, normalize_handle
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle
from .forgejo import get_pr_diff
logger = logging.getLogger("pipeline.contributor")
@ -62,12 +62,6 @@ def insert_contribution_event(
canonical = normalize_handle(handle, conn=conn)
if not canonical:
return False
# Schema v26 gate: handles classified as publishers (CNBC, SpaceNews, arxiv,
# etc.) are provenance metadata, not contributors. Don't credit them. Without
# this gate every merge re-creates org events and undoes the v26 cleanup.
if is_publisher_handle(canonical, conn) is not None:
logger.debug("insert_contribution_event: %r is a publisher — skipping event", canonical)
return False
kind = classify_kind(canonical)
try:
cur = conn.execute(
@ -425,21 +419,6 @@ def upsert_contributor(
logger.warning("Unknown contributor role: %s", role)
return
# Schema v26 gate: orgs/citations live in publishers table, not contributors.
# Skip without writing so the v26 classifier cleanup isn't undone by every
# merge that has `sourcer: cnbc` (or similar) in claim frontmatter.
#
# Note: bare normalization (lower + lstrip @), no alias resolution. This is
# consistent with the existing `SELECT handle FROM contributors WHERE handle = ?`
# below — both look up by canonical-form-as-stored. Today's classifier produces
# one publisher row per canonical handle, so bare lookup hits. Branch 3 will
# normalize alias→canonical at writer entry points (extract.py, post_extract);
# at that point this gate auto-tightens because callers pass canonical handles.
canonical_handle = handle.strip().lower().lstrip("@") if handle else ""
if canonical_handle and is_publisher_handle(canonical_handle, conn) is not None:
logger.debug("upsert_contributor: %r is a publisher — skipping contributor row", canonical_handle)
return
existing = conn.execute(
"SELECT handle FROM contributors WHERE handle = ?", (handle,)
).fetchone()

View file

@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 26
SCHEMA_VERSION = 25
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -35,15 +35,6 @@ CREATE TABLE IF NOT EXISTS sources (
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
-- v26: provenance publisher (news org / venue) + content author.
-- publisher_id references publishers(id) when source is from a known org.
-- original_author_handle references contributors(handle) when author is in our system.
-- original_author is free-text fallback ("Kim et al.", "Robin Hanson") not credit-bearing.
publisher_id INTEGER REFERENCES publishers(id),
content_type TEXT,
-- article | paper | tweet | conversation | self_authored | webpage | podcast
original_author TEXT,
original_author_handle TEXT REFERENCES contributors(handle),
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
@ -216,33 +207,6 @@ CREATE TABLE IF NOT EXISTS contributor_aliases (
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
-- Publishers: news orgs, academic venues, social platforms. NOT contributors these
-- provide metadata/provenance for sources, never earn leaderboard credit. Separating
-- these from contributors prevents CNBC/SpaceNews from dominating the leaderboard.
-- (Apr 24 Cory directive: "only credit the original source if its on X or tg")
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
-- Multi-platform identity: one contributor, many handles. Enables the leaderboard to
-- unify @thesensatore (X) + thesensatore (TG) + thesensatore@github into one person.
-- Writers check this table after resolving aliases to find canonical contributor handle.
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
"""
@ -800,51 +764,6 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v25: patched kind='agent' for pipeline handle")
if current < 26:
# Add publishers + contributor_identities. Non-breaking — new tables only.
# No existing data moved. Classification into publishers happens via a
# separate script (scripts/reclassify-contributors.py) with Cory-reviewed
# seed list. CHECK constraint on contributors.kind deferred to v27 after
# classification completes. (Apr 24 Cory directive: "fix schema, don't
# filter output" — separate contributors from publishers at the data layer.)
conn.executescript("""
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
""")
# Extend sources with provenance columns. ALTER TABLE ADD COLUMN is
# idempotent-safe via try/except because SQLite doesn't support IF NOT EXISTS
# on column adds.
for col_sql in (
"ALTER TABLE sources ADD COLUMN publisher_id INTEGER REFERENCES publishers(id)",
"ALTER TABLE sources ADD COLUMN content_type TEXT",
"ALTER TABLE sources ADD COLUMN original_author TEXT",
"ALTER TABLE sources ADD COLUMN original_author_handle TEXT REFERENCES contributors(handle)",
):
try:
conn.execute(col_sql)
except sqlite3.OperationalError as e:
if "duplicate column" not in str(e).lower():
raise
conn.commit()
logger.info("Migration v26: added publishers + contributor_identities tables + sources provenance columns")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",

View file

@ -429,171 +429,6 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
await _git("branch", "-D", clean_branch)
_GH_PR_BRANCH_RE = re.compile(r"^gh-pr-(\d+)/(.+)$")
async def _merge_no_ff_external(branch: str) -> tuple[bool, str]:
"""Merge an external GitHub fork PR with --no-ff so contributor SHA lands in main.
Why this differs from _cherry_pick_onto_main:
- Cherry-pick rewrites the contributor's commit SHA → GitHub's "is PR head SHA
an ancestor of main?" check returns false → "merged" badge never fires.
- --no-ff preserves the contributor's commit SHA as a parent of the merge
commit. After ff-push to main (the existing dispatch step), GitHub sees
the SHA in ancestry and marks the PR merged.
Mechanics:
1. Fetch origin/main + origin/{branch}
2. Worktree on local branch _merged-{slug} from origin/main
3. git merge --no-ff origin/{branch} with verbose message:
"Merge external GitHub PR #{N}: {branch_slug}"
4. Push merge commit to origin/_merged/{branch} (synthetic audit ref)
5. ff-push merge_sha origin/main directly (function owns the push, NOT
dispatch see sentinel return below)
The merge commit M has parents [main_sha, branch_sha]. M is a fast-forward
descendant of main_sha (via first-parent chain), so the push to main
works without --force.
Synthetic branch (Ship review Apr 28): we deliberately do NOT force-push
the contributor's gh-pr-N/* branch. Force-pushing it would rewrite the
branch tip with a merge commit the contributor didn't author, showing as
a confusing bot force-push in Forgejo's PR UI. The synthetic _merged/*
audit ref lets us track the merge commit without touching the contributor's
branch. Mirrors the _clean/* synthetic branch pattern in cherry-pick.
Sentinel return: function pushes merge_sha main itself (dispatch's ff-push
can't, since origin/{branch} is unchanged and not a descendant of main).
Returns a "merged --no-ff" sentinel string that dispatch detects to skip
its ff-push step and route directly to PR-close + mark_merged + audit.
The full 40-char merge SHA is in the return string for dispatch to extract.
Conflict handling: same auto-resolve pattern as cherry-pick entity-only
conflicts take main's version (--ours = current worktree HEAD = main),
other conflicts abort and return False with detail.
Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
"""
m = _GH_PR_BRANCH_RE.match(branch)
if not m:
return False, f"branch {branch} doesn't match gh-pr-N/* format"
gh_pr_num = m.group(1)
branch_slug = m.group(2)
slug = branch.replace("/", "-")
worktree_path = f"/tmp/teleo-merge-{slug}"
local_branch = f"_merged-{slug}" # local working branch in worktree
audit_ref = f"_merged/{branch}" # remote synthetic ref (preserves hierarchy)
# Fetch latest state — separate calls (long branch names break combined refspec)
rc, out = await _git("fetch", "origin", "main", timeout=15)
if rc != 0:
return False, f"fetch main failed: {out}"
rc, out = await _git("fetch", "origin", branch, timeout=15)
if rc != 0:
return False, f"fetch branch failed: {out}"
# Up-to-date check (mirrors cherry-pick path semantics)
rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
rc2, main_sha = await _git("rev-parse", "origin/main")
if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
rc_diff, diff_out = await _git(
"diff", "--stat", f"origin/main..origin/{branch}", timeout=10,
)
if rc_diff != 0 or not diff_out.strip():
return True, "already up to date"
logger.info("External PR branch %s is descendant of main but has new content — proceeding", branch)
async with _bare_repo_lock:
# Clean up any stale local branch from a prior failed run
await _git("branch", "-D", local_branch)
rc, out = await _git("worktree", "add", "-b", local_branch, worktree_path, "origin/main")
if rc != 0:
return False, f"worktree add failed: {out}"
try:
merge_msg = f"Merge external GitHub PR #{gh_pr_num}: {branch_slug}"
rc, out = await _git(
"merge", "--no-ff", f"origin/{branch}",
"-m", merge_msg,
cwd=worktree_path, timeout=60,
)
if rc != 0:
# Identify conflicts
rc_ls, conflicting = await _git(
"diff", "--name-only", "--diff-filter=U", cwd=worktree_path,
)
conflict_files = [
f.strip() for f in conflicting.split("\n") if f.strip()
] if rc_ls == 0 else []
if conflict_files and all(f.startswith("entities/") for f in conflict_files):
# Entity-only conflicts: take main's version (entities are recoverable)
# In merge: --ours = branch we're ON (worktree HEAD = main)
# --theirs = branch merging in (origin/{branch})
for cf in conflict_files:
await _git("checkout", "--ours", cf, cwd=worktree_path)
await _git("add", cf, cwd=worktree_path)
# Complete the merge using the prepared MERGE_MSG (no editor)
rc_cont, cont_out = await _git(
"-c", "core.editor=true",
"commit", "--no-edit",
cwd=worktree_path, timeout=60,
)
if rc_cont != 0:
await _git("merge", "--abort", cwd=worktree_path)
return False, f"merge entity resolution failed for PR #{gh_pr_num}: {cont_out}"
logger.info(
"External PR #%s merge: entity conflict auto-resolved (dropped %s)",
gh_pr_num, ", ".join(sorted(conflict_files)),
)
else:
conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
await _git("merge", "--abort", cwd=worktree_path)
return False, f"merge conflict on PR #{gh_pr_num}: {conflict_detail}"
# Capture the merge commit SHA before any pushes
rc, merge_sha = await _git("rev-parse", "HEAD", cwd=worktree_path)
if rc != 0:
return False, f"rev-parse merge HEAD failed: {merge_sha}"
merge_sha = merge_sha.strip().split("\n")[0]
# Push to synthetic audit ref _merged/{branch} (does not touch contributor's
# gh-pr-N/* branch). Plain --force: the audit ref is bot-owned and per-PR;
# if a prior aborted attempt left a stale ref, overwriting it is the
# intended behavior, and there's no concurrent writer to lease against.
rc, out = await _git(
"push", "--force", "origin", f"HEAD:refs/heads/{audit_ref}",
cwd=worktree_path, timeout=30,
)
if rc != 0:
return False, f"push to audit ref {audit_ref} failed: {out}"
# ff-push the merge commit to main. This is a true fast-forward (M is a
# descendant of origin/main via its first parent), so no --force needed.
# Forgejo's branch protection allows ff-push to main from authorized users.
rc, out = await _git(
"push", "origin", f"{merge_sha}:main",
cwd=worktree_path, timeout=30,
)
if rc != 0:
# Roll back audit ref if main push failed — keeps state consistent.
await _git("push", "--delete", "origin", f"refs/heads/{audit_ref}",
cwd=worktree_path, timeout=15)
return False, f"ff-push to main failed: {out}"
# Sentinel return: "merged --no-ff" prefix triggers dispatch's external-PR
# close path (skips ff-push, does PR-close + mark_merged + audit).
# Full 40-char merge SHA in the message so dispatch can parse it for audit.
return True, f"merged --no-ff (external PR #{gh_pr_num}, M={merge_sha}, audit_ref={audit_ref})"
finally:
async with _bare_repo_lock:
await _git("worktree", "remove", "--force", worktree_path)
await _git("branch", "-D", local_branch)
from .frontmatter import (
REWEAVE_EDGE_FIELDS,
parse_yaml_frontmatter,
@ -898,12 +733,6 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
# (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
if branch.startswith("reweave/"):
merge_fn = _merge_reweave_pr(branch)
elif branch.startswith("gh-pr-") and config.EXTERNAL_PR_NO_FF_MERGE:
# External GitHub fork PRs: --no-ff merge so contributor SHA lands
# in main's history → GitHub recognizes "merged" badge.
# Backout via config.EXTERNAL_PR_NO_FF_MERGE = False (falls back to cherry-pick).
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
merge_fn = _merge_no_ff_external(branch)
else:
# Extraction commits ADD new files — cherry-pick applies cleanly.
merge_fn = _cherry_pick_onto_main(branch)
@ -957,58 +786,6 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
succeeded += 1
continue
# External GitHub PR (gh-pr-*): _merge_no_ff_external already pushed
# the merge commit to origin/main + the synthetic _merged/{branch}
# audit ref. Skip dispatch's ff-push (would fail — origin/{branch} is
# the contributor's untouched branch, not a descendant of main).
# Just close PR + mark_merged + audit, parsing merge SHA from sentinel.
if pick_msg.startswith("merged --no-ff"):
m = re.search(r"M=([a-f0-9]{40})", pick_msg)
merge_sha = m.group(1) if m else None
m_ref = re.search(r"audit_ref=(\S+?)\)", pick_msg)
audit_ref = m_ref.group(1) if m_ref else None
m_pr = re.search(r"external PR #(\d+)", pick_msg)
gh_pr_num = m_pr.group(1) if m_pr else None
# Surface drift between dispatch and _merge_no_ff_external if the
# success-message contract changes. Merge already succeeded; this
# is signal-only, not a gate on the close path.
if not (m and m_ref and m_pr):
logger.warning(
"PR #%d sentinel parse incomplete: M=%s, audit_ref=%s, gh_pr=%s, msg=%r",
pr_num, bool(m), bool(m_ref), bool(m_pr), pick_msg,
)
leo_token = get_agent_token("leo")
comment_body = (
f"Merged via --no-ff into main.\n"
f"Merge commit: `{merge_sha}`\n"
f"Audit ref: `{audit_ref}`\n"
f"Branch: `{branch}` (preserved unchanged)"
)
await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
{"body": comment_body})
result = await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"),
{"state": "closed"}, token=leo_token)
if result is None:
logger.error("PR #%d: Forgejo close failed (no-ff path), skipping DB update", pr_num)
failed += 1
continue
mark_merged(conn, pr_num)
db.audit(conn, "merge", "merged", json.dumps({
"pr": pr_num, "branch": branch, "method": "no-ff",
"merge_commit_sha": merge_sha,
"audit_ref": audit_ref,
"github_pr": gh_pr_num,
}))
# NOTE: do NOT _delete_remote_branch(branch) here. The contributor's
# gh-pr-N/* branch is the mirror of their fork PR head — leaving it
# in place lets sync-mirror keep the GitHub PR <-> Forgejo PR link
# observable. The synthetic _merged/{branch} ref carries the merge.
logger.info("PR #%d merged via --no-ff (M=%s)", pr_num,
merge_sha[:8] if merge_sha else "?")
succeeded += 1
continue
# Local ff-push: cherry-picked branch is a descendant of origin/main.
# Regular push = fast-forward. Non-ff rejected by default (same safety).
# --force-with-lease removed: Forgejo categorically blocks it on protected branches.

View file

@ -267,7 +267,6 @@ format: tweet | thread
status: unprocessed
priority: high | medium | low
tags: [topic1, topic2]
intake_tier: research-task
---
## Content

View file

@ -1,280 +0,0 @@
#!/usr/bin/env python3
"""Backfill: re-attribute research-session-derived PRs from m3taversal to agent.
Problem: research-session.sh used to write source frontmatter without
`proposed_by` / `intake_tier`, so extract.py's contributor-classification
fallback set `prs.submitted_by = '@m3taversal'`, which propagated into
`contribution_events` as a `handle='m3taversal', role='author'` row per
research-derived claim. Result: agent research credited to the human.
Forward fix is a frontmatter-template patch to research-session.sh.
This script corrects historical records.
Identification:
Research-session source archives are committed to teleo-codex with a
message matching `^<agent>: research session YYYY-MM-DD `. The diff
for that commit lists `inbox/queue/*.md` files the agent created. Any
PR whose `source_path` matches one of those filenames is research-derived.
Touch list (per matched PR):
1. UPDATE prs SET submitted_by = '<agent> (self-directed)'
2. DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author' AND pr_number=?
3. INSERT OR IGNORE INTO contribution_events with handle=<agent>,
kind='agent', role='author', weight=0.30, original timestamp/domain/channel.
Defaults to --dry-run. Pass --apply to commit changes.
Usage:
python3 backfill-research-session-attribution.py --dry-run --days 30
python3 backfill-research-session-attribution.py --apply --days 30
"""
import argparse
import logging
import os
import re
import sqlite3
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("backfill-research-attr")
DEFAULT_REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
DEFAULT_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
AUTHOR_WEIGHT = 0.30
def git(repo: Path, *args: str) -> str:
"""Run a git command in repo, return stdout. Raises on non-zero."""
result = subprocess.run(
["git", "-C", str(repo), *args],
capture_output=True, text=True, check=True,
)
return result.stdout
def discover_research_session_archives(repo: Path, days: int) -> dict[str, str]:
"""Return {source_filename_basename: agent_handle} for last N days.
Walks teleo-codex `git log --since`, filters to research-session commits,
parses agent from message header, lists inbox/queue/*.md files added in
that commit's diff. Maps the basename (which becomes source_path on extract)
to the agent who created it.
"""
log = git(repo, "log", f"--since={days} days ago", "--pretty=%H|%s", "--no-merges")
file_to_agent: dict[str, str] = {}
commits_seen = 0
commits_matched = 0
for line in log.splitlines():
if not line or "|" not in line:
continue
commits_seen += 1
sha, _, subject = line.partition("|")
m = COMMIT_HEADER_RE.match(subject)
if not m:
continue
agent = m.group(1)
if agent not in KNOWN_AGENTS:
logger.debug("skipping commit %s — unknown agent %r", sha[:8], agent)
continue
commits_matched += 1
# List files added in this commit (inbox/queue/*.md only)
try:
added = git(repo, "diff-tree", "--no-commit-id", "--name-only", "-r",
"--diff-filter=A", sha)
except subprocess.CalledProcessError:
logger.warning("diff-tree failed for %s", sha[:8])
continue
for f in added.splitlines():
if f.startswith("inbox/queue/") and f.endswith(".md"):
basename = Path(f).name
if basename in file_to_agent and file_to_agent[basename] != agent:
logger.warning(
"filename collision: %s — was %s, now %s (keeping first)",
basename, file_to_agent[basename], agent,
)
continue
file_to_agent.setdefault(basename, agent)
logger.info(
"scanned %d commits, %d research-session matches, %d unique source files",
commits_seen, commits_matched, len(file_to_agent),
)
return file_to_agent
def find_misattributed_prs(conn: sqlite3.Connection, file_to_agent: dict[str, str], days: int):
"""Return list of (pr_number, current_submitted_by, source_path, agent, domain, channel, merged_at).
Only includes PRs:
- with source_path basename in our research-session map
- currently attributed to '@m3taversal'
- merged within the last N days (cap on temporal scope)
"""
rows = conn.execute(
"""SELECT number, submitted_by, source_path, domain, source_channel, merged_at
FROM prs
WHERE submitted_by = '@m3taversal'
AND source_path IS NOT NULL
AND status = 'merged'
AND merged_at > datetime('now', ?)""",
(f"-{days} days",),
).fetchall()
matches = []
for row in rows:
basename = Path(row["source_path"]).name
agent = file_to_agent.get(basename)
if agent:
matches.append({
"pr": row["number"],
"current_submitted_by": row["submitted_by"],
"source_path": row["source_path"],
"basename": basename,
"agent": agent,
"domain": row["domain"],
"channel": row["source_channel"],
"merged_at": row["merged_at"],
})
return matches
def existing_event_count(conn: sqlite3.Connection, pr: int, handle: str, role: str) -> int:
"""Return count of contribution_events rows matching (handle, role, pr_number, claim_path IS NULL)."""
return conn.execute(
"""SELECT COUNT(*) FROM contribution_events
WHERE handle = ? AND role = ? AND pr_number = ? AND claim_path IS NULL""",
(handle, role, pr),
).fetchone()[0]
def apply_backfill(conn: sqlite3.Connection, matches: list[dict], dry_run: bool) -> dict:
"""Apply the backfill. Returns counters."""
counters = defaultdict(int)
if not dry_run:
conn.execute("BEGIN")
try:
for m in matches:
pr = m["pr"]
agent = m["agent"]
# Pre-checks for accurate dry-run reporting
old_event_exists = existing_event_count(conn, pr, "m3taversal", "author") > 0
new_event_exists = existing_event_count(conn, pr, agent, "author") > 0
if dry_run:
logger.info(
"would update pr=%d submitted_by '%s''%s (self-directed)' "
"[m3ta_event=%s, agent_event=%s]",
pr, m["current_submitted_by"], agent,
old_event_exists, new_event_exists,
)
counters["prs"] += 1
if old_event_exists:
counters["events_to_delete"] += 1
if not new_event_exists:
counters["events_to_insert"] += 1
continue
# 1. UPDATE prs.submitted_by
conn.execute(
"UPDATE prs SET submitted_by = ? WHERE number = ?",
(f"{agent} (self-directed)", pr),
)
counters["prs"] += 1
# 2. INSERT new agent author event (idempotent via UNIQUE index)
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, 'agent', 'author', ?, ?, NULL, ?, ?, COALESCE(?, datetime('now')))""",
(agent, AUTHOR_WEIGHT, pr, m["domain"], m["channel"], m["merged_at"]),
)
if cur.rowcount > 0:
counters["events_inserted"] += 1
# 3. DELETE old m3taversal author event
cur = conn.execute(
"""DELETE FROM contribution_events
WHERE handle = 'm3taversal' AND role = 'author'
AND pr_number = ? AND claim_path IS NULL""",
(pr,),
)
if cur.rowcount > 0:
counters["events_deleted"] += 1
if not dry_run:
conn.execute("COMMIT")
except Exception:
if not dry_run:
conn.execute("ROLLBACK")
raise
return dict(counters)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--repo", type=Path, default=DEFAULT_REPO)
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
parser.add_argument("--days", type=int, default=30)
parser.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
parser.add_argument("--limit", type=int, default=0,
help="cap PR updates (0 = no cap; useful for testing on a small slice)")
args = parser.parse_args()
dry_run = not args.apply
logger.info("repo=%s db=%s days=%d mode=%s",
args.repo, args.db, args.days, "DRY-RUN" if dry_run else "APPLY")
if not args.repo.exists():
logger.error("repo not found: %s", args.repo)
sys.exit(1)
if not args.db.exists():
logger.error("db not found: %s", args.db)
sys.exit(1)
file_to_agent = discover_research_session_archives(args.repo, args.days)
if not file_to_agent:
logger.warning("no research-session source files found in last %d days", args.days)
sys.exit(0)
# Per-agent breakdown
by_agent = defaultdict(int)
for agent in file_to_agent.values():
by_agent[agent] += 1
for agent, count in sorted(by_agent.items()):
logger.info(" research-session sources by %s: %d", agent, count)
conn = sqlite3.connect(args.db)
conn.row_factory = sqlite3.Row
matches = find_misattributed_prs(conn, file_to_agent, args.days)
logger.info("misattributed PRs found: %d", len(matches))
if args.limit and len(matches) > args.limit:
logger.info("--limit=%d — truncating from %d", args.limit, len(matches))
matches = matches[:args.limit]
if not matches:
logger.info("nothing to do")
return
# Per-agent breakdown of misattribution
miss_by_agent = defaultdict(int)
for m in matches:
miss_by_agent[m["agent"]] += 1
logger.info("misattributed PR breakdown:")
for agent, count in sorted(miss_by_agent.items()):
logger.info(" %s: %d", agent, count)
counters = apply_backfill(conn, matches, dry_run)
logger.info("RESULT (%s): %s", "DRY-RUN" if dry_run else "APPLIED", counters)
if __name__ == "__main__":
main()

View file

@ -1,426 +0,0 @@
#!/usr/bin/env python3
"""Classify `contributors` rows into {keep_person, keep_agent, move_to_publisher, delete_garbage}.
Reads current contributors table, proposes reclassification per v26 schema design:
- Real humans + Pentagon agents stay in contributors (kind='person'|'agent')
- News orgs, publications, venues move to publishers table (new v26)
- Multi-word hyphenated garbage (parsing artifacts) gets deleted
- Their contribution_events are handled per category:
* Publishers: DELETE events (orgs shouldn't have credit)
* Garbage: DELETE events (bogus data)
* Persons/agents: keep events untouched
Classification is heuristic uses explicit allowlists + regex patterns + length gates.
Ambiguous cases default to 'review_needed' (human decision).
Usage:
python3 scripts/classify-contributors.py # dry-run analysis + report
python3 scripts/classify-contributors.py --apply # write changes
python3 scripts/classify-contributors.py --show <handle> # inspect a single row
Writes to pipeline.db only. Does NOT modify claim files.
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Pentagon agents: kind='agent'. Authoritative list.
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Publisher/news-org handles seen in current contributors table.
# Grouped by kind for the publishers row. Classified by inspection.
# NOTE: This list is hand-curated — add to it as new orgs appear.
PUBLISHERS_NEWS = {
# News outlets / brands
"cnbc", "al-jazeera", "axios", "bloomberg", "reuters", "bettorsinsider",
"fortune", "techcrunch", "coindesk", "coindesk-staff", "coindesk-research",
"coindesk research", "coindesk staff",
"defense-one", "thedefensepost", "theregister", "the-intercept",
"the-meridiem", "variety", "variety-staff", "variety staff", "spacenews",
"nasaspaceflight", "thedonkey", "insidedefense", "techpolicypress",
"morganlewis", "casinoorg", "deadline", "animationmagazine",
"defensepost", "casino-org", "casino.org",
"air & space forces magazine", "ieee spectrum", "techcrunch-staff",
"blockworks", "blockworks-staff", "decrypt", "ainvest", "banking-dive", "banking dive",
"cset-georgetown", "cset georgetown",
"kff", "kff-health-news", "kff health news", "kff-health-news---cbo",
"kff-health-news-/-cbo", "kff health news / cbo", "kffhealthnews",
"bloomberg-law",
"norton-rose-fulbright", "norton rose fulbright",
"defence-post", "the-defensepost",
"wilmerhale", "mofo", "sciencedirect",
"yogonet", "csr", "aisi-uk", "aisi", "aisi_gov", "rand",
"armscontrol", "eclinmed", "solana-compass", "solana compass",
"pmc11919318", "pmc11780016",
"healthverity", "natrium", "form-energy",
"courtlistener", "curtis-schiff", "curtis-schiff-prediction-markets",
"prophetx", "techpolicypress-staff",
"npr", "venturebeat", "geekwire", "payloadspace", "the-ankler",
"theankler", "tubefilter", "emarketer", "dagster",
"numerai", # fund/project brand, not person
"psl", "multistate",
}
PUBLISHERS_ACADEMIC = {
# Academic orgs, labs, papers, journals, institutions
"arxiv", "metr", "metr_evals", "apollo-research", "apollo research", "apolloresearch",
"jacc-study-authors", "jacc-data-report-authors",
"anthropic-fellows-program", "anthropic-fellows",
"anthropic-fellows-/-alignment-science-team", "anthropic-research",
"jmir-2024", "jmir 2024",
"oettl-et-al.,-journal-of-experimental-orthopaedics",
"oettl et al., journal of experimental orthopaedics",
"jacc", "nct06548490", "pmc",
"conitzer-et-al.-(2024)", "aquino-michaels-2026", "pan-et-al.",
"pan-et-al.-'natural-language-agent-harnesses'",
"stanford", "stanford-meta-harness",
"hendershot", "annals-im",
"nellie-liang,-brookings-institution", "nellie liang, brookings institution",
"penn-state", "american-heart-association", "american heart association",
"molt_cornelius", "molt-cornelius",
# Companies / labs / brand-orgs (not specific humans)
"anthropic", "anthropicai", "openai", "nasa", "icrc", "ecri",
"epochairesearch", "metadao", "iapam", "icer",
"who", "ama", "uspstf", "unknown",
"futard.io", # protocol/platform
"oxford-martin-ai-governance-initiative",
"oxford-martin-ai-governance",
"u.s.-food-and-drug-administration",
"jitse-goutbeek,-european-policy-centre", # cited person+org string → publisher
"adepoju-et-al.", # paper citation
# Formal-citation names (Firstname-Lastname or Lastname-et-al) — classified
# as academic citations, not reachable contributors. They'd need an @ handle
# to get CI credit per Cory's growth-loop design.
"senator-elissa-slotkin",
"bostrom", "hanson", "kaufmann", "noah-smith", "doug-shapiro",
"shayon-sengupta", "shayon sengupta",
"robin-hanson", "robin hanson", "eliezer-yudkowsky",
"leopold-aschenbrenner", "aschenbrenner",
"ramstead", "larsson", "heavey",
"dan-slimmon", "van-leeuwaarden", "ward-whitt", "adams",
"tamim-ansary", "spizzirri",
"dario-amodei", # formal-citation form (real @ is @darioamodei)
"corless", "oxranga", "vlahakis",
# Brand/project/DAO tokens — not individuals
"areal-dao", "areal", "theiaresearch", "futard-io", "dhrumil",
# Classic formal-citation names — famous academics/economists cited by surname.
# Reachable via @ handle if/when they join (e.g. Ostrom has no X, Hayek deceased,
# Friston has an institutional affiliation not an @ handle we'd track).
"clayton-christensen", "hidalgo", "coase", "wiener", "juarrero",
"ostrom", "centola", "hayek", "marshall-mcluhan", "blackmore",
"knuth", "friston", "aquino-michaels", "conitzer", "bak",
}
# NOTE: pseudonymous X handles that MAY be real contributors stay in keep_person:
# karpathy, simonw, swyx, metaproph3t, metanallok, mmdhrumil, sjdedic,
# ceterispar1bus — these are real X accounts and match Cory's growth loop.
# They appear without @ prefix because extraction frontmatter didn't normalize.
# Auto-creating them as contributors tier='cited' is correct (A-path from earlier).
PUBLISHERS_SOCIAL = {
"x", "twitter", "telegram", "x.com",
}
PUBLISHERS_INTERNAL = {
"teleohumanity-manifesto", "strategy-session-journal",
"living-capital-thesis-development", "attractor-state-historical-backtesting",
"web-research-compilation", "architectural-investing",
"governance---meritocratic-voting-+-futarchy", # title artifact
"sec-interpretive-release-s7-2026-09-(march-17", # title artifact
"mindstudio", # tooling/platform, not contributor
}
# Merge into one kind→set map for classification
PUBLISHER_KIND_MAP = {}
for h in PUBLISHERS_NEWS:
PUBLISHER_KIND_MAP[h.lower()] = "news"
for h in PUBLISHERS_ACADEMIC:
PUBLISHER_KIND_MAP[h.lower()] = "academic"
for h in PUBLISHERS_SOCIAL:
PUBLISHER_KIND_MAP[h.lower()] = "social_platform"
for h in PUBLISHERS_INTERNAL:
PUBLISHER_KIND_MAP[h.lower()] = "internal"
# Garbage: handles that are clearly parse artifacts, not real names.
# Pattern: contains parens, special chars, or >50 chars.
def is_garbage(handle: str) -> bool:
h = handle.strip()
if len(h) > 50:
return True
if re.search(r"[()\[\]<>{}\/\\|@#$%^&*=?!:;\"']", h):
# But @ can appear legitimately in handles like @thesensatore — allow if @ is only prefix
if h.startswith("@") and not re.search(r"[()\[\]<>{}\/\\|#$%^&*=?!:;\"']", h):
return False
return True
# Multi-word hyphenated with very specific artifact shape: 3+ hyphens in a row or trailing noise
if "---" in h or "---meritocratic" in h or h.endswith("(march") or h.endswith("-(march"):
return True
return False
def classify(handle: str) -> tuple[str, str | None]:
"""Return (category, publisher_kind).
category {'keep_agent', 'keep_person', 'publisher', 'garbage', 'review_needed'}
publisher_kind {'news','academic','social_platform','internal', None}
"""
h = handle.strip().lower().lstrip("@")
if h in PENTAGON_AGENTS:
return ("keep_agent", None)
if h in PUBLISHER_KIND_MAP:
return ("publisher", PUBLISHER_KIND_MAP[h])
if is_garbage(handle):
return ("garbage", None)
# @-prefixed handles or short-slug real-looking names → keep as person
# (Auto-create rule from Cory: @ handles auto-join as tier='cited'.)
if handle.startswith("@"):
return ("keep_person", None)
# Plausible handles (<=39 chars, alphanum + underscore/hyphen): treat as person.
# 39-char ceiling matches GitHub's handle limit and the writer path in
# contributor.py::_HANDLE_RE, so a valid 21-39 char real handle won't fall
# through to review_needed and block --apply.
if re.match(r"^[a-z0-9][a-z0-9_-]{0,38}$", h):
return ("keep_person", None)
# Everything else: needs human review
return ("review_needed", None)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true", help="Write changes to DB")
parser.add_argument("--show", type=str, help="Inspect a single handle")
parser.add_argument("--delete-events", action="store_true",
help="DELETE contribution_events for publishers+garbage (default: keep for audit)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: publishers table must exist (v26 migration applied)
try:
conn.execute("SELECT 1 FROM publishers LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: publishers table missing. Run migration v26 first.", file=sys.stderr)
sys.exit(2)
rows = conn.execute(
"SELECT handle, kind, tier, claims_merged FROM contributors ORDER BY claims_merged DESC"
).fetchall()
if args.show:
target = args.show.strip().lower().lstrip("@")
for r in rows:
if r["handle"].lower().lstrip("@") == target:
category, pkind = classify(r["handle"])
events_count = conn.execute(
"SELECT COUNT(*) FROM contribution_events WHERE handle = ?",
(r["handle"].lower().lstrip("@"),),
).fetchone()[0]
print(f"handle: {r['handle']}")
print(f"current_kind: {r['kind']}")
print(f"current_tier: {r['tier']}")
print(f"claims_merged: {r['claims_merged']}")
print(f"events: {events_count}")
print(f"→ category: {category}")
if pkind:
print(f"→ publisher: kind={pkind}")
return
print(f"No match for '{args.show}'")
return
# Classify all
buckets: dict[str, list[dict]] = {
"keep_agent": [],
"keep_person": [],
"publisher": [],
"garbage": [],
"review_needed": [],
}
for r in rows:
category, pkind = classify(r["handle"])
buckets[category].append({
"handle": r["handle"],
"kind_now": r["kind"],
"tier": r["tier"],
"claims": r["claims_merged"] or 0,
"publisher_kind": pkind,
})
print("=== Classification summary ===")
for cat, items in buckets.items():
print(f" {cat:18s} {len(items):5d}")
print("\n=== Sample of each category ===")
for cat, items in buckets.items():
print(f"\n--- {cat} (showing up to 10) ---")
for item in items[:10]:
tag = f"{item['publisher_kind']}" if item["publisher_kind"] else ""
print(f" {item['handle']:50s} claims={item['claims']:5d}{tag}")
print("\n=== Full review_needed list ===")
for item in buckets["review_needed"]:
print(f" {item['handle']:50s} claims={item['claims']:5d}")
# Diagnostic: orphan alias count for handles we're about to delete.
# Contributor_aliases has no FK (SQLite FKs require PRAGMA to enforce anyway),
# so aliases pointing to deleted canonical handles become orphans. Surface
# the count so the --delete-events decision is informed.
doomed = [item["handle"].lower().lstrip("@") for item in buckets["garbage"] + buckets["publisher"]]
if doomed:
placeholders = ",".join("?" * len(doomed))
orphan_count = conn.execute(
f"SELECT COUNT(*) FROM contributor_aliases WHERE canonical IN ({placeholders})",
doomed,
).fetchone()[0]
print(f"\n=== Alias orphan check ===")
print(f" contributor_aliases rows pointing to deletable canonicals: {orphan_count}")
if orphan_count:
print(f" (cleanup requires --delete-events; without it, aliases stay as orphans)")
if not args.apply:
print("\n(dry-run — no writes. Re-run with --apply to execute.)")
return
# ── Apply changes ──
print("\n=== Applying changes ===")
if buckets["review_needed"]:
print(f"ABORT: {len(buckets['review_needed'])} rows need human review. Fix classifier before --apply.")
sys.exit(3)
inserted_publishers = 0
reclassified_agents = 0
deleted_garbage = 0
deleted_publisher_rows = 0
deleted_events = 0
deleted_aliases = 0
# Single transaction — if any step errors, roll back. This prevents the failure
# mode where a publisher insert fails silently and we still delete the contributor
# row, losing data.
try:
conn.execute("BEGIN")
# 1. Insert publishers. Track which ones succeeded so step 4 only deletes those.
# Counter uses cur.rowcount so replay runs (where publishers already exist)
# report accurate inserted=0 instead of falsely claiming the full set.
# moved_to_publisher is unconditional — the contributors row still needs to
# be deleted even when the publishers row was added in a prior run.
moved_to_publisher = set()
for item in buckets["publisher"]:
name = item["handle"].strip().lower().lstrip("@")
cur = conn.execute(
"INSERT OR IGNORE INTO publishers (name, kind) VALUES (?, ?)",
(name, item["publisher_kind"]),
)
if cur.rowcount > 0:
inserted_publishers += 1
moved_to_publisher.add(item["handle"])
# 2. Ensure Pentagon agents have kind='agent' (idempotent after v25 patch)
for item in buckets["keep_agent"]:
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
(item["handle"].lower().lstrip("@"),),
)
reclassified_agents += 1
# 3. Delete garbage handles from contributors (and their events + aliases)
for item in buckets["garbage"]:
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_garbage += cur.rowcount
# 4. Delete publisher rows from contributors — ONLY for those successfully
# inserted into publishers above. Guards against partial failure.
# Aliases pointing to publisher-classified handles get cleaned under the
# same --delete-events gate: publishers live in their own table now, any
# leftover aliases in contributor_aliases are orphans.
for item in buckets["publisher"]:
if item["handle"] not in moved_to_publisher:
continue
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_publisher_rows += cur.rowcount
# 5. Audit log entry for the destructive operation (Ganymede Q5).
conn.execute(
"INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)",
(
"schema_v26",
"classify_contributors",
json.dumps({
"publishers_inserted": inserted_publishers,
"agents_updated": reclassified_agents,
"garbage_deleted": deleted_garbage,
"publisher_rows_deleted": deleted_publisher_rows,
"events_deleted": deleted_events,
"aliases_deleted": deleted_aliases,
"delete_events_flag": bool(args.delete_events),
}),
),
)
conn.commit()
except Exception as e:
conn.rollback()
print(f"ERROR: Transaction failed, rolled back. {e}", file=sys.stderr)
sys.exit(4)
print(f" publishers inserted: {inserted_publishers}")
print(f" agents kind='agent' ensured: {reclassified_agents}")
print(f" garbage rows deleted: {deleted_garbage}")
print(f" publisher rows removed from contributors: {deleted_publisher_rows}")
if args.delete_events:
print(f" contribution_events deleted: {deleted_events}")
print(f" contributor_aliases deleted: {deleted_aliases}")
else:
print(f" (events + aliases kept — re-run with --delete-events to clean them)")
if __name__ == "__main__":
main()

View file

@ -1,108 +0,0 @@
#!/usr/bin/env python3
"""Reset m3taversal.sourcer_count from inflated legacy value to file-truth count.
Background: pre-Phase-A extract.py had a `submitted_by` fallback that credited
m3taversal as sourcer for every Telegram-ingested source, accumulating to 1011
sourcer_count in the contributors table. The actual file-truth count (sourcer
frontmatter equal to "m3taversal" in claim files) is 21. The 990-row delta is
infrastructure attribution that doesn't reflect content authorship.
The Phase A event-sourced ledger (contribution_events) computed the correct
389.55 CI from author events; /api/leaderboard reads from there directly.
But the legacy /api/contributors endpoint reads contributors.claims_merged
which carries the inflated 1011. Until that endpoint is deprecated, the
divergence shows two different numbers depending on which surface the UI
queries.
This script applies the surgical UPDATE that was run on VPS on 2026-04-27
during the leaderboard cutover. Committed as a script per Ganymede review:
"DB mutations go through reviewable code paths matters more than the
convenience of one-shot SQL. The artifact explains what was done and why."
Idempotent safe to re-run. If sourcer_count is already 21, no change.
Usage:
python3 scripts/reset-m3taversal-sourcer.py --dry-run
python3 scripts/reset-m3taversal-sourcer.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
TARGET_HANDLE = "m3taversal"
TRUTH_SOURCER_COUNT = 21
TRUTH_CLAIMS_MERGED = 21
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT handle, sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
if not row:
print(f" No contributors row for {TARGET_HANDLE} — nothing to reset.")
return
print(
f" Current: {row['handle']} sourcer_count={row['sourcer_count']} "
f"claims_merged={row['claims_merged']}"
)
print(f" Target: sourcer_count={TRUTH_SOURCER_COUNT} claims_merged={TRUTH_CLAIMS_MERGED}")
if (row["sourcer_count"] == TRUTH_SOURCER_COUNT
and row["claims_merged"] == TRUTH_CLAIMS_MERGED):
print(" Already at target values — no-op.")
return
if args.dry_run:
print(" (dry-run) UPDATE would be applied. Re-run without --dry-run.")
return
conn.execute(
"""UPDATE contributors SET
sourcer_count = ?,
claims_merged = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(TRUTH_SOURCER_COUNT, TRUTH_CLAIMS_MERGED, TARGET_HANDLE),
)
conn.execute(
"""INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)""",
(
"manual",
"m3taversal_sourcer_reset",
(
'{"reason":"Pre-Phase-A submitted_by fallback inflated to 1011; '
'file-truth is 21","sourcer_count_before":1011,'
'"sourcer_count_after":21,"claims_merged_after":21}'
),
),
)
conn.commit()
after = conn.execute(
"SELECT sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
print(
f" Applied. Now: sourcer_count={after['sourcer_count']} "
f"claims_merged={after['claims_merged']}"
)
if __name__ == "__main__":
main()

View file

@ -1,152 +0,0 @@
"""Tests for diagnostics/activity_endpoint.py classify_pr_operation.
Covers the Leo gotcha extract/* branches with commit_type=enrich or
challenge classify by commit_type, not branch prefix. Same class of bug
as the contributor-role wiring fix.
"""
import sys
from pathlib import Path
import pytest
# diagnostics/ isn't on sys.path by default; add it for these tests.
_DIAG = Path(__file__).resolve().parents[1] / "diagnostics"
if str(_DIAG) not in sys.path:
sys.path.insert(0, str(_DIAG))
# aiohttp is imported at module load time; skip cleanly if not installed.
pytest.importorskip("aiohttp")
from activity_endpoint import classify_pr_operation # noqa: E402
# ─── Merged PRs: commit_type wins over branch prefix ───────────────────────
def test_extract_branch_legacy_knowledge_classifies_new():
assert classify_pr_operation("merged", "knowledge", "extract/foo", None) == "new"
def test_extract_branch_with_enrich_commit_type_classifies_enrich():
"""Leo gotcha: extract/* + commit_type=enrich → enrich, not new."""
assert classify_pr_operation("merged", "enrich", "extract/foo", None) == "enrich"
def test_extract_branch_with_challenge_commit_type_classifies_challenge():
"""Leo gotcha: extract/* + commit_type=challenge → challenge, not new."""
assert classify_pr_operation("merged", "challenge", "extract/foo", None) == "challenge"
def test_challenged_by_in_description_classifies_challenge():
assert (
classify_pr_operation(
"merged", "knowledge", "extract/foo", "evidence for challenged_by claim"
)
== "challenge"
)
# ─── Branch prefix fallback (when commit_type is generic) ──────────────────
def test_reweave_branch_classifies_enrich():
assert classify_pr_operation("merged", "knowledge", "reweave/batch-1", None) == "enrich"
def test_challenge_branch_classifies_challenge():
assert (
classify_pr_operation("merged", "knowledge", "challenge/nuclear-moloch", None)
== "challenge"
)
# ─── Maintenance commit_types → infra ──────────────────────────────────────
def test_fix_commit_type_classifies_infra():
assert classify_pr_operation("merged", "fix", "fix/deploy-bug", None) == "infra"
def test_pipeline_commit_type_classifies_infra():
assert (
classify_pr_operation("merged", "pipeline", "epimetheus/migration-v14", None)
== "infra"
)
# ─── Knowledge-producing commit_types → new ────────────────────────────────
def test_research_commit_type_classifies_new():
assert (
classify_pr_operation("merged", "research", "theseus/cornelius-batch-2", None)
== "new"
)
def test_entity_commit_type_classifies_new():
assert classify_pr_operation("merged", "entity", "leo/entities-update", None) == "new"
# ─── Non-merged statuses route through NON_MERGED_STATUS_TO_OPERATION ──────
def test_open_pr_classifies_extract():
assert classify_pr_operation("open", None, "extract/foo", None) == "extract"
def test_approved_pr_classifies_new():
assert classify_pr_operation("approved", None, "extract/foo", None) == "new"
def test_closed_pr_classifies_infra():
assert classify_pr_operation("closed", None, "extract/foo", None) == "infra"
def test_conflict_pr_classifies_challenge():
assert classify_pr_operation("conflict", None, "extract/foo", None) == "challenge"
def test_validating_pr_classifies_extract():
assert classify_pr_operation("validating", None, "extract/foo", None) == "extract"
def test_reviewing_pr_classifies_extract():
assert classify_pr_operation("reviewing", None, "extract/foo", None) == "extract"
def test_merging_pr_classifies_new():
assert classify_pr_operation("merging", None, "extract/foo", None) == "new"
def test_zombie_pr_classifies_infra():
assert classify_pr_operation("zombie", None, "extract/foo", None) == "infra"
# ─── Priority order: reweave commit_type vs reweave/ branch ─────────────────
# Reweave commit_type is in _MAINTENANCE_COMMIT_TYPES (→ infra), but
# branch.startswith('reweave/') is checked first (→ enrich). The bifurcation
# is real spec behavior — nightly reweave PRs must classify as enrich, not
# infra. Locking this in prevents a silent flip on future priority refactors.
def test_reweave_commit_type_with_reweave_branch_classifies_enrich():
"""Branch prefix wins over maintenance — reweave PRs are enrich, not infra."""
assert classify_pr_operation("merged", "reweave", "reweave/batch-1", None) == "enrich"
def test_reweave_commit_type_without_reweave_branch_classifies_infra():
"""Without reweave/ prefix, reweave commit_type falls to maintenance → infra."""
assert classify_pr_operation("merged", "reweave", "epimetheus/foo", None) == "infra"
# ─── Defensive cases — null/empty inputs shouldn't crash ───────────────────
def test_null_commit_type_and_branch_classifies_new():
assert classify_pr_operation("merged", None, None, None) == "new"
def test_unknown_status_falls_back_to_infra():
assert classify_pr_operation("nonsense", None, None, None) == "infra"

View file

@ -1,437 +0,0 @@
"""Tests for /api/leaderboard endpoint (diagnostics/leaderboard_routes.py).
Locks behavior for the four slicings consumed by Argus + Oberon:
- window: all_time | Nd | Nh
- domain: per-domain filter
- kind: person | agent | org | all
- limit: pagination + has_more flag
Regression coverage includes the AND-prefix SQL bug (commit 42d35d4): _parse_window
returned clauses prefixed with 'AND ' which produced 'WHERE 1=1 AND AND ...' when
joined into the WHERE clause via " AND ".join(...).
"""
import asyncio
import json
import sqlite3
from pathlib import Path
import pytest
# Skip whole file if aiohttp isn't available (matches test_activity_classify.py pattern)
aiohttp = pytest.importorskip("aiohttp")
# Make diagnostics/ importable
import sys
DIAG_ROOT = Path(__file__).parent.parent / "diagnostics"
sys.path.insert(0, str(DIAG_ROOT))
from leaderboard_routes import ( # noqa: E402
_parse_window,
handle_leaderboard,
KIND_VALUES,
LEADERBOARD_PUBLIC_PATHS,
)
from aiohttp.test_utils import make_mocked_request # noqa: E402
# ─── Schema lifted from lib/db.py:138-209 (v25 minimum) ──────────────────────
SCHEMA = """
CREATE TABLE contributors (
handle TEXT PRIMARY KEY,
kind TEXT DEFAULT 'person',
tier TEXT DEFAULT 'new',
claims_merged INTEGER DEFAULT 0,
sourcer_count INTEGER DEFAULT 0,
extractor_count INTEGER DEFAULT 0,
challenger_count INTEGER DEFAULT 0,
synthesizer_count INTEGER DEFAULT 0,
reviewer_count INTEGER DEFAULT 0,
challenges_survived INTEGER DEFAULT 0,
domains TEXT DEFAULT '[]',
first_contribution TEXT,
last_contribution TEXT
);
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
# ─── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
def db_path(tmp_path):
"""Seeded pipeline.db with deterministic events.
Cohort:
- alice (person): 3 author events, 1 originator (recent 3d, internet-finance)
- bob (person): 5 author events (older, 60d ago, ai-alignment)
- carol (person): 1 author + 1 evaluator (today, internet-finance)
- rio (agent): 4 author + 2 evaluator (mixed, internet-finance + grand-strategy)
- leo (agent): 8 evaluator events (today, mixed domains)
- cnbc (org): 2 originator events (legacy, before classifier moved orgs)
- newhandle (no contributors row): 1 author event tests LEFT JOIN COALESCE
"""
p = tmp_path / "pipeline.db"
conn = sqlite3.connect(str(p))
conn.executescript(SCHEMA)
contribs = [
("alice", "person"),
("bob", "person"),
("carol", "person"),
("rio", "agent"),
("leo", "agent"),
("cnbc", "org"),
# newhandle intentionally absent — tests LEFT JOIN
]
for handle, kind in contribs:
conn.execute(
"INSERT INTO contributors (handle, kind) VALUES (?, ?)",
(handle, kind),
)
# (handle, role, weight, pr_number, claim_path, domain, timestamp)
events = [
# alice — 3 author + 1 originator, recent (all >24h ago, all <7d)
# Most-recent event at -2 days (not -1 days) so 24h window exclusion is
# unambiguous and not subject to fixture-vs-query microsecond drift.
("alice", "author", 0.30, 100, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 101, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 102, None, "ai-alignment", "now,-3 days"),
("alice", "originator", 0.15, 103, "domains/internet-finance/x.md", "internet-finance", "now,-2 days"),
# bob — 5 author, all 60d ago (outside 30d, inside all_time)
("bob", "author", 0.30, 200, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 201, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 202, None, "ai-alignment", "now,-61 days"),
("bob", "author", 0.30, 203, None, "ai-alignment", "now,-62 days"),
("bob", "author", 0.30, 204, None, "ai-alignment", "now,-63 days"),
# carol — 1 author + 1 evaluator, today
("carol", "author", 0.30, 300, None, "internet-finance", "now"),
("carol", "evaluator", 0.05, 301, None, "internet-finance", "now"),
# rio agent — 4 author + 2 evaluator
("rio", "author", 0.30, 400, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 401, None, "grand-strategy", "now,-2 days"),
("rio", "author", 0.30, 402, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 403, None, "internet-finance", "now,-2 days"),
("rio", "evaluator", 0.05, 404, None, "ai-alignment", "now,-2 days"),
("rio", "evaluator", 0.05, 405, None, "ai-alignment", "now,-2 days"),
# leo agent — 8 evaluator
*[
("leo", "evaluator", 0.05, 500 + i, None, "internet-finance" if i % 2 == 0 else "ai-alignment", "now")
for i in range(8)
],
# cnbc org — 2 originator (legacy data, kept by classifier+gate split)
("cnbc", "originator", 0.15, 600, "domains/internet-finance/y.md", "internet-finance", "now,-5 days"),
("cnbc", "originator", 0.15, 601, "domains/internet-finance/z.md", "internet-finance", "now,-5 days"),
# newhandle — handle in events but no contributors row (LEFT JOIN COALESCE → person)
# -2 days so 24h-window test exclusion is unambiguous (matches alice).
("newhandle", "author", 0.30, 700, None, "ai-alignment", "now,-2 days"),
]
for handle, role, weight, pr_num, claim_path, domain, ts_modifier in events:
# Use SQLite datetime() to compute timestamps relative to "now" so tests
# are deterministic across days. Multi-arg form: datetime('now', '-1 days').
ts_args = ts_modifier.split(",")
if len(ts_args) == 1:
ts_sql = f"datetime('{ts_args[0]}')"
else:
ts_sql = f"datetime('{ts_args[0]}', '{ts_args[1].strip()}')"
conn.execute(
f"""INSERT INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, {ts_sql})""",
(handle, "agent" if handle in {"rio", "leo"} else "person",
role, weight, pr_num, claim_path, domain),
)
conn.commit()
conn.close()
return str(p)
def _call(db_path, **query):
"""Build a mocked request, call handle_leaderboard, return parsed JSON."""
qs = "&".join(f"{k}={v}" for k, v in query.items())
req = make_mocked_request("GET", f"/api/leaderboard?{qs}")
# make_mocked_request gives us req.app — write db_path into it.
req.app["db_path"] = db_path
response = asyncio.run(handle_leaderboard(req))
return json.loads(response.body.decode())
# ─── _parse_window unit tests ────────────────────────────────────────────────
class TestParseWindow:
def test_default_is_all_time(self):
clause, params, label = _parse_window(None)
assert clause == ""
assert params == ()
assert label == "all_time"
def test_explicit_all_time(self):
clause, params, label = _parse_window("all_time")
assert clause == ""
assert label == "all_time"
def test_seven_days(self):
clause, params, label = _parse_window("7d")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-7 days",)
assert label == "7d"
# Regression: must NOT begin with "AND " (handle_leaderboard composes via " AND ".join)
assert not clause.startswith("AND")
def test_thirty_days(self):
clause, params, label = _parse_window("30d")
assert params == ("-30 days",)
assert label == "30d"
def test_hours(self):
clause, params, label = _parse_window("24h")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-24 hours",)
assert label == "24h"
def test_caps_days_at_365(self):
clause, params, label = _parse_window("9999d")
assert params == ("-365 days",)
def test_caps_hours_at_8760(self):
clause, params, label = _parse_window("99999h")
assert params == ("-8760 hours",)
def test_garbage_falls_to_all_time(self):
clause, params, label = _parse_window("foobar")
assert clause == ""
assert label == "all_time"
def test_uppercase_normalized(self):
clause, params, label = _parse_window("7D")
assert label == "7d"
def test_zero_days_still_emits_clause(self):
# 0d means "now or later" — empty result, but parse should succeed
clause, params, label = _parse_window("0d")
assert "datetime" in clause
assert label == "0d"
# ─── handle_leaderboard integration tests ────────────────────────────────────
class TestLeaderboardEndpoint:
def test_all_time_default_kind_person(self, db_path):
"""Default kind is 'person'. Returns all persons, sorted by CI desc."""
body = _call(db_path)
assert body["window"] == "all_time"
assert body["kind_filter"] == "person"
assert body["domain"] is None
assert body["source"] == "contribution_events"
# alice 3*0.30 + 0.15 = 1.05
# bob 5*0.30 = 1.50
# carol 0.30 + 0.05 = 0.35
# newhandle 0.30 (LEFT JOIN COALESCE → 'person')
# cnbc excluded (kind='org')
# rio/leo excluded (kind='agent')
handles = [r["handle"] for r in body["leaderboard"]]
assert "bob" in handles
assert "alice" in handles
assert "newhandle" in handles, "LEFT JOIN COALESCE should default missing contributors to 'person'"
assert "cnbc" not in handles, "kind=person should exclude orgs"
assert "rio" not in handles, "kind=person should exclude agents"
# Descending by CI
cis = [r["ci"] for r in body["leaderboard"]]
assert cis == sorted(cis, reverse=True)
def test_window_7d_excludes_old_events(self, db_path):
"""REGRESSION: 7d window must execute (no AND-prefix SQL error).
Bob has all events 60d ago must not appear in 7d window.
Alice has events 1-3d ago must appear.
"""
body = _call(db_path, window="7d")
assert body["window"] == "7d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "bob" not in handles, "60d-old events must be excluded from 7d window"
assert "carol" in handles # today
def test_window_30d_excludes_60d_events(self, db_path):
"""REGRESSION: 30d window must execute. Bob (60d) excluded; alice/carol included."""
body = _call(db_path, window="30d")
assert body["window"] == "30d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
def test_window_24h_only_today(self, db_path):
"""24h window picks up today's events only.
Default kind=person. Within 24h: only carol (events at 'now').
Excluded: alice/newhandle (events at -2 days), bob (-60d), rio/leo (kind),
cnbc (-5d AND kind=org).
"""
body = _call(db_path, window="24h")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["carol"], (
"24h + kind=person should return only carol; got %r" % handles
)
def test_kind_agent(self, db_path):
"""kind=agent returns only agents."""
body = _call(db_path, kind="agent")
handles = [r["handle"] for r in body["leaderboard"]]
assert "rio" in handles
assert "leo" in handles
assert "alice" not in handles
assert "bob" not in handles
def test_kind_org(self, db_path):
"""kind=org returns only orgs (legacy events still queryable)."""
body = _call(db_path, kind="org")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["cnbc"]
assert body["leaderboard"][0]["ci"] == 0.30 # 2 * 0.15
def test_kind_all_returns_everyone(self, db_path):
"""kind=all returns all kinds — persons + agents + orgs."""
body = _call(db_path, kind="all")
handles = {r["handle"] for r in body["leaderboard"]}
assert handles == {"alice", "bob", "carol", "rio", "leo", "cnbc", "newhandle"}
def test_invalid_kind_falls_to_person(self, db_path):
"""Defensive: unknown kind value silently falls back to 'person'."""
body = _call(db_path, kind="bogus")
assert body["kind_filter"] == "person"
def test_domain_filter(self, db_path):
"""domain=internet-finance scopes events; kind filter still applies."""
body = _call(db_path, domain="internet-finance")
assert body["domain"] == "internet-finance"
handles = {r["handle"] for r in body["leaderboard"]}
# alice has 2 internet-finance authors + 1 originator
# carol has 1 internet-finance author + 1 evaluator
# bob has 0 (all ai-alignment)
# newhandle has 0 (ai-alignment only)
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
assert "newhandle" not in handles
def test_composed_window_kind_domain(self, db_path):
"""REGRESSION: composed filters must build SQL correctly.
7d + person + internet-finance alice only.
"""
body = _call(db_path, window="7d", kind="person", domain="internet-finance")
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles # excluded by 7d
assert "rio" not in handles # excluded by kind=person
def test_limit_caps_results(self, db_path):
"""limit caps the leaderboard slice; total reflects unfiltered count."""
body = _call(db_path, kind="all", limit=3)
assert body["shown"] == 3
assert body["has_more"] is True
assert body["total"] == 7
def test_no_has_more_when_under_limit(self, db_path):
body = _call(db_path, kind="org")
assert body["shown"] == 1
assert body["has_more"] is False
assert body["total"] == 1
def test_invalid_limit_falls_to_default(self, db_path):
"""Defensive: garbage limit param falls to default 100. 7 entries < 100."""
body = _call(db_path, kind="all", limit="not-a-number")
assert body["shown"] == 7
assert body["has_more"] is False
def test_limit_capped_at_500(self, db_path):
"""Defensive: limit > 500 silently caps at 500."""
body = _call(db_path, limit=99999, kind="all")
# No assertion on the value of the cap from the response — just that
# it doesn't error and shown <= 500.
assert body["shown"] <= 500
def test_role_breakdown_present(self, db_path):
"""Each row includes ci_breakdown with all 5 roles."""
body = _call(db_path)
for entry in body["leaderboard"]:
assert set(entry["ci_breakdown"].keys()) == {
"author", "challenger", "synthesizer", "originator", "evaluator",
}
def test_alice_role_breakdown_correct(self, db_path):
"""Alice has 3 author (0.90) + 1 originator (0.15) = 1.05 total."""
body = _call(db_path)
alice = next(r for r in body["leaderboard"] if r["handle"] == "alice")
assert alice["ci"] == 1.05
assert alice["ci_breakdown"]["author"] == 0.90
assert alice["ci_breakdown"]["originator"] == 0.15
assert alice["ci_breakdown"]["challenger"] == 0
assert alice["ci_breakdown"]["synthesizer"] == 0
assert alice["ci_breakdown"]["evaluator"] == 0
assert alice["events_count"] == 4
assert alice["pr_count"] == 4
assert alice["domain_count"] == 2 # internet-finance + ai-alignment
def test_empty_window_returns_clean_response(self, db_path):
"""Window with no matching events returns shape-correct empty response."""
# 24h window + kind=org → cnbc is 5d ago, so empty
body = _call(db_path, window="24h", kind="org")
assert body["leaderboard"] == []
assert body["total"] == 0
assert body["shown"] == 0
assert body["has_more"] is False
assert body["source"] == "contribution_events"
def test_left_join_handles_missing_contributors_row(self, db_path):
"""REGRESSION: handle in events but missing from contributors must default to kind='person'.
Catches the failure mode where a handle classified as cited (auto-create
deferred to Branch 3) accumulates events but has no contributors row yet.
"""
body = _call(db_path)
newhandle_row = next(
(r for r in body["leaderboard"] if r["handle"] == "newhandle"), None
)
assert newhandle_row is not None
assert newhandle_row["kind"] == "person"
assert newhandle_row["ci"] == 0.30
# ─── Public path constant (auth middleware bypass) ───────────────────────────
def test_public_paths_includes_leaderboard():
"""Auth middleware needs LEADERBOARD_PUBLIC_PATHS to skip API key for /api/leaderboard."""
assert "/api/leaderboard" in LEADERBOARD_PUBLIC_PATHS
def test_kind_values_matches_contract():
"""API contract: only these 4 kind values are accepted."""
assert set(KIND_VALUES) == {"person", "agent", "org", "all"}

View file

@ -1,167 +0,0 @@
"""Verify research-attribution backfill is replay-safe against real schema.
Three things to prove:
1. (handle, role, pr_number) with claim_path=NULL deduplicates correctly
(idx_ce_unique_pr partial index handles SQLite NULL-not-equal-NULL).
2. Re-inserting an existing (handle, role, pr_number, NULL) row via INSERT OR IGNORE
is a true no-op does not create a phantom duplicate.
3. The backfill script's specific operation (DELETE then INSERT for same key)
nets zero rows when run twice in sequence.
"""
import sqlite3
import sys
# Schema lifted verbatim from lib/db.py:181-209
SCHEMA = """
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
def setup() -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
return conn
def insert_event(conn, handle, role, pr_number, claim_path=None):
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', ?, 0.30, ?, ?)""",
(handle, role, pr_number, claim_path),
)
return cur.rowcount
def count(conn) -> int:
return conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
def test_pr_level_dedup_with_null_claim_path():
"""Two inserts of same (handle, role, pr_number, NULL) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061)
r2 = insert_event(conn, "rio", "author", 4061)
n = count(conn)
assert r1 == 1, f"first insert should write, got rowcount={r1}"
assert r2 == 0, f"second insert should be ignored, got rowcount={r2}"
assert n == 1, f"expected 1 row, got {n}"
print("PASS: pr-level dedup with NULL claim_path")
def test_per_claim_dedup_with_path():
"""Two inserts of same (handle, role, pr_number, path) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 0 and n == 1
print("PASS: per-claim dedup with claim_path")
def test_pr_level_and_per_claim_coexist():
"""A (handle, role, pr_number, NULL) and (handle, role, pr_number, 'x.md') coexist
because the partial indexes target different rows."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path=None)
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 1 and n == 2
print("PASS: pr-level and per-claim events coexist on same pr_number")
def test_backfill_replay_is_noop():
"""Simulate the exact backfill operation: INSERT correct event, DELETE wrong event.
Run twice. Expect identical state no phantom rows, no double-deletions."""
conn = setup()
# Initial state: m3taversal has the wrong author event for pr=4061
insert_event(conn, "m3taversal", "author", 4061)
assert count(conn) == 1
def backfill_pr_4061():
# Insert the correct event (rio is the real author)
conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', 'author', 0.30, 4061, NULL)""",
("rio (self-directed)",),
)
# Delete the wrong event
conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
conn.commit()
backfill_pr_4061()
state_after_first = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == [("rio (self-directed)", "author", 4061, None)], state_after_first
# Replay
backfill_pr_4061()
state_after_second = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == state_after_second, "replay should be idempotent"
assert count(conn) == 1, f"expected 1 row after replay, got {count(conn)}"
print("PASS: backfill replay is a true no-op")
def test_replay_against_already_backfilled_pr_does_not_double_delete():
"""If m3taversal event was already deleted, running backfill again must not error
or affect anything else."""
conn = setup()
# Already-correct state: rio has the author event, m3taversal does not
insert_event(conn, "rio (self-directed)", "author", 4061)
insert_event(conn, "leo", "evaluator", 4061) # noise — should not be touched
# Run backfill: tries to INSERT (rio, author, 4061) — already exists, no-op
# Tries to DELETE (m3taversal, author, 4061) — already absent, 0 rows affected
cur1 = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES ('rio (self-directed)', 'agent', 'author', 0.30, 4061, NULL)""",
)
cur2 = conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
assert cur1.rowcount == 0, f"insert should be no-op, got {cur1.rowcount}"
assert cur2.rowcount == 0, f"delete should be no-op, got {cur2.rowcount}"
assert count(conn) == 2, f"expected 2 rows preserved, got {count(conn)}"
print("PASS: replay against already-backfilled state preserves unrelated events")
if __name__ == "__main__":
test_pr_level_dedup_with_null_claim_path()
test_per_claim_dedup_with_path()
test_pr_level_and_per_claim_coexist()
test_backfill_replay_is_noop()
test_replay_against_already_backfilled_pr_does_not_double_delete()
print("\nAll 5 tests passed against real schema.")