teleo-infrastructure/tests/test_attribution.py
m3taversal 5f554bc2de
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
feat: atomic extract-and-connect + stale PR monitor + response audit
Atomic extract-and-connect (lib/connect.py):
- After extraction writes claim files, each new claim is embedded via
  OpenRouter, searched against Qdrant, and top-5 neighbors (cosine > 0.55)
  are added as `related` edges in the claim's frontmatter
- Edges written on NEW claim only — avoids merge conflicts
- Cross-domain connections enabled, non-fatal on Qdrant failure
- Wired into openrouter-extract-v2.py post-extraction step

Stale PR monitor (lib/stale_pr.py):
- Every watchdog cycle checks open extract/* PRs
- If open >30 min AND 0 claim files → auto-close with comment
- After 2 stale closures → marks source as extraction_failed
- Wired into watchdog.py as check #6

Response audit system:
- response_audit table (migration v8), persistent audit conn in bot.py
- 90-day retention cleanup, tool_calls JSON column
- Confidence tag stripping, systemd ReadWritePaths for pipeline.db

Supporting infrastructure:
- reweave.py: nightly edge reconnection for orphan claims
- reconcile-sources.py: source status reconciliation
- backfill-domains.py: domain classification backfill
- ops/reconcile-source-status.sh: operational reconciliation script
- Attribution improvements, post-extract enrichments, merge improvements

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:34:20 +00:00

136 lines
4.9 KiB
Python

"""Tests for attribution module."""
import pytest
from lib.attribution import (
build_attribution_block,
parse_attribution,
role_counts_from_attribution,
validate_attribution,
)
class TestParseAttribution:
    """Checks for parse_attribution across the frontmatter shapes used below."""

    def test_nested_format(self):
        """A nested ``attribution`` mapping with per-role entry dicts is parsed."""
        extractor_entry = {"handle": "rio", "agent_id": "760F7FE7"}
        sourcer_entry = {"handle": "@theiaresearch", "context": "annual letter"}
        frontmatter = {
            "type": "claim",
            "attribution": {
                "extractor": [extractor_entry],
                "sourcer": [sourcer_entry],
            },
        }

        parsed = parse_attribution(frontmatter)

        extractors = parsed["extractor"]
        assert len(extractors) == 1
        assert extractors[0]["handle"] == "rio"
        # The leading "@" on the sourcer handle is expected to be dropped.
        assert parsed["sourcer"][0]["handle"] == "theiaresearch"

    def test_flat_format(self):
        """Flat ``attribution_<role>`` keys are parsed into per-role entries."""
        frontmatter = {
            "type": "claim",
            "attribution_extractor": "rio",
            "attribution_sourcer": "@theiaresearch",
        }

        parsed = parse_attribution(frontmatter)

        expected_handles = (("extractor", "rio"), ("sourcer", "theiaresearch"))
        for role, handle in expected_handles:
            assert parsed[role][0]["handle"] == handle

    def test_legacy_source_fallback(self):
        """With only a ``source`` string, the sourcer handle comes from it."""
        frontmatter = {
            "type": "claim",
            "source": "@pineanalytics, Q4 2025 report",
        }

        parsed = parse_attribution(frontmatter)

        first_sourcer = parsed["sourcer"][0]
        assert first_sourcer["handle"] == "pineanalytics"

    def test_empty_attribution(self):
        """Frontmatter with no attribution data yields only empty role values."""
        parsed = parse_attribution({"type": "claim"})

        for entries in parsed.values():
            assert len(entries) == 0

    def test_string_entries(self):
        """Bare strings (inside a list or standalone) are accepted as handles."""
        frontmatter = {
            "attribution": {
                "extractor": ["rio"],
                "sourcer": "theiaresearch",
            },
        }

        parsed = parse_attribution(frontmatter)

        expected_handles = (("extractor", "rio"), ("sourcer", "theiaresearch"))
        for role, handle in expected_handles:
            assert parsed[role][0]["handle"] == handle
class TestValidateAttribution:
    """Checks for validate_attribution: passing, blocking and auto-fix cases."""

    def test_valid_attribution(self):
        """An attribution block that names an extractor reports no issues."""
        frontmatter = {
            "attribution": {
                "extractor": [{"handle": "rio"}],
            },
        }

        problems = validate_attribution(frontmatter)

        problem_count = len(problems)
        assert problem_count == 0

    def test_missing_extractor(self):
        """An attribution block with only a sourcer is flagged."""
        sourcer_only = {"sourcer": [{"handle": "someone"}]}
        frontmatter = {"attribution": sourcer_only}

        problems = validate_attribution(frontmatter)

        assert "missing_attribution_extractor" in problems

    def test_no_attribution_block_passes(self):
        """Legacy claims that carry no attribution block must not be blocked."""
        frontmatter = {"type": "claim", "source": "some source"}

        problems = validate_attribution(frontmatter)

        # Absence of the block marks a legacy claim, which is not an error.
        problem_count = len(problems)
        assert problem_count == 0

    def test_attribution_block_missing_extractor(self):
        """A claim that HAS an attribution block but no extractor is blocked."""
        sourcer_only = {"sourcer": [{"handle": "someone"}]}
        frontmatter = {"type": "claim", "attribution": sourcer_only}

        problems = validate_attribution(frontmatter)

        assert "missing_attribution_extractor" in problems

    def test_missing_extractor_auto_fix_with_agent(self):
        """Given an agent, the missing extractor is filled in, not reported."""
        sourcer_only = {"sourcer": [{"handle": "someone"}]}
        frontmatter = {"attribution": sourcer_only}

        problems = validate_attribution(frontmatter, agent="leo")

        assert "fixed_missing_extractor" in problems
        assert not ("missing_attribution_extractor" in problems)
        # The repair must be visible on the dict that was passed in.
        repaired_extractor = frontmatter["attribution"]["extractor"]
        assert repaired_extractor == [{"handle": "leo"}]

    def test_missing_extractor_no_agent_still_blocks(self):
        """With ``agent=None`` a missing extractor remains a hard failure."""
        sourcer_only = {"sourcer": [{"handle": "someone"}]}
        frontmatter = {"attribution": sourcer_only}

        problems = validate_attribution(frontmatter, agent=None)

        assert "missing_attribution_extractor" in problems
class TestBuildAttributionBlock:
    """Checks for the attribution mapping built by build_attribution_block."""

    def test_basic_build(self):
        """Handle and agent id both appear on the first extractor entry."""
        block = build_attribution_block("rio", agent_id="760F7FE7")

        first_extractor = block["extractor"][0]
        assert first_extractor["handle"] == "rio"
        assert first_extractor["agent_id"] == "760F7FE7"

    def test_with_sourcer(self):
        """Sourcer handle is stored without "@" and lowercased; context is kept."""
        block = build_attribution_block(
            "rio",
            source_handle="@PineAnalytics",
            source_context="Q4 report",
        )

        first_sourcer = block["sourcer"][0]
        assert first_sourcer["handle"] == "pineanalytics"
        assert first_sourcer["context"] == "Q4 report"

    def test_empty_roles(self):
        """Roles that were not supplied are present as empty lists."""
        block = build_attribution_block("rio")

        for role in ("challenger", "synthesizer", "reviewer"):
            assert block[role] == []
class TestRoleCounts:
    """Checks for role_counts_from_attribution."""

    def test_basic_counts(self):
        """Populated roles map to their handle lists; empty roles are left out."""
        populated = {
            "extractor": ["rio"],
            "sourcer": ["theia", "pine"],
            "reviewer": ["leo"],
        }
        attribution = {
            "extractor": [{"handle": "rio"}],
            "sourcer": [{"handle": "theia"}, {"handle": "pine"}],
            "challenger": [],
            "synthesizer": [],
            "reviewer": [{"handle": "leo"}],
        }

        counts = role_counts_from_attribution(attribution)

        assert counts["extractor"] == populated["extractor"]
        assert counts["sourcer"] == populated["sourcer"]
        # A role with no entries must not show up in the result at all.
        assert not ("challenger" in counts)
        assert counts["reviewer"] == populated["reviewer"]