Compare commits

..

49 commits

Author SHA1 Message Date
aaab659900 Merge pull request 'fix(activity-feed): emit Forgejo pr_url fallback so every event has a clickthrough' (#12) from fix/forgejo-pr-url-fallback into main
Some checks failed
CI / lint-and-test (push) Has been cancelled
Reviewed-on: #12
2026-05-13 04:40:38 +00:00
Teleo Agents
e78308862a fix(activity-feed): emit Forgejo pr_url fallback so every event has a clickthrough
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Previously _github_pr_url() only returned a URL when prs.github_pr was
populated. That field is set on only 3 of 4094 merged PRs (the rare cases
mirrored to the public GitHub repo), so pr_url was null for ~100% of the
feed. The frontend whole-row PR overlay (livingip-web PR #30) renders
only when pr_url is non-null, so until now no rows had the overlay.

Pipeline-attributed events (reweave/*, ingestion/*) are the most visible
victim: their /contributors/pipeline link lands on a sparse stub, with
no way to reach the actual commit/PR they refer to.

Fix: rename _github_pr_url -> _pr_url and fall back to the canonical
Forgejo URL (git.livingip.xyz/teleo/teleo-codex/pulls/{number}) when no
GitHub mirror exists. Verified 200 OK against a sample (#10568). GitHub
URL still wins when available.

Result: 1972/1972 events in _build_events now carry a pr_url. Whole-row
overlay starts working for everything including pipeline events.
2026-05-13 04:29:54 +00:00
a54f52234a Merge pull request 'fix(attribution): classify submitted_by by branch prefix at PR discovery' (#11) from fix/reattribute-by-branch-prefix into main
Some checks are pending
CI / lint-and-test (push) Waiting to run
Reviewed-on: #11
2026-05-13 03:57:04 +00:00
Teleo Agents
c9515c770a fix(attribution): classify submitted_by by branch prefix at PR discovery
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
reweave.py and ingestion run as the operator Forgejo token, so the prior
opener-based classifier set submitted_by=m3taversal for every system
maintenance PR. backfill_submitted_by.py never overrides non-NULL rows,
so this misattribution accumulated: ~2,748 reweave/ingestion PRs and
~3,706 <agent>/ research/entity PRs were credited to the operator on
the leaderboard and contribution_events table.

Two parts:

1. lib/merge.py: at PR discovery, classify by branch prefix first.
     reweave/, ingestion/             -> submitted_by = 'pipeline'
     <agent>/ (per _AGENT_NAMES)      -> submitted_by = '<agent>'
     otherwise human                  -> submitted_by = author.lower()
     otherwise pipeline               -> submitted_by = None
                                         (extract.py sets from proposed_by)
   Origin flag updated so domain detection and priority still fire for
   branch-classified pipeline PRs. Human PRs lowercased to maintain the
   canonical-handle contract enforced in PR #9.

2. scripts/reattribute-by-branch-prefix.py: historical cleanup.
   Per affected PR (atomic):
     - UPDATE prs.submitted_by  -> target
     - UPDATE sources.submitted_by where source_path matches
     - UPDATE contribution_events handle ('m3taversal',role='author')
       -> target, kind='agent'. Collision (target already has author
       event for PR) deletes the m3ta row; target wins.

   Scope is deliberately conservative: extract/ branches stay attributed
   to m3taversal because proposed_by-missing legitimately defaults to the
   operator (telegram drops). Only reweave/, ingestion/, and <agent>/.

   Dry-run shows 6,454 PRs + 284 events to move. Pre-flight collision
   query returns 0; pre-flight kind check confirms m3ta has only role=author
   events on this set (no challenger/synthesizer/evaluator).

   Idempotent. Dry-run by default. Run with --apply after deploy + DB
   snapshot.
2026-05-13 03:49:10 +00:00
3dca3aab5f Merge pull request 'docs: rewrite public README' (#8) from ship/readme-public-rewrite into main
Some checks are pending
CI / lint-and-test (push) Waiting to run
Reviewed-on: #8
2026-05-13 03:20:02 +00:00
2ee9dd5150 Merge pull request 'fix(activity-feed): canonicalize contributor handle so profile links resolve' (#9) from fix/activity-feed-canonical-handle into main
Some checks are pending
CI / lint-and-test (push) Waiting to run
Reviewed-on: #9
2026-05-13 03:19:41 +00:00
b29ec95dd8 Merge pull request 'fix(attribution): canonicalize submitted_by at write time + historical normalizer' (#10) from fix/canonicalize-submitted-by into main
Some checks are pending
CI / lint-and-test (push) Waiting to run
Reviewed-on: #10
2026-05-13 03:19:27 +00:00
Teleo Agents
74bf0461e8 fix(attribution): canonicalize submitted_by at write time + historical normalizer
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Companion / write-side fix to fix/activity-feed-canonical-handle.

The activity-feed canonicalization was a read-side guard. The bug at the
source is that extract.py and two backfill scripts write decorated
strings (Vida (self-directed), pipeline (reweave), @m3taversal) into
prs.submitted_by and sources.submitted_by. Downstream readers
(lib.contributor.insert_contribution_event, scripts/scoring_digest,
diagnostics/activity_feed_api) all strip the decorator on read — but
anything that reads the column verbatim (like /api/activity-feed before
the read-side fix) 404s on /contributors/{decorated-handle}.

Stop writing the decorator. The self-directed signal is already carried
by intake_tier == research-task plus the prs.agent column; the suffix
is redundant string noise that costs us correctness at every consumer
that forgets to strip.

Changes:

- lib/extract.py:690 — write canonical handle via attribution.normalize_handle.
  Direct elif for intake_tier == research-task now stores just agent_name.
  @m3taversal -> m3taversal.

- diagnostics/backfill_submitted_by.py — same fix in two branches plus
  the reweave branch (pipeline (reweave) -> pipeline).

- scripts/backfill-research-session-attribution.py — UPDATE prs sets
  agent handle alone, no suffix. Docstring + log line updated.

- scripts/normalize-submitted-by.py (new) — one-time backfill that
  canonicalizes existing prs.submitted_by and sources.submitted_by rows.
  Strips trailing parenthetical decorators, lowercases, drops @. Defaults
  to dry-run; --apply to commit. Skips rows that would normalize to
  invalid handles (no garbage falls through silently).

Dry-run against live pipeline.db:
  prs:     3008 rows need normalization (clean mappings, 0 invalid)
  sources: 730 rows need normalization (clean mappings, 0 invalid)
  Total:   3738 rows. All map to existing handle column values.

After this lands + auto-deploys, the operator should run
  python3 scripts/normalize-submitted-by.py --apply
once to clean historical rows. The read-side canonicalization in
diagnostics/activity_feed_api.py (fix/activity-feed-canonical-handle)
becomes redundant defense-in-depth instead of load-bearing.

No KB writes.
2026-05-13 02:56:50 +00:00
Teleo Agents
01097da22c fix(activity-feed): canonicalize contributor handle so profile links resolve
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
The activity feed was returning decorated strings like "Vida (self-directed)"
and "@m3taversal" in the contributor field. The frontend uses that field as
both display label and routing handle, so /contributors/Vida%20(self-directed)
404s — Next fires notFound() in [handle]/page.tsx.

Root cause: _normalize_contributor only stripped @ and whitespace; it did not
lowercase or strip the " (self-directed)" suffix that extract.py and the
older backfill_submitted_by.py wrote into prs.submitted_by. Mixed-case
agent names (Vida vs vida) and pipeline decorators ("pipeline (reweave)")
both fell through.

Fix: lowercase + strip any trailing parenthetical decorator. Valid handles
match ^[a-z0-9][a-z0-9_-]{0,38}$ per attribution._HANDLE_RE and cannot
contain parens, so the strip is lossless.

DB simulation against 3612 merged-PR events: 0 orphan handles after
normalization (was 12 orphan label-variants before).

No KB writes — pure read-side normalization in the API layer.
2026-05-13 02:39:18 +00:00
6c66da33e4 feat(activity-feed): add pr_url field for GitHub PR clickthrough
Some checks failed
CI / lint-and-test (push) Has been cancelled
2026-05-11 20:58:36 -04:00
c3f2010a42 feat(activity-feed): add kind + target_url, fix research-session pseudo-slugs
Some checks are pending
CI / lint-and-test (push) Waiting to run
The /api/activity-feed event shape didn't give the frontend a reliable
clickability signal. Two failure modes:

1. Source-archive events (extract/* PRs that filed a paper into
   inbox/archive/ but didn't extract a claim) returned claim_slug="".
   Frontend rendered <Link href="/claims/"> which Next normalized to
   /claims and redirected to /knowledge-base. Wrong page.

2. Research/entity session commits (e.g. astra/research-2026-05-11)
   with empty descriptions fell through to "create" classification with
   a pseudo-slug like research-2026-05-11. Frontend rendered
   /claims/research-2026-05-11 -> 404.

Fix:

- Add `kind` enum (canonical): claim_merged | claim_enriched |
  claim_challenged | source_archived | session_digest. Replaces the
  internal `type` for downstream consumers; `type` kept populated for
  in-flight callers during migration.

- Add `target_url`: explicit clickability signal. Frontend renders
  <Link> when non-null, <span> when null. No special-casing needed.
    * claim_* events -> /claims/{slug}
    * source_archived -> Forgejo blob URL at inbox/archive/{domain}/{slug}.md
    * session_digest -> null (no clickthrough surface yet)

- Detect research/entity commits with empty descriptions as
  session_digest in _classify_event, instead of synthesizing a phantom
  create event with a date-shaped pseudo-slug.

- type filter accepts both legacy `type` and new `kind` values so
  callers migrate at their own pace.

Verified live: source events resolve to inbox/archive/{domain}/...
Forgejo URLs, session-digest rows return target_url=null,
claim_merged events keep /claims/{slug} unchanged.
2026-05-11 12:36:25 +01:00
ed4893e837 fix(claims): unwrap ```markdown code fences + 404 for fragments
Some checks are pending
CI / lint-and-test (push) Waiting to run
Two issues Ship hit on the Montreal Protocol claim:

1. 500 on canonical stem lookup. File starts with ```markdown wrapper
   instead of bare --- frontmatter delimiter. _split_frontmatter checked
   startswith("---") and bailed, returning "frontmatter parse failed".
   Same wrapper exists on 6 other claim files (audit grep). Now strip
   the wrapper before frontmatter detection.

2. 404 on long activity-feed slug. Same root cause — _build_indexes
   couldn't read the file's title from frontmatter, so by_title never
   indexed it, so title-fallback resolution had nothing to match against.
   Both bugs collapse once we unwrap.

Also: switched "file exists but has no frontmatter" from 500 to 404 with
reason=file_no_frontmatter. These are stray enrichment fragments living
in domains/ that never got merged into a parent claim. From the API
caller's perspective there's no claim at that slug — 500 implied
"server bug, retry later" which isn't actionable.

Verified: 3/3 wrapped claims (montreal, medicare, dod) now return 200
warm-cache ~13ms. Long-slug repro (montreal) resolves via title fallback
to canonical stem. Negative test (nonsense slug) still 404.
2026-05-11 12:02:54 +01:00
73880e138d fix(claims): resolve long activity-feed slugs to canonical file stems
Some checks are pending
CI / lint-and-test (push) Waiting to run
Activity feed emits slugs derived from PR description (the slugified claim
title), which can be longer than the on-disk file stem (agents pick shorter
hand-chosen filenames). Pure exact-stem lookup 404s on those.

Three-tier resolution in handle_claim_detail:
1. Exact stem match (existing behavior)
2. Title fallback: normalize requested slug, look up via by_title index
   (already populated from frontmatter title during _build_indexes)
3. Prefix fallback: longest common prefix among stems, anchored at 32 chars
   to prevent spurious hits

Response slug returns the canonical on-disk stem so frontend share-links
and caches converge to one form.

Repro: GET /api/claims/spacex-and-amazon-kuiper-non-endorsement-of-wef-debris-
guidelines-demonstrates-systemic-voluntary-governance-failure-at-the-scale-
where-it-matters-most was 404; now 200, returns shorter on-disk slug
'...-governance-failure'. Negative case (nonsense slug) still 404s.

Reported by Ship — Cory-facing demo path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 19:51:41 +01:00
1bc541ac93 fix(reaper): tighten research-session pattern to literal YYYY-MM-DD shape
Some checks are pending
CI / lint-and-test (push) Waiting to run
Apply Ganymede review of 50b888a:

MUST-FIX — pattern %/research-2% was broader than the comment claimed.
Matched anything/research-2[anything] including agent-named branches like
theseus/research-2nd-attempt-on-X or vida/research-2024-revisited. The
documented invariant said "date suffix only" but the SQL didn't enforce
it. Defense-in-depth was the framing; pattern needed to match the
framing.

Fix uses SQLite `_` single-char wildcards: research-20__-__-__ requires
exactly research-20[2-char][-][2-char][-][2-char], i.e. literal
YYYY-MM-DD shape. Threads the needle:
  - theseus/research-2026-04-30  ✓ (catches all 15 currently stuck)
  - rio/research-2099-12-31      ✓ (good through 2099)
  - theseus/research-2nd-attempt ✗ (correctly excluded)
  - vida/research-2024-revisited ✗ (correctly excluded — no -MM-DD shape)
  - rio/research-batch-agents-... ✗ (no date prefix at all)

NIT — comment said "Three classes qualify" then listed four. Off-by-one
fixed; comment now correctly says "Four classes."

Pre-deploy verified: tighter pattern catches all 15 currently-stuck
research PRs (clay/leo/astra/theseus/vida/rio research-2026-{04-28
through 05-02}). Zero false-positive risk on current branch namespace.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 19:10:49 +01:00
50b888a751 fix(reaper): extend allowlist to */research-2* daily-cron sessions
Apply Step 1 of stuck-PR triage. The May 7 reaper allowlist (extract/,
reweave/, fix/) deliberately excluded all agent-prefix branches per
Ganymede's review nit #3 — the rationale being that agent branches are
WIP feature work owned by the agent and shouldn't be auto-closed.

That decision was correct for theseus/feature-foo style branches.
It's wrong for {agent}/research-{YYYY-MM-DD} branches: those are daily
cron output, categorically disposable, regenerated by tomorrow's session.
Same shape as extract/ — content the pipeline-cron created and can
recreate, not feature work owned by the agent.

Production impact: 15 of 16 currently-stuck PRs are research-session
verdict-deadlocks aged 8-12 days. Without this change they sit forever
because the substantive_fixer can't classify (eval_issues=[] or
mechanical-only) and the reaper allowlist excludes them. Once live, next
hourly reaper cycle picks them up under the standard 24h-deadlock gate.

Pattern choice: %/research-2* (date-suffix) over %/research-% (loose).
Verified 15/15 stuck PRs match the tight pattern; sanity-check found
rio/research-batch-agents-memory-harnesses (manually-named, not date-
suffixed) which the loose pattern would catch and the tight pattern
correctly excludes. Closed-status today, but a future hand-named research
thesis branch sitting in request_changes for 24h would have been at risk.
The date prefix '2' threads the needle until 2030 and ages naturally.

Documented as an allowlist invariant ("disposable pipeline-generated
branches") rather than a list, per Step 3 of the plan — future additions
should match the invariant or update it explicitly.

Verified live before pushing:
- 15/15 currently stuck research PRs match the new pattern
- Zero false positives on existing branch namespace (closed branches
  excluded by status='open' guard regardless)
- Existing extract/ reweave/ fix/ allowlist members unchanged

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 19:00:48 +01:00
0eb26327fc feat(claims): /api/claims/{slug} canonical detail endpoint
Some checks are pending
CI / lint-and-test (push) Waiting to run
Implements Ship's claim detail contract — one round-trip, all data
resolved server-side. Replaces thin domain-only stub with full tree walk
(domains/ + foundations/ + core/), DB joins for PRs and reviews, and
server-side wikilink resolution to eliminate frontend N+1 cascades.

Response shape (Ship brief 2026-04-29):
  slug, title, domain, secondary_domains, confidence, description,
  created, last_review, body (raw markdown), sourced_from, reviews,
  prs, edges {supports,challenges,related,depends_on}, wikilinks

Wikilink resolution:
- Builds title→stem index from frontmatter title field, fallback to
  filename stem normalized via _normalize_for_match
- Returns flat {link_text: slug_or_null} map; unresolved → null so
  frontend can render plain text
- Inline normalization (lowercase, hyphen↔space, collapse whitespace,
  strip punctuation). Note: lib/attribution.py exposes only
  normalize_handle today, not the title normalizer Ship referenced.
  If a canonical helper lands later, point at it.

Caches:
- title→slug index: 60s TTL (warm cache <20ms p50 verified)
- list endpoint: 5min TTL (preserved from prior)
- Cold: ~3.3s for tree walk of 1,866 files; warm: 13-17ms

Bug fixed in second pass:
- _resolve_sourced_from defaulted title="" which leaked LIKE '%%'
  matching every PR. Now requires non-empty title+stem; handler falls
  back to slug.replace("-"," ") when frontmatter title is missing.

Verified live on VPS:
- AI diagnostic triage claim (no fm.title): sourced_from=1, prs=0
  (correct — Feb claim, pre-description-tracking)
- Recent extract PR claim: sourced_from=1 with URL, prs=1, reviews=1,
  last_review populated, edges 3 supports + 7 related, wikilinks 0
- 404 on missing slug: correct
- Claim with [[maps/...]] wikilink: 5/6 resolved (correct null on map)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 17:37:26 +01:00
fc002354d4 fix(substantive_fixer): json_valid guard in front of json_each
Some checks are pending
CI / lint-and-test (push) Waiting to run
Ganymede review of 5db6a02 (msg 2 of 3): json_each(invalid_json) throws
'malformed JSON' and propagates up through EXISTS, failing the SELECT.
The fix-cycle call site at teleo-pipeline.py:104 isn't try/except wrapped
(the reaper at line 109-116 is, the substantive cycle isn't), so a single
corrupt eval_issues row would trip the fix-stage breaker after 5 occurrences.

Fix is one line — AND json_valid(eval_issues) before the EXISTS clause.
json_valid(NULL) returns NULL (false in WHERE), json_valid(invalid) returns 0,
json_valid(valid) returns 1. SQLite 3.9+, predates VPS 3.45.1.

WARN-on-corrupt-JSON path kept per Ganymede's Q3 — json_valid and json.loads
use technically distinct parsers, cost is ~3 rows × parse-empty-string per
cycle, journal entry names the failure mode if SQLite ever surfaces a row
that passes both SQL guards but fails json.loads.

Comment updated to reflect new guard ordering.
2026-05-08 13:12:25 -04:00
5db6a0248c fix(substantive_fixer): SQL-side actionable-tag filter, eliminate head-of-line
Step 4 of the stuck-PR triage. Push the FIXABLE/CONVERTIBLE/UNFIXABLE_TAGS
intersection from a post-fetch Python loop into the SELECT WHERE clause via
json_each + EXISTS. LIMIT 3 now always returns 3 actionable rows (or fewer if
that's all there are), eliminating the head-of-line block where 3 oldest
empty-eval_issues PRs occupied the slots forever.

Background: 11 hours of post-deploy logs showed substantive_fix_cycle stuck
emitting "0 actionable from 3 candidate(s) — head-of-line: [(3922, []), (3926,
[]), (3940, [])]" every cycle. Reaper closed those three on schedule, then a
new triple of empty-eval_issues PRs took their place. Reaper-as-primary-clearance
worked but is defense-in-depth, not the right architecture. Source of the block
is upstream in this SELECT.

Implementation choice: json_each + EXISTS over LIKE. Robust against tag-name
substring overlap, future-proof against tag renames, and SQLite 3.45.1 on VPS
fully supports it. Verified live: returns 13 of 28 currently-stuck PRs as
actionable, 15 fall through to reaper as before.

Tag list builds from the routing constants at runtime so adding a new tag
auto-updates the SELECT filter — no two-place edit footgun.

WARN-on-corrupt-JSON path retained as defense-in-depth (json_each and
json.loads use different parsers; technically possible for a row to pass one
but not the other).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:52:12 -04:00
4b2b59b184 fix(reaper): branch allowlist for disposable pipeline-managed branches
Some checks are pending
CI / lint-and-test (push) Waiting to run
Apply Ganymede review nit #3 from f97dd15 review (the deferred close_on_forgejo
fix already landed in e14b5f2 — Ganymede was reviewing the older commit).

SQL gate previously had no branch filter — empirically all 92 candidates were
extract/* but structurally any agent branch in the deadlock shape was a
candidate. Positive allowlist for extract/, reweave/, fix/ scopes the reaper
to disposable pipeline-managed branches that the pipeline created and can
recreate. Agent branches (theseus/, vida/, epimetheus/, etc.) are WIP feature
work and must not be reaped — owners review their own PRs on their own cadence.

Cheap target-class lock complementing the LIMIT 50 blast-radius cap.
Same scoping principle as PIPELINE_OWNED_PREFIXES, but tighter — epimetheus/
review branches are pipeline-owned for merge purposes but NOT disposable.

Items 2-4 from this review:
- WARNING #2 (audit_log idx_audit_event_ts): defer to followup branch alongside
  sync-mirror migration cleanup, as Ganymede suggested.
- NIT #3 (this commit): branch allowlist applied.
- NIT #4 (token asymmetry comment=admin/close=leo): confirmed established
  codebase pattern. merge.py:946-948 does the same — comment system-toned,
  close attributed to Leo for verdict-source UI clarity. Not accidental.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:43:53 -04:00
ba234ec4b3 fix(reaper): apply Ganymede review — dual-PATCH drift, breaker isolation, env config
Followup to f97dd15. Four fixes from review:

MUST-FIX #1 — Forgejo double-PATCH drift
  reaper closes PR via forgejo_api PATCH at line 689, then close_pr() at
  line 700 issued a second PATCH (default close_on_forgejo=True). On
  transient failure of the second PATCH, close_pr returns False without
  updating the DB → status='open' even though Forgejo is closed. Pass
  close_on_forgejo=False so DB close is unconditional after the explicit
  Forgejo PATCH succeeds.

MUST-FIX #2 — reaper exception trips fix breaker
  Unhandled exception in verdict_deadlock_reaper_cycle propagated to
  stage_loop, recording fix-stage failures. After 5 reaper failures the
  fix breaker would open and block mechanical+substantive for 15 min.
  Wrap reaper call in try/except in fix_cycle (same exception-isolation
  pattern as ingest_cycle's extract_cycle wrapper). Defense-in-depth
  must never block primary paths.

WARNING #1 — throttle SQL full-scan
  audit_log only has idx_audit_stage. Filtering on event alone caused
  full-table scans every 60s. Added stage='reaper' so the planner uses
  the existing index — reaper writes audit rows under stage='reaper'
  already so the filter is correct.

WARNING #2 — REAPER_DRY_RUN as code constant
  Flipping dry-run → live required edit + commit + push + deploy +
  restart. Moved REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS,
  REAPER_INTERVAL_SECONDS, REAPER_MAX_PER_RUN to lib/config.py with
  os.environ.get() overrides. Operator now flips via systemctl edit
  teleo-pipeline.service (Environment=REAPER_DRY_RUN=false) + restart.
  Defaults remain safe: dry-run, 24h age, hourly throttle, 50/run cap.

NIT — dry-run counter naming
  Renamed local `closed` counter in dry-run path to `would_close` so the
  heartbeat audit ("X closed, Y would-close") and journal log are
  unambiguous. Function still returns closed + would_close so callers
  see total work done.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:43:53 -04:00
e63d27d259 fix(reaper): verdict-deadlock reaper — close stuck PRs after 24h
Defense-in-depth for PRs that substantive_fixer can't make progress on.
Targets two stuck-verdict shapes empirically observed in production:

  1. leo:request_changes + domain:approve
     Leo asked for substantive fix; fixer either failed silently
     (no_claim_files / no_review_comments / etc.) or the issue tag isn't
     in FIXABLE | CONVERTIBLE | UNFIXABLE.

  2. leo:skipped + domain:request_changes
     Eval bypassed Leo (eval_attempts >= MAX). Domain rejected with no
     structured eval_issues. fixer can't classify the issue.

92 PRs match this gate today, oldest at 2026-04-24 (13d stuck).

Behavior:
  - Hourly throttle via audit_log sentinel ('verdict_deadlock_reaper_run').
  - REAPER_DRY_RUN=True default — first deploy emits 'would_close' audit
    events only. No DB writes. No Forgejo writes. (Ship Apr 24 directive.)
  - 24h cooldown, oldest-first, capped at 50 per run.
  - Heartbeat audit fires whether dry-run or live, so throttle works.
  - Live mode: posts comment + closes Forgejo PR + close_pr() in DB.
    Audits 'verdict_deadlock_closed' per PR.
  - Forgejo PATCH None → skip DB close (avoid drift).

Wired into fix_cycle() in teleo-pipeline.py. Runs after mechanical
and substantive fixes, never blocks them.

Followup (post first-run audit verification):
  - Operator inspects 'verdict_deadlock_would_close' audit rows
  - Flips REAPER_DRY_RUN to False, redeploys
  - Reaper actually closes on next hourly tick
2026-05-07 23:43:53 -04:00
517e9884cc fix(substantive_fixer): WARN on corrupt eval_issues JSON
Some checks are pending
CI / lint-and-test (push) Waiting to run
Third silent return path in substantive_fix_cycle — JSON-decode except
at the eval_issues parse drops rows that don't reach skipped_no_tags
or substantive_rows. If all 3 LIMIT-3 candidates have corrupt JSON,
cycle returns 0,0 with no log entry.

WARN level (not INFO): corrupt JSON is abnormal (post-merge column
drift, hand-edited DB row, partial write during crash). If this fires,
ops want to chase the upstream column-write path. If it never fires,
baseline noise stays at zero.

Closes the visibility gap on ALL silent returns in this function, not
just the two patched in 3f8666e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 18:33:08 -04:00
3f8666ee0c fix(substantive_fixer): surface silent-skip reasons at INFO
Two silent paths in substantive_fix_cycle masked a 13-day stall:

1. Filter strips all candidates → return 0,0 with no log. With LIMIT 3
   ordered created_at ASC, if the oldest 3 have no fixer-actionable tags
   (e.g. eval_issues=[] from leo:skipped+domain:request_changes), the
   cycle silently picks the same head-of-line every tick.

2. _fix_pr early-returns logged at DEBUG only — invisible without
   fleet-wide DEBUG. Skip reasons (no_claim_files, no_review_comments,
   not_open lock, worktree_failed, etc.) never surfaced in journalctl.

Patch: log skipped candidate eval_issues when no actionable rows
found (path 1); promote DEBUG→INFO for per-PR skip reasons (path 2).
Zero behavior change — observability only.

Diagnosis context: 98 PRs stuck >3d, last successful substantive_fixer
event 2026-04-24. Need journal evidence to choose between (a) one-line
fix to the cycle, (b) larger _fix_pr regression. (Ship Step 2 directive.)
2026-05-07 11:58:22 -04:00
87f97eb4fa sync-mirror: surface tracker SELECT/INSERT failures to ops log
Some checks failed
CI / lint-and-test (push) Has been cancelled
Per Ganymede review: silent fall-through with no log entry is the
failure mode that bites. SELECT redirects stderr to $LOG, falls back
to empty string on failure. INSERT wrapped in if-not branch with WARN
log naming the (branch, sha, pr_number) so duplicate auto-create
possibility is visible.

Matches the Step 0/0b/4.5 observability pattern from prior reviews.
Behavior unchanged on the success path; failures now greppable.
2026-05-01 15:48:28 +01:00
ad1d82f5ee fix(sync-mirror): tracker gate to break empty auto-create loop
Diagnosis (per Ganymede pushback): the original mechanism story was wrong.
Vida and Leo show 100+ PRs at 0 merge failures — luck doesn't produce
that. Real cause is sync-mirror's auto-create loop, not session spawning.

Verified data:
- vida/research-2026-04-30: 1 commit on branch, 303 PRs in DB
- reweave/2026-04-29: 1 commit on branch, 840 PRs in DB
- Cron fires once/day per agent; reweave fires once/day at 01:00 UTC
- Forgejo currently has 0 PRs for vida (all merged/closed); 3 distinct
  SHAs total across reweave's history (PRs replay same SHA repeatedly)

Mechanism (confirmed in /opt/teleo-eval/logs/sync.log):
1. Pipeline merges PR → calls _delete_remote_branch on Forgejo
2. Next sync cycle: git fetch forgejo --prune drops the local Forgejo
   ref; refs/remotes/origin still has it (GitHub copy untouched)
3. comm sees branch GitHub-only → re-pushes to Forgejo at original SHA
4. HAS_PR check uses ?state=closed&limit=50 — closed PR for this branch
   scrolled out of pagination window long ago → returns "no"
5. Auto-create POST → fresh Forgejo PR (e.g. #7295 created at 21:46 for
   branch SHA from 04:12)
6. Pipeline merges (cherry-pick is empty no-op since content's on main;
   reweave union produces "already up to date" via the empty-diff guard
   shipped in 923454c) → _delete_remote_branch → loop

Fix (per Ganymede design point #2: "right place is discovery, not
_claim_next_pr"): SHA-based tracker in pipeline.db. Records (branch, sha)
after every successful auto-create. Subsequent cycles see the same
(branch, sha) → skip the entire push+create sequence. Cheap O(1) sqlite
lookup per branch per cycle.

Why SHA, not branch: research-session.sh and nightly-reweave.sh both use
--force push, so a branch can legitimately get new commits over time.
Tracker keys on SHA so genuine new commits produce a tracker miss → PR
creation proceeds normally. No regression on legitimate branch reuse.

Why pipeline.db, not flat file: shared with discover_external_prs +
audit_log + the agent's own tooling; survives sync-mirror restarts;
ACID-safe under the cron's 2-min cadence. CREATE IF NOT EXISTS is
inline (no migration needed) because this table is private to
sync-mirror — pipeline daemon doesn't read it.

Validated against /tmp/pipeline-test.db copy: gate fires on known
(branch, sha), misses on new SHA (correctly allows new content).

Defense-in-depth — leaves existing HAS_PR check in place. Tracker is
the durable signal; HAS_PR is best-effort and may catch cases the
tracker hasn't seen yet (e.g. PR created out-of-band).

Reweave numbers (Ganymede point #3): same shape, same fix. Both research
and reweave loops killed by the same gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 15:42:47 +01:00
923454c9ea extract: document basename-uniqueness invariant + skip _-prefixed archive files
Some checks failed
CI / lint-and-test (push) Has been cancelled
Two nits from Ganymede review of ed4af4d:

1. Archive-basename filter depends on basename-uniqueness across queue+archive.
   Current naming (date-prefix + topic-slug) makes collisions rare, but if
   short generic names like "notes.md" enter the queue, the filter silently
   false-positives. Comment block names the assumption.

2. Archive walk now skips _-prefixed files, matching the standing convention
   everywhere else (search.py STRUCTURAL_FILES, reweave wiki-link skip, Layer
   0 entity exclusion). Defensive — no _*.md exists under inbox/archive/
   today, but consistent with codebase convention if a future operator drops
   _README.md to document the directory.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 11:09:19 +01:00
ed4af4d72e fix(extract): dedup queue sources whose basename is already in archive
Daemon re-extracted same source every ~4h cycle when research-session
commits on agent branches re-introduced already-archived queue files.
Existing daemon filters (DB-status, open-PR, 4h cooldown) all missed
this pattern because the queue file gets a fresh sources row at
status='unprocessed' on each re-add, the cooldown lapses exactly at
the cycle interval, and the open-PR filter only catches in-flight
extractions.

Add an archive-basename filter immediately after the queue scan: if
a file with this basename exists anywhere under inbox/archive/, skip.
Archive copy is the source of truth — once extracted, the queue copy
is stale by definition.

Validation against pipeline.db (last 7d):
  78 sources had multiple extract PRs (32% duplicate rate)
  73/78 (94%) carry an archive copy and would have been caught.
  Current queue: 35/99 sources (35%) have archive duplicates today.

Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887>
2026-04-30 11:05:39 +01:00
ed5f7ef6cc fix(merge): correct audit-ref comment + add sentinel-drift warning
Some checks failed
CI / lint-and-test (push) Has been cancelled
Two nits from Ganymede line-level review of 7741c1e:

1. Comment at lines 562-565 said --force-with-lease but code is plain
   --force. Comment now describes the actual behavior: bot-owned per-PR
   audit ref, intentional overwrite on stale refs from prior aborted
   attempts, no concurrent writer to lease against.

2. Sentinel-regex extraction in _merge_domain_queue dispatch had no
   graceful-failure log. If the _merge_no_ff_external success-message
   contract drifts and any of the three regexes (M, audit_ref, external
   PR #) miss, dispatch silently builds a comment with None values and
   writes audit_log JSON with null fields. Added a warning log when any
   regex misses — signal-only, doesn't gate the close path since the
   merge already succeeded.

Branch: epimetheus/external-merge-flow-bug1
Parent: 7741c1e (Ship Msg 3 architecture review close)
Diff:   +11/-3, single file lib/merge.py

Ganymede: 3-message protocol Msg 3 (nits applied, ball returned).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 16:19:08 +01:00
7741c1e6de fix(merge): synthetic _merged/* ref + function-owned ff-push (Ship Msg 3)
Phase 2 review fix #1 (architectural pushback): replace force-push of
contributor's gh-pr-N/* branch with a three-step synthetic-branch flow:

  1. Worktree on local branch _merged-{slug} from origin/main
  2. git merge --no-ff origin/{branch} into the local branch
  3. Push merge commit to origin/_merged/{branch} (synthetic audit ref)
  4. Function ff-pushes merge_sha → origin/main directly

Contributor's gh-pr-N/* branch on Forgejo is now never touched.
Force-pushing it would have rewritten the tip with a merge commit the
contributor didn't author — confusing bot force-push in Forgejo PR UI.
Mirrors the _clean/* synthetic branch pattern in cherry-pick.

Function now owns the push to main (was dispatch's job for cherry-pick
and reweave). Returns sentinel "merged --no-ff (external PR #N, M=<sha>,
audit_ref=...)" that dispatch detects to skip its ff-push and route
directly to PR-close + mark_merged + audit. Audit detail JSON now
includes merge_commit_sha + audit_ref + github_pr (Ship review #5).

Smoke-tested in scratch repo end-to-end:
  - contributor branch tip unchanged ✓
  - audit ref _merged/gh-pr-90/... carries merge SHA ✓
  - main tip equals merge SHA (ff-push, no force) ✓
  - contributor SHA ancestor of main → GitHub badge fires ✓

Sentinel return parsed via 3 regexes in dispatch (full 40-char SHA in
return string for durability). Branch comment in dispatch explicitly
notes contributor branch is left in place — sync-mirror keeps the
GitHub PR <-> Forgejo PR link observable through it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:32:52 +01:00
992b4ee36f feat(merge): _merge_no_ff_external for gh-pr-* branches (Phase 2)
External GitHub fork PRs need their contributor commit SHA in main's history
for GitHub's "merged" badge to fire. Cherry-pick rewrites the SHA, breaking
that detection. New _merge_no_ff_external function preserves the SHA via a
true merge commit.

Mechanics (mirrors _cherry_pick_onto_main shape):
1. Fetch origin/main + origin/{branch}
2. Detached worktree at origin/main, git merge --no-ff origin/{branch}
   with verbose message: "Merge external GitHub PR #{N}: {branch_slug}"
3. Force-push merge commit M as origin/{branch}, replacing branch tip
4. Dispatch's existing ff-push origin/{branch} → main propagates M to main

M has parents [main_sha, contributor_sha]. M is a fast-forward descendant
of main_sha (first-parent chain), so the ff-push to main is valid without
--force. Contributor SHA reachable from main → GitHub recognizes merged.

Conflict handling: same auto-resolve as cherry-pick — entity-only conflicts
take main's version (--ours = current worktree HEAD = main), other conflicts
abort with detail.

Backout: config.EXTERNAL_PR_NO_FF_MERGE = True (default). Set False to fall
back to cherry-pick if no-ff destabilizes throughput one week pre-Accelerate.

Branch dispatch in _merge_domain_queue:
- reweave/* → _merge_reweave_pr (existing)
- gh-pr-N/* AND config.EXTERNAL_PR_NO_FF_MERGE → _merge_no_ff_external (new)
- everything else → _cherry_pick_onto_main (existing default)

Verified end-to-end in scratch repo:
- merge commit M has [main_sha, contributor_sha] as parents
- contributor SHA is ancestor of M
- after ff-push, contributor SHA is in main's history (GitHub badge fires)
- regex parses 8 cases correctly (real fork PR + edge cases reject cleanly)

Architecture per Ship Msg 3 / doc v3 (537cfd5 on epimetheus/external-merge-flow-design).
Phase 1 (sync-mirror self-heal) deployed yesterday. Phase 3 (FwazB PR #90 cleanup)
queued behind this deploy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:18:37 +01:00
de204db539 fix(sync-mirror): tighten gh-pr-* regex + document SQL-integer-safety
Some checks are pending
CI / lint-and-test (push) Waiting to run
Ganymede review nit on commit 1eb259d:

- Regex changed from [0-9]* (zero-or-more) to [0-9][0-9]* (one-or-more,
  portable BRE form of [0-9]+ that works on both GNU and BSD sed).
- Empty/non-numeric branches now fail at parse, not just at the empty-guard
  below — SQL-integer-safety load-bearing on the regex alone.
- Comment above the UPDATE notes the integer-validation invariants
  (INTEGER `number` column + regex-validated gh_pr_num) since bash sqlite3
  has no parametric binding.

Smoke tested: gh-pr-/foo, gh-pr-abc/foo no longer parse to non-empty.
gh-pr-90/main, gh-pr-4066/contrib/x, gh-pr-1/x all parse correctly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 13:07:50 +01:00
1eb259de8a fix(sync-mirror): self-heal sweep for orphaned gh-pr-* github_pr links
Step 0 (new): runs once per cron tick before per-repo work. Selects PR rows
where branch matches gh-pr-% but github_pr IS NULL, parses the PR number
from the branch name, and updates github_pr + source_channel='github'.

Recovers from races and transient failures in the existing Step 4.5 link
UPDATE — no retry path before. The sweep IS the backfill: same SELECT/UPDATE
heals historical orphans (FwazB PR 4066 picked up on first cron tick) AND
future races on subsequent ticks. No separate one-shot script needed.

Properties:
- Idempotent: SELECT empty when clean, zero work
- No API calls: branch name encodes the GitHub PR number deterministically
- Bounded log volume: one line per actually-healed row
- Runs before any sync_repo work, ahead of branch-mirror loop and the
  auto-create-PR block in Step 4 — same-cycle convergence on fresh races

Closes the Bug #2 path that left FwazB's PR 4066 with github_pr=NULL,
preventing on_merged() from posting comment + closing the GitHub PR.

Verified end-to-end on live DB snapshot:
- before: 4066 had github_pr=NULL
- after sweep: 4066 has github_pr=90, source_channel='github'
- second run: zero output (idempotent)

Phase 1 of docs/external-contributor-merge-flow.md (v2, sweep-only).
Ship architecturally approved Msg 2/2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 13:02:37 +01:00
b8504c1b60 docs: rewrite public README
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Replaces the directory-listing format with one that explains what the
pipeline does and shows production scale. Verified all numbers against
production (1,546 claims, 13 domains, 1,975 merged PRs, 508 last-7d
throughput, 94% approval, ~\$0.10/merged claim incl. all stages).

Removes the VPS layout section (IP + paths + username) per Epimetheus
review — that detail moves to the private teleo-ops repo. Generalizes
deploy targets without naming the host.

Adds two Mermaid diagrams (pipeline flow + review tier matrix), both
syntactically safe across GitHub and Forgejo 9 / Gitea 1.22.

Drops the per-directory ownership table — CODEOWNERS is the single
source of truth on review authority. Keeps the high-level role map
for orientation only.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-28 10:19:18 +01:00
33f6ca9e3f fix(mirror): setup script pushes main+tags only (consistency with sync-mirror)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Initial setup-infra-mirror.sh did `git push origin --all`, which contradicted
the main_only mode protection landed in b9c4947 — agent review branches
(epimetheus/*, ganymede/*) ended up publicly visible on the new GitHub
teleo-infrastructure mirror until I deleted them.

Initial push now mirrors the recurring sync's main_only path: refs/heads/main
+ tags only. Re-running the setup script is now idempotent at branch level —
won't redo the agent-branch leak.

Cleanup applied to live GitHub teleo-infrastructure: 18 stale agent review
branches deleted via single batched push (epimetheus/* x14, ganymede/* x3,
ship/metadao-scraper). Only main remains. Codex bidirectional mirror unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 23:09:25 +01:00
b9c4947637 fix(mirror): restrict main_only mode to main+tags (Ganymede review)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Finding #1 (recommendation, applied): infra-mode now pushes only main + tags
to GitHub. Agent review branches (epimetheus/*, ganymede/*) stay Forgejo-only.
Public GitHub history reflects merged work, not pre-review WIP with internal
agent context.

Bidirectional mode unchanged — codex still mirrors all branches so external
contributors can fork from any branch.

Nit #4: setup script m3taversal username has a comment explaining it's a
placeholder for fine-grained PAT auth, mirrors the existing teleo-codex remote.

Two pre-existing nits filed for follow-up branch:
- hardcoded `living-ip:` in GH_PR_NUM head filter (line 273)
- spurious CRITICAL log on GH→forgejo→GH cycles (re-fetch forgejo after Step 2.5)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 22:54:18 +01:00
bf647b7abb feat(mirror): refactor sync-mirror.sh for multi-repo, add infra setup script
Wraps the per-repo body in sync_repo() and loops over MIRROR_REPOS at the
bottom. teleo-codex stays bidirectional (full PR roundtrip + pipeline.db
linking). teleo-infrastructure runs main_only: branch+tag sync Forgejo→
GitHub, ff-only GitHub→Forgejo on main, divergence alerting per-repo.
Steps 2.1 (fork PR refs) and 4 (Forgejo PR auto-create + DB link) gated
on MODE=bidirectional.

Setup script (deploy/setup-infra-mirror.sh) initializes the bare repo at
/opt/teleo-eval/mirror/teleo-infrastructure.git, configures remotes,
performs initial Forgejo→GitHub push. Idempotent. Pre-flight checks both
GitHub repo (must be created manually first — fine-grained PAT can't
create repos in the org) and Forgejo repo are accessible.

Per-repo divergence state file (.divergence-count.<repo>) so each repo
has independent counter + alert state. Also pulls in the source_channel
update from Apr 6 that lived only on VPS (line 215 added 'github').

Not deployed yet — pending Ganymede review and GitHub repo creation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 22:22:33 +01:00
1351db70a9 fix(tests): apply Ganymede review nits + add m3taversal reset script
Some checks are pending
CI / lint-and-test (push) Waiting to run
3 nits from review of d60b6f8 + Q4 ask:

1. test_window_24h_only_today: replace always-true assertion with
   concrete `assert handles == ["carol"]`. Push alice's most-recent
   event from -1 days to -2 days to eliminate fixture-vs-query
   microsecond drift on the 24h boundary.
2. _call helper: asyncio.get_event_loop().run_until_complete →
   asyncio.run (deprecation in 3.12, raises in some 3.14 contexts).
3. test_invalid_limit_falls_to_default: dead first call removed,
   misleading "7 entries" comment now matches assertion.

Q4: scripts/reset-m3taversal-sourcer.py captures the surgical
UPDATE we ran on VPS as a reviewable artifact. Idempotent (no-op
on already-reset rows), audit_log entry per run. Ganymede's point:
DB mutations should leave a code paper trail, not just an audit
row whose origin lives only in the executor's memory.

30/30 tests pass on VPS hermes venv (aiohttp 3.13.5, py 3.11.15).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:35:18 +01:00
d60b6f8bf2 test(leaderboard): cover all four slicings + AND-prefix regression
Adds tests/test_leaderboard.py — 30 cases against
diagnostics/leaderboard_routes.py. Two reasons:

(1) Zero coverage on an endpoint Argus + Oberon are about to consume
    for the May 5 hackathon UI. Two bugs slipped through this morning
    (404 wiring missing in app.py; AND-prefix SQL syntax error on
    rolling-window). Tests prevent regression.

(2) Tests serve as living documentation for Oberon's frontend
    integration — each test names a contract guarantee
    (test_left_join_handles_missing_contributors_row,
    test_composed_window_kind_domain, test_role_breakdown_present).

Coverage:
  - _parse_window unit tests (10): all_time, Nd, Nh, caps, garbage,
    case-normalization, and explicit no-AND-prefix assertion
  - handle_leaderboard integration (18): every kind value, every
    window family, domain filter, composed filters, limit + has_more,
    invalid-input fallback, role breakdown shape, empty-window shape,
    LEFT JOIN COALESCE for handles missing from contributors
  - 2 contract assertions: LEADERBOARD_PUBLIC_PATHS membership +
    KIND_VALUES set

Run: 30/30 pass on VPS hermes venv (aiohttp 3.13.5, pytest 9.0.2).
Skips clean locally without aiohttp via pytest.importorskip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:46:47 +01:00
cd5aac5cc6 fix(activity-feed): remove [:120] slug truncation
Some checks are pending
CI / lint-and-test (push) Waiting to run
Claim slugs were being cut at 120 chars in _extract_claim_slugs, causing
Timeline event clicks to 404 when the on-disk filename exceeded that
length (frontend builds /api/claims/<slug> from the truncated value).

This fix landed Apr 26 but regressed when the file was redeployed —
committing the unmangled version to repo so deploy.sh re-shipping
doesn't reintroduce the cap.

Verified live: max slug now 265 chars, 16 of 30 over the old 120 cap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:27:31 +01:00
7c6417d6be test(diagnostics): activity_endpoint classify_pr_operation suite
Move tests from /tmp into the proper test suite. 22 cases covering:

- Leo gotcha: extract/* + commit_type=enrich/challenge classifies by
  commit_type, not branch prefix (same pattern as the contributor-role
  wiring fix)
- Reweave priority: branch.startswith('reweave/') wins over
  _MAINTENANCE_COMMIT_TYPES — nightly reweave PRs classify as enrich,
  not infra. Locks in the bifurcation against future priority refactors
- Full NON_MERGED_STATUS_TO_OPERATION coverage: open, approved, closed,
  conflict, validating, reviewing, merging, zombie
- Knowledge-producing commit_types (research, entity) → new
- Maintenance commit_types (fix, pipeline) → infra
- Defensive: null inputs, unknown status

aiohttp imported at module load — file uses pytest.importorskip so it
runs cleanly in any environment with aiohttp installed and skips gracefully
otherwise. sys.path inject for diagnostics/ since it isn't packaged.

Reviewed-by: Ganymede

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:39:44 +01:00
42d35d4e15 fix(diagnostics): wire /api/leaderboard into app.py + fix rolling-window SQL
Some checks are pending
CI / lint-and-test (push) Waiting to run
de7e5ec landed leaderboard_routes.py + the route file's register fn but
the import + register_leaderboard_routes(app) call + auth-middleware
allowlist were never added to app.py — endpoint returned 404 in production.

Three minimal edits to app.py mirror the existing register_*_routes pattern
(import at line 28, allowlist OR-clause at line 512, register call at 2365).

Plus a SQL bug in _parse_window: rolling-window clauses prefixed "AND "
but the WHERE composition uses " AND ".join(...), producing
"WHERE 1=1 AND AND ce.timestamp..." → sqlite3.OperationalError on every
window=Nd / window=Nh request. Stripped the prefix and added a comment so
the asymmetry doesn't bite again.

Verified on VPS:
  GET /api/leaderboard?window=all_time&kind=person → 200, 11 rows
  GET /api/leaderboard?window=7d&kind=person → 200, 2 rows
  GET /api/leaderboard?window=30d&kind=person → 200, 9 rows
  GET /api/leaderboard?domain=internet-finance → 200, 3 rows
  GET /api/leaderboard?kind=agent → 200, leo/rio/clay/astra/vida

Unblocks: Argus dashboard cutover, Oberon column reorder, Leo's CI
taxonomy broadcast.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:30:26 +01:00
de7e5ec709 feat(diagnostics): /api/leaderboard reads contribution_events directly
Some checks are pending
CI / lint-and-test (push) Waiting to run
New endpoint replaces the legacy /api/contributors *_count read path with
event-sourced reads from the Phase A contribution_events ledger.

- Params: window (all_time | Nd | Nh), kind (person | agent | org | all),
  domain (filter), limit (default 100, max 500)
- Returns per-handle CI, full role breakdown (author/challenger/synthesizer/
  originator/evaluator), events_count, pr_count, first/last contribution
- ORDER BY ci DESC, last_contribution DESC — recent contributors break ties
- Read-only sqlite URI; total/has_more computed for paginated UIs

Wiring (import + register + _PUBLIC_PATHS entry) currently applied to live
app.py on VPS only — repo app.py has drift from Ship's uncommitted /api/search
POST contract. Next deploy.sh round-trip needs both to land together.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:16:41 +01:00
369f6c96da fix(attribution): credit research-session sources to agents, not m3taversal (#7)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Forward fix: research-session.sh writes intake_tier: research-task (no proposed_by — extract.py infers agent from branch).

Backfill: 304 PRs reattributed across 30 days (rio 74, clay 70, astra 53, vida 48, theseus 30, leo 29). Already applied to production.

Format reconciliation: normalize_handle strips (self-directed) suffix so both halves canonicalize to the same agent handle.

5 idempotency tests passing. Production replay self-extinguishes (delta 3839→3839).
2026-04-27 11:59:54 +00:00
6aff03ff56 fix(attribution): unify research-session format on "(self-directed)" suffix
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Resolves the format inconsistency between the forward fix and the 304-row
backfill. Both halves now produce prs.submitted_by = "rio (self-directed)":

- research-session.sh: drop proposed_by from the frontmatter template.
  extract.py path 1 (proposed_by-driven) no longer fires; path 2 fires
  instead and constructs f"{agent} (self-directed)" — matches backfill.

- attribution.py: normalize_handle now strips "(self-directed)" suffix
  immediately after lowercase+@-strip, before alias lookup. Closes the
  phantom-person-event class on any future replay through
  record_contributor_attribution. Round-trips through alias rules keyed
  on bare agent names.

Test (5 cases) still passes; suffix-strip behavior verified against
hostile inputs (whitespace, casing, mid-string occurrences must NOT
match — only trailing pattern).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:53:52 +01:00
319e03e2c6 test(attribution): prove research-backfill replay is idempotent
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Five tests against the real contribution_events schema (lib/db.py:181-209):
- pr-level dedup with NULL claim_path via idx_ce_unique_pr partial index
- per-claim dedup with non-NULL claim_path via idx_ce_unique_claim partial index
- pr-level and per-claim events coexist on the same pr_number
- backfill (INSERT correct + DELETE wrong) is a true no-op on replay
- replay against already-backfilled state preserves unrelated events

Schema case identified: case 2 with partial-index split solution already in
place. Two partial UNIQUE indexes target disjoint row sets (claim_path IS NULL
vs IS NOT NULL), bypassing SQLite's NULL-not-equal-NULL UNIQUE quirk.

Production replay verified: re-running backfill --apply against the live DB
returns "misattributed PRs found: 0" because the first-run UPDATE flipped the
WHERE predicate. Total contribution_events count: 3839 → 3839.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:50:17 +01:00
2d332c66d4 fix(attribution): credit research-session sources to agents, not m3taversal
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Two-part fix for a bug where every claim extracted from agent overnight
research sessions was being credited to m3taversal in contribution_events
(visible in the activity feed as "@m3taversal" on agent-derived claims).

Forward fix (research/research-session.sh):
The frontmatter template the agent prompt instructs Claude to use now
includes `proposed_by: ${AGENT}` and `intake_tier: research-task`. With
those fields present, extract.py path 1 (line 687) takes precedence and
sets prs.submitted_by to the agent handle, which then propagates into
contribution_events as a kind='agent' author event for the agent.

Without the fields, extract.py fell through to the default branch on
line 695 and set submitted_by='@m3taversal'.

Backfill (scripts/backfill-research-session-attribution.py):
Identifies research-session-derived PRs by finding teleo-codex commits
matching `^<agent>: research session YYYY-MM-DD —`, listing the
inbox/queue/*.md files added in each commit's diff, and matching those
filename basenames against prs.source_path. Only PRs currently
submitted_by='@m3taversal' AND merged within the configurable window
are touched. Default --dry-run; --apply to commit.

For each match the script:
  1. UPDATE prs SET submitted_by = '<agent> (self-directed)'
  2. INSERT OR IGNORE the agent author event (kind='agent', weight=0.30)
     with the original PR's domain, channel, merged_at preserved
  3. DELETE the misattributed m3taversal author event

Applied 30-day backfill on VPS:
  - 304 PRs re-attributed (rio 74, clay 70, astra 53, vida 48,
    theseus 30, leo 29)
  - 297 m3taversal author events deleted, 304 agent author events
    inserted (delta of 7 = pre-v24 PRs that never had m3ta events
    in the first place; we still create the new agent event)
  - m3taversal author count: 1368 → 1071 (−22%)
  - Pre-backfill DB snapshot: pipeline.db.bak-pre-research-attribution

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:38:53 +01:00
dea1b02aa6 fix(attribution): narrow exception + document gate asymmetry (Ganymede review)
Two follow-up fixes from Ganymede's review of d0fb4c9:

1. is_publisher_handle: narrow `except Exception` to sqlite3.OperationalError.
   Pre-v26 DB fallback only needs to catch the "table doesn't exist" case;
   broader exceptions (programming errors, locks, corruption) should propagate.

2. upsert_contributor gate: add comment documenting the alias-resolution
   asymmetry between insert_contribution_event (alias-resolved via
   normalize_handle) and upsert_contributor (bare lower+lstrip-@). Today this
   is fine because the v26 classifier produced one publisher row per canonical
   handle. Branch 3 will normalize alias→canonical at writer entry points,
   tightening this gate transparently.

Unit tests for the gates (positive + negative + alias resolution) deferred to
Branch 3 alongside the auto-create flow tests.

Smoke-tested:
  - pre-v26 fallback (no publishers table) → None (correct)
  - case-insensitive match (CNBC → id=1) → correct
  - @ prefix strip (@cnbc → id=1) → correct
  - non-publisher handle (alexastrum) → None (correct)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:25:24 +01:00
d0fb4c96e3 fix(attribution): gate writer on publishers table (regression prevention)
Schema v26 (commit 3fe524d) split orgs/citations from contributors into
the publishers table. Without a writer-side gate, every merged PR with
`sourcer: cnbc` (or similar) re-creates CNBC as a contributor and
undoes the v26 classifier cleanup. Once normal pipeline traffic resumes,
the contributors table re-pollutes within hours.

Fix: belt-and-suspenders gate at both writer surfaces.

1. `lib/attribution.py::is_publisher_handle(handle, conn)` — returns
   publisher.id if handle exists in publishers.name, else None. Falls
   back gracefully on pre-v26 DBs (no publishers table → returns None →
   writer behaves like before, no regression).

2. `lib/contributor.py::insert_contribution_event` — checks
   is_publisher_handle on canonical handle before INSERT. If it's a
   publisher, debug-log + return False. Prevents originator events for
   CNBC/SpaceNews/etc.

3. `lib/contributor.py::upsert_contributor` — same gate at top. Prevents
   the contributors table from re-acquiring publisher rows.

Verified end-to-end against live VPS DB snapshot:
  - CNBC originator event: blocked (insert returns False)
  - CNBC contributors row: blocked (no row created)
  - alexastrum, thesensatore, newhandle_xyz: pass through unchanged
  - is_publisher_handle handles case-insensitive lookup correctly
    (CNBC and cnbc both match publisher_id=3)

Pre-deploy event count was 3705. Post-classifier cleanup: 3623 (82 org
events purged). Going forward, no new org events accumulate.

Branch 2 of the schema-v26 rollout. Branch 3 (auto-create at tier='cited',
extract.py sources.publisher_id wiring) is separate scope and not required
for regression prevention.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:21:10 +01:00
926a397839 fix(activity): re-apply source classifier + add date-prefix slug fallback
Some checks are pending
CI / lint-and-test (push) Waiting to run
Regression: aeae712's source/create distinction was lost — VPS reverted to
pre-aeae712 behavior where every extract/* knowledge PR returned type=create
regardless of whether a claim was written. Source archives surfaced as
"New claim" chips with date-prefix slugs that 404 on click.

Root cause: aeae712 was deployed via local file copy and never pushed to
origin; a subsequent rsync from origin/main overwrote it with the older
classifier. This branch ships from origin so deploy.sh's repo-first gate
makes recurrence impossible.

- Restore aeae712: extract/* + empty description -> source, with
  empty claim_slug + source_slug field, ci_earned 0.15
- Add Leo's regex fallback: candidate_slug matching
  ^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$ -> source regardless of branch
  /commit_type/description state. Catches edge cases where description
  leaks but is just a source title (slugified into the inbox filename
  pattern), not a claim insight.
- Add 'challenge' to _FEED_COMMIT_TYPES (latent bug — challenge PRs
  would be filtered out before classification because the filter
  list omitted them; memory says 0 challenges exist so it never
  triggered, but schema support belongs in the filter)
- _build_events: compute candidate slug before classify so the regex
  fallback has a slug to inspect

Verified locally on Leo's example PRs (#4014, #4016) — both classify
as source. VPS smoke pending deploy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 13:47:00 +01:00
26 changed files with 3535 additions and 875 deletions

165
README.md
View file

@ -1,65 +1,134 @@
# teleo-infrastructure
Pipeline infrastructure for the Teleo collective knowledge base. Async Python daemon that extracts, validates, evaluates, and merges claims via Forgejo PRs.
This repo runs the pipeline that processes contributions into the
[teleo-codex](https://github.com/living-ip/teleo-codex) knowledge base.
## Directory Structure
Every claim on `main` has been extracted from a source, validated for schema
and duplicates, evaluated by at least two independent reviewers, and merged
through an event-sourced audit log. The whole flow is an async Python daemon
talking to a Forgejo git server, an SQLite WAL state store, OpenRouter (for
most LLM calls), and the Anthropic Claude CLI (for Opus deep reviews).
**Production state** (live):
| Metric | Value |
|---|---|
| Claims merged into `main` | 1,546 across 13 domains |
| PRs merged through the pipeline | 1,975 |
| Merge throughput (last 7d) | 508 PRs (~73/day) |
| Review approval rate | 94% |
| Cost per merged claim (last 30d) | $0.10 incl. extract + triage + multi-tier review |
| Production agents | 6 (rio, theseus, leo, vida, astra, clay) |
## Pipeline
Concurrent stage loops in a single daemon (`teleo-pipeline.py`), coordinated
by SQLite. Circuit breakers cap costs, retry budgets cap attempts, and merges
are serialized per-domain to avoid cross-PR conflicts.
```mermaid
flowchart LR
Inbox["inbox/queue/"] --> Extract
Extract["Extract<br/>(Sonnet 4.5)"] --> Validate
Validate["Validate<br/>(tier 0, $0)"] --> Evaluate
Evaluate["Evaluate<br/>(tiered, multi-model)"] --> Merge
Merge["Merge<br/>(Forgejo, domain-serial)"] --> Effects
Effects["Effects<br/>cascade · backlinks · reciprocal edges"]
```
teleo-infrastructure/
├── teleo-pipeline.py # Daemon entry point
├── reweave.py # Reciprocal edge maintenance
├── lib/ # Pipeline modules (Python package)
├── diagnostics/ # Monitoring dashboard (port 8081)
├── telegram/ # Telegram bot interface
├── deploy/ # Deployment + mirror scripts
├── systemd/ # Service definitions
├── agent-state/ # Cross-session agent state
├── research/ # Nightly research orchestration
├── hermes-agent/ # Hermes agent setup
├── scripts/ # One-off backfills + migrations
├── tests/ # Test suite
└── docs/ # Operational documentation
If any reviewer rejects, the PR gets a structured rationale and either
re-extraction guidance (for fixable issues) or a terminal close (for
scope or duplicate problems). Approved merges trigger downstream effects:
- **Cascade** — agents whose beliefs/positions depend on the changed claim get inbox notifications
- **Bidirectional provenance**`sourced_from:` is stamped on each claim at extraction; the source's `claims_extracted:` list is updated post-merge
- **Reciprocal edges** — when a new claim has `supports: [X]`, X's frontmatter is updated with `supports: [new]`
- **Cross-domain index** — entity mentions across domain boundaries are logged for silo detection
## Multi-agent review
Reviews aren't free. Tier classification is deterministic where possible
(changes to `core/` or `foundations/` always go Deep) and otherwise picked
by Haiku based on PR scope. Last 30d distribution: 76% Standard, 21% Light,
2% Deep.
```mermaid
flowchart TD
PR[New PR] --> Classify{Classify}
Classify -->|"core/, foundations/, challenged"| Deep
Classify -->|default| Standard
Classify -->|single claim, low risk| Light
Light["Light tier<br/>Domain agent only"] --> Result
Standard["Standard tier<br/>Domain agent + Leo (Sonnet 4.5)"] --> Result
Deep["Deep tier<br/>Domain agent + Leo (Opus)"] --> Result
Result{Both approve?}
Result -->|yes| MergeOK[Merge]
Result -->|no| Reject[Structured rejection<br/>+ re-extract guidance]
```
Domain agents bring domain expertise: **Rio** (internet-finance), **Vida**
(health), **Astra** (space-development), **Clay** (entertainment),
**Theseus** (ai-alignment). **Leo** brings cross-domain consistency on
every PR. Disagreement between the two reviewers surfaces in `audit_log`
and is tracked as a quality signal, not silenced.
Model diversity isn't cosmetic — same-family models share ~60% of their
errors (Kim et al. ICML 2025). Pipeline mixes Haiku for triage, Gemini 2.5
Flash for domain review, Sonnet 4.5 for Leo standard, Opus for Leo deep.
## Contributor flow
External contributors submit PRs to
[`living-ip/teleo-codex`](https://github.com/living-ip/teleo-codex) on GitHub.
A mirror sync (every 2 minutes) fast-forwards the PR onto Forgejo, where
the pipeline picks it up. From there it's the same flow as agent-authored
PRs — same tiers, same reviewers, same merge rules.
The contributor-facing guide lives in
[`teleo-codex/CONTRIBUTING.md`](https://github.com/living-ip/teleo-codex/blob/main/CONTRIBUTING.md).
## Repository layout
| Directory | What it does |
|-----------------|-----------------------------------------------------------|
| `lib/` | Pipeline modules — config, db, extract, evaluate, merge, cascade |
| `diagnostics/` | Argus monitoring dashboard (4 pages: ops, health, agents, epistemic) |
| `telegram/` | Telegram bot that answers from the knowledge base |
| `research/` | Nightly autonomous research sessions for domain agents |
| `agent-state/` | File-backed state for cross-session agent continuity |
| `deploy/` | Auto-deploy pipeline (Forgejo → working dirs → systemd) |
| `systemd/` | Service definitions for daemon + dashboard + agents |
| `scripts/` | Backfills and one-off migrations |
| `tests/` | pytest suite |
| `docs/` | Architecture specs and operational protocols |
## Ownership
Each directory has one owning agent. The owner is accountable for correctness and reviews all changes to their section. See `CODEOWNERS` for per-file detail.
Code review authority is enforced by [`CODEOWNERS`](./CODEOWNERS) — every
file has one accountable agent. The high-level map:
| Directory | Owner | What it does |
|-----------|-------|-------------|
| `lib/` (core) | **Ship** | Config, DB, merge, cascade, validation, LLM calls |
| `lib/` (extraction) | **Epimetheus** | Source extraction, entity processing, pre-screening |
| `lib/` (evaluation) | **Leo** | Claim evaluation, analytics, attribution |
| `lib/` (health) | **Argus** | Health checks, search, claim index |
| `diagnostics/` | **Argus** | 4-page dashboard, alerting, vitality metrics |
| `telegram/` | **Ship** | Telegram bot, X integration, retrieval |
| `deploy/` | **Ship** | rsync deploy, GitHub-Forgejo mirror |
| `systemd/` | **Ship** | teleo-pipeline, teleo-diagnostics, teleo-agent@ |
| `agent-state/` | **Ship** | Bootstrap, state library, cascade inbox processor |
| `research/` | **Ship** | Nightly research sessions, prompt templates |
| `scripts/` | **Ship** | Backfills, migrations, one-off maintenance |
| `tests/` | **Ganymede** | pytest suite, integration tests |
| `docs/` | Shared | Architecture, specs, protocols |
- **Ship** — pipeline core, telegram, deploy, agent-state, research, systemd
- **Epimetheus** — extraction (intake, entity processing, pre-screening, post-extract validation)
- **Leo** — evaluation (claim review, analytics, attribution)
- **Argus** — health (diagnostics dashboard, alerting, claim index, search)
- **Ganymede** — tests (pytest suite, integration, code review gate)
## VPS Layout
For active sprint work and per-agent in-flight items, see each agent's
status report in their Pentagon profile.
Runs on Hetzner CAX31 (77.42.65.182) as user `teleo`.
| VPS Path | Repo Source | Service |
|----------|-------------|---------|
| `/opt/teleo-eval/pipeline/` | `lib/`, `teleo-pipeline.py`, `reweave.py` | teleo-pipeline |
| `/opt/teleo-eval/diagnostics/` | `diagnostics/` | teleo-diagnostics |
| `/opt/teleo-eval/telegram/` | `telegram/` | (manual) |
| `/opt/teleo-eval/agent-state/` | `agent-state/` | (used by research-session.sh) |
## Quick Start
## Development
```bash
# Run tests
pip install -e ".[dev]"
pytest
# Deploy to VPS
./deploy/deploy.sh --dry-run # preview
./deploy/deploy.sh # deploy
```
## Operations
Production deployment runs on a single VPS. Runbook, restart procedures,
secret rotation, and on-call live in the private
[`teleo-ops`](https://github.com/living-ip/teleo-ops) repo (request access).
## License
[TBD]

View file

@ -51,7 +51,7 @@ fi
# Syntax check all Python files before copying
ERRORS=0
for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py scripts/*.py; do
for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py; do
[ -f "$f" ] || continue
if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>&1; then
log "SYNTAX ERROR: $f"
@ -77,7 +77,6 @@ rsync "${RSYNC_OPTS[@]}" telegram/ "$PIPELINE_DIR/telegram/"
rsync "${RSYNC_OPTS[@]}" diagnostics/ "$DIAGNOSTICS_DIR/"
rsync "${RSYNC_OPTS[@]}" agent-state/ "$AGENT_STATE_DIR/"
rsync "${RSYNC_OPTS[@]}" tests/ "$PIPELINE_DIR/tests/"
rsync "${RSYNC_OPTS[@]}" scripts/ "$PIPELINE_DIR/scripts/"
[ -f research/research-session.sh ] && rsync "${RSYNC_OPTS[@]}" research/research-session.sh /opt/teleo-eval/research-session.sh
# Safety net: ensure all .sh files are executable after rsync

View file

@ -41,7 +41,7 @@ echo ""
# Syntax check all Python files before deploying
echo "=== Pre-deploy syntax check ==="
ERRORS=0
for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py "$REPO_ROOT/scripts/"*.py; do
for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py; do
[ -f "$f" ] || continue
if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>/dev/null; then
echo "SYNTAX ERROR: $f"
@ -80,10 +80,6 @@ echo "=== Tests ==="
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/tests/" "$VPS_HOST:$VPS_PIPELINE/tests/"
echo ""
echo "=== Scripts ==="
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/scripts/" "$VPS_HOST:$VPS_PIPELINE/scripts/"
echo ""
echo "=== Diagnostics ==="
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/diagnostics/" "$VPS_HOST:$VPS_DIAGNOSTICS/"
echo ""

120
deploy/setup-infra-mirror.sh Executable file
View file

@ -0,0 +1,120 @@
#!/bin/bash
# One-time setup: prepare the bare mirror repo for teleo-infrastructure.
#
# Prerequisites (must happen BEFORE running this):
# 1. GitHub repo `living-ip/teleo-infrastructure` created (manual via web or
# `gh repo create` — the deploy PAT is fine-grained to teleo-codex only
# and cannot create new repos in the org).
# 2. GitHub PAT updated to include push access on the new repo (or rotate
# to a classic PAT with `repo` scope covering both).
#
# This script is idempotent — safe to re-run.
set -euo pipefail
MIRROR_BASE="/opt/teleo-eval/mirror"
REPO_DIR="$MIRROR_BASE/teleo-infrastructure.git"
FORGEJO_URL="http://localhost:3000/teleo/teleo-infrastructure.git"
GITHUB_REPO="living-ip/teleo-infrastructure"
FORGEJO_TOKEN_FILE="/opt/teleo-eval/secrets/forgejo-admin-token"
GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
if [ ! -f "$FORGEJO_TOKEN_FILE" ]; then
echo "ERROR: missing $FORGEJO_TOKEN_FILE" >&2
exit 1
fi
if [ ! -f "$GITHUB_PAT_FILE" ]; then
echo "ERROR: missing $GITHUB_PAT_FILE" >&2
exit 1
fi
FORGEJO_TOKEN=$(cat "$FORGEJO_TOKEN_FILE" | tr -d '[:space:]')
GITHUB_PAT=$(cat "$GITHUB_PAT_FILE" | tr -d '[:space:]')
# Sanity check: GitHub repo must exist before we point a remote at it.
echo "Verifying GitHub repo $GITHUB_REPO exists..."
GH_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
-H "Authorization: Bearer $GITHUB_PAT" \
"https://api.github.com/repos/$GITHUB_REPO")
if [ "$GH_STATUS" != "200" ]; then
echo "ERROR: GitHub repo $GITHUB_REPO not accessible (HTTP $GH_STATUS)" >&2
echo "Create it first: gh repo create $GITHUB_REPO --public --description 'Pipeline + diagnostics infra for the LivingIP collective'" >&2
exit 2
fi
echo " OK — $GITHUB_REPO accessible"
# Sanity check: Forgejo repo must exist.
echo "Verifying Forgejo repo teleo/teleo-infrastructure exists..."
FG_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
-H "Authorization: token $FORGEJO_TOKEN" \
"http://localhost:3000/api/v1/repos/teleo/teleo-infrastructure")
if [ "$FG_STATUS" != "200" ]; then
echo "ERROR: Forgejo repo teleo/teleo-infrastructure not accessible (HTTP $FG_STATUS)" >&2
exit 3
fi
echo " OK — Forgejo repo accessible"
# Init bare mirror if missing
if [ -d "$REPO_DIR" ]; then
echo "Bare repo already exists at $REPO_DIR — skipping init"
else
echo "Creating bare repo at $REPO_DIR..."
mkdir -p "$REPO_DIR"
cd "$REPO_DIR"
git init --bare >/dev/null
chown -R teleo:teleo "$REPO_DIR"
echo " OK — bare repo initialized"
fi
cd "$REPO_DIR"
# Configure remotes (idempotent: set-url succeeds whether remote exists or not)
# Forgejo remote (origin convention is reversed in this codebase: origin=GitHub,
# forgejo=Forgejo, matching the existing teleo-codex.git layout).
FORGEJO_REMOTE_URL="http://github-mirror:${FORGEJO_TOKEN}@localhost:3000/teleo/teleo-infrastructure.git"
# NOTE: "m3taversal" is a placeholder username — for fine-grained PATs the
# username field is decorative; the token does the auth. Matches the existing
# teleo-codex.git remote for consistency. (Ganymede review nit #4.)
GITHUB_REMOTE_URL="https://m3taversal:${GITHUB_PAT}@github.com/${GITHUB_REPO}.git"
if git remote get-url forgejo >/dev/null 2>&1; then
git remote set-url forgejo "$FORGEJO_REMOTE_URL"
echo " Updated forgejo remote URL"
else
git remote add forgejo "$FORGEJO_REMOTE_URL"
echo " Added forgejo remote"
fi
if git remote get-url origin >/dev/null 2>&1; then
git remote set-url origin "$GITHUB_REMOTE_URL"
echo " Updated origin remote URL"
else
git remote add origin "$GITHUB_REMOTE_URL"
echo " Added origin remote"
fi
# Initial fetch from Forgejo
echo "Fetching from Forgejo..."
git fetch forgejo --prune 2>&1 | sed 's/^/ /'
# Initial push to GitHub (will populate the empty repo)
# main_only mode: push ONLY refs/heads/main + tags, mirroring what sync-mirror.sh
# does for this repo on the recurring path. Agent review branches stay Forgejo-only.
echo "Pushing initial main + tags to GitHub..."
git update-ref refs/heads/main refs/remotes/forgejo/main 2>/dev/null || {
echo "ERROR: forgejo/main ref missing — fetch may have failed" >&2
exit 1
}
git push origin "refs/heads/main:refs/heads/main" 2>&1 | sed 's/^/ /' || {
echo "WARN: initial push failed — you may need to authorize the PAT for $GITHUB_REPO" >&2
}
git push origin --tags 2>&1 | sed 's/^/ /' || true
# Final permissions sweep
chown -R teleo:teleo "$REPO_DIR"
echo
echo "Setup complete. Verify with:"
echo " ssh teleo@77.42.65.182 ls -la $REPO_DIR/refs/heads"
echo " /opt/teleo-eval/sync-mirror.sh && tail -50 /opt/teleo-eval/logs/sync.log"

View file

@ -2,22 +2,35 @@
# Bidirectional sync: Forgejo (authoritative) <-> GitHub (public mirror)
# Forgejo wins on conflict. Runs every 2 minutes via cron.
#
# Repos handled (see MIRROR_REPOS below):
# - teleo-codex (mode=bidirectional): full PR roundtrip — fork PR refs from
# GitHub, auto-create Forgejo PR mirrors, link github_pr in pipeline.db.
# - teleo-infrastructure (mode=main_only): one-way sync of branches+tags from
# Forgejo to GitHub. No PR roundtrip — pipeline doesn't process infra PRs;
# external infra PRs land on GitHub for visibility, get reviewed manually.
#
# Security note: GitHub->Forgejo path is for external contributor convenience.
# Never auto-process branches arriving via this path without a PR.
# Eval pipeline and extract cron only act on PRs, not raw branches.
set -euo pipefail
REPO_DIR="/opt/teleo-eval/mirror/teleo-codex.git"
LOG="/opt/teleo-eval/logs/sync.log"
LOCKFILE="/tmp/sync-mirror.lock"
PIPELINE_DB="/opt/teleo-eval/pipeline/pipeline.db"
GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
GITHUB_REPO="living-ip/teleo-codex"
log() { echo "[$(date -Iseconds)] $1" >> "$LOG"; }
# (forgejo_owner_repo, github_owner_repo, bare_path, mode)
# mode: bidirectional | main_only
MIRROR_REPOS=(
"teleo/teleo-codex living-ip/teleo-codex /opt/teleo-eval/mirror/teleo-codex.git bidirectional"
"teleo/teleo-infrastructure living-ip/teleo-infrastructure /opt/teleo-eval/mirror/teleo-infrastructure.git main_only"
)
# Lockfile — prevent concurrent runs
REPO_TAG="main"
log() { echo "[$(date -Iseconds)] [$REPO_TAG] $1" >> "$LOG"; }
# Lockfile — prevent concurrent runs (single lock for whole script)
if [ -f "$LOCKFILE" ]; then
pid=$(cat "$LOCKFILE" 2>/dev/null)
if kill -0 "$pid" 2>/dev/null; then
@ -28,116 +41,204 @@ fi
echo $$ > "$LOCKFILE"
trap 'rm -f "$LOCKFILE"' EXIT
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true)
if [ -n "$BAD_PERMS" ]; then
log "Fixing mirror permissions (found: $BAD_PERMS)"
chown -R teleo:teleo "$REPO_DIR" 2>/dev/null
fi
cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; exit 1; }
# Step 1: Fetch from Forgejo (must succeed — it's authoritative)
log "Fetching from Forgejo..."
if ! git fetch forgejo --prune >> "$LOG" 2>&1; then
log "ERROR: Forgejo fetch failed — aborting"
exit 1
fi
# ─────────────────────────────────────────────────────────────────────────────
# sync_repo: process one mirror entry. Sets module-level FORGEJO_REPO,
# GITHUB_REPO, REPO_DIR, MODE, REPO_TAG used by inner steps.
# ─────────────────────────────────────────────────────────────────────────────
sync_repo() {
FORGEJO_REPO="$1" # e.g. teleo/teleo-codex (path on Forgejo)
GITHUB_REPO="$2" # e.g. living-ip/teleo-codex (path on GitHub)
REPO_DIR="$3" # bare mirror dir
MODE="$4" # bidirectional | main_only
REPO_TAG="${FORGEJO_REPO##*/}" # short name for log prefix
# Step 2: Fetch from GitHub (warn on failure, don't abort)
log "Fetching from GitHub..."
git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed"
# Pre-flight: bare repo must exist
if [ ! -d "$REPO_DIR" ]; then
log "ERROR: bare repo missing at $REPO_DIR — skipping"
return 0
fi
# Step 2.1: Fetch GitHub fork PR refs
# Fork-based PRs don't create branches on origin — they create refs/pull/N/head
# Fetch these so we can push them to Forgejo for evaluation
GITHUB_PAT_STEP2=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
if [ -n "$GITHUB_PAT_STEP2" ]; then
OPEN_PRS=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?state=open&per_page=100" \
-H "Authorization: token $GITHUB_PAT_STEP2" 2>/dev/null || echo "[]")
echo "$OPEN_PRS" | python3 -c "
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true)
if [ -n "$BAD_PERMS" ]; then
log "Fixing mirror permissions (found: $BAD_PERMS)"
chown -R teleo:teleo "$REPO_DIR" 2>/dev/null || true
fi
cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; return 0; }
# Step 1: Fetch from Forgejo (must succeed — it's authoritative)
log "Fetching from Forgejo..."
if ! git fetch forgejo --prune >> "$LOG" 2>&1; then
log "ERROR: Forgejo fetch failed — skipping this repo"
return 0
fi
# Step 2: Fetch from GitHub (warn on failure, don't abort)
log "Fetching from GitHub..."
git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed"
# Step 2.1: Fetch GitHub fork PR refs (bidirectional only)
# Fork-based PRs don't create branches on origin — they create refs/pull/N/head.
# main_only repos don't accept fork PRs through the mirror path.
if [ "$MODE" = "bidirectional" ]; then
local PAT
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
if [ -n "$PAT" ]; then
local OPEN_PRS
OPEN_PRS=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?state=open&per_page=100" \
-H "Authorization: token $PAT" 2>/dev/null || echo "[]")
echo "$OPEN_PRS" | python3 -c "
import sys, json
prs = json.load(sys.stdin)
for pr in prs:
head = pr.get('head', {})
# Only process fork PRs (repo differs from base repo)
base_repo = pr.get('base', {}).get('repo', {}).get('full_name', '')
head_repo = head.get('repo', {}) or {}
head_full = head_repo.get('full_name', '')
if head_full and head_full != base_repo:
print(f\"{pr['number']} {head.get('ref', '')} {head.get('sha', '')}\")
" 2>/dev/null | while read pr_num branch_name head_sha; do
if [ -z "$pr_num" ] || [ -z "$branch_name" ]; then continue; fi
PR_BRANCH="gh-pr-${pr_num}/${branch_name}"
# Check if we already have this ref at the right SHA
EXISTING=$(git rev-parse "refs/heads/$PR_BRANCH" 2>/dev/null || true)
if [ "$EXISTING" = "$head_sha" ]; then continue; fi
# Fetch the PR ref and create a local branch
git fetch origin "refs/pull/${pr_num}/head:refs/heads/$PR_BRANCH" >> "$LOG" 2>&1 && \
log "Fetched fork PR #$pr_num -> $PR_BRANCH" || \
log "WARN: Failed to fetch fork PR #$pr_num"
done
fi
# Step 2.5: GitHub main -> Forgejo main (ff-only)
# If a PR was merged on GitHub, GitHub main is ahead of Forgejo main.
# Fast-forward Forgejo main to match — safe because ff-only guarantees no divergence.
GITHUB_MAIN_FF=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN_FF=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
if [ "$GITHUB_MAIN_FF" != "$FORGEJO_MAIN_FF" ]; then
if git merge-base --is-ancestor "$FORGEJO_MAIN_FF" "$GITHUB_MAIN_FF"; then
log "GitHub main ($GITHUB_MAIN_FF) ahead of Forgejo main ($FORGEJO_MAIN_FF) — fast-forwarding"
git push forgejo "refs/remotes/origin/main:refs/heads/main" >> "$LOG" 2>&1 && \
log "Forgejo main fast-forwarded to $GITHUB_MAIN_FF" || \
log "WARN: Failed to fast-forward Forgejo main"
if [ -z "$pr_num" ] || [ -z "$branch_name" ]; then continue; fi
local PR_BRANCH="gh-pr-${pr_num}/${branch_name}"
local EXISTING
EXISTING=$(git rev-parse "refs/heads/$PR_BRANCH" 2>/dev/null || true)
if [ "$EXISTING" = "$head_sha" ]; then continue; fi
git fetch origin "refs/pull/${pr_num}/head:refs/heads/$PR_BRANCH" >> "$LOG" 2>&1 && \
log "Fetched fork PR #$pr_num -> $PR_BRANCH" || \
log "WARN: Failed to fetch fork PR #$pr_num"
done
fi
fi
fi
# Step 3: Forgejo -> GitHub (primary direction)
# Update local refs from Forgejo remote refs using process substitution (avoids subshell)
log "Syncing Forgejo -> GitHub..."
while read branch; do
[ "$branch" = "HEAD" ] && continue
git update-ref "refs/heads/$branch" "refs/remotes/forgejo/$branch" 2>/dev/null || \
log "WARN: Failed to update ref $branch"
done < <(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/)
# Safety: verify Forgejo main descends from GitHub main before force-pushing
GITHUB_MAIN=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
PUSH_MAIN=true
if [ -n "$GITHUB_MAIN" ] && [ -n "$FORGEJO_MAIN" ]; then
if ! git merge-base --is-ancestor "$GITHUB_MAIN" "$FORGEJO_MAIN"; then
log "CRITICAL: Forgejo main is NOT a descendant of GitHub main — skipping main push"
log "CRITICAL: GitHub main: $GITHUB_MAIN, Forgejo main: $FORGEJO_MAIN"
PUSH_MAIN=false
# Step 2.5: GitHub main -> Forgejo main (ff-only)
# If a PR was merged on GitHub, GitHub main is ahead of Forgejo main.
# Fast-forward Forgejo main to match — safe because ff-only guarantees no divergence.
local GITHUB_MAIN_FF FORGEJO_MAIN_FF
GITHUB_MAIN_FF=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN_FF=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
if [ "$GITHUB_MAIN_FF" != "$FORGEJO_MAIN_FF" ]; then
if git merge-base --is-ancestor "$FORGEJO_MAIN_FF" "$GITHUB_MAIN_FF"; then
log "GitHub main ($GITHUB_MAIN_FF) ahead of Forgejo main ($FORGEJO_MAIN_FF) — fast-forwarding"
git push forgejo "refs/remotes/origin/main:refs/heads/main" >> "$LOG" 2>&1 && \
log "Forgejo main fast-forwarded to $GITHUB_MAIN_FF" || \
log "WARN: Failed to fast-forward Forgejo main"
fi
fi
fi
fi
if [ "$PUSH_MAIN" = true ]; then
git push origin --all --force >> "$LOG" 2>&1 || log "WARN: Push to GitHub failed"
else
# Push all branches except main
# Step 3: Forgejo -> GitHub (primary direction)
log "Syncing Forgejo -> GitHub..."
while read branch; do
[ "$branch" = "main" ] && continue
[ "$branch" = "HEAD" ] && continue
git push origin --force "refs/heads/$branch:refs/heads/$branch" >> "$LOG" 2>&1 || \
log "WARN: Failed to push $branch to GitHub"
done < <(git for-each-ref --format="%(refname:lstrip=2)" refs/heads/)
fi
git push origin --tags --force >> "$LOG" 2>&1 || log "WARN: Tag push to GitHub failed"
git update-ref "refs/heads/$branch" "refs/remotes/forgejo/$branch" 2>/dev/null || \
log "WARN: Failed to update ref $branch"
done < <(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/)
# Step 4: GitHub -> Forgejo (external contributions only)
# Only push branches that exist on GitHub but NOT on Forgejo
log "Checking GitHub-only branches..."
GITHUB_ONLY=$(comm -23 \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/origin/ | grep -v HEAD | sort) \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/ | grep -v HEAD | sort))
# Safety: verify Forgejo main descends from GitHub main before force-pushing
local GITHUB_MAIN FORGEJO_MAIN PUSH_MAIN
GITHUB_MAIN=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
PUSH_MAIN=true
if [ -n "$GITHUB_MAIN" ] && [ -n "$FORGEJO_MAIN" ]; then
if ! git merge-base --is-ancestor "$GITHUB_MAIN" "$FORGEJO_MAIN"; then
log "CRITICAL: Forgejo main is NOT a descendant of GitHub main — skipping main push"
log "CRITICAL: GitHub main: $GITHUB_MAIN, Forgejo main: $FORGEJO_MAIN"
PUSH_MAIN=false
fi
fi
if [ -n "$GITHUB_ONLY" ]; then
if [ "$MODE" = "main_only" ]; then
# Infra-style mirror: push main + tags ONLY. Pre-review agent branches
# (epimetheus/*, ganymede/*, etc.) carry internal context — agent UUIDs,
# in-flight discussion, WIP — and must not land in the public GitHub
# history. (Ganymede review, finding #1.)
if [ "$PUSH_MAIN" = true ]; then
git push origin --force "refs/heads/main:refs/heads/main" >> "$LOG" 2>&1 || \
log "WARN: main push to GitHub failed"
fi
else
# Bidirectional mirror (codex): push all branches so external
# contributors can fork from any branch, not just main.
if [ "$PUSH_MAIN" = true ]; then
git push origin --all --force >> "$LOG" 2>&1 || log "WARN: Push to GitHub failed"
else
# Push all branches except main when main is divergent
while read branch; do
[ "$branch" = "main" ] && continue
[ "$branch" = "HEAD" ] && continue
git push origin --force "refs/heads/$branch:refs/heads/$branch" >> "$LOG" 2>&1 || \
log "WARN: Failed to push $branch to GitHub"
done < <(git for-each-ref --format="%(refname:lstrip=2)" refs/heads/)
fi
fi
git push origin --tags --force >> "$LOG" 2>&1 || log "WARN: Tag push to GitHub failed"
# Step 4: GitHub -> Forgejo + Forgejo PR auto-create (bidirectional only)
if [ "$MODE" = "bidirectional" ]; then
sync_github_to_forgejo_with_prs
fi
# Step 6: Divergence alerting (applies to both modes)
check_divergence
}
# ─────────────────────────────────────────────────────────────────────────────
# Step 4 split out: codex-specific GitHub→Forgejo branch push + PR auto-create.
# Reads FORGEJO_REPO, GITHUB_REPO, PIPELINE_DB, REPO_TAG from sync_repo scope.
# ─────────────────────────────────────────────────────────────────────────────
sync_github_to_forgejo_with_prs() {
log "Checking GitHub-only branches..."
local FORGEJO_HOST="http://localhost:3000/api/v1/repos/$FORGEJO_REPO"
local GITHUB_ONLY
GITHUB_ONLY=$(comm -23 \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/origin/ | grep -v HEAD | sort) \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/ | grep -v HEAD | sort))
if [ -z "$GITHUB_ONLY" ]; then
log "No new GitHub-only branches"
return 0
fi
local FORGEJO_TOKEN
FORGEJO_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token 2>/dev/null)
# Lazy schema for sync-mirror's auto-create tracker. Records (branch, sha)
# pairs we've already auto-created PRs for, so the loop below can skip
# redundant creates after pipeline merge → _delete_remote_branch →
# GitHub-only re-discovery → re-push. Cheap CREATE IF NOT EXISTS on each
# cycle; no migration needed because this table is private to sync-mirror.
sqlite3 "$PIPELINE_DB" "CREATE TABLE IF NOT EXISTS sync_autocreate_tracker (branch TEXT NOT NULL, sha TEXT NOT NULL, pr_number INTEGER, created_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (branch, sha));" 2>/dev/null || true
for branch in $GITHUB_ONLY; do
# Already-tracked gate: if we've previously auto-created a PR for
# this exact (branch, sha), skip the entire push+create sequence.
# Closes the empty-PR loop (research and reweave both observed):
# pipeline merges PR → _delete_remote_branch on Forgejo → next sync
# sees branch GitHub-only (origin still has it) → re-pushes to
# Forgejo → HAS_PR misses (Forgejo ?head= broken; closed PRs scroll
# past 50-item paginated window) → auto-creates fresh PR → pipeline
# merges (empty no-op via cherry-pick / reweave union) → repeat.
# Tracker keys on SHA, so legitimate new commits on the same branch
# produce a new SHA → tracker miss → auto-create proceeds normally.
local BRANCH_SHA TRACKED_PR
if [[ "$branch" == gh-pr-* ]]; then
BRANCH_SHA=$(git rev-parse "refs/heads/$branch" 2>/dev/null || true)
else
BRANCH_SHA=$(git rev-parse "refs/remotes/origin/$branch" 2>/dev/null || true)
fi
if [ -n "$BRANCH_SHA" ]; then
# stderr → $LOG so sustained sqlite3 contention surfaces in ops logs
# rather than silently falling through to a redundant auto-create.
TRACKED_PR=$(sqlite3 "$PIPELINE_DB" "SELECT pr_number FROM sync_autocreate_tracker WHERE branch=$(printf "'%s'" "${branch//\'/\'\'}") AND sha=$(printf "'%s'" "$BRANCH_SHA") LIMIT 1;" 2>>"$LOG" || echo "")
if [ -n "$TRACKED_PR" ]; then
log "Skip auto-create: $branch SHA $BRANCH_SHA already tracked (PR #$TRACKED_PR)"
continue
fi
fi
log "New from GitHub: $branch -> Forgejo"
# Fork PR branches live as local refs (from Step 2.1), not on origin remote
if [[ "$branch" == gh-pr-* ]]; then
@ -151,22 +252,23 @@ if [ -n "$GITHUB_ONLY" ]; then
continue
}
fi
# Auto-create PR on Forgejo for mirrored branches (external contributor path)
# Skip pipeline-internal branches
# Skip pipeline-internal branch prefixes (no PR creation)
case "$branch" in
extract/*|ingestion/*) continue ;;
esac
if [ -n "$FORGEJO_TOKEN" ]; then
# Check if PR already exists for this branch (open or closed)
# NOTE: Forgejo ?head= filter is broken (ignores head value, returns all PRs).
# Workaround: fetch open+closed PRs, pipe to Python, check head.ref.
HAS_PR=$( {
curl -sf "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=open&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
echo ""
curl -sf "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=closed&sort=created&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
} | python3 -c "
if [ -z "$FORGEJO_TOKEN" ]; then continue; fi
# Check if PR already exists for this branch (open or closed)
# NOTE: Forgejo ?head= filter is broken (ignores head value, returns all PRs).
# Workaround: fetch open+closed PRs, pipe to Python, check head.ref.
local HAS_PR
HAS_PR=$( {
curl -sf "$FORGEJO_HOST/pulls?state=open&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
echo ""
curl -sf "$FORGEJO_HOST/pulls?state=closed&sort=created&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
} | python3 -c "
import sys, json
branch = sys.argv[1]
for line in sys.stdin:
@ -179,104 +281,171 @@ for line in sys.stdin:
except: pass
print('no')
" "$branch" 2>/dev/null || echo "no")
if [ "$HAS_PR" = "no" ]; then
# Build PR title — for fork PRs, use the GitHub PR title
if [[ "$branch" == gh-pr-* ]]; then
FORK_GH_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
GITHUB_PAT_T=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
PR_TITLE=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$FORK_GH_NUM" \
-H "Authorization: token $GITHUB_PAT_T" 2>/dev/null | \
python3 -c "import sys,json; print(json.load(sys.stdin).get('title',''))" 2>/dev/null || true)
[ -z "$PR_TITLE" ] && PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
else
PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
fi
PAYLOAD=$(python3 -c "import sys,json; print(json.dumps({'title':sys.argv[1],'head':sys.argv[2],'base':'main'}))" "$PR_TITLE" "$branch")
RESULT=$(curl -sf -X POST "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls" \
-H "Authorization: token $FORGEJO_TOKEN" \
-H "Content-Type: application/json" \
-d "$PAYLOAD" 2>/dev/null || echo "")
PR_NUM=$(echo "$RESULT" | grep -o '"number":[0-9]*' | head -1 | grep -o "[0-9]*" || true)
if [ -n "$PR_NUM" ]; then
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
if [[ "$branch" == gh-pr-* ]]; then
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
else
GITHUB_PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
GH_PR_NUM=""
if [ -n "$GITHUB_PAT" ]; then
GH_PR_NUM=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?head=living-ip:$branch&state=all" \
-H "Authorization: token $GITHUB_PAT" 2>/dev/null | \
python3 -c "import sys,json; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')" 2>/dev/null || true)
fi
fi
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
fi
else
log "WARN: Failed to auto-create PR for $branch"
fi
if [ "$HAS_PR" = "yes" ]; then continue; fi
# Build PR title — for fork PRs, use the GitHub PR title
local PR_TITLE PAYLOAD RESULT PR_NUM GH_PR_NUM
if [[ "$branch" == gh-pr-* ]]; then
local FORK_GH_NUM PAT_T
FORK_GH_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
PAT_T=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
PR_TITLE=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$FORK_GH_NUM" \
-H "Authorization: token $PAT_T" 2>/dev/null | \
python3 -c "import sys,json; print(json.load(sys.stdin).get('title',''))" 2>/dev/null || true)
[ -z "$PR_TITLE" ] && PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
else
PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
fi
PAYLOAD=$(python3 -c "import sys,json; print(json.dumps({'title':sys.argv[1],'head':sys.argv[2],'base':'main'}))" "$PR_TITLE" "$branch")
RESULT=$(curl -sf -X POST "$FORGEJO_HOST/pulls" \
-H "Authorization: token $FORGEJO_TOKEN" \
-H "Content-Type: application/json" \
-d "$PAYLOAD" 2>/dev/null || echo "")
PR_NUM=$(echo "$RESULT" | grep -o '"number":[0-9]*' | head -1 | grep -o "[0-9]*" || true)
if [ -z "$PR_NUM" ]; then
log "WARN: Failed to auto-create PR for $branch"
continue
fi
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
# Record (branch, sha, pr_number) so the tracker gate above can short-
# circuit the next time we see this exact (branch, sha) combination.
# INSERT OR IGNORE: idempotent if a concurrent run already inserted.
# WARN log on failure: silent INSERT failure under sustained sqlite3
# contention would mask the loop reappearing on the next cycle (HAS_PR
# only saves us while the closed PR is in the 50-item pagination window).
if [ -n "$BRANCH_SHA" ] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
if ! sqlite3 "$PIPELINE_DB" "INSERT OR IGNORE INTO sync_autocreate_tracker (branch, sha, pr_number) VALUES ($(printf "'%s'" "${branch//\'/\'\'}"), $(printf "'%s'" "$BRANCH_SHA"), $PR_NUM);" 2>>"$LOG"; then
log "WARN: tracker insert failed for $branch SHA $BRANCH_SHA (PR #$PR_NUM) — duplicate auto-create possible next cycle"
fi
fi
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
if [[ "$branch" == gh-pr-* ]]; then
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
else
local PAT
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
GH_PR_NUM=""
if [ -n "$PAT" ]; then
GH_PR_NUM=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?head=living-ip:$branch&state=all" \
-H "Authorization: token $PAT" 2>/dev/null | \
python3 -c "import sys,json; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')" 2>/dev/null || true)
fi
fi
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM, source_channel = 'github' WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
fi
done
else
log "No new GitHub-only branches"
fi
}
# Step 6: Divergence alerting
# After all sync steps, check if GitHub and Forgejo main still differ.
# 2 consecutive divergent cycles (4 min) triggers a one-shot Telegram alert.
DIVERGENCE_FILE="/opt/teleo-eval/logs/.divergence-count"
git fetch forgejo main --quiet 2>/dev/null || true
git fetch origin main --quiet 2>/dev/null || true
GH_MAIN_FINAL=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FG_MAIN_FINAL=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GH_MAIN_FINAL" ] && [ -n "$FG_MAIN_FINAL" ] && [ "$GH_MAIN_FINAL" != "$FG_MAIN_FINAL" ]; then
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" = "alerted" ]; then
log "DIVERGENCE: still diverged (already alerted)"
else
COUNT=$((PREV + 1))
echo "$COUNT" > "$DIVERGENCE_FILE"
log "DIVERGENCE: cycle $COUNT — GitHub=$GH_MAIN_FINAL Forgejo=$FG_MAIN_FINAL"
if [ "$COUNT" -ge 2 ]; then
BOT_TOKEN=$(cat /opt/teleo-eval/secrets/telegram-bot-token 2>/dev/null || true)
ADMIN_CHAT=$(cat /opt/teleo-eval/secrets/admin-chat-id 2>/dev/null || true)
if [ -n "$BOT_TOKEN" ] && [ -n "$ADMIN_CHAT" ]; then
ALERT_MSG=$(python3 -c "
# ─────────────────────────────────────────────────────────────────────────────
# Step 6 split out: divergence alerting. Per-repo state file so each repo
# has its own divergence counter and alert state.
# ─────────────────────────────────────────────────────────────────────────────
check_divergence() {
local DIVERGENCE_FILE="/opt/teleo-eval/logs/.divergence-count.${REPO_TAG}"
git fetch forgejo main --quiet 2>/dev/null || true
git fetch origin main --quiet 2>/dev/null || true
local GH_MAIN_FINAL FG_MAIN_FINAL
GH_MAIN_FINAL=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FG_MAIN_FINAL=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GH_MAIN_FINAL" ] && [ -n "$FG_MAIN_FINAL" ] && [ "$GH_MAIN_FINAL" != "$FG_MAIN_FINAL" ]; then
local PREV
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" = "alerted" ]; then
log "DIVERGENCE: still diverged (already alerted)"
else
local COUNT=$((PREV + 1))
echo "$COUNT" > "$DIVERGENCE_FILE"
log "DIVERGENCE: cycle $COUNT — GitHub=$GH_MAIN_FINAL Forgejo=$FG_MAIN_FINAL"
if [ "$COUNT" -ge 2 ]; then
local BOT_TOKEN ADMIN_CHAT
BOT_TOKEN=$(cat /opt/teleo-eval/secrets/telegram-bot-token 2>/dev/null || true)
ADMIN_CHAT=$(cat /opt/teleo-eval/secrets/admin-chat-id 2>/dev/null || true)
if [ -n "$BOT_TOKEN" ] && [ -n "$ADMIN_CHAT" ]; then
local ALERT_MSG
ALERT_MSG=$(python3 -c "
import json, sys
msg = '⚠️ Mirror divergence detected\\n\\n'
msg = '⚠️ Mirror divergence detected (' + sys.argv[5] + ')\\n\\n'
msg += f'GitHub main: {sys.argv[1][:8]}\\n'
msg += f'Forgejo main: {sys.argv[2][:8]}\\n'
msg += f'Diverged for {sys.argv[3]} consecutive cycles ({int(sys.argv[3])*2} min)\\n\\n'
msg += 'Check sync-mirror.sh logs: /opt/teleo-eval/logs/sync.log'
print(json.dumps({'chat_id': sys.argv[4], 'text': msg, 'parse_mode': 'HTML'}))
" "$GH_MAIN_FINAL" "$FG_MAIN_FINAL" "$COUNT" "$ADMIN_CHAT")
if curl -sf -X POST "https://api.telegram.org/bot${BOT_TOKEN}/sendMessage" \
-H "Content-Type: application/json" \
-d "$ALERT_MSG" >> "$LOG" 2>&1; then
log "DIVERGENCE: alert sent to admin"
echo "alerted" > "$DIVERGENCE_FILE"
" "$GH_MAIN_FINAL" "$FG_MAIN_FINAL" "$COUNT" "$ADMIN_CHAT" "$REPO_TAG")
if curl -sf -X POST "https://api.telegram.org/bot${BOT_TOKEN}/sendMessage" \
-H "Content-Type: application/json" \
-d "$ALERT_MSG" >> "$LOG" 2>&1; then
log "DIVERGENCE: alert sent to admin"
echo "alerted" > "$DIVERGENCE_FILE"
else
log "WARN: Failed to send divergence alert (will retry next cycle)"
fi
else
log "WARN: Failed to send divergence alert (will retry next cycle)"
log "WARN: Cannot send divergence alert — missing bot token or admin chat ID"
fi
else
log "WARN: Cannot send divergence alert — missing bot token or admin chat ID"
fi
fi
fi
else
if [ -f "$DIVERGENCE_FILE" ]; then
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" != "0" ]; then
log "DIVERGENCE: resolved — repos back in sync"
else
if [ -f "$DIVERGENCE_FILE" ]; then
local PREV
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" != "0" ]; then
log "DIVERGENCE: resolved — repos back in sync"
fi
rm -f "$DIVERGENCE_FILE"
fi
rm -f "$DIVERGENCE_FILE"
fi
}
# ─────────────────────────────────────────────────────────────────────────────
# Main: process each configured mirror in sequence.
# A failure on one repo doesn't block subsequent repos — sync_repo returns 0
# on most error paths to keep the loop going.
# ─────────────────────────────────────────────────────────────────────────────
REPO_TAG="main"
log "Starting sync cycle"
# Step 0: self-heal any gh-pr-* PR rows missing github_pr.
# Runs FIRST — before per-repo work (branch-mirror loop, auto-create-PR block).
# Recovers from races/transient failures in Step 4.5's one-shot link UPDATE.
# Idempotent: SELECT empty when clean, zero-cost path. Same SELECT/UPDATE
# heals historical orphans (PR 4066 picked up on first cron tick post-deploy)
# and future races on subsequent ticks. The branch name encodes the GitHub PR
# number deterministically (gh-pr-{N}/...) so no API call is required.
if [ -f "$PIPELINE_DB" ]; then
sqlite3 -separator '|' "$PIPELINE_DB" \
"SELECT number, branch FROM prs WHERE branch LIKE 'gh-pr-%' AND github_pr IS NULL;" \
2>/dev/null | while IFS='|' read -r pr_num branch; do
# Regex requires >=1 digit — empty/non-numeric branches fail to parse here,
# not just at the empty-guard below. Keeps SQL-integer-safety load-bearing
# on the regex alone. [0-9][0-9]* is the portable BRE form of [0-9]+,
# works on both GNU sed (VPS) and BSD sed (dev macs).
gh_pr_num=$(echo "$branch" | sed -n 's|^gh-pr-\([0-9][0-9]*\)/.*|\1|p')
[ -z "$gh_pr_num" ] && continue
# Both interpolated values are integer-validated upstream (pr_num from
# INTEGER `number` column, gh_pr_num from regex above). No parametric
# binding available in bash sqlite3 — safety relies on those invariants.
if sqlite3 "$PIPELINE_DB" \
"UPDATE prs SET github_pr = $gh_pr_num, source_channel = 'github' WHERE number = $pr_num;" \
2>/dev/null; then
log "self-heal: linked Forgejo PR #$pr_num -> GitHub PR #$gh_pr_num"
fi
done
fi
log "Sync complete"
for entry in "${MIRROR_REPOS[@]}"; do
# Read the 4 fields. `read` splits on $IFS (whitespace) by default.
read -r forgejo_repo github_repo bare_path mode <<< "$entry"
sync_repo "$forgejo_repo" "$github_repo" "$bare_path" "$mode"
done
REPO_TAG="main"
log "Sync cycle complete"

View file

@ -9,6 +9,16 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
_cache = {"data": None, "ts": 0}
CACHE_TTL = 60 # 1 minute — activity should feel fresh
# commit_types we surface in the activity feed. `pipeline` is system
# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
# inbox archive filenames, not claim slugs. Used as a fallback signal when
# branch/description heuristics miss (e.g. populated descriptions that
# happen to be source titles, not claim insights).
_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
def _get_conn():
conn = sqlite3.connect(DB_PATH)
@ -17,28 +27,146 @@ def _get_conn():
return conn
def _classify_event(branch, description, commit_type):
if commit_type != "knowledge":
def _is_source_slug(slug):
return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
def _classify_event(branch, description, commit_type, candidate_slug=None):
"""Return one of: create | enrich | challenge | source | session_digest | None.
Source-archive PRs are extract/* branches that filed a source into
inbox/archive/ but didn't produce a claim. Session-digest PRs are
agent research/entity commits with no per-claim description they
represent session-level rollups, not specific knowledge artifacts.
"""
commit_type_l = (commit_type or "").lower()
branch = branch or ""
description_lower = (description or "").lower()
has_desc = bool(description and description.strip())
if commit_type_l not in _FEED_COMMIT_TYPES:
return None
if branch and branch.startswith("extract/"):
return "create"
if branch and branch.startswith("reweave/"):
return "enrich"
if branch and branch.startswith("challenge/"):
# Explicit challenge signals win first.
if (commit_type_l == "challenge"
or branch.startswith("challenge/")
or "challenged_by" in description_lower):
return "challenge"
if description and "challenged_by" in description.lower():
return "challenge"
if branch and branch.startswith("enrich/"):
# Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
if (commit_type_l == "enrich"
or branch.startswith("enrich/")
or branch.startswith("reweave/")):
return "enrich"
# Research and entity commits with no description are session-level
# rollups (e.g. astra/research-2026-05-11). They have no claim to
# link to — surface as session_digest, not as a phantom create.
if commit_type_l in ("research", "entity") and not has_desc:
return "session_digest"
# Source-only: extract/* with no claim description means inbox archive
# landed but no domain claim was written.
if branch.startswith("extract/") and not has_desc:
return "source"
# Belt-and-suspenders: if the slug we'd surface to the frontend looks
# like an inbox archive filename (date-prefix-hash), treat as source
# regardless of branch/commit_type/description state. Catches cases
# where description leaked but is just a source title, not a claim.
if _is_source_slug(candidate_slug):
return "source"
# Everything else with a description is a new claim.
return "create"
# Internal classifier value -> canonical `kind` enum returned to frontend.
_KIND_MAP = {
"create": "claim_merged",
"enrich": "claim_enriched",
"challenge": "claim_challenged",
"source": "source_archived",
"session_digest": "session_digest",
}
def _archive_slug_from_branch(branch):
"""For extract/YYYY-MM-DD-...-HASH4, return YYYY-MM-DD-... (keep date,
drop the 4-hex hash suffix). Matches inbox/archive filename convention.
"""
if not branch or "/" not in branch:
return ""
slug = branch.split("/", 1)[1]
return re.sub(r"-[a-f0-9]{4}$", "", slug)
def _source_target_url(domain, archive_slug):
"""Forgejo blob URL for an archived source file. Falls back to the
repo-wide inbox/archive directory when domain is unknown so the link
still resolves to something useful instead of a 404.
"""
if not archive_slug:
return None
domain = (domain or "").strip()
if not domain or domain == "unknown":
return "https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive"
return (
"https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive/"
f"{domain}/{archive_slug}.md"
)
def _claim_target_url(claim_slug):
if not claim_slug:
return None
return f"/claims/{claim_slug}"
# Canonical clickthrough URL for an activity-feed event.
#
# Every merged PR in the pipeline.db `prs` table lives on Forgejo at
# git.livingip.xyz/teleo/teleo-codex/pulls/{number}. A small subset (3 of
# 4094 as of 2026-05-13) was additionally mirrored to GitHub and has
# prs.github_pr populated. Prefer GitHub when available (more public-facing
# surface), fall back to Forgejo so every row has a real destination
# instead of None (which makes the frontend whole-row overlay no-op and
# leaves pipeline-attributed events looking dead-on-click).
def _pr_url(pr_number, github_pr):
if github_pr:
return f"https://github.com/living-ip/teleo-codex/pull/{github_pr}"
if pr_number:
return f"https://git.livingip.xyz/teleo/teleo-codex/pulls/{pr_number}"
return None
# Canonicalize contributor labels so frontend links resolve to real
# /contributors/{handle} pages. Pipeline writers (extract.py, manual edits,
# the old backfill_submitted_by.py) historically wrote mixed-case agent
# names with a trailing decorator into prs.submitted_by — e.g.
# "Vida (self-directed)", "pipeline (reweave)", or "@m3taversal".
# These decorated strings do not exist as contributors and 404 the profile
# page. Strip the trailing parenthetical wholesale: valid handles match
# ^[a-z0-9][a-z0-9_-]{0,38}$ (see pipeline/lib/attribution._HANDLE_RE) and
# cannot contain parens, so this is lossless.
_TRAILING_PAREN_RE = re.compile(r"\s*\([^)]*\)\s*$")
def _canonicalize(raw):
if not raw:
return ""
h = raw.strip().lower().lstrip("@")
h = _TRAILING_PAREN_RE.sub("", h).strip()
return h
def _normalize_contributor(submitted_by, agent):
if submitted_by and submitted_by.strip():
name = submitted_by.strip().lstrip("@")
name = _canonicalize(submitted_by)
if name:
return name
name = _canonicalize(agent)
if name and name != "pipeline":
return name
if agent and agent.strip() and agent != "pipeline":
return agent.strip()
return "pipeline"
@ -59,7 +187,7 @@ def _extract_claim_slugs(description, branch=None):
if branch:
parts = branch.split("/", 1)
if len(parts) > 1:
return [parts[1][:120]]
return [parts[1]]
return []
titles = [t.strip() for t in description.split("|") if t.strip()]
slugs = []
@ -68,7 +196,7 @@ def _extract_claim_slugs(description, branch=None):
slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
slug = slug.replace(" ", "-").strip("-")
if len(slug) > 10:
slugs.append(slug[:120])
slugs.append(slug)
return slugs
@ -81,33 +209,98 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
def _build_events():
conn = _get_conn()
try:
rows = conn.execute("""
placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
rows = conn.execute(f"""
SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
p.merged_at, p.description, p.commit_type, p.cost_usd,
p.source_channel
p.source_channel, p.source_path, p.github_pr
FROM prs p
WHERE p.status = 'merged'
AND p.commit_type = 'knowledge'
AND p.commit_type IN ({placeholders})
AND p.merged_at IS NOT NULL
ORDER BY p.merged_at DESC
LIMIT 2000
""").fetchall()
""", _FEED_COMMIT_TYPES).fetchall()
events = []
claim_activity = {} # slug -> {challenges, enriches, signals, first_seen}
for row in rows:
event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
slugs = _extract_claim_slugs(row["description"], row["branch"])
candidate_slug = slugs[0] if slugs else ""
event_type = _classify_event(
row["branch"], row["description"], row["commit_type"],
candidate_slug=candidate_slug,
)
if not event_type:
continue
contributor = _normalize_contributor(row["submitted_by"], row["agent"])
slugs = _extract_claim_slugs(row["description"], row["branch"])
# Hide pipeline-attributed events (reweave/*, ingestion/*) from the
# public activity feed. They're automation maintenance, not
# contributions — the daemon re-knits the graph nightly and ingests
# external sources. Internal diagnostics + CI math still see these
# rows in prs / contribution_events; only the public timeline drops
# them. Mirrors the existing _FEED_COMMIT_TYPES filter (which hides
# commit_type='pipeline') along the contributor axis.
if contributor == "pipeline":
continue
merged_at = row["merged_at"] or ""
domain = row["domain"] or "unknown"
kind = _KIND_MAP.get(event_type, event_type)
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}
ci_map = {
"create": 0.35, "enrich": 0.25, "challenge": 0.40,
"source": 0.15, "session_digest": 0.05,
}
ci_earned = ci_map.get(event_type, 0)
# Source events never carry a claim_slug — no claim was written.
# target_url points at the archived file on Forgejo instead.
if event_type == "source":
archive_slug = _archive_slug_from_branch(row["branch"])
summary_text = _summary_from_branch(row["branch"])
source_display_slug = (
summary_text.lower().replace(" ", "-") or row["branch"]
)
events.append({
"kind": kind,
"type": "source",
"target_url": _source_target_url(domain, archive_slug),
"claim_slug": "",
"source_slug": source_display_slug,
"domain": domain,
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"pr_url": _pr_url(row["number"], row["github_pr"]),
"source_channel": row["source_channel"] or "unknown",
})
continue
# Session digests have no clickthrough surface yet (per-agent
# session pages not built). target_url=null so frontend renders
# plain text instead of a broken /claims/research-... link.
if event_type == "session_digest":
summary_text = _summary_from_branch(row["branch"]) or "Research session"
events.append({
"kind": kind,
"type": "session_digest",
"target_url": None,
"claim_slug": "",
"domain": domain,
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"pr_url": _pr_url(row["number"], row["github_pr"]),
"source_channel": row["source_channel"] or "unknown",
})
continue
for slug in slugs:
if slug not in claim_activity:
claim_activity[slug] = {
@ -132,14 +325,17 @@ def _build_events():
for slug in (slugs[:1] if slugs else [""]):
events.append({
"kind": kind,
"type": event_type,
"target_url": _claim_target_url(slug),
"claim_slug": slug,
"domain": row["domain"] or "unknown",
"domain": domain,
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"pr_url": _pr_url(row["number"], row["github_pr"]),
"source_channel": row["source_channel"] or "unknown",
})
@ -164,8 +360,11 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
events.sort(key=hot_key, reverse=True)
elif sort_mode == "important":
type_rank = {"challenge": 0, "enrich": 1, "create": 2}
events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
type_rank = {
"challenge": 0, "enrich": 1, "create": 2,
"source": 3, "session_digest": 4,
}
events.sort(key=lambda e: (type_rank.get(e["type"], 5), -len(e["summary"])))
return events
@ -175,6 +374,8 @@ async def handle_activity_feed(request):
sort_mode = "recent"
domain = request.query.get("domain", "")
contributor = request.query.get("contributor", "")
type_param = request.query.get("type", "")
type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
try:
limit = min(int(request.query.get("limit", "20")), 100)
except ValueError:
@ -196,6 +397,14 @@ async def handle_activity_feed(request):
filtered = [e for e in filtered if e["domain"] == domain]
if contributor:
filtered = [e for e in filtered if e["contributor"] == contributor]
if type_filter:
# Accept both legacy `type` values (create/enrich/challenge/source/
# session_digest) and canonical `kind` values (claim_merged/etc.) so
# callers can migrate at their own pace.
filtered = [
e for e in filtered
if e["type"] in type_filter or e.get("kind") in type_filter
]
sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
total = len(sorted_events)

View file

@ -25,6 +25,7 @@ from aiohttp import web
from review_queue_routes import register_review_queue_routes
from daily_digest_routes import register_daily_digest_routes
from response_audit_routes import register_response_audit_routes, RESPONSE_AUDIT_PUBLIC_PATHS
from leaderboard_routes import register_leaderboard_routes, LEADERBOARD_PUBLIC_PATHS
from lib.search import search as kb_search, embed_query, search_qdrant
logger = logging.getLogger("argus")
@ -508,7 +509,7 @@ def _load_secret(path: Path) -> str | None:
@web.middleware
async def auth_middleware(request, handler):
"""API key check. Public paths skip auth. Protected paths require X-Api-Key header."""
if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path in LEADERBOARD_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
return await handler(request)
expected = request.app.get("api_key")
if not expected:
@ -2361,6 +2362,8 @@ def create_app() -> web.Application:
# Response audit - cost tracking + reasoning traces
app["db_path"] = str(DB_PATH)
register_response_audit_routes(app)
# Event-sourced leaderboard (Phase B — reads contribution_events directly)
register_leaderboard_routes(app)
# Timeline activity feed (per-PR + audit_log events for dashboard v2)
from activity_endpoint import handle_activity
app.router.add_get("/api/activity", handle_activity)

View file

@ -79,12 +79,16 @@ def main():
fm = sfm
break
# `submitted_by` is stored as a canonical handle (lowercase, no @, no
# "(self-directed)" / "(reweave)" suffix). Read consumers normalize via
# attribution.normalize_handle, so writing decorated strings produces
# downstream 404s on /contributors/{handle} (livingip-web timeline).
if fm:
proposed_by = fm.get("proposed_by")
intake_tier = fm.get("intake_tier")
if proposed_by:
contributor = proposed_by.strip().strip('"').strip("'")
contributor = proposed_by.strip().strip('"').strip("'").lower().lstrip("@")
elif intake_tier == "research-task":
# Derive agent from branch prefix
prefix = branch.split("/", 1)[0] if "/" in branch else "unknown"
@ -94,13 +98,12 @@ def main():
"clay": "clay", "astra": "astra", "leo": "leo",
"reweave": "pipeline",
}
agent = agent_map.get(prefix, prefix)
contributor = f"{agent} (self-directed)"
contributor = agent_map.get(prefix, prefix)
elif intake_tier == "directed":
contributor = "@m3taversal"
contributor = "m3taversal"
else:
# Default: if source exists but no proposed_by, it was Cory's submission
contributor = "@m3taversal"
# Default: if source exists but no proposed_by, operator submitted it.
contributor = "m3taversal"
if contributor:
conn.execute(
@ -114,19 +117,19 @@ def main():
agent = branch.split("/", 1)[0]
conn.execute(
"UPDATE prs SET submitted_by = ? WHERE number = ?",
(f"{agent} (self-directed)", pr["number"]),
(agent, pr["number"]),
)
updated += 1
elif branch.startswith("reweave/"):
conn.execute(
"UPDATE prs SET submitted_by = 'pipeline (reweave)' WHERE number = ?",
"UPDATE prs SET submitted_by = 'pipeline' WHERE number = ?",
(pr["number"],),
)
updated += 1
else:
# Everything else (extract/, ingestion/, unknown) → Cory directed it
# Everything else (extract/, ingestion/, unknown) → operator directed it
conn.execute(
"UPDATE prs SET submitted_by = '@m3taversal' WHERE number = ?",
"UPDATE prs SET submitted_by = 'm3taversal' WHERE number = ?",
(pr["number"],),
)
updated += 1

View file

@ -1,79 +1,399 @@
"""Claims API endpoint — serves claim data from the codex filesystem."""
import os
"""Claims API — list endpoint + canonical claim detail page.
Owner: Argus
Routes:
GET /api/claims list/filter (frontmatter scan, lightweight)
GET /api/claims/{slug} full claim detail (Ship contract)
GET /api/domains domain rollups for sidebar
The detail endpoint is the canonical /claims/{slug} backend per Ship's
2026-04-29 brief. One round-trip, no N+1 cascade. Wikilinks resolved
server-side via titleslug index built from a tree walk.
"""
import json
import re
import sqlite3
import time
import yaml
from pathlib import Path
import yaml
from aiohttp import web
CODEX_ROOT = Path("/opt/teleo-eval/workspaces/main/domains")
_cache = {"data": None, "ts": 0}
CACHE_TTL = 300 # 5 minutes
# Codex tree roots — claims live in three places (Sourcer Apr 26 fix scope)
CODEX_BASE = Path("/opt/teleo-eval/workspaces/main")
CLAIM_TREES = [CODEX_BASE / "domains", CODEX_BASE / "foundations", CODEX_BASE / "core"]
def _parse_frontmatter(filepath):
# pipeline.db for joins (review_records, prs, sources)
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
# In-process caches
_list_cache = {"data": None, "ts": 0}
_LIST_CACHE_TTL = 300 # 5 min — list view tolerates staleness
_index_cache = {"by_title": None, "by_stem": None, "ts": 0}
_INDEX_CACHE_TTL = 60 # 1 min — title→slug index for wikilink resolution
CORS_HEADERS = {"Access-Control-Allow-Origin": "*"}
# Wikilink pattern. [[text]] or [[text|alias]] — we keep the link text only.
_WIKILINK_RE = re.compile(r"\[\[([^\]|#]+?)(?:[#|][^\]]*)?\]\]")
# ─── Normalization ─────────────────────────────────────────────────────────
def _normalize_for_match(s):
"""Collapse a title or slug to a comparable form.
Rules (from Ship's brief — match the link-fixer canonicalization):
- lowercase
- hyphen space tolerant (both single space)
- collapse runs of whitespace
- strip leading/trailing whitespace
- drop trailing punctuation that gets stripped from filenames
(`.`, `?`, `!`, `:`, `--`)
NOTE: lib/attribution.py exposes only normalize_handle today, not the
title normalizer Ship referenced. Implementing inline; if a canonical
helper lands later we point at it.
"""
if not s:
return ""
s = str(s).lower().strip()
# Treat hyphens as spaces, then collapse whitespace runs
s = s.replace("-", " ").replace("_", " ")
s = re.sub(r"\s+", " ", s)
# Strip ASCII punctuation that filenames drop
s = re.sub(r"[^\w\s]", "", s)
return s.strip()
# ─── Frontmatter parse ─────────────────────────────────────────────────────
_CODE_FENCE_WRAPPER_RE = re.compile(r"^\s*```(?:markdown|md)?\s*\n(.*?)\n```\s*$", re.DOTALL)
def _split_frontmatter(text):
"""Return (frontmatter_dict, body_str) or (None, None) if not a claim file.
Tolerates files wrapped in a top-level ```markdown ... ``` code fence
some agents have produced these (e.g. Montreal Protocol claim from Astra,
2024-12-09). Unwrap once before frontmatter detection.
"""
if not text:
return None, None
m = _CODE_FENCE_WRAPPER_RE.match(text)
if m:
text = m.group(1)
text = text.lstrip()
if not text.startswith("---"):
return None, None
try:
end = text.index("\n---", 3)
except ValueError:
return None, None
try:
fm = yaml.safe_load(text[3:end])
except Exception:
return None, None
if not isinstance(fm, dict):
return None, None
body = text[end + 4:].lstrip()
return fm, body
def _read_claim_file(filepath):
"""Read a claim file from disk. Returns (frontmatter, body) or (None, None)."""
try:
text = filepath.read_text(encoding="utf-8")
if not text.startswith("---"):
return None
end = text.index("---", 3)
fm = yaml.safe_load(text[3:end])
if not fm or fm.get("type") != "claim":
return None
body = text[end+3:].strip()
# Count wiki-links
links = re.findall(r"\[\[([^\]]+)\]\]", body)
# Extract first paragraph as summary
paragraphs = [p.strip() for p in body.split("\n\n") if p.strip() and not p.strip().startswith("#")]
summary = paragraphs[0][:300] if paragraphs else ""
return {
"slug": filepath.stem,
"title": fm.get("title", filepath.stem.replace("-", " ")),
"domain": fm.get("domain", "unknown"),
"confidence": fm.get("confidence", "unknown"),
"agent": fm.get("agent"),
"scope": fm.get("scope"),
"created": str(fm.get("created", "")),
"source": fm.get("source", "") if isinstance(fm.get("source"), str) else "",
"sourcer": fm.get("sourcer", ""),
"wiki_link_count": len(links),
"summary": summary,
"challenged_by": fm.get("challenged_by"),
"related_claims": fm.get("related_claims", []),
}
except Exception:
return None
except (OSError, UnicodeDecodeError):
return None, None
return _split_frontmatter(text)
def _load_all_claims():
now = time.time()
if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
return _cache["data"]
# ─── Tree walk + indexing ──────────────────────────────────────────────────
claims = []
for domain_dir in sorted(CODEX_ROOT.iterdir()):
if not domain_dir.is_dir():
def _walk_claim_files():
"""Yield Path objects for every .md claim file in domains/, foundations/, core/."""
for root in CLAIM_TREES:
if not root.exists():
continue
for f in sorted(domain_dir.glob("*.md")):
for f in root.rglob("*.md"):
if f.name == "_map.md":
continue
c = _parse_frontmatter(f)
if c:
claims.append(c)
yield f
_cache["data"] = claims
_cache["ts"] = now
def _build_indexes():
"""Build (title→stem, stem→relpath) indexes for wikilink resolution.
Cached for _INDEX_CACHE_TTL. Pulls from claim-index endpoint when
possible (already cached upstream) and falls back to filesystem walk.
"""
now = time.time()
if _index_cache["by_title"] is not None and now - _index_cache["ts"] < _INDEX_CACHE_TTL:
return _index_cache["by_title"], _index_cache["by_stem"]
by_title = {}
by_stem = {}
for f in _walk_claim_files():
stem = f.stem
rel = str(f.relative_to(CODEX_BASE))
by_stem[stem] = rel
# Index by stem-as-normalized too (covers wikilinks that use the slug)
by_title[_normalize_for_match(stem)] = stem
# Also try parsing the title from frontmatter for higher-fidelity matches
fm, _ = _read_claim_file(f)
if fm:
title = fm.get("title")
if title:
key = _normalize_for_match(title)
if key and key not in by_title:
by_title[key] = stem
_index_cache["by_title"] = by_title
_index_cache["by_stem"] = by_stem
_index_cache["ts"] = now
return by_title, by_stem
def _resolve_wikilinks(body, by_title):
"""Extract [[link]] occurrences from body, return {link_text: slug_or_null}."""
out = {}
for match in _WIKILINK_RE.finditer(body or ""):
link_text = match.group(1).strip()
if not link_text or link_text in out:
continue
norm = _normalize_for_match(link_text)
out[link_text] = by_title.get(norm)
return out
# ─── Edge extraction from frontmatter ──────────────────────────────────────
_EDGE_FIELDS = {
"supports": "supports",
"challenges": "challenges",
"challenged_by": "challenges", # canonical: store as challenges direction
"related": "related",
"related_claims": "related",
"depends_on": "depends_on",
}
def _extract_edges(fm, by_title, by_stem):
"""Return edges dict shaped per Ship's contract.
Each edge is {slug, title, exists}. Slug resolved through title index.
"""
edges = {"supports": [], "challenges": [], "related": [], "depends_on": []}
for fm_key, edge_kind in _EDGE_FIELDS.items():
raw = fm.get(fm_key)
if not raw:
continue
items = raw if isinstance(raw, list) else [raw]
for item in items:
if not isinstance(item, str):
continue
text = item.strip()
# Strip wikilink wrapping if present
text = re.sub(r"^\[\[|\]\]$", "", text)
# Strip pipe annotations: "[[link|alias]]" style or "claim | edge_type | date"
text = text.split("|")[0].strip()
if not text:
continue
# Try title match first, fall back to stem match
slug = by_title.get(_normalize_for_match(text))
if not slug and text in by_stem:
slug = text
edges[edge_kind].append({
"slug": slug,
"title": text,
"exists": slug is not None,
})
return edges
# ─── Source provenance ─────────────────────────────────────────────────────
def _resolve_sourced_from(conn, claim_filepath, fm, title, stem):
"""Build sourced_from list for the claim.
Strategy: find PRs that produced this claim (via prs.description LIKE
or branch slug match), look at prs.source_path inbox archive file
parse that source's frontmatter for title/url. Falls back to the raw
`source` string from the claim's own frontmatter.
Both `title` and `stem` must be non-empty caller (handler) already
falls back stemtitle; passing empty values would leak `LIKE '%%'`
and match unrelated PRs.
"""
out = []
seen_paths = set()
pr_rows = []
if (title or "").strip() and (stem or "").strip():
try:
pr_rows = conn.execute(
"""SELECT DISTINCT source_path
FROM prs
WHERE source_path IS NOT NULL AND source_path != ''
AND (description LIKE ? OR branch LIKE ?)
LIMIT 10""",
(f"%{title}%", f"%{stem}%"),
).fetchall()
except sqlite3.OperationalError:
pr_rows = []
for row in pr_rows:
path = row["source_path"]
if not path or path in seen_paths:
continue
seen_paths.add(path)
out.append(_resolve_source_file(path))
# 2. Fallback: parse raw source frontmatter field if no PR match
if not out:
raw = fm.get("source")
if isinstance(raw, str) and raw.strip():
out.append({"path": None, "title": raw.strip()[:200], "url": None})
return out
def _resolve_source_file(rel_path):
"""Given inbox/archive/... path, parse frontmatter for title+url. Best-effort."""
full = CODEX_BASE / rel_path
entry = {"path": rel_path, "title": None, "url": None}
if full.exists():
fm, _ = _read_claim_file(full)
if fm:
entry["title"] = fm.get("title") or fm.get("source") or rel_path
entry["url"] = fm.get("url")
if not entry["title"]:
# Last resort: derive from filename
entry["title"] = Path(rel_path).stem.replace("-", " ")
return entry
# ─── Reviews + PRs ─────────────────────────────────────────────────────────
def _load_pr_history(conn, title, stem):
"""Find PRs that touched this claim and their reviews.
Both title and stem must be non-empty strings empty leaks `LIKE '%%'`
which matches every PR. Handler already populates a fallback so this
is a defense-in-depth guard.
"""
if not (title or "").strip() or not (stem or "").strip():
return [], []
try:
pr_rows = conn.execute(
"""SELECT number, merged_at, commit_type, agent, branch, status
FROM prs
WHERE merged_at IS NOT NULL
AND (description LIKE ? OR branch LIKE ?)
ORDER BY merged_at ASC
LIMIT 50""",
(f"%{title}%", f"%{stem}%"),
).fetchall()
except sqlite3.OperationalError:
return [], []
prs = [
{
"number": r["number"],
"merged_at": r["merged_at"],
"kind": r["commit_type"] or "unknown",
"agent": r["agent"],
"branch": r["branch"],
}
for r in pr_rows
]
pr_numbers = [p["number"] for p in prs]
if not pr_numbers:
return prs, []
placeholders = ",".join("?" * len(pr_numbers))
try:
review_rows = conn.execute(
f"""SELECT pr_number, reviewer, reviewer_model, outcome,
rejection_reason, notes, reviewed_at
FROM review_records
WHERE pr_number IN ({placeholders})
ORDER BY reviewed_at ASC""",
pr_numbers,
).fetchall()
except sqlite3.OperationalError:
review_rows = []
reviews = [
{
"pr_number": r["pr_number"],
"reviewer": r["reviewer"],
"model": r["reviewer_model"],
"outcome": r["outcome"],
"rejection_reason": r["rejection_reason"],
"notes": r["notes"],
"reviewed_at": r["reviewed_at"],
}
for r in review_rows
]
return prs, reviews
# ─── List view (preserved) ─────────────────────────────────────────────────
def _parse_list_entry(filepath):
fm, body = _read_claim_file(filepath)
if not fm or fm.get("type") != "claim":
return None
links = _WIKILINK_RE.findall(body or "")
paragraphs = [p.strip() for p in (body or "").split("\n\n")
if p.strip() and not p.strip().startswith("#")]
summary = paragraphs[0][:300] if paragraphs else ""
return {
"slug": filepath.stem,
"title": fm.get("title", filepath.stem.replace("-", " ")),
"domain": fm.get("domain", "unknown"),
"confidence": fm.get("confidence", "unknown"),
"agent": fm.get("agent"),
"scope": fm.get("scope"),
"created": str(fm.get("created", "")),
"source": fm.get("source", "") if isinstance(fm.get("source"), str) else "",
"sourcer": fm.get("sourcer", ""),
"wiki_link_count": len(links),
"summary": summary,
"challenged_by": fm.get("challenged_by"),
"related_claims": fm.get("related_claims", []),
}
def _load_all_claims_list():
now = time.time()
if _list_cache["data"] and now - _list_cache["ts"] < _LIST_CACHE_TTL:
return _list_cache["data"]
claims = []
for f in _walk_claim_files():
entry = _parse_list_entry(f)
if entry:
claims.append(entry)
_list_cache["data"] = claims
_list_cache["ts"] = now
return claims
async def handle_claims(request):
claims = _load_all_claims()
# ─── Handlers ──────────────────────────────────────────────────────────────
async def handle_claims(request):
claims = _load_all_claims_list()
# Filters
domain = request.query.get("domain")
search = request.query.get("q", "").lower()
confidence = request.query.get("confidence")
agent = request.query.get("agent")
sort = request.query.get("sort", "recent") # recent, alpha, domain
sort = request.query.get("sort", "recent")
filtered = claims
if domain:
@ -83,9 +403,9 @@ async def handle_claims(request):
if agent:
filtered = [c for c in filtered if c["agent"] == agent]
if search:
filtered = [c for c in filtered if search in c["title"].lower() or search in c["summary"].lower()]
filtered = [c for c in filtered
if search in c["title"].lower() or search in c["summary"].lower()]
# Sort
if sort == "recent":
filtered.sort(key=lambda c: c["created"], reverse=True)
elif sort == "alpha":
@ -93,12 +413,10 @@ async def handle_claims(request):
elif sort == "domain":
filtered.sort(key=lambda c: (c["domain"], c["title"].lower()))
# Pagination
limit = min(int(request.query.get("limit", "50")), 200)
offset = int(request.query.get("offset", "0"))
page = filtered[offset:offset+limit]
page = filtered[offset:offset + limit]
# Domain counts for sidebar
domain_counts = {}
for c in claims:
domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1
@ -111,31 +429,114 @@ async def handle_claims(request):
"domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
"confidence_levels": sorted(set(c["confidence"] for c in claims)),
"agents": sorted(set(c["agent"] for c in claims if c["agent"])),
}, headers={"Access-Control-Allow-Origin": "*"})
}, headers=CORS_HEADERS)
async def handle_claim_detail(request):
slug = request.match_info["slug"]
claims = _load_all_claims()
for c in claims:
if c["slug"] == slug:
# Read full body for detail view
for domain_dir in CODEX_ROOT.iterdir():
if not domain_dir.is_dir():
continue
f = domain_dir / f"{slug}.md"
if f.exists():
text = f.read_text(encoding="utf-8")
end = text.index("---", 3)
body = text[end+3:].strip()
c["body"] = body
"""GET /api/claims/{slug} — canonical claim detail page (Ship contract).
One round-trip, all data resolved server-side. Wikilinks pre-resolved.
"""
requested_slug = request.match_info["slug"]
by_title, by_stem = _build_indexes()
# Resolution order: exact stem → title-normalized (handles description-derived
# slugs from /api/activity-feed that are longer than on-disk file stems) →
# stem-as-prefix (handles description-derived slugs that are shorter than the
# file stem because the description was truncated upstream).
slug = requested_slug
rel_path = by_stem.get(slug)
if not rel_path:
# Title fallback: requested slug = slugified frontmatter title
norm = _normalize_for_match(requested_slug)
resolved_stem = by_title.get(norm)
if resolved_stem:
slug = resolved_stem
rel_path = by_stem.get(resolved_stem)
if not rel_path:
# Prefix fallback: walk stems sharing a common prefix with the request,
# pick longest match. Anchored at 32 chars to avoid spurious hits.
norm_req = _normalize_for_match(requested_slug)
best_stem = None
best_len = 0
for stem in by_stem:
norm_stem = _normalize_for_match(stem)
common = 0
for a, b in zip(norm_req, norm_stem):
if a != b:
break
return web.json_response(c, headers={"Access-Control-Allow-Origin": "*"})
return web.json_response({"error": "claim not found"}, status=404)
common += 1
if common >= 32 and common > best_len:
best_stem = stem
best_len = common
if best_stem:
slug = best_stem
rel_path = by_stem.get(best_stem)
if not rel_path:
return web.json_response({"error": "claim not found", "slug": requested_slug},
status=404, headers=CORS_HEADERS)
filepath = CODEX_BASE / rel_path
fm, body = _read_claim_file(filepath)
if not fm:
# File exists at this stem but has no parseable frontmatter — almost
# always a stray enrichment fragment that landed in domains/ without
# being merged into a parent claim. Surfacing as 404 (no claim here)
# not 500: the caller can't act on it differently anyway.
return web.json_response({"error": "claim not found", "slug": slug,
"reason": "file_no_frontmatter"},
status=404, headers=CORS_HEADERS)
# Open read-only DB connection for this request
conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
try:
title = fm.get("title") or slug.replace("-", " ")
prs, reviews = _load_pr_history(conn, title, slug)
sourced_from = _resolve_sourced_from(conn, filepath, fm, title, slug)
finally:
conn.close()
last_review = None
if reviews:
latest = reviews[-1]
last_review = {
"outcome": latest["outcome"],
"reviewer": latest["reviewer"],
"date": (latest["reviewed_at"] or "")[:10],
}
# secondary_domains: explicit list, or empty
secondary = fm.get("secondary_domains") or fm.get("cross_domain_links") or []
if isinstance(secondary, str):
secondary = [secondary]
description = fm.get("description") or ""
edges = _extract_edges(fm, by_title, by_stem)
wikilinks = _resolve_wikilinks(body, by_title)
response = {
"slug": slug,
"title": title,
"domain": fm.get("domain", "unknown"),
"secondary_domains": secondary,
"confidence": fm.get("confidence", "unknown"),
"description": description,
"created": str(fm.get("created", "")),
"last_review": last_review,
"body": body or "",
"sourced_from": sourced_from,
"reviews": reviews,
"prs": prs,
"edges": edges,
"wikilinks": wikilinks,
}
return web.json_response(response, headers=CORS_HEADERS)
async def handle_domains(request):
claims = _load_all_claims()
claims = _load_all_claims_list()
domains = {}
for c in claims:
d = c["domain"]
@ -146,13 +547,11 @@ async def handle_domains(request):
domains[d]["agents"].add(c["agent"])
conf = c["confidence"]
domains[d]["confidence_dist"][conf] = domains[d]["confidence_dist"].get(conf, 0) + 1
result = []
for d in sorted(domains.values(), key=lambda x: -x["count"]):
d["agents"] = sorted(d["agents"])
result.append(d)
return web.json_response(result, headers={"Access-Control-Allow-Origin": "*"})
return web.json_response(result, headers=CORS_HEADERS)
def register_claims_routes(app):

View file

@ -0,0 +1,166 @@
"""Leaderboard endpoint reading from event-sourced contribution_events.
Owner: Argus
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
joins contributors for kind, returns sorted leaderboard with role breakdown.
Roles + weights (Phase A):
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
Endpoints:
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
"""
import logging
import re
import sqlite3
from aiohttp import web
logger = logging.getLogger("argus.leaderboard_routes")
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
KIND_VALUES = ("person", "agent", "org", "all")
# Public path set so auth middleware lets it through
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
def _parse_window(raw):
"""Parse window param. Returns (sql_clause, params_tuple, label).
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
Caps N at 365d / 8760h to prevent abuse.
"""
if not raw or raw == "all_time":
return ("", (), "all_time")
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
if not m:
return ("", (), "all_time")
n = int(m.group(1))
unit = m.group(2)
# Note: WHERE clause is composed via " AND ".join(...) — do NOT prefix with "AND ".
if unit == "d":
n = min(n, 365)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
n = min(n, 8760)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
async def handle_leaderboard(request):
"""GET /api/leaderboard.
Query params:
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
domain: filter by domain (optional)
kind: 'person' (default) | 'agent' | 'org' | 'all'
limit: max entries (default 100, max 500)
"""
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
domain = request.query.get("domain")
kind = request.query.get("kind", "person")
if kind not in KIND_VALUES:
kind = "person"
try:
limit = min(int(request.query.get("limit", "100")), 500)
except (ValueError, TypeError):
limit = 100
where = ["1=1", window_clause] if window_clause else ["1=1"]
params = list(window_params)
if domain:
where.append("ce.domain = ?")
params.append(domain)
if kind != "all":
where.append("COALESCE(c.kind, 'person') = ?")
params.append(kind)
where_sql = " AND ".join([w for w in where if w])
conn = _conn(request.app)
try:
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
# LEFT JOIN contributors so handles in events but not in contributors still appear
# (defaults to kind='person' via COALESCE).
rows = conn.execute(f"""
SELECT
ce.handle,
COALESCE(c.kind, 'person') AS kind,
ROUND(SUM(ce.weight), 4) AS ci,
COUNT(*) AS events_count,
MIN(ce.timestamp) AS first_contribution,
MAX(ce.timestamp) AS last_contribution,
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
COUNT(DISTINCT ce.domain) AS domain_count,
COUNT(DISTINCT ce.pr_number) AS pr_count
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
GROUP BY ce.handle, COALESCE(c.kind, 'person')
ORDER BY ci DESC, last_contribution DESC
LIMIT ?
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
has_more = len(rows) > limit
rows = rows[:limit]
# Total count of distinct handles matching filters (without limit)
total_row = conn.execute(f"""
SELECT COUNT(DISTINCT ce.handle) AS total
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
""", params).fetchone()
total = total_row["total"] if total_row else 0
leaderboard = []
for r in rows:
leaderboard.append({
"handle": r["handle"],
"kind": r["kind"],
"ci": r["ci"],
"ci_breakdown": {
"author": round(r["ci_author"] or 0, 4),
"challenger": round(r["ci_challenger"] or 0, 4),
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
"originator": round(r["ci_originator"] or 0, 4),
"evaluator": round(r["ci_evaluator"] or 0, 4),
},
"events_count": r["events_count"],
"domain_count": r["domain_count"],
"pr_count": r["pr_count"],
"first_contribution": r["first_contribution"],
"last_contribution": r["last_contribution"],
})
return web.json_response({
"window": window_label,
"domain": domain,
"kind_filter": kind,
"total": total,
"shown": len(leaderboard),
"has_more": has_more,
"source": "contribution_events", # explicit so consumers know the data origin
"leaderboard": leaderboard,
})
finally:
conn.close()
def register_leaderboard_routes(app: web.Application):
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
app.router.add_get("/api/leaderboard", handle_leaderboard)

View file

@ -15,6 +15,7 @@ Epimetheus owns this module. Leo reviews changes.
import logging
import re
import sqlite3
from pathlib import Path
logger = logging.getLogger("pipeline.attribution")
@ -81,6 +82,7 @@ def normalize_handle(handle: str, conn=None) -> str:
if not handle:
return ""
h = handle.strip().lower().lstrip("@")
h = re.sub(r"\s*\(self-directed\)\s*$", "", h)
if conn is None:
return h
try:
@ -108,6 +110,36 @@ def classify_kind(handle: str) -> str:
return "person"
def is_publisher_handle(handle: str, conn) -> int | None:
"""Return publisher.id if the handle exists as a publisher name, else None.
Schema v26 split orgs/citations into the publishers table. Writer code
(upsert_contributor, insert_contribution_event) calls this to gate creating
contributor rows or events for handles that belong to publishers.
Without this gate, every merged PR with `sourcer: cnbc` (for example) would
re-create CNBC as a contributor and undo the v26 classifier cleanup.
Falls back gracefully on pre-v26 DBs: returns None if publishers table
doesn't exist yet (writer behaves like before, no regression).
"""
if not handle or conn is None:
return None
h = handle.strip().lower().lstrip("@")
try:
row = conn.execute(
"SELECT id FROM publishers WHERE name = ?", (h,),
).fetchone()
if row:
return row["id"] if hasattr(row, "keys") else row[0]
except sqlite3.OperationalError:
# Pre-v26 DB: publishers table doesn't exist yet. Fall through to None
# so writer behaves as before. Any other exception class is real signal
# (programming error, lock contention, corruption) — let it propagate.
logger.debug("is_publisher_handle: publishers table not present (pre-v26?)", exc_info=True)
return None
# ─── Parse attribution from claim content ──────────────────────────────────

View file

@ -84,6 +84,14 @@ MAX_EXTRACT_WORKERS = int(os.environ.get("MAX_EXTRACT_WORKERS", "5"))
MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7"))
MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain
# --- External GitHub PR merge strategy ---
# When True, gh-pr-N/* branches merge with --no-ff (preserves contributor SHA in
# main's history → GitHub recognizes "merged" badge). When False, fall back to
# cherry-pick (the default for all other branches). Default True; flip to False
# as an emergency backout if the no-ff path destabilizes merge throughput.
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
EXTERNAL_PR_NO_FF_MERGE = True
# --- Timeouts (seconds) ---
EXTRACT_TIMEOUT = 600 # 10 min
EVAL_TIMEOUT = 120 # 2 min — routine Sonnet/Gemini Flash calls (was 600, caused 10-min stalls)
@ -203,6 +211,14 @@ HEALTH_CHECK_INTERVAL = 60
# --- Extraction gates ---
EXTRACTION_COOLDOWN_HOURS = 4 # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter.
# --- Verdict-deadlock reaper ---
# Defaults safe (dry-run, 24h age, hourly throttle). Operator flips REAPER_DRY_RUN
# to "false" via systemctl edit teleo-pipeline → restart, no code change required.
REAPER_DRY_RUN = os.environ.get("REAPER_DRY_RUN", "true").lower() == "true"
REAPER_DEADLOCK_AGE_HOURS = int(os.environ.get("REAPER_DEADLOCK_AGE_HOURS", "24"))
REAPER_INTERVAL_SECONDS = int(os.environ.get("REAPER_INTERVAL_SECONDS", "3600"))
REAPER_MAX_PER_RUN = int(os.environ.get("REAPER_MAX_PER_RUN", "50"))
# --- Retrieval (Telegram bot) ---
RETRIEVAL_RRF_K = 20 # RRF smoothing constant — tuned for 5-10 results per source
RETRIEVAL_ENTITY_BOOST = 1.5 # RRF score multiplier for claims wiki-linked from matched entities

View file

@ -14,7 +14,7 @@ import logging
import re
from . import config, db
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, is_publisher_handle, normalize_handle
from .forgejo import get_pr_diff
logger = logging.getLogger("pipeline.contributor")
@ -62,6 +62,12 @@ def insert_contribution_event(
canonical = normalize_handle(handle, conn=conn)
if not canonical:
return False
# Schema v26 gate: handles classified as publishers (CNBC, SpaceNews, arxiv,
# etc.) are provenance metadata, not contributors. Don't credit them. Without
# this gate every merge re-creates org events and undoes the v26 cleanup.
if is_publisher_handle(canonical, conn) is not None:
logger.debug("insert_contribution_event: %r is a publisher — skipping event", canonical)
return False
kind = classify_kind(canonical)
try:
cur = conn.execute(
@ -419,6 +425,21 @@ def upsert_contributor(
logger.warning("Unknown contributor role: %s", role)
return
# Schema v26 gate: orgs/citations live in publishers table, not contributors.
# Skip without writing so the v26 classifier cleanup isn't undone by every
# merge that has `sourcer: cnbc` (or similar) in claim frontmatter.
#
# Note: bare normalization (lower + lstrip @), no alias resolution. This is
# consistent with the existing `SELECT handle FROM contributors WHERE handle = ?`
# below — both look up by canonical-form-as-stored. Today's classifier produces
# one publisher row per canonical handle, so bare lookup hits. Branch 3 will
# normalize alias→canonical at writer entry points (extract.py, post_extract);
# at that point this gate auto-tightens because callers pass canonical handles.
canonical_handle = handle.strip().lower().lstrip("@") if handle else ""
if canonical_handle and is_publisher_handle(canonical_handle, conn) is not None:
logger.debug("upsert_contributor: %r is a publisher — skipping contributor row", canonical_handle)
return
existing = conn.execute(
"SELECT handle FROM contributors WHERE handle = ?", (handle,)
).fetchone()

View file

@ -32,6 +32,7 @@ from datetime import date
from pathlib import Path
from . import config
from .attribution import normalize_handle
from .costs import record_usage
from .db import classify_source_channel
from .domains import agent_for_domain
@ -683,16 +684,25 @@ async def _extract_one_source(
logger.info("PR #%d created for %s (%d claims, %d entities)", pr_num, source_file, len(claim_files), len(entity_files))
# Store contributor attribution: who submitted this source?
# Priority: proposed_by field → intake_tier inference → "unknown"
# Priority: proposed_by field → intake_tier inference → operator default.
# NB: `submitted_by` is a CANONICAL HANDLE — lowercase, no @, no
# trailing "(self-directed)" decorator. The "self-directed" signal is
# already carried by intake_tier == "research-task" + the prs.agent
# column; persisting it here as a string suffix produced decorated
# values like "Vida (self-directed)" that broke /contributors/{handle}
# lookups downstream (livingip-web timeline → 404). Read consumers
# (lib/contributor.insert_contribution_event, scripts/scoring_digest,
# diagnostics/activity_feed_api) all normalize via attribution.normalize_handle
# anyway, so writing the canonical form is the source-of-truth fix.
if proposed_by:
contributor = proposed_by.strip().strip('"').strip("'")
contributor = normalize_handle(proposed_by, conn=conn)
elif intake_tier == "research-task":
contributor = f"{agent_name} (self-directed)"
contributor = normalize_handle(agent_name, conn=conn)
elif intake_tier == "directed":
contributor = "@m3taversal"
contributor = "m3taversal"
else:
# Default: if no proposed_by and not a research task, Cory submitted it
contributor = "@m3taversal"
# Default: if no proposed_by and not a research task, operator submitted it.
contributor = "m3taversal"
# Build pipe-separated claim titles for the description field
claim_titles = " | ".join(
@ -923,6 +933,36 @@ async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
except Exception:
logger.debug("Failed to read source %s", f, exc_info=True)
# Archive-basename filter: skip queue files whose basename already exists in
# inbox/archive/. Research-session commits on agent branches occasionally
# re-introduce already-archived queue files when the branch is re-merged,
# producing same-source re-extractions every cooldown cycle. The archive
# copy is the source of truth — if a file with this basename is in archive,
# the source is processed regardless of queue state. Single archive scan
# per cycle, cheap (~1k files).
#
# Assumes basename uniqueness across queue+archive — current naming
# convention (date-prefix + topic-slug) makes collisions vanishingly
# rare. If short generic names like "notes.md" enter the queue, this
# filter silently false-positives.
if unprocessed:
archive_dir = main / "inbox" / "archive"
archived_basenames: set[str] = set()
if archive_dir.exists():
for af in archive_dir.rglob("*.md"):
if af.name.startswith("_"):
continue
archived_basenames.add(af.name)
if archived_basenames:
before = len(unprocessed)
unprocessed = [
(sp, c, f) for sp, c, f in unprocessed
if Path(sp).name not in archived_basenames
]
skipped = before - len(unprocessed)
if skipped:
logger.info("Skipped %d queue source(s) — basename already in inbox/archive/", skipped)
# Don't early-return here — re-extraction sources may exist even when queue is empty
# (the re-extraction check runs after open-PR filtering below)

View file

@ -112,16 +112,44 @@ async def discover_external_prs(conn) -> int:
# Detect origin: pipeline agents have per-agent Forgejo users
pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"}
author = pr.get("user", {}).get("login", "")
is_pipeline = author.lower() in pipeline_users
branch_ref = pr["head"]["ref"]
# Pre-classify by branch prefix — pipeline-shape branches are
# pipeline regardless of Forgejo opener. reweave.py and
# ingestion run as the operator's token, so opener-based
# classification mis-credited system maintenance to the
# operator (~2.7k PRs on m3ta's contributor row before the
# 2026-05-12 reattribute fix). Branch prefix is the canonical
# signal: reweave/ingestion -> 'pipeline', <agent>/ -> agent.
branch_target = None
if branch_ref.startswith(("reweave/", "ingestion/")):
branch_target = "pipeline"
elif branch_ref.startswith(_AGENT_NAMES):
# _AGENT_NAMES is a tuple of bare names; agent branches
# are "<name>/..." so use the tuple as a startswith prefix
# set after appending '/'.
for name in _AGENT_NAMES:
if branch_ref.startswith(name + "/"):
branch_target = name
break
is_pipeline = author.lower() in pipeline_users or branch_target is not None
origin = "pipeline" if is_pipeline else "human"
priority = "high" if origin == "human" else None
domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
agent, commit_type = classify_branch(pr["head"]["ref"])
source_channel = classify_source_channel(pr["head"]["ref"])
domain = None if not is_pipeline else detect_domain_from_branch(branch_ref)
agent, commit_type = classify_branch(branch_ref)
source_channel = classify_source_channel(branch_ref)
# For human PRs, submitted_by is the Forgejo author.
# For pipeline PRs, submitted_by is set later by extract.py (from source proposed_by).
submitted_by = author if origin == "human" else None
# submitted_by precedence (canonical handles only):
# 1. branch prefix (pipeline/agent) — set here at discovery
# 2. Forgejo opener for human PRs — set here, lowercased
# 3. extract.py later (from source proposed_by) — left None
if branch_target is not None:
submitted_by = branch_target
elif origin == "human":
submitted_by = author.lower().lstrip("@") if author else None
else:
submitted_by = None
conn.execute(
"""INSERT OR IGNORE INTO prs
@ -429,6 +457,171 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
await _git("branch", "-D", clean_branch)
_GH_PR_BRANCH_RE = re.compile(r"^gh-pr-(\d+)/(.+)$")
async def _merge_no_ff_external(branch: str) -> tuple[bool, str]:
"""Merge an external GitHub fork PR with --no-ff so contributor SHA lands in main.
Why this differs from _cherry_pick_onto_main:
- Cherry-pick rewrites the contributor's commit SHA → GitHub's "is PR head SHA
an ancestor of main?" check returns false → "merged" badge never fires.
- --no-ff preserves the contributor's commit SHA as a parent of the merge
commit. After ff-push to main (the existing dispatch step), GitHub sees
the SHA in ancestry and marks the PR merged.
Mechanics:
1. Fetch origin/main + origin/{branch}
2. Worktree on local branch _merged-{slug} from origin/main
3. git merge --no-ff origin/{branch} with verbose message:
"Merge external GitHub PR #{N}: {branch_slug}"
4. Push merge commit to origin/_merged/{branch} (synthetic audit ref)
5. ff-push merge_sha origin/main directly (function owns the push, NOT
dispatch see sentinel return below)
The merge commit M has parents [main_sha, branch_sha]. M is a fast-forward
descendant of main_sha (via first-parent chain), so the push to main
works without --force.
Synthetic branch (Ship review Apr 28): we deliberately do NOT force-push
the contributor's gh-pr-N/* branch. Force-pushing it would rewrite the
branch tip with a merge commit the contributor didn't author, showing as
a confusing bot force-push in Forgejo's PR UI. The synthetic _merged/*
audit ref lets us track the merge commit without touching the contributor's
branch. Mirrors the _clean/* synthetic branch pattern in cherry-pick.
Sentinel return: function pushes merge_sha main itself (dispatch's ff-push
can't, since origin/{branch} is unchanged and not a descendant of main).
Returns a "merged --no-ff" sentinel string that dispatch detects to skip
its ff-push step and route directly to PR-close + mark_merged + audit.
The full 40-char merge SHA is in the return string for dispatch to extract.
Conflict handling: same auto-resolve pattern as cherry-pick entity-only
conflicts take main's version (--ours = current worktree HEAD = main),
other conflicts abort and return False with detail.
Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
"""
m = _GH_PR_BRANCH_RE.match(branch)
if not m:
return False, f"branch {branch} doesn't match gh-pr-N/* format"
gh_pr_num = m.group(1)
branch_slug = m.group(2)
slug = branch.replace("/", "-")
worktree_path = f"/tmp/teleo-merge-{slug}"
local_branch = f"_merged-{slug}" # local working branch in worktree
audit_ref = f"_merged/{branch}" # remote synthetic ref (preserves hierarchy)
# Fetch latest state — separate calls (long branch names break combined refspec)
rc, out = await _git("fetch", "origin", "main", timeout=15)
if rc != 0:
return False, f"fetch main failed: {out}"
rc, out = await _git("fetch", "origin", branch, timeout=15)
if rc != 0:
return False, f"fetch branch failed: {out}"
# Up-to-date check (mirrors cherry-pick path semantics)
rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
rc2, main_sha = await _git("rev-parse", "origin/main")
if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
rc_diff, diff_out = await _git(
"diff", "--stat", f"origin/main..origin/{branch}", timeout=10,
)
if rc_diff != 0 or not diff_out.strip():
return True, "already up to date"
logger.info("External PR branch %s is descendant of main but has new content — proceeding", branch)
async with _bare_repo_lock:
# Clean up any stale local branch from a prior failed run
await _git("branch", "-D", local_branch)
rc, out = await _git("worktree", "add", "-b", local_branch, worktree_path, "origin/main")
if rc != 0:
return False, f"worktree add failed: {out}"
try:
merge_msg = f"Merge external GitHub PR #{gh_pr_num}: {branch_slug}"
rc, out = await _git(
"merge", "--no-ff", f"origin/{branch}",
"-m", merge_msg,
cwd=worktree_path, timeout=60,
)
if rc != 0:
# Identify conflicts
rc_ls, conflicting = await _git(
"diff", "--name-only", "--diff-filter=U", cwd=worktree_path,
)
conflict_files = [
f.strip() for f in conflicting.split("\n") if f.strip()
] if rc_ls == 0 else []
if conflict_files and all(f.startswith("entities/") for f in conflict_files):
# Entity-only conflicts: take main's version (entities are recoverable)
# In merge: --ours = branch we're ON (worktree HEAD = main)
# --theirs = branch merging in (origin/{branch})
for cf in conflict_files:
await _git("checkout", "--ours", cf, cwd=worktree_path)
await _git("add", cf, cwd=worktree_path)
# Complete the merge using the prepared MERGE_MSG (no editor)
rc_cont, cont_out = await _git(
"-c", "core.editor=true",
"commit", "--no-edit",
cwd=worktree_path, timeout=60,
)
if rc_cont != 0:
await _git("merge", "--abort", cwd=worktree_path)
return False, f"merge entity resolution failed for PR #{gh_pr_num}: {cont_out}"
logger.info(
"External PR #%s merge: entity conflict auto-resolved (dropped %s)",
gh_pr_num, ", ".join(sorted(conflict_files)),
)
else:
conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
await _git("merge", "--abort", cwd=worktree_path)
return False, f"merge conflict on PR #{gh_pr_num}: {conflict_detail}"
# Capture the merge commit SHA before any pushes
rc, merge_sha = await _git("rev-parse", "HEAD", cwd=worktree_path)
if rc != 0:
return False, f"rev-parse merge HEAD failed: {merge_sha}"
merge_sha = merge_sha.strip().split("\n")[0]
# Push to synthetic audit ref _merged/{branch} (does not touch contributor's
# gh-pr-N/* branch). Plain --force: the audit ref is bot-owned and per-PR;
# if a prior aborted attempt left a stale ref, overwriting it is the
# intended behavior, and there's no concurrent writer to lease against.
rc, out = await _git(
"push", "--force", "origin", f"HEAD:refs/heads/{audit_ref}",
cwd=worktree_path, timeout=30,
)
if rc != 0:
return False, f"push to audit ref {audit_ref} failed: {out}"
# ff-push the merge commit to main. This is a true fast-forward (M is a
# descendant of origin/main via its first parent), so no --force needed.
# Forgejo's branch protection allows ff-push to main from authorized users.
rc, out = await _git(
"push", "origin", f"{merge_sha}:main",
cwd=worktree_path, timeout=30,
)
if rc != 0:
# Roll back audit ref if main push failed — keeps state consistent.
await _git("push", "--delete", "origin", f"refs/heads/{audit_ref}",
cwd=worktree_path, timeout=15)
return False, f"ff-push to main failed: {out}"
# Sentinel return: "merged --no-ff" prefix triggers dispatch's external-PR
# close path (skips ff-push, does PR-close + mark_merged + audit).
# Full 40-char merge SHA in the message so dispatch can parse it for audit.
return True, f"merged --no-ff (external PR #{gh_pr_num}, M={merge_sha}, audit_ref={audit_ref})"
finally:
async with _bare_repo_lock:
await _git("worktree", "remove", "--force", worktree_path)
await _git("branch", "-D", local_branch)
from .frontmatter import (
REWEAVE_EDGE_FIELDS,
parse_yaml_frontmatter,
@ -733,6 +926,12 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
# (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
if branch.startswith("reweave/"):
merge_fn = _merge_reweave_pr(branch)
elif branch.startswith("gh-pr-") and config.EXTERNAL_PR_NO_FF_MERGE:
# External GitHub fork PRs: --no-ff merge so contributor SHA lands
# in main's history → GitHub recognizes "merged" badge.
# Backout via config.EXTERNAL_PR_NO_FF_MERGE = False (falls back to cherry-pick).
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
merge_fn = _merge_no_ff_external(branch)
else:
# Extraction commits ADD new files — cherry-pick applies cleanly.
merge_fn = _cherry_pick_onto_main(branch)
@ -786,6 +985,58 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
succeeded += 1
continue
# External GitHub PR (gh-pr-*): _merge_no_ff_external already pushed
# the merge commit to origin/main + the synthetic _merged/{branch}
# audit ref. Skip dispatch's ff-push (would fail — origin/{branch} is
# the contributor's untouched branch, not a descendant of main).
# Just close PR + mark_merged + audit, parsing merge SHA from sentinel.
if pick_msg.startswith("merged --no-ff"):
m = re.search(r"M=([a-f0-9]{40})", pick_msg)
merge_sha = m.group(1) if m else None
m_ref = re.search(r"audit_ref=(\S+?)\)", pick_msg)
audit_ref = m_ref.group(1) if m_ref else None
m_pr = re.search(r"external PR #(\d+)", pick_msg)
gh_pr_num = m_pr.group(1) if m_pr else None
# Surface drift between dispatch and _merge_no_ff_external if the
# success-message contract changes. Merge already succeeded; this
# is signal-only, not a gate on the close path.
if not (m and m_ref and m_pr):
logger.warning(
"PR #%d sentinel parse incomplete: M=%s, audit_ref=%s, gh_pr=%s, msg=%r",
pr_num, bool(m), bool(m_ref), bool(m_pr), pick_msg,
)
leo_token = get_agent_token("leo")
comment_body = (
f"Merged via --no-ff into main.\n"
f"Merge commit: `{merge_sha}`\n"
f"Audit ref: `{audit_ref}`\n"
f"Branch: `{branch}` (preserved unchanged)"
)
await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
{"body": comment_body})
result = await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"),
{"state": "closed"}, token=leo_token)
if result is None:
logger.error("PR #%d: Forgejo close failed (no-ff path), skipping DB update", pr_num)
failed += 1
continue
mark_merged(conn, pr_num)
db.audit(conn, "merge", "merged", json.dumps({
"pr": pr_num, "branch": branch, "method": "no-ff",
"merge_commit_sha": merge_sha,
"audit_ref": audit_ref,
"github_pr": gh_pr_num,
}))
# NOTE: do NOT _delete_remote_branch(branch) here. The contributor's
# gh-pr-N/* branch is the mirror of their fork PR head — leaving it
# in place lets sync-mirror keep the GitHub PR <-> Forgejo PR link
# observable. The synthetic _merged/{branch} ref carries the merge.
logger.info("PR #%d merged via --no-ff (M=%s)", pr_num,
merge_sha[:8] if merge_sha else "?")
succeeded += 1
continue
# Local ff-push: cherry-picked branch is a descendant of origin/main.
# Regular push = fast-forward. Non-ff rejected by default (same safety).
# --force-with-lease removed: Forgejo categorically blocks it on protected branches.

View file

@ -522,30 +522,53 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
Finds PRs with substantive issue tags that haven't exceeded fix budget.
Processes up to 3 per cycle (Rhea: 180s interval, don't overwhelm eval).
"""
# Build the actionable-tag list from the routing constants so adding a new
# tag to FIXABLE_TAGS / CONVERTIBLE_TAGS / UNFIXABLE_TAGS auto-updates the
# SELECT filter — no two-place edit footgun.
actionable_tags = sorted(FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS)
placeholders = ",".join(["?"] * len(actionable_tags))
# Push the actionable-tag filter into SQL (was a post-fetch Python loop).
# The old shape selected the 3 oldest request_changes PRs and then dropped
# ones without actionable tags, so empty-eval_issues rows occupied LIMIT-3
# forever (head-of-line). Now LIMIT-3 always returns 3 actionable rows.
# Reaper handles the empty-tag PRs after their 24h cooldown.
rows = conn.execute(
"""SELECT number, eval_issues FROM prs
f"""SELECT number, eval_issues FROM prs
WHERE status = 'open'
AND tier0_pass = 1
AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')
AND COALESCE(fix_attempts, 0) < ?
AND (last_attempt IS NULL OR last_attempt < datetime('now', '-3 minutes'))
AND json_valid(eval_issues)
AND EXISTS (
SELECT 1 FROM json_each(eval_issues)
WHERE value IN ({placeholders})
)
ORDER BY created_at ASC
LIMIT 3""",
(MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS,), # Total budget: mechanical + substantive
(MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS, *actionable_tags),
).fetchall()
if not rows:
return 0, 0
# Filter to only PRs with substantive issues (not just mechanical)
# Defense-in-depth: json_valid(eval_issues) in the SELECT already filters
# corrupt JSON before json_each runs, so this WARN should be unreachable.
# Kept anyway: json_valid and json.loads use technically distinct parsers,
# and the journal entry names the failure mode if SQLite ever surfaces a
# row that passes json_valid + json_each but fails json.loads.
substantive_rows = []
for row in rows:
try:
issues = json.loads(row["eval_issues"] or "[]")
json.loads(row["eval_issues"] or "[]")
except (json.JSONDecodeError, TypeError):
logger.warning(
"PR #%d: corrupt eval_issues JSON — skipping in substantive fix cycle",
row["number"],
)
continue
if set(issues) & (FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS):
substantive_rows.append(row)
substantive_rows.append(row)
if not substantive_rows:
return 0, 0
@ -559,7 +582,13 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
if result.get("action"):
fixed += 1
elif result.get("skipped"):
logger.debug("PR #%d: substantive fix skipped: %s", row["number"], result.get("reason"))
# Was DEBUG — promoted to INFO to make stuck-PR root cause
# visible without enabling DEBUG fleet-wide. (Ship Apr 24+
# silent skip diagnosis.)
logger.info(
"PR #%d: substantive fix skipped: %s",
row["number"], result.get("reason"),
)
except Exception:
logger.exception("PR #%d: substantive fix failed", row["number"])
errors += 1
@ -569,3 +598,191 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
logger.info("Substantive fix cycle: %d fixed, %d errors", fixed, errors)
return fixed, errors
# ─── Verdict-deadlock reaper ──────────────────────────────────────────────
#
# Defense-in-depth for PRs that substantive_fixer can't make progress on.
# Targets two stuck-verdict shapes empirically observed in production:
#
# 1. leo:request_changes + domain:approve
# Leo asked for substantive fix; fixer either failed silently
# (no_claim_files / no_review_comments / etc.) or the issue tag isn't
# in FIXABLE | CONVERTIBLE | UNFIXABLE. PR sits forever.
#
# 2. leo:skipped + domain:request_changes
# Eval bypassed Leo (eval_attempts >= MAX). Domain rejected with no
# structured eval_issues. fixer can't classify → silent skip → forever.
#
# Both shapes need a clearance path. Reaper closes them after a 24h cooldown
# with audit_log breadcrumbs for forensics. First deploy runs in dry-run mode
# (audit "would_close" events only — no Forgejo writes, no DB closes).
#
# Reaper config (REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS, REAPER_INTERVAL_SECONDS,
# REAPER_MAX_PER_RUN) lives in lib/config.py with env-var overrides — operator
# flips dry-run to live via `systemctl edit teleo-pipeline.service`
# (Environment=REAPER_DRY_RUN=false) + restart. No code change, no commit, no
# redeploy required.
async def verdict_deadlock_reaper_cycle(conn) -> int:
"""Reap PRs stuck in conflicting-verdict deadlock for >24h.
Returns count of PRs closed (or "would-close" in dry-run mode).
Throttled to once per REAPER_INTERVAL_SECONDS via sentinel audit event.
"""
# Throttle: skip if last reaper run was within REAPER_INTERVAL_SECONDS.
# Uses audit_log as the rate-limit ledger so no schema/state needed.
# stage='reaper' filter so the planner uses idx_audit_stage (avoids full scan).
last_run = conn.execute(
"SELECT MAX(timestamp) FROM audit_log "
"WHERE stage = 'reaper' AND event = 'verdict_deadlock_reaper_run'"
).fetchone()[0]
if last_run:
cur = conn.execute(
"SELECT (julianday('now') - julianday(?)) * 86400 < ?",
(last_run, config.REAPER_INTERVAL_SECONDS),
).fetchone()[0]
if cur:
return 0
# Two stuck-verdict shapes: leo:rc+domain:approve, leo:skipped+domain:rc.
#
# Branch allowlist invariant: the reaper closes ONLY disposable, pipeline-
# generated branches — content the pipeline (or a daily cron) created and
# can recreate. Four classes qualify:
#
# extract/* — per-source extraction PRs, regenerated next ingest cycle
# reweave/* — nightly graph-edge maintenance, regenerated next reweave
# fix/* — pipeline-internal fix branches
# */research-YYYY-MM-DD — daily {agent}/research-{date} cron sessions.
# Matched via SQLite `_` single-char wildcards as
# `research-20__-__-__` to literally enforce the date-
# suffix shape. Excludes hand-named research branches
# (rio/research-batch-agents-memory-harnesses,
# theseus/research-2nd-attempt-on-X, etc.) which are
# feature work owned by the agent. Pattern good through
# 2099; revisit then.
#
# WIP agent feature branches (theseus/feature-foo, epimetheus/some-fix,
# rio/research-thesis-name) are NEVER reaped — owners review their own PRs
# on their own cadence. The date-shaped pattern threads the needle: picks
# up daily synthesis output the agent regenerates tomorrow while leaving
# manually-named research work alone.
rows = conn.execute(
"""SELECT number, branch, eval_issues, leo_verdict, domain_verdict,
last_attempt, fix_attempts
FROM prs
WHERE status = 'open'
AND tier0_pass = 1
AND last_attempt IS NOT NULL
AND last_attempt < datetime('now', ? || ' hours')
AND (branch LIKE 'extract/%'
OR branch LIKE 'reweave/%'
OR branch LIKE 'fix/%'
OR branch LIKE '%/research-20__-__-__')
AND (
(leo_verdict = 'request_changes' AND domain_verdict = 'approve')
OR (leo_verdict = 'skipped' AND domain_verdict = 'request_changes')
)
ORDER BY last_attempt ASC
LIMIT ?""",
(f"-{config.REAPER_DEADLOCK_AGE_HOURS}", config.REAPER_MAX_PER_RUN),
).fetchall()
mode = "dryrun" if config.REAPER_DRY_RUN else "live"
if not rows:
# Heartbeat anyway so throttle ticks even when nothing to reap.
db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
"candidates": 0, "closed": 0, "mode": mode,
}))
return 0
logger.info(
"Verdict-deadlock reaper [%s]: %d candidate(s) in deadlock >%dh",
mode, len(rows), config.REAPER_DEADLOCK_AGE_HOURS,
)
closed = 0
would_close = 0
errors = 0
for row in rows:
pr = row["number"]
reason_detail = {
"pr": pr,
"branch": row["branch"],
"leo_verdict": row["leo_verdict"],
"domain_verdict": row["domain_verdict"],
"eval_issues": row["eval_issues"],
"last_attempt": row["last_attempt"],
"fix_attempts": row["fix_attempts"],
}
if config.REAPER_DRY_RUN:
# Audit only — do NOT touch DB row or Forgejo state.
db.audit(conn, "reaper", "verdict_deadlock_would_close",
json.dumps(reason_detail))
logger.info(
"Reaper [dryrun]: would close PR #%d (leo=%s domain=%s issues=%s)",
pr, row["leo_verdict"], row["domain_verdict"], row["eval_issues"],
)
would_close += 1
continue
try:
comment_body = (
"Closed by verdict-deadlock reaper.\n\n"
f"This PR sat for >{config.REAPER_DEADLOCK_AGE_HOURS}h with conflicting "
f"verdicts (leo={row['leo_verdict']}, domain={row['domain_verdict']}) "
f"that the substantive fixer couldn't auto-resolve.\n\n"
f"Eval issues: `{row['eval_issues']}`\n"
f"Last attempt: {row['last_attempt']}\n\n"
"_Automated message from the LivingIP pipeline._"
)
await forgejo_api(
"POST", repo_path(f"issues/{pr}/comments"), {"body": comment_body},
)
patch_result = await forgejo_api(
"PATCH", repo_path(f"pulls/{pr}"), {"state": "closed"},
token=get_agent_token("leo"),
)
if patch_result is None:
logger.warning(
"Reaper: PR #%d Forgejo close failed — skipping DB close to "
"avoid drift", pr,
)
errors += 1
continue
# Forgejo already closed at the PATCH above — pass close_on_forgejo=False
# so close_pr() doesn't issue a redundant PATCH (which on transient
# failure returns False and skips the DB close → status drift).
await close_pr(
conn, pr,
last_error=(
f"verdict_deadlock_reaper: leo={row['leo_verdict']} "
f"domain={row['domain_verdict']} age>{config.REAPER_DEADLOCK_AGE_HOURS}h"
),
close_on_forgejo=False,
)
db.audit(conn, "reaper", "verdict_deadlock_closed",
json.dumps(reason_detail))
closed += 1
except Exception:
logger.exception("Reaper: PR #%d close failed", pr)
errors += 1
db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
"candidates": len(rows), "closed": closed, "would_close": would_close,
"errors": errors, "mode": mode,
}))
if errors:
logger.warning(
"Verdict-deadlock reaper [%s]: %d closed, %d would-close, %d errors",
mode, closed, would_close, errors,
)
elif config.REAPER_DRY_RUN:
logger.info("Verdict-deadlock reaper [dryrun]: %d would-close", would_close)
else:
logger.info("Verdict-deadlock reaper [live]: %d closed", closed)
return closed + would_close

View file

@ -267,6 +267,7 @@ format: tweet | thread
status: unprocessed
priority: high | medium | low
tags: [topic1, topic2]
intake_tier: research-task
---
## Content

View file

@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""Backfill: re-attribute research-session-derived PRs from m3taversal to agent.
Problem: research-session.sh used to write source frontmatter without
`proposed_by` / `intake_tier`, so extract.py's contributor-classification
fallback set `prs.submitted_by = '@m3taversal'`, which propagated into
`contribution_events` as a `handle='m3taversal', role='author'` row per
research-derived claim. Result: agent research credited to the human.
Forward fix is a frontmatter-template patch to research-session.sh.
This script corrects historical records.
Identification:
Research-session source archives are committed to teleo-codex with a
message matching `^<agent>: research session YYYY-MM-DD `. The diff
for that commit lists `inbox/queue/*.md` files the agent created. Any
PR whose `source_path` matches one of those filenames is research-derived.
Touch list (per matched PR):
1. UPDATE prs SET submitted_by = '<agent>' (canonical handle, lowercase, no
trailing "(self-directed)" suffix see lib/extract.py and
diagnostics/activity_feed_api.py for why decorators leak into 404s)
2. DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author' AND pr_number=?
3. INSERT OR IGNORE INTO contribution_events with handle=<agent>,
kind='agent', role='author', weight=0.30, original timestamp/domain/channel.
Defaults to --dry-run. Pass --apply to commit changes.
Usage:
python3 backfill-research-session-attribution.py --dry-run --days 30
python3 backfill-research-session-attribution.py --apply --days 30
"""
import argparse
import logging
import os
import re
import sqlite3
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("backfill-research-attr")
DEFAULT_REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
DEFAULT_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
AUTHOR_WEIGHT = 0.30
def git(repo: Path, *args: str) -> str:
"""Run a git command in repo, return stdout. Raises on non-zero."""
result = subprocess.run(
["git", "-C", str(repo), *args],
capture_output=True, text=True, check=True,
)
return result.stdout
def discover_research_session_archives(repo: Path, days: int) -> dict[str, str]:
"""Return {source_filename_basename: agent_handle} for last N days.
Walks teleo-codex `git log --since`, filters to research-session commits,
parses agent from message header, lists inbox/queue/*.md files added in
that commit's diff. Maps the basename (which becomes source_path on extract)
to the agent who created it.
"""
log = git(repo, "log", f"--since={days} days ago", "--pretty=%H|%s", "--no-merges")
file_to_agent: dict[str, str] = {}
commits_seen = 0
commits_matched = 0
for line in log.splitlines():
if not line or "|" not in line:
continue
commits_seen += 1
sha, _, subject = line.partition("|")
m = COMMIT_HEADER_RE.match(subject)
if not m:
continue
agent = m.group(1)
if agent not in KNOWN_AGENTS:
logger.debug("skipping commit %s — unknown agent %r", sha[:8], agent)
continue
commits_matched += 1
# List files added in this commit (inbox/queue/*.md only)
try:
added = git(repo, "diff-tree", "--no-commit-id", "--name-only", "-r",
"--diff-filter=A", sha)
except subprocess.CalledProcessError:
logger.warning("diff-tree failed for %s", sha[:8])
continue
for f in added.splitlines():
if f.startswith("inbox/queue/") and f.endswith(".md"):
basename = Path(f).name
if basename in file_to_agent and file_to_agent[basename] != agent:
logger.warning(
"filename collision: %s — was %s, now %s (keeping first)",
basename, file_to_agent[basename], agent,
)
continue
file_to_agent.setdefault(basename, agent)
logger.info(
"scanned %d commits, %d research-session matches, %d unique source files",
commits_seen, commits_matched, len(file_to_agent),
)
return file_to_agent
def find_misattributed_prs(conn: sqlite3.Connection, file_to_agent: dict[str, str], days: int):
"""Return list of (pr_number, current_submitted_by, source_path, agent, domain, channel, merged_at).
Only includes PRs:
- with source_path basename in our research-session map
- currently attributed to '@m3taversal'
- merged within the last N days (cap on temporal scope)
"""
rows = conn.execute(
"""SELECT number, submitted_by, source_path, domain, source_channel, merged_at
FROM prs
WHERE submitted_by = '@m3taversal'
AND source_path IS NOT NULL
AND status = 'merged'
AND merged_at > datetime('now', ?)""",
(f"-{days} days",),
).fetchall()
matches = []
for row in rows:
basename = Path(row["source_path"]).name
agent = file_to_agent.get(basename)
if agent:
matches.append({
"pr": row["number"],
"current_submitted_by": row["submitted_by"],
"source_path": row["source_path"],
"basename": basename,
"agent": agent,
"domain": row["domain"],
"channel": row["source_channel"],
"merged_at": row["merged_at"],
})
return matches
def existing_event_count(conn: sqlite3.Connection, pr: int, handle: str, role: str) -> int:
"""Return count of contribution_events rows matching (handle, role, pr_number, claim_path IS NULL)."""
return conn.execute(
"""SELECT COUNT(*) FROM contribution_events
WHERE handle = ? AND role = ? AND pr_number = ? AND claim_path IS NULL""",
(handle, role, pr),
).fetchone()[0]
def apply_backfill(conn: sqlite3.Connection, matches: list[dict], dry_run: bool) -> dict:
"""Apply the backfill. Returns counters."""
counters = defaultdict(int)
if not dry_run:
conn.execute("BEGIN")
try:
for m in matches:
pr = m["pr"]
agent = m["agent"]
# Pre-checks for accurate dry-run reporting
old_event_exists = existing_event_count(conn, pr, "m3taversal", "author") > 0
new_event_exists = existing_event_count(conn, pr, agent, "author") > 0
if dry_run:
logger.info(
"would update pr=%d submitted_by '%s''%s' "
"[m3ta_event=%s, agent_event=%s]",
pr, m["current_submitted_by"], agent,
old_event_exists, new_event_exists,
)
counters["prs"] += 1
if old_event_exists:
counters["events_to_delete"] += 1
if not new_event_exists:
counters["events_to_insert"] += 1
continue
# 1. UPDATE prs.submitted_by — canonical handle only.
conn.execute(
"UPDATE prs SET submitted_by = ? WHERE number = ?",
(agent, pr),
)
counters["prs"] += 1
# 2. INSERT new agent author event (idempotent via UNIQUE index)
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, 'agent', 'author', ?, ?, NULL, ?, ?, COALESCE(?, datetime('now')))""",
(agent, AUTHOR_WEIGHT, pr, m["domain"], m["channel"], m["merged_at"]),
)
if cur.rowcount > 0:
counters["events_inserted"] += 1
# 3. DELETE old m3taversal author event
cur = conn.execute(
"""DELETE FROM contribution_events
WHERE handle = 'm3taversal' AND role = 'author'
AND pr_number = ? AND claim_path IS NULL""",
(pr,),
)
if cur.rowcount > 0:
counters["events_deleted"] += 1
if not dry_run:
conn.execute("COMMIT")
except Exception:
if not dry_run:
conn.execute("ROLLBACK")
raise
return dict(counters)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--repo", type=Path, default=DEFAULT_REPO)
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
parser.add_argument("--days", type=int, default=30)
parser.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
parser.add_argument("--limit", type=int, default=0,
help="cap PR updates (0 = no cap; useful for testing on a small slice)")
args = parser.parse_args()
dry_run = not args.apply
logger.info("repo=%s db=%s days=%d mode=%s",
args.repo, args.db, args.days, "DRY-RUN" if dry_run else "APPLY")
if not args.repo.exists():
logger.error("repo not found: %s", args.repo)
sys.exit(1)
if not args.db.exists():
logger.error("db not found: %s", args.db)
sys.exit(1)
file_to_agent = discover_research_session_archives(args.repo, args.days)
if not file_to_agent:
logger.warning("no research-session source files found in last %d days", args.days)
sys.exit(0)
# Per-agent breakdown
by_agent = defaultdict(int)
for agent in file_to_agent.values():
by_agent[agent] += 1
for agent, count in sorted(by_agent.items()):
logger.info(" research-session sources by %s: %d", agent, count)
conn = sqlite3.connect(args.db)
conn.row_factory = sqlite3.Row
matches = find_misattributed_prs(conn, file_to_agent, args.days)
logger.info("misattributed PRs found: %d", len(matches))
if args.limit and len(matches) > args.limit:
logger.info("--limit=%d — truncating from %d", args.limit, len(matches))
matches = matches[:args.limit]
if not matches:
logger.info("nothing to do")
return
# Per-agent breakdown of misattribution
miss_by_agent = defaultdict(int)
for m in matches:
miss_by_agent[m["agent"]] += 1
logger.info("misattributed PR breakdown:")
for agent, count in sorted(miss_by_agent.items()):
logger.info(" %s: %d", agent, count)
counters = apply_backfill(conn, matches, dry_run)
logger.info("RESULT (%s): %s", "DRY-RUN" if dry_run else "APPLIED", counters)
if __name__ == "__main__":
main()

View file

@ -1,495 +0,0 @@
#!/usr/bin/env python3
"""metadao-scrape.py — pull active/recent proposals from metadao.fi into source markdown.
Replaces the broken futard.io GraphQL ingestion (Cloud Run teleo-api).
metadao.fi is a Vercel-protected Next.js App Router site; direct curl is blocked
by the anti-bot challenge. A real headless browser passes the challenge cleanly,
and once cookies are issued for the context we can call /api/decode-proposal/{addr}
from inside the browser to get structured instruction data.
Discovery flow:
1. visit / to prime Vercel cookies
2. visit /projects, scrape distinct /projects/{slug} hrefs
3. for each project, visit /projects/{slug}, scrape proposal addresses from DOM
4. for each NEW proposal (basename not already in --archive-dir):
a. visit proposal page, capture rendered prose
b. call /api/decode-proposal/{addr} via in-browser fetch for instructions
c. write source markdown to --output-dir
Idempotent. Skips proposals whose basename is already present in archive-dir
or output-dir. Designed to run from a systemd timer or one-shot.
Usage:
python3 metadao-scrape.py --archive-dir /opt/teleo-eval/workspaces/main/inbox/archive \\
--output-dir /opt/teleo-eval/workspaces/main/inbox/queue \\
[--dry-run] [--limit 10] [--project solomon]
"""
from __future__ import annotations
import argparse
import json
import logging
import re
import sys
from datetime import date, datetime
from pathlib import Path
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("metadao-scrape")
BASE = "https://www.metadao.fi"
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
)
def slugify(text: str, max_len: int = 60) -> str:
s = text.lower().strip()
s = re.sub(r"[^a-z0-9\s-]", "", s)
s = re.sub(r"\s+", "-", s)
s = re.sub(r"-+", "-", s)
return s.strip("-")[:max_len].rstrip("-")
def _yaml_str(s: str) -> str:
"""Quote-safe YAML string. JSON strings are valid YAML strings."""
return json.dumps(s, ensure_ascii=False)
def existing_basenames(*dirs: Path) -> set[str]:
"""Collect all .md basenames (without extension) across the given dirs (recursive)."""
seen: set[str] = set()
for d in dirs:
if not d.exists():
continue
for p in d.rglob("*.md"):
seen.add(p.stem)
return seen
PROP_ADDR_RE = re.compile(r"proposal_address:\s*[\"']?([A-Za-z0-9]{32,44})[\"']?")
URL_ADDR_RE = re.compile(r"(?:futard\.io|metadao\.fi)(?:/[^/\s\"']*)*?/proposal/([A-Za-z0-9]{32,44})")
def existing_proposal_addresses(*dirs: Path) -> set[str]:
"""Scan frontmatter / URLs in existing source files to collect known proposal addresses.
Reads only the first 4KB of each file (frontmatter + URL line are at the top)
to keep this fast on large archives.
"""
addrs: set[str] = set()
for d in dirs:
if not d.exists():
continue
for p in d.rglob("*.md"):
try:
head = p.read_text(errors="replace")[:4096]
except Exception:
continue
for m in PROP_ADDR_RE.finditer(head):
addrs.add(m.group(1))
for m in URL_ADDR_RE.finditer(head):
addrs.add(m.group(1))
return addrs
def list_project_slugs(page) -> list[str]:
"""Read /projects and extract distinct project slugs."""
page.goto(f"{BASE}/projects", wait_until="domcontentloaded", timeout=30000)
page.wait_for_timeout(1500)
hrefs = page.evaluate(
"""() => {
const links = Array.from(document.querySelectorAll('a[href^="/projects/"]'));
const slugs = new Set();
for (const a of links) {
const m = a.getAttribute('href').match(/^\\/projects\\/([a-z0-9-]+)(?:\\/|$)/);
if (m && m[1]) slugs.add(m[1]);
}
return [...slugs];
}"""
)
return list(hrefs)
def get_project_metadata(page, slug: str) -> dict:
"""Visit a project page and return basic metadata + proposal addresses + card text.
Card text typically contains 'SOLO-004 ENDED DP-00003 (MEM): The Gigabus Proposal Pass $0.64...'
so we capture it for downstream title parsing.
"""
url = f"{BASE}/projects/{slug}"
page.goto(url, wait_until="domcontentloaded", timeout=30000)
page.wait_for_timeout(1500)
proposals = page.evaluate(
"""() => {
const links = Array.from(document.querySelectorAll('a[href*="/proposal/"]'));
const seen = new Set();
const out = [];
const TARGET_ADDR_RE = /\\/proposal\\/([A-Za-z0-9]+)/;
for (const a of links) {
const m = a.getAttribute('href').match(TARGET_ADDR_RE);
if (!m) continue;
if (seen.has(m[1])) continue;
seen.add(m[1]);
const addr = m[1];
// Walk up only while the ancestor contains exactly one proposal link
// (so we get the card, not a parent that contains all cards).
let card = a;
while (card.parentElement) {
const parent = card.parentElement;
const propLinks = parent.querySelectorAll('a[href*="/proposal/"]');
if (propLinks.length > 1) break;
card = parent;
}
out.push({
address: addr,
link_text: (a.innerText || '').trim().slice(0, 600),
card_text: (card.innerText || '').trim().slice(0, 1500),
});
}
return out;
}"""
)
# Try to read project name from h1 / title
project_name = page.evaluate(
"""() => {
const h = document.querySelector('h1');
return h ? h.innerText.trim() : '';
}"""
) or slug.title()
return {"slug": slug, "name": project_name, "url": url, "proposals": proposals}
# Strict pattern: DP-NNNNN (CAT): Title — the canonical proposal heading.
DP_STRICT_RE = re.compile(r"DP-\d+\s*\([A-Z]+\)\s*[:\-]\s*[^\n\r]+", re.MULTILINE)
# Loose pattern: any line starting with DP-NNNNN followed by something.
DP_LOOSE_RE = re.compile(r"DP-\d+\s*(?:\([A-Z]+\))?\s*[:\-]?\s*[^\n\r]+", re.MULTILINE)
STAT_BLEED_RE = re.compile(
# Stat keywords only bleed when followed by a numeric/symbolic stat token,
# so word-only sequences like "Active Capital" or "Live Streaming Service" pass.
r"\s+\b(?:Pass|Fail|Passed|Failed|Active|Pending|Ended|Live|TOTAL|VOLUME|STATUS|MCAP|PRICE|SPOT)\b\s+(?:\$|\+|-|\d)"
r"|\s*(?:\$\d|\+\d{2,}|\d+\.\d+%|\d{5,})",
re.IGNORECASE,
)
def _clean_title_candidate(line: str) -> str:
line = line.strip()
# Find first bleed match past offset 10. re.search returns leftmost, but the
# DP-NNNNN digit sequence always wins first place; we want the first POST-title
# match instead. Walk all matches and trim at the earliest one past the guard.
for bleed in STAT_BLEED_RE.finditer(line):
if bleed.start() > 10:
line = line[: bleed.start()].rstrip(" :-—")
break
return line.strip()[:200]
def extract_dp_title(*texts: str) -> str:
"""Find the canonical 'DP-NNNNN (CAT): Title' line.
Strategy:
1. Try strict pattern (with parenthetical category code) across all sources.
Take the SHORTEST hit prose continuations of an already-correct title
tend to be longer than the title itself.
2. Fall back to loose pattern, longest match.
"""
strict: list[str] = []
loose: list[str] = []
for t in texts:
if not t:
continue
for m in DP_STRICT_RE.finditer(t):
cleaned = _clean_title_candidate(m.group(0))
if cleaned:
strict.append(cleaned)
for m in DP_LOOSE_RE.finditer(t):
cleaned = _clean_title_candidate(m.group(0))
if cleaned:
loose.append(cleaned)
if strict:
return min(strict, key=len)
if loose:
return max(loose, key=len)
return ""
def fetch_proposal(page, project_slug: str, addr: str, card_text: str = "") -> dict | None:
"""Visit proposal page, capture rendered text + decode instructions via in-browser fetch."""
url = f"{BASE}/projects/{project_slug}/proposal/{addr}"
log.info("fetching proposal %s/%s", project_slug, addr[:8])
try:
page.goto(url, wait_until="domcontentloaded", timeout=45000)
except PWTimeout:
log.warning("timeout loading %s — using whatever rendered", url)
page.wait_for_timeout(2500) # let RSC stream finish
body_text = page.evaluate("() => document.body.innerText || ''")
# Title preference: card_text (from project page) → body_text DP-NNNNN match → first h1/h2
title_block = extract_dp_title(card_text, body_text)
if not title_block:
title_block = page.evaluate(
"""() => {
const h = document.querySelector('h1, h2');
return h ? h.innerText.trim() : '';
}"""
) or f"proposal-{addr[:8]}"
# Status: 'Passed' / 'Failed' / 'Active' / 'Pending'
status = page.evaluate(
"""() => {
const text = document.body.innerText || '';
const m = text.match(/\\n(Passed|Failed|Active|Pending|Live|Ended)\\b/);
return m ? m[1] : '';
}"""
)
# Get the structured /api/decode-proposal data
decoded = None
try:
decoded = page.evaluate(
f"""async () => {{
try {{
const r = await fetch('/api/decode-proposal/{addr}');
if (!r.ok) return null;
return await r.json();
}} catch (e) {{ return null; }}
}}"""
)
except Exception as e:
log.debug("decode fetch failed for %s: %s", addr, e)
return {
"address": addr,
"project_slug": project_slug,
"url": url,
"title": title_block,
"status": status,
"body_text": body_text,
"decoded": decoded,
}
def parse_dp_code(title: str) -> tuple[str, str]:
"""Parse 'DP-00003 (MEM): The Gigabus Proposal' → ('dp-00003-mem', 'The Gigabus Proposal').
Falls back gracefully if format doesn't match.
"""
# Match leading DP-NNNNN[space(category)]?[:]?[space]? plus the rest
m = re.match(r"^(DP-\d+(?:\s*\([A-Z]+\))?)\s*[:\-]?\s*(.*)$", title.strip())
if m:
code = re.sub(r"[^a-z0-9]+", "-", m.group(1).lower()).strip("-")
rest = m.group(2).strip()
return code, rest
return "", title.strip()
def build_filename(project_slug: str, proposal: dict, today: str) -> str:
"""YYYY-MM-DD-metadao-{slug}-{title-fragment}-{addr8}.md
Embedding the address fragment makes filenames stable across runs even when
the title isn't unique (e.g. projects that don't use DP-NNNNN naming).
"""
title = proposal.get("title") or ""
code, rest = parse_dp_code(title)
parts: list[str] = []
if code:
parts.append(code)
if rest:
parts.append(slugify(rest, max_len=40))
body_slug = "-".join(p for p in parts if p)[:60].rstrip("-")
addr_frag = proposal["address"][:8].lower()
if body_slug:
return f"{today}-metadao-{project_slug}-{body_slug}-{addr_frag}.md"
return f"{today}-metadao-{project_slug}-{addr_frag}.md"
def build_source_markdown(project: dict, proposal: dict, today: str) -> str:
"""Build the source markdown matching the existing schema."""
title = proposal.get("title") or f"{project['name']} proposal {proposal['address'][:8]}"
body_text = (proposal.get("body_text") or "").strip()
decoded = proposal.get("decoded") or {}
# Build YAML frontmatter — all free-text values escaped via _yaml_str (json.dumps).
# project_slug is constrained to [a-z0-9-] by slugify upstream, but pass through
# the same path for consistency.
full_title = f"MetaDAO: {project['name']}{title}"
fm_lines = [
"---",
"type: source",
f"title: {_yaml_str(full_title)}",
f"author: {_yaml_str('metadao.fi')}",
f"url: {_yaml_str(proposal['url'])}",
f"date: {today}",
"domain: internet-finance",
"format: data",
"status: unprocessed",
f"tags: [futardio, metadao, futarchy, solana, governance, {project['slug']}]",
"event_type: proposal",
f"project_slug: {_yaml_str(project['slug'])}",
f"proposal_address: {_yaml_str(proposal['address'])}",
]
if proposal.get("status"):
fm_lines.append(f"proposal_status: {_yaml_str(proposal['status'])}")
if decoded.get("squadsProposal"):
fm_lines.append(f"squads_proposal: {_yaml_str(decoded['squadsProposal'])}")
if decoded.get("squadsStatus"):
fm_lines.append(f"squads_status: {_yaml_str(decoded['squadsStatus'])}")
fm_lines.append("---")
fm_lines.append("")
# Header section — quick facts
body_md = [
f"# {title}",
"",
"## Proposal Details",
f"- Project: {project['name']} (`{project['slug']}`)",
f"- Proposal: {title}",
f"- Address: `{proposal['address']}`",
]
if proposal.get("status"):
body_md.append(f"- Status: {proposal['status']}")
body_md.append(f"- URL: {proposal['url']}")
# Proposal prose body (rendered text from the page)
body_md.append("")
body_md.append("## Proposal Body")
body_md.append("")
body_md.append(body_text or "_(no body captured)_")
# Decoded on-chain instructions
if decoded:
body_md.append("")
body_md.append("## On-chain Decoded")
if decoded.get("squadsUrl"):
body_md.append(f"- Squads: {decoded['squadsUrl']}")
instrs = decoded.get("instructions") or []
if instrs:
body_md.append("")
body_md.append("### Instructions")
for i, instr in enumerate(instrs, 1):
body_md.append(f"{i}. **{instr.get('description', instr.get('type', 'instruction'))}** ({instr.get('program', '')})")
for f in instr.get("fields", []) or []:
val = f.get("fullValue") or f.get("value") or ""
body_md.append(f" - {f.get('label', '')}: `{val}`")
if instr.get("summary"):
body_md.append(f" - Summary: {instr['summary']}")
return "\n".join(fm_lines + body_md) + "\n"
def main() -> int:
p = argparse.ArgumentParser(description="Scrape MetaDAO proposals into inbox source files")
p.add_argument("--archive-dir", required=True, help="existing archive dir (skip if basename exists here)")
p.add_argument("--output-dir", required=True, help="dir to write new source markdown into")
p.add_argument("--project", help="restrict to a single project slug (default: scan all)")
p.add_argument("--limit", type=int, default=0, help="max number of new proposals to capture (0 = unlimited)")
p.add_argument("--dry-run", action="store_true", help="print intended writes instead of writing")
p.add_argument("--headless", action="store_true", default=True)
args = p.parse_args()
archive_dir = Path(args.archive_dir).resolve()
output_dir = Path(args.output_dir).resolve()
seen_basenames = existing_basenames(archive_dir, output_dir)
seen_addresses = existing_proposal_addresses(archive_dir, output_dir)
log.info("loaded %d existing basenames + %d known proposal addresses from %s + %s",
len(seen_basenames), len(seen_addresses), archive_dir, output_dir)
today = date.today().isoformat()
written: list[str] = []
skipped_existing = 0
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=args.headless)
ctx = browser.new_context(user_agent=USER_AGENT)
page = ctx.new_page()
# Prime cookies
log.info("priming Vercel session via homepage")
page.goto(f"{BASE}/", wait_until="domcontentloaded", timeout=30000)
page.wait_for_timeout(1500)
# Discovery
if args.project:
project_slugs = [args.project]
else:
project_slugs = list_project_slugs(page)
log.info("discovered %d project slugs: %s", len(project_slugs), project_slugs)
for slug in project_slugs:
try:
project = get_project_metadata(page, slug)
except Exception:
log.exception("failed to read project %s", slug)
continue
log.info(" %s%d proposals", slug, len(project["proposals"]))
for prop in project["proposals"]:
addr = prop["address"]
# Pre-check #1: known proposal address (cheapest, no browser visit)
if addr in seen_addresses:
skipped_existing += 1
continue
# Pre-check #2: address fragment in an existing basename
addr_frag = addr[:8].lower()
if any(addr_frag in b.lower() for b in seen_basenames):
skipped_existing += 1
continue
try:
proposal_data = fetch_proposal(page, slug, addr, card_text=prop.get("card_text", ""))
except Exception:
log.exception("failed to fetch proposal %s/%s", slug, addr)
continue
if not proposal_data:
continue
# Minimum-render gate: skip partial renders rather than archiving stubs.
# Successful captures are 20KB+; require either a real body or a DP-N title.
body_len = len(proposal_data.get("body_text") or "")
has_dp_match = bool(re.search(r"DP-\d+", proposal_data.get("title", "") or ""))
if body_len < 500 and not has_dp_match:
log.warning(" skip (insufficient render): %s body=%dB title=%r",
addr, body_len, proposal_data.get("title", ""))
continue
fname = build_filename(slug, proposal_data, today)
if Path(fname).stem in seen_basenames:
skipped_existing += 1
log.info(" skip (already archived by title): %s", fname)
continue
content = build_source_markdown(project, proposal_data, today)
target = output_dir / fname
if args.dry_run:
log.info(" DRY: would write %s (%d bytes)", target, len(content))
else:
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(content)
log.info(" wrote %s (%d bytes)", target, len(content))
written.append(fname)
if args.limit and len(written) >= args.limit:
log.info("hit limit=%d, stopping", args.limit)
browser.close()
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
return 0
browser.close()
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
return 0
if __name__ == "__main__":
sys.exit(main())

114
scripts/normalize-submitted-by.py Executable file
View file

@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""One-time backfill: canonicalize prs.submitted_by and sources.submitted_by.
Strips legacy decorators ("(self-directed)", "(reweave)"), lowercases, drops
the @ prefix. After this runs, every value matches the contract documented
on diagnostics/activity_feed_api.py::_normalize_contributor and the
companion read-side fix becomes redundant defense-in-depth instead of
load-bearing.
Defaults to --dry-run. Pass --apply to commit.
Usage:
python3 normalize-submitted-by.py --dry-run
python3 normalize-submitted-by.py --apply
"""
import argparse
import os
import re
import sqlite3
import sys
from collections import Counter
DEFAULT_DB = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Valid handle: lowercase alphanum + _-, 1-39 chars (matches GitHub rules,
# same as pipeline/lib/attribution._HANDLE_RE). Anything with parens, spaces,
# or uppercase needs canonicalization.
_TRAILING_PAREN_RE = re.compile(r"\s*\([^)]*\)\s*$")
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
def canonicalize(raw):
if raw is None:
return None
h = raw.strip().lower().lstrip("@")
h = _TRAILING_PAREN_RE.sub("", h).strip()
return h or None
def normalize_table(conn, table, dry_run):
cur = conn.execute(
f"SELECT rowid, submitted_by FROM {table} WHERE submitted_by IS NOT NULL"
)
changes = []
for row in cur.fetchall():
old = row[1]
new = canonicalize(old)
if new != old:
changes.append((row[0], old, new))
print(f"\n{table}: {len(changes)} rows need normalization")
if not changes:
return 0
# Distribution preview
from_to = Counter((old, new) for _, old, new in changes)
for (old, new), count in from_to.most_common(15):
print(f" {count:>5} {old!r:40} -> {new!r}")
if len(from_to) > 15:
print(f" ... ({len(from_to) - 15} more distinct mappings)")
# Sanity: every result is a valid handle (no garbage falls through).
invalid = [(rowid, old, new) for rowid, old, new in changes
if new is not None and not _HANDLE_RE.match(new)]
if invalid:
print(f"\n WARNING: {len(invalid)} rows would normalize to invalid handles:")
for rowid, old, new in invalid[:10]:
print(f" rowid={rowid} {old!r} -> {new!r}")
print(" These rows will be SKIPPED (left as-is). Inspect manually.")
valid_changes = [(rowid, old, new) for rowid, old, new in changes
if new is None or _HANDLE_RE.match(new)]
if dry_run:
print(f" [dry-run] would update {len(valid_changes)} rows in {table}")
return len(valid_changes)
for rowid, _, new in valid_changes:
conn.execute(
f"UPDATE {table} SET submitted_by = ? WHERE rowid = ?",
(new, rowid),
)
print(f" updated {len(valid_changes)} rows in {table}")
return len(valid_changes)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--db", default=DEFAULT_DB)
ap.add_argument("--apply", action="store_true", help="Commit changes (default is dry-run)")
ap.add_argument("--dry-run", action="store_true", help="Preview only (default)")
args = ap.parse_args()
dry_run = not args.apply
print(f"DB: {args.db}")
print(f"Mode: {'DRY-RUN' if dry_run else 'APPLY'}")
conn = sqlite3.connect(args.db, timeout=30)
try:
total = 0
total += normalize_table(conn, "prs", dry_run)
total += normalize_table(conn, "sources", dry_run)
if not dry_run:
conn.commit()
print(f"\nCommitted. Total rows updated: {total}")
else:
print(f"\nDry-run complete. Run with --apply to commit ({total} rows pending).")
finally:
conn.close()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""Reattribute PRs and their author events from m3taversal to the true author.
Scope (intentionally conservative):
- branch reweave/* -> pipeline (system maintenance, no human author)
- branch ingestion/* -> pipeline (pipeline-internal source intake)
- branch <agent>/* -> <agent> (autonomous agent work)
for agent in {leo, vida, rio, astra, clay, theseus}.
NOT in scope:
- branch extract/* -- proposed_by may legitimately be absent
(telegram source drops default to operator).
Per affected PR (atomic):
1. UPDATE prs.submitted_by -> target
2. UPDATE sources.submitted_by where path = pr.source_path
3. UPDATE contribution_events.handle for every m3ta author event on this PR
(kind set to 'agent', since pipeline + the six agents are all kind='agent'
per attribution.PENTAGON_AGENTS).
Idempotent. Dry-run by default; --apply commits.
Run AFTER scripts/normalize-submitted-by.py.
"""
import argparse
import os
import sqlite3
import sys
from collections import Counter
DB_PATH = os.environ.get("DB_PATH", "/opt/teleo-eval/pipeline/pipeline.db")
AGENT_PREFIXES = ("leo/", "vida/", "rio/", "astra/", "clay/", "theseus/")
PIPELINE_PREFIXES = ("reweave/", "ingestion/")
def target_for(branch):
if not branch:
return None
if branch.startswith(PIPELINE_PREFIXES):
return "pipeline"
for prefix in AGENT_PREFIXES:
if branch.startswith(prefix):
return prefix.rstrip("/")
return None
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
ap.add_argument("--db", default=DB_PATH)
args = ap.parse_args()
conn = sqlite3.connect(args.db, timeout=30)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA busy_timeout = 30000")
mode = "APPLY" if args.apply else "DRY-RUN"
print("DB: {}\nMode: {}\n".format(args.db, mode))
rows = conn.execute("""
SELECT number, branch, source_path
FROM prs
WHERE submitted_by = 'm3taversal'
AND branch IS NOT NULL
""").fetchall()
pr_targets = []
pr_counts = Counter()
for r in rows:
tgt = target_for(r["branch"])
if tgt is None:
continue
pr_targets.append((r["number"], r["branch"], r["source_path"], tgt))
pr_counts[tgt] += 1
print("prs to reattribute: {}".format(len(pr_targets)))
for tgt, n in pr_counts.most_common():
print(" {:6d} -> {!r}".format(n, tgt))
src_paths = [t[2] for t in pr_targets if t[2]]
src_count = 0
if src_paths:
placeholders = ",".join("?" * len(src_paths))
src_count = conn.execute(
"SELECT COUNT(*) FROM sources "
"WHERE submitted_by = 'm3taversal' AND path IN ({})".format(placeholders),
src_paths,
).fetchone()[0]
print("\nsources rows that will be re-pointed: {}".format(src_count))
pr_to_target = {p[0]: p[3] for p in pr_targets}
events = []
if pr_to_target:
pr_placeholders = ",".join("?" * len(pr_to_target))
events = conn.execute(
"SELECT id, pr_number FROM contribution_events "
"WHERE handle = 'm3taversal' AND role = 'author' "
"AND pr_number IN ({})".format(pr_placeholders),
list(pr_to_target.keys()),
).fetchall()
print("contribution_events author rows to move: {}".format(len(events)))
ev_counts = Counter(pr_to_target[e["pr_number"]] for e in events)
for tgt, n in ev_counts.most_common():
print(" {:6d} events -> {!r}".format(n, tgt))
if not args.apply:
print("\nDry-run complete. Run with --apply to commit "
"({} PRs + {} sources + {} events).".format(
len(pr_targets), src_count, len(events)))
return 0
pr_updated = 0
src_updated = 0
ev_updated = 0
ev_collisions = 0
try:
for pr_num, branch, source_path, target in pr_targets:
cur = conn.execute(
"UPDATE prs SET submitted_by = ? "
"WHERE number = ? AND submitted_by = 'm3taversal'",
(target, pr_num),
)
pr_updated += cur.rowcount
if source_path:
cur = conn.execute(
"UPDATE sources SET submitted_by = ? "
"WHERE path = ? AND submitted_by = 'm3taversal'",
(target, source_path),
)
src_updated += cur.rowcount
for ev in conn.execute(
"SELECT id FROM contribution_events "
"WHERE handle = 'm3taversal' AND role = 'author' AND pr_number = ?",
(pr_num,),
).fetchall():
try:
conn.execute(
"UPDATE contribution_events SET handle = ?, kind = 'agent' "
"WHERE id = ?",
(target, ev["id"]),
)
ev_updated += 1
except sqlite3.IntegrityError:
conn.execute(
"DELETE FROM contribution_events WHERE id = ?",
(ev["id"],),
)
ev_collisions += 1
conn.commit()
except Exception:
conn.rollback()
raise
print("\nCommitted.")
print(" prs.submitted_by moves: {}".format(pr_updated))
print(" sources.submitted_by moves: {}".format(src_updated))
print(" contribution_events moves: {}".format(ev_updated))
print(" ce collisions deleted: {}".format(ev_collisions))
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Reset m3taversal.sourcer_count from inflated legacy value to file-truth count.
Background: pre-Phase-A extract.py had a `submitted_by` fallback that credited
m3taversal as sourcer for every Telegram-ingested source, accumulating to 1011
sourcer_count in the contributors table. The actual file-truth count (sourcer
frontmatter equal to "m3taversal" in claim files) is 21. The 990-row delta is
infrastructure attribution that doesn't reflect content authorship.
The Phase A event-sourced ledger (contribution_events) computed the correct
389.55 CI from author events; /api/leaderboard reads from there directly.
But the legacy /api/contributors endpoint reads contributors.claims_merged
which carries the inflated 1011. Until that endpoint is deprecated, the
divergence shows two different numbers depending on which surface the UI
queries.
This script applies the surgical UPDATE that was run on VPS on 2026-04-27
during the leaderboard cutover. Committed as a script per Ganymede review:
"DB mutations go through reviewable code paths matters more than the
convenience of one-shot SQL. The artifact explains what was done and why."
Idempotent safe to re-run. If sourcer_count is already 21, no change.
Usage:
python3 scripts/reset-m3taversal-sourcer.py --dry-run
python3 scripts/reset-m3taversal-sourcer.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
TARGET_HANDLE = "m3taversal"
TRUTH_SOURCER_COUNT = 21
TRUTH_CLAIMS_MERGED = 21
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT handle, sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
if not row:
print(f" No contributors row for {TARGET_HANDLE} — nothing to reset.")
return
print(
f" Current: {row['handle']} sourcer_count={row['sourcer_count']} "
f"claims_merged={row['claims_merged']}"
)
print(f" Target: sourcer_count={TRUTH_SOURCER_COUNT} claims_merged={TRUTH_CLAIMS_MERGED}")
if (row["sourcer_count"] == TRUTH_SOURCER_COUNT
and row["claims_merged"] == TRUTH_CLAIMS_MERGED):
print(" Already at target values — no-op.")
return
if args.dry_run:
print(" (dry-run) UPDATE would be applied. Re-run without --dry-run.")
return
conn.execute(
"""UPDATE contributors SET
sourcer_count = ?,
claims_merged = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(TRUTH_SOURCER_COUNT, TRUTH_CLAIMS_MERGED, TARGET_HANDLE),
)
conn.execute(
"""INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)""",
(
"manual",
"m3taversal_sourcer_reset",
(
'{"reason":"Pre-Phase-A submitted_by fallback inflated to 1011; '
'file-truth is 21","sourcer_count_before":1011,'
'"sourcer_count_after":21,"claims_merged_after":21}'
),
),
)
conn.commit()
after = conn.execute(
"SELECT sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
print(
f" Applied. Now: sourcer_count={after['sourcer_count']} "
f"claims_merged={after['claims_merged']}"
)
if __name__ == "__main__":
main()

View file

@ -20,7 +20,7 @@ from lib import log as logmod
from lib.breaker import CircuitBreaker
from lib.evaluate import evaluate_cycle
from lib.fixer import fix_cycle as mechanical_fix_cycle
from lib.substantive_fixer import substantive_fix_cycle
from lib.substantive_fixer import substantive_fix_cycle, verdict_deadlock_reaper_cycle
from lib.health import start_health_server, stop_health_server
from lib.llm import kill_active_subprocesses
from lib.merge import merge_cycle
@ -91,14 +91,30 @@ async def ingest_cycle(conn, max_workers=None):
async def fix_cycle(conn, max_workers=None):
"""Combined fix stage: mechanical fixes first, then substantive fixes.
"""Combined fix stage: mechanical fixes first, then substantive fixes,
finally the verdict-deadlock reaper.
Mechanical (fixer.py): wiki link bracket stripping, $0
Substantive (substantive_fixer.py): confidence/title/scope fixes via LLM, $0.001
Reaper (substantive_fixer.verdict_deadlock_reaper_cycle): defense-in-depth
for stuck-verdict PRs that the substantive fixer can't progress on.
Hourly throttle, dry-run by default. Cost $0.
"""
m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers)
s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers)
return m_fixed + s_fixed, m_errors + s_errors
# Defense-in-depth: reaper exception must never block primary fix paths.
# Same exception-isolation pattern as ingest_cycle's extract_cycle wrapper —
# propagating would trip the fix breaker and lock out mechanical+substantive
# for 15 min after 5 reaper failures.
try:
r_closed = await verdict_deadlock_reaper_cycle(conn)
except Exception:
import logging
logging.getLogger("pipeline").exception(
"Reaper cycle failed (non-fatal)"
)
r_closed = 0
return m_fixed + s_fixed + r_closed, m_errors + s_errors
async def snapshot_cycle(conn, max_workers=None):

View file

@ -0,0 +1,152 @@
"""Tests for diagnostics/activity_endpoint.py classify_pr_operation.
Covers the Leo gotcha extract/* branches with commit_type=enrich or
challenge classify by commit_type, not branch prefix. Same class of bug
as the contributor-role wiring fix.
"""
import sys
from pathlib import Path
import pytest
# diagnostics/ isn't on sys.path by default; add it for these tests.
_DIAG = Path(__file__).resolve().parents[1] / "diagnostics"
if str(_DIAG) not in sys.path:
sys.path.insert(0, str(_DIAG))
# aiohttp is imported at module load time; skip cleanly if not installed.
pytest.importorskip("aiohttp")
from activity_endpoint import classify_pr_operation # noqa: E402
# ─── Merged PRs: commit_type wins over branch prefix ───────────────────────
def test_extract_branch_legacy_knowledge_classifies_new():
assert classify_pr_operation("merged", "knowledge", "extract/foo", None) == "new"
def test_extract_branch_with_enrich_commit_type_classifies_enrich():
"""Leo gotcha: extract/* + commit_type=enrich → enrich, not new."""
assert classify_pr_operation("merged", "enrich", "extract/foo", None) == "enrich"
def test_extract_branch_with_challenge_commit_type_classifies_challenge():
"""Leo gotcha: extract/* + commit_type=challenge → challenge, not new."""
assert classify_pr_operation("merged", "challenge", "extract/foo", None) == "challenge"
def test_challenged_by_in_description_classifies_challenge():
assert (
classify_pr_operation(
"merged", "knowledge", "extract/foo", "evidence for challenged_by claim"
)
== "challenge"
)
# ─── Branch prefix fallback (when commit_type is generic) ──────────────────
def test_reweave_branch_classifies_enrich():
assert classify_pr_operation("merged", "knowledge", "reweave/batch-1", None) == "enrich"
def test_challenge_branch_classifies_challenge():
assert (
classify_pr_operation("merged", "knowledge", "challenge/nuclear-moloch", None)
== "challenge"
)
# ─── Maintenance commit_types → infra ──────────────────────────────────────
def test_fix_commit_type_classifies_infra():
assert classify_pr_operation("merged", "fix", "fix/deploy-bug", None) == "infra"
def test_pipeline_commit_type_classifies_infra():
assert (
classify_pr_operation("merged", "pipeline", "epimetheus/migration-v14", None)
== "infra"
)
# ─── Knowledge-producing commit_types → new ────────────────────────────────
def test_research_commit_type_classifies_new():
assert (
classify_pr_operation("merged", "research", "theseus/cornelius-batch-2", None)
== "new"
)
def test_entity_commit_type_classifies_new():
assert classify_pr_operation("merged", "entity", "leo/entities-update", None) == "new"
# ─── Non-merged statuses route through NON_MERGED_STATUS_TO_OPERATION ──────
def test_open_pr_classifies_extract():
assert classify_pr_operation("open", None, "extract/foo", None) == "extract"
def test_approved_pr_classifies_new():
assert classify_pr_operation("approved", None, "extract/foo", None) == "new"
def test_closed_pr_classifies_infra():
assert classify_pr_operation("closed", None, "extract/foo", None) == "infra"
def test_conflict_pr_classifies_challenge():
assert classify_pr_operation("conflict", None, "extract/foo", None) == "challenge"
def test_validating_pr_classifies_extract():
assert classify_pr_operation("validating", None, "extract/foo", None) == "extract"
def test_reviewing_pr_classifies_extract():
assert classify_pr_operation("reviewing", None, "extract/foo", None) == "extract"
def test_merging_pr_classifies_new():
assert classify_pr_operation("merging", None, "extract/foo", None) == "new"
def test_zombie_pr_classifies_infra():
assert classify_pr_operation("zombie", None, "extract/foo", None) == "infra"
# ─── Priority order: reweave commit_type vs reweave/ branch ─────────────────
# Reweave commit_type is in _MAINTENANCE_COMMIT_TYPES (→ infra), but
# branch.startswith('reweave/') is checked first (→ enrich). The bifurcation
# is real spec behavior — nightly reweave PRs must classify as enrich, not
# infra. Locking this in prevents a silent flip on future priority refactors.
def test_reweave_commit_type_with_reweave_branch_classifies_enrich():
"""Branch prefix wins over maintenance — reweave PRs are enrich, not infra."""
assert classify_pr_operation("merged", "reweave", "reweave/batch-1", None) == "enrich"
def test_reweave_commit_type_without_reweave_branch_classifies_infra():
"""Without reweave/ prefix, reweave commit_type falls to maintenance → infra."""
assert classify_pr_operation("merged", "reweave", "epimetheus/foo", None) == "infra"
# ─── Defensive cases — null/empty inputs shouldn't crash ───────────────────
def test_null_commit_type_and_branch_classifies_new():
assert classify_pr_operation("merged", None, None, None) == "new"
def test_unknown_status_falls_back_to_infra():
assert classify_pr_operation("nonsense", None, None, None) == "infra"

437
tests/test_leaderboard.py Normal file
View file

@ -0,0 +1,437 @@
"""Tests for /api/leaderboard endpoint (diagnostics/leaderboard_routes.py).
Locks behavior for the four slicings consumed by Argus + Oberon:
- window: all_time | Nd | Nh
- domain: per-domain filter
- kind: person | agent | org | all
- limit: pagination + has_more flag
Regression coverage includes the AND-prefix SQL bug (commit 42d35d4): _parse_window
returned clauses prefixed with 'AND ' which produced 'WHERE 1=1 AND AND ...' when
joined into the WHERE clause via " AND ".join(...).
"""
import asyncio
import json
import sqlite3
from pathlib import Path
import pytest
# Skip whole file if aiohttp isn't available (matches test_activity_classify.py pattern)
aiohttp = pytest.importorskip("aiohttp")
# Make diagnostics/ importable
import sys
DIAG_ROOT = Path(__file__).parent.parent / "diagnostics"
sys.path.insert(0, str(DIAG_ROOT))
from leaderboard_routes import ( # noqa: E402
_parse_window,
handle_leaderboard,
KIND_VALUES,
LEADERBOARD_PUBLIC_PATHS,
)
from aiohttp.test_utils import make_mocked_request # noqa: E402
# ─── Schema lifted from lib/db.py:138-209 (v25 minimum) ──────────────────────
SCHEMA = """
CREATE TABLE contributors (
handle TEXT PRIMARY KEY,
kind TEXT DEFAULT 'person',
tier TEXT DEFAULT 'new',
claims_merged INTEGER DEFAULT 0,
sourcer_count INTEGER DEFAULT 0,
extractor_count INTEGER DEFAULT 0,
challenger_count INTEGER DEFAULT 0,
synthesizer_count INTEGER DEFAULT 0,
reviewer_count INTEGER DEFAULT 0,
challenges_survived INTEGER DEFAULT 0,
domains TEXT DEFAULT '[]',
first_contribution TEXT,
last_contribution TEXT
);
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
# ─── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
def db_path(tmp_path):
"""Seeded pipeline.db with deterministic events.
Cohort:
- alice (person): 3 author events, 1 originator (recent 3d, internet-finance)
- bob (person): 5 author events (older, 60d ago, ai-alignment)
- carol (person): 1 author + 1 evaluator (today, internet-finance)
- rio (agent): 4 author + 2 evaluator (mixed, internet-finance + grand-strategy)
- leo (agent): 8 evaluator events (today, mixed domains)
- cnbc (org): 2 originator events (legacy, before classifier moved orgs)
- newhandle (no contributors row): 1 author event tests LEFT JOIN COALESCE
"""
p = tmp_path / "pipeline.db"
conn = sqlite3.connect(str(p))
conn.executescript(SCHEMA)
contribs = [
("alice", "person"),
("bob", "person"),
("carol", "person"),
("rio", "agent"),
("leo", "agent"),
("cnbc", "org"),
# newhandle intentionally absent — tests LEFT JOIN
]
for handle, kind in contribs:
conn.execute(
"INSERT INTO contributors (handle, kind) VALUES (?, ?)",
(handle, kind),
)
# (handle, role, weight, pr_number, claim_path, domain, timestamp)
events = [
# alice — 3 author + 1 originator, recent (all >24h ago, all <7d)
# Most-recent event at -2 days (not -1 days) so 24h window exclusion is
# unambiguous and not subject to fixture-vs-query microsecond drift.
("alice", "author", 0.30, 100, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 101, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 102, None, "ai-alignment", "now,-3 days"),
("alice", "originator", 0.15, 103, "domains/internet-finance/x.md", "internet-finance", "now,-2 days"),
# bob — 5 author, all 60d ago (outside 30d, inside all_time)
("bob", "author", 0.30, 200, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 201, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 202, None, "ai-alignment", "now,-61 days"),
("bob", "author", 0.30, 203, None, "ai-alignment", "now,-62 days"),
("bob", "author", 0.30, 204, None, "ai-alignment", "now,-63 days"),
# carol — 1 author + 1 evaluator, today
("carol", "author", 0.30, 300, None, "internet-finance", "now"),
("carol", "evaluator", 0.05, 301, None, "internet-finance", "now"),
# rio agent — 4 author + 2 evaluator
("rio", "author", 0.30, 400, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 401, None, "grand-strategy", "now,-2 days"),
("rio", "author", 0.30, 402, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 403, None, "internet-finance", "now,-2 days"),
("rio", "evaluator", 0.05, 404, None, "ai-alignment", "now,-2 days"),
("rio", "evaluator", 0.05, 405, None, "ai-alignment", "now,-2 days"),
# leo agent — 8 evaluator
*[
("leo", "evaluator", 0.05, 500 + i, None, "internet-finance" if i % 2 == 0 else "ai-alignment", "now")
for i in range(8)
],
# cnbc org — 2 originator (legacy data, kept by classifier+gate split)
("cnbc", "originator", 0.15, 600, "domains/internet-finance/y.md", "internet-finance", "now,-5 days"),
("cnbc", "originator", 0.15, 601, "domains/internet-finance/z.md", "internet-finance", "now,-5 days"),
# newhandle — handle in events but no contributors row (LEFT JOIN COALESCE → person)
# -2 days so 24h-window test exclusion is unambiguous (matches alice).
("newhandle", "author", 0.30, 700, None, "ai-alignment", "now,-2 days"),
]
for handle, role, weight, pr_num, claim_path, domain, ts_modifier in events:
# Use SQLite datetime() to compute timestamps relative to "now" so tests
# are deterministic across days. Multi-arg form: datetime('now', '-1 days').
ts_args = ts_modifier.split(",")
if len(ts_args) == 1:
ts_sql = f"datetime('{ts_args[0]}')"
else:
ts_sql = f"datetime('{ts_args[0]}', '{ts_args[1].strip()}')"
conn.execute(
f"""INSERT INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, {ts_sql})""",
(handle, "agent" if handle in {"rio", "leo"} else "person",
role, weight, pr_num, claim_path, domain),
)
conn.commit()
conn.close()
return str(p)
def _call(db_path, **query):
"""Build a mocked request, call handle_leaderboard, return parsed JSON."""
qs = "&".join(f"{k}={v}" for k, v in query.items())
req = make_mocked_request("GET", f"/api/leaderboard?{qs}")
# make_mocked_request gives us req.app — write db_path into it.
req.app["db_path"] = db_path
response = asyncio.run(handle_leaderboard(req))
return json.loads(response.body.decode())
# ─── _parse_window unit tests ────────────────────────────────────────────────
class TestParseWindow:
def test_default_is_all_time(self):
clause, params, label = _parse_window(None)
assert clause == ""
assert params == ()
assert label == "all_time"
def test_explicit_all_time(self):
clause, params, label = _parse_window("all_time")
assert clause == ""
assert label == "all_time"
def test_seven_days(self):
clause, params, label = _parse_window("7d")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-7 days",)
assert label == "7d"
# Regression: must NOT begin with "AND " (handle_leaderboard composes via " AND ".join)
assert not clause.startswith("AND")
def test_thirty_days(self):
clause, params, label = _parse_window("30d")
assert params == ("-30 days",)
assert label == "30d"
def test_hours(self):
clause, params, label = _parse_window("24h")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-24 hours",)
assert label == "24h"
def test_caps_days_at_365(self):
clause, params, label = _parse_window("9999d")
assert params == ("-365 days",)
def test_caps_hours_at_8760(self):
clause, params, label = _parse_window("99999h")
assert params == ("-8760 hours",)
def test_garbage_falls_to_all_time(self):
clause, params, label = _parse_window("foobar")
assert clause == ""
assert label == "all_time"
def test_uppercase_normalized(self):
clause, params, label = _parse_window("7D")
assert label == "7d"
def test_zero_days_still_emits_clause(self):
# 0d means "now or later" — empty result, but parse should succeed
clause, params, label = _parse_window("0d")
assert "datetime" in clause
assert label == "0d"
# ─── handle_leaderboard integration tests ────────────────────────────────────
class TestLeaderboardEndpoint:
def test_all_time_default_kind_person(self, db_path):
"""Default kind is 'person'. Returns all persons, sorted by CI desc."""
body = _call(db_path)
assert body["window"] == "all_time"
assert body["kind_filter"] == "person"
assert body["domain"] is None
assert body["source"] == "contribution_events"
# alice 3*0.30 + 0.15 = 1.05
# bob 5*0.30 = 1.50
# carol 0.30 + 0.05 = 0.35
# newhandle 0.30 (LEFT JOIN COALESCE → 'person')
# cnbc excluded (kind='org')
# rio/leo excluded (kind='agent')
handles = [r["handle"] for r in body["leaderboard"]]
assert "bob" in handles
assert "alice" in handles
assert "newhandle" in handles, "LEFT JOIN COALESCE should default missing contributors to 'person'"
assert "cnbc" not in handles, "kind=person should exclude orgs"
assert "rio" not in handles, "kind=person should exclude agents"
# Descending by CI
cis = [r["ci"] for r in body["leaderboard"]]
assert cis == sorted(cis, reverse=True)
def test_window_7d_excludes_old_events(self, db_path):
"""REGRESSION: 7d window must execute (no AND-prefix SQL error).
Bob has all events 60d ago must not appear in 7d window.
Alice has events 1-3d ago must appear.
"""
body = _call(db_path, window="7d")
assert body["window"] == "7d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "bob" not in handles, "60d-old events must be excluded from 7d window"
assert "carol" in handles # today
def test_window_30d_excludes_60d_events(self, db_path):
"""REGRESSION: 30d window must execute. Bob (60d) excluded; alice/carol included."""
body = _call(db_path, window="30d")
assert body["window"] == "30d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
def test_window_24h_only_today(self, db_path):
"""24h window picks up today's events only.
Default kind=person. Within 24h: only carol (events at 'now').
Excluded: alice/newhandle (events at -2 days), bob (-60d), rio/leo (kind),
cnbc (-5d AND kind=org).
"""
body = _call(db_path, window="24h")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["carol"], (
"24h + kind=person should return only carol; got %r" % handles
)
def test_kind_agent(self, db_path):
"""kind=agent returns only agents."""
body = _call(db_path, kind="agent")
handles = [r["handle"] for r in body["leaderboard"]]
assert "rio" in handles
assert "leo" in handles
assert "alice" not in handles
assert "bob" not in handles
def test_kind_org(self, db_path):
"""kind=org returns only orgs (legacy events still queryable)."""
body = _call(db_path, kind="org")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["cnbc"]
assert body["leaderboard"][0]["ci"] == 0.30 # 2 * 0.15
def test_kind_all_returns_everyone(self, db_path):
"""kind=all returns all kinds — persons + agents + orgs."""
body = _call(db_path, kind="all")
handles = {r["handle"] for r in body["leaderboard"]}
assert handles == {"alice", "bob", "carol", "rio", "leo", "cnbc", "newhandle"}
def test_invalid_kind_falls_to_person(self, db_path):
"""Defensive: unknown kind value silently falls back to 'person'."""
body = _call(db_path, kind="bogus")
assert body["kind_filter"] == "person"
def test_domain_filter(self, db_path):
"""domain=internet-finance scopes events; kind filter still applies."""
body = _call(db_path, domain="internet-finance")
assert body["domain"] == "internet-finance"
handles = {r["handle"] for r in body["leaderboard"]}
# alice has 2 internet-finance authors + 1 originator
# carol has 1 internet-finance author + 1 evaluator
# bob has 0 (all ai-alignment)
# newhandle has 0 (ai-alignment only)
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
assert "newhandle" not in handles
def test_composed_window_kind_domain(self, db_path):
"""REGRESSION: composed filters must build SQL correctly.
7d + person + internet-finance alice only.
"""
body = _call(db_path, window="7d", kind="person", domain="internet-finance")
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles # excluded by 7d
assert "rio" not in handles # excluded by kind=person
def test_limit_caps_results(self, db_path):
"""limit caps the leaderboard slice; total reflects unfiltered count."""
body = _call(db_path, kind="all", limit=3)
assert body["shown"] == 3
assert body["has_more"] is True
assert body["total"] == 7
def test_no_has_more_when_under_limit(self, db_path):
body = _call(db_path, kind="org")
assert body["shown"] == 1
assert body["has_more"] is False
assert body["total"] == 1
def test_invalid_limit_falls_to_default(self, db_path):
"""Defensive: garbage limit param falls to default 100. 7 entries < 100."""
body = _call(db_path, kind="all", limit="not-a-number")
assert body["shown"] == 7
assert body["has_more"] is False
def test_limit_capped_at_500(self, db_path):
"""Defensive: limit > 500 silently caps at 500."""
body = _call(db_path, limit=99999, kind="all")
# No assertion on the value of the cap from the response — just that
# it doesn't error and shown <= 500.
assert body["shown"] <= 500
def test_role_breakdown_present(self, db_path):
"""Each row includes ci_breakdown with all 5 roles."""
body = _call(db_path)
for entry in body["leaderboard"]:
assert set(entry["ci_breakdown"].keys()) == {
"author", "challenger", "synthesizer", "originator", "evaluator",
}
def test_alice_role_breakdown_correct(self, db_path):
"""Alice has 3 author (0.90) + 1 originator (0.15) = 1.05 total."""
body = _call(db_path)
alice = next(r for r in body["leaderboard"] if r["handle"] == "alice")
assert alice["ci"] == 1.05
assert alice["ci_breakdown"]["author"] == 0.90
assert alice["ci_breakdown"]["originator"] == 0.15
assert alice["ci_breakdown"]["challenger"] == 0
assert alice["ci_breakdown"]["synthesizer"] == 0
assert alice["ci_breakdown"]["evaluator"] == 0
assert alice["events_count"] == 4
assert alice["pr_count"] == 4
assert alice["domain_count"] == 2 # internet-finance + ai-alignment
def test_empty_window_returns_clean_response(self, db_path):
"""Window with no matching events returns shape-correct empty response."""
# 24h window + kind=org → cnbc is 5d ago, so empty
body = _call(db_path, window="24h", kind="org")
assert body["leaderboard"] == []
assert body["total"] == 0
assert body["shown"] == 0
assert body["has_more"] is False
assert body["source"] == "contribution_events"
def test_left_join_handles_missing_contributors_row(self, db_path):
"""REGRESSION: handle in events but missing from contributors must default to kind='person'.
Catches the failure mode where a handle classified as cited (auto-create
deferred to Branch 3) accumulates events but has no contributors row yet.
"""
body = _call(db_path)
newhandle_row = next(
(r for r in body["leaderboard"] if r["handle"] == "newhandle"), None
)
assert newhandle_row is not None
assert newhandle_row["kind"] == "person"
assert newhandle_row["ci"] == 0.30
# ─── Public path constant (auth middleware bypass) ───────────────────────────
def test_public_paths_includes_leaderboard():
"""Auth middleware needs LEADERBOARD_PUBLIC_PATHS to skip API key for /api/leaderboard."""
assert "/api/leaderboard" in LEADERBOARD_PUBLIC_PATHS
def test_kind_values_matches_contract():
"""API contract: only these 4 kind values are accepted."""
assert set(KIND_VALUES) == {"person", "agent", "org", "all"}

View file

@ -0,0 +1,167 @@
"""Verify research-attribution backfill is replay-safe against real schema.
Three things to prove:
1. (handle, role, pr_number) with claim_path=NULL deduplicates correctly
(idx_ce_unique_pr partial index handles SQLite NULL-not-equal-NULL).
2. Re-inserting an existing (handle, role, pr_number, NULL) row via INSERT OR IGNORE
is a true no-op does not create a phantom duplicate.
3. The backfill script's specific operation (DELETE then INSERT for same key)
nets zero rows when run twice in sequence.
"""
import sqlite3
import sys
# Schema lifted verbatim from lib/db.py:181-209
SCHEMA = """
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
def setup() -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
return conn
def insert_event(conn, handle, role, pr_number, claim_path=None):
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', ?, 0.30, ?, ?)""",
(handle, role, pr_number, claim_path),
)
return cur.rowcount
def count(conn) -> int:
return conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
def test_pr_level_dedup_with_null_claim_path():
"""Two inserts of same (handle, role, pr_number, NULL) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061)
r2 = insert_event(conn, "rio", "author", 4061)
n = count(conn)
assert r1 == 1, f"first insert should write, got rowcount={r1}"
assert r2 == 0, f"second insert should be ignored, got rowcount={r2}"
assert n == 1, f"expected 1 row, got {n}"
print("PASS: pr-level dedup with NULL claim_path")
def test_per_claim_dedup_with_path():
"""Two inserts of same (handle, role, pr_number, path) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 0 and n == 1
print("PASS: per-claim dedup with claim_path")
def test_pr_level_and_per_claim_coexist():
"""A (handle, role, pr_number, NULL) and (handle, role, pr_number, 'x.md') coexist
because the partial indexes target different rows."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path=None)
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 1 and n == 2
print("PASS: pr-level and per-claim events coexist on same pr_number")
def test_backfill_replay_is_noop():
"""Simulate the exact backfill operation: INSERT correct event, DELETE wrong event.
Run twice. Expect identical state no phantom rows, no double-deletions."""
conn = setup()
# Initial state: m3taversal has the wrong author event for pr=4061
insert_event(conn, "m3taversal", "author", 4061)
assert count(conn) == 1
def backfill_pr_4061():
# Insert the correct event (rio is the real author)
conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', 'author', 0.30, 4061, NULL)""",
("rio (self-directed)",),
)
# Delete the wrong event
conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
conn.commit()
backfill_pr_4061()
state_after_first = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == [("rio (self-directed)", "author", 4061, None)], state_after_first
# Replay
backfill_pr_4061()
state_after_second = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == state_after_second, "replay should be idempotent"
assert count(conn) == 1, f"expected 1 row after replay, got {count(conn)}"
print("PASS: backfill replay is a true no-op")
def test_replay_against_already_backfilled_pr_does_not_double_delete():
"""If m3taversal event was already deleted, running backfill again must not error
or affect anything else."""
conn = setup()
# Already-correct state: rio has the author event, m3taversal does not
insert_event(conn, "rio (self-directed)", "author", 4061)
insert_event(conn, "leo", "evaluator", 4061) # noise — should not be touched
# Run backfill: tries to INSERT (rio, author, 4061) — already exists, no-op
# Tries to DELETE (m3taversal, author, 4061) — already absent, 0 rows affected
cur1 = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES ('rio (self-directed)', 'agent', 'author', 0.30, 4061, NULL)""",
)
cur2 = conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
assert cur1.rowcount == 0, f"insert should be no-op, got {cur1.rowcount}"
assert cur2.rowcount == 0, f"delete should be no-op, got {cur2.rowcount}"
assert count(conn) == 2, f"expected 2 rows preserved, got {count(conn)}"
print("PASS: replay against already-backfilled state preserves unrelated events")
if __name__ == "__main__":
test_pr_level_dedup_with_null_claim_path()
test_per_claim_dedup_with_path()
test_pr_level_and_per_claim_coexist()
test_backfill_replay_is_noop()
test_replay_against_already_backfilled_pr_does_not_double_delete()
print("\nAll 5 tests passed against real schema.")