Compare commits
49 commits
ship/metad
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| aaab659900 | |||
|
|
e78308862a | ||
| a54f52234a | |||
|
|
c9515c770a | ||
| 3dca3aab5f | |||
| 2ee9dd5150 | |||
| b29ec95dd8 | |||
|
|
74bf0461e8 | ||
|
|
01097da22c | ||
| 6c66da33e4 | |||
| c3f2010a42 | |||
| ed4893e837 | |||
| 73880e138d | |||
| 1bc541ac93 | |||
| 50b888a751 | |||
| 0eb26327fc | |||
| fc002354d4 | |||
| 5db6a0248c | |||
| 4b2b59b184 | |||
| ba234ec4b3 | |||
| e63d27d259 | |||
| 517e9884cc | |||
| 3f8666ee0c | |||
| 87f97eb4fa | |||
| ad1d82f5ee | |||
| 923454c9ea | |||
| ed4af4d72e | |||
| ed5f7ef6cc | |||
| 7741c1e6de | |||
| 992b4ee36f | |||
| de204db539 | |||
| 1eb259de8a | |||
| b8504c1b60 | |||
| 33f6ca9e3f | |||
| b9c4947637 | |||
| bf647b7abb | |||
| 1351db70a9 | |||
| d60b6f8bf2 | |||
| cd5aac5cc6 | |||
| 7c6417d6be | |||
| 42d35d4e15 | |||
| de7e5ec709 | |||
| 369f6c96da | |||
| 6aff03ff56 | |||
| 319e03e2c6 | |||
| 2d332c66d4 | |||
| dea1b02aa6 | |||
| d0fb4c96e3 | |||
| 926a397839 |
26 changed files with 3535 additions and 875 deletions
165
README.md
165
README.md
|
|
@ -1,65 +1,134 @@
|
|||
# teleo-infrastructure
|
||||
|
||||
Pipeline infrastructure for the Teleo collective knowledge base. Async Python daemon that extracts, validates, evaluates, and merges claims via Forgejo PRs.
|
||||
This repo runs the pipeline that processes contributions into the
|
||||
[teleo-codex](https://github.com/living-ip/teleo-codex) knowledge base.
|
||||
|
||||
## Directory Structure
|
||||
Every claim on `main` has been extracted from a source, validated for schema
|
||||
and duplicates, evaluated by at least two independent reviewers, and merged
|
||||
through an event-sourced audit log. The whole flow is an async Python daemon
|
||||
talking to a Forgejo git server, an SQLite WAL state store, OpenRouter (for
|
||||
most LLM calls), and the Anthropic Claude CLI (for Opus deep reviews).
|
||||
|
||||
**Production state** (live):
|
||||
|
||||
| Metric | Value |
|
||||
|---|---|
|
||||
| Claims merged into `main` | 1,546 across 13 domains |
|
||||
| PRs merged through the pipeline | 1,975 |
|
||||
| Merge throughput (last 7d) | 508 PRs (~73/day) |
|
||||
| Review approval rate | 94% |
|
||||
| Cost per merged claim (last 30d) | $0.10 incl. extract + triage + multi-tier review |
|
||||
| Production agents | 6 (rio, theseus, leo, vida, astra, clay) |
|
||||
|
||||
## Pipeline
|
||||
|
||||
Concurrent stage loops in a single daemon (`teleo-pipeline.py`), coordinated
|
||||
by SQLite. Circuit breakers cap costs, retry budgets cap attempts, and merges
|
||||
are serialized per-domain to avoid cross-PR conflicts.
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
Inbox["inbox/queue/"] --> Extract
|
||||
Extract["Extract<br/>(Sonnet 4.5)"] --> Validate
|
||||
Validate["Validate<br/>(tier 0, $0)"] --> Evaluate
|
||||
Evaluate["Evaluate<br/>(tiered, multi-model)"] --> Merge
|
||||
Merge["Merge<br/>(Forgejo, domain-serial)"] --> Effects
|
||||
Effects["Effects<br/>cascade · backlinks · reciprocal edges"]
|
||||
```
|
||||
teleo-infrastructure/
|
||||
├── teleo-pipeline.py # Daemon entry point
|
||||
├── reweave.py # Reciprocal edge maintenance
|
||||
├── lib/ # Pipeline modules (Python package)
|
||||
├── diagnostics/ # Monitoring dashboard (port 8081)
|
||||
├── telegram/ # Telegram bot interface
|
||||
├── deploy/ # Deployment + mirror scripts
|
||||
├── systemd/ # Service definitions
|
||||
├── agent-state/ # Cross-session agent state
|
||||
├── research/ # Nightly research orchestration
|
||||
├── hermes-agent/ # Hermes agent setup
|
||||
├── scripts/ # One-off backfills + migrations
|
||||
├── tests/ # Test suite
|
||||
└── docs/ # Operational documentation
|
||||
|
||||
If any reviewer rejects, the PR gets a structured rationale and either
|
||||
re-extraction guidance (for fixable issues) or a terminal close (for
|
||||
scope or duplicate problems). Approved merges trigger downstream effects:
|
||||
|
||||
- **Cascade** — agents whose beliefs/positions depend on the changed claim get inbox notifications
|
||||
- **Bidirectional provenance** — `sourced_from:` is stamped on each claim at extraction; the source's `claims_extracted:` list is updated post-merge
|
||||
- **Reciprocal edges** — when a new claim has `supports: [X]`, X's frontmatter is updated with `supports: [new]`
|
||||
- **Cross-domain index** — entity mentions across domain boundaries are logged for silo detection
|
||||
|
||||
## Multi-agent review
|
||||
|
||||
Reviews aren't free. Tier classification is deterministic where possible
|
||||
(changes to `core/` or `foundations/` always go Deep) and otherwise picked
|
||||
by Haiku based on PR scope. Last 30d distribution: 76% Standard, 21% Light,
|
||||
2% Deep.
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
PR[New PR] --> Classify{Classify}
|
||||
Classify -->|"core/, foundations/, challenged"| Deep
|
||||
Classify -->|default| Standard
|
||||
Classify -->|single claim, low risk| Light
|
||||
Light["Light tier<br/>Domain agent only"] --> Result
|
||||
Standard["Standard tier<br/>Domain agent + Leo (Sonnet 4.5)"] --> Result
|
||||
Deep["Deep tier<br/>Domain agent + Leo (Opus)"] --> Result
|
||||
Result{Both approve?}
|
||||
Result -->|yes| MergeOK[Merge]
|
||||
Result -->|no| Reject[Structured rejection<br/>+ re-extract guidance]
|
||||
```
|
||||
|
||||
Domain agents bring domain expertise: **Rio** (internet-finance), **Vida**
|
||||
(health), **Astra** (space-development), **Clay** (entertainment),
|
||||
**Theseus** (ai-alignment). **Leo** brings cross-domain consistency on
|
||||
every PR. Disagreement between the two reviewers surfaces in `audit_log`
|
||||
and is tracked as a quality signal, not silenced.
|
||||
|
||||
Model diversity isn't cosmetic — same-family models share ~60% of their
|
||||
errors (Kim et al. ICML 2025). Pipeline mixes Haiku for triage, Gemini 2.5
|
||||
Flash for domain review, Sonnet 4.5 for Leo standard, Opus for Leo deep.
|
||||
|
||||
## Contributor flow
|
||||
|
||||
External contributors submit PRs to
|
||||
[`living-ip/teleo-codex`](https://github.com/living-ip/teleo-codex) on GitHub.
|
||||
A mirror sync (every 2 minutes) fast-forwards the PR onto Forgejo, where
|
||||
the pipeline picks it up. From there it's the same flow as agent-authored
|
||||
PRs — same tiers, same reviewers, same merge rules.
|
||||
|
||||
The contributor-facing guide lives in
|
||||
[`teleo-codex/CONTRIBUTING.md`](https://github.com/living-ip/teleo-codex/blob/main/CONTRIBUTING.md).
|
||||
|
||||
## Repository layout
|
||||
|
||||
| Directory | What it does |
|
||||
|-----------------|-----------------------------------------------------------|
|
||||
| `lib/` | Pipeline modules — config, db, extract, evaluate, merge, cascade |
|
||||
| `diagnostics/` | Argus monitoring dashboard (4 pages: ops, health, agents, epistemic) |
|
||||
| `telegram/` | Telegram bot that answers from the knowledge base |
|
||||
| `research/` | Nightly autonomous research sessions for domain agents |
|
||||
| `agent-state/` | File-backed state for cross-session agent continuity |
|
||||
| `deploy/` | Auto-deploy pipeline (Forgejo → working dirs → systemd) |
|
||||
| `systemd/` | Service definitions for daemon + dashboard + agents |
|
||||
| `scripts/` | Backfills and one-off migrations |
|
||||
| `tests/` | pytest suite |
|
||||
| `docs/` | Architecture specs and operational protocols |
|
||||
|
||||
## Ownership
|
||||
|
||||
Each directory has one owning agent. The owner is accountable for correctness and reviews all changes to their section. See `CODEOWNERS` for per-file detail.
|
||||
Code review authority is enforced by [`CODEOWNERS`](./CODEOWNERS) — every
|
||||
file has one accountable agent. The high-level map:
|
||||
|
||||
| Directory | Owner | What it does |
|
||||
|-----------|-------|-------------|
|
||||
| `lib/` (core) | **Ship** | Config, DB, merge, cascade, validation, LLM calls |
|
||||
| `lib/` (extraction) | **Epimetheus** | Source extraction, entity processing, pre-screening |
|
||||
| `lib/` (evaluation) | **Leo** | Claim evaluation, analytics, attribution |
|
||||
| `lib/` (health) | **Argus** | Health checks, search, claim index |
|
||||
| `diagnostics/` | **Argus** | 4-page dashboard, alerting, vitality metrics |
|
||||
| `telegram/` | **Ship** | Telegram bot, X integration, retrieval |
|
||||
| `deploy/` | **Ship** | rsync deploy, GitHub-Forgejo mirror |
|
||||
| `systemd/` | **Ship** | teleo-pipeline, teleo-diagnostics, teleo-agent@ |
|
||||
| `agent-state/` | **Ship** | Bootstrap, state library, cascade inbox processor |
|
||||
| `research/` | **Ship** | Nightly research sessions, prompt templates |
|
||||
| `scripts/` | **Ship** | Backfills, migrations, one-off maintenance |
|
||||
| `tests/` | **Ganymede** | pytest suite, integration tests |
|
||||
| `docs/` | Shared | Architecture, specs, protocols |
|
||||
- **Ship** — pipeline core, telegram, deploy, agent-state, research, systemd
|
||||
- **Epimetheus** — extraction (intake, entity processing, pre-screening, post-extract validation)
|
||||
- **Leo** — evaluation (claim review, analytics, attribution)
|
||||
- **Argus** — health (diagnostics dashboard, alerting, claim index, search)
|
||||
- **Ganymede** — tests (pytest suite, integration, code review gate)
|
||||
|
||||
## VPS Layout
|
||||
For active sprint work and per-agent in-flight items, see each agent's
|
||||
status report in their Pentagon profile.
|
||||
|
||||
Runs on Hetzner CAX31 (77.42.65.182) as user `teleo`.
|
||||
|
||||
| VPS Path | Repo Source | Service |
|
||||
|----------|-------------|---------|
|
||||
| `/opt/teleo-eval/pipeline/` | `lib/`, `teleo-pipeline.py`, `reweave.py` | teleo-pipeline |
|
||||
| `/opt/teleo-eval/diagnostics/` | `diagnostics/` | teleo-diagnostics |
|
||||
| `/opt/teleo-eval/telegram/` | `telegram/` | (manual) |
|
||||
| `/opt/teleo-eval/agent-state/` | `agent-state/` | (used by research-session.sh) |
|
||||
|
||||
## Quick Start
|
||||
## Development
|
||||
|
||||
```bash
|
||||
# Run tests
|
||||
pip install -e ".[dev]"
|
||||
pytest
|
||||
|
||||
# Deploy to VPS
|
||||
./deploy/deploy.sh --dry-run # preview
|
||||
./deploy/deploy.sh # deploy
|
||||
```
|
||||
|
||||
## Operations
|
||||
|
||||
Production deployment runs on a single VPS. Runbook, restart procedures,
|
||||
secret rotation, and on-call live in the private
|
||||
[`teleo-ops`](https://github.com/living-ip/teleo-ops) repo (request access).
|
||||
|
||||
## License
|
||||
|
||||
[TBD]
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ fi
|
|||
|
||||
# Syntax check all Python files before copying
|
||||
ERRORS=0
|
||||
for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py scripts/*.py; do
|
||||
for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py; do
|
||||
[ -f "$f" ] || continue
|
||||
if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>&1; then
|
||||
log "SYNTAX ERROR: $f"
|
||||
|
|
@ -77,7 +77,6 @@ rsync "${RSYNC_OPTS[@]}" telegram/ "$PIPELINE_DIR/telegram/"
|
|||
rsync "${RSYNC_OPTS[@]}" diagnostics/ "$DIAGNOSTICS_DIR/"
|
||||
rsync "${RSYNC_OPTS[@]}" agent-state/ "$AGENT_STATE_DIR/"
|
||||
rsync "${RSYNC_OPTS[@]}" tests/ "$PIPELINE_DIR/tests/"
|
||||
rsync "${RSYNC_OPTS[@]}" scripts/ "$PIPELINE_DIR/scripts/"
|
||||
[ -f research/research-session.sh ] && rsync "${RSYNC_OPTS[@]}" research/research-session.sh /opt/teleo-eval/research-session.sh
|
||||
|
||||
# Safety net: ensure all .sh files are executable after rsync
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ echo ""
|
|||
# Syntax check all Python files before deploying
|
||||
echo "=== Pre-deploy syntax check ==="
|
||||
ERRORS=0
|
||||
for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py "$REPO_ROOT/scripts/"*.py; do
|
||||
for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py; do
|
||||
[ -f "$f" ] || continue
|
||||
if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>/dev/null; then
|
||||
echo "SYNTAX ERROR: $f"
|
||||
|
|
@ -80,10 +80,6 @@ echo "=== Tests ==="
|
|||
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/tests/" "$VPS_HOST:$VPS_PIPELINE/tests/"
|
||||
echo ""
|
||||
|
||||
echo "=== Scripts ==="
|
||||
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/scripts/" "$VPS_HOST:$VPS_PIPELINE/scripts/"
|
||||
echo ""
|
||||
|
||||
echo "=== Diagnostics ==="
|
||||
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/diagnostics/" "$VPS_HOST:$VPS_DIAGNOSTICS/"
|
||||
echo ""
|
||||
|
|
|
|||
120
deploy/setup-infra-mirror.sh
Executable file
120
deploy/setup-infra-mirror.sh
Executable file
|
|
@ -0,0 +1,120 @@
|
|||
#!/bin/bash
|
||||
# One-time setup: prepare the bare mirror repo for teleo-infrastructure.
|
||||
#
|
||||
# Prerequisites (must happen BEFORE running this):
|
||||
# 1. GitHub repo `living-ip/teleo-infrastructure` created (manual via web or
|
||||
# `gh repo create` — the deploy PAT is fine-grained to teleo-codex only
|
||||
# and cannot create new repos in the org).
|
||||
# 2. GitHub PAT updated to include push access on the new repo (or rotate
|
||||
# to a classic PAT with `repo` scope covering both).
|
||||
#
|
||||
# This script is idempotent — safe to re-run.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
MIRROR_BASE="/opt/teleo-eval/mirror"
|
||||
REPO_DIR="$MIRROR_BASE/teleo-infrastructure.git"
|
||||
FORGEJO_URL="http://localhost:3000/teleo/teleo-infrastructure.git"
|
||||
GITHUB_REPO="living-ip/teleo-infrastructure"
|
||||
FORGEJO_TOKEN_FILE="/opt/teleo-eval/secrets/forgejo-admin-token"
|
||||
GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
|
||||
|
||||
if [ ! -f "$FORGEJO_TOKEN_FILE" ]; then
|
||||
echo "ERROR: missing $FORGEJO_TOKEN_FILE" >&2
|
||||
exit 1
|
||||
fi
|
||||
if [ ! -f "$GITHUB_PAT_FILE" ]; then
|
||||
echo "ERROR: missing $GITHUB_PAT_FILE" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
FORGEJO_TOKEN=$(cat "$FORGEJO_TOKEN_FILE" | tr -d '[:space:]')
|
||||
GITHUB_PAT=$(cat "$GITHUB_PAT_FILE" | tr -d '[:space:]')
|
||||
|
||||
# Sanity check: GitHub repo must exist before we point a remote at it.
|
||||
echo "Verifying GitHub repo $GITHUB_REPO exists..."
|
||||
GH_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
|
||||
-H "Authorization: Bearer $GITHUB_PAT" \
|
||||
"https://api.github.com/repos/$GITHUB_REPO")
|
||||
if [ "$GH_STATUS" != "200" ]; then
|
||||
echo "ERROR: GitHub repo $GITHUB_REPO not accessible (HTTP $GH_STATUS)" >&2
|
||||
echo "Create it first: gh repo create $GITHUB_REPO --public --description 'Pipeline + diagnostics infra for the LivingIP collective'" >&2
|
||||
exit 2
|
||||
fi
|
||||
echo " OK — $GITHUB_REPO accessible"
|
||||
|
||||
# Sanity check: Forgejo repo must exist.
|
||||
echo "Verifying Forgejo repo teleo/teleo-infrastructure exists..."
|
||||
FG_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
|
||||
-H "Authorization: token $FORGEJO_TOKEN" \
|
||||
"http://localhost:3000/api/v1/repos/teleo/teleo-infrastructure")
|
||||
if [ "$FG_STATUS" != "200" ]; then
|
||||
echo "ERROR: Forgejo repo teleo/teleo-infrastructure not accessible (HTTP $FG_STATUS)" >&2
|
||||
exit 3
|
||||
fi
|
||||
echo " OK — Forgejo repo accessible"
|
||||
|
||||
# Init bare mirror if missing
|
||||
if [ -d "$REPO_DIR" ]; then
|
||||
echo "Bare repo already exists at $REPO_DIR — skipping init"
|
||||
else
|
||||
echo "Creating bare repo at $REPO_DIR..."
|
||||
mkdir -p "$REPO_DIR"
|
||||
cd "$REPO_DIR"
|
||||
git init --bare >/dev/null
|
||||
chown -R teleo:teleo "$REPO_DIR"
|
||||
echo " OK — bare repo initialized"
|
||||
fi
|
||||
|
||||
cd "$REPO_DIR"
|
||||
|
||||
# Configure remotes (idempotent: set-url succeeds whether remote exists or not)
|
||||
# Forgejo remote (origin convention is reversed in this codebase: origin=GitHub,
|
||||
# forgejo=Forgejo, matching the existing teleo-codex.git layout).
|
||||
FORGEJO_REMOTE_URL="http://github-mirror:${FORGEJO_TOKEN}@localhost:3000/teleo/teleo-infrastructure.git"
|
||||
# NOTE: "m3taversal" is a placeholder username — for fine-grained PATs the
|
||||
# username field is decorative; the token does the auth. Matches the existing
|
||||
# teleo-codex.git remote for consistency. (Ganymede review nit #4.)
|
||||
GITHUB_REMOTE_URL="https://m3taversal:${GITHUB_PAT}@github.com/${GITHUB_REPO}.git"
|
||||
|
||||
if git remote get-url forgejo >/dev/null 2>&1; then
|
||||
git remote set-url forgejo "$FORGEJO_REMOTE_URL"
|
||||
echo " Updated forgejo remote URL"
|
||||
else
|
||||
git remote add forgejo "$FORGEJO_REMOTE_URL"
|
||||
echo " Added forgejo remote"
|
||||
fi
|
||||
|
||||
if git remote get-url origin >/dev/null 2>&1; then
|
||||
git remote set-url origin "$GITHUB_REMOTE_URL"
|
||||
echo " Updated origin remote URL"
|
||||
else
|
||||
git remote add origin "$GITHUB_REMOTE_URL"
|
||||
echo " Added origin remote"
|
||||
fi
|
||||
|
||||
# Initial fetch from Forgejo
|
||||
echo "Fetching from Forgejo..."
|
||||
git fetch forgejo --prune 2>&1 | sed 's/^/ /'
|
||||
|
||||
# Initial push to GitHub (will populate the empty repo)
|
||||
# main_only mode: push ONLY refs/heads/main + tags, mirroring what sync-mirror.sh
|
||||
# does for this repo on the recurring path. Agent review branches stay Forgejo-only.
|
||||
echo "Pushing initial main + tags to GitHub..."
|
||||
git update-ref refs/heads/main refs/remotes/forgejo/main 2>/dev/null || {
|
||||
echo "ERROR: forgejo/main ref missing — fetch may have failed" >&2
|
||||
exit 1
|
||||
}
|
||||
|
||||
git push origin "refs/heads/main:refs/heads/main" 2>&1 | sed 's/^/ /' || {
|
||||
echo "WARN: initial push failed — you may need to authorize the PAT for $GITHUB_REPO" >&2
|
||||
}
|
||||
git push origin --tags 2>&1 | sed 's/^/ /' || true
|
||||
|
||||
# Final permissions sweep
|
||||
chown -R teleo:teleo "$REPO_DIR"
|
||||
|
||||
echo
|
||||
echo "Setup complete. Verify with:"
|
||||
echo " ssh teleo@77.42.65.182 ls -la $REPO_DIR/refs/heads"
|
||||
echo " /opt/teleo-eval/sync-mirror.sh && tail -50 /opt/teleo-eval/logs/sync.log"
|
||||
|
|
@ -2,22 +2,35 @@
|
|||
# Bidirectional sync: Forgejo (authoritative) <-> GitHub (public mirror)
|
||||
# Forgejo wins on conflict. Runs every 2 minutes via cron.
|
||||
#
|
||||
# Repos handled (see MIRROR_REPOS below):
|
||||
# - teleo-codex (mode=bidirectional): full PR roundtrip — fork PR refs from
|
||||
# GitHub, auto-create Forgejo PR mirrors, link github_pr in pipeline.db.
|
||||
# - teleo-infrastructure (mode=main_only): one-way sync of branches+tags from
|
||||
# Forgejo to GitHub. No PR roundtrip — pipeline doesn't process infra PRs;
|
||||
# external infra PRs land on GitHub for visibility, get reviewed manually.
|
||||
#
|
||||
# Security note: GitHub->Forgejo path is for external contributor convenience.
|
||||
# Never auto-process branches arriving via this path without a PR.
|
||||
# Eval pipeline and extract cron only act on PRs, not raw branches.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
REPO_DIR="/opt/teleo-eval/mirror/teleo-codex.git"
|
||||
LOG="/opt/teleo-eval/logs/sync.log"
|
||||
LOCKFILE="/tmp/sync-mirror.lock"
|
||||
PIPELINE_DB="/opt/teleo-eval/pipeline/pipeline.db"
|
||||
GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
|
||||
GITHUB_REPO="living-ip/teleo-codex"
|
||||
|
||||
log() { echo "[$(date -Iseconds)] $1" >> "$LOG"; }
|
||||
# (forgejo_owner_repo, github_owner_repo, bare_path, mode)
|
||||
# mode: bidirectional | main_only
|
||||
MIRROR_REPOS=(
|
||||
"teleo/teleo-codex living-ip/teleo-codex /opt/teleo-eval/mirror/teleo-codex.git bidirectional"
|
||||
"teleo/teleo-infrastructure living-ip/teleo-infrastructure /opt/teleo-eval/mirror/teleo-infrastructure.git main_only"
|
||||
)
|
||||
|
||||
# Lockfile — prevent concurrent runs
|
||||
REPO_TAG="main"
|
||||
log() { echo "[$(date -Iseconds)] [$REPO_TAG] $1" >> "$LOG"; }
|
||||
|
||||
# Lockfile — prevent concurrent runs (single lock for whole script)
|
||||
if [ -f "$LOCKFILE" ]; then
|
||||
pid=$(cat "$LOCKFILE" 2>/dev/null)
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
|
|
@ -28,116 +41,204 @@ fi
|
|||
echo $$ > "$LOCKFILE"
|
||||
trap 'rm -f "$LOCKFILE"' EXIT
|
||||
|
||||
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
|
||||
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true)
|
||||
if [ -n "$BAD_PERMS" ]; then
|
||||
log "Fixing mirror permissions (found: $BAD_PERMS)"
|
||||
chown -R teleo:teleo "$REPO_DIR" 2>/dev/null
|
||||
fi
|
||||
cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; exit 1; }
|
||||
|
||||
# Step 1: Fetch from Forgejo (must succeed — it's authoritative)
|
||||
log "Fetching from Forgejo..."
|
||||
if ! git fetch forgejo --prune >> "$LOG" 2>&1; then
|
||||
log "ERROR: Forgejo fetch failed — aborting"
|
||||
exit 1
|
||||
fi
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# sync_repo: process one mirror entry. Sets module-level FORGEJO_REPO,
|
||||
# GITHUB_REPO, REPO_DIR, MODE, REPO_TAG used by inner steps.
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
sync_repo() {
|
||||
FORGEJO_REPO="$1" # e.g. teleo/teleo-codex (path on Forgejo)
|
||||
GITHUB_REPO="$2" # e.g. living-ip/teleo-codex (path on GitHub)
|
||||
REPO_DIR="$3" # bare mirror dir
|
||||
MODE="$4" # bidirectional | main_only
|
||||
REPO_TAG="${FORGEJO_REPO##*/}" # short name for log prefix
|
||||
|
||||
# Step 2: Fetch from GitHub (warn on failure, don't abort)
|
||||
log "Fetching from GitHub..."
|
||||
git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed"
|
||||
# Pre-flight: bare repo must exist
|
||||
if [ ! -d "$REPO_DIR" ]; then
|
||||
log "ERROR: bare repo missing at $REPO_DIR — skipping"
|
||||
return 0
|
||||
fi
|
||||
|
||||
# Step 2.1: Fetch GitHub fork PR refs
|
||||
# Fork-based PRs don't create branches on origin — they create refs/pull/N/head
|
||||
# Fetch these so we can push them to Forgejo for evaluation
|
||||
GITHUB_PAT_STEP2=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
|
||||
if [ -n "$GITHUB_PAT_STEP2" ]; then
|
||||
OPEN_PRS=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?state=open&per_page=100" \
|
||||
-H "Authorization: token $GITHUB_PAT_STEP2" 2>/dev/null || echo "[]")
|
||||
echo "$OPEN_PRS" | python3 -c "
|
||||
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
|
||||
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true)
|
||||
if [ -n "$BAD_PERMS" ]; then
|
||||
log "Fixing mirror permissions (found: $BAD_PERMS)"
|
||||
chown -R teleo:teleo "$REPO_DIR" 2>/dev/null || true
|
||||
fi
|
||||
cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; return 0; }
|
||||
|
||||
# Step 1: Fetch from Forgejo (must succeed — it's authoritative)
|
||||
log "Fetching from Forgejo..."
|
||||
if ! git fetch forgejo --prune >> "$LOG" 2>&1; then
|
||||
log "ERROR: Forgejo fetch failed — skipping this repo"
|
||||
return 0
|
||||
fi
|
||||
|
||||
# Step 2: Fetch from GitHub (warn on failure, don't abort)
|
||||
log "Fetching from GitHub..."
|
||||
git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed"
|
||||
|
||||
# Step 2.1: Fetch GitHub fork PR refs (bidirectional only)
|
||||
# Fork-based PRs don't create branches on origin — they create refs/pull/N/head.
|
||||
# main_only repos don't accept fork PRs through the mirror path.
|
||||
if [ "$MODE" = "bidirectional" ]; then
|
||||
local PAT
|
||||
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
|
||||
if [ -n "$PAT" ]; then
|
||||
local OPEN_PRS
|
||||
OPEN_PRS=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?state=open&per_page=100" \
|
||||
-H "Authorization: token $PAT" 2>/dev/null || echo "[]")
|
||||
echo "$OPEN_PRS" | python3 -c "
|
||||
import sys, json
|
||||
prs = json.load(sys.stdin)
|
||||
for pr in prs:
|
||||
head = pr.get('head', {})
|
||||
# Only process fork PRs (repo differs from base repo)
|
||||
base_repo = pr.get('base', {}).get('repo', {}).get('full_name', '')
|
||||
head_repo = head.get('repo', {}) or {}
|
||||
head_full = head_repo.get('full_name', '')
|
||||
if head_full and head_full != base_repo:
|
||||
print(f\"{pr['number']} {head.get('ref', '')} {head.get('sha', '')}\")
|
||||
" 2>/dev/null | while read pr_num branch_name head_sha; do
|
||||
if [ -z "$pr_num" ] || [ -z "$branch_name" ]; then continue; fi
|
||||
PR_BRANCH="gh-pr-${pr_num}/${branch_name}"
|
||||
# Check if we already have this ref at the right SHA
|
||||
EXISTING=$(git rev-parse "refs/heads/$PR_BRANCH" 2>/dev/null || true)
|
||||
if [ "$EXISTING" = "$head_sha" ]; then continue; fi
|
||||
# Fetch the PR ref and create a local branch
|
||||
git fetch origin "refs/pull/${pr_num}/head:refs/heads/$PR_BRANCH" >> "$LOG" 2>&1 && \
|
||||
log "Fetched fork PR #$pr_num -> $PR_BRANCH" || \
|
||||
log "WARN: Failed to fetch fork PR #$pr_num"
|
||||
done
|
||||
fi
|
||||
|
||||
# Step 2.5: GitHub main -> Forgejo main (ff-only)
|
||||
# If a PR was merged on GitHub, GitHub main is ahead of Forgejo main.
|
||||
# Fast-forward Forgejo main to match — safe because ff-only guarantees no divergence.
|
||||
GITHUB_MAIN_FF=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
|
||||
FORGEJO_MAIN_FF=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
|
||||
if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
|
||||
if [ "$GITHUB_MAIN_FF" != "$FORGEJO_MAIN_FF" ]; then
|
||||
if git merge-base --is-ancestor "$FORGEJO_MAIN_FF" "$GITHUB_MAIN_FF"; then
|
||||
log "GitHub main ($GITHUB_MAIN_FF) ahead of Forgejo main ($FORGEJO_MAIN_FF) — fast-forwarding"
|
||||
git push forgejo "refs/remotes/origin/main:refs/heads/main" >> "$LOG" 2>&1 && \
|
||||
log "Forgejo main fast-forwarded to $GITHUB_MAIN_FF" || \
|
||||
log "WARN: Failed to fast-forward Forgejo main"
|
||||
if [ -z "$pr_num" ] || [ -z "$branch_name" ]; then continue; fi
|
||||
local PR_BRANCH="gh-pr-${pr_num}/${branch_name}"
|
||||
local EXISTING
|
||||
EXISTING=$(git rev-parse "refs/heads/$PR_BRANCH" 2>/dev/null || true)
|
||||
if [ "$EXISTING" = "$head_sha" ]; then continue; fi
|
||||
git fetch origin "refs/pull/${pr_num}/head:refs/heads/$PR_BRANCH" >> "$LOG" 2>&1 && \
|
||||
log "Fetched fork PR #$pr_num -> $PR_BRANCH" || \
|
||||
log "WARN: Failed to fetch fork PR #$pr_num"
|
||||
done
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
# Step 3: Forgejo -> GitHub (primary direction)
|
||||
# Update local refs from Forgejo remote refs using process substitution (avoids subshell)
|
||||
log "Syncing Forgejo -> GitHub..."
|
||||
while read branch; do
|
||||
[ "$branch" = "HEAD" ] && continue
|
||||
git update-ref "refs/heads/$branch" "refs/remotes/forgejo/$branch" 2>/dev/null || \
|
||||
log "WARN: Failed to update ref $branch"
|
||||
done < <(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/)
|
||||
|
||||
# Safety: verify Forgejo main descends from GitHub main before force-pushing
|
||||
GITHUB_MAIN=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
|
||||
FORGEJO_MAIN=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
|
||||
PUSH_MAIN=true
|
||||
if [ -n "$GITHUB_MAIN" ] && [ -n "$FORGEJO_MAIN" ]; then
|
||||
if ! git merge-base --is-ancestor "$GITHUB_MAIN" "$FORGEJO_MAIN"; then
|
||||
log "CRITICAL: Forgejo main is NOT a descendant of GitHub main — skipping main push"
|
||||
log "CRITICAL: GitHub main: $GITHUB_MAIN, Forgejo main: $FORGEJO_MAIN"
|
||||
PUSH_MAIN=false
|
||||
# Step 2.5: GitHub main -> Forgejo main (ff-only)
|
||||
# If a PR was merged on GitHub, GitHub main is ahead of Forgejo main.
|
||||
# Fast-forward Forgejo main to match — safe because ff-only guarantees no divergence.
|
||||
local GITHUB_MAIN_FF FORGEJO_MAIN_FF
|
||||
GITHUB_MAIN_FF=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
|
||||
FORGEJO_MAIN_FF=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
|
||||
if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
|
||||
if [ "$GITHUB_MAIN_FF" != "$FORGEJO_MAIN_FF" ]; then
|
||||
if git merge-base --is-ancestor "$FORGEJO_MAIN_FF" "$GITHUB_MAIN_FF"; then
|
||||
log "GitHub main ($GITHUB_MAIN_FF) ahead of Forgejo main ($FORGEJO_MAIN_FF) — fast-forwarding"
|
||||
git push forgejo "refs/remotes/origin/main:refs/heads/main" >> "$LOG" 2>&1 && \
|
||||
log "Forgejo main fast-forwarded to $GITHUB_MAIN_FF" || \
|
||||
log "WARN: Failed to fast-forward Forgejo main"
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ "$PUSH_MAIN" = true ]; then
|
||||
git push origin --all --force >> "$LOG" 2>&1 || log "WARN: Push to GitHub failed"
|
||||
else
|
||||
# Push all branches except main
|
||||
# Step 3: Forgejo -> GitHub (primary direction)
|
||||
log "Syncing Forgejo -> GitHub..."
|
||||
while read branch; do
|
||||
[ "$branch" = "main" ] && continue
|
||||
[ "$branch" = "HEAD" ] && continue
|
||||
git push origin --force "refs/heads/$branch:refs/heads/$branch" >> "$LOG" 2>&1 || \
|
||||
log "WARN: Failed to push $branch to GitHub"
|
||||
done < <(git for-each-ref --format="%(refname:lstrip=2)" refs/heads/)
|
||||
fi
|
||||
git push origin --tags --force >> "$LOG" 2>&1 || log "WARN: Tag push to GitHub failed"
|
||||
git update-ref "refs/heads/$branch" "refs/remotes/forgejo/$branch" 2>/dev/null || \
|
||||
log "WARN: Failed to update ref $branch"
|
||||
done < <(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/)
|
||||
|
||||
# Step 4: GitHub -> Forgejo (external contributions only)
|
||||
# Only push branches that exist on GitHub but NOT on Forgejo
|
||||
log "Checking GitHub-only branches..."
|
||||
GITHUB_ONLY=$(comm -23 \
|
||||
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/origin/ | grep -v HEAD | sort) \
|
||||
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/ | grep -v HEAD | sort))
|
||||
# Safety: verify Forgejo main descends from GitHub main before force-pushing
|
||||
local GITHUB_MAIN FORGEJO_MAIN PUSH_MAIN
|
||||
GITHUB_MAIN=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
|
||||
FORGEJO_MAIN=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
|
||||
PUSH_MAIN=true
|
||||
if [ -n "$GITHUB_MAIN" ] && [ -n "$FORGEJO_MAIN" ]; then
|
||||
if ! git merge-base --is-ancestor "$GITHUB_MAIN" "$FORGEJO_MAIN"; then
|
||||
log "CRITICAL: Forgejo main is NOT a descendant of GitHub main — skipping main push"
|
||||
log "CRITICAL: GitHub main: $GITHUB_MAIN, Forgejo main: $FORGEJO_MAIN"
|
||||
PUSH_MAIN=false
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -n "$GITHUB_ONLY" ]; then
|
||||
if [ "$MODE" = "main_only" ]; then
|
||||
# Infra-style mirror: push main + tags ONLY. Pre-review agent branches
|
||||
# (epimetheus/*, ganymede/*, etc.) carry internal context — agent UUIDs,
|
||||
# in-flight discussion, WIP — and must not land in the public GitHub
|
||||
# history. (Ganymede review, finding #1.)
|
||||
if [ "$PUSH_MAIN" = true ]; then
|
||||
git push origin --force "refs/heads/main:refs/heads/main" >> "$LOG" 2>&1 || \
|
||||
log "WARN: main push to GitHub failed"
|
||||
fi
|
||||
else
|
||||
# Bidirectional mirror (codex): push all branches so external
|
||||
# contributors can fork from any branch, not just main.
|
||||
if [ "$PUSH_MAIN" = true ]; then
|
||||
git push origin --all --force >> "$LOG" 2>&1 || log "WARN: Push to GitHub failed"
|
||||
else
|
||||
# Push all branches except main when main is divergent
|
||||
while read branch; do
|
||||
[ "$branch" = "main" ] && continue
|
||||
[ "$branch" = "HEAD" ] && continue
|
||||
git push origin --force "refs/heads/$branch:refs/heads/$branch" >> "$LOG" 2>&1 || \
|
||||
log "WARN: Failed to push $branch to GitHub"
|
||||
done < <(git for-each-ref --format="%(refname:lstrip=2)" refs/heads/)
|
||||
fi
|
||||
fi
|
||||
git push origin --tags --force >> "$LOG" 2>&1 || log "WARN: Tag push to GitHub failed"
|
||||
|
||||
# Step 4: GitHub -> Forgejo + Forgejo PR auto-create (bidirectional only)
|
||||
if [ "$MODE" = "bidirectional" ]; then
|
||||
sync_github_to_forgejo_with_prs
|
||||
fi
|
||||
|
||||
# Step 6: Divergence alerting (applies to both modes)
|
||||
check_divergence
|
||||
}
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Step 4 split out: codex-specific GitHub→Forgejo branch push + PR auto-create.
|
||||
# Reads FORGEJO_REPO, GITHUB_REPO, PIPELINE_DB, REPO_TAG from sync_repo scope.
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
sync_github_to_forgejo_with_prs() {
|
||||
log "Checking GitHub-only branches..."
|
||||
local FORGEJO_HOST="http://localhost:3000/api/v1/repos/$FORGEJO_REPO"
|
||||
local GITHUB_ONLY
|
||||
GITHUB_ONLY=$(comm -23 \
|
||||
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/origin/ | grep -v HEAD | sort) \
|
||||
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/ | grep -v HEAD | sort))
|
||||
|
||||
if [ -z "$GITHUB_ONLY" ]; then
|
||||
log "No new GitHub-only branches"
|
||||
return 0
|
||||
fi
|
||||
|
||||
local FORGEJO_TOKEN
|
||||
FORGEJO_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token 2>/dev/null)
|
||||
|
||||
# Lazy schema for sync-mirror's auto-create tracker. Records (branch, sha)
|
||||
# pairs we've already auto-created PRs for, so the loop below can skip
|
||||
# redundant creates after pipeline merge → _delete_remote_branch →
|
||||
# GitHub-only re-discovery → re-push. Cheap CREATE IF NOT EXISTS on each
|
||||
# cycle; no migration needed because this table is private to sync-mirror.
|
||||
sqlite3 "$PIPELINE_DB" "CREATE TABLE IF NOT EXISTS sync_autocreate_tracker (branch TEXT NOT NULL, sha TEXT NOT NULL, pr_number INTEGER, created_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (branch, sha));" 2>/dev/null || true
|
||||
|
||||
for branch in $GITHUB_ONLY; do
|
||||
# Already-tracked gate: if we've previously auto-created a PR for
|
||||
# this exact (branch, sha), skip the entire push+create sequence.
|
||||
# Closes the empty-PR loop (research and reweave both observed):
|
||||
# pipeline merges PR → _delete_remote_branch on Forgejo → next sync
|
||||
# sees branch GitHub-only (origin still has it) → re-pushes to
|
||||
# Forgejo → HAS_PR misses (Forgejo ?head= broken; closed PRs scroll
|
||||
# past 50-item paginated window) → auto-creates fresh PR → pipeline
|
||||
# merges (empty no-op via cherry-pick / reweave union) → repeat.
|
||||
# Tracker keys on SHA, so legitimate new commits on the same branch
|
||||
# produce a new SHA → tracker miss → auto-create proceeds normally.
|
||||
local BRANCH_SHA TRACKED_PR
|
||||
if [[ "$branch" == gh-pr-* ]]; then
|
||||
BRANCH_SHA=$(git rev-parse "refs/heads/$branch" 2>/dev/null || true)
|
||||
else
|
||||
BRANCH_SHA=$(git rev-parse "refs/remotes/origin/$branch" 2>/dev/null || true)
|
||||
fi
|
||||
if [ -n "$BRANCH_SHA" ]; then
|
||||
# stderr → $LOG so sustained sqlite3 contention surfaces in ops logs
|
||||
# rather than silently falling through to a redundant auto-create.
|
||||
TRACKED_PR=$(sqlite3 "$PIPELINE_DB" "SELECT pr_number FROM sync_autocreate_tracker WHERE branch=$(printf "'%s'" "${branch//\'/\'\'}") AND sha=$(printf "'%s'" "$BRANCH_SHA") LIMIT 1;" 2>>"$LOG" || echo "")
|
||||
if [ -n "$TRACKED_PR" ]; then
|
||||
log "Skip auto-create: $branch SHA $BRANCH_SHA already tracked (PR #$TRACKED_PR)"
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
|
||||
log "New from GitHub: $branch -> Forgejo"
|
||||
# Fork PR branches live as local refs (from Step 2.1), not on origin remote
|
||||
if [[ "$branch" == gh-pr-* ]]; then
|
||||
|
|
@ -151,22 +252,23 @@ if [ -n "$GITHUB_ONLY" ]; then
|
|||
continue
|
||||
}
|
||||
fi
|
||||
# Auto-create PR on Forgejo for mirrored branches (external contributor path)
|
||||
# Skip pipeline-internal branches
|
||||
# Skip pipeline-internal branch prefixes (no PR creation)
|
||||
case "$branch" in
|
||||
extract/*|ingestion/*) continue ;;
|
||||
esac
|
||||
if [ -n "$FORGEJO_TOKEN" ]; then
|
||||
# Check if PR already exists for this branch (open or closed)
|
||||
# NOTE: Forgejo ?head= filter is broken (ignores head value, returns all PRs).
|
||||
# Workaround: fetch open+closed PRs, pipe to Python, check head.ref.
|
||||
HAS_PR=$( {
|
||||
curl -sf "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=open&limit=50" \
|
||||
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
|
||||
echo ""
|
||||
curl -sf "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=closed&sort=created&limit=50" \
|
||||
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
|
||||
} | python3 -c "
|
||||
if [ -z "$FORGEJO_TOKEN" ]; then continue; fi
|
||||
|
||||
# Check if PR already exists for this branch (open or closed)
|
||||
# NOTE: Forgejo ?head= filter is broken (ignores head value, returns all PRs).
|
||||
# Workaround: fetch open+closed PRs, pipe to Python, check head.ref.
|
||||
local HAS_PR
|
||||
HAS_PR=$( {
|
||||
curl -sf "$FORGEJO_HOST/pulls?state=open&limit=50" \
|
||||
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
|
||||
echo ""
|
||||
curl -sf "$FORGEJO_HOST/pulls?state=closed&sort=created&limit=50" \
|
||||
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
|
||||
} | python3 -c "
|
||||
import sys, json
|
||||
branch = sys.argv[1]
|
||||
for line in sys.stdin:
|
||||
|
|
@ -179,104 +281,171 @@ for line in sys.stdin:
|
|||
except: pass
|
||||
print('no')
|
||||
" "$branch" 2>/dev/null || echo "no")
|
||||
if [ "$HAS_PR" = "no" ]; then
|
||||
# Build PR title — for fork PRs, use the GitHub PR title
|
||||
if [[ "$branch" == gh-pr-* ]]; then
|
||||
FORK_GH_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
|
||||
GITHUB_PAT_T=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
|
||||
PR_TITLE=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$FORK_GH_NUM" \
|
||||
-H "Authorization: token $GITHUB_PAT_T" 2>/dev/null | \
|
||||
python3 -c "import sys,json; print(json.load(sys.stdin).get('title',''))" 2>/dev/null || true)
|
||||
[ -z "$PR_TITLE" ] && PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
|
||||
else
|
||||
PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
|
||||
fi
|
||||
PAYLOAD=$(python3 -c "import sys,json; print(json.dumps({'title':sys.argv[1],'head':sys.argv[2],'base':'main'}))" "$PR_TITLE" "$branch")
|
||||
RESULT=$(curl -sf -X POST "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls" \
|
||||
-H "Authorization: token $FORGEJO_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$PAYLOAD" 2>/dev/null || echo "")
|
||||
PR_NUM=$(echo "$RESULT" | grep -o '"number":[0-9]*' | head -1 | grep -o "[0-9]*" || true)
|
||||
if [ -n "$PR_NUM" ]; then
|
||||
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
|
||||
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
|
||||
if [[ "$branch" == gh-pr-* ]]; then
|
||||
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
|
||||
else
|
||||
GITHUB_PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
|
||||
GH_PR_NUM=""
|
||||
if [ -n "$GITHUB_PAT" ]; then
|
||||
GH_PR_NUM=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?head=living-ip:$branch&state=all" \
|
||||
-H "Authorization: token $GITHUB_PAT" 2>/dev/null | \
|
||||
python3 -c "import sys,json; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')" 2>/dev/null || true)
|
||||
fi
|
||||
fi
|
||||
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
|
||||
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM WHERE number = $PR_NUM;" 2>/dev/null && \
|
||||
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
|
||||
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
|
||||
fi
|
||||
else
|
||||
log "WARN: Failed to auto-create PR for $branch"
|
||||
fi
|
||||
|
||||
if [ "$HAS_PR" = "yes" ]; then continue; fi
|
||||
|
||||
# Build PR title — for fork PRs, use the GitHub PR title
|
||||
local PR_TITLE PAYLOAD RESULT PR_NUM GH_PR_NUM
|
||||
if [[ "$branch" == gh-pr-* ]]; then
|
||||
local FORK_GH_NUM PAT_T
|
||||
FORK_GH_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
|
||||
PAT_T=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
|
||||
PR_TITLE=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$FORK_GH_NUM" \
|
||||
-H "Authorization: token $PAT_T" 2>/dev/null | \
|
||||
python3 -c "import sys,json; print(json.load(sys.stdin).get('title',''))" 2>/dev/null || true)
|
||||
[ -z "$PR_TITLE" ] && PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
|
||||
else
|
||||
PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
|
||||
fi
|
||||
PAYLOAD=$(python3 -c "import sys,json; print(json.dumps({'title':sys.argv[1],'head':sys.argv[2],'base':'main'}))" "$PR_TITLE" "$branch")
|
||||
RESULT=$(curl -sf -X POST "$FORGEJO_HOST/pulls" \
|
||||
-H "Authorization: token $FORGEJO_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$PAYLOAD" 2>/dev/null || echo "")
|
||||
PR_NUM=$(echo "$RESULT" | grep -o '"number":[0-9]*' | head -1 | grep -o "[0-9]*" || true)
|
||||
if [ -z "$PR_NUM" ]; then
|
||||
log "WARN: Failed to auto-create PR for $branch"
|
||||
continue
|
||||
fi
|
||||
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
|
||||
|
||||
# Record (branch, sha, pr_number) so the tracker gate above can short-
|
||||
# circuit the next time we see this exact (branch, sha) combination.
|
||||
# INSERT OR IGNORE: idempotent if a concurrent run already inserted.
|
||||
# WARN log on failure: silent INSERT failure under sustained sqlite3
|
||||
# contention would mask the loop reappearing on the next cycle (HAS_PR
|
||||
# only saves us while the closed PR is in the 50-item pagination window).
|
||||
if [ -n "$BRANCH_SHA" ] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
|
||||
if ! sqlite3 "$PIPELINE_DB" "INSERT OR IGNORE INTO sync_autocreate_tracker (branch, sha, pr_number) VALUES ($(printf "'%s'" "${branch//\'/\'\'}"), $(printf "'%s'" "$BRANCH_SHA"), $PR_NUM);" 2>>"$LOG"; then
|
||||
log "WARN: tracker insert failed for $branch SHA $BRANCH_SHA (PR #$PR_NUM) — duplicate auto-create possible next cycle"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
|
||||
if [[ "$branch" == gh-pr-* ]]; then
|
||||
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
|
||||
else
|
||||
local PAT
|
||||
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
|
||||
GH_PR_NUM=""
|
||||
if [ -n "$PAT" ]; then
|
||||
GH_PR_NUM=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?head=living-ip:$branch&state=all" \
|
||||
-H "Authorization: token $PAT" 2>/dev/null | \
|
||||
python3 -c "import sys,json; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')" 2>/dev/null || true)
|
||||
fi
|
||||
fi
|
||||
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
|
||||
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM, source_channel = 'github' WHERE number = $PR_NUM;" 2>/dev/null && \
|
||||
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
|
||||
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
|
||||
fi
|
||||
done
|
||||
else
|
||||
log "No new GitHub-only branches"
|
||||
fi
|
||||
}
|
||||
|
||||
# Step 6: Divergence alerting
|
||||
# After all sync steps, check if GitHub and Forgejo main still differ.
|
||||
# 2 consecutive divergent cycles (4 min) triggers a one-shot Telegram alert.
|
||||
DIVERGENCE_FILE="/opt/teleo-eval/logs/.divergence-count"
|
||||
git fetch forgejo main --quiet 2>/dev/null || true
|
||||
git fetch origin main --quiet 2>/dev/null || true
|
||||
GH_MAIN_FINAL=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
|
||||
FG_MAIN_FINAL=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
|
||||
|
||||
if [ -n "$GH_MAIN_FINAL" ] && [ -n "$FG_MAIN_FINAL" ] && [ "$GH_MAIN_FINAL" != "$FG_MAIN_FINAL" ]; then
|
||||
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
|
||||
if [ "$PREV" = "alerted" ]; then
|
||||
log "DIVERGENCE: still diverged (already alerted)"
|
||||
else
|
||||
COUNT=$((PREV + 1))
|
||||
echo "$COUNT" > "$DIVERGENCE_FILE"
|
||||
log "DIVERGENCE: cycle $COUNT — GitHub=$GH_MAIN_FINAL Forgejo=$FG_MAIN_FINAL"
|
||||
if [ "$COUNT" -ge 2 ]; then
|
||||
BOT_TOKEN=$(cat /opt/teleo-eval/secrets/telegram-bot-token 2>/dev/null || true)
|
||||
ADMIN_CHAT=$(cat /opt/teleo-eval/secrets/admin-chat-id 2>/dev/null || true)
|
||||
if [ -n "$BOT_TOKEN" ] && [ -n "$ADMIN_CHAT" ]; then
|
||||
ALERT_MSG=$(python3 -c "
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Step 6 split out: divergence alerting. Per-repo state file so each repo
|
||||
# has its own divergence counter and alert state.
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
check_divergence() {
|
||||
local DIVERGENCE_FILE="/opt/teleo-eval/logs/.divergence-count.${REPO_TAG}"
|
||||
git fetch forgejo main --quiet 2>/dev/null || true
|
||||
git fetch origin main --quiet 2>/dev/null || true
|
||||
local GH_MAIN_FINAL FG_MAIN_FINAL
|
||||
GH_MAIN_FINAL=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
|
||||
FG_MAIN_FINAL=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
|
||||
|
||||
if [ -n "$GH_MAIN_FINAL" ] && [ -n "$FG_MAIN_FINAL" ] && [ "$GH_MAIN_FINAL" != "$FG_MAIN_FINAL" ]; then
|
||||
local PREV
|
||||
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
|
||||
if [ "$PREV" = "alerted" ]; then
|
||||
log "DIVERGENCE: still diverged (already alerted)"
|
||||
else
|
||||
local COUNT=$((PREV + 1))
|
||||
echo "$COUNT" > "$DIVERGENCE_FILE"
|
||||
log "DIVERGENCE: cycle $COUNT — GitHub=$GH_MAIN_FINAL Forgejo=$FG_MAIN_FINAL"
|
||||
if [ "$COUNT" -ge 2 ]; then
|
||||
local BOT_TOKEN ADMIN_CHAT
|
||||
BOT_TOKEN=$(cat /opt/teleo-eval/secrets/telegram-bot-token 2>/dev/null || true)
|
||||
ADMIN_CHAT=$(cat /opt/teleo-eval/secrets/admin-chat-id 2>/dev/null || true)
|
||||
if [ -n "$BOT_TOKEN" ] && [ -n "$ADMIN_CHAT" ]; then
|
||||
local ALERT_MSG
|
||||
ALERT_MSG=$(python3 -c "
|
||||
import json, sys
|
||||
msg = '⚠️ Mirror divergence detected\\n\\n'
|
||||
msg = '⚠️ Mirror divergence detected (' + sys.argv[5] + ')\\n\\n'
|
||||
msg += f'GitHub main: {sys.argv[1][:8]}\\n'
|
||||
msg += f'Forgejo main: {sys.argv[2][:8]}\\n'
|
||||
msg += f'Diverged for {sys.argv[3]} consecutive cycles ({int(sys.argv[3])*2} min)\\n\\n'
|
||||
msg += 'Check sync-mirror.sh logs: /opt/teleo-eval/logs/sync.log'
|
||||
print(json.dumps({'chat_id': sys.argv[4], 'text': msg, 'parse_mode': 'HTML'}))
|
||||
" "$GH_MAIN_FINAL" "$FG_MAIN_FINAL" "$COUNT" "$ADMIN_CHAT")
|
||||
if curl -sf -X POST "https://api.telegram.org/bot${BOT_TOKEN}/sendMessage" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$ALERT_MSG" >> "$LOG" 2>&1; then
|
||||
log "DIVERGENCE: alert sent to admin"
|
||||
echo "alerted" > "$DIVERGENCE_FILE"
|
||||
" "$GH_MAIN_FINAL" "$FG_MAIN_FINAL" "$COUNT" "$ADMIN_CHAT" "$REPO_TAG")
|
||||
if curl -sf -X POST "https://api.telegram.org/bot${BOT_TOKEN}/sendMessage" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$ALERT_MSG" >> "$LOG" 2>&1; then
|
||||
log "DIVERGENCE: alert sent to admin"
|
||||
echo "alerted" > "$DIVERGENCE_FILE"
|
||||
else
|
||||
log "WARN: Failed to send divergence alert (will retry next cycle)"
|
||||
fi
|
||||
else
|
||||
log "WARN: Failed to send divergence alert (will retry next cycle)"
|
||||
log "WARN: Cannot send divergence alert — missing bot token or admin chat ID"
|
||||
fi
|
||||
else
|
||||
log "WARN: Cannot send divergence alert — missing bot token or admin chat ID"
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
else
|
||||
if [ -f "$DIVERGENCE_FILE" ]; then
|
||||
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
|
||||
if [ "$PREV" != "0" ]; then
|
||||
log "DIVERGENCE: resolved — repos back in sync"
|
||||
else
|
||||
if [ -f "$DIVERGENCE_FILE" ]; then
|
||||
local PREV
|
||||
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
|
||||
if [ "$PREV" != "0" ]; then
|
||||
log "DIVERGENCE: resolved — repos back in sync"
|
||||
fi
|
||||
rm -f "$DIVERGENCE_FILE"
|
||||
fi
|
||||
rm -f "$DIVERGENCE_FILE"
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Main: process each configured mirror in sequence.
|
||||
# A failure on one repo doesn't block subsequent repos — sync_repo returns 0
|
||||
# on most error paths to keep the loop going.
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
REPO_TAG="main"
|
||||
log "Starting sync cycle"
|
||||
|
||||
# Step 0: self-heal any gh-pr-* PR rows missing github_pr.
|
||||
# Runs FIRST — before per-repo work (branch-mirror loop, auto-create-PR block).
|
||||
# Recovers from races/transient failures in Step 4.5's one-shot link UPDATE.
|
||||
# Idempotent: SELECT empty when clean, zero-cost path. Same SELECT/UPDATE
|
||||
# heals historical orphans (PR 4066 picked up on first cron tick post-deploy)
|
||||
# and future races on subsequent ticks. The branch name encodes the GitHub PR
|
||||
# number deterministically (gh-pr-{N}/...) so no API call is required.
|
||||
if [ -f "$PIPELINE_DB" ]; then
|
||||
sqlite3 -separator '|' "$PIPELINE_DB" \
|
||||
"SELECT number, branch FROM prs WHERE branch LIKE 'gh-pr-%' AND github_pr IS NULL;" \
|
||||
2>/dev/null | while IFS='|' read -r pr_num branch; do
|
||||
# Regex requires >=1 digit — empty/non-numeric branches fail to parse here,
|
||||
# not just at the empty-guard below. Keeps SQL-integer-safety load-bearing
|
||||
# on the regex alone. [0-9][0-9]* is the portable BRE form of [0-9]+,
|
||||
# works on both GNU sed (VPS) and BSD sed (dev macs).
|
||||
gh_pr_num=$(echo "$branch" | sed -n 's|^gh-pr-\([0-9][0-9]*\)/.*|\1|p')
|
||||
[ -z "$gh_pr_num" ] && continue
|
||||
# Both interpolated values are integer-validated upstream (pr_num from
|
||||
# INTEGER `number` column, gh_pr_num from regex above). No parametric
|
||||
# binding available in bash sqlite3 — safety relies on those invariants.
|
||||
if sqlite3 "$PIPELINE_DB" \
|
||||
"UPDATE prs SET github_pr = $gh_pr_num, source_channel = 'github' WHERE number = $pr_num;" \
|
||||
2>/dev/null; then
|
||||
log "self-heal: linked Forgejo PR #$pr_num -> GitHub PR #$gh_pr_num"
|
||||
fi
|
||||
done
|
||||
fi
|
||||
|
||||
log "Sync complete"
|
||||
for entry in "${MIRROR_REPOS[@]}"; do
|
||||
# Read the 4 fields. `read` splits on $IFS (whitespace) by default.
|
||||
read -r forgejo_repo github_repo bare_path mode <<< "$entry"
|
||||
sync_repo "$forgejo_repo" "$github_repo" "$bare_path" "$mode"
|
||||
done
|
||||
|
||||
REPO_TAG="main"
|
||||
log "Sync cycle complete"
|
||||
|
|
|
|||
|
|
@ -9,6 +9,16 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
|
|||
_cache = {"data": None, "ts": 0}
|
||||
CACHE_TTL = 60 # 1 minute — activity should feel fresh
|
||||
|
||||
# commit_types we surface in the activity feed. `pipeline` is system
|
||||
# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
|
||||
_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
|
||||
|
||||
# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
|
||||
# inbox archive filenames, not claim slugs. Used as a fallback signal when
|
||||
# branch/description heuristics miss (e.g. populated descriptions that
|
||||
# happen to be source titles, not claim insights).
|
||||
_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
|
||||
|
||||
|
||||
def _get_conn():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
|
|
@ -17,28 +27,146 @@ def _get_conn():
|
|||
return conn
|
||||
|
||||
|
||||
def _classify_event(branch, description, commit_type):
|
||||
if commit_type != "knowledge":
|
||||
def _is_source_slug(slug):
|
||||
return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
|
||||
|
||||
|
||||
def _classify_event(branch, description, commit_type, candidate_slug=None):
|
||||
"""Return one of: create | enrich | challenge | source | session_digest | None.
|
||||
|
||||
Source-archive PRs are extract/* branches that filed a source into
|
||||
inbox/archive/ but didn't produce a claim. Session-digest PRs are
|
||||
agent research/entity commits with no per-claim description — they
|
||||
represent session-level rollups, not specific knowledge artifacts.
|
||||
"""
|
||||
commit_type_l = (commit_type or "").lower()
|
||||
branch = branch or ""
|
||||
description_lower = (description or "").lower()
|
||||
has_desc = bool(description and description.strip())
|
||||
|
||||
if commit_type_l not in _FEED_COMMIT_TYPES:
|
||||
return None
|
||||
if branch and branch.startswith("extract/"):
|
||||
return "create"
|
||||
if branch and branch.startswith("reweave/"):
|
||||
return "enrich"
|
||||
if branch and branch.startswith("challenge/"):
|
||||
|
||||
# Explicit challenge signals win first.
|
||||
if (commit_type_l == "challenge"
|
||||
or branch.startswith("challenge/")
|
||||
or "challenged_by" in description_lower):
|
||||
return "challenge"
|
||||
if description and "challenged_by" in description.lower():
|
||||
return "challenge"
|
||||
if branch and branch.startswith("enrich/"):
|
||||
|
||||
# Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
|
||||
if (commit_type_l == "enrich"
|
||||
or branch.startswith("enrich/")
|
||||
or branch.startswith("reweave/")):
|
||||
return "enrich"
|
||||
|
||||
# Research and entity commits with no description are session-level
|
||||
# rollups (e.g. astra/research-2026-05-11). They have no claim to
|
||||
# link to — surface as session_digest, not as a phantom create.
|
||||
if commit_type_l in ("research", "entity") and not has_desc:
|
||||
return "session_digest"
|
||||
|
||||
# Source-only: extract/* with no claim description means inbox archive
|
||||
# landed but no domain claim was written.
|
||||
if branch.startswith("extract/") and not has_desc:
|
||||
return "source"
|
||||
|
||||
# Belt-and-suspenders: if the slug we'd surface to the frontend looks
|
||||
# like an inbox archive filename (date-prefix-hash), treat as source
|
||||
# regardless of branch/commit_type/description state. Catches cases
|
||||
# where description leaked but is just a source title, not a claim.
|
||||
if _is_source_slug(candidate_slug):
|
||||
return "source"
|
||||
|
||||
# Everything else with a description is a new claim.
|
||||
return "create"
|
||||
|
||||
|
||||
# Internal classifier value -> canonical `kind` enum returned to frontend.
|
||||
_KIND_MAP = {
|
||||
"create": "claim_merged",
|
||||
"enrich": "claim_enriched",
|
||||
"challenge": "claim_challenged",
|
||||
"source": "source_archived",
|
||||
"session_digest": "session_digest",
|
||||
}
|
||||
|
||||
|
||||
def _archive_slug_from_branch(branch):
|
||||
"""For extract/YYYY-MM-DD-...-HASH4, return YYYY-MM-DD-... (keep date,
|
||||
drop the 4-hex hash suffix). Matches inbox/archive filename convention.
|
||||
"""
|
||||
if not branch or "/" not in branch:
|
||||
return ""
|
||||
slug = branch.split("/", 1)[1]
|
||||
return re.sub(r"-[a-f0-9]{4}$", "", slug)
|
||||
|
||||
|
||||
def _source_target_url(domain, archive_slug):
|
||||
"""Forgejo blob URL for an archived source file. Falls back to the
|
||||
repo-wide inbox/archive directory when domain is unknown so the link
|
||||
still resolves to something useful instead of a 404.
|
||||
"""
|
||||
if not archive_slug:
|
||||
return None
|
||||
domain = (domain or "").strip()
|
||||
if not domain or domain == "unknown":
|
||||
return "https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive"
|
||||
return (
|
||||
"https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive/"
|
||||
f"{domain}/{archive_slug}.md"
|
||||
)
|
||||
|
||||
|
||||
def _claim_target_url(claim_slug):
|
||||
if not claim_slug:
|
||||
return None
|
||||
return f"/claims/{claim_slug}"
|
||||
|
||||
|
||||
# Canonical clickthrough URL for an activity-feed event.
|
||||
#
|
||||
# Every merged PR in the pipeline.db `prs` table lives on Forgejo at
|
||||
# git.livingip.xyz/teleo/teleo-codex/pulls/{number}. A small subset (3 of
|
||||
# 4094 as of 2026-05-13) was additionally mirrored to GitHub and has
|
||||
# prs.github_pr populated. Prefer GitHub when available (more public-facing
|
||||
# surface), fall back to Forgejo so every row has a real destination
|
||||
# instead of None (which makes the frontend whole-row overlay no-op and
|
||||
# leaves pipeline-attributed events looking dead-on-click).
|
||||
def _pr_url(pr_number, github_pr):
|
||||
if github_pr:
|
||||
return f"https://github.com/living-ip/teleo-codex/pull/{github_pr}"
|
||||
if pr_number:
|
||||
return f"https://git.livingip.xyz/teleo/teleo-codex/pulls/{pr_number}"
|
||||
return None
|
||||
|
||||
|
||||
# Canonicalize contributor labels so frontend links resolve to real
|
||||
# /contributors/{handle} pages. Pipeline writers (extract.py, manual edits,
|
||||
# the old backfill_submitted_by.py) historically wrote mixed-case agent
|
||||
# names with a trailing decorator into prs.submitted_by — e.g.
|
||||
# "Vida (self-directed)", "pipeline (reweave)", or "@m3taversal".
|
||||
# These decorated strings do not exist as contributors and 404 the profile
|
||||
# page. Strip the trailing parenthetical wholesale: valid handles match
|
||||
# ^[a-z0-9][a-z0-9_-]{0,38}$ (see pipeline/lib/attribution._HANDLE_RE) and
|
||||
# cannot contain parens, so this is lossless.
|
||||
_TRAILING_PAREN_RE = re.compile(r"\s*\([^)]*\)\s*$")
|
||||
|
||||
|
||||
def _canonicalize(raw):
|
||||
if not raw:
|
||||
return ""
|
||||
h = raw.strip().lower().lstrip("@")
|
||||
h = _TRAILING_PAREN_RE.sub("", h).strip()
|
||||
return h
|
||||
|
||||
|
||||
def _normalize_contributor(submitted_by, agent):
|
||||
if submitted_by and submitted_by.strip():
|
||||
name = submitted_by.strip().lstrip("@")
|
||||
name = _canonicalize(submitted_by)
|
||||
if name:
|
||||
return name
|
||||
name = _canonicalize(agent)
|
||||
if name and name != "pipeline":
|
||||
return name
|
||||
if agent and agent.strip() and agent != "pipeline":
|
||||
return agent.strip()
|
||||
return "pipeline"
|
||||
|
||||
|
||||
|
|
@ -59,7 +187,7 @@ def _extract_claim_slugs(description, branch=None):
|
|||
if branch:
|
||||
parts = branch.split("/", 1)
|
||||
if len(parts) > 1:
|
||||
return [parts[1][:120]]
|
||||
return [parts[1]]
|
||||
return []
|
||||
titles = [t.strip() for t in description.split("|") if t.strip()]
|
||||
slugs = []
|
||||
|
|
@ -68,7 +196,7 @@ def _extract_claim_slugs(description, branch=None):
|
|||
slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
|
||||
slug = slug.replace(" ", "-").strip("-")
|
||||
if len(slug) > 10:
|
||||
slugs.append(slug[:120])
|
||||
slugs.append(slug)
|
||||
return slugs
|
||||
|
||||
|
||||
|
|
@ -81,33 +209,98 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
|
|||
def _build_events():
|
||||
conn = _get_conn()
|
||||
try:
|
||||
rows = conn.execute("""
|
||||
placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
|
||||
rows = conn.execute(f"""
|
||||
SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
|
||||
p.merged_at, p.description, p.commit_type, p.cost_usd,
|
||||
p.source_channel
|
||||
p.source_channel, p.source_path, p.github_pr
|
||||
FROM prs p
|
||||
WHERE p.status = 'merged'
|
||||
AND p.commit_type = 'knowledge'
|
||||
AND p.commit_type IN ({placeholders})
|
||||
AND p.merged_at IS NOT NULL
|
||||
ORDER BY p.merged_at DESC
|
||||
LIMIT 2000
|
||||
""").fetchall()
|
||||
""", _FEED_COMMIT_TYPES).fetchall()
|
||||
|
||||
events = []
|
||||
claim_activity = {} # slug -> {challenges, enriches, signals, first_seen}
|
||||
|
||||
for row in rows:
|
||||
event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
|
||||
slugs = _extract_claim_slugs(row["description"], row["branch"])
|
||||
candidate_slug = slugs[0] if slugs else ""
|
||||
event_type = _classify_event(
|
||||
row["branch"], row["description"], row["commit_type"],
|
||||
candidate_slug=candidate_slug,
|
||||
)
|
||||
if not event_type:
|
||||
continue
|
||||
|
||||
contributor = _normalize_contributor(row["submitted_by"], row["agent"])
|
||||
slugs = _extract_claim_slugs(row["description"], row["branch"])
|
||||
# Hide pipeline-attributed events (reweave/*, ingestion/*) from the
|
||||
# public activity feed. They're automation maintenance, not
|
||||
# contributions — the daemon re-knits the graph nightly and ingests
|
||||
# external sources. Internal diagnostics + CI math still see these
|
||||
# rows in prs / contribution_events; only the public timeline drops
|
||||
# them. Mirrors the existing _FEED_COMMIT_TYPES filter (which hides
|
||||
# commit_type='pipeline') along the contributor axis.
|
||||
if contributor == "pipeline":
|
||||
continue
|
||||
merged_at = row["merged_at"] or ""
|
||||
domain = row["domain"] or "unknown"
|
||||
kind = _KIND_MAP.get(event_type, event_type)
|
||||
|
||||
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}
|
||||
ci_map = {
|
||||
"create": 0.35, "enrich": 0.25, "challenge": 0.40,
|
||||
"source": 0.15, "session_digest": 0.05,
|
||||
}
|
||||
ci_earned = ci_map.get(event_type, 0)
|
||||
|
||||
# Source events never carry a claim_slug — no claim was written.
|
||||
# target_url points at the archived file on Forgejo instead.
|
||||
if event_type == "source":
|
||||
archive_slug = _archive_slug_from_branch(row["branch"])
|
||||
summary_text = _summary_from_branch(row["branch"])
|
||||
source_display_slug = (
|
||||
summary_text.lower().replace(" ", "-") or row["branch"]
|
||||
)
|
||||
events.append({
|
||||
"kind": kind,
|
||||
"type": "source",
|
||||
"target_url": _source_target_url(domain, archive_slug),
|
||||
"claim_slug": "",
|
||||
"source_slug": source_display_slug,
|
||||
"domain": domain,
|
||||
"contributor": contributor,
|
||||
"timestamp": merged_at,
|
||||
"ci_earned": round(ci_earned, 2),
|
||||
"summary": summary_text,
|
||||
"pr_number": row["number"],
|
||||
"pr_url": _pr_url(row["number"], row["github_pr"]),
|
||||
"source_channel": row["source_channel"] or "unknown",
|
||||
})
|
||||
continue
|
||||
|
||||
# Session digests have no clickthrough surface yet (per-agent
|
||||
# session pages not built). target_url=null so frontend renders
|
||||
# plain text instead of a broken /claims/research-... link.
|
||||
if event_type == "session_digest":
|
||||
summary_text = _summary_from_branch(row["branch"]) or "Research session"
|
||||
events.append({
|
||||
"kind": kind,
|
||||
"type": "session_digest",
|
||||
"target_url": None,
|
||||
"claim_slug": "",
|
||||
"domain": domain,
|
||||
"contributor": contributor,
|
||||
"timestamp": merged_at,
|
||||
"ci_earned": round(ci_earned, 2),
|
||||
"summary": summary_text,
|
||||
"pr_number": row["number"],
|
||||
"pr_url": _pr_url(row["number"], row["github_pr"]),
|
||||
"source_channel": row["source_channel"] or "unknown",
|
||||
})
|
||||
continue
|
||||
|
||||
for slug in slugs:
|
||||
if slug not in claim_activity:
|
||||
claim_activity[slug] = {
|
||||
|
|
@ -132,14 +325,17 @@ def _build_events():
|
|||
|
||||
for slug in (slugs[:1] if slugs else [""]):
|
||||
events.append({
|
||||
"kind": kind,
|
||||
"type": event_type,
|
||||
"target_url": _claim_target_url(slug),
|
||||
"claim_slug": slug,
|
||||
"domain": row["domain"] or "unknown",
|
||||
"domain": domain,
|
||||
"contributor": contributor,
|
||||
"timestamp": merged_at,
|
||||
"ci_earned": round(ci_earned, 2),
|
||||
"summary": summary_text,
|
||||
"pr_number": row["number"],
|
||||
"pr_url": _pr_url(row["number"], row["github_pr"]),
|
||||
"source_channel": row["source_channel"] or "unknown",
|
||||
})
|
||||
|
||||
|
|
@ -164,8 +360,11 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
|
|||
return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
|
||||
events.sort(key=hot_key, reverse=True)
|
||||
elif sort_mode == "important":
|
||||
type_rank = {"challenge": 0, "enrich": 1, "create": 2}
|
||||
events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
|
||||
type_rank = {
|
||||
"challenge": 0, "enrich": 1, "create": 2,
|
||||
"source": 3, "session_digest": 4,
|
||||
}
|
||||
events.sort(key=lambda e: (type_rank.get(e["type"], 5), -len(e["summary"])))
|
||||
return events
|
||||
|
||||
|
||||
|
|
@ -175,6 +374,8 @@ async def handle_activity_feed(request):
|
|||
sort_mode = "recent"
|
||||
domain = request.query.get("domain", "")
|
||||
contributor = request.query.get("contributor", "")
|
||||
type_param = request.query.get("type", "")
|
||||
type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
|
||||
try:
|
||||
limit = min(int(request.query.get("limit", "20")), 100)
|
||||
except ValueError:
|
||||
|
|
@ -196,6 +397,14 @@ async def handle_activity_feed(request):
|
|||
filtered = [e for e in filtered if e["domain"] == domain]
|
||||
if contributor:
|
||||
filtered = [e for e in filtered if e["contributor"] == contributor]
|
||||
if type_filter:
|
||||
# Accept both legacy `type` values (create/enrich/challenge/source/
|
||||
# session_digest) and canonical `kind` values (claim_merged/etc.) so
|
||||
# callers can migrate at their own pace.
|
||||
filtered = [
|
||||
e for e in filtered
|
||||
if e["type"] in type_filter or e.get("kind") in type_filter
|
||||
]
|
||||
|
||||
sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
|
||||
total = len(sorted_events)
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ from aiohttp import web
|
|||
from review_queue_routes import register_review_queue_routes
|
||||
from daily_digest_routes import register_daily_digest_routes
|
||||
from response_audit_routes import register_response_audit_routes, RESPONSE_AUDIT_PUBLIC_PATHS
|
||||
from leaderboard_routes import register_leaderboard_routes, LEADERBOARD_PUBLIC_PATHS
|
||||
from lib.search import search as kb_search, embed_query, search_qdrant
|
||||
|
||||
logger = logging.getLogger("argus")
|
||||
|
|
@ -508,7 +509,7 @@ def _load_secret(path: Path) -> str | None:
|
|||
@web.middleware
|
||||
async def auth_middleware(request, handler):
|
||||
"""API key check. Public paths skip auth. Protected paths require X-Api-Key header."""
|
||||
if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
|
||||
if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path in LEADERBOARD_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
|
||||
return await handler(request)
|
||||
expected = request.app.get("api_key")
|
||||
if not expected:
|
||||
|
|
@ -2361,6 +2362,8 @@ def create_app() -> web.Application:
|
|||
# Response audit - cost tracking + reasoning traces
|
||||
app["db_path"] = str(DB_PATH)
|
||||
register_response_audit_routes(app)
|
||||
# Event-sourced leaderboard (Phase B — reads contribution_events directly)
|
||||
register_leaderboard_routes(app)
|
||||
# Timeline activity feed (per-PR + audit_log events for dashboard v2)
|
||||
from activity_endpoint import handle_activity
|
||||
app.router.add_get("/api/activity", handle_activity)
|
||||
|
|
|
|||
|
|
@ -79,12 +79,16 @@ def main():
|
|||
fm = sfm
|
||||
break
|
||||
|
||||
# `submitted_by` is stored as a canonical handle (lowercase, no @, no
|
||||
# "(self-directed)" / "(reweave)" suffix). Read consumers normalize via
|
||||
# attribution.normalize_handle, so writing decorated strings produces
|
||||
# downstream 404s on /contributors/{handle} (livingip-web timeline).
|
||||
if fm:
|
||||
proposed_by = fm.get("proposed_by")
|
||||
intake_tier = fm.get("intake_tier")
|
||||
|
||||
if proposed_by:
|
||||
contributor = proposed_by.strip().strip('"').strip("'")
|
||||
contributor = proposed_by.strip().strip('"').strip("'").lower().lstrip("@")
|
||||
elif intake_tier == "research-task":
|
||||
# Derive agent from branch prefix
|
||||
prefix = branch.split("/", 1)[0] if "/" in branch else "unknown"
|
||||
|
|
@ -94,13 +98,12 @@ def main():
|
|||
"clay": "clay", "astra": "astra", "leo": "leo",
|
||||
"reweave": "pipeline",
|
||||
}
|
||||
agent = agent_map.get(prefix, prefix)
|
||||
contributor = f"{agent} (self-directed)"
|
||||
contributor = agent_map.get(prefix, prefix)
|
||||
elif intake_tier == "directed":
|
||||
contributor = "@m3taversal"
|
||||
contributor = "m3taversal"
|
||||
else:
|
||||
# Default: if source exists but no proposed_by, it was Cory's submission
|
||||
contributor = "@m3taversal"
|
||||
# Default: if source exists but no proposed_by, operator submitted it.
|
||||
contributor = "m3taversal"
|
||||
|
||||
if contributor:
|
||||
conn.execute(
|
||||
|
|
@ -114,19 +117,19 @@ def main():
|
|||
agent = branch.split("/", 1)[0]
|
||||
conn.execute(
|
||||
"UPDATE prs SET submitted_by = ? WHERE number = ?",
|
||||
(f"{agent} (self-directed)", pr["number"]),
|
||||
(agent, pr["number"]),
|
||||
)
|
||||
updated += 1
|
||||
elif branch.startswith("reweave/"):
|
||||
conn.execute(
|
||||
"UPDATE prs SET submitted_by = 'pipeline (reweave)' WHERE number = ?",
|
||||
"UPDATE prs SET submitted_by = 'pipeline' WHERE number = ?",
|
||||
(pr["number"],),
|
||||
)
|
||||
updated += 1
|
||||
else:
|
||||
# Everything else (extract/, ingestion/, unknown) → Cory directed it
|
||||
# Everything else (extract/, ingestion/, unknown) → operator directed it
|
||||
conn.execute(
|
||||
"UPDATE prs SET submitted_by = '@m3taversal' WHERE number = ?",
|
||||
"UPDATE prs SET submitted_by = 'm3taversal' WHERE number = ?",
|
||||
(pr["number"],),
|
||||
)
|
||||
updated += 1
|
||||
|
|
|
|||
|
|
@ -1,79 +1,399 @@
|
|||
"""Claims API endpoint — serves claim data from the codex filesystem."""
|
||||
import os
|
||||
"""Claims API — list endpoint + canonical claim detail page.
|
||||
|
||||
Owner: Argus
|
||||
Routes:
|
||||
GET /api/claims — list/filter (frontmatter scan, lightweight)
|
||||
GET /api/claims/{slug} — full claim detail (Ship contract)
|
||||
GET /api/domains — domain rollups for sidebar
|
||||
|
||||
The detail endpoint is the canonical /claims/{slug} backend per Ship's
|
||||
2026-04-29 brief. One round-trip, no N+1 cascade. Wikilinks resolved
|
||||
server-side via title→slug index built from a tree walk.
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
import sqlite3
|
||||
import time
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
from aiohttp import web
|
||||
|
||||
CODEX_ROOT = Path("/opt/teleo-eval/workspaces/main/domains")
|
||||
_cache = {"data": None, "ts": 0}
|
||||
CACHE_TTL = 300 # 5 minutes
|
||||
# Codex tree roots — claims live in three places (Sourcer Apr 26 fix scope)
|
||||
CODEX_BASE = Path("/opt/teleo-eval/workspaces/main")
|
||||
CLAIM_TREES = [CODEX_BASE / "domains", CODEX_BASE / "foundations", CODEX_BASE / "core"]
|
||||
|
||||
def _parse_frontmatter(filepath):
|
||||
# pipeline.db for joins (review_records, prs, sources)
|
||||
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
|
||||
|
||||
# In-process caches
|
||||
_list_cache = {"data": None, "ts": 0}
|
||||
_LIST_CACHE_TTL = 300 # 5 min — list view tolerates staleness
|
||||
|
||||
_index_cache = {"by_title": None, "by_stem": None, "ts": 0}
|
||||
_INDEX_CACHE_TTL = 60 # 1 min — title→slug index for wikilink resolution
|
||||
|
||||
CORS_HEADERS = {"Access-Control-Allow-Origin": "*"}
|
||||
|
||||
# Wikilink pattern. [[text]] or [[text|alias]] — we keep the link text only.
|
||||
_WIKILINK_RE = re.compile(r"\[\[([^\]|#]+?)(?:[#|][^\]]*)?\]\]")
|
||||
|
||||
|
||||
# ─── Normalization ─────────────────────────────────────────────────────────
|
||||
|
||||
def _normalize_for_match(s):
|
||||
"""Collapse a title or slug to a comparable form.
|
||||
|
||||
Rules (from Ship's brief — match the link-fixer canonicalization):
|
||||
- lowercase
|
||||
- hyphen ↔ space tolerant (both → single space)
|
||||
- collapse runs of whitespace
|
||||
- strip leading/trailing whitespace
|
||||
- drop trailing punctuation that gets stripped from filenames
|
||||
(`.`, `?`, `!`, `:`, `--`)
|
||||
NOTE: lib/attribution.py exposes only normalize_handle today, not the
|
||||
title normalizer Ship referenced. Implementing inline; if a canonical
|
||||
helper lands later we point at it.
|
||||
"""
|
||||
if not s:
|
||||
return ""
|
||||
s = str(s).lower().strip()
|
||||
# Treat hyphens as spaces, then collapse whitespace runs
|
||||
s = s.replace("-", " ").replace("_", " ")
|
||||
s = re.sub(r"\s+", " ", s)
|
||||
# Strip ASCII punctuation that filenames drop
|
||||
s = re.sub(r"[^\w\s]", "", s)
|
||||
return s.strip()
|
||||
|
||||
|
||||
# ─── Frontmatter parse ─────────────────────────────────────────────────────
|
||||
|
||||
_CODE_FENCE_WRAPPER_RE = re.compile(r"^\s*```(?:markdown|md)?\s*\n(.*?)\n```\s*$", re.DOTALL)
|
||||
|
||||
|
||||
def _split_frontmatter(text):
|
||||
"""Return (frontmatter_dict, body_str) or (None, None) if not a claim file.
|
||||
|
||||
Tolerates files wrapped in a top-level ```markdown ... ``` code fence —
|
||||
some agents have produced these (e.g. Montreal Protocol claim from Astra,
|
||||
2024-12-09). Unwrap once before frontmatter detection.
|
||||
"""
|
||||
if not text:
|
||||
return None, None
|
||||
m = _CODE_FENCE_WRAPPER_RE.match(text)
|
||||
if m:
|
||||
text = m.group(1)
|
||||
text = text.lstrip()
|
||||
if not text.startswith("---"):
|
||||
return None, None
|
||||
try:
|
||||
end = text.index("\n---", 3)
|
||||
except ValueError:
|
||||
return None, None
|
||||
try:
|
||||
fm = yaml.safe_load(text[3:end])
|
||||
except Exception:
|
||||
return None, None
|
||||
if not isinstance(fm, dict):
|
||||
return None, None
|
||||
body = text[end + 4:].lstrip()
|
||||
return fm, body
|
||||
|
||||
|
||||
def _read_claim_file(filepath):
|
||||
"""Read a claim file from disk. Returns (frontmatter, body) or (None, None)."""
|
||||
try:
|
||||
text = filepath.read_text(encoding="utf-8")
|
||||
if not text.startswith("---"):
|
||||
return None
|
||||
end = text.index("---", 3)
|
||||
fm = yaml.safe_load(text[3:end])
|
||||
if not fm or fm.get("type") != "claim":
|
||||
return None
|
||||
body = text[end+3:].strip()
|
||||
# Count wiki-links
|
||||
links = re.findall(r"\[\[([^\]]+)\]\]", body)
|
||||
# Extract first paragraph as summary
|
||||
paragraphs = [p.strip() for p in body.split("\n\n") if p.strip() and not p.strip().startswith("#")]
|
||||
summary = paragraphs[0][:300] if paragraphs else ""
|
||||
return {
|
||||
"slug": filepath.stem,
|
||||
"title": fm.get("title", filepath.stem.replace("-", " ")),
|
||||
"domain": fm.get("domain", "unknown"),
|
||||
"confidence": fm.get("confidence", "unknown"),
|
||||
"agent": fm.get("agent"),
|
||||
"scope": fm.get("scope"),
|
||||
"created": str(fm.get("created", "")),
|
||||
"source": fm.get("source", "") if isinstance(fm.get("source"), str) else "",
|
||||
"sourcer": fm.get("sourcer", ""),
|
||||
"wiki_link_count": len(links),
|
||||
"summary": summary,
|
||||
"challenged_by": fm.get("challenged_by"),
|
||||
"related_claims": fm.get("related_claims", []),
|
||||
}
|
||||
except Exception:
|
||||
return None
|
||||
except (OSError, UnicodeDecodeError):
|
||||
return None, None
|
||||
return _split_frontmatter(text)
|
||||
|
||||
|
||||
def _load_all_claims():
|
||||
now = time.time()
|
||||
if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
|
||||
return _cache["data"]
|
||||
# ─── Tree walk + indexing ──────────────────────────────────────────────────
|
||||
|
||||
claims = []
|
||||
for domain_dir in sorted(CODEX_ROOT.iterdir()):
|
||||
if not domain_dir.is_dir():
|
||||
def _walk_claim_files():
|
||||
"""Yield Path objects for every .md claim file in domains/, foundations/, core/."""
|
||||
for root in CLAIM_TREES:
|
||||
if not root.exists():
|
||||
continue
|
||||
for f in sorted(domain_dir.glob("*.md")):
|
||||
for f in root.rglob("*.md"):
|
||||
if f.name == "_map.md":
|
||||
continue
|
||||
c = _parse_frontmatter(f)
|
||||
if c:
|
||||
claims.append(c)
|
||||
yield f
|
||||
|
||||
_cache["data"] = claims
|
||||
_cache["ts"] = now
|
||||
|
||||
def _build_indexes():
|
||||
"""Build (title→stem, stem→relpath) indexes for wikilink resolution.
|
||||
|
||||
Cached for _INDEX_CACHE_TTL. Pulls from claim-index endpoint when
|
||||
possible (already cached upstream) and falls back to filesystem walk.
|
||||
"""
|
||||
now = time.time()
|
||||
if _index_cache["by_title"] is not None and now - _index_cache["ts"] < _INDEX_CACHE_TTL:
|
||||
return _index_cache["by_title"], _index_cache["by_stem"]
|
||||
|
||||
by_title = {}
|
||||
by_stem = {}
|
||||
for f in _walk_claim_files():
|
||||
stem = f.stem
|
||||
rel = str(f.relative_to(CODEX_BASE))
|
||||
by_stem[stem] = rel
|
||||
# Index by stem-as-normalized too (covers wikilinks that use the slug)
|
||||
by_title[_normalize_for_match(stem)] = stem
|
||||
# Also try parsing the title from frontmatter for higher-fidelity matches
|
||||
fm, _ = _read_claim_file(f)
|
||||
if fm:
|
||||
title = fm.get("title")
|
||||
if title:
|
||||
key = _normalize_for_match(title)
|
||||
if key and key not in by_title:
|
||||
by_title[key] = stem
|
||||
|
||||
_index_cache["by_title"] = by_title
|
||||
_index_cache["by_stem"] = by_stem
|
||||
_index_cache["ts"] = now
|
||||
return by_title, by_stem
|
||||
|
||||
|
||||
def _resolve_wikilinks(body, by_title):
|
||||
"""Extract [[link]] occurrences from body, return {link_text: slug_or_null}."""
|
||||
out = {}
|
||||
for match in _WIKILINK_RE.finditer(body or ""):
|
||||
link_text = match.group(1).strip()
|
||||
if not link_text or link_text in out:
|
||||
continue
|
||||
norm = _normalize_for_match(link_text)
|
||||
out[link_text] = by_title.get(norm)
|
||||
return out
|
||||
|
||||
|
||||
# ─── Edge extraction from frontmatter ──────────────────────────────────────
|
||||
|
||||
_EDGE_FIELDS = {
|
||||
"supports": "supports",
|
||||
"challenges": "challenges",
|
||||
"challenged_by": "challenges", # canonical: store as challenges direction
|
||||
"related": "related",
|
||||
"related_claims": "related",
|
||||
"depends_on": "depends_on",
|
||||
}
|
||||
|
||||
|
||||
def _extract_edges(fm, by_title, by_stem):
|
||||
"""Return edges dict shaped per Ship's contract.
|
||||
|
||||
Each edge is {slug, title, exists}. Slug resolved through title index.
|
||||
"""
|
||||
edges = {"supports": [], "challenges": [], "related": [], "depends_on": []}
|
||||
|
||||
for fm_key, edge_kind in _EDGE_FIELDS.items():
|
||||
raw = fm.get(fm_key)
|
||||
if not raw:
|
||||
continue
|
||||
items = raw if isinstance(raw, list) else [raw]
|
||||
for item in items:
|
||||
if not isinstance(item, str):
|
||||
continue
|
||||
text = item.strip()
|
||||
# Strip wikilink wrapping if present
|
||||
text = re.sub(r"^\[\[|\]\]$", "", text)
|
||||
# Strip pipe annotations: "[[link|alias]]" style or "claim | edge_type | date"
|
||||
text = text.split("|")[0].strip()
|
||||
if not text:
|
||||
continue
|
||||
# Try title match first, fall back to stem match
|
||||
slug = by_title.get(_normalize_for_match(text))
|
||||
if not slug and text in by_stem:
|
||||
slug = text
|
||||
edges[edge_kind].append({
|
||||
"slug": slug,
|
||||
"title": text,
|
||||
"exists": slug is not None,
|
||||
})
|
||||
|
||||
return edges
|
||||
|
||||
|
||||
# ─── Source provenance ─────────────────────────────────────────────────────
|
||||
|
||||
def _resolve_sourced_from(conn, claim_filepath, fm, title, stem):
|
||||
"""Build sourced_from list for the claim.
|
||||
|
||||
Strategy: find PRs that produced this claim (via prs.description LIKE
|
||||
or branch slug match), look at prs.source_path → inbox archive file →
|
||||
parse that source's frontmatter for title/url. Falls back to the raw
|
||||
`source` string from the claim's own frontmatter.
|
||||
|
||||
Both `title` and `stem` must be non-empty — caller (handler) already
|
||||
falls back stem→title; passing empty values would leak `LIKE '%%'`
|
||||
and match unrelated PRs.
|
||||
"""
|
||||
out = []
|
||||
seen_paths = set()
|
||||
pr_rows = []
|
||||
if (title or "").strip() and (stem or "").strip():
|
||||
try:
|
||||
pr_rows = conn.execute(
|
||||
"""SELECT DISTINCT source_path
|
||||
FROM prs
|
||||
WHERE source_path IS NOT NULL AND source_path != ''
|
||||
AND (description LIKE ? OR branch LIKE ?)
|
||||
LIMIT 10""",
|
||||
(f"%{title}%", f"%{stem}%"),
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
pr_rows = []
|
||||
|
||||
for row in pr_rows:
|
||||
path = row["source_path"]
|
||||
if not path or path in seen_paths:
|
||||
continue
|
||||
seen_paths.add(path)
|
||||
out.append(_resolve_source_file(path))
|
||||
|
||||
# 2. Fallback: parse raw source frontmatter field if no PR match
|
||||
if not out:
|
||||
raw = fm.get("source")
|
||||
if isinstance(raw, str) and raw.strip():
|
||||
out.append({"path": None, "title": raw.strip()[:200], "url": None})
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _resolve_source_file(rel_path):
|
||||
"""Given inbox/archive/... path, parse frontmatter for title+url. Best-effort."""
|
||||
full = CODEX_BASE / rel_path
|
||||
entry = {"path": rel_path, "title": None, "url": None}
|
||||
if full.exists():
|
||||
fm, _ = _read_claim_file(full)
|
||||
if fm:
|
||||
entry["title"] = fm.get("title") or fm.get("source") or rel_path
|
||||
entry["url"] = fm.get("url")
|
||||
if not entry["title"]:
|
||||
# Last resort: derive from filename
|
||||
entry["title"] = Path(rel_path).stem.replace("-", " ")
|
||||
return entry
|
||||
|
||||
|
||||
# ─── Reviews + PRs ─────────────────────────────────────────────────────────
|
||||
|
||||
def _load_pr_history(conn, title, stem):
|
||||
"""Find PRs that touched this claim and their reviews.
|
||||
|
||||
Both title and stem must be non-empty strings — empty leaks `LIKE '%%'`
|
||||
which matches every PR. Handler already populates a fallback so this
|
||||
is a defense-in-depth guard.
|
||||
"""
|
||||
if not (title or "").strip() or not (stem or "").strip():
|
||||
return [], []
|
||||
|
||||
try:
|
||||
pr_rows = conn.execute(
|
||||
"""SELECT number, merged_at, commit_type, agent, branch, status
|
||||
FROM prs
|
||||
WHERE merged_at IS NOT NULL
|
||||
AND (description LIKE ? OR branch LIKE ?)
|
||||
ORDER BY merged_at ASC
|
||||
LIMIT 50""",
|
||||
(f"%{title}%", f"%{stem}%"),
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
return [], []
|
||||
|
||||
prs = [
|
||||
{
|
||||
"number": r["number"],
|
||||
"merged_at": r["merged_at"],
|
||||
"kind": r["commit_type"] or "unknown",
|
||||
"agent": r["agent"],
|
||||
"branch": r["branch"],
|
||||
}
|
||||
for r in pr_rows
|
||||
]
|
||||
|
||||
pr_numbers = [p["number"] for p in prs]
|
||||
if not pr_numbers:
|
||||
return prs, []
|
||||
|
||||
placeholders = ",".join("?" * len(pr_numbers))
|
||||
try:
|
||||
review_rows = conn.execute(
|
||||
f"""SELECT pr_number, reviewer, reviewer_model, outcome,
|
||||
rejection_reason, notes, reviewed_at
|
||||
FROM review_records
|
||||
WHERE pr_number IN ({placeholders})
|
||||
ORDER BY reviewed_at ASC""",
|
||||
pr_numbers,
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
review_rows = []
|
||||
|
||||
reviews = [
|
||||
{
|
||||
"pr_number": r["pr_number"],
|
||||
"reviewer": r["reviewer"],
|
||||
"model": r["reviewer_model"],
|
||||
"outcome": r["outcome"],
|
||||
"rejection_reason": r["rejection_reason"],
|
||||
"notes": r["notes"],
|
||||
"reviewed_at": r["reviewed_at"],
|
||||
}
|
||||
for r in review_rows
|
||||
]
|
||||
return prs, reviews
|
||||
|
||||
|
||||
# ─── List view (preserved) ─────────────────────────────────────────────────
|
||||
|
||||
def _parse_list_entry(filepath):
|
||||
fm, body = _read_claim_file(filepath)
|
||||
if not fm or fm.get("type") != "claim":
|
||||
return None
|
||||
links = _WIKILINK_RE.findall(body or "")
|
||||
paragraphs = [p.strip() for p in (body or "").split("\n\n")
|
||||
if p.strip() and not p.strip().startswith("#")]
|
||||
summary = paragraphs[0][:300] if paragraphs else ""
|
||||
return {
|
||||
"slug": filepath.stem,
|
||||
"title": fm.get("title", filepath.stem.replace("-", " ")),
|
||||
"domain": fm.get("domain", "unknown"),
|
||||
"confidence": fm.get("confidence", "unknown"),
|
||||
"agent": fm.get("agent"),
|
||||
"scope": fm.get("scope"),
|
||||
"created": str(fm.get("created", "")),
|
||||
"source": fm.get("source", "") if isinstance(fm.get("source"), str) else "",
|
||||
"sourcer": fm.get("sourcer", ""),
|
||||
"wiki_link_count": len(links),
|
||||
"summary": summary,
|
||||
"challenged_by": fm.get("challenged_by"),
|
||||
"related_claims": fm.get("related_claims", []),
|
||||
}
|
||||
|
||||
|
||||
def _load_all_claims_list():
|
||||
now = time.time()
|
||||
if _list_cache["data"] and now - _list_cache["ts"] < _LIST_CACHE_TTL:
|
||||
return _list_cache["data"]
|
||||
claims = []
|
||||
for f in _walk_claim_files():
|
||||
entry = _parse_list_entry(f)
|
||||
if entry:
|
||||
claims.append(entry)
|
||||
_list_cache["data"] = claims
|
||||
_list_cache["ts"] = now
|
||||
return claims
|
||||
|
||||
|
||||
async def handle_claims(request):
|
||||
claims = _load_all_claims()
|
||||
# ─── Handlers ──────────────────────────────────────────────────────────────
|
||||
|
||||
async def handle_claims(request):
|
||||
claims = _load_all_claims_list()
|
||||
|
||||
# Filters
|
||||
domain = request.query.get("domain")
|
||||
search = request.query.get("q", "").lower()
|
||||
confidence = request.query.get("confidence")
|
||||
agent = request.query.get("agent")
|
||||
sort = request.query.get("sort", "recent") # recent, alpha, domain
|
||||
sort = request.query.get("sort", "recent")
|
||||
|
||||
filtered = claims
|
||||
if domain:
|
||||
|
|
@ -83,9 +403,9 @@ async def handle_claims(request):
|
|||
if agent:
|
||||
filtered = [c for c in filtered if c["agent"] == agent]
|
||||
if search:
|
||||
filtered = [c for c in filtered if search in c["title"].lower() or search in c["summary"].lower()]
|
||||
filtered = [c for c in filtered
|
||||
if search in c["title"].lower() or search in c["summary"].lower()]
|
||||
|
||||
# Sort
|
||||
if sort == "recent":
|
||||
filtered.sort(key=lambda c: c["created"], reverse=True)
|
||||
elif sort == "alpha":
|
||||
|
|
@ -93,12 +413,10 @@ async def handle_claims(request):
|
|||
elif sort == "domain":
|
||||
filtered.sort(key=lambda c: (c["domain"], c["title"].lower()))
|
||||
|
||||
# Pagination
|
||||
limit = min(int(request.query.get("limit", "50")), 200)
|
||||
offset = int(request.query.get("offset", "0"))
|
||||
page = filtered[offset:offset+limit]
|
||||
page = filtered[offset:offset + limit]
|
||||
|
||||
# Domain counts for sidebar
|
||||
domain_counts = {}
|
||||
for c in claims:
|
||||
domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1
|
||||
|
|
@ -111,31 +429,114 @@ async def handle_claims(request):
|
|||
"domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
|
||||
"confidence_levels": sorted(set(c["confidence"] for c in claims)),
|
||||
"agents": sorted(set(c["agent"] for c in claims if c["agent"])),
|
||||
}, headers={"Access-Control-Allow-Origin": "*"})
|
||||
}, headers=CORS_HEADERS)
|
||||
|
||||
|
||||
async def handle_claim_detail(request):
|
||||
slug = request.match_info["slug"]
|
||||
claims = _load_all_claims()
|
||||
for c in claims:
|
||||
if c["slug"] == slug:
|
||||
# Read full body for detail view
|
||||
for domain_dir in CODEX_ROOT.iterdir():
|
||||
if not domain_dir.is_dir():
|
||||
continue
|
||||
f = domain_dir / f"{slug}.md"
|
||||
if f.exists():
|
||||
text = f.read_text(encoding="utf-8")
|
||||
end = text.index("---", 3)
|
||||
body = text[end+3:].strip()
|
||||
c["body"] = body
|
||||
"""GET /api/claims/{slug} — canonical claim detail page (Ship contract).
|
||||
|
||||
One round-trip, all data resolved server-side. Wikilinks pre-resolved.
|
||||
"""
|
||||
requested_slug = request.match_info["slug"]
|
||||
by_title, by_stem = _build_indexes()
|
||||
|
||||
# Resolution order: exact stem → title-normalized (handles description-derived
|
||||
# slugs from /api/activity-feed that are longer than on-disk file stems) →
|
||||
# stem-as-prefix (handles description-derived slugs that are shorter than the
|
||||
# file stem because the description was truncated upstream).
|
||||
slug = requested_slug
|
||||
rel_path = by_stem.get(slug)
|
||||
if not rel_path:
|
||||
# Title fallback: requested slug = slugified frontmatter title
|
||||
norm = _normalize_for_match(requested_slug)
|
||||
resolved_stem = by_title.get(norm)
|
||||
if resolved_stem:
|
||||
slug = resolved_stem
|
||||
rel_path = by_stem.get(resolved_stem)
|
||||
if not rel_path:
|
||||
# Prefix fallback: walk stems sharing a common prefix with the request,
|
||||
# pick longest match. Anchored at 32 chars to avoid spurious hits.
|
||||
norm_req = _normalize_for_match(requested_slug)
|
||||
best_stem = None
|
||||
best_len = 0
|
||||
for stem in by_stem:
|
||||
norm_stem = _normalize_for_match(stem)
|
||||
common = 0
|
||||
for a, b in zip(norm_req, norm_stem):
|
||||
if a != b:
|
||||
break
|
||||
return web.json_response(c, headers={"Access-Control-Allow-Origin": "*"})
|
||||
return web.json_response({"error": "claim not found"}, status=404)
|
||||
common += 1
|
||||
if common >= 32 and common > best_len:
|
||||
best_stem = stem
|
||||
best_len = common
|
||||
if best_stem:
|
||||
slug = best_stem
|
||||
rel_path = by_stem.get(best_stem)
|
||||
if not rel_path:
|
||||
return web.json_response({"error": "claim not found", "slug": requested_slug},
|
||||
status=404, headers=CORS_HEADERS)
|
||||
|
||||
filepath = CODEX_BASE / rel_path
|
||||
fm, body = _read_claim_file(filepath)
|
||||
if not fm:
|
||||
# File exists at this stem but has no parseable frontmatter — almost
|
||||
# always a stray enrichment fragment that landed in domains/ without
|
||||
# being merged into a parent claim. Surfacing as 404 (no claim here)
|
||||
# not 500: the caller can't act on it differently anyway.
|
||||
return web.json_response({"error": "claim not found", "slug": slug,
|
||||
"reason": "file_no_frontmatter"},
|
||||
status=404, headers=CORS_HEADERS)
|
||||
|
||||
# Open read-only DB connection for this request
|
||||
conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True)
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
title = fm.get("title") or slug.replace("-", " ")
|
||||
prs, reviews = _load_pr_history(conn, title, slug)
|
||||
sourced_from = _resolve_sourced_from(conn, filepath, fm, title, slug)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
last_review = None
|
||||
if reviews:
|
||||
latest = reviews[-1]
|
||||
last_review = {
|
||||
"outcome": latest["outcome"],
|
||||
"reviewer": latest["reviewer"],
|
||||
"date": (latest["reviewed_at"] or "")[:10],
|
||||
}
|
||||
|
||||
# secondary_domains: explicit list, or empty
|
||||
secondary = fm.get("secondary_domains") or fm.get("cross_domain_links") or []
|
||||
if isinstance(secondary, str):
|
||||
secondary = [secondary]
|
||||
|
||||
description = fm.get("description") or ""
|
||||
|
||||
edges = _extract_edges(fm, by_title, by_stem)
|
||||
wikilinks = _resolve_wikilinks(body, by_title)
|
||||
|
||||
response = {
|
||||
"slug": slug,
|
||||
"title": title,
|
||||
"domain": fm.get("domain", "unknown"),
|
||||
"secondary_domains": secondary,
|
||||
"confidence": fm.get("confidence", "unknown"),
|
||||
"description": description,
|
||||
"created": str(fm.get("created", "")),
|
||||
"last_review": last_review,
|
||||
"body": body or "",
|
||||
"sourced_from": sourced_from,
|
||||
"reviews": reviews,
|
||||
"prs": prs,
|
||||
"edges": edges,
|
||||
"wikilinks": wikilinks,
|
||||
}
|
||||
return web.json_response(response, headers=CORS_HEADERS)
|
||||
|
||||
|
||||
async def handle_domains(request):
|
||||
claims = _load_all_claims()
|
||||
claims = _load_all_claims_list()
|
||||
domains = {}
|
||||
for c in claims:
|
||||
d = c["domain"]
|
||||
|
|
@ -146,13 +547,11 @@ async def handle_domains(request):
|
|||
domains[d]["agents"].add(c["agent"])
|
||||
conf = c["confidence"]
|
||||
domains[d]["confidence_dist"][conf] = domains[d]["confidence_dist"].get(conf, 0) + 1
|
||||
|
||||
result = []
|
||||
for d in sorted(domains.values(), key=lambda x: -x["count"]):
|
||||
d["agents"] = sorted(d["agents"])
|
||||
result.append(d)
|
||||
|
||||
return web.json_response(result, headers={"Access-Control-Allow-Origin": "*"})
|
||||
return web.json_response(result, headers=CORS_HEADERS)
|
||||
|
||||
|
||||
def register_claims_routes(app):
|
||||
|
|
|
|||
166
diagnostics/leaderboard_routes.py
Normal file
166
diagnostics/leaderboard_routes.py
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
"""Leaderboard endpoint reading from event-sourced contribution_events.
|
||||
|
||||
Owner: Argus
|
||||
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
|
||||
|
||||
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
|
||||
joins contributors for kind, returns sorted leaderboard with role breakdown.
|
||||
|
||||
Roles + weights (Phase A):
|
||||
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
|
||||
|
||||
Endpoints:
|
||||
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
logger = logging.getLogger("argus.leaderboard_routes")
|
||||
|
||||
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
|
||||
KIND_VALUES = ("person", "agent", "org", "all")
|
||||
|
||||
# Public path set so auth middleware lets it through
|
||||
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
|
||||
|
||||
|
||||
def _conn(app):
|
||||
"""Read-only connection to pipeline.db."""
|
||||
db_path = app["db_path"]
|
||||
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
def _parse_window(raw):
|
||||
"""Parse window param. Returns (sql_clause, params_tuple, label).
|
||||
|
||||
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
|
||||
Caps N at 365d / 8760h to prevent abuse.
|
||||
"""
|
||||
if not raw or raw == "all_time":
|
||||
return ("", (), "all_time")
|
||||
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
|
||||
if not m:
|
||||
return ("", (), "all_time")
|
||||
n = int(m.group(1))
|
||||
unit = m.group(2)
|
||||
# Note: WHERE clause is composed via " AND ".join(...) — do NOT prefix with "AND ".
|
||||
if unit == "d":
|
||||
n = min(n, 365)
|
||||
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
|
||||
n = min(n, 8760)
|
||||
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
|
||||
|
||||
|
||||
async def handle_leaderboard(request):
|
||||
"""GET /api/leaderboard.
|
||||
|
||||
Query params:
|
||||
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
|
||||
domain: filter by domain (optional)
|
||||
kind: 'person' (default) | 'agent' | 'org' | 'all'
|
||||
limit: max entries (default 100, max 500)
|
||||
"""
|
||||
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
|
||||
domain = request.query.get("domain")
|
||||
kind = request.query.get("kind", "person")
|
||||
if kind not in KIND_VALUES:
|
||||
kind = "person"
|
||||
try:
|
||||
limit = min(int(request.query.get("limit", "100")), 500)
|
||||
except (ValueError, TypeError):
|
||||
limit = 100
|
||||
|
||||
where = ["1=1", window_clause] if window_clause else ["1=1"]
|
||||
params = list(window_params)
|
||||
if domain:
|
||||
where.append("ce.domain = ?")
|
||||
params.append(domain)
|
||||
if kind != "all":
|
||||
where.append("COALESCE(c.kind, 'person') = ?")
|
||||
params.append(kind)
|
||||
|
||||
where_sql = " AND ".join([w for w in where if w])
|
||||
|
||||
conn = _conn(request.app)
|
||||
try:
|
||||
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
|
||||
# LEFT JOIN contributors so handles in events but not in contributors still appear
|
||||
# (defaults to kind='person' via COALESCE).
|
||||
rows = conn.execute(f"""
|
||||
SELECT
|
||||
ce.handle,
|
||||
COALESCE(c.kind, 'person') AS kind,
|
||||
ROUND(SUM(ce.weight), 4) AS ci,
|
||||
COUNT(*) AS events_count,
|
||||
MIN(ce.timestamp) AS first_contribution,
|
||||
MAX(ce.timestamp) AS last_contribution,
|
||||
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
|
||||
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
|
||||
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
|
||||
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
|
||||
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
|
||||
COUNT(DISTINCT ce.domain) AS domain_count,
|
||||
COUNT(DISTINCT ce.pr_number) AS pr_count
|
||||
FROM contribution_events ce
|
||||
LEFT JOIN contributors c ON c.handle = ce.handle
|
||||
WHERE {where_sql}
|
||||
GROUP BY ce.handle, COALESCE(c.kind, 'person')
|
||||
ORDER BY ci DESC, last_contribution DESC
|
||||
LIMIT ?
|
||||
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
|
||||
|
||||
has_more = len(rows) > limit
|
||||
rows = rows[:limit]
|
||||
|
||||
# Total count of distinct handles matching filters (without limit)
|
||||
total_row = conn.execute(f"""
|
||||
SELECT COUNT(DISTINCT ce.handle) AS total
|
||||
FROM contribution_events ce
|
||||
LEFT JOIN contributors c ON c.handle = ce.handle
|
||||
WHERE {where_sql}
|
||||
""", params).fetchone()
|
||||
total = total_row["total"] if total_row else 0
|
||||
|
||||
leaderboard = []
|
||||
for r in rows:
|
||||
leaderboard.append({
|
||||
"handle": r["handle"],
|
||||
"kind": r["kind"],
|
||||
"ci": r["ci"],
|
||||
"ci_breakdown": {
|
||||
"author": round(r["ci_author"] or 0, 4),
|
||||
"challenger": round(r["ci_challenger"] or 0, 4),
|
||||
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
|
||||
"originator": round(r["ci_originator"] or 0, 4),
|
||||
"evaluator": round(r["ci_evaluator"] or 0, 4),
|
||||
},
|
||||
"events_count": r["events_count"],
|
||||
"domain_count": r["domain_count"],
|
||||
"pr_count": r["pr_count"],
|
||||
"first_contribution": r["first_contribution"],
|
||||
"last_contribution": r["last_contribution"],
|
||||
})
|
||||
|
||||
return web.json_response({
|
||||
"window": window_label,
|
||||
"domain": domain,
|
||||
"kind_filter": kind,
|
||||
"total": total,
|
||||
"shown": len(leaderboard),
|
||||
"has_more": has_more,
|
||||
"source": "contribution_events", # explicit so consumers know the data origin
|
||||
"leaderboard": leaderboard,
|
||||
})
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def register_leaderboard_routes(app: web.Application):
|
||||
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
|
||||
app.router.add_get("/api/leaderboard", handle_leaderboard)
|
||||
|
|
@ -15,6 +15,7 @@ Epimetheus owns this module. Leo reviews changes.
|
|||
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger("pipeline.attribution")
|
||||
|
|
@ -81,6 +82,7 @@ def normalize_handle(handle: str, conn=None) -> str:
|
|||
if not handle:
|
||||
return ""
|
||||
h = handle.strip().lower().lstrip("@")
|
||||
h = re.sub(r"\s*\(self-directed\)\s*$", "", h)
|
||||
if conn is None:
|
||||
return h
|
||||
try:
|
||||
|
|
@ -108,6 +110,36 @@ def classify_kind(handle: str) -> str:
|
|||
return "person"
|
||||
|
||||
|
||||
def is_publisher_handle(handle: str, conn) -> int | None:
|
||||
"""Return publisher.id if the handle exists as a publisher name, else None.
|
||||
|
||||
Schema v26 split orgs/citations into the publishers table. Writer code
|
||||
(upsert_contributor, insert_contribution_event) calls this to gate creating
|
||||
contributor rows or events for handles that belong to publishers.
|
||||
|
||||
Without this gate, every merged PR with `sourcer: cnbc` (for example) would
|
||||
re-create CNBC as a contributor and undo the v26 classifier cleanup.
|
||||
|
||||
Falls back gracefully on pre-v26 DBs: returns None if publishers table
|
||||
doesn't exist yet (writer behaves like before, no regression).
|
||||
"""
|
||||
if not handle or conn is None:
|
||||
return None
|
||||
h = handle.strip().lower().lstrip("@")
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT id FROM publishers WHERE name = ?", (h,),
|
||||
).fetchone()
|
||||
if row:
|
||||
return row["id"] if hasattr(row, "keys") else row[0]
|
||||
except sqlite3.OperationalError:
|
||||
# Pre-v26 DB: publishers table doesn't exist yet. Fall through to None
|
||||
# so writer behaves as before. Any other exception class is real signal
|
||||
# (programming error, lock contention, corruption) — let it propagate.
|
||||
logger.debug("is_publisher_handle: publishers table not present (pre-v26?)", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
# ─── Parse attribution from claim content ──────────────────────────────────
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -84,6 +84,14 @@ MAX_EXTRACT_WORKERS = int(os.environ.get("MAX_EXTRACT_WORKERS", "5"))
|
|||
MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7"))
|
||||
MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain
|
||||
|
||||
# --- External GitHub PR merge strategy ---
|
||||
# When True, gh-pr-N/* branches merge with --no-ff (preserves contributor SHA in
|
||||
# main's history → GitHub recognizes "merged" badge). When False, fall back to
|
||||
# cherry-pick (the default for all other branches). Default True; flip to False
|
||||
# as an emergency backout if the no-ff path destabilizes merge throughput.
|
||||
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
|
||||
EXTERNAL_PR_NO_FF_MERGE = True
|
||||
|
||||
# --- Timeouts (seconds) ---
|
||||
EXTRACT_TIMEOUT = 600 # 10 min
|
||||
EVAL_TIMEOUT = 120 # 2 min — routine Sonnet/Gemini Flash calls (was 600, caused 10-min stalls)
|
||||
|
|
@ -203,6 +211,14 @@ HEALTH_CHECK_INTERVAL = 60
|
|||
# --- Extraction gates ---
|
||||
EXTRACTION_COOLDOWN_HOURS = 4 # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter.
|
||||
|
||||
# --- Verdict-deadlock reaper ---
|
||||
# Defaults safe (dry-run, 24h age, hourly throttle). Operator flips REAPER_DRY_RUN
|
||||
# to "false" via systemctl edit teleo-pipeline → restart, no code change required.
|
||||
REAPER_DRY_RUN = os.environ.get("REAPER_DRY_RUN", "true").lower() == "true"
|
||||
REAPER_DEADLOCK_AGE_HOURS = int(os.environ.get("REAPER_DEADLOCK_AGE_HOURS", "24"))
|
||||
REAPER_INTERVAL_SECONDS = int(os.environ.get("REAPER_INTERVAL_SECONDS", "3600"))
|
||||
REAPER_MAX_PER_RUN = int(os.environ.get("REAPER_MAX_PER_RUN", "50"))
|
||||
|
||||
# --- Retrieval (Telegram bot) ---
|
||||
RETRIEVAL_RRF_K = 20 # RRF smoothing constant — tuned for 5-10 results per source
|
||||
RETRIEVAL_ENTITY_BOOST = 1.5 # RRF score multiplier for claims wiki-linked from matched entities
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import logging
|
|||
import re
|
||||
|
||||
from . import config, db
|
||||
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle
|
||||
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, is_publisher_handle, normalize_handle
|
||||
from .forgejo import get_pr_diff
|
||||
|
||||
logger = logging.getLogger("pipeline.contributor")
|
||||
|
|
@ -62,6 +62,12 @@ def insert_contribution_event(
|
|||
canonical = normalize_handle(handle, conn=conn)
|
||||
if not canonical:
|
||||
return False
|
||||
# Schema v26 gate: handles classified as publishers (CNBC, SpaceNews, arxiv,
|
||||
# etc.) are provenance metadata, not contributors. Don't credit them. Without
|
||||
# this gate every merge re-creates org events and undoes the v26 cleanup.
|
||||
if is_publisher_handle(canonical, conn) is not None:
|
||||
logger.debug("insert_contribution_event: %r is a publisher — skipping event", canonical)
|
||||
return False
|
||||
kind = classify_kind(canonical)
|
||||
try:
|
||||
cur = conn.execute(
|
||||
|
|
@ -419,6 +425,21 @@ def upsert_contributor(
|
|||
logger.warning("Unknown contributor role: %s", role)
|
||||
return
|
||||
|
||||
# Schema v26 gate: orgs/citations live in publishers table, not contributors.
|
||||
# Skip without writing so the v26 classifier cleanup isn't undone by every
|
||||
# merge that has `sourcer: cnbc` (or similar) in claim frontmatter.
|
||||
#
|
||||
# Note: bare normalization (lower + lstrip @), no alias resolution. This is
|
||||
# consistent with the existing `SELECT handle FROM contributors WHERE handle = ?`
|
||||
# below — both look up by canonical-form-as-stored. Today's classifier produces
|
||||
# one publisher row per canonical handle, so bare lookup hits. Branch 3 will
|
||||
# normalize alias→canonical at writer entry points (extract.py, post_extract);
|
||||
# at that point this gate auto-tightens because callers pass canonical handles.
|
||||
canonical_handle = handle.strip().lower().lstrip("@") if handle else ""
|
||||
if canonical_handle and is_publisher_handle(canonical_handle, conn) is not None:
|
||||
logger.debug("upsert_contributor: %r is a publisher — skipping contributor row", canonical_handle)
|
||||
return
|
||||
|
||||
existing = conn.execute(
|
||||
"SELECT handle FROM contributors WHERE handle = ?", (handle,)
|
||||
).fetchone()
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ from datetime import date
|
|||
from pathlib import Path
|
||||
|
||||
from . import config
|
||||
from .attribution import normalize_handle
|
||||
from .costs import record_usage
|
||||
from .db import classify_source_channel
|
||||
from .domains import agent_for_domain
|
||||
|
|
@ -683,16 +684,25 @@ async def _extract_one_source(
|
|||
logger.info("PR #%d created for %s (%d claims, %d entities)", pr_num, source_file, len(claim_files), len(entity_files))
|
||||
|
||||
# Store contributor attribution: who submitted this source?
|
||||
# Priority: proposed_by field → intake_tier inference → "unknown"
|
||||
# Priority: proposed_by field → intake_tier inference → operator default.
|
||||
# NB: `submitted_by` is a CANONICAL HANDLE — lowercase, no @, no
|
||||
# trailing "(self-directed)" decorator. The "self-directed" signal is
|
||||
# already carried by intake_tier == "research-task" + the prs.agent
|
||||
# column; persisting it here as a string suffix produced decorated
|
||||
# values like "Vida (self-directed)" that broke /contributors/{handle}
|
||||
# lookups downstream (livingip-web timeline → 404). Read consumers
|
||||
# (lib/contributor.insert_contribution_event, scripts/scoring_digest,
|
||||
# diagnostics/activity_feed_api) all normalize via attribution.normalize_handle
|
||||
# anyway, so writing the canonical form is the source-of-truth fix.
|
||||
if proposed_by:
|
||||
contributor = proposed_by.strip().strip('"').strip("'")
|
||||
contributor = normalize_handle(proposed_by, conn=conn)
|
||||
elif intake_tier == "research-task":
|
||||
contributor = f"{agent_name} (self-directed)"
|
||||
contributor = normalize_handle(agent_name, conn=conn)
|
||||
elif intake_tier == "directed":
|
||||
contributor = "@m3taversal"
|
||||
contributor = "m3taversal"
|
||||
else:
|
||||
# Default: if no proposed_by and not a research task, Cory submitted it
|
||||
contributor = "@m3taversal"
|
||||
# Default: if no proposed_by and not a research task, operator submitted it.
|
||||
contributor = "m3taversal"
|
||||
|
||||
# Build pipe-separated claim titles for the description field
|
||||
claim_titles = " | ".join(
|
||||
|
|
@ -923,6 +933,36 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
|
|||
except Exception:
|
||||
logger.debug("Failed to read source %s", f, exc_info=True)
|
||||
|
||||
# Archive-basename filter: skip queue files whose basename already exists in
|
||||
# inbox/archive/. Research-session commits on agent branches occasionally
|
||||
# re-introduce already-archived queue files when the branch is re-merged,
|
||||
# producing same-source re-extractions every cooldown cycle. The archive
|
||||
# copy is the source of truth — if a file with this basename is in archive,
|
||||
# the source is processed regardless of queue state. Single archive scan
|
||||
# per cycle, cheap (~1k files).
|
||||
#
|
||||
# Assumes basename uniqueness across queue+archive — current naming
|
||||
# convention (date-prefix + topic-slug) makes collisions vanishingly
|
||||
# rare. If short generic names like "notes.md" enter the queue, this
|
||||
# filter silently false-positives.
|
||||
if unprocessed:
|
||||
archive_dir = main / "inbox" / "archive"
|
||||
archived_basenames: set[str] = set()
|
||||
if archive_dir.exists():
|
||||
for af in archive_dir.rglob("*.md"):
|
||||
if af.name.startswith("_"):
|
||||
continue
|
||||
archived_basenames.add(af.name)
|
||||
if archived_basenames:
|
||||
before = len(unprocessed)
|
||||
unprocessed = [
|
||||
(sp, c, f) for sp, c, f in unprocessed
|
||||
if Path(sp).name not in archived_basenames
|
||||
]
|
||||
skipped = before - len(unprocessed)
|
||||
if skipped:
|
||||
logger.info("Skipped %d queue source(s) — basename already in inbox/archive/", skipped)
|
||||
|
||||
# Don't early-return here — re-extraction sources may exist even when queue is empty
|
||||
# (the re-extraction check runs after open-PR filtering below)
|
||||
|
||||
|
|
|
|||
265
lib/merge.py
265
lib/merge.py
|
|
@ -112,16 +112,44 @@ async def discover_external_prs(conn) -> int:
|
|||
# Detect origin: pipeline agents have per-agent Forgejo users
|
||||
pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"}
|
||||
author = pr.get("user", {}).get("login", "")
|
||||
is_pipeline = author.lower() in pipeline_users
|
||||
branch_ref = pr["head"]["ref"]
|
||||
|
||||
# Pre-classify by branch prefix — pipeline-shape branches are
|
||||
# pipeline regardless of Forgejo opener. reweave.py and
|
||||
# ingestion run as the operator's token, so opener-based
|
||||
# classification mis-credited system maintenance to the
|
||||
# operator (~2.7k PRs on m3ta's contributor row before the
|
||||
# 2026-05-12 reattribute fix). Branch prefix is the canonical
|
||||
# signal: reweave/ingestion -> 'pipeline', <agent>/ -> agent.
|
||||
branch_target = None
|
||||
if branch_ref.startswith(("reweave/", "ingestion/")):
|
||||
branch_target = "pipeline"
|
||||
elif branch_ref.startswith(_AGENT_NAMES):
|
||||
# _AGENT_NAMES is a tuple of bare names; agent branches
|
||||
# are "<name>/..." so use the tuple as a startswith prefix
|
||||
# set after appending '/'.
|
||||
for name in _AGENT_NAMES:
|
||||
if branch_ref.startswith(name + "/"):
|
||||
branch_target = name
|
||||
break
|
||||
|
||||
is_pipeline = author.lower() in pipeline_users or branch_target is not None
|
||||
origin = "pipeline" if is_pipeline else "human"
|
||||
priority = "high" if origin == "human" else None
|
||||
domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
|
||||
agent, commit_type = classify_branch(pr["head"]["ref"])
|
||||
source_channel = classify_source_channel(pr["head"]["ref"])
|
||||
domain = None if not is_pipeline else detect_domain_from_branch(branch_ref)
|
||||
agent, commit_type = classify_branch(branch_ref)
|
||||
source_channel = classify_source_channel(branch_ref)
|
||||
|
||||
# For human PRs, submitted_by is the Forgejo author.
|
||||
# For pipeline PRs, submitted_by is set later by extract.py (from source proposed_by).
|
||||
submitted_by = author if origin == "human" else None
|
||||
# submitted_by precedence (canonical handles only):
|
||||
# 1. branch prefix (pipeline/agent) — set here at discovery
|
||||
# 2. Forgejo opener for human PRs — set here, lowercased
|
||||
# 3. extract.py later (from source proposed_by) — left None
|
||||
if branch_target is not None:
|
||||
submitted_by = branch_target
|
||||
elif origin == "human":
|
||||
submitted_by = author.lower().lstrip("@") if author else None
|
||||
else:
|
||||
submitted_by = None
|
||||
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO prs
|
||||
|
|
@ -429,6 +457,171 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
|
|||
await _git("branch", "-D", clean_branch)
|
||||
|
||||
|
||||
_GH_PR_BRANCH_RE = re.compile(r"^gh-pr-(\d+)/(.+)$")
|
||||
|
||||
|
||||
async def _merge_no_ff_external(branch: str) -> tuple[bool, str]:
|
||||
"""Merge an external GitHub fork PR with --no-ff so contributor SHA lands in main.
|
||||
|
||||
Why this differs from _cherry_pick_onto_main:
|
||||
- Cherry-pick rewrites the contributor's commit SHA → GitHub's "is PR head SHA
|
||||
an ancestor of main?" check returns false → "merged" badge never fires.
|
||||
- --no-ff preserves the contributor's commit SHA as a parent of the merge
|
||||
commit. After ff-push to main (the existing dispatch step), GitHub sees
|
||||
the SHA in ancestry and marks the PR merged.
|
||||
|
||||
Mechanics:
|
||||
1. Fetch origin/main + origin/{branch}
|
||||
2. Worktree on local branch _merged-{slug} from origin/main
|
||||
3. git merge --no-ff origin/{branch} with verbose message:
|
||||
"Merge external GitHub PR #{N}: {branch_slug}"
|
||||
4. Push merge commit to origin/_merged/{branch} (synthetic audit ref)
|
||||
5. ff-push merge_sha → origin/main directly (function owns the push, NOT
|
||||
dispatch — see sentinel return below)
|
||||
|
||||
The merge commit M has parents [main_sha, branch_sha]. M is a fast-forward
|
||||
descendant of main_sha (via first-parent chain), so the push to main
|
||||
works without --force.
|
||||
|
||||
Synthetic branch (Ship review Apr 28): we deliberately do NOT force-push
|
||||
the contributor's gh-pr-N/* branch. Force-pushing it would rewrite the
|
||||
branch tip with a merge commit the contributor didn't author, showing as
|
||||
a confusing bot force-push in Forgejo's PR UI. The synthetic _merged/*
|
||||
audit ref lets us track the merge commit without touching the contributor's
|
||||
branch. Mirrors the _clean/* synthetic branch pattern in cherry-pick.
|
||||
|
||||
Sentinel return: function pushes merge_sha → main itself (dispatch's ff-push
|
||||
can't, since origin/{branch} is unchanged and not a descendant of main).
|
||||
Returns a "merged --no-ff" sentinel string that dispatch detects to skip
|
||||
its ff-push step and route directly to PR-close + mark_merged + audit.
|
||||
The full 40-char merge SHA is in the return string for dispatch to extract.
|
||||
|
||||
Conflict handling: same auto-resolve pattern as cherry-pick — entity-only
|
||||
conflicts take main's version (--ours = current worktree HEAD = main),
|
||||
other conflicts abort and return False with detail.
|
||||
|
||||
Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
|
||||
"""
|
||||
m = _GH_PR_BRANCH_RE.match(branch)
|
||||
if not m:
|
||||
return False, f"branch {branch} doesn't match gh-pr-N/* format"
|
||||
gh_pr_num = m.group(1)
|
||||
branch_slug = m.group(2)
|
||||
|
||||
slug = branch.replace("/", "-")
|
||||
worktree_path = f"/tmp/teleo-merge-{slug}"
|
||||
local_branch = f"_merged-{slug}" # local working branch in worktree
|
||||
audit_ref = f"_merged/{branch}" # remote synthetic ref (preserves hierarchy)
|
||||
|
||||
# Fetch latest state — separate calls (long branch names break combined refspec)
|
||||
rc, out = await _git("fetch", "origin", "main", timeout=15)
|
||||
if rc != 0:
|
||||
return False, f"fetch main failed: {out}"
|
||||
rc, out = await _git("fetch", "origin", branch, timeout=15)
|
||||
if rc != 0:
|
||||
return False, f"fetch branch failed: {out}"
|
||||
|
||||
# Up-to-date check (mirrors cherry-pick path semantics)
|
||||
rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
|
||||
rc2, main_sha = await _git("rev-parse", "origin/main")
|
||||
if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
|
||||
rc_diff, diff_out = await _git(
|
||||
"diff", "--stat", f"origin/main..origin/{branch}", timeout=10,
|
||||
)
|
||||
if rc_diff != 0 or not diff_out.strip():
|
||||
return True, "already up to date"
|
||||
logger.info("External PR branch %s is descendant of main but has new content — proceeding", branch)
|
||||
|
||||
async with _bare_repo_lock:
|
||||
# Clean up any stale local branch from a prior failed run
|
||||
await _git("branch", "-D", local_branch)
|
||||
rc, out = await _git("worktree", "add", "-b", local_branch, worktree_path, "origin/main")
|
||||
if rc != 0:
|
||||
return False, f"worktree add failed: {out}"
|
||||
|
||||
try:
|
||||
merge_msg = f"Merge external GitHub PR #{gh_pr_num}: {branch_slug}"
|
||||
rc, out = await _git(
|
||||
"merge", "--no-ff", f"origin/{branch}",
|
||||
"-m", merge_msg,
|
||||
cwd=worktree_path, timeout=60,
|
||||
)
|
||||
|
||||
if rc != 0:
|
||||
# Identify conflicts
|
||||
rc_ls, conflicting = await _git(
|
||||
"diff", "--name-only", "--diff-filter=U", cwd=worktree_path,
|
||||
)
|
||||
conflict_files = [
|
||||
f.strip() for f in conflicting.split("\n") if f.strip()
|
||||
] if rc_ls == 0 else []
|
||||
|
||||
if conflict_files and all(f.startswith("entities/") for f in conflict_files):
|
||||
# Entity-only conflicts: take main's version (entities are recoverable)
|
||||
# In merge: --ours = branch we're ON (worktree HEAD = main)
|
||||
# --theirs = branch merging in (origin/{branch})
|
||||
for cf in conflict_files:
|
||||
await _git("checkout", "--ours", cf, cwd=worktree_path)
|
||||
await _git("add", cf, cwd=worktree_path)
|
||||
# Complete the merge using the prepared MERGE_MSG (no editor)
|
||||
rc_cont, cont_out = await _git(
|
||||
"-c", "core.editor=true",
|
||||
"commit", "--no-edit",
|
||||
cwd=worktree_path, timeout=60,
|
||||
)
|
||||
if rc_cont != 0:
|
||||
await _git("merge", "--abort", cwd=worktree_path)
|
||||
return False, f"merge entity resolution failed for PR #{gh_pr_num}: {cont_out}"
|
||||
logger.info(
|
||||
"External PR #%s merge: entity conflict auto-resolved (dropped %s)",
|
||||
gh_pr_num, ", ".join(sorted(conflict_files)),
|
||||
)
|
||||
else:
|
||||
conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
|
||||
await _git("merge", "--abort", cwd=worktree_path)
|
||||
return False, f"merge conflict on PR #{gh_pr_num}: {conflict_detail}"
|
||||
|
||||
# Capture the merge commit SHA before any pushes
|
||||
rc, merge_sha = await _git("rev-parse", "HEAD", cwd=worktree_path)
|
||||
if rc != 0:
|
||||
return False, f"rev-parse merge HEAD failed: {merge_sha}"
|
||||
merge_sha = merge_sha.strip().split("\n")[0]
|
||||
|
||||
# Push to synthetic audit ref _merged/{branch} (does not touch contributor's
|
||||
# gh-pr-N/* branch). Plain --force: the audit ref is bot-owned and per-PR;
|
||||
# if a prior aborted attempt left a stale ref, overwriting it is the
|
||||
# intended behavior, and there's no concurrent writer to lease against.
|
||||
rc, out = await _git(
|
||||
"push", "--force", "origin", f"HEAD:refs/heads/{audit_ref}",
|
||||
cwd=worktree_path, timeout=30,
|
||||
)
|
||||
if rc != 0:
|
||||
return False, f"push to audit ref {audit_ref} failed: {out}"
|
||||
|
||||
# ff-push the merge commit to main. This is a true fast-forward (M is a
|
||||
# descendant of origin/main via its first parent), so no --force needed.
|
||||
# Forgejo's branch protection allows ff-push to main from authorized users.
|
||||
rc, out = await _git(
|
||||
"push", "origin", f"{merge_sha}:main",
|
||||
cwd=worktree_path, timeout=30,
|
||||
)
|
||||
if rc != 0:
|
||||
# Roll back audit ref if main push failed — keeps state consistent.
|
||||
await _git("push", "--delete", "origin", f"refs/heads/{audit_ref}",
|
||||
cwd=worktree_path, timeout=15)
|
||||
return False, f"ff-push to main failed: {out}"
|
||||
|
||||
# Sentinel return: "merged --no-ff" prefix triggers dispatch's external-PR
|
||||
# close path (skips ff-push, does PR-close + mark_merged + audit).
|
||||
# Full 40-char merge SHA in the message so dispatch can parse it for audit.
|
||||
return True, f"merged --no-ff (external PR #{gh_pr_num}, M={merge_sha}, audit_ref={audit_ref})"
|
||||
|
||||
finally:
|
||||
async with _bare_repo_lock:
|
||||
await _git("worktree", "remove", "--force", worktree_path)
|
||||
await _git("branch", "-D", local_branch)
|
||||
|
||||
|
||||
from .frontmatter import (
|
||||
REWEAVE_EDGE_FIELDS,
|
||||
parse_yaml_frontmatter,
|
||||
|
|
@ -733,6 +926,12 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
|
|||
# (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
|
||||
if branch.startswith("reweave/"):
|
||||
merge_fn = _merge_reweave_pr(branch)
|
||||
elif branch.startswith("gh-pr-") and config.EXTERNAL_PR_NO_FF_MERGE:
|
||||
# External GitHub fork PRs: --no-ff merge so contributor SHA lands
|
||||
# in main's history → GitHub recognizes "merged" badge.
|
||||
# Backout via config.EXTERNAL_PR_NO_FF_MERGE = False (falls back to cherry-pick).
|
||||
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
|
||||
merge_fn = _merge_no_ff_external(branch)
|
||||
else:
|
||||
# Extraction commits ADD new files — cherry-pick applies cleanly.
|
||||
merge_fn = _cherry_pick_onto_main(branch)
|
||||
|
|
@ -786,6 +985,58 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
|
|||
succeeded += 1
|
||||
continue
|
||||
|
||||
# External GitHub PR (gh-pr-*): _merge_no_ff_external already pushed
|
||||
# the merge commit to origin/main + the synthetic _merged/{branch}
|
||||
# audit ref. Skip dispatch's ff-push (would fail — origin/{branch} is
|
||||
# the contributor's untouched branch, not a descendant of main).
|
||||
# Just close PR + mark_merged + audit, parsing merge SHA from sentinel.
|
||||
if pick_msg.startswith("merged --no-ff"):
|
||||
m = re.search(r"M=([a-f0-9]{40})", pick_msg)
|
||||
merge_sha = m.group(1) if m else None
|
||||
m_ref = re.search(r"audit_ref=(\S+?)\)", pick_msg)
|
||||
audit_ref = m_ref.group(1) if m_ref else None
|
||||
m_pr = re.search(r"external PR #(\d+)", pick_msg)
|
||||
gh_pr_num = m_pr.group(1) if m_pr else None
|
||||
# Surface drift between dispatch and _merge_no_ff_external if the
|
||||
# success-message contract changes. Merge already succeeded; this
|
||||
# is signal-only, not a gate on the close path.
|
||||
if not (m and m_ref and m_pr):
|
||||
logger.warning(
|
||||
"PR #%d sentinel parse incomplete: M=%s, audit_ref=%s, gh_pr=%s, msg=%r",
|
||||
pr_num, bool(m), bool(m_ref), bool(m_pr), pick_msg,
|
||||
)
|
||||
|
||||
leo_token = get_agent_token("leo")
|
||||
comment_body = (
|
||||
f"Merged via --no-ff into main.\n"
|
||||
f"Merge commit: `{merge_sha}`\n"
|
||||
f"Audit ref: `{audit_ref}`\n"
|
||||
f"Branch: `{branch}` (preserved unchanged)"
|
||||
)
|
||||
await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
|
||||
{"body": comment_body})
|
||||
result = await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"),
|
||||
{"state": "closed"}, token=leo_token)
|
||||
if result is None:
|
||||
logger.error("PR #%d: Forgejo close failed (no-ff path), skipping DB update", pr_num)
|
||||
failed += 1
|
||||
continue
|
||||
mark_merged(conn, pr_num)
|
||||
db.audit(conn, "merge", "merged", json.dumps({
|
||||
"pr": pr_num, "branch": branch, "method": "no-ff",
|
||||
"merge_commit_sha": merge_sha,
|
||||
"audit_ref": audit_ref,
|
||||
"github_pr": gh_pr_num,
|
||||
}))
|
||||
# NOTE: do NOT _delete_remote_branch(branch) here. The contributor's
|
||||
# gh-pr-N/* branch is the mirror of their fork PR head — leaving it
|
||||
# in place lets sync-mirror keep the GitHub PR <-> Forgejo PR link
|
||||
# observable. The synthetic _merged/{branch} ref carries the merge.
|
||||
logger.info("PR #%d merged via --no-ff (M=%s)", pr_num,
|
||||
merge_sha[:8] if merge_sha else "?")
|
||||
succeeded += 1
|
||||
continue
|
||||
|
||||
# Local ff-push: cherry-picked branch is a descendant of origin/main.
|
||||
# Regular push = fast-forward. Non-ff rejected by default (same safety).
|
||||
# --force-with-lease removed: Forgejo categorically blocks it on protected branches.
|
||||
|
|
|
|||
|
|
@ -522,30 +522,53 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
|
|||
Finds PRs with substantive issue tags that haven't exceeded fix budget.
|
||||
Processes up to 3 per cycle (Rhea: 180s interval, don't overwhelm eval).
|
||||
"""
|
||||
# Build the actionable-tag list from the routing constants so adding a new
|
||||
# tag to FIXABLE_TAGS / CONVERTIBLE_TAGS / UNFIXABLE_TAGS auto-updates the
|
||||
# SELECT filter — no two-place edit footgun.
|
||||
actionable_tags = sorted(FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS)
|
||||
placeholders = ",".join(["?"] * len(actionable_tags))
|
||||
|
||||
# Push the actionable-tag filter into SQL (was a post-fetch Python loop).
|
||||
# The old shape selected the 3 oldest request_changes PRs and then dropped
|
||||
# ones without actionable tags, so empty-eval_issues rows occupied LIMIT-3
|
||||
# forever (head-of-line). Now LIMIT-3 always returns 3 actionable rows.
|
||||
# Reaper handles the empty-tag PRs after their 24h cooldown.
|
||||
rows = conn.execute(
|
||||
"""SELECT number, eval_issues FROM prs
|
||||
f"""SELECT number, eval_issues FROM prs
|
||||
WHERE status = 'open'
|
||||
AND tier0_pass = 1
|
||||
AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')
|
||||
AND COALESCE(fix_attempts, 0) < ?
|
||||
AND (last_attempt IS NULL OR last_attempt < datetime('now', '-3 minutes'))
|
||||
AND json_valid(eval_issues)
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM json_each(eval_issues)
|
||||
WHERE value IN ({placeholders})
|
||||
)
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 3""",
|
||||
(MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS,), # Total budget: mechanical + substantive
|
||||
(MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS, *actionable_tags),
|
||||
).fetchall()
|
||||
|
||||
if not rows:
|
||||
return 0, 0
|
||||
|
||||
# Filter to only PRs with substantive issues (not just mechanical)
|
||||
# Defense-in-depth: json_valid(eval_issues) in the SELECT already filters
|
||||
# corrupt JSON before json_each runs, so this WARN should be unreachable.
|
||||
# Kept anyway: json_valid and json.loads use technically distinct parsers,
|
||||
# and the journal entry names the failure mode if SQLite ever surfaces a
|
||||
# row that passes json_valid + json_each but fails json.loads.
|
||||
substantive_rows = []
|
||||
for row in rows:
|
||||
try:
|
||||
issues = json.loads(row["eval_issues"] or "[]")
|
||||
json.loads(row["eval_issues"] or "[]")
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
logger.warning(
|
||||
"PR #%d: corrupt eval_issues JSON — skipping in substantive fix cycle",
|
||||
row["number"],
|
||||
)
|
||||
continue
|
||||
if set(issues) & (FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS):
|
||||
substantive_rows.append(row)
|
||||
substantive_rows.append(row)
|
||||
|
||||
if not substantive_rows:
|
||||
return 0, 0
|
||||
|
|
@ -559,7 +582,13 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
|
|||
if result.get("action"):
|
||||
fixed += 1
|
||||
elif result.get("skipped"):
|
||||
logger.debug("PR #%d: substantive fix skipped: %s", row["number"], result.get("reason"))
|
||||
# Was DEBUG — promoted to INFO to make stuck-PR root cause
|
||||
# visible without enabling DEBUG fleet-wide. (Ship Apr 24+
|
||||
# silent skip diagnosis.)
|
||||
logger.info(
|
||||
"PR #%d: substantive fix skipped: %s",
|
||||
row["number"], result.get("reason"),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("PR #%d: substantive fix failed", row["number"])
|
||||
errors += 1
|
||||
|
|
@ -569,3 +598,191 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
|
|||
logger.info("Substantive fix cycle: %d fixed, %d errors", fixed, errors)
|
||||
|
||||
return fixed, errors
|
||||
|
||||
|
||||
# ─── Verdict-deadlock reaper ──────────────────────────────────────────────
|
||||
#
|
||||
# Defense-in-depth for PRs that substantive_fixer can't make progress on.
|
||||
# Targets two stuck-verdict shapes empirically observed in production:
|
||||
#
|
||||
# 1. leo:request_changes + domain:approve
|
||||
# Leo asked for substantive fix; fixer either failed silently
|
||||
# (no_claim_files / no_review_comments / etc.) or the issue tag isn't
|
||||
# in FIXABLE | CONVERTIBLE | UNFIXABLE. PR sits forever.
|
||||
#
|
||||
# 2. leo:skipped + domain:request_changes
|
||||
# Eval bypassed Leo (eval_attempts >= MAX). Domain rejected with no
|
||||
# structured eval_issues. fixer can't classify → silent skip → forever.
|
||||
#
|
||||
# Both shapes need a clearance path. Reaper closes them after a 24h cooldown
|
||||
# with audit_log breadcrumbs for forensics. First deploy runs in dry-run mode
|
||||
# (audit "would_close" events only — no Forgejo writes, no DB closes).
|
||||
#
|
||||
# Reaper config (REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS, REAPER_INTERVAL_SECONDS,
|
||||
# REAPER_MAX_PER_RUN) lives in lib/config.py with env-var overrides — operator
|
||||
# flips dry-run to live via `systemctl edit teleo-pipeline.service`
|
||||
# (Environment=REAPER_DRY_RUN=false) + restart. No code change, no commit, no
|
||||
# redeploy required.
|
||||
|
||||
|
||||
async def verdict_deadlock_reaper_cycle(conn) -> int:
|
||||
"""Reap PRs stuck in conflicting-verdict deadlock for >24h.
|
||||
|
||||
Returns count of PRs closed (or "would-close" in dry-run mode).
|
||||
Throttled to once per REAPER_INTERVAL_SECONDS via sentinel audit event.
|
||||
"""
|
||||
# Throttle: skip if last reaper run was within REAPER_INTERVAL_SECONDS.
|
||||
# Uses audit_log as the rate-limit ledger so no schema/state needed.
|
||||
# stage='reaper' filter so the planner uses idx_audit_stage (avoids full scan).
|
||||
last_run = conn.execute(
|
||||
"SELECT MAX(timestamp) FROM audit_log "
|
||||
"WHERE stage = 'reaper' AND event = 'verdict_deadlock_reaper_run'"
|
||||
).fetchone()[0]
|
||||
if last_run:
|
||||
cur = conn.execute(
|
||||
"SELECT (julianday('now') - julianday(?)) * 86400 < ?",
|
||||
(last_run, config.REAPER_INTERVAL_SECONDS),
|
||||
).fetchone()[0]
|
||||
if cur:
|
||||
return 0
|
||||
|
||||
# Two stuck-verdict shapes: leo:rc+domain:approve, leo:skipped+domain:rc.
|
||||
#
|
||||
# Branch allowlist invariant: the reaper closes ONLY disposable, pipeline-
|
||||
# generated branches — content the pipeline (or a daily cron) created and
|
||||
# can recreate. Four classes qualify:
|
||||
#
|
||||
# extract/* — per-source extraction PRs, regenerated next ingest cycle
|
||||
# reweave/* — nightly graph-edge maintenance, regenerated next reweave
|
||||
# fix/* — pipeline-internal fix branches
|
||||
# */research-YYYY-MM-DD — daily {agent}/research-{date} cron sessions.
|
||||
# Matched via SQLite `_` single-char wildcards as
|
||||
# `research-20__-__-__` to literally enforce the date-
|
||||
# suffix shape. Excludes hand-named research branches
|
||||
# (rio/research-batch-agents-memory-harnesses,
|
||||
# theseus/research-2nd-attempt-on-X, etc.) which are
|
||||
# feature work owned by the agent. Pattern good through
|
||||
# 2099; revisit then.
|
||||
#
|
||||
# WIP agent feature branches (theseus/feature-foo, epimetheus/some-fix,
|
||||
# rio/research-thesis-name) are NEVER reaped — owners review their own PRs
|
||||
# on their own cadence. The date-shaped pattern threads the needle: picks
|
||||
# up daily synthesis output the agent regenerates tomorrow while leaving
|
||||
# manually-named research work alone.
|
||||
rows = conn.execute(
|
||||
"""SELECT number, branch, eval_issues, leo_verdict, domain_verdict,
|
||||
last_attempt, fix_attempts
|
||||
FROM prs
|
||||
WHERE status = 'open'
|
||||
AND tier0_pass = 1
|
||||
AND last_attempt IS NOT NULL
|
||||
AND last_attempt < datetime('now', ? || ' hours')
|
||||
AND (branch LIKE 'extract/%'
|
||||
OR branch LIKE 'reweave/%'
|
||||
OR branch LIKE 'fix/%'
|
||||
OR branch LIKE '%/research-20__-__-__')
|
||||
AND (
|
||||
(leo_verdict = 'request_changes' AND domain_verdict = 'approve')
|
||||
OR (leo_verdict = 'skipped' AND domain_verdict = 'request_changes')
|
||||
)
|
||||
ORDER BY last_attempt ASC
|
||||
LIMIT ?""",
|
||||
(f"-{config.REAPER_DEADLOCK_AGE_HOURS}", config.REAPER_MAX_PER_RUN),
|
||||
).fetchall()
|
||||
|
||||
mode = "dryrun" if config.REAPER_DRY_RUN else "live"
|
||||
|
||||
if not rows:
|
||||
# Heartbeat anyway so throttle ticks even when nothing to reap.
|
||||
db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
|
||||
"candidates": 0, "closed": 0, "mode": mode,
|
||||
}))
|
||||
return 0
|
||||
|
||||
logger.info(
|
||||
"Verdict-deadlock reaper [%s]: %d candidate(s) in deadlock >%dh",
|
||||
mode, len(rows), config.REAPER_DEADLOCK_AGE_HOURS,
|
||||
)
|
||||
|
||||
closed = 0
|
||||
would_close = 0
|
||||
errors = 0
|
||||
for row in rows:
|
||||
pr = row["number"]
|
||||
reason_detail = {
|
||||
"pr": pr,
|
||||
"branch": row["branch"],
|
||||
"leo_verdict": row["leo_verdict"],
|
||||
"domain_verdict": row["domain_verdict"],
|
||||
"eval_issues": row["eval_issues"],
|
||||
"last_attempt": row["last_attempt"],
|
||||
"fix_attempts": row["fix_attempts"],
|
||||
}
|
||||
|
||||
if config.REAPER_DRY_RUN:
|
||||
# Audit only — do NOT touch DB row or Forgejo state.
|
||||
db.audit(conn, "reaper", "verdict_deadlock_would_close",
|
||||
json.dumps(reason_detail))
|
||||
logger.info(
|
||||
"Reaper [dryrun]: would close PR #%d (leo=%s domain=%s issues=%s)",
|
||||
pr, row["leo_verdict"], row["domain_verdict"], row["eval_issues"],
|
||||
)
|
||||
would_close += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
comment_body = (
|
||||
"Closed by verdict-deadlock reaper.\n\n"
|
||||
f"This PR sat for >{config.REAPER_DEADLOCK_AGE_HOURS}h with conflicting "
|
||||
f"verdicts (leo={row['leo_verdict']}, domain={row['domain_verdict']}) "
|
||||
f"that the substantive fixer couldn't auto-resolve.\n\n"
|
||||
f"Eval issues: `{row['eval_issues']}`\n"
|
||||
f"Last attempt: {row['last_attempt']}\n\n"
|
||||
"_Automated message from the LivingIP pipeline._"
|
||||
)
|
||||
await forgejo_api(
|
||||
"POST", repo_path(f"issues/{pr}/comments"), {"body": comment_body},
|
||||
)
|
||||
patch_result = await forgejo_api(
|
||||
"PATCH", repo_path(f"pulls/{pr}"), {"state": "closed"},
|
||||
token=get_agent_token("leo"),
|
||||
)
|
||||
if patch_result is None:
|
||||
logger.warning(
|
||||
"Reaper: PR #%d Forgejo close failed — skipping DB close to "
|
||||
"avoid drift", pr,
|
||||
)
|
||||
errors += 1
|
||||
continue
|
||||
# Forgejo already closed at the PATCH above — pass close_on_forgejo=False
|
||||
# so close_pr() doesn't issue a redundant PATCH (which on transient
|
||||
# failure returns False and skips the DB close → status drift).
|
||||
await close_pr(
|
||||
conn, pr,
|
||||
last_error=(
|
||||
f"verdict_deadlock_reaper: leo={row['leo_verdict']} "
|
||||
f"domain={row['domain_verdict']} age>{config.REAPER_DEADLOCK_AGE_HOURS}h"
|
||||
),
|
||||
close_on_forgejo=False,
|
||||
)
|
||||
db.audit(conn, "reaper", "verdict_deadlock_closed",
|
||||
json.dumps(reason_detail))
|
||||
closed += 1
|
||||
except Exception:
|
||||
logger.exception("Reaper: PR #%d close failed", pr)
|
||||
errors += 1
|
||||
|
||||
db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
|
||||
"candidates": len(rows), "closed": closed, "would_close": would_close,
|
||||
"errors": errors, "mode": mode,
|
||||
}))
|
||||
if errors:
|
||||
logger.warning(
|
||||
"Verdict-deadlock reaper [%s]: %d closed, %d would-close, %d errors",
|
||||
mode, closed, would_close, errors,
|
||||
)
|
||||
elif config.REAPER_DRY_RUN:
|
||||
logger.info("Verdict-deadlock reaper [dryrun]: %d would-close", would_close)
|
||||
else:
|
||||
logger.info("Verdict-deadlock reaper [live]: %d closed", closed)
|
||||
return closed + would_close
|
||||
|
|
|
|||
|
|
@ -267,6 +267,7 @@ format: tweet | thread
|
|||
status: unprocessed
|
||||
priority: high | medium | low
|
||||
tags: [topic1, topic2]
|
||||
intake_tier: research-task
|
||||
---
|
||||
|
||||
## Content
|
||||
|
|
|
|||
282
scripts/backfill-research-session-attribution.py
Normal file
282
scripts/backfill-research-session-attribution.py
Normal file
|
|
@ -0,0 +1,282 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Backfill: re-attribute research-session-derived PRs from m3taversal to agent.
|
||||
|
||||
Problem: research-session.sh used to write source frontmatter without
|
||||
`proposed_by` / `intake_tier`, so extract.py's contributor-classification
|
||||
fallback set `prs.submitted_by = '@m3taversal'`, which propagated into
|
||||
`contribution_events` as a `handle='m3taversal', role='author'` row per
|
||||
research-derived claim. Result: agent research credited to the human.
|
||||
|
||||
Forward fix is a frontmatter-template patch to research-session.sh.
|
||||
This script corrects historical records.
|
||||
|
||||
Identification:
|
||||
Research-session source archives are committed to teleo-codex with a
|
||||
message matching `^<agent>: research session YYYY-MM-DD —`. The diff
|
||||
for that commit lists `inbox/queue/*.md` files the agent created. Any
|
||||
PR whose `source_path` matches one of those filenames is research-derived.
|
||||
|
||||
Touch list (per matched PR):
|
||||
1. UPDATE prs SET submitted_by = '<agent>' (canonical handle, lowercase, no
|
||||
trailing "(self-directed)" suffix — see lib/extract.py and
|
||||
diagnostics/activity_feed_api.py for why decorators leak into 404s)
|
||||
2. DELETE FROM contribution_events
|
||||
WHERE handle='m3taversal' AND role='author' AND pr_number=?
|
||||
3. INSERT OR IGNORE INTO contribution_events with handle=<agent>,
|
||||
kind='agent', role='author', weight=0.30, original timestamp/domain/channel.
|
||||
|
||||
Defaults to --dry-run. Pass --apply to commit changes.
|
||||
|
||||
Usage:
|
||||
python3 backfill-research-session-attribution.py --dry-run --days 30
|
||||
python3 backfill-research-session-attribution.py --apply --days 30
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
logger = logging.getLogger("backfill-research-attr")
|
||||
|
||||
DEFAULT_REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
|
||||
DEFAULT_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
|
||||
|
||||
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
|
||||
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
|
||||
AUTHOR_WEIGHT = 0.30
|
||||
|
||||
|
||||
def git(repo: Path, *args: str) -> str:
|
||||
"""Run a git command in repo, return stdout. Raises on non-zero."""
|
||||
result = subprocess.run(
|
||||
["git", "-C", str(repo), *args],
|
||||
capture_output=True, text=True, check=True,
|
||||
)
|
||||
return result.stdout
|
||||
|
||||
|
||||
def discover_research_session_archives(repo: Path, days: int) -> dict[str, str]:
|
||||
"""Return {source_filename_basename: agent_handle} for last N days.
|
||||
|
||||
Walks teleo-codex `git log --since`, filters to research-session commits,
|
||||
parses agent from message header, lists inbox/queue/*.md files added in
|
||||
that commit's diff. Maps the basename (which becomes source_path on extract)
|
||||
to the agent who created it.
|
||||
"""
|
||||
log = git(repo, "log", f"--since={days} days ago", "--pretty=%H|%s", "--no-merges")
|
||||
file_to_agent: dict[str, str] = {}
|
||||
commits_seen = 0
|
||||
commits_matched = 0
|
||||
for line in log.splitlines():
|
||||
if not line or "|" not in line:
|
||||
continue
|
||||
commits_seen += 1
|
||||
sha, _, subject = line.partition("|")
|
||||
m = COMMIT_HEADER_RE.match(subject)
|
||||
if not m:
|
||||
continue
|
||||
agent = m.group(1)
|
||||
if agent not in KNOWN_AGENTS:
|
||||
logger.debug("skipping commit %s — unknown agent %r", sha[:8], agent)
|
||||
continue
|
||||
commits_matched += 1
|
||||
# List files added in this commit (inbox/queue/*.md only)
|
||||
try:
|
||||
added = git(repo, "diff-tree", "--no-commit-id", "--name-only", "-r",
|
||||
"--diff-filter=A", sha)
|
||||
except subprocess.CalledProcessError:
|
||||
logger.warning("diff-tree failed for %s", sha[:8])
|
||||
continue
|
||||
for f in added.splitlines():
|
||||
if f.startswith("inbox/queue/") and f.endswith(".md"):
|
||||
basename = Path(f).name
|
||||
if basename in file_to_agent and file_to_agent[basename] != agent:
|
||||
logger.warning(
|
||||
"filename collision: %s — was %s, now %s (keeping first)",
|
||||
basename, file_to_agent[basename], agent,
|
||||
)
|
||||
continue
|
||||
file_to_agent.setdefault(basename, agent)
|
||||
logger.info(
|
||||
"scanned %d commits, %d research-session matches, %d unique source files",
|
||||
commits_seen, commits_matched, len(file_to_agent),
|
||||
)
|
||||
return file_to_agent
|
||||
|
||||
|
||||
def find_misattributed_prs(conn: sqlite3.Connection, file_to_agent: dict[str, str], days: int):
|
||||
"""Return list of (pr_number, current_submitted_by, source_path, agent, domain, channel, merged_at).
|
||||
|
||||
Only includes PRs:
|
||||
- with source_path basename in our research-session map
|
||||
- currently attributed to '@m3taversal'
|
||||
- merged within the last N days (cap on temporal scope)
|
||||
"""
|
||||
rows = conn.execute(
|
||||
"""SELECT number, submitted_by, source_path, domain, source_channel, merged_at
|
||||
FROM prs
|
||||
WHERE submitted_by = '@m3taversal'
|
||||
AND source_path IS NOT NULL
|
||||
AND status = 'merged'
|
||||
AND merged_at > datetime('now', ?)""",
|
||||
(f"-{days} days",),
|
||||
).fetchall()
|
||||
matches = []
|
||||
for row in rows:
|
||||
basename = Path(row["source_path"]).name
|
||||
agent = file_to_agent.get(basename)
|
||||
if agent:
|
||||
matches.append({
|
||||
"pr": row["number"],
|
||||
"current_submitted_by": row["submitted_by"],
|
||||
"source_path": row["source_path"],
|
||||
"basename": basename,
|
||||
"agent": agent,
|
||||
"domain": row["domain"],
|
||||
"channel": row["source_channel"],
|
||||
"merged_at": row["merged_at"],
|
||||
})
|
||||
return matches
|
||||
|
||||
|
||||
def existing_event_count(conn: sqlite3.Connection, pr: int, handle: str, role: str) -> int:
|
||||
"""Return count of contribution_events rows matching (handle, role, pr_number, claim_path IS NULL)."""
|
||||
return conn.execute(
|
||||
"""SELECT COUNT(*) FROM contribution_events
|
||||
WHERE handle = ? AND role = ? AND pr_number = ? AND claim_path IS NULL""",
|
||||
(handle, role, pr),
|
||||
).fetchone()[0]
|
||||
|
||||
|
||||
def apply_backfill(conn: sqlite3.Connection, matches: list[dict], dry_run: bool) -> dict:
|
||||
"""Apply the backfill. Returns counters."""
|
||||
counters = defaultdict(int)
|
||||
if not dry_run:
|
||||
conn.execute("BEGIN")
|
||||
try:
|
||||
for m in matches:
|
||||
pr = m["pr"]
|
||||
agent = m["agent"]
|
||||
|
||||
# Pre-checks for accurate dry-run reporting
|
||||
old_event_exists = existing_event_count(conn, pr, "m3taversal", "author") > 0
|
||||
new_event_exists = existing_event_count(conn, pr, agent, "author") > 0
|
||||
|
||||
if dry_run:
|
||||
logger.info(
|
||||
"would update pr=%d submitted_by '%s' → '%s' "
|
||||
"[m3ta_event=%s, agent_event=%s]",
|
||||
pr, m["current_submitted_by"], agent,
|
||||
old_event_exists, new_event_exists,
|
||||
)
|
||||
counters["prs"] += 1
|
||||
if old_event_exists:
|
||||
counters["events_to_delete"] += 1
|
||||
if not new_event_exists:
|
||||
counters["events_to_insert"] += 1
|
||||
continue
|
||||
|
||||
# 1. UPDATE prs.submitted_by — canonical handle only.
|
||||
conn.execute(
|
||||
"UPDATE prs SET submitted_by = ? WHERE number = ?",
|
||||
(agent, pr),
|
||||
)
|
||||
counters["prs"] += 1
|
||||
|
||||
# 2. INSERT new agent author event (idempotent via UNIQUE index)
|
||||
cur = conn.execute(
|
||||
"""INSERT OR IGNORE INTO contribution_events
|
||||
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
|
||||
VALUES (?, 'agent', 'author', ?, ?, NULL, ?, ?, COALESCE(?, datetime('now')))""",
|
||||
(agent, AUTHOR_WEIGHT, pr, m["domain"], m["channel"], m["merged_at"]),
|
||||
)
|
||||
if cur.rowcount > 0:
|
||||
counters["events_inserted"] += 1
|
||||
|
||||
# 3. DELETE old m3taversal author event
|
||||
cur = conn.execute(
|
||||
"""DELETE FROM contribution_events
|
||||
WHERE handle = 'm3taversal' AND role = 'author'
|
||||
AND pr_number = ? AND claim_path IS NULL""",
|
||||
(pr,),
|
||||
)
|
||||
if cur.rowcount > 0:
|
||||
counters["events_deleted"] += 1
|
||||
|
||||
if not dry_run:
|
||||
conn.execute("COMMIT")
|
||||
except Exception:
|
||||
if not dry_run:
|
||||
conn.execute("ROLLBACK")
|
||||
raise
|
||||
|
||||
return dict(counters)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--repo", type=Path, default=DEFAULT_REPO)
|
||||
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
|
||||
parser.add_argument("--days", type=int, default=30)
|
||||
parser.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
|
||||
parser.add_argument("--limit", type=int, default=0,
|
||||
help="cap PR updates (0 = no cap; useful for testing on a small slice)")
|
||||
args = parser.parse_args()
|
||||
dry_run = not args.apply
|
||||
|
||||
logger.info("repo=%s db=%s days=%d mode=%s",
|
||||
args.repo, args.db, args.days, "DRY-RUN" if dry_run else "APPLY")
|
||||
|
||||
if not args.repo.exists():
|
||||
logger.error("repo not found: %s", args.repo)
|
||||
sys.exit(1)
|
||||
if not args.db.exists():
|
||||
logger.error("db not found: %s", args.db)
|
||||
sys.exit(1)
|
||||
|
||||
file_to_agent = discover_research_session_archives(args.repo, args.days)
|
||||
if not file_to_agent:
|
||||
logger.warning("no research-session source files found in last %d days", args.days)
|
||||
sys.exit(0)
|
||||
|
||||
# Per-agent breakdown
|
||||
by_agent = defaultdict(int)
|
||||
for agent in file_to_agent.values():
|
||||
by_agent[agent] += 1
|
||||
for agent, count in sorted(by_agent.items()):
|
||||
logger.info(" research-session sources by %s: %d", agent, count)
|
||||
|
||||
conn = sqlite3.connect(args.db)
|
||||
conn.row_factory = sqlite3.Row
|
||||
matches = find_misattributed_prs(conn, file_to_agent, args.days)
|
||||
logger.info("misattributed PRs found: %d", len(matches))
|
||||
|
||||
if args.limit and len(matches) > args.limit:
|
||||
logger.info("--limit=%d — truncating from %d", args.limit, len(matches))
|
||||
matches = matches[:args.limit]
|
||||
|
||||
if not matches:
|
||||
logger.info("nothing to do")
|
||||
return
|
||||
|
||||
# Per-agent breakdown of misattribution
|
||||
miss_by_agent = defaultdict(int)
|
||||
for m in matches:
|
||||
miss_by_agent[m["agent"]] += 1
|
||||
logger.info("misattributed PR breakdown:")
|
||||
for agent, count in sorted(miss_by_agent.items()):
|
||||
logger.info(" %s: %d", agent, count)
|
||||
|
||||
counters = apply_backfill(conn, matches, dry_run)
|
||||
logger.info("RESULT (%s): %s", "DRY-RUN" if dry_run else "APPLIED", counters)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,495 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""metadao-scrape.py — pull active/recent proposals from metadao.fi into source markdown.
|
||||
|
||||
Replaces the broken futard.io GraphQL ingestion (Cloud Run → teleo-api).
|
||||
metadao.fi is a Vercel-protected Next.js App Router site; direct curl is blocked
|
||||
by the anti-bot challenge. A real headless browser passes the challenge cleanly,
|
||||
and once cookies are issued for the context we can call /api/decode-proposal/{addr}
|
||||
from inside the browser to get structured instruction data.
|
||||
|
||||
Discovery flow:
|
||||
1. visit / to prime Vercel cookies
|
||||
2. visit /projects, scrape distinct /projects/{slug} hrefs
|
||||
3. for each project, visit /projects/{slug}, scrape proposal addresses from DOM
|
||||
4. for each NEW proposal (basename not already in --archive-dir):
|
||||
a. visit proposal page, capture rendered prose
|
||||
b. call /api/decode-proposal/{addr} via in-browser fetch for instructions
|
||||
c. write source markdown to --output-dir
|
||||
|
||||
Idempotent. Skips proposals whose basename is already present in archive-dir
|
||||
or output-dir. Designed to run from a systemd timer or one-shot.
|
||||
|
||||
Usage:
|
||||
python3 metadao-scrape.py --archive-dir /opt/teleo-eval/workspaces/main/inbox/archive \\
|
||||
--output-dir /opt/teleo-eval/workspaces/main/inbox/queue \\
|
||||
[--dry-run] [--limit 10] [--project solomon]
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
from datetime import date, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(message)s",
|
||||
)
|
||||
log = logging.getLogger("metadao-scrape")
|
||||
|
||||
BASE = "https://www.metadao.fi"
|
||||
USER_AGENT = (
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
|
||||
)
|
||||
|
||||
|
||||
def slugify(text: str, max_len: int = 60) -> str:
|
||||
s = text.lower().strip()
|
||||
s = re.sub(r"[^a-z0-9\s-]", "", s)
|
||||
s = re.sub(r"\s+", "-", s)
|
||||
s = re.sub(r"-+", "-", s)
|
||||
return s.strip("-")[:max_len].rstrip("-")
|
||||
|
||||
|
||||
def _yaml_str(s: str) -> str:
|
||||
"""Quote-safe YAML string. JSON strings are valid YAML strings."""
|
||||
return json.dumps(s, ensure_ascii=False)
|
||||
|
||||
|
||||
def existing_basenames(*dirs: Path) -> set[str]:
|
||||
"""Collect all .md basenames (without extension) across the given dirs (recursive)."""
|
||||
seen: set[str] = set()
|
||||
for d in dirs:
|
||||
if not d.exists():
|
||||
continue
|
||||
for p in d.rglob("*.md"):
|
||||
seen.add(p.stem)
|
||||
return seen
|
||||
|
||||
|
||||
PROP_ADDR_RE = re.compile(r"proposal_address:\s*[\"']?([A-Za-z0-9]{32,44})[\"']?")
|
||||
URL_ADDR_RE = re.compile(r"(?:futard\.io|metadao\.fi)(?:/[^/\s\"']*)*?/proposal/([A-Za-z0-9]{32,44})")
|
||||
|
||||
|
||||
def existing_proposal_addresses(*dirs: Path) -> set[str]:
|
||||
"""Scan frontmatter / URLs in existing source files to collect known proposal addresses.
|
||||
|
||||
Reads only the first 4KB of each file (frontmatter + URL line are at the top)
|
||||
to keep this fast on large archives.
|
||||
"""
|
||||
addrs: set[str] = set()
|
||||
for d in dirs:
|
||||
if not d.exists():
|
||||
continue
|
||||
for p in d.rglob("*.md"):
|
||||
try:
|
||||
head = p.read_text(errors="replace")[:4096]
|
||||
except Exception:
|
||||
continue
|
||||
for m in PROP_ADDR_RE.finditer(head):
|
||||
addrs.add(m.group(1))
|
||||
for m in URL_ADDR_RE.finditer(head):
|
||||
addrs.add(m.group(1))
|
||||
return addrs
|
||||
|
||||
|
||||
def list_project_slugs(page) -> list[str]:
|
||||
"""Read /projects and extract distinct project slugs."""
|
||||
page.goto(f"{BASE}/projects", wait_until="domcontentloaded", timeout=30000)
|
||||
page.wait_for_timeout(1500)
|
||||
hrefs = page.evaluate(
|
||||
"""() => {
|
||||
const links = Array.from(document.querySelectorAll('a[href^="/projects/"]'));
|
||||
const slugs = new Set();
|
||||
for (const a of links) {
|
||||
const m = a.getAttribute('href').match(/^\\/projects\\/([a-z0-9-]+)(?:\\/|$)/);
|
||||
if (m && m[1]) slugs.add(m[1]);
|
||||
}
|
||||
return [...slugs];
|
||||
}"""
|
||||
)
|
||||
return list(hrefs)
|
||||
|
||||
|
||||
def get_project_metadata(page, slug: str) -> dict:
|
||||
"""Visit a project page and return basic metadata + proposal addresses + card text.
|
||||
Card text typically contains 'SOLO-004 ENDED DP-00003 (MEM): The Gigabus Proposal Pass $0.64...'
|
||||
so we capture it for downstream title parsing.
|
||||
"""
|
||||
url = f"{BASE}/projects/{slug}"
|
||||
page.goto(url, wait_until="domcontentloaded", timeout=30000)
|
||||
page.wait_for_timeout(1500)
|
||||
|
||||
proposals = page.evaluate(
|
||||
"""() => {
|
||||
const links = Array.from(document.querySelectorAll('a[href*="/proposal/"]'));
|
||||
const seen = new Set();
|
||||
const out = [];
|
||||
const TARGET_ADDR_RE = /\\/proposal\\/([A-Za-z0-9]+)/;
|
||||
for (const a of links) {
|
||||
const m = a.getAttribute('href').match(TARGET_ADDR_RE);
|
||||
if (!m) continue;
|
||||
if (seen.has(m[1])) continue;
|
||||
seen.add(m[1]);
|
||||
const addr = m[1];
|
||||
// Walk up only while the ancestor contains exactly one proposal link
|
||||
// (so we get the card, not a parent that contains all cards).
|
||||
let card = a;
|
||||
while (card.parentElement) {
|
||||
const parent = card.parentElement;
|
||||
const propLinks = parent.querySelectorAll('a[href*="/proposal/"]');
|
||||
if (propLinks.length > 1) break;
|
||||
card = parent;
|
||||
}
|
||||
out.push({
|
||||
address: addr,
|
||||
link_text: (a.innerText || '').trim().slice(0, 600),
|
||||
card_text: (card.innerText || '').trim().slice(0, 1500),
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}"""
|
||||
)
|
||||
|
||||
# Try to read project name from h1 / title
|
||||
project_name = page.evaluate(
|
||||
"""() => {
|
||||
const h = document.querySelector('h1');
|
||||
return h ? h.innerText.trim() : '';
|
||||
}"""
|
||||
) or slug.title()
|
||||
|
||||
return {"slug": slug, "name": project_name, "url": url, "proposals": proposals}
|
||||
|
||||
|
||||
# Strict pattern: DP-NNNNN (CAT): Title — the canonical proposal heading.
|
||||
DP_STRICT_RE = re.compile(r"DP-\d+\s*\([A-Z]+\)\s*[:\-]\s*[^\n\r]+", re.MULTILINE)
|
||||
# Loose pattern: any line starting with DP-NNNNN followed by something.
|
||||
DP_LOOSE_RE = re.compile(r"DP-\d+\s*(?:\([A-Z]+\))?\s*[:\-]?\s*[^\n\r]+", re.MULTILINE)
|
||||
STAT_BLEED_RE = re.compile(
|
||||
# Stat keywords only bleed when followed by a numeric/symbolic stat token,
|
||||
# so word-only sequences like "Active Capital" or "Live Streaming Service" pass.
|
||||
r"\s+\b(?:Pass|Fail|Passed|Failed|Active|Pending|Ended|Live|TOTAL|VOLUME|STATUS|MCAP|PRICE|SPOT)\b\s+(?:\$|\+|-|\d)"
|
||||
r"|\s*(?:\$\d|\+\d{2,}|\d+\.\d+%|\d{5,})",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _clean_title_candidate(line: str) -> str:
|
||||
line = line.strip()
|
||||
# Find first bleed match past offset 10. re.search returns leftmost, but the
|
||||
# DP-NNNNN digit sequence always wins first place; we want the first POST-title
|
||||
# match instead. Walk all matches and trim at the earliest one past the guard.
|
||||
for bleed in STAT_BLEED_RE.finditer(line):
|
||||
if bleed.start() > 10:
|
||||
line = line[: bleed.start()].rstrip(" :-—")
|
||||
break
|
||||
return line.strip()[:200]
|
||||
|
||||
|
||||
def extract_dp_title(*texts: str) -> str:
|
||||
"""Find the canonical 'DP-NNNNN (CAT): Title' line.
|
||||
|
||||
Strategy:
|
||||
1. Try strict pattern (with parenthetical category code) across all sources.
|
||||
Take the SHORTEST hit — prose continuations of an already-correct title
|
||||
tend to be longer than the title itself.
|
||||
2. Fall back to loose pattern, longest match.
|
||||
"""
|
||||
strict: list[str] = []
|
||||
loose: list[str] = []
|
||||
for t in texts:
|
||||
if not t:
|
||||
continue
|
||||
for m in DP_STRICT_RE.finditer(t):
|
||||
cleaned = _clean_title_candidate(m.group(0))
|
||||
if cleaned:
|
||||
strict.append(cleaned)
|
||||
for m in DP_LOOSE_RE.finditer(t):
|
||||
cleaned = _clean_title_candidate(m.group(0))
|
||||
if cleaned:
|
||||
loose.append(cleaned)
|
||||
if strict:
|
||||
return min(strict, key=len)
|
||||
if loose:
|
||||
return max(loose, key=len)
|
||||
return ""
|
||||
|
||||
|
||||
def fetch_proposal(page, project_slug: str, addr: str, card_text: str = "") -> dict | None:
|
||||
"""Visit proposal page, capture rendered text + decode instructions via in-browser fetch."""
|
||||
url = f"{BASE}/projects/{project_slug}/proposal/{addr}"
|
||||
log.info("fetching proposal %s/%s", project_slug, addr[:8])
|
||||
try:
|
||||
page.goto(url, wait_until="domcontentloaded", timeout=45000)
|
||||
except PWTimeout:
|
||||
log.warning("timeout loading %s — using whatever rendered", url)
|
||||
page.wait_for_timeout(2500) # let RSC stream finish
|
||||
|
||||
body_text = page.evaluate("() => document.body.innerText || ''")
|
||||
|
||||
# Title preference: card_text (from project page) → body_text DP-NNNNN match → first h1/h2
|
||||
title_block = extract_dp_title(card_text, body_text)
|
||||
if not title_block:
|
||||
title_block = page.evaluate(
|
||||
"""() => {
|
||||
const h = document.querySelector('h1, h2');
|
||||
return h ? h.innerText.trim() : '';
|
||||
}"""
|
||||
) or f"proposal-{addr[:8]}"
|
||||
|
||||
# Status: 'Passed' / 'Failed' / 'Active' / 'Pending'
|
||||
status = page.evaluate(
|
||||
"""() => {
|
||||
const text = document.body.innerText || '';
|
||||
const m = text.match(/\\n(Passed|Failed|Active|Pending|Live|Ended)\\b/);
|
||||
return m ? m[1] : '';
|
||||
}"""
|
||||
)
|
||||
|
||||
# Get the structured /api/decode-proposal data
|
||||
decoded = None
|
||||
try:
|
||||
decoded = page.evaluate(
|
||||
f"""async () => {{
|
||||
try {{
|
||||
const r = await fetch('/api/decode-proposal/{addr}');
|
||||
if (!r.ok) return null;
|
||||
return await r.json();
|
||||
}} catch (e) {{ return null; }}
|
||||
}}"""
|
||||
)
|
||||
except Exception as e:
|
||||
log.debug("decode fetch failed for %s: %s", addr, e)
|
||||
|
||||
return {
|
||||
"address": addr,
|
||||
"project_slug": project_slug,
|
||||
"url": url,
|
||||
"title": title_block,
|
||||
"status": status,
|
||||
"body_text": body_text,
|
||||
"decoded": decoded,
|
||||
}
|
||||
|
||||
|
||||
def parse_dp_code(title: str) -> tuple[str, str]:
|
||||
"""Parse 'DP-00003 (MEM): The Gigabus Proposal' → ('dp-00003-mem', 'The Gigabus Proposal').
|
||||
Falls back gracefully if format doesn't match.
|
||||
"""
|
||||
# Match leading DP-NNNNN[space(category)]?[:]?[space]? plus the rest
|
||||
m = re.match(r"^(DP-\d+(?:\s*\([A-Z]+\))?)\s*[:\-]?\s*(.*)$", title.strip())
|
||||
if m:
|
||||
code = re.sub(r"[^a-z0-9]+", "-", m.group(1).lower()).strip("-")
|
||||
rest = m.group(2).strip()
|
||||
return code, rest
|
||||
return "", title.strip()
|
||||
|
||||
|
||||
def build_filename(project_slug: str, proposal: dict, today: str) -> str:
|
||||
"""YYYY-MM-DD-metadao-{slug}-{title-fragment}-{addr8}.md
|
||||
|
||||
Embedding the address fragment makes filenames stable across runs even when
|
||||
the title isn't unique (e.g. projects that don't use DP-NNNNN naming).
|
||||
"""
|
||||
title = proposal.get("title") or ""
|
||||
code, rest = parse_dp_code(title)
|
||||
parts: list[str] = []
|
||||
if code:
|
||||
parts.append(code)
|
||||
if rest:
|
||||
parts.append(slugify(rest, max_len=40))
|
||||
body_slug = "-".join(p for p in parts if p)[:60].rstrip("-")
|
||||
addr_frag = proposal["address"][:8].lower()
|
||||
if body_slug:
|
||||
return f"{today}-metadao-{project_slug}-{body_slug}-{addr_frag}.md"
|
||||
return f"{today}-metadao-{project_slug}-{addr_frag}.md"
|
||||
|
||||
|
||||
def build_source_markdown(project: dict, proposal: dict, today: str) -> str:
|
||||
"""Build the source markdown matching the existing schema."""
|
||||
title = proposal.get("title") or f"{project['name']} proposal {proposal['address'][:8]}"
|
||||
body_text = (proposal.get("body_text") or "").strip()
|
||||
decoded = proposal.get("decoded") or {}
|
||||
|
||||
# Build YAML frontmatter — all free-text values escaped via _yaml_str (json.dumps).
|
||||
# project_slug is constrained to [a-z0-9-] by slugify upstream, but pass through
|
||||
# the same path for consistency.
|
||||
full_title = f"MetaDAO: {project['name']} — {title}"
|
||||
fm_lines = [
|
||||
"---",
|
||||
"type: source",
|
||||
f"title: {_yaml_str(full_title)}",
|
||||
f"author: {_yaml_str('metadao.fi')}",
|
||||
f"url: {_yaml_str(proposal['url'])}",
|
||||
f"date: {today}",
|
||||
"domain: internet-finance",
|
||||
"format: data",
|
||||
"status: unprocessed",
|
||||
f"tags: [futardio, metadao, futarchy, solana, governance, {project['slug']}]",
|
||||
"event_type: proposal",
|
||||
f"project_slug: {_yaml_str(project['slug'])}",
|
||||
f"proposal_address: {_yaml_str(proposal['address'])}",
|
||||
]
|
||||
if proposal.get("status"):
|
||||
fm_lines.append(f"proposal_status: {_yaml_str(proposal['status'])}")
|
||||
if decoded.get("squadsProposal"):
|
||||
fm_lines.append(f"squads_proposal: {_yaml_str(decoded['squadsProposal'])}")
|
||||
if decoded.get("squadsStatus"):
|
||||
fm_lines.append(f"squads_status: {_yaml_str(decoded['squadsStatus'])}")
|
||||
fm_lines.append("---")
|
||||
fm_lines.append("")
|
||||
|
||||
# Header section — quick facts
|
||||
body_md = [
|
||||
f"# {title}",
|
||||
"",
|
||||
"## Proposal Details",
|
||||
f"- Project: {project['name']} (`{project['slug']}`)",
|
||||
f"- Proposal: {title}",
|
||||
f"- Address: `{proposal['address']}`",
|
||||
]
|
||||
if proposal.get("status"):
|
||||
body_md.append(f"- Status: {proposal['status']}")
|
||||
body_md.append(f"- URL: {proposal['url']}")
|
||||
|
||||
# Proposal prose body (rendered text from the page)
|
||||
body_md.append("")
|
||||
body_md.append("## Proposal Body")
|
||||
body_md.append("")
|
||||
body_md.append(body_text or "_(no body captured)_")
|
||||
|
||||
# Decoded on-chain instructions
|
||||
if decoded:
|
||||
body_md.append("")
|
||||
body_md.append("## On-chain Decoded")
|
||||
if decoded.get("squadsUrl"):
|
||||
body_md.append(f"- Squads: {decoded['squadsUrl']}")
|
||||
instrs = decoded.get("instructions") or []
|
||||
if instrs:
|
||||
body_md.append("")
|
||||
body_md.append("### Instructions")
|
||||
for i, instr in enumerate(instrs, 1):
|
||||
body_md.append(f"{i}. **{instr.get('description', instr.get('type', 'instruction'))}** ({instr.get('program', '')})")
|
||||
for f in instr.get("fields", []) or []:
|
||||
val = f.get("fullValue") or f.get("value") or ""
|
||||
body_md.append(f" - {f.get('label', '')}: `{val}`")
|
||||
if instr.get("summary"):
|
||||
body_md.append(f" - Summary: {instr['summary']}")
|
||||
|
||||
return "\n".join(fm_lines + body_md) + "\n"
|
||||
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser(description="Scrape MetaDAO proposals into inbox source files")
|
||||
p.add_argument("--archive-dir", required=True, help="existing archive dir (skip if basename exists here)")
|
||||
p.add_argument("--output-dir", required=True, help="dir to write new source markdown into")
|
||||
p.add_argument("--project", help="restrict to a single project slug (default: scan all)")
|
||||
p.add_argument("--limit", type=int, default=0, help="max number of new proposals to capture (0 = unlimited)")
|
||||
p.add_argument("--dry-run", action="store_true", help="print intended writes instead of writing")
|
||||
p.add_argument("--headless", action="store_true", default=True)
|
||||
args = p.parse_args()
|
||||
|
||||
archive_dir = Path(args.archive_dir).resolve()
|
||||
output_dir = Path(args.output_dir).resolve()
|
||||
seen_basenames = existing_basenames(archive_dir, output_dir)
|
||||
seen_addresses = existing_proposal_addresses(archive_dir, output_dir)
|
||||
log.info("loaded %d existing basenames + %d known proposal addresses from %s + %s",
|
||||
len(seen_basenames), len(seen_addresses), archive_dir, output_dir)
|
||||
|
||||
today = date.today().isoformat()
|
||||
|
||||
written: list[str] = []
|
||||
skipped_existing = 0
|
||||
|
||||
with sync_playwright() as pw:
|
||||
browser = pw.chromium.launch(headless=args.headless)
|
||||
ctx = browser.new_context(user_agent=USER_AGENT)
|
||||
page = ctx.new_page()
|
||||
|
||||
# Prime cookies
|
||||
log.info("priming Vercel session via homepage")
|
||||
page.goto(f"{BASE}/", wait_until="domcontentloaded", timeout=30000)
|
||||
page.wait_for_timeout(1500)
|
||||
|
||||
# Discovery
|
||||
if args.project:
|
||||
project_slugs = [args.project]
|
||||
else:
|
||||
project_slugs = list_project_slugs(page)
|
||||
log.info("discovered %d project slugs: %s", len(project_slugs), project_slugs)
|
||||
|
||||
for slug in project_slugs:
|
||||
try:
|
||||
project = get_project_metadata(page, slug)
|
||||
except Exception:
|
||||
log.exception("failed to read project %s", slug)
|
||||
continue
|
||||
log.info(" %s — %d proposals", slug, len(project["proposals"]))
|
||||
|
||||
for prop in project["proposals"]:
|
||||
addr = prop["address"]
|
||||
# Pre-check #1: known proposal address (cheapest, no browser visit)
|
||||
if addr in seen_addresses:
|
||||
skipped_existing += 1
|
||||
continue
|
||||
# Pre-check #2: address fragment in an existing basename
|
||||
addr_frag = addr[:8].lower()
|
||||
if any(addr_frag in b.lower() for b in seen_basenames):
|
||||
skipped_existing += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
proposal_data = fetch_proposal(page, slug, addr, card_text=prop.get("card_text", ""))
|
||||
except Exception:
|
||||
log.exception("failed to fetch proposal %s/%s", slug, addr)
|
||||
continue
|
||||
if not proposal_data:
|
||||
continue
|
||||
|
||||
# Minimum-render gate: skip partial renders rather than archiving stubs.
|
||||
# Successful captures are 20KB+; require either a real body or a DP-N title.
|
||||
body_len = len(proposal_data.get("body_text") or "")
|
||||
has_dp_match = bool(re.search(r"DP-\d+", proposal_data.get("title", "") or ""))
|
||||
if body_len < 500 and not has_dp_match:
|
||||
log.warning(" skip (insufficient render): %s body=%dB title=%r",
|
||||
addr, body_len, proposal_data.get("title", ""))
|
||||
continue
|
||||
|
||||
fname = build_filename(slug, proposal_data, today)
|
||||
|
||||
if Path(fname).stem in seen_basenames:
|
||||
skipped_existing += 1
|
||||
log.info(" skip (already archived by title): %s", fname)
|
||||
continue
|
||||
|
||||
content = build_source_markdown(project, proposal_data, today)
|
||||
target = output_dir / fname
|
||||
if args.dry_run:
|
||||
log.info(" DRY: would write %s (%d bytes)", target, len(content))
|
||||
else:
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
target.write_text(content)
|
||||
log.info(" wrote %s (%d bytes)", target, len(content))
|
||||
written.append(fname)
|
||||
|
||||
if args.limit and len(written) >= args.limit:
|
||||
log.info("hit limit=%d, stopping", args.limit)
|
||||
browser.close()
|
||||
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
|
||||
return 0
|
||||
|
||||
browser.close()
|
||||
|
||||
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
114
scripts/normalize-submitted-by.py
Executable file
114
scripts/normalize-submitted-by.py
Executable file
|
|
@ -0,0 +1,114 @@
|
|||
#!/usr/bin/env python3
|
||||
"""One-time backfill: canonicalize prs.submitted_by and sources.submitted_by.
|
||||
|
||||
Strips legacy decorators ("(self-directed)", "(reweave)"), lowercases, drops
|
||||
the @ prefix. After this runs, every value matches the contract documented
|
||||
on diagnostics/activity_feed_api.py::_normalize_contributor — and the
|
||||
companion read-side fix becomes redundant defense-in-depth instead of
|
||||
load-bearing.
|
||||
|
||||
Defaults to --dry-run. Pass --apply to commit.
|
||||
|
||||
Usage:
|
||||
python3 normalize-submitted-by.py --dry-run
|
||||
python3 normalize-submitted-by.py --apply
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import sys
|
||||
from collections import Counter
|
||||
|
||||
DEFAULT_DB = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
|
||||
|
||||
# Valid handle: lowercase alphanum + _-, 1-39 chars (matches GitHub rules,
|
||||
# same as pipeline/lib/attribution._HANDLE_RE). Anything with parens, spaces,
|
||||
# or uppercase needs canonicalization.
|
||||
_TRAILING_PAREN_RE = re.compile(r"\s*\([^)]*\)\s*$")
|
||||
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
|
||||
|
||||
|
||||
def canonicalize(raw):
|
||||
if raw is None:
|
||||
return None
|
||||
h = raw.strip().lower().lstrip("@")
|
||||
h = _TRAILING_PAREN_RE.sub("", h).strip()
|
||||
return h or None
|
||||
|
||||
|
||||
def normalize_table(conn, table, dry_run):
|
||||
cur = conn.execute(
|
||||
f"SELECT rowid, submitted_by FROM {table} WHERE submitted_by IS NOT NULL"
|
||||
)
|
||||
changes = []
|
||||
for row in cur.fetchall():
|
||||
old = row[1]
|
||||
new = canonicalize(old)
|
||||
if new != old:
|
||||
changes.append((row[0], old, new))
|
||||
|
||||
print(f"\n{table}: {len(changes)} rows need normalization")
|
||||
if not changes:
|
||||
return 0
|
||||
|
||||
# Distribution preview
|
||||
from_to = Counter((old, new) for _, old, new in changes)
|
||||
for (old, new), count in from_to.most_common(15):
|
||||
print(f" {count:>5} {old!r:40} -> {new!r}")
|
||||
if len(from_to) > 15:
|
||||
print(f" ... ({len(from_to) - 15} more distinct mappings)")
|
||||
|
||||
# Sanity: every result is a valid handle (no garbage falls through).
|
||||
invalid = [(rowid, old, new) for rowid, old, new in changes
|
||||
if new is not None and not _HANDLE_RE.match(new)]
|
||||
if invalid:
|
||||
print(f"\n WARNING: {len(invalid)} rows would normalize to invalid handles:")
|
||||
for rowid, old, new in invalid[:10]:
|
||||
print(f" rowid={rowid} {old!r} -> {new!r}")
|
||||
print(" These rows will be SKIPPED (left as-is). Inspect manually.")
|
||||
|
||||
valid_changes = [(rowid, old, new) for rowid, old, new in changes
|
||||
if new is None or _HANDLE_RE.match(new)]
|
||||
|
||||
if dry_run:
|
||||
print(f" [dry-run] would update {len(valid_changes)} rows in {table}")
|
||||
return len(valid_changes)
|
||||
|
||||
for rowid, _, new in valid_changes:
|
||||
conn.execute(
|
||||
f"UPDATE {table} SET submitted_by = ? WHERE rowid = ?",
|
||||
(new, rowid),
|
||||
)
|
||||
print(f" updated {len(valid_changes)} rows in {table}")
|
||||
return len(valid_changes)
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--db", default=DEFAULT_DB)
|
||||
ap.add_argument("--apply", action="store_true", help="Commit changes (default is dry-run)")
|
||||
ap.add_argument("--dry-run", action="store_true", help="Preview only (default)")
|
||||
args = ap.parse_args()
|
||||
|
||||
dry_run = not args.apply
|
||||
print(f"DB: {args.db}")
|
||||
print(f"Mode: {'DRY-RUN' if dry_run else 'APPLY'}")
|
||||
|
||||
conn = sqlite3.connect(args.db, timeout=30)
|
||||
try:
|
||||
total = 0
|
||||
total += normalize_table(conn, "prs", dry_run)
|
||||
total += normalize_table(conn, "sources", dry_run)
|
||||
if not dry_run:
|
||||
conn.commit()
|
||||
print(f"\nCommitted. Total rows updated: {total}")
|
||||
else:
|
||||
print(f"\nDry-run complete. Run with --apply to commit ({total} rows pending).")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
168
scripts/reattribute-by-branch-prefix.py
Executable file
168
scripts/reattribute-by-branch-prefix.py
Executable file
|
|
@ -0,0 +1,168 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Reattribute PRs and their author events from m3taversal to the true author.
|
||||
|
||||
Scope (intentionally conservative):
|
||||
- branch reweave/* -> pipeline (system maintenance, no human author)
|
||||
- branch ingestion/* -> pipeline (pipeline-internal source intake)
|
||||
- branch <agent>/* -> <agent> (autonomous agent work)
|
||||
for agent in {leo, vida, rio, astra, clay, theseus}.
|
||||
|
||||
NOT in scope:
|
||||
- branch extract/* -- proposed_by may legitimately be absent
|
||||
(telegram source drops default to operator).
|
||||
|
||||
Per affected PR (atomic):
|
||||
1. UPDATE prs.submitted_by -> target
|
||||
2. UPDATE sources.submitted_by where path = pr.source_path
|
||||
3. UPDATE contribution_events.handle for every m3ta author event on this PR
|
||||
(kind set to 'agent', since pipeline + the six agents are all kind='agent'
|
||||
per attribution.PENTAGON_AGENTS).
|
||||
|
||||
Idempotent. Dry-run by default; --apply commits.
|
||||
Run AFTER scripts/normalize-submitted-by.py.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from collections import Counter
|
||||
|
||||
DB_PATH = os.environ.get("DB_PATH", "/opt/teleo-eval/pipeline/pipeline.db")
|
||||
|
||||
AGENT_PREFIXES = ("leo/", "vida/", "rio/", "astra/", "clay/", "theseus/")
|
||||
PIPELINE_PREFIXES = ("reweave/", "ingestion/")
|
||||
|
||||
|
||||
def target_for(branch):
|
||||
if not branch:
|
||||
return None
|
||||
if branch.startswith(PIPELINE_PREFIXES):
|
||||
return "pipeline"
|
||||
for prefix in AGENT_PREFIXES:
|
||||
if branch.startswith(prefix):
|
||||
return prefix.rstrip("/")
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
|
||||
ap.add_argument("--db", default=DB_PATH)
|
||||
args = ap.parse_args()
|
||||
|
||||
conn = sqlite3.connect(args.db, timeout=30)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA busy_timeout = 30000")
|
||||
|
||||
mode = "APPLY" if args.apply else "DRY-RUN"
|
||||
print("DB: {}\nMode: {}\n".format(args.db, mode))
|
||||
|
||||
rows = conn.execute("""
|
||||
SELECT number, branch, source_path
|
||||
FROM prs
|
||||
WHERE submitted_by = 'm3taversal'
|
||||
AND branch IS NOT NULL
|
||||
""").fetchall()
|
||||
|
||||
pr_targets = []
|
||||
pr_counts = Counter()
|
||||
for r in rows:
|
||||
tgt = target_for(r["branch"])
|
||||
if tgt is None:
|
||||
continue
|
||||
pr_targets.append((r["number"], r["branch"], r["source_path"], tgt))
|
||||
pr_counts[tgt] += 1
|
||||
|
||||
print("prs to reattribute: {}".format(len(pr_targets)))
|
||||
for tgt, n in pr_counts.most_common():
|
||||
print(" {:6d} -> {!r}".format(n, tgt))
|
||||
|
||||
src_paths = [t[2] for t in pr_targets if t[2]]
|
||||
src_count = 0
|
||||
if src_paths:
|
||||
placeholders = ",".join("?" * len(src_paths))
|
||||
src_count = conn.execute(
|
||||
"SELECT COUNT(*) FROM sources "
|
||||
"WHERE submitted_by = 'm3taversal' AND path IN ({})".format(placeholders),
|
||||
src_paths,
|
||||
).fetchone()[0]
|
||||
print("\nsources rows that will be re-pointed: {}".format(src_count))
|
||||
|
||||
pr_to_target = {p[0]: p[3] for p in pr_targets}
|
||||
events = []
|
||||
if pr_to_target:
|
||||
pr_placeholders = ",".join("?" * len(pr_to_target))
|
||||
events = conn.execute(
|
||||
"SELECT id, pr_number FROM contribution_events "
|
||||
"WHERE handle = 'm3taversal' AND role = 'author' "
|
||||
"AND pr_number IN ({})".format(pr_placeholders),
|
||||
list(pr_to_target.keys()),
|
||||
).fetchall()
|
||||
print("contribution_events author rows to move: {}".format(len(events)))
|
||||
ev_counts = Counter(pr_to_target[e["pr_number"]] for e in events)
|
||||
for tgt, n in ev_counts.most_common():
|
||||
print(" {:6d} events -> {!r}".format(n, tgt))
|
||||
|
||||
if not args.apply:
|
||||
print("\nDry-run complete. Run with --apply to commit "
|
||||
"({} PRs + {} sources + {} events).".format(
|
||||
len(pr_targets), src_count, len(events)))
|
||||
return 0
|
||||
|
||||
pr_updated = 0
|
||||
src_updated = 0
|
||||
ev_updated = 0
|
||||
ev_collisions = 0
|
||||
|
||||
try:
|
||||
for pr_num, branch, source_path, target in pr_targets:
|
||||
cur = conn.execute(
|
||||
"UPDATE prs SET submitted_by = ? "
|
||||
"WHERE number = ? AND submitted_by = 'm3taversal'",
|
||||
(target, pr_num),
|
||||
)
|
||||
pr_updated += cur.rowcount
|
||||
|
||||
if source_path:
|
||||
cur = conn.execute(
|
||||
"UPDATE sources SET submitted_by = ? "
|
||||
"WHERE path = ? AND submitted_by = 'm3taversal'",
|
||||
(target, source_path),
|
||||
)
|
||||
src_updated += cur.rowcount
|
||||
|
||||
for ev in conn.execute(
|
||||
"SELECT id FROM contribution_events "
|
||||
"WHERE handle = 'm3taversal' AND role = 'author' AND pr_number = ?",
|
||||
(pr_num,),
|
||||
).fetchall():
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE contribution_events SET handle = ?, kind = 'agent' "
|
||||
"WHERE id = ?",
|
||||
(target, ev["id"]),
|
||||
)
|
||||
ev_updated += 1
|
||||
except sqlite3.IntegrityError:
|
||||
conn.execute(
|
||||
"DELETE FROM contribution_events WHERE id = ?",
|
||||
(ev["id"],),
|
||||
)
|
||||
ev_collisions += 1
|
||||
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
|
||||
print("\nCommitted.")
|
||||
print(" prs.submitted_by moves: {}".format(pr_updated))
|
||||
print(" sources.submitted_by moves: {}".format(src_updated))
|
||||
print(" contribution_events moves: {}".format(ev_updated))
|
||||
print(" ce collisions deleted: {}".format(ev_collisions))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
108
scripts/reset-m3taversal-sourcer.py
Normal file
108
scripts/reset-m3taversal-sourcer.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Reset m3taversal.sourcer_count from inflated legacy value to file-truth count.
|
||||
|
||||
Background: pre-Phase-A extract.py had a `submitted_by` fallback that credited
|
||||
m3taversal as sourcer for every Telegram-ingested source, accumulating to 1011
|
||||
sourcer_count in the contributors table. The actual file-truth count (sourcer
|
||||
frontmatter equal to "m3taversal" in claim files) is 21. The 990-row delta is
|
||||
infrastructure attribution that doesn't reflect content authorship.
|
||||
|
||||
The Phase A event-sourced ledger (contribution_events) computed the correct
|
||||
389.55 CI from author events; /api/leaderboard reads from there directly.
|
||||
But the legacy /api/contributors endpoint reads contributors.claims_merged
|
||||
which carries the inflated 1011. Until that endpoint is deprecated, the
|
||||
divergence shows two different numbers depending on which surface the UI
|
||||
queries.
|
||||
|
||||
This script applies the surgical UPDATE that was run on VPS on 2026-04-27
|
||||
during the leaderboard cutover. Committed as a script per Ganymede review:
|
||||
"DB mutations go through reviewable code paths matters more than the
|
||||
convenience of one-shot SQL. The artifact explains what was done and why."
|
||||
|
||||
Idempotent — safe to re-run. If sourcer_count is already 21, no change.
|
||||
|
||||
Usage:
|
||||
python3 scripts/reset-m3taversal-sourcer.py --dry-run
|
||||
python3 scripts/reset-m3taversal-sourcer.py
|
||||
"""
|
||||
import argparse
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
|
||||
TARGET_HANDLE = "m3taversal"
|
||||
TRUTH_SOURCER_COUNT = 21
|
||||
TRUTH_CLAIMS_MERGED = 21
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
if not Path(DB_PATH).exists():
|
||||
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
conn = sqlite3.connect(DB_PATH, timeout=30)
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
row = conn.execute(
|
||||
"SELECT handle, sourcer_count, claims_merged FROM contributors WHERE handle = ?",
|
||||
(TARGET_HANDLE,),
|
||||
).fetchone()
|
||||
if not row:
|
||||
print(f" No contributors row for {TARGET_HANDLE} — nothing to reset.")
|
||||
return
|
||||
|
||||
print(
|
||||
f" Current: {row['handle']} sourcer_count={row['sourcer_count']} "
|
||||
f"claims_merged={row['claims_merged']}"
|
||||
)
|
||||
print(f" Target: sourcer_count={TRUTH_SOURCER_COUNT} claims_merged={TRUTH_CLAIMS_MERGED}")
|
||||
|
||||
if (row["sourcer_count"] == TRUTH_SOURCER_COUNT
|
||||
and row["claims_merged"] == TRUTH_CLAIMS_MERGED):
|
||||
print(" Already at target values — no-op.")
|
||||
return
|
||||
|
||||
if args.dry_run:
|
||||
print(" (dry-run) UPDATE would be applied. Re-run without --dry-run.")
|
||||
return
|
||||
|
||||
conn.execute(
|
||||
"""UPDATE contributors SET
|
||||
sourcer_count = ?,
|
||||
claims_merged = ?,
|
||||
updated_at = datetime('now')
|
||||
WHERE handle = ?""",
|
||||
(TRUTH_SOURCER_COUNT, TRUTH_CLAIMS_MERGED, TARGET_HANDLE),
|
||||
)
|
||||
conn.execute(
|
||||
"""INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)""",
|
||||
(
|
||||
"manual",
|
||||
"m3taversal_sourcer_reset",
|
||||
(
|
||||
'{"reason":"Pre-Phase-A submitted_by fallback inflated to 1011; '
|
||||
'file-truth is 21","sourcer_count_before":1011,'
|
||||
'"sourcer_count_after":21,"claims_merged_after":21}'
|
||||
),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
after = conn.execute(
|
||||
"SELECT sourcer_count, claims_merged FROM contributors WHERE handle = ?",
|
||||
(TARGET_HANDLE,),
|
||||
).fetchone()
|
||||
print(
|
||||
f" Applied. Now: sourcer_count={after['sourcer_count']} "
|
||||
f"claims_merged={after['claims_merged']}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -20,7 +20,7 @@ from lib import log as logmod
|
|||
from lib.breaker import CircuitBreaker
|
||||
from lib.evaluate import evaluate_cycle
|
||||
from lib.fixer import fix_cycle as mechanical_fix_cycle
|
||||
from lib.substantive_fixer import substantive_fix_cycle
|
||||
from lib.substantive_fixer import substantive_fix_cycle, verdict_deadlock_reaper_cycle
|
||||
from lib.health import start_health_server, stop_health_server
|
||||
from lib.llm import kill_active_subprocesses
|
||||
from lib.merge import merge_cycle
|
||||
|
|
@ -91,14 +91,30 @@ async def ingest_cycle(conn, max_workers=None):
|
|||
|
||||
|
||||
async def fix_cycle(conn, max_workers=None):
|
||||
"""Combined fix stage: mechanical fixes first, then substantive fixes.
|
||||
"""Combined fix stage: mechanical fixes first, then substantive fixes,
|
||||
finally the verdict-deadlock reaper.
|
||||
|
||||
Mechanical (fixer.py): wiki link bracket stripping, $0
|
||||
Substantive (substantive_fixer.py): confidence/title/scope fixes via LLM, $0.001
|
||||
Reaper (substantive_fixer.verdict_deadlock_reaper_cycle): defense-in-depth
|
||||
for stuck-verdict PRs that the substantive fixer can't progress on.
|
||||
Hourly throttle, dry-run by default. Cost $0.
|
||||
"""
|
||||
m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers)
|
||||
s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers)
|
||||
return m_fixed + s_fixed, m_errors + s_errors
|
||||
# Defense-in-depth: reaper exception must never block primary fix paths.
|
||||
# Same exception-isolation pattern as ingest_cycle's extract_cycle wrapper —
|
||||
# propagating would trip the fix breaker and lock out mechanical+substantive
|
||||
# for 15 min after 5 reaper failures.
|
||||
try:
|
||||
r_closed = await verdict_deadlock_reaper_cycle(conn)
|
||||
except Exception:
|
||||
import logging
|
||||
logging.getLogger("pipeline").exception(
|
||||
"Reaper cycle failed (non-fatal)"
|
||||
)
|
||||
r_closed = 0
|
||||
return m_fixed + s_fixed + r_closed, m_errors + s_errors
|
||||
|
||||
|
||||
async def snapshot_cycle(conn, max_workers=None):
|
||||
|
|
|
|||
152
tests/test_activity_classify.py
Normal file
152
tests/test_activity_classify.py
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
"""Tests for diagnostics/activity_endpoint.py classify_pr_operation.
|
||||
|
||||
Covers the Leo gotcha — extract/* branches with commit_type=enrich or
|
||||
challenge classify by commit_type, not branch prefix. Same class of bug
|
||||
as the contributor-role wiring fix.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# diagnostics/ isn't on sys.path by default; add it for these tests.
|
||||
_DIAG = Path(__file__).resolve().parents[1] / "diagnostics"
|
||||
if str(_DIAG) not in sys.path:
|
||||
sys.path.insert(0, str(_DIAG))
|
||||
|
||||
# aiohttp is imported at module load time; skip cleanly if not installed.
|
||||
pytest.importorskip("aiohttp")
|
||||
|
||||
from activity_endpoint import classify_pr_operation # noqa: E402
|
||||
|
||||
|
||||
# ─── Merged PRs: commit_type wins over branch prefix ───────────────────────
|
||||
|
||||
|
||||
def test_extract_branch_legacy_knowledge_classifies_new():
|
||||
assert classify_pr_operation("merged", "knowledge", "extract/foo", None) == "new"
|
||||
|
||||
|
||||
def test_extract_branch_with_enrich_commit_type_classifies_enrich():
|
||||
"""Leo gotcha: extract/* + commit_type=enrich → enrich, not new."""
|
||||
assert classify_pr_operation("merged", "enrich", "extract/foo", None) == "enrich"
|
||||
|
||||
|
||||
def test_extract_branch_with_challenge_commit_type_classifies_challenge():
|
||||
"""Leo gotcha: extract/* + commit_type=challenge → challenge, not new."""
|
||||
assert classify_pr_operation("merged", "challenge", "extract/foo", None) == "challenge"
|
||||
|
||||
|
||||
def test_challenged_by_in_description_classifies_challenge():
|
||||
assert (
|
||||
classify_pr_operation(
|
||||
"merged", "knowledge", "extract/foo", "evidence for challenged_by claim"
|
||||
)
|
||||
== "challenge"
|
||||
)
|
||||
|
||||
|
||||
# ─── Branch prefix fallback (when commit_type is generic) ──────────────────
|
||||
|
||||
|
||||
def test_reweave_branch_classifies_enrich():
|
||||
assert classify_pr_operation("merged", "knowledge", "reweave/batch-1", None) == "enrich"
|
||||
|
||||
|
||||
def test_challenge_branch_classifies_challenge():
|
||||
assert (
|
||||
classify_pr_operation("merged", "knowledge", "challenge/nuclear-moloch", None)
|
||||
== "challenge"
|
||||
)
|
||||
|
||||
|
||||
# ─── Maintenance commit_types → infra ──────────────────────────────────────
|
||||
|
||||
|
||||
def test_fix_commit_type_classifies_infra():
|
||||
assert classify_pr_operation("merged", "fix", "fix/deploy-bug", None) == "infra"
|
||||
|
||||
|
||||
def test_pipeline_commit_type_classifies_infra():
|
||||
assert (
|
||||
classify_pr_operation("merged", "pipeline", "epimetheus/migration-v14", None)
|
||||
== "infra"
|
||||
)
|
||||
|
||||
|
||||
# ─── Knowledge-producing commit_types → new ────────────────────────────────
|
||||
|
||||
|
||||
def test_research_commit_type_classifies_new():
|
||||
assert (
|
||||
classify_pr_operation("merged", "research", "theseus/cornelius-batch-2", None)
|
||||
== "new"
|
||||
)
|
||||
|
||||
|
||||
def test_entity_commit_type_classifies_new():
|
||||
assert classify_pr_operation("merged", "entity", "leo/entities-update", None) == "new"
|
||||
|
||||
|
||||
# ─── Non-merged statuses route through NON_MERGED_STATUS_TO_OPERATION ──────
|
||||
|
||||
|
||||
def test_open_pr_classifies_extract():
|
||||
assert classify_pr_operation("open", None, "extract/foo", None) == "extract"
|
||||
|
||||
|
||||
def test_approved_pr_classifies_new():
|
||||
assert classify_pr_operation("approved", None, "extract/foo", None) == "new"
|
||||
|
||||
|
||||
def test_closed_pr_classifies_infra():
|
||||
assert classify_pr_operation("closed", None, "extract/foo", None) == "infra"
|
||||
|
||||
|
||||
def test_conflict_pr_classifies_challenge():
|
||||
assert classify_pr_operation("conflict", None, "extract/foo", None) == "challenge"
|
||||
|
||||
|
||||
def test_validating_pr_classifies_extract():
|
||||
assert classify_pr_operation("validating", None, "extract/foo", None) == "extract"
|
||||
|
||||
|
||||
def test_reviewing_pr_classifies_extract():
|
||||
assert classify_pr_operation("reviewing", None, "extract/foo", None) == "extract"
|
||||
|
||||
|
||||
def test_merging_pr_classifies_new():
|
||||
assert classify_pr_operation("merging", None, "extract/foo", None) == "new"
|
||||
|
||||
|
||||
def test_zombie_pr_classifies_infra():
|
||||
assert classify_pr_operation("zombie", None, "extract/foo", None) == "infra"
|
||||
|
||||
|
||||
# ─── Priority order: reweave commit_type vs reweave/ branch ─────────────────
|
||||
# Reweave commit_type is in _MAINTENANCE_COMMIT_TYPES (→ infra), but
|
||||
# branch.startswith('reweave/') is checked first (→ enrich). The bifurcation
|
||||
# is real spec behavior — nightly reweave PRs must classify as enrich, not
|
||||
# infra. Locking this in prevents a silent flip on future priority refactors.
|
||||
|
||||
|
||||
def test_reweave_commit_type_with_reweave_branch_classifies_enrich():
|
||||
"""Branch prefix wins over maintenance — reweave PRs are enrich, not infra."""
|
||||
assert classify_pr_operation("merged", "reweave", "reweave/batch-1", None) == "enrich"
|
||||
|
||||
|
||||
def test_reweave_commit_type_without_reweave_branch_classifies_infra():
|
||||
"""Without reweave/ prefix, reweave commit_type falls to maintenance → infra."""
|
||||
assert classify_pr_operation("merged", "reweave", "epimetheus/foo", None) == "infra"
|
||||
|
||||
|
||||
# ─── Defensive cases — null/empty inputs shouldn't crash ───────────────────
|
||||
|
||||
|
||||
def test_null_commit_type_and_branch_classifies_new():
|
||||
assert classify_pr_operation("merged", None, None, None) == "new"
|
||||
|
||||
|
||||
def test_unknown_status_falls_back_to_infra():
|
||||
assert classify_pr_operation("nonsense", None, None, None) == "infra"
|
||||
437
tests/test_leaderboard.py
Normal file
437
tests/test_leaderboard.py
Normal file
|
|
@ -0,0 +1,437 @@
|
|||
"""Tests for /api/leaderboard endpoint (diagnostics/leaderboard_routes.py).
|
||||
|
||||
Locks behavior for the four slicings consumed by Argus + Oberon:
|
||||
- window: all_time | Nd | Nh
|
||||
- domain: per-domain filter
|
||||
- kind: person | agent | org | all
|
||||
- limit: pagination + has_more flag
|
||||
|
||||
Regression coverage includes the AND-prefix SQL bug (commit 42d35d4): _parse_window
|
||||
returned clauses prefixed with 'AND ' which produced 'WHERE 1=1 AND AND ...' when
|
||||
joined into the WHERE clause via " AND ".join(...).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Skip whole file if aiohttp isn't available (matches test_activity_classify.py pattern)
|
||||
aiohttp = pytest.importorskip("aiohttp")
|
||||
|
||||
# Make diagnostics/ importable
|
||||
import sys
|
||||
DIAG_ROOT = Path(__file__).parent.parent / "diagnostics"
|
||||
sys.path.insert(0, str(DIAG_ROOT))
|
||||
|
||||
from leaderboard_routes import ( # noqa: E402
|
||||
_parse_window,
|
||||
handle_leaderboard,
|
||||
KIND_VALUES,
|
||||
LEADERBOARD_PUBLIC_PATHS,
|
||||
)
|
||||
from aiohttp.test_utils import make_mocked_request # noqa: E402
|
||||
|
||||
|
||||
# ─── Schema lifted from lib/db.py:138-209 (v25 minimum) ──────────────────────
|
||||
|
||||
SCHEMA = """
|
||||
CREATE TABLE contributors (
|
||||
handle TEXT PRIMARY KEY,
|
||||
kind TEXT DEFAULT 'person',
|
||||
tier TEXT DEFAULT 'new',
|
||||
claims_merged INTEGER DEFAULT 0,
|
||||
sourcer_count INTEGER DEFAULT 0,
|
||||
extractor_count INTEGER DEFAULT 0,
|
||||
challenger_count INTEGER DEFAULT 0,
|
||||
synthesizer_count INTEGER DEFAULT 0,
|
||||
reviewer_count INTEGER DEFAULT 0,
|
||||
challenges_survived INTEGER DEFAULT 0,
|
||||
domains TEXT DEFAULT '[]',
|
||||
first_contribution TEXT,
|
||||
last_contribution TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE contribution_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
handle TEXT NOT NULL,
|
||||
kind TEXT NOT NULL DEFAULT 'person',
|
||||
role TEXT NOT NULL,
|
||||
weight REAL NOT NULL,
|
||||
pr_number INTEGER NOT NULL,
|
||||
claim_path TEXT,
|
||||
domain TEXT,
|
||||
channel TEXT,
|
||||
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
|
||||
handle, role, pr_number, claim_path
|
||||
) WHERE claim_path IS NOT NULL;
|
||||
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
|
||||
handle, role, pr_number
|
||||
) WHERE claim_path IS NULL;
|
||||
"""
|
||||
|
||||
|
||||
# ─── Fixtures ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db_path(tmp_path):
|
||||
"""Seeded pipeline.db with deterministic events.
|
||||
|
||||
Cohort:
|
||||
- alice (person): 3 author events, 1 originator (recent 3d, internet-finance)
|
||||
- bob (person): 5 author events (older, 60d ago, ai-alignment)
|
||||
- carol (person): 1 author + 1 evaluator (today, internet-finance)
|
||||
- rio (agent): 4 author + 2 evaluator (mixed, internet-finance + grand-strategy)
|
||||
- leo (agent): 8 evaluator events (today, mixed domains)
|
||||
- cnbc (org): 2 originator events (legacy, before classifier moved orgs)
|
||||
- newhandle (no contributors row): 1 author event — tests LEFT JOIN COALESCE
|
||||
"""
|
||||
p = tmp_path / "pipeline.db"
|
||||
conn = sqlite3.connect(str(p))
|
||||
conn.executescript(SCHEMA)
|
||||
|
||||
contribs = [
|
||||
("alice", "person"),
|
||||
("bob", "person"),
|
||||
("carol", "person"),
|
||||
("rio", "agent"),
|
||||
("leo", "agent"),
|
||||
("cnbc", "org"),
|
||||
# newhandle intentionally absent — tests LEFT JOIN
|
||||
]
|
||||
for handle, kind in contribs:
|
||||
conn.execute(
|
||||
"INSERT INTO contributors (handle, kind) VALUES (?, ?)",
|
||||
(handle, kind),
|
||||
)
|
||||
|
||||
# (handle, role, weight, pr_number, claim_path, domain, timestamp)
|
||||
events = [
|
||||
# alice — 3 author + 1 originator, recent (all >24h ago, all <7d)
|
||||
# Most-recent event at -2 days (not -1 days) so 24h window exclusion is
|
||||
# unambiguous and not subject to fixture-vs-query microsecond drift.
|
||||
("alice", "author", 0.30, 100, None, "internet-finance", "now,-2 days"),
|
||||
("alice", "author", 0.30, 101, None, "internet-finance", "now,-2 days"),
|
||||
("alice", "author", 0.30, 102, None, "ai-alignment", "now,-3 days"),
|
||||
("alice", "originator", 0.15, 103, "domains/internet-finance/x.md", "internet-finance", "now,-2 days"),
|
||||
# bob — 5 author, all 60d ago (outside 30d, inside all_time)
|
||||
("bob", "author", 0.30, 200, None, "ai-alignment", "now,-60 days"),
|
||||
("bob", "author", 0.30, 201, None, "ai-alignment", "now,-60 days"),
|
||||
("bob", "author", 0.30, 202, None, "ai-alignment", "now,-61 days"),
|
||||
("bob", "author", 0.30, 203, None, "ai-alignment", "now,-62 days"),
|
||||
("bob", "author", 0.30, 204, None, "ai-alignment", "now,-63 days"),
|
||||
# carol — 1 author + 1 evaluator, today
|
||||
("carol", "author", 0.30, 300, None, "internet-finance", "now"),
|
||||
("carol", "evaluator", 0.05, 301, None, "internet-finance", "now"),
|
||||
# rio agent — 4 author + 2 evaluator
|
||||
("rio", "author", 0.30, 400, None, "internet-finance", "now,-2 days"),
|
||||
("rio", "author", 0.30, 401, None, "grand-strategy", "now,-2 days"),
|
||||
("rio", "author", 0.30, 402, None, "internet-finance", "now,-2 days"),
|
||||
("rio", "author", 0.30, 403, None, "internet-finance", "now,-2 days"),
|
||||
("rio", "evaluator", 0.05, 404, None, "ai-alignment", "now,-2 days"),
|
||||
("rio", "evaluator", 0.05, 405, None, "ai-alignment", "now,-2 days"),
|
||||
# leo agent — 8 evaluator
|
||||
*[
|
||||
("leo", "evaluator", 0.05, 500 + i, None, "internet-finance" if i % 2 == 0 else "ai-alignment", "now")
|
||||
for i in range(8)
|
||||
],
|
||||
# cnbc org — 2 originator (legacy data, kept by classifier+gate split)
|
||||
("cnbc", "originator", 0.15, 600, "domains/internet-finance/y.md", "internet-finance", "now,-5 days"),
|
||||
("cnbc", "originator", 0.15, 601, "domains/internet-finance/z.md", "internet-finance", "now,-5 days"),
|
||||
# newhandle — handle in events but no contributors row (LEFT JOIN COALESCE → person)
|
||||
# -2 days so 24h-window test exclusion is unambiguous (matches alice).
|
||||
("newhandle", "author", 0.30, 700, None, "ai-alignment", "now,-2 days"),
|
||||
]
|
||||
for handle, role, weight, pr_num, claim_path, domain, ts_modifier in events:
|
||||
# Use SQLite datetime() to compute timestamps relative to "now" so tests
|
||||
# are deterministic across days. Multi-arg form: datetime('now', '-1 days').
|
||||
ts_args = ts_modifier.split(",")
|
||||
if len(ts_args) == 1:
|
||||
ts_sql = f"datetime('{ts_args[0]}')"
|
||||
else:
|
||||
ts_sql = f"datetime('{ts_args[0]}', '{ts_args[1].strip()}')"
|
||||
conn.execute(
|
||||
f"""INSERT INTO contribution_events
|
||||
(handle, kind, role, weight, pr_number, claim_path, domain, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, {ts_sql})""",
|
||||
(handle, "agent" if handle in {"rio", "leo"} else "person",
|
||||
role, weight, pr_num, claim_path, domain),
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return str(p)
|
||||
|
||||
|
||||
def _call(db_path, **query):
|
||||
"""Build a mocked request, call handle_leaderboard, return parsed JSON."""
|
||||
qs = "&".join(f"{k}={v}" for k, v in query.items())
|
||||
req = make_mocked_request("GET", f"/api/leaderboard?{qs}")
|
||||
# make_mocked_request gives us req.app — write db_path into it.
|
||||
req.app["db_path"] = db_path
|
||||
response = asyncio.run(handle_leaderboard(req))
|
||||
return json.loads(response.body.decode())
|
||||
|
||||
|
||||
# ─── _parse_window unit tests ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestParseWindow:
|
||||
def test_default_is_all_time(self):
|
||||
clause, params, label = _parse_window(None)
|
||||
assert clause == ""
|
||||
assert params == ()
|
||||
assert label == "all_time"
|
||||
|
||||
def test_explicit_all_time(self):
|
||||
clause, params, label = _parse_window("all_time")
|
||||
assert clause == ""
|
||||
assert label == "all_time"
|
||||
|
||||
def test_seven_days(self):
|
||||
clause, params, label = _parse_window("7d")
|
||||
assert clause == "ce.timestamp >= datetime('now', ?)"
|
||||
assert params == ("-7 days",)
|
||||
assert label == "7d"
|
||||
# Regression: must NOT begin with "AND " (handle_leaderboard composes via " AND ".join)
|
||||
assert not clause.startswith("AND")
|
||||
|
||||
def test_thirty_days(self):
|
||||
clause, params, label = _parse_window("30d")
|
||||
assert params == ("-30 days",)
|
||||
assert label == "30d"
|
||||
|
||||
def test_hours(self):
|
||||
clause, params, label = _parse_window("24h")
|
||||
assert clause == "ce.timestamp >= datetime('now', ?)"
|
||||
assert params == ("-24 hours",)
|
||||
assert label == "24h"
|
||||
|
||||
def test_caps_days_at_365(self):
|
||||
clause, params, label = _parse_window("9999d")
|
||||
assert params == ("-365 days",)
|
||||
|
||||
def test_caps_hours_at_8760(self):
|
||||
clause, params, label = _parse_window("99999h")
|
||||
assert params == ("-8760 hours",)
|
||||
|
||||
def test_garbage_falls_to_all_time(self):
|
||||
clause, params, label = _parse_window("foobar")
|
||||
assert clause == ""
|
||||
assert label == "all_time"
|
||||
|
||||
def test_uppercase_normalized(self):
|
||||
clause, params, label = _parse_window("7D")
|
||||
assert label == "7d"
|
||||
|
||||
def test_zero_days_still_emits_clause(self):
|
||||
# 0d means "now or later" — empty result, but parse should succeed
|
||||
clause, params, label = _parse_window("0d")
|
||||
assert "datetime" in clause
|
||||
assert label == "0d"
|
||||
|
||||
|
||||
# ─── handle_leaderboard integration tests ────────────────────────────────────
|
||||
|
||||
|
||||
class TestLeaderboardEndpoint:
|
||||
def test_all_time_default_kind_person(self, db_path):
|
||||
"""Default kind is 'person'. Returns all persons, sorted by CI desc."""
|
||||
body = _call(db_path)
|
||||
assert body["window"] == "all_time"
|
||||
assert body["kind_filter"] == "person"
|
||||
assert body["domain"] is None
|
||||
assert body["source"] == "contribution_events"
|
||||
# alice 3*0.30 + 0.15 = 1.05
|
||||
# bob 5*0.30 = 1.50
|
||||
# carol 0.30 + 0.05 = 0.35
|
||||
# newhandle 0.30 (LEFT JOIN COALESCE → 'person')
|
||||
# cnbc excluded (kind='org')
|
||||
# rio/leo excluded (kind='agent')
|
||||
handles = [r["handle"] for r in body["leaderboard"]]
|
||||
assert "bob" in handles
|
||||
assert "alice" in handles
|
||||
assert "newhandle" in handles, "LEFT JOIN COALESCE should default missing contributors to 'person'"
|
||||
assert "cnbc" not in handles, "kind=person should exclude orgs"
|
||||
assert "rio" not in handles, "kind=person should exclude agents"
|
||||
# Descending by CI
|
||||
cis = [r["ci"] for r in body["leaderboard"]]
|
||||
assert cis == sorted(cis, reverse=True)
|
||||
|
||||
def test_window_7d_excludes_old_events(self, db_path):
|
||||
"""REGRESSION: 7d window must execute (no AND-prefix SQL error).
|
||||
|
||||
Bob has all events 60d ago → must not appear in 7d window.
|
||||
Alice has events 1-3d ago → must appear.
|
||||
"""
|
||||
body = _call(db_path, window="7d")
|
||||
assert body["window"] == "7d"
|
||||
handles = [r["handle"] for r in body["leaderboard"]]
|
||||
assert "alice" in handles
|
||||
assert "bob" not in handles, "60d-old events must be excluded from 7d window"
|
||||
assert "carol" in handles # today
|
||||
|
||||
def test_window_30d_excludes_60d_events(self, db_path):
|
||||
"""REGRESSION: 30d window must execute. Bob (60d) excluded; alice/carol included."""
|
||||
body = _call(db_path, window="30d")
|
||||
assert body["window"] == "30d"
|
||||
handles = [r["handle"] for r in body["leaderboard"]]
|
||||
assert "alice" in handles
|
||||
assert "carol" in handles
|
||||
assert "bob" not in handles
|
||||
|
||||
def test_window_24h_only_today(self, db_path):
|
||||
"""24h window picks up today's events only.
|
||||
|
||||
Default kind=person. Within 24h: only carol (events at 'now').
|
||||
Excluded: alice/newhandle (events at -2 days), bob (-60d), rio/leo (kind),
|
||||
cnbc (-5d AND kind=org).
|
||||
"""
|
||||
body = _call(db_path, window="24h")
|
||||
handles = [r["handle"] for r in body["leaderboard"]]
|
||||
assert handles == ["carol"], (
|
||||
"24h + kind=person should return only carol; got %r" % handles
|
||||
)
|
||||
|
||||
def test_kind_agent(self, db_path):
|
||||
"""kind=agent returns only agents."""
|
||||
body = _call(db_path, kind="agent")
|
||||
handles = [r["handle"] for r in body["leaderboard"]]
|
||||
assert "rio" in handles
|
||||
assert "leo" in handles
|
||||
assert "alice" not in handles
|
||||
assert "bob" not in handles
|
||||
|
||||
def test_kind_org(self, db_path):
|
||||
"""kind=org returns only orgs (legacy events still queryable)."""
|
||||
body = _call(db_path, kind="org")
|
||||
handles = [r["handle"] for r in body["leaderboard"]]
|
||||
assert handles == ["cnbc"]
|
||||
assert body["leaderboard"][0]["ci"] == 0.30 # 2 * 0.15
|
||||
|
||||
def test_kind_all_returns_everyone(self, db_path):
|
||||
"""kind=all returns all kinds — persons + agents + orgs."""
|
||||
body = _call(db_path, kind="all")
|
||||
handles = {r["handle"] for r in body["leaderboard"]}
|
||||
assert handles == {"alice", "bob", "carol", "rio", "leo", "cnbc", "newhandle"}
|
||||
|
||||
def test_invalid_kind_falls_to_person(self, db_path):
|
||||
"""Defensive: unknown kind value silently falls back to 'person'."""
|
||||
body = _call(db_path, kind="bogus")
|
||||
assert body["kind_filter"] == "person"
|
||||
|
||||
def test_domain_filter(self, db_path):
|
||||
"""domain=internet-finance scopes events; kind filter still applies."""
|
||||
body = _call(db_path, domain="internet-finance")
|
||||
assert body["domain"] == "internet-finance"
|
||||
handles = {r["handle"] for r in body["leaderboard"]}
|
||||
# alice has 2 internet-finance authors + 1 originator
|
||||
# carol has 1 internet-finance author + 1 evaluator
|
||||
# bob has 0 (all ai-alignment)
|
||||
# newhandle has 0 (ai-alignment only)
|
||||
assert "alice" in handles
|
||||
assert "carol" in handles
|
||||
assert "bob" not in handles
|
||||
assert "newhandle" not in handles
|
||||
|
||||
def test_composed_window_kind_domain(self, db_path):
|
||||
"""REGRESSION: composed filters must build SQL correctly.
|
||||
|
||||
7d + person + internet-finance — alice only.
|
||||
"""
|
||||
body = _call(db_path, window="7d", kind="person", domain="internet-finance")
|
||||
handles = [r["handle"] for r in body["leaderboard"]]
|
||||
assert "alice" in handles
|
||||
assert "carol" in handles
|
||||
assert "bob" not in handles # excluded by 7d
|
||||
assert "rio" not in handles # excluded by kind=person
|
||||
|
||||
def test_limit_caps_results(self, db_path):
|
||||
"""limit caps the leaderboard slice; total reflects unfiltered count."""
|
||||
body = _call(db_path, kind="all", limit=3)
|
||||
assert body["shown"] == 3
|
||||
assert body["has_more"] is True
|
||||
assert body["total"] == 7
|
||||
|
||||
def test_no_has_more_when_under_limit(self, db_path):
|
||||
body = _call(db_path, kind="org")
|
||||
assert body["shown"] == 1
|
||||
assert body["has_more"] is False
|
||||
assert body["total"] == 1
|
||||
|
||||
def test_invalid_limit_falls_to_default(self, db_path):
|
||||
"""Defensive: garbage limit param falls to default 100. 7 entries < 100."""
|
||||
body = _call(db_path, kind="all", limit="not-a-number")
|
||||
assert body["shown"] == 7
|
||||
assert body["has_more"] is False
|
||||
|
||||
def test_limit_capped_at_500(self, db_path):
|
||||
"""Defensive: limit > 500 silently caps at 500."""
|
||||
body = _call(db_path, limit=99999, kind="all")
|
||||
# No assertion on the value of the cap from the response — just that
|
||||
# it doesn't error and shown <= 500.
|
||||
assert body["shown"] <= 500
|
||||
|
||||
def test_role_breakdown_present(self, db_path):
|
||||
"""Each row includes ci_breakdown with all 5 roles."""
|
||||
body = _call(db_path)
|
||||
for entry in body["leaderboard"]:
|
||||
assert set(entry["ci_breakdown"].keys()) == {
|
||||
"author", "challenger", "synthesizer", "originator", "evaluator",
|
||||
}
|
||||
|
||||
def test_alice_role_breakdown_correct(self, db_path):
|
||||
"""Alice has 3 author (0.90) + 1 originator (0.15) = 1.05 total."""
|
||||
body = _call(db_path)
|
||||
alice = next(r for r in body["leaderboard"] if r["handle"] == "alice")
|
||||
assert alice["ci"] == 1.05
|
||||
assert alice["ci_breakdown"]["author"] == 0.90
|
||||
assert alice["ci_breakdown"]["originator"] == 0.15
|
||||
assert alice["ci_breakdown"]["challenger"] == 0
|
||||
assert alice["ci_breakdown"]["synthesizer"] == 0
|
||||
assert alice["ci_breakdown"]["evaluator"] == 0
|
||||
assert alice["events_count"] == 4
|
||||
assert alice["pr_count"] == 4
|
||||
assert alice["domain_count"] == 2 # internet-finance + ai-alignment
|
||||
|
||||
def test_empty_window_returns_clean_response(self, db_path):
|
||||
"""Window with no matching events returns shape-correct empty response."""
|
||||
# 24h window + kind=org → cnbc is 5d ago, so empty
|
||||
body = _call(db_path, window="24h", kind="org")
|
||||
assert body["leaderboard"] == []
|
||||
assert body["total"] == 0
|
||||
assert body["shown"] == 0
|
||||
assert body["has_more"] is False
|
||||
assert body["source"] == "contribution_events"
|
||||
|
||||
def test_left_join_handles_missing_contributors_row(self, db_path):
|
||||
"""REGRESSION: handle in events but missing from contributors must default to kind='person'.
|
||||
|
||||
Catches the failure mode where a handle classified as cited (auto-create
|
||||
deferred to Branch 3) accumulates events but has no contributors row yet.
|
||||
"""
|
||||
body = _call(db_path)
|
||||
newhandle_row = next(
|
||||
(r for r in body["leaderboard"] if r["handle"] == "newhandle"), None
|
||||
)
|
||||
assert newhandle_row is not None
|
||||
assert newhandle_row["kind"] == "person"
|
||||
assert newhandle_row["ci"] == 0.30
|
||||
|
||||
|
||||
# ─── Public path constant (auth middleware bypass) ───────────────────────────
|
||||
|
||||
|
||||
def test_public_paths_includes_leaderboard():
|
||||
"""Auth middleware needs LEADERBOARD_PUBLIC_PATHS to skip API key for /api/leaderboard."""
|
||||
assert "/api/leaderboard" in LEADERBOARD_PUBLIC_PATHS
|
||||
|
||||
|
||||
def test_kind_values_matches_contract():
|
||||
"""API contract: only these 4 kind values are accepted."""
|
||||
assert set(KIND_VALUES) == {"person", "agent", "org", "all"}
|
||||
167
tests/test_research_backfill_idempotent.py
Normal file
167
tests/test_research_backfill_idempotent.py
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
"""Verify research-attribution backfill is replay-safe against real schema.
|
||||
|
||||
Three things to prove:
|
||||
1. (handle, role, pr_number) with claim_path=NULL deduplicates correctly
|
||||
(idx_ce_unique_pr partial index handles SQLite NULL-not-equal-NULL).
|
||||
2. Re-inserting an existing (handle, role, pr_number, NULL) row via INSERT OR IGNORE
|
||||
is a true no-op — does not create a phantom duplicate.
|
||||
3. The backfill script's specific operation (DELETE then INSERT for same key)
|
||||
nets zero rows when run twice in sequence.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
|
||||
# Schema lifted verbatim from lib/db.py:181-209
|
||||
SCHEMA = """
|
||||
CREATE TABLE contribution_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
handle TEXT NOT NULL,
|
||||
kind TEXT NOT NULL DEFAULT 'person',
|
||||
role TEXT NOT NULL,
|
||||
weight REAL NOT NULL,
|
||||
pr_number INTEGER NOT NULL,
|
||||
claim_path TEXT,
|
||||
domain TEXT,
|
||||
channel TEXT,
|
||||
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
|
||||
handle, role, pr_number, claim_path
|
||||
) WHERE claim_path IS NOT NULL;
|
||||
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
|
||||
handle, role, pr_number
|
||||
) WHERE claim_path IS NULL;
|
||||
"""
|
||||
|
||||
|
||||
def setup() -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(":memory:")
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.executescript(SCHEMA)
|
||||
return conn
|
||||
|
||||
|
||||
def insert_event(conn, handle, role, pr_number, claim_path=None):
|
||||
cur = conn.execute(
|
||||
"""INSERT OR IGNORE INTO contribution_events
|
||||
(handle, kind, role, weight, pr_number, claim_path)
|
||||
VALUES (?, 'agent', ?, 0.30, ?, ?)""",
|
||||
(handle, role, pr_number, claim_path),
|
||||
)
|
||||
return cur.rowcount
|
||||
|
||||
|
||||
def count(conn) -> int:
|
||||
return conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
|
||||
|
||||
|
||||
def test_pr_level_dedup_with_null_claim_path():
|
||||
"""Two inserts of same (handle, role, pr_number, NULL) → 1 row."""
|
||||
conn = setup()
|
||||
r1 = insert_event(conn, "rio", "author", 4061)
|
||||
r2 = insert_event(conn, "rio", "author", 4061)
|
||||
n = count(conn)
|
||||
assert r1 == 1, f"first insert should write, got rowcount={r1}"
|
||||
assert r2 == 0, f"second insert should be ignored, got rowcount={r2}"
|
||||
assert n == 1, f"expected 1 row, got {n}"
|
||||
print("PASS: pr-level dedup with NULL claim_path")
|
||||
|
||||
|
||||
def test_per_claim_dedup_with_path():
|
||||
"""Two inserts of same (handle, role, pr_number, path) → 1 row."""
|
||||
conn = setup()
|
||||
r1 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
|
||||
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
|
||||
n = count(conn)
|
||||
assert r1 == 1 and r2 == 0 and n == 1
|
||||
print("PASS: per-claim dedup with claim_path")
|
||||
|
||||
|
||||
def test_pr_level_and_per_claim_coexist():
|
||||
"""A (handle, role, pr_number, NULL) and (handle, role, pr_number, 'x.md') coexist
|
||||
because the partial indexes target different rows."""
|
||||
conn = setup()
|
||||
r1 = insert_event(conn, "rio", "author", 4061, claim_path=None)
|
||||
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
|
||||
n = count(conn)
|
||||
assert r1 == 1 and r2 == 1 and n == 2
|
||||
print("PASS: pr-level and per-claim events coexist on same pr_number")
|
||||
|
||||
|
||||
def test_backfill_replay_is_noop():
|
||||
"""Simulate the exact backfill operation: INSERT correct event, DELETE wrong event.
|
||||
Run twice. Expect identical state — no phantom rows, no double-deletions."""
|
||||
conn = setup()
|
||||
|
||||
# Initial state: m3taversal has the wrong author event for pr=4061
|
||||
insert_event(conn, "m3taversal", "author", 4061)
|
||||
assert count(conn) == 1
|
||||
|
||||
def backfill_pr_4061():
|
||||
# Insert the correct event (rio is the real author)
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO contribution_events
|
||||
(handle, kind, role, weight, pr_number, claim_path)
|
||||
VALUES (?, 'agent', 'author', 0.30, 4061, NULL)""",
|
||||
("rio (self-directed)",),
|
||||
)
|
||||
# Delete the wrong event
|
||||
conn.execute(
|
||||
"""DELETE FROM contribution_events
|
||||
WHERE handle='m3taversal' AND role='author'
|
||||
AND pr_number=4061 AND claim_path IS NULL""",
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
backfill_pr_4061()
|
||||
state_after_first = sorted(
|
||||
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
|
||||
for r in conn.execute("SELECT * FROM contribution_events")
|
||||
)
|
||||
assert state_after_first == [("rio (self-directed)", "author", 4061, None)], state_after_first
|
||||
|
||||
# Replay
|
||||
backfill_pr_4061()
|
||||
state_after_second = sorted(
|
||||
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
|
||||
for r in conn.execute("SELECT * FROM contribution_events")
|
||||
)
|
||||
assert state_after_first == state_after_second, "replay should be idempotent"
|
||||
assert count(conn) == 1, f"expected 1 row after replay, got {count(conn)}"
|
||||
print("PASS: backfill replay is a true no-op")
|
||||
|
||||
|
||||
def test_replay_against_already_backfilled_pr_does_not_double_delete():
|
||||
"""If m3taversal event was already deleted, running backfill again must not error
|
||||
or affect anything else."""
|
||||
conn = setup()
|
||||
# Already-correct state: rio has the author event, m3taversal does not
|
||||
insert_event(conn, "rio (self-directed)", "author", 4061)
|
||||
insert_event(conn, "leo", "evaluator", 4061) # noise — should not be touched
|
||||
|
||||
# Run backfill: tries to INSERT (rio, author, 4061) — already exists, no-op
|
||||
# Tries to DELETE (m3taversal, author, 4061) — already absent, 0 rows affected
|
||||
cur1 = conn.execute(
|
||||
"""INSERT OR IGNORE INTO contribution_events
|
||||
(handle, kind, role, weight, pr_number, claim_path)
|
||||
VALUES ('rio (self-directed)', 'agent', 'author', 0.30, 4061, NULL)""",
|
||||
)
|
||||
cur2 = conn.execute(
|
||||
"""DELETE FROM contribution_events
|
||||
WHERE handle='m3taversal' AND role='author'
|
||||
AND pr_number=4061 AND claim_path IS NULL""",
|
||||
)
|
||||
assert cur1.rowcount == 0, f"insert should be no-op, got {cur1.rowcount}"
|
||||
assert cur2.rowcount == 0, f"delete should be no-op, got {cur2.rowcount}"
|
||||
assert count(conn) == 2, f"expected 2 rows preserved, got {count(conn)}"
|
||||
print("PASS: replay against already-backfilled state preserves unrelated events")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_pr_level_dedup_with_null_claim_path()
|
||||
test_per_claim_dedup_with_path()
|
||||
test_pr_level_and_per_claim_coexist()
|
||||
test_backfill_replay_is_noop()
|
||||
test_replay_against_already_backfilled_pr_does_not_double_delete()
|
||||
print("\nAll 5 tests passed against real schema.")
|
||||
Loading…
Reference in a new issue