teleo-infrastructure/lib/attribution.py
m3taversal 93917f9fc2
Some checks are pending
CI / lint-and-test (push) Waiting to run
fix(attribution): --diff-filter=A + handle sanity filter + remove legacy fallback
Ganymede review findings on epimetheus/contributor-attribution-fix branch:

1. BUG: record_contributor_attribution used `git diff --name-only` (all modified
   files), not just added. Enrich/challenge PRs re-credited the sourcer on every
   subsequent modification. Fixed: --diff-filter=A restricts to new files only.
   The synthesizer/challenger/reviewer roles for enrich PRs are still credited
   via the Pentagon-Agent trailer path, so this doesn't lose any correct credit.

2. WARNING: Legacy `source`-field heuristic fabricated garbage handles from
   descriptive strings ("sec-interpretive-release-s7-2026-09-(march-17",
   "governance---meritocratic-voting-+-futarchy"). Removed outright + added
   regex handle sanity filter (`^[a-z0-9][a-z0-9_-]{0,38}$`). Applied before
   every return path in parse_attribution (the nested-block early return was
   previously bypassing the filter).

   Dry-run impact: unique handles 83→70 (13 garbage filtered), NEW contributors
   49→48, EXISTING drift rows 34→22. The filter drops rows where the literal
   garbage string lives in frontmatter (Slotkin case: attribution.sourcer.handle
   was written as "senator-elissa-slotkin-/-the-hill" by the buggy legacy path).

3. NIT: Aligned knowledge_prefixes in the file walker to match is_knowledge_pr
   (removed entities/, convictions/). Widening those requires Cory sign-off
   since is_knowledge_pr currently gates entity-only PRs out of CI.

Tests: 17 pass (added test_bad_handles_filtered, test_valid_handle_with_hyphen_passes,
updated test_legacy_source_fallback → test_legacy_source_fallback_removed).

Ganymede review — 3-message protocol msg 3 pending.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 12:58:55 +01:00

238 lines
9.9 KiB
Python

"""Attribution module — shared between post_extract.py and merge.py.
Owns: parsing attribution from YAML frontmatter, validating role entries,
computing role counts for contributor upserts, building attribution blocks.
Avoids circular dependency between post_extract.py (validates attribution at
extraction time) and merge.py (records attribution at merge time). Both
import from this shared module.
Schema reference: schemas/attribution.md
Weights reference: schemas/contribution-weights.yaml
Epimetheus owns this module. Leo reviews changes.
"""
import logging
import re
from pathlib import Path
# Module-level logger shared by all attribution helpers.
logger = logging.getLogger("pipeline.attribution")
# The five contribution roles recognized by the attribution schema
# (schemas/attribution.md). Any other role key is ignored at parse time.
VALID_ROLES = frozenset({"sourcer", "extractor", "challenger", "synthesizer", "reviewer"})
# Handle sanity: lowercase alphanumerics, hyphens, underscores. 1-39 chars (matches
# GitHub's handle rules). Rejects garbage like "governance---meritocratic-voting-+-futarchy"
# or "sec-interpretive-release-s7-2026-09-(march-17" that upstream frontmatter hygiene
# bugs produce. Apply at parse time so bad handles never reach the contributors table.
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
def _valid_handle(handle: str) -> bool:
"""Return True if handle matches the handle format (alphanum + _-, ≤39 chars)."""
if not handle or not isinstance(handle, str):
return False
h = handle.strip().lower().lstrip("@")
if h.endswith("-") or h.endswith("_"):
return False
return bool(_HANDLE_RE.match(h))
def _filter_valid_handles(result: dict) -> dict:
    """Drop entries with invalid handles from a parsed attribution dict."""
    cleaned: dict[str, list[dict]] = {role: [] for role in VALID_ROLES}
    for role, entries in result.items():
        cleaned[role].extend(
            entry for entry in entries if _valid_handle(entry.get("handle", ""))
        )
    return cleaned
# ─── Parse attribution from claim content ──────────────────────────────────
def parse_attribution(fm: dict) -> dict[str, list[dict]]:
    """Extract attribution block from claim frontmatter.

    Supports three frontmatter layouts, in priority order:
      1. Nested (schema spec):  attribution: {sourcer: [{handle: ...}], ...}
      2. Prefixed flat fields:  attribution_sourcer: ..., attribution_extractor: ...
      3. Bare role keys:        sourcer: alexastrum  (what extract.py writes)

    Returns {role: [{"handle": str, "agent_id": str|None, "context": str|None}]}
    with a key for every role in VALID_ROLES. Handles are normalized (stripped,
    lowercased, leading "@" removed), and every return path runs through the
    handle sanity filter so garbage strings from upstream frontmatter bugs
    never reach the contributors table.
    """

    def _norm(handle: str) -> str:
        # Canonical handle form: trimmed, lowercased, no leading "@".
        return handle.strip().lower().lstrip("@")

    def _entry(handle: str, agent_id=None, context=None) -> dict:
        return {"handle": _norm(handle), "agent_id": agent_id, "context": context}

    result: dict[str, list[dict]] = {role: [] for role in VALID_ROLES}
    attribution = fm.get("attribution")
    if isinstance(attribution, dict):
        # Nested format (from schema spec).
        for role in VALID_ROLES:
            entries = attribution.get(role, [])
            if isinstance(entries, str):
                # Single entry given as a bare string.
                result[role].append(_entry(entries))
                continue
            if not isinstance(entries, list):
                continue
            for entry in entries:
                # Guard on str: YAML can yield ints or None for `handle`,
                # which previously crashed on .strip().
                if isinstance(entry, dict) and isinstance(entry.get("handle"), str):
                    result[role].append(
                        _entry(entry["handle"], entry.get("agent_id"), entry.get("context"))
                    )
                elif isinstance(entry, str):
                    result[role].append(_entry(entry))
        # Don't early-return unfiltered: the nested path previously bypassed
        # the handle sanity filter, letting garbage like
        # "senator-elissa-slotkin-/-the-hill" through when it had been written
        # into frontmatter during the legacy-fallback era.
        return _filter_valid_handles(result)

    # Flat format fallback (attribution_sourcer, attribution_extractor, etc.)
    for role in VALID_ROLES:
        flat_val = fm.get(f"attribution_{role}")
        if isinstance(flat_val, str) and flat_val:
            result[role].append(_entry(flat_val))
        elif isinstance(flat_val, list):
            for v in flat_val:
                if isinstance(v, str):
                    result[role].append(_entry(v))

    # Bare-key flat format: `sourcer: alexastrum`, `extractor: leo`, etc.
    # This is what extract.py writes — the most common format in practice.
    # The Apr 24 incident traced missing leaderboard entries to this format
    # being silently dropped because the parser only checked the
    # `attribution_*` prefix. Only fill if the role wasn't already populated
    # by the prefixed form, to avoid double-counting when both formats
    # coexist on the same claim.
    for role in VALID_ROLES:
        if result[role]:
            continue
        bare_val = fm.get(role)
        if isinstance(bare_val, str) and bare_val.strip():
            result[role].append(_entry(bare_val))
        elif isinstance(bare_val, list):
            for v in bare_val:
                if isinstance(v, str) and v.strip():
                    result[role].append(_entry(v))
                elif isinstance(v, dict) and isinstance(v.get("handle"), str) and v["handle"]:
                    result[role].append(
                        _entry(v["handle"], v.get("agent_id"), v.get("context"))
                    )

    # Legacy `source` heuristic intentionally REMOVED (Ganymede review, Apr 24).
    # It fabricated handles from descriptive source strings with a near-zero
    # hit rate on real handles. Claims without explicit attribution now return
    # empty — better surfaced as data hygiene than invented contributors.
    # Filter to valid handles only: bad handles from upstream frontmatter bugs
    # get dropped rather than written to the contributors table.
    return _filter_valid_handles(result)
def parse_attribution_from_file(filepath: str) -> dict[str, list[dict]]:
    """Read a claim file and extract attribution. Returns role→entries dict."""
    empty: dict[str, list[dict]] = {role: [] for role in VALID_ROLES}
    try:
        content = Path(filepath).read_text()
    except (FileNotFoundError, PermissionError):
        return empty
    # Deferred import: post_extract imports from this module (see module
    # docstring), so a top-level import here would create a cycle.
    from .post_extract import parse_frontmatter
    fm, _ = parse_frontmatter(content)
    if fm is None:
        return empty
    return parse_attribution(fm)
# ─── Validate attribution ──────────────────────────────────────────────────
def validate_attribution(fm: dict, agent: str | None = None) -> list[str]:
    """Validate attribution block in claim frontmatter.

    Returns a list of issue strings. Block on missing extractor, warn on
    missing sourcer. (Leo: extractor is always known, sourcer is best-effort.)

    If *agent* is provided and the extractor is missing, auto-fix by setting
    the agent as extractor (same pattern as the created-date auto-fix).

    Only validates if an attribution block is explicitly present. Legacy
    claims without attribution blocks are not blocked — they'll get
    attribution when enriched. New claims from v2 extraction always have
    attribution.
    """
    # Legacy claims carry no attribution block at all: not an error.
    nested_present = fm.get("attribution") is not None
    flat_present = any(fm.get(f"attribution_{role}") for role in VALID_ROLES)
    if not (nested_present or flat_present):
        return []

    parsed = parse_attribution(fm)
    if parsed["extractor"]:
        return []
    if not agent:
        return ["missing_attribution_extractor"]

    # Auto-fix: record the processing agent as the extractor, in place.
    attr = fm.get("attribution")
    if isinstance(attr, dict):
        attr["extractor"] = [{"handle": agent}]
    else:
        fm["attribution"] = {"extractor": [{"handle": agent}]}
    return ["fixed_missing_extractor"]
# ─── Build attribution block ──────────────────────────────────────────────
def build_attribution_block(
agent: str,
agent_id: str | None = None,
source_handle: str | None = None,
source_context: str | None = None,
) -> dict:
"""Build an attribution dict for a newly extracted claim.
Called by openrouter-extract-v2.py when reconstructing claim content.
"""
attribution = {
"extractor": [{"handle": agent}],
"sourcer": [],
"challenger": [],
"synthesizer": [],
"reviewer": [],
}
if agent_id:
attribution["extractor"][0]["agent_id"] = agent_id
if source_handle:
entry = {"handle": source_handle.strip().lower().lstrip("@")}
if source_context:
entry["context"] = source_context
attribution["sourcer"].append(entry)
return attribution
# ─── Compute role counts for contributor upserts ──────────────────────────
def role_counts_from_attribution(attribution: dict[str, list[dict]]) -> dict[str, list[str]]:
    """Extract {role: [handle, ...]} for contributor table upserts.

    Maps each role with at least one credited handle to its handle list;
    roles with no entries are omitted entirely. Used by merge.py to credit
    contributors after merge.
    """
    counts: dict[str, list[str]] = {}
    for role in VALID_ROLES:
        credited = [e["handle"] for e in attribution.get(role, []) if e.get("handle")]
        if credited:
            counts[role] = credited
    return counts