Schema v26 (commit 3fe524d) split orgs/citations from contributors into
the publishers table. Without a writer-side gate, every merged PR with
`sourcer: cnbc` (or similar) re-creates CNBC as a contributor and
undoes the v26 classifier cleanup. Once normal pipeline traffic resumes,
the contributors table re-pollutes within hours.
Fix: belt-and-suspenders gate at both writer surfaces.
1. `lib/attribution.py::is_publisher_handle(handle, conn)` — returns
publisher.id if handle exists in publishers.name, else None. Falls
back gracefully on pre-v26 DBs (no publishers table → returns None →
writer behaves like before, no regression).
2. `lib/contributor.py::insert_contribution_event` — checks
is_publisher_handle on canonical handle before INSERT. If it's a
publisher, debug-log + return False. Prevents originator events for
CNBC/SpaceNews/etc.
3. `lib/contributor.py::upsert_contributor` — same gate at top. Prevents
the contributors table from re-acquiring publisher rows.
Verified end-to-end against live VPS DB snapshot:
- CNBC originator event: blocked (insert returns False)
- CNBC contributors row: blocked (no row created)
- alexastrum, thesensatore, newhandle_xyz: pass through unchanged
- is_publisher_handle handles case-insensitive lookup correctly
(CNBC and cnbc both match publisher_id=3)
Pre-deploy event count was 3705. Post-classifier cleanup: 3623 (82 org
events purged). Going forward, no new org events accumulate.
Branch 2 of the schema-v26 rollout. Branch 3 (auto-create at tier='cited',
extract.py sources.publisher_id wiring) is separate scope and not required
for regression prevention.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
325 lines
13 KiB
Python
"""Attribution module — shared between post_extract.py and merge.py.
|
|
|
|
Owns: parsing attribution from YAML frontmatter, validating role entries,
|
|
computing role counts for contributor upserts, building attribution blocks.
|
|
|
|
Avoids circular dependency between post_extract.py (validates attribution at
|
|
extraction time) and merge.py (records attribution at merge time). Both
|
|
import from this shared module.
|
|
|
|
Schema reference: schemas/attribution.md
|
|
Weights reference: schemas/contribution-weights.yaml
|
|
|
|
Epimetheus owns this module. Leo reviews changes.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger("pipeline.attribution")
|
|
|
|
VALID_ROLES = frozenset({"sourcer", "extractor", "challenger", "synthesizer", "reviewer"})
|
|
|
|
# Agent-owned branch prefixes — PRs from these branches get Pentagon-Agent trailer
|
|
# credit for challenger/synthesizer roles. Pipeline-infra branches (extract/ reweave/
|
|
# fix/ ingestion/) are deliberately excluded: they're automation, not contribution.
|
|
# Single source of truth; imported by contributor.py and backfill-events.py.
|
|
AGENT_BRANCH_PREFIXES = (
|
|
"rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
|
|
)
|
|
|
|
# Handle sanity: lowercase alphanumerics, hyphens, underscores. 1-39 chars (matches
|
|
# GitHub's handle rules). Rejects garbage like "governance---meritocratic-voting-+-futarchy"
|
|
# or "sec-interpretive-release-s7-2026-09-(march-17" that upstream frontmatter hygiene
|
|
# bugs produce. Apply at parse time so bad handles never reach the contributors table.
|
|
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
|
|
|
|
|
|
def _valid_handle(handle: str) -> bool:
|
|
"""Return True if handle matches the handle format (alphanum + _-, ≤39 chars)."""
|
|
if not handle or not isinstance(handle, str):
|
|
return False
|
|
h = handle.strip().lower().lstrip("@")
|
|
if h.endswith("-") or h.endswith("_"):
|
|
return False
|
|
return bool(_HANDLE_RE.match(h))
|
|
|
|
|
|
def _filter_valid_handles(result: dict) -> dict:
    """Drop entries with invalid handles from a parsed attribution dict.

    Returns a fresh dict keyed by every role in VALID_ROLES; entries whose
    handle fails _valid_handle are silently discarded.
    """
    filtered: dict[str, list[dict]] = {role: [] for role in VALID_ROLES}
    for role, entries in result.items():
        kept = [entry for entry in entries if _valid_handle(entry.get("handle", ""))]
        filtered[role].extend(kept)
    return filtered
# ─── Handle normalization + kind classification (schema v24) ──────────────

# Known Pentagon agents. Used to classify contributor kind='agent' so the
# leaderboard can filter them out of the default person view.
# Compared against normalized (lowercased, @-stripped) handles in classify_kind.
PENTAGON_AGENTS = frozenset({
    "rio", "leo", "theseus", "vida", "clay", "astra",
    "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
    "pipeline",  # pipeline-owned commits (extract/*, reweave/*, fix/*)
})
def normalize_handle(handle: str, conn=None) -> str:
    """Canonicalize a handle: lowercase, strip @, resolve alias if conn provided.

    Examples:
        '@thesensatore' → 'thesensatore'
        'Cameron' → 'cameron' → 'cameron-s1' (via alias if seeded)
        'CNBC' → 'cnbc'

    Lowercasing and @-stripping always happen. Alias resolution additionally
    requires a DB connection (not always available at parse time; the
    merge-time writer passes one).
    """
    if not handle:
        return ""
    canonical = handle.strip().lower().lstrip("@")
    if conn is None:
        return canonical
    try:
        hit = conn.execute(
            "SELECT canonical FROM contributor_aliases WHERE alias = ?", (canonical,),
        ).fetchone()
        if hit:
            # Support both mapping-style rows (dict / sqlite3.Row) and tuples.
            if isinstance(hit, dict) or hasattr(hit, "keys"):
                return hit["canonical"]
            return hit[0]
    except Exception:
        # Alias table might not exist yet on pre-v24 DBs — degrade gracefully.
        logger.debug("normalize_handle: alias lookup failed for %r", canonical, exc_info=True)
    return canonical
def classify_kind(handle: str) -> str:
    """Return 'agent' for known Pentagon agents, 'person' otherwise.

    The 'org' kind (CNBC, SpaceNews, etc.) is assigned by operator review,
    not inferred here. Keeping heuristics narrow: we know our own agents;
    everything else defaults to person until explicitly classified.
    """
    normalized = handle.strip().lower().lstrip("@")
    return "agent" if normalized in PENTAGON_AGENTS else "person"
def is_publisher_handle(handle: str, conn) -> int | None:
|
|
"""Return publisher.id if the handle exists as a publisher name, else None.
|
|
|
|
Schema v26 split orgs/citations into the publishers table. Writer code
|
|
(upsert_contributor, insert_contribution_event) calls this to gate creating
|
|
contributor rows or events for handles that belong to publishers.
|
|
|
|
Without this gate, every merged PR with `sourcer: cnbc` (for example) would
|
|
re-create CNBC as a contributor and undo the v26 classifier cleanup.
|
|
|
|
Falls back gracefully on pre-v26 DBs: returns None if publishers table
|
|
doesn't exist yet (writer behaves like before, no regression).
|
|
"""
|
|
if not handle or conn is None:
|
|
return None
|
|
h = handle.strip().lower().lstrip("@")
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT id FROM publishers WHERE name = ?", (h,),
|
|
).fetchone()
|
|
if row:
|
|
return row["id"] if hasattr(row, "keys") else row[0]
|
|
except Exception:
|
|
logger.debug("is_publisher_handle: lookup failed for %r", h, exc_info=True)
|
|
return None
|
|
|
|
|
|
# ─── Parse attribution from claim content ──────────────────────────────────


def parse_attribution(fm: dict) -> dict[str, list[dict]]:
    """Extract attribution block from claim frontmatter.

    Returns {role: [{"handle": str, "agent_id": str|None, "context": str|None}]}
    Handles both nested YAML format and flat field format.

    Recognized formats, in precedence order:
      1. Nested block:  attribution: {sourcer: [...], extractor: [...], ...}
         (early-returns; flat forms are ignored when the nested block exists)
      2. Prefixed flat: attribution_sourcer / attribution_extractor / ...
      3. Bare keys:     sourcer: ... / extractor: ...  (only fills roles the
         prefixed form left empty, to avoid double-counting)

    All handles are normalized (strip, lowercase, drop leading @), and every
    path ends in _filter_valid_handles so malformed handles never escape.
    """
    result: dict[str, list[dict]] = {role: [] for role in VALID_ROLES}

    attribution = fm.get("attribution")
    if isinstance(attribution, dict):
        # Nested format (from schema spec)
        for role in VALID_ROLES:
            entries = attribution.get(role, [])
            if isinstance(entries, list):
                for entry in entries:
                    if isinstance(entry, dict) and "handle" in entry:
                        result[role].append({
                            "handle": entry["handle"].strip().lower().lstrip("@"),
                            "agent_id": entry.get("agent_id"),
                            "context": entry.get("context"),
                        })
                    elif isinstance(entry, str):
                        result[role].append({"handle": entry.strip().lower().lstrip("@"), "agent_id": None, "context": None})
            elif isinstance(entries, str):
                # Single entry as string
                result[role].append({"handle": entries.strip().lower().lstrip("@"), "agent_id": None, "context": None})
        # Fall through to the filter at the end (don't early-return). The nested
        # block path was skipping the handle sanity filter, letting garbage like
        # "senator-elissa-slotkin-/-the-hill" through when it was written into
        # frontmatter during the legacy-fallback era.
        return _filter_valid_handles(result)

    # Flat format fallback (attribution_sourcer, attribution_extractor, etc.)
    for role in VALID_ROLES:
        flat_val = fm.get(f"attribution_{role}")
        if flat_val:
            if isinstance(flat_val, str):
                result[role].append({"handle": flat_val.strip().lower().lstrip("@"), "agent_id": None, "context": None})
            elif isinstance(flat_val, list):
                for v in flat_val:
                    if isinstance(v, str):
                        result[role].append({"handle": v.strip().lower().lstrip("@"), "agent_id": None, "context": None})

    # Bare-key flat format: `sourcer: alexastrum`, `extractor: leo`, etc.
    # This is what extract.py writes (line 290: f'sourcer: "{sourcer}"') — the most
    # common format in practice (~42% of claim files). The Apr 24 incident traced
    # missing leaderboard entries to this format being silently dropped because the
    # parser only checked the `attribution_*` prefix.
    # Only fill if the role wasn't already populated by the prefixed form, to avoid
    # double-counting when both formats coexist on the same claim.
    for role in VALID_ROLES:
        if result[role]:
            continue
        bare_val = fm.get(role)
        if isinstance(bare_val, str) and bare_val.strip():
            result[role].append({"handle": bare_val.strip().lower().lstrip("@"), "agent_id": None, "context": None})
        elif isinstance(bare_val, list):
            for v in bare_val:
                if isinstance(v, str) and v.strip():
                    result[role].append({"handle": v.strip().lower().lstrip("@"), "agent_id": None, "context": None})
                elif isinstance(v, dict) and v.get("handle"):
                    result[role].append({
                        "handle": v["handle"].strip().lower().lstrip("@"),
                        "agent_id": v.get("agent_id"),
                        "context": v.get("context"),
                    })

    # Legacy `source` heuristic REMOVED (Ganymede review, Apr 24). It fabricated
    # handles from descriptive source strings — "governance---meritocratic-voting-+-
    # futarchy", "cameron-(contributor)", "sec-interpretive-release-s7-2026-09-
    # (march-17". Hit rate on real handles was near-zero, false-positive rate was
    # high. Claims without explicit attribution now return empty (better surface as
    # data hygiene than invent fake contributors).

    # Filter to valid handles only. Bad handles (garbage from upstream frontmatter
    # bugs) get dropped rather than written to the contributors table.
    return _filter_valid_handles(result)
def parse_attribution_from_file(filepath: str) -> dict[str, list[dict]]:
    """Read a claim file and extract attribution. Returns role→entries dict.

    Returns an all-empty role→[] mapping when the file is missing, unreadable,
    undecodable, or has no parseable frontmatter — callers treat all of those
    the same way (nothing to credit).
    """
    try:
        # Claim files are UTF-8; relying on the locale default encoding breaks
        # on hosts configured with a non-UTF-8 locale.
        content = Path(filepath).read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # OSError covers FileNotFoundError/PermissionError and also
        # IsADirectoryError etc., which the old narrow filter let escape.
        return {role: [] for role in VALID_ROLES}

    # Imported lazily to avoid the circular dependency this module exists to break.
    from .post_extract import parse_frontmatter

    fm, _ = parse_frontmatter(content)
    if fm is None:
        return {role: [] for role in VALID_ROLES}

    return parse_attribution(fm)
# ─── Validate attribution ──────────────────────────────────────────────────


def validate_attribution(fm: dict, agent: str | None = None) -> list[str]:
    """Validate attribution block in claim frontmatter.

    Returns a list of issue codes. Block on missing extractor, warn on missing
    sourcer. (Leo: extractor is always known, sourcer is best-effort.)

    If agent is provided and extractor is missing, auto-fix by setting the
    agent as extractor (same pattern as the created-date auto-fix); fm is
    mutated in place in that case.

    Only validates if an attribution block is explicitly present. Legacy claims
    without attribution blocks are not blocked — they'll get attribution when
    enriched. New claims from v2 extraction always have attribution.
    """
    # No attribution block at all → legacy claim, not an error.
    explicit_block = fm.get("attribution") is not None or any(
        fm.get(f"attribution_{role}") for role in VALID_ROLES
    )
    if not explicit_block:
        return []

    parsed = parse_attribution(fm)
    if parsed["extractor"]:
        return []

    if not agent:
        return ["missing_attribution_extractor"]

    # Auto-fix: credit the processing agent as extractor.
    existing = fm.get("attribution")
    if isinstance(existing, dict):
        existing["extractor"] = [{"handle": agent}]
    else:
        fm["attribution"] = {"extractor": [{"handle": agent}]}
    return ["fixed_missing_extractor"]
# ─── Build attribution block ──────────────────────────────────────────────
|
|
|
|
|
|
def build_attribution_block(
|
|
agent: str,
|
|
agent_id: str | None = None,
|
|
source_handle: str | None = None,
|
|
source_context: str | None = None,
|
|
) -> dict:
|
|
"""Build an attribution dict for a newly extracted claim.
|
|
|
|
Called by openrouter-extract-v2.py when reconstructing claim content.
|
|
"""
|
|
attribution = {
|
|
"extractor": [{"handle": agent}],
|
|
"sourcer": [],
|
|
"challenger": [],
|
|
"synthesizer": [],
|
|
"reviewer": [],
|
|
}
|
|
|
|
if agent_id:
|
|
attribution["extractor"][0]["agent_id"] = agent_id
|
|
|
|
if source_handle:
|
|
entry = {"handle": source_handle.strip().lower().lstrip("@")}
|
|
if source_context:
|
|
entry["context"] = source_context
|
|
attribution["sourcer"].append(entry)
|
|
|
|
return attribution
|
|
|
|
|
|
# ─── Compute role counts for contributor upserts ──────────────────────────


def role_counts_from_attribution(attribution: dict[str, list[dict]]) -> dict[str, list[str]]:
    """Extract {role: [handle, ...]} for contributor table upserts.

    Returns a dict mapping each role to the list of contributor handles.
    Roles with no credited handles are omitted entirely. Used by merge.py
    to credit contributors after merge.
    """
    by_role: dict[str, list[str]] = {}
    for role in VALID_ROLES:
        handles: list[str] = []
        for entry in attribution.get(role, []):
            handle = entry.get("handle")
            if handle:
                handles.append(handle)
        if handles:
            by_role[role] = handles
    return by_role
|