"""Attribution module — shared between post_extract.py and merge.py. Owns: parsing attribution from YAML frontmatter, validating role entries, computing role counts for contributor upserts, building attribution blocks. Avoids circular dependency between post_extract.py (validates attribution at extraction time) and merge.py (records attribution at merge time). Both import from this shared module. Schema reference: schemas/attribution.md Weights reference: schemas/contribution-weights.yaml Epimetheus owns this module. Leo reviews changes. """ import logging import re from pathlib import Path logger = logging.getLogger("pipeline.attribution") VALID_ROLES = frozenset({"sourcer", "extractor", "challenger", "synthesizer", "reviewer"}) # Agent-owned branch prefixes — PRs from these branches get Pentagon-Agent trailer # credit for challenger/synthesizer roles. Pipeline-infra branches (extract/ reweave/ # fix/ ingestion/) are deliberately excluded: they're automation, not contribution. # Single source of truth; imported by contributor.py and backfill-events.py. AGENT_BRANCH_PREFIXES = ( "rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/", ) # Handle sanity: lowercase alphanumerics, hyphens, underscores. 1-39 chars (matches # GitHub's handle rules). Rejects garbage like "governance---meritocratic-voting-+-futarchy" # or "sec-interpretive-release-s7-2026-09-(march-17" that upstream frontmatter hygiene # bugs produce. Apply at parse time so bad handles never reach the contributors table. _HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$") def _valid_handle(handle: str) -> bool: """Return True if handle matches the handle format (alphanum + _-, ≤39 chars).""" if not handle or not isinstance(handle, str): return False h = handle.strip().lower().lstrip("@") if h.endswith("-") or h.endswith("_"): return False return bool(_HANDLE_RE.match(h)) def _filter_valid_handles(result: dict) -> dict: """Drop entries with invalid handles from a parsed attribution dict.""" filtered: dict[str, list[dict]] = {role: [] for role in VALID_ROLES} for role, entries in result.items(): for entry in entries: if _valid_handle(entry.get("handle", "")): filtered[role].append(entry) return filtered # ─── Handle normalization + kind classification (schema v24) ────────────── # Known Pentagon agents. Used to classify contributor kind='agent' so the # leaderboard can filter them out of the default person view. PENTAGON_AGENTS = frozenset({ "rio", "leo", "theseus", "vida", "clay", "astra", "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship", "pipeline", # pipeline-owned commits (extract/*, reweave/*, fix/*) }) def normalize_handle(handle: str, conn=None) -> str: """Canonicalize a handle: lowercase, strip @, resolve alias if conn provided. Examples: '@thesensatore' → 'thesensatore' 'Cameron' → 'cameron' → 'cameron-s1' (via alias if seeded) 'CNBC' → 'cnbc' Always lowercases and strips @ prefix. Alias resolution requires a conn argument (not always available at parse time; merge-time writer passes it). """ if not handle: return "" h = handle.strip().lower().lstrip("@") if conn is None: return h try: row = conn.execute( "SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,), ).fetchone() if row: return row["canonical"] if isinstance(row, dict) or hasattr(row, "keys") else row[0] except Exception: # Alias table might not exist yet on pre-v24 DBs — degrade gracefully. logger.debug("normalize_handle: alias lookup failed for %r", h, exc_info=True) return h def classify_kind(handle: str) -> str: """Return 'agent' for known Pentagon agents, 'person' otherwise. The 'org' kind (CNBC, SpaceNews, etc.) is assigned by operator review, not inferred here. Keeping heuristics narrow: we know our own agents; everything else defaults to person until explicitly classified. """ h = handle.strip().lower().lstrip("@") if h in PENTAGON_AGENTS: return "agent" return "person" # ─── Parse attribution from claim content ────────────────────────────────── def parse_attribution(fm: dict) -> dict[str, list[dict]]: """Extract attribution block from claim frontmatter. Returns {role: [{"handle": str, "agent_id": str|None, "context": str|None}]} Handles both nested YAML format and flat field format. """ result = {role: [] for role in VALID_ROLES} attribution = fm.get("attribution") if isinstance(attribution, dict): # Nested format (from schema spec) for role in VALID_ROLES: entries = attribution.get(role, []) if isinstance(entries, list): for entry in entries: if isinstance(entry, dict) and "handle" in entry: result[role].append({ "handle": entry["handle"].strip().lower().lstrip("@"), "agent_id": entry.get("agent_id"), "context": entry.get("context"), }) elif isinstance(entry, str): result[role].append({"handle": entry.strip().lower().lstrip("@"), "agent_id": None, "context": None}) elif isinstance(entries, str): # Single entry as string result[role].append({"handle": entries.strip().lower().lstrip("@"), "agent_id": None, "context": None}) # Fall through to the filter at the end (don't early-return). The nested # block path was skipping the handle sanity filter, letting garbage like # "senator-elissa-slotkin-/-the-hill" through when it was written into # frontmatter during the legacy-fallback era. return _filter_valid_handles(result) # Flat format fallback (attribution_sourcer, attribution_extractor, etc.) for role in VALID_ROLES: flat_val = fm.get(f"attribution_{role}") if flat_val: if isinstance(flat_val, str): result[role].append({"handle": flat_val.strip().lower().lstrip("@"), "agent_id": None, "context": None}) elif isinstance(flat_val, list): for v in flat_val: if isinstance(v, str): result[role].append({"handle": v.strip().lower().lstrip("@"), "agent_id": None, "context": None}) # Bare-key flat format: `sourcer: alexastrum`, `extractor: leo`, etc. # This is what extract.py writes (line 290: f'sourcer: "{sourcer}"') — the most # common format in practice (~42% of claim files). The Apr 24 incident traced # missing leaderboard entries to this format being silently dropped because the # parser only checked the `attribution_*` prefix. # Only fill if the role wasn't already populated by the prefixed form, to avoid # double-counting when both formats coexist on the same claim. for role in VALID_ROLES: if result[role]: continue bare_val = fm.get(role) if isinstance(bare_val, str) and bare_val.strip(): result[role].append({"handle": bare_val.strip().lower().lstrip("@"), "agent_id": None, "context": None}) elif isinstance(bare_val, list): for v in bare_val: if isinstance(v, str) and v.strip(): result[role].append({"handle": v.strip().lower().lstrip("@"), "agent_id": None, "context": None}) elif isinstance(v, dict) and v.get("handle"): result[role].append({ "handle": v["handle"].strip().lower().lstrip("@"), "agent_id": v.get("agent_id"), "context": v.get("context"), }) # Legacy `source` heuristic REMOVED (Ganymede review, Apr 24). It fabricated # handles from descriptive source strings — "governance---meritocratic-voting-+- # futarchy", "cameron-(contributor)", "sec-interpretive-release-s7-2026-09- # (march-17". Hit rate on real handles was near-zero, false-positive rate was # high. Claims without explicit attribution now return empty (better surface as # data hygiene than invent fake contributors). # Filter to valid handles only. Bad handles (garbage from upstream frontmatter # bugs) get dropped rather than written to the contributors table. return _filter_valid_handles(result) def parse_attribution_from_file(filepath: str) -> dict[str, list[dict]]: """Read a claim file and extract attribution. Returns role→entries dict.""" try: content = Path(filepath).read_text() except (FileNotFoundError, PermissionError): return {role: [] for role in VALID_ROLES} from .post_extract import parse_frontmatter fm, _ = parse_frontmatter(content) if fm is None: return {role: [] for role in VALID_ROLES} return parse_attribution(fm) # ─── Validate attribution ────────────────────────────────────────────────── def validate_attribution(fm: dict, agent: str | None = None) -> list[str]: """Validate attribution block in claim frontmatter. Returns list of issues. Block on missing extractor, warn on missing sourcer. (Leo: extractor is always known, sourcer is best-effort.) If agent is provided and extractor is missing, auto-fix by setting the agent as extractor (same pattern as created-date auto-fix). Only validates if an attribution block is explicitly present. Legacy claims without attribution blocks are not blocked — they'll get attribution when enriched. New claims from v2 extraction always have attribution. """ issues = [] # Only validate if attribution block exists (don't break legacy claims) has_attribution = ( fm.get("attribution") is not None or any(fm.get(f"attribution_{role}") for role in VALID_ROLES) ) if not has_attribution: return [] # No attribution block = legacy claim, not an error attribution = parse_attribution(fm) if not attribution["extractor"]: if agent: # Auto-fix: set the processing agent as extractor attr = fm.get("attribution") if isinstance(attr, dict): attr["extractor"] = [{"handle": agent}] else: fm["attribution"] = {"extractor": [{"handle": agent}]} issues.append("fixed_missing_extractor") else: issues.append("missing_attribution_extractor") return issues # ─── Build attribution block ────────────────────────────────────────────── def build_attribution_block( agent: str, agent_id: str | None = None, source_handle: str | None = None, source_context: str | None = None, ) -> dict: """Build an attribution dict for a newly extracted claim. Called by openrouter-extract-v2.py when reconstructing claim content. """ attribution = { "extractor": [{"handle": agent}], "sourcer": [], "challenger": [], "synthesizer": [], "reviewer": [], } if agent_id: attribution["extractor"][0]["agent_id"] = agent_id if source_handle: entry = {"handle": source_handle.strip().lower().lstrip("@")} if source_context: entry["context"] = source_context attribution["sourcer"].append(entry) return attribution # ─── Compute role counts for contributor upserts ────────────────────────── def role_counts_from_attribution(attribution: dict[str, list[dict]]) -> dict[str, list[str]]: """Extract {role: [handle, ...]} for contributor table upserts. Returns a dict mapping each role to the list of contributor handles. Used by merge.py to credit contributors after merge. """ counts: dict[str, list[str]] = {} for role in VALID_ROLES: handles = [entry["handle"] for entry in attribution.get(role, []) if entry.get("handle")] if handles: counts[role] = handles return counts