Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Atomic extract-and-connect (lib/connect.py): - After extraction writes claim files, each new claim is embedded via OpenRouter, searched against Qdrant, and top-5 neighbors (cosine > 0.55) are added as `related` edges in the claim's frontmatter - Edges written on NEW claim only — avoids merge conflicts - Cross-domain connections enabled, non-fatal on Qdrant failure - Wired into openrouter-extract-v2.py post-extraction step Stale PR monitor (lib/stale_pr.py): - Every watchdog cycle checks open extract/* PRs - If open >30 min AND 0 claim files → auto-close with comment - After 2 stale closures → marks source as extraction_failed - Wired into watchdog.py as check #6 Response audit system: - response_audit table (migration v8), persistent audit conn in bot.py - 90-day retention cleanup, tool_calls JSON column - Confidence tag stripping, systemd ReadWritePaths for pipeline.db Supporting infrastructure: - reweave.py: nightly edge reconnection for orphan claims - reconcile-sources.py: source status reconciliation - backfill-domains.py: domain classification backfill - ops/reconcile-source-status.sh: operational reconciliation script - Attribution improvements, post-extract enrichments, merge improvements Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
136 lines
4.9 KiB
Python
136 lines
4.9 KiB
Python
"""Tests for attribution module."""
|
|
|
|
import pytest
|
|
|
|
from lib.attribution import (
|
|
build_attribution_block,
|
|
parse_attribution,
|
|
role_counts_from_attribution,
|
|
validate_attribution,
|
|
)
|
|
|
|
|
|
class TestParseAttribution:
|
|
def test_nested_format(self):
|
|
fm = {
|
|
"type": "claim",
|
|
"attribution": {
|
|
"extractor": [{"handle": "rio", "agent_id": "760F7FE7"}],
|
|
"sourcer": [{"handle": "@theiaresearch", "context": "annual letter"}],
|
|
},
|
|
}
|
|
result = parse_attribution(fm)
|
|
assert len(result["extractor"]) == 1
|
|
assert result["extractor"][0]["handle"] == "rio"
|
|
assert result["sourcer"][0]["handle"] == "theiaresearch" # @ stripped
|
|
|
|
def test_flat_format(self):
|
|
fm = {
|
|
"type": "claim",
|
|
"attribution_extractor": "rio",
|
|
"attribution_sourcer": "@theiaresearch",
|
|
}
|
|
result = parse_attribution(fm)
|
|
assert result["extractor"][0]["handle"] == "rio"
|
|
assert result["sourcer"][0]["handle"] == "theiaresearch"
|
|
|
|
def test_legacy_source_fallback(self):
|
|
fm = {
|
|
"type": "claim",
|
|
"source": "@pineanalytics, Q4 2025 report",
|
|
}
|
|
result = parse_attribution(fm)
|
|
assert result["sourcer"][0]["handle"] == "pineanalytics"
|
|
|
|
def test_empty_attribution(self):
|
|
fm = {"type": "claim"}
|
|
result = parse_attribution(fm)
|
|
assert all(len(v) == 0 for v in result.values())
|
|
|
|
def test_string_entries(self):
|
|
fm = {
|
|
"attribution": {
|
|
"extractor": ["rio"],
|
|
"sourcer": "theiaresearch",
|
|
},
|
|
}
|
|
result = parse_attribution(fm)
|
|
assert result["extractor"][0]["handle"] == "rio"
|
|
assert result["sourcer"][0]["handle"] == "theiaresearch"
|
|
|
|
|
|
class TestValidateAttribution:
|
|
def test_valid_attribution(self):
|
|
fm = {
|
|
"attribution": {
|
|
"extractor": [{"handle": "rio"}],
|
|
},
|
|
}
|
|
issues = validate_attribution(fm)
|
|
assert len(issues) == 0
|
|
|
|
def test_missing_extractor(self):
|
|
fm = {"attribution": {"sourcer": [{"handle": "someone"}]}}
|
|
issues = validate_attribution(fm)
|
|
assert "missing_attribution_extractor" in issues
|
|
|
|
def test_no_attribution_block_passes(self):
|
|
"""Legacy claims without attribution block should NOT be blocked."""
|
|
fm = {"type": "claim", "source": "some source"}
|
|
issues = validate_attribution(fm)
|
|
assert len(issues) == 0 # No attribution block = legacy, not an error
|
|
|
|
def test_attribution_block_missing_extractor(self):
|
|
"""Claims WITH attribution block but missing extractor SHOULD be blocked."""
|
|
fm = {"type": "claim", "attribution": {"sourcer": [{"handle": "someone"}]}}
|
|
issues = validate_attribution(fm)
|
|
assert "missing_attribution_extractor" in issues
|
|
|
|
def test_missing_extractor_auto_fix_with_agent(self):
|
|
"""When agent is provided, auto-fix missing extractor instead of blocking."""
|
|
fm = {"attribution": {"sourcer": [{"handle": "someone"}]}}
|
|
issues = validate_attribution(fm, agent="leo")
|
|
assert "fixed_missing_extractor" in issues
|
|
assert "missing_attribution_extractor" not in issues
|
|
# Verify the fix was applied in-place
|
|
assert fm["attribution"]["extractor"] == [{"handle": "leo"}]
|
|
|
|
def test_missing_extractor_no_agent_still_blocks(self):
|
|
"""Without agent context, missing extractor is still a hard failure."""
|
|
fm = {"attribution": {"sourcer": [{"handle": "someone"}]}}
|
|
issues = validate_attribution(fm, agent=None)
|
|
assert "missing_attribution_extractor" in issues
|
|
|
|
|
|
class TestBuildAttributionBlock:
|
|
def test_basic_build(self):
|
|
attr = build_attribution_block("rio", agent_id="760F7FE7")
|
|
assert attr["extractor"][0]["handle"] == "rio"
|
|
assert attr["extractor"][0]["agent_id"] == "760F7FE7"
|
|
|
|
def test_with_sourcer(self):
|
|
attr = build_attribution_block("rio", source_handle="@PineAnalytics", source_context="Q4 report")
|
|
assert attr["sourcer"][0]["handle"] == "pineanalytics"
|
|
assert attr["sourcer"][0]["context"] == "Q4 report"
|
|
|
|
def test_empty_roles(self):
|
|
attr = build_attribution_block("rio")
|
|
assert attr["challenger"] == []
|
|
assert attr["synthesizer"] == []
|
|
assert attr["reviewer"] == []
|
|
|
|
|
|
class TestRoleCounts:
|
|
def test_basic_counts(self):
|
|
attribution = {
|
|
"extractor": [{"handle": "rio"}],
|
|
"sourcer": [{"handle": "theia"}, {"handle": "pine"}],
|
|
"challenger": [],
|
|
"synthesizer": [],
|
|
"reviewer": [{"handle": "leo"}],
|
|
}
|
|
counts = role_counts_from_attribution(attribution)
|
|
assert counts["extractor"] == ["rio"]
|
|
assert counts["sourcer"] == ["theia", "pine"]
|
|
assert "challenger" not in counts
|
|
assert counts["reviewer"] == ["leo"]
|