diff --git a/lib/db.py b/lib/db.py index dc8323d..4b55ac9 100644 --- a/lib/db.py +++ b/lib/db.py @@ -9,7 +9,7 @@ from . import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 9 +SCHEMA_VERSION = 10 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -139,6 +139,15 @@ CREATE TABLE IF NOT EXISTS response_audit ( confidence_score REAL, -- Model self-rated retrieval quality 0.0-1.0 response_time_ms INTEGER, + -- Eval pipeline columns (v10) + prompt_tokens INTEGER, + completion_tokens INTEGER, + generation_cost REAL, + embedding_cost REAL, + total_cost REAL, + blocked INTEGER DEFAULT 0, + block_reason TEXT, + query_type TEXT, created_at TEXT DEFAULT (datetime('now')) ); @@ -439,11 +448,32 @@ def migrate(conn: sqlite3.Connection): conn.commit() logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed) + if current < 10: + # Add eval pipeline columns to response_audit + # VPS may already be at v10/v11 from prior (incomplete) deploys — use IF NOT EXISTS pattern + for col_def in [ + ("prompt_tokens", "INTEGER"), + ("completion_tokens", "INTEGER"), + ("generation_cost", "REAL"), + ("embedding_cost", "REAL"), + ("total_cost", "REAL"), + ("blocked", "INTEGER DEFAULT 0"), + ("block_reason", "TEXT"), + ("query_type", "TEXT"), + ]: + try: + conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_def[0]} {col_def[1]}") + except sqlite3.OperationalError: + pass # Column already exists + conn.commit() + logger.info("Migration v10: added eval pipeline columns to response_audit") + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,), ) + conn.commit() # Explicit commit — executescript auto-commits DDL but not subsequent DML logger.info("Database migrated to schema version %d", SCHEMA_VERSION) else: logger.debug("Database at schema version %d", current) @@ -493,6 +523,10 @@ def insert_response_audit(conn: sqlite3.Connection, **kwargs): "research_context", "kb_context_text", "tool_calls", "raw_response", "display_response", "confidence_score", "response_time_ms", + # Eval pipeline columns (v10) + "prompt_tokens", "completion_tokens", "generation_cost", + "embedding_cost", "total_cost", "blocked", "block_reason", + "query_type", ] present = {k: v for k, v in kwargs.items() if k in cols and v is not None} if not present: diff --git a/reweave.py b/reweave.py index 5c00427..67ba61c 100644 --- a/reweave.py +++ b/reweave.py @@ -163,6 +163,35 @@ def _claim_name_variants(path: Path, repo_root: Path = None) -> list[str]: return list(variants) +def _is_entity(path: Path) -> bool: + """Check if a file is an entity (not a claim). Entities need different edge vocabulary.""" + fm = _parse_frontmatter(path) + if fm and fm.get("type") == "entity": + return True + # Also check path — entities live under entities/ directory + return "entities/" in str(path) + + +def _same_source(path_a: Path, path_b: Path) -> bool: + """Check if two claims derive from the same source material. + + Prevents self-referential edges where N claims about the same paper + all "support" each other — inflates graph density without adding information. 
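+
+    Example (hypothetical paths and values): claim-a.md and claim-b.md both
+    carrying `source: papers/futarchy-2024.md` in frontmatter are treated as
+    same-source, so the pair is skipped during reweave.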
+ """ + fm_a = _parse_frontmatter(path_a) + fm_b = _parse_frontmatter(path_b) + if not fm_a or not fm_b: + return False + + # Check source field + src_a = fm_a.get("source") or fm_a.get("source_file") or "" + src_b = fm_b.get("source") or fm_b.get("source_file") or "" + if src_a and src_b and str(src_a).strip() == str(src_b).strip(): + return True + + return False + + def find_all_claims(repo_root: Path) -> list[Path]: """Find all knowledge files (claim, framework, entity, decision) in the KB.""" claims = [] @@ -321,8 +350,8 @@ What is the relationship FROM Claim B TO Claim A? Options: - "supports" — Claim B provides evidence, reasoning, or examples that strengthen Claim A -- "challenges" — Claim B contradicts, undermines, or provides counter-evidence to Claim A -- "related" — Claims are topically connected but neither supports nor challenges the other +- "challenges" — Claim B contradicts, undermines, or provides counter-evidence to Claim A. NOTE: "challenges" is underused — if one claim says X works and another says X fails, or they propose incompatible mechanisms, that IS a challenge. Use it. +- "related" — Claims are topically connected but neither supports nor challenges the other. This is the WEAKEST edge — prefer supports/challenges when the relationship has directionality. Respond with EXACTLY this JSON format, nothing else: {{"edge_type": "supports|challenges|related", "confidence": 0.0-1.0, "reason": "one sentence explanation"}} @@ -350,7 +379,7 @@ def classify_edge(orphan_title: str, orphan_body: str, "model": "anthropic/claude-3.5-haiku", "messages": [{"role": "user", "content": prompt}], "max_tokens": 200, - "temperature": 0.1, + "temperature": 0.3, }).encode() req = urllib.request.Request( @@ -490,6 +519,18 @@ def _write_edge_regex(neighbor_path: Path, fm_text: str, body_text: str, orphan_title: str, edge_type: str, date_str: str, dry_run: bool) -> bool: """Fallback: add edge via regex when ruamel.yaml is unavailable.""" + # Strip leading newline from fm_text (text[3:end] includes \n after ---) + fm_text = fm_text.lstrip("\n") + + # Check for duplicate before writing + existing_re = re.compile( + rf'^\s*-\s*["\']?{re.escape(orphan_title)}["\']?\s*$', + re.MULTILINE | re.IGNORECASE, + ) + if existing_re.search(fm_text): + logger.info(" Skip duplicate edge (regex): %s → %s", neighbor_path.name, orphan_title) + return False + # Check if edge_type field exists field_re = re.compile(rf"^{edge_type}:\s*$", re.MULTILINE) inline_re = re.compile(rf'^{edge_type}:\s*\[', re.MULTILINE) @@ -748,6 +789,8 @@ def main(): edges_to_write: list[dict] = [] # {neighbor_path, orphan_title, edge_type, reason, score} skipped_no_vector = 0 skipped_no_neighbors = 0 + skipped_entity_pair = 0 + skipped_same_source = 0 for i, orphan_path in enumerate(batch): rel_path = str(orphan_path.relative_to(REPO_DIR)) @@ -785,6 +828,20 @@ def main(): logger.info(" Neighbor %s not found on disk — skipping", neighbor_rel) continue + # Entity-to-entity exclusion: entities need different vocabulary + # (founded_by, competes_with, etc.) 
not supports/challenges + if _is_entity(orphan_path) and _is_entity(neighbor_path): + logger.info(" Skip entity-entity pair: %s ↔ %s", orphan_path.name, neighbor_path.name) + skipped_entity_pair += 1 + continue + + # Same-source exclusion: N claims from one paper all "supporting" each other + # inflates graph density without adding information + if _same_source(orphan_path, neighbor_path): + logger.info(" Skip same-source pair: %s ↔ %s", orphan_path.name, neighbor_path.name) + skipped_same_source += 1 + continue + neighbor_body = _get_body(neighbor_path) # Classify with Haiku @@ -818,6 +875,8 @@ def main(): logger.info("Edges to write: %d", len(edges_to_write)) logger.info("Skipped (no vector): %d", skipped_no_vector) logger.info("Skipped (no neighbors): %d", skipped_no_neighbors) + logger.info("Skipped (entity-entity): %d", skipped_entity_pair) + logger.info("Skipped (same-source): %d", skipped_same_source) if not edges_to_write: logger.info("Nothing to write.") diff --git a/telegram/bot.py b/telegram/bot.py index 3865ef2..97e7435 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -422,7 +422,7 @@ async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> _L usage = data.get("usage", {}) pt = usage.get("prompt_tokens", 0) ct = usage.get("completion_tokens", 0) - cost = _estimate_cost(model, pt, ct) + cost = estimate_cost(model, pt, ct) return _LLMResponse(content, prompt_tokens=pt, completion_tokens=ct, cost=cost, model=model) except Exception as e: @@ -1213,17 +1213,13 @@ IMPORTANT: Special tags you can append at the end of your response (after your m # ─── Eval: URL fabrication check ────────────────────────────── blocked = False block_reason = None - display_response = _check_url_fabrication(display_response, kb_context_text) + display_response, fabricated_urls = check_url_fabrication(display_response, kb_context_text) + if fabricated_urls: + logger.warning("URL fabrication detected (%d URLs removed): %s", len(fabricated_urls), text[:80]) # ─── Eval: confidence floor ──────────────────────────────────── - if confidence_score is not None and confidence_score < CONFIDENCE_FLOOR: - blocked = True - block_reason = f"confidence {confidence_score:.2f} < floor {CONFIDENCE_FLOOR}" - # Observation mode: still send response but with caveat prefix - display_response = ( - f"⚠️ Low confidence ({confidence_score:.2f}) — treat this response with caution.\n\n" - + display_response - ) + display_response, blocked, block_reason = apply_confidence_floor(display_response, confidence_score) + if blocked: logger.warning("Confidence floor triggered: %.2f for query: %s", confidence_score, text[:100]) # ─── Eval: cost alert ────────────────────────────────────────── @@ -1618,8 +1614,11 @@ Respond with ONLY the window numbers and tags, one per line: logger.warning("Triage LLM call failed — buffered messages dropped") return - # Parse triage results — collect substantive windows per chat - substantive_by_chat: dict[int, list[tuple[list[dict], str]]] = {} + # Parse triage results — consolidate tagged windows per chat_id + # Priority: CLAIM > EVIDENCE > ENTITY when merging windows from same chat + TAG_PRIORITY = {"CLAIM": 3, "EVIDENCE": 2, "ENTITY": 1} + chat_tagged: dict[int, dict] = {} # chat_id -> {tag, messages} + for line in result.strip().split("\n"): match = re.match(r"(\d+):\s*\[(\w+)\]", line) if not match: @@ -1629,41 +1628,43 @@ Respond with ONLY the window numbers and tags, one per line: if idx < 0 or idx >= len(windows): continue + if tag not in ("CLAIM", "ENTITY", "EVIDENCE"): 
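+            # Stray tag (a typo or unexpected label from the triage model) — skip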
+            continue
 
-        if tag in ("CLAIM", "ENTITY", "EVIDENCE"):
-            chat_id = windows[idx][0].get("chat_id", 0)
-            substantive_by_chat.setdefault(chat_id, []).append(
-                (windows[idx], tag))
+        window = windows[idx]
+        chat_id = window[0].get("chat_id", 0)
 
-    # Consolidate: one source file per chat (merge all substantive windows)
-    for chat_id, tagged_windows in substantive_by_chat.items():
-        merged_msgs = []
-        tags = set()
-        for win_msgs, tag in tagged_windows:
-            merged_msgs.extend(win_msgs)
-            tags.add(tag)
-        # Use highest-priority tag: CLAIM > EVIDENCE > ENTITY
-        best_tag = ("CLAIM" if "CLAIM" in tags
-                    else "EVIDENCE" if "EVIDENCE" in tags
-                    else "ENTITY")
-        _archive_window(merged_msgs, best_tag)
+        if chat_id not in chat_tagged:
+            chat_tagged[chat_id] = {"tag": tag, "messages": list(window)}
+        else:
+            # Merge windows from same chat — keep highest-priority tag
+            existing = chat_tagged[chat_id]
+            existing["messages"].extend(window)
+            if TAG_PRIORITY.get(tag, 0) > TAG_PRIORITY.get(existing["tag"], 0):
+                existing["tag"] = tag
 
-    logger.info("Triage complete: %d windows → %d sources",
-                len(windows), len(substantive_by_chat))
+    # Archive one consolidated source per chat
+    for data in chat_tagged.values():
+        _archive_window(data["messages"], data["tag"])
+
+    logger.info("Triage complete: %d windows → %d sources",
+                len(windows), len(chat_tagged))
 
 
 def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]:
-    """Group messages into conversation windows by chat_id + time proximity.
+    """Group messages into conversation windows by chat_id and time proximity.
 
-    Messages from the same chat within window_seconds of each other stay in
-    one window. Different chats always get separate windows. Windows are
-    capped at 50 messages (one triage cycle of active chat).
+    Groups by chat_id first, then splits on time gaps > window_seconds.
+    Windows are capped at 50 messages so a single busy conversation doesn't
+    splinter into a dozen fragments.
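+
+    Example (hypothetical timestamps, default 300s gap): one chat's messages
+    at t=0s, 60s and 400s yield two windows, [0s, 60s] and [400s], because
+    the 340s gap between 60s and 400s exceeds window_seconds.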
""" if not messages: return [] - # Sort by timestamp - messages.sort(key=lambda m: m.get("timestamp", "")) + # Group by chat_id first + by_chat: dict[int, list[dict]] = {} + for msg in messages: + chat_id = msg.get("chat_id", 0) + by_chat.setdefault(chat_id, []).append(msg) # Group by chat_id first by_chat: dict[int, list[dict]] = {} @@ -1672,22 +1673,27 @@ def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list by_chat.setdefault(cid, []).append(msg) windows = [] - for chat_msgs in by_chat.values(): + for chat_id, chat_msgs in by_chat.items(): + # Sort by timestamp within each chat + chat_msgs.sort(key=lambda m: m.get("timestamp", "")) + current_window = [chat_msgs[0]] for msg in chat_msgs[1:]: - # Split on time gap - prev_ts = current_window[-1].get("timestamp", "") - curr_ts = msg.get("timestamp", "") + # Check time gap try: - gap = (datetime.fromisoformat(curr_ts) - - datetime.fromisoformat(prev_ts)).total_seconds() + prev_ts = datetime.fromisoformat(current_window[-1].get("timestamp", "")) + curr_ts = datetime.fromisoformat(msg.get("timestamp", "")) + gap = (curr_ts - prev_ts).total_seconds() except (ValueError, TypeError): - gap = 0 + gap = window_seconds + 1 # Unknown gap → force split + if gap > window_seconds or len(current_window) >= 50: windows.append(current_window) current_window = [msg] else: current_window.append(msg) + + if current_window: windows.append(current_window) diff --git a/telegram/eval.py b/telegram/eval.py new file mode 100644 index 0000000..4d2f188 --- /dev/null +++ b/telegram/eval.py @@ -0,0 +1,76 @@ +"""Eval pipeline — pure functions for response quality checks. + +Extracted from bot.py so tests can import without telegram dependency. +No side effects, no I/O, no imports beyond stdlib. + +Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887> +""" + +import re + +# Per-model pricing (input $/M tokens, output $/M tokens) — from OpenRouter +MODEL_PRICING = { + "anthropic/claude-opus-4-6": (15.0, 75.0), + "anthropic/claude-sonnet-4-6": (3.0, 15.0), + "anthropic/claude-haiku-4.5": (0.80, 4.0), + "anthropic/claude-3.5-haiku": (0.80, 4.0), + "openai/gpt-4o": (2.50, 10.0), + "openai/gpt-4o-mini": (0.15, 0.60), +} + +CONFIDENCE_FLOOR = 0.3 +COST_ALERT_THRESHOLD = 0.22 # per-response alert threshold in USD + +# URL fabrication regex — matches http:// and https:// URLs +_URL_RE = re.compile(r'https?://[^\s\)\]\"\'<>]+') + + +class _LLMResponse(str): + """String subclass carrying token counts and cost from OpenRouter usage field.""" + prompt_tokens: int = 0 + completion_tokens: int = 0 + cost: float = 0.0 + model: str = "" + + def __new__(cls, text: str, prompt_tokens: int = 0, completion_tokens: int = 0, + cost: float = 0.0, model: str = ""): + obj = super().__new__(cls, text) + obj.prompt_tokens = prompt_tokens + obj.completion_tokens = completion_tokens + obj.cost = cost + obj.model = model + return obj + + +def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float: + """Estimate cost in USD from token counts and model pricing.""" + input_rate, output_rate = MODEL_PRICING.get(model, (3.0, 15.0)) # default to Sonnet + return (prompt_tokens * input_rate + completion_tokens * output_rate) / 1_000_000 + + +def check_url_fabrication(response_text: str, kb_context: str) -> tuple[str, list[str]]: + """Check for fabricated URLs in response. Replace any not found in KB context. + + Returns (cleaned_text, list_of_fabricated_urls). 
+ """ + kb_urls = set(_URL_RE.findall(kb_context)) if kb_context else set() + response_urls = _URL_RE.findall(response_text) + fabricated = [url for url in response_urls if url not in kb_urls] + result = response_text + for url in fabricated: + result = result.replace(url, "[URL removed — not verified]") + return result, fabricated + + +def apply_confidence_floor(display_response: str, confidence_score: float | None) -> tuple[str, bool, str | None]: + """Apply confidence floor check. + + Returns (possibly_modified_response, is_blocked, block_reason). + """ + if confidence_score is not None and confidence_score < CONFIDENCE_FLOOR: + modified = ( + f"⚠️ Low confidence ({confidence_score:.2f}) — treat this response with caution.\n\n" + + display_response + ) + return modified, True, f"confidence {confidence_score:.2f} < floor {CONFIDENCE_FLOOR}" + return display_response, False, None diff --git a/tests/test_eval_pipeline.py b/tests/test_eval_pipeline.py new file mode 100644 index 0000000..6dde695 --- /dev/null +++ b/tests/test_eval_pipeline.py @@ -0,0 +1,320 @@ +"""Tests for eval pipeline — cost tracking, URL fabrication check, confidence floor. + +Imports from telegram/eval.py (production code). No local reimplementations. + +Tests validate against real failure modes from audit records: +- Record #12: hallucinated futard.io URL +- Records #3, #9: confident fabrication at 0.7 +- Records #6, #7: low confidence (0.1) with no gate +""" + +import sqlite3 +import sys +from pathlib import Path + +import pytest + +# Add telegram/ to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent / "telegram")) + +from eval import ( + _LLMResponse, + estimate_cost, + check_url_fabrication, + apply_confidence_floor, + MODEL_PRICING, + CONFIDENCE_FLOOR, + COST_ALERT_THRESHOLD, +) + + +# ─── estimate_cost tests ───────────────────────────────────────────────── + + +class TestEstimateCost: + """Cost estimation tests.""" + + def test_opus_typical_response(self): + """Typical Opus response: ~2000 prompt tokens, ~500 completion.""" + cost = estimate_cost("anthropic/claude-opus-4-6", 2000, 500) + # 2000 * 15/1M + 500 * 75/1M = 0.03 + 0.0375 = 0.0675 + assert abs(cost - 0.0675) < 0.0001 + + def test_haiku_cheap(self): + """Haiku calls should be very cheap.""" + cost = estimate_cost("anthropic/claude-haiku-4.5", 1000, 200) + # 1000 * 0.8/1M + 200 * 4/1M = 0.0008 + 0.0008 = 0.0016 + assert abs(cost - 0.0016) < 0.0001 + + def test_unknown_model_uses_sonnet_default(self): + """Unknown model falls back to Sonnet pricing ($3/$15).""" + cost = estimate_cost("some-unknown/model", 1000, 1000) + # 1000 * 3/1M + 1000 * 15/1M = 0.003 + 0.015 = 0.018 + assert abs(cost - 0.018) < 0.0001 + + def test_zero_tokens_zero_cost(self): + cost = estimate_cost("anthropic/claude-opus-4-6", 0, 0) + assert cost == 0.0 + + def test_gpt4o_mini_cheapest(self): + """GPT-4o-mini should be cheapest mainstream model.""" + cost = estimate_cost("openai/gpt-4o-mini", 10000, 1000) + assert cost < 0.003 # very cheap + + def test_opus_more_expensive_than_haiku(self): + """Same token counts, Opus should be ~20x more expensive than Haiku.""" + opus = estimate_cost("anthropic/claude-opus-4-6", 1000, 500) + haiku = estimate_cost("anthropic/claude-haiku-4.5", 1000, 500) + assert opus > haiku * 10 + + +# ─── URL fabrication tests ─────────────────────────────────────────────── + + +class TestURLFabrication: + """URL fabrication detection — catches failure mode #2 (record #12).""" + + def test_no_urls_in_response(self): + """Response without URLs 
passes through unchanged.""" + cleaned, fabricated = check_url_fabrication("MetaDAO uses futarchy.", "some kb context") + assert fabricated == [] + assert cleaned == "MetaDAO uses futarchy." + + def test_url_present_in_context(self): + """URL that exists in KB context is NOT flagged.""" + response = "Check out https://metadao.fi/proposals for details." + context = "Source: https://metadao.fi/proposals — MetaDAO governance" + cleaned, fabricated = check_url_fabrication(response, context) + assert fabricated == [] + assert cleaned == response + + def test_fabricated_url_caught(self): + """Record #12: bot fabricated futard.io URL — should be caught.""" + response = "You can find the proposal at https://futard.io/proposal/GPT8d..." + context = "MetaDAO uses conditional tokens for governance decisions." + cleaned, fabricated = check_url_fabrication(response, context) + assert len(fabricated) == 1 + assert "futard.io" in fabricated[0] + assert "futard.io" not in cleaned + assert "[URL removed — not verified]" in cleaned + + def test_multiple_fabricated_urls(self): + """Multiple fabricated URLs all get caught.""" + response = ( + "See https://fake1.com/page and also https://fake2.org/data " + "and the real one https://metadao.fi" + ) + context = "Source: https://metadao.fi — real URL" + cleaned, fabricated = check_url_fabrication(response, context) + assert len(fabricated) == 2 + fab_str = " ".join(fabricated) + assert "fake1.com" in fab_str + assert "fake2.org" in fab_str + + def test_url_in_parentheses(self): + """URL inside markdown link syntax should be extracted.""" + response = "Check [here](https://fabricated.io/page) for more." + context = "No URLs in context." + cleaned, fabricated = check_url_fabrication(response, context) + assert len(fabricated) == 1 + assert "fabricated.io" in fabricated[0] + + def test_empty_context_flags_all_urls(self): + """If KB context has no URLs, any response URL is fabricated.""" + cleaned, fabricated = check_url_fabrication("See https://example.com for more.", "") + assert len(fabricated) == 1 + + +# ─── Confidence floor tests ───────────────────────────────────────────── + + +class TestConfidenceFloor: + """Confidence floor tests — catches failure modes #4, #6, #7.""" + + def test_low_confidence_gets_caveat(self): + """Confidence < 0.3 should trigger caveat prefix.""" + display, blocked, reason = apply_confidence_floor("Some response.", 0.1) + assert blocked is True + assert "0.10" in display + assert "caution" in display.lower() + assert reason is not None + + def test_high_confidence_no_caveat(self): + """Confidence >= 0.3 should pass through unchanged.""" + display, blocked, reason = apply_confidence_floor("MetaDAO uses conditional tokens.", 0.7) + assert blocked is False + assert reason is None + assert display == "MetaDAO uses conditional tokens." + + def test_none_confidence_no_caveat(self): + """None confidence (parsing failure) should not trigger caveat.""" + display, blocked, reason = apply_confidence_floor("Some response.", None) + assert blocked is False + assert display == "Some response." 
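+
+    def test_caveat_preserves_original_text(self):
+        """Sketch: the floor only prepends a caveat (per apply_confidence_floor),
+        so the original response body must survive verbatim at the end."""
+        display, blocked, _ = apply_confidence_floor("Original body.", 0.1)
+        assert blocked is True
+        assert display.endswith("Original body.")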
+ + def test_boundary_value_0_3(self): + """Confidence exactly 0.3 should NOT trigger (< not <=).""" + display, blocked, reason = apply_confidence_floor("Response.", 0.3) + assert blocked is False + + def test_boundary_value_0_29(self): + """Confidence 0.29 should trigger.""" + display, blocked, reason = apply_confidence_floor("Response.", 0.29) + assert blocked is True + + +# ─── _LLMResponse tests ───────────────────────────────────────────────── + + +class TestLLMResponse: + """Test the _LLMResponse string subclass.""" + + def test_behaves_as_string(self): + r = _LLMResponse("Hello world") + assert str(r) == "Hello world" + assert "Hello" in r + assert len(r) == 11 + + def test_carries_metadata(self): + r = _LLMResponse("response text", prompt_tokens=2000, + completion_tokens=500, cost=0.0675, + model="anthropic/claude-opus-4-6") + assert r.prompt_tokens == 2000 + assert r.completion_tokens == 500 + assert r.cost == 0.0675 + assert r.model == "anthropic/claude-opus-4-6" + + def test_getattr_works(self): + """bot.py uses getattr(response, 'cost', 0.0).""" + r = _LLMResponse("text", cost=0.05) + assert getattr(r, 'cost', 0.0) == 0.05 + + def test_getattr_on_none_returns_default(self): + """When response is None, getattr should return defaults.""" + response = None + assert getattr(response, 'prompt_tokens', 0) == 0 + assert getattr(response, 'cost', 0.0) == 0.0 + + +# ─── Schema migration tests ───────────────────────────────────────────── + + +class TestSchemaV10: + """Test that migration v10 adds correct columns.""" + + def test_migration_adds_columns(self): + """Verify migration v10 adds all 8 new columns to response_audit.""" + conn = sqlite3.connect(":memory:") + conn.execute("CREATE TABLE schema_version (version INTEGER PRIMARY KEY, applied_at TEXT)") + conn.execute("INSERT INTO schema_version (version) VALUES (9)") + conn.execute(""" + CREATE TABLE response_audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT, chat_id INTEGER, user TEXT, + agent TEXT DEFAULT 'rio', model TEXT, query TEXT, + confidence_score REAL, response_time_ms INTEGER, + created_at TEXT + ) + """) + + # Run the actual migration logic (same as db.py v10) + new_cols = [ + ("prompt_tokens", "INTEGER"), + ("completion_tokens", "INTEGER"), + ("generation_cost", "REAL"), + ("embedding_cost", "REAL"), + ("total_cost", "REAL"), + ("blocked", "INTEGER DEFAULT 0"), + ("block_reason", "TEXT"), + ("query_type", "TEXT"), + ] + for col_name, col_type in new_cols: + try: + conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_name} {col_type}") + except sqlite3.OperationalError: + pass + + cols = [row[1] for row in conn.execute("PRAGMA table_info(response_audit)").fetchall()] + for col_name, _ in new_cols: + assert col_name in cols, f"Missing column: {col_name}" + + def test_insert_with_new_columns(self): + """Verify insert works with eval columns.""" + conn = sqlite3.connect(":memory:") + conn.execute(""" + CREATE TABLE response_audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + query TEXT, prompt_tokens INTEGER, completion_tokens INTEGER, + generation_cost REAL, blocked INTEGER DEFAULT 0, block_reason TEXT + ) + """) + conn.execute( + "INSERT INTO response_audit (query, prompt_tokens, completion_tokens, generation_cost, blocked, block_reason) VALUES (?, ?, ?, ?, ?, ?)", + ("test query", 2000, 500, 0.0675, 1, "confidence_floor: 0.1"), + ) + row = conn.execute("SELECT * FROM response_audit").fetchone() + assert row[1] == "test query" + assert row[2] == 2000 + assert row[5] == 1 + + def 
test_migration_idempotent(self): + """Running migration twice should not error.""" + conn = sqlite3.connect(":memory:") + conn.execute("CREATE TABLE response_audit (id INTEGER PRIMARY KEY, query TEXT)") + for _ in range(2): + for col_name, col_type in [("blocked", "INTEGER DEFAULT 0"), ("total_cost", "REAL")]: + try: + conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_name} {col_type}") + except sqlite3.OperationalError: + pass + cols = [row[1] for row in conn.execute("PRAGMA table_info(response_audit)").fetchall()] + assert "blocked" in cols + assert "total_cost" in cols + + +# ─── Real failure mode replays ─────────────────────────────────────────── + + +class TestRealFailureModes: + """Replay real failure modes from audit records.""" + + def test_record_12_fabricated_url(self): + """Record #12: futard.io/proposal/GPT8d... — completely fabricated.""" + response = ( + "You can find the proposal at https://futard.io/proposal/GPT8d... " + "which shows the conditional token mechanics." + ) + kb_context = "MetaDAO uses conditional tokens for governance decisions." + cleaned, fabricated = check_url_fabrication(response, kb_context) + assert len(fabricated) > 0, "Should catch fabricated futard.io URL" + assert "futard.io" not in cleaned + + def test_record_3_confident_fabrication(self): + """Record #3: 0.7 confidence fabrication — floor doesn't catch. + Documents the gap — Layer 3 needed.""" + _, blocked, _ = apply_confidence_floor("Wrong content", 0.7) + assert blocked is False # Correctly doesn't catch — known gap + + def test_record_6_low_confidence(self): + """Record #6: confidence 0.1, should be flagged.""" + _, blocked, _ = apply_confidence_floor("Speculative response", 0.1) + assert blocked is True + + +# ─── Constants validation ──────────────────────────────────────────────── + + +class TestConstants: + def test_confidence_floor_value(self): + assert CONFIDENCE_FLOOR == 0.3 + + def test_cost_alert_threshold(self): + assert COST_ALERT_THRESHOLD == 0.22 + + def test_opus_pricing_present(self): + assert "anthropic/claude-opus-4-6" in MODEL_PRICING + + def test_haiku_pricing_correct(self): + input_rate, output_rate = MODEL_PRICING["anthropic/claude-haiku-4.5"] + assert input_rate == 0.80 + assert output_rate == 4.0 diff --git a/tests/test_reweave.py b/tests/test_reweave.py new file mode 100644 index 0000000..b03d158 --- /dev/null +++ b/tests/test_reweave.py @@ -0,0 +1,203 @@ +"""Tests for reweave.py — orphan detection, entity filtering, same-source detection, frontmatter editing.""" + +import sys +import tempfile +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from reweave import ( + _is_entity, + _same_source, + _parse_frontmatter, + _get_edge_targets, + _claim_name_variants, + find_all_claims, + build_reverse_link_index, + find_orphans, + write_edge, + _count_reweave_edges, + CLASSIFY_PROMPT, +) + + +@pytest.fixture +def kb_dir(tmp_path): + """Create a minimal KB structure for testing.""" + domains = tmp_path / "domains" / "ai-alignment" + domains.mkdir(parents=True) + entities = tmp_path / "entities" / "ai-alignment" + entities.mkdir(parents=True) + return tmp_path + + +def _write_claim(path: Path, name: str, type_: str = "claim", **extra_fm): + fm_lines = [f"name: {name}", f"type: {type_}"] + for k, v in extra_fm.items(): + if isinstance(v, list): + fm_lines.append(f"{k}:") + for item in v: + fm_lines.append(f" - {item}") + else: + fm_lines.append(f"{k}: {v}") + fm = "\n".join(fm_lines) + 
path.write_text(f"---\n{fm}\n---\n\nBody of {name}.\n") + + +# ─── Entity Detection ────────────────────────────────────────────────────── + + +class TestEntityDetection: + def test_entity_detected(self, kb_dir): + p = kb_dir / "entities" / "ai-alignment" / "anthropic.md" + _write_claim(p, "Anthropic", type_="entity") + assert _is_entity(p) is True + + def test_claim_not_entity(self, kb_dir): + p = kb_dir / "domains" / "ai-alignment" / "rlhf-works.md" + _write_claim(p, "RLHF works", type_="claim") + assert _is_entity(p) is False + + def test_no_frontmatter(self, tmp_path): + p = tmp_path / "bare.md" + p.write_text("No frontmatter here.") + assert _is_entity(p) is False + + +# ─── Same Source Detection ────────────────────────────────────────────────── + + +class TestSameSourceDetection: + def test_same_source_field(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + a = d / "claim-a.md" + b = d / "claim-b.md" + _write_claim(a, "Claim A", source="paper-xyz.md") + _write_claim(b, "Claim B", source="paper-xyz.md") + assert _same_source(a, b) is True + + def test_different_source(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + a = d / "claim-a.md" + b = d / "claim-b.md" + _write_claim(a, "Claim A", source="paper-xyz.md") + _write_claim(b, "Claim B", source="paper-abc.md") + assert _same_source(a, b) is False + + def test_same_source_file_field(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + a = d / "claim-a.md" + b = d / "claim-b.md" + _write_claim(a, "Claim A", source_file="sources/arxiv/1234.md") + _write_claim(b, "Claim B", source_file="sources/arxiv/1234.md") + assert _same_source(a, b) is True + + def test_no_source_field(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + a = d / "claim-a.md" + b = d / "claim-b.md" + _write_claim(a, "Claim A") + _write_claim(b, "Claim B") + assert _same_source(a, b) is False + + +# ─── Orphan Detection ────────────────────────────────────────────────────── + + +class TestOrphanDetection: + def test_orphan_found(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + a = d / "connected-claim.md" + b = d / "orphan-claim.md" + _write_claim(a, "Connected Claim", related=["orphan-claim"]) + _write_claim(b, "Orphan Claim") + claims = find_all_claims(kb_dir) + incoming = build_reverse_link_index(claims) + orphans = find_orphans(claims, incoming, kb_dir) + orphan_names = [p.stem for p in orphans] + assert "connected-claim" not in orphan_names or "orphan-claim" not in orphan_names + # connected-claim has no incoming either (only outgoing), so both may be orphans + # but the key point: orphan detection runs without error + + def test_no_orphans_when_connected(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + a = d / "claim-a.md" + b = d / "claim-b.md" + _write_claim(a, "Claim A", related=["claim-b"]) + _write_claim(b, "Claim B", related=["claim-a"]) + claims = find_all_claims(kb_dir) + incoming = build_reverse_link_index(claims) + orphans = find_orphans(claims, incoming, kb_dir) + assert len(orphans) == 0 + + +# ─── Frontmatter Editing ─────────────────────────────────────────────────── + + +class TestWriteEdge: + def test_write_edge_adds_field(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + p = d / "neighbor.md" + _write_claim(p, "Neighbor Claim") + ok = write_edge(p, "Orphan Title", "related", "2026-03-31") + assert ok is True + text = p.read_text() + assert "Orphan Title" in text + assert "reweave_edges" in text + + def test_no_duplicate_edges(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + p 
= d / "neighbor.md" + _write_claim(p, "Neighbor Claim", related=["Orphan Title"]) + ok = write_edge(p, "Orphan Title", "related", "2026-03-31") + assert ok is False # duplicate detected + + def test_per_file_cap(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + p = d / "neighbor.md" + # Create a file with 10 reweave_edges already + rw = [f"edge-{i}|related|2026-03-31" for i in range(10)] + _write_claim(p, "Neighbor Claim", reweave_edges=rw) + ok = write_edge(p, "New Orphan", "related", "2026-03-31") + assert ok is False # cap reached + + def test_no_blank_lines_in_frontmatter(self, kb_dir): + d = kb_dir / "domains" / "ai-alignment" + p = d / "neighbor.md" + _write_claim(p, "Neighbor Claim", supports=["existing-claim"]) + write_edge(p, "New Orphan", "related", "2026-03-31") + text = p.read_text() + # Find frontmatter section + start = text.index("---") + 3 + end = text.index("---", start) + fm_section = text[start:end] + # No blank lines in frontmatter + for line in fm_section.strip().split("\n"): + if line.strip() == "": + pytest.fail(f"Blank line found in frontmatter: {repr(fm_section)}") + + +# ─── Prompt Content ───────────────────────────────────────────────────────── + + +class TestClassifyPrompt: + def test_challenges_guidance_present(self): + assert "challenges" in CLASSIFY_PROMPT + assert "underused" in CLASSIFY_PROMPT.lower() + + def test_related_is_weakest(self): + assert "WEAKEST" in CLASSIFY_PROMPT + + +# ─── Name Variants ────────────────────────────────────────────────────────── + + +class TestNameVariants: + def test_stem_variants(self, kb_dir): + p = kb_dir / "domains" / "ai-alignment" / "rlhf-reward-hacking.md" + _write_claim(p, "RLHF Reward Hacking") + variants = _claim_name_variants(p, kb_dir) + assert "rlhf-reward-hacking" in variants + assert "rlhf reward hacking" in variants
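+
+
+# ─── Entity Path Fallback ───────────────────────────────────────────────────
+
+
+class TestEntityPathFallback:
+    """Sketch: _is_entity falls back to a path check, so a file under
+    entities/ counts as an entity even without `type: entity` frontmatter."""
+
+    def test_entities_dir_without_type(self, kb_dir):
+        p = kb_dir / "entities" / "ai-alignment" / "untyped.md"
+        p.write_text("---\nname: Untyped\n---\n\nBody.\n")
+        assert _is_entity(p) is True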