teleo-codex/ops/pipeline-v2/lib/substantive_fixer.py
m3taversal a68f38609d fix: add date_errors to substantive fixer tag routing
date_errors was evaluated but never routed to any fixer, leaving PRs
stuck permanently. Now classified as FIXABLE with targeted prompt guidance.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 14:56:02 +00:00

603 lines
25 KiB
Python

"""Substantive fixer — acts on reviewer feedback for non-mechanical issues.
When Leo or a domain agent requests changes with substantive issues
(confidence_miscalibration, title_overclaims, scope_error, near_duplicate),
this module reads the claim + reviewer comment + original source material,
sends to an LLM, pushes the fix, and resets eval.
Issue routing:
FIXABLE (confidence, title, scope) → LLM edits the claim
CONVERTIBLE (near_duplicate) → flag for Leo to pick target, then convert
UNFIXABLE (factual_discrepancy) → close PR, re-extract with feedback
DROPPABLE (low-value, reviewer explicitly closed) → close PR
Design reviewed by Ganymede (architecture), Rhea (ops), Leo (quality).
Epimetheus owns this module. Leo reviews changes.
"""
import asyncio
import json
import logging
import os
import re
from pathlib import Path
from . import config, db
from .forgejo import api as forgejo_api, get_agent_token, get_pr_diff, repo_path
from .llm import openrouter_call
logger = logging.getLogger("pipeline.substantive_fixer")
# Issue type routing — _classify_substantive() intersects eval issue tags
# against these sets to decide the fix path.
# Tags the fix LLM can address with targeted in-place edits to the claim.
FIXABLE_TAGS = {"confidence_miscalibration", "title_overclaims", "scope_error", "frontmatter_schema", "date_errors"}
# Tags resolved by converting the PR into an enrichment of an existing claim.
CONVERTIBLE_TAGS = {"near_duplicate"}
# Tags that require closing the PR and re-extracting the source with feedback.
UNFIXABLE_TAGS = {"factual_discrepancy"}
# Max substantive fix attempts per PR (Rhea: prevent infinite loops)
MAX_SUBSTANTIVE_FIXES = 2
# Model for fixes — Gemini Flash: cheap ($0.001/fix), different family from Sonnet reviewer
FIX_MODEL = config.MODEL_GEMINI_FLASH
# ─── Fix prompt ────────────────────────────────────────────────────────────
def _build_fix_prompt(
claim_content: str,
review_comment: str,
issue_tags: list[str],
source_content: str | None,
domain_index: str | None = None,
) -> str:
"""Build the targeted fix prompt.
Includes claim + reviewer feedback + source material.
Does NOT re-extract — makes targeted edits based on specific feedback.
"""
source_section = ""
if source_content:
# Truncate source to keep prompt manageable
source_section = f"""
## Original Source Material
{source_content[:8000]}
"""
index_section = ""
if domain_index and "near_duplicate" in issue_tags:
index_section = f"""
## Existing Claims in Domain (for near-duplicate resolution)
{domain_index[:4000]}
"""
issue_descriptions = []
for tag in issue_tags:
if tag == "confidence_miscalibration":
issue_descriptions.append("CONFIDENCE: Reviewer says the confidence level doesn't match the evidence.")
elif tag == "title_overclaims":
issue_descriptions.append("TITLE: Reviewer says the title asserts more than the evidence supports.")
elif tag == "scope_error":
issue_descriptions.append("SCOPE: Reviewer says the claim needs explicit scope qualification.")
elif tag == "date_errors":
issue_descriptions.append("DATES: Reviewer flagged incorrect, missing, or inconsistent dates in the claim. Check created dates, event dates cited in the body, and any temporal claims against the source material.")
elif tag == "near_duplicate":
issue_descriptions.append("DUPLICATE: Reviewer says this substantially duplicates an existing claim.")
return f"""You are fixing a knowledge base claim based on reviewer feedback. Make targeted edits — do NOT rewrite from scratch.
## The Claim (current version)
{claim_content}
## Reviewer Feedback
{review_comment}
## Issues to Fix
{chr(10).join(issue_descriptions)}
{source_section}
{index_section}
## Rules
1. **Implement the reviewer's explicit instructions.** If the reviewer says "change confidence to experimental," do that. If the reviewer says "confidence seems high" without a specific target, set it to one level below current.
2. **For title_overclaims:** Scope the title down to match evidence. Add qualifiers. Keep the mechanism but bound the claim.
3. **For scope_error:** Add explicit scope (structural/functional/causal/correlational) to the title. Add scoping language to the body.
4. **For near_duplicate:** Do NOT fix. Instead, identify the top 3 most similar existing claims from the domain index and output them in your response. The reviewer will pick the target.
5. **Preserve the claim's core argument.** You're adjusting precision, not changing what the claim says.
6. **Keep all frontmatter fields.** Do not remove or rename fields. Only modify the values the reviewer flagged.
## Output
For FIXABLE issues (confidence, title, scope):
Return the complete fixed claim file content (full markdown with frontmatter).
For near_duplicate:
Return JSON:
```json
{{"action": "flag_duplicate", "candidates": ["existing-claim-1.md", "existing-claim-2.md", "existing-claim-3.md"], "reasoning": "Why each candidate matches"}}
```
"""
# ─── Git helpers ───────────────────────────────────────────────────────────
async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]:
proc = await asyncio.create_subprocess_exec(
"git", *args,
cwd=cwd or str(config.REPO_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return -1, f"git {args[0]} timed out"
output = (stdout or b"").decode().strip()
if stderr:
output += "\n" + stderr.decode().strip()
return proc.returncode, output
# ─── Source and review retrieval ───────────────────────────────────────────
def _read_source_content(source_path: str) -> str | None:
"""Read source archive from main worktree."""
if not source_path:
return None
full_path = config.MAIN_WORKTREE / source_path
try:
return full_path.read_text()
except (FileNotFoundError, PermissionError):
return None
async def _get_review_comments(pr_number: int) -> str:
    """Collect all verdict/rejection review comments on a PR into one string.

    Pages through the issue comments API (50 per page), drops tier0
    validation output and pipeline acknowledgements, and joins the rest
    with a horizontal-rule separator.
    """
    collected: list[str] = []
    page = 1
    while True:
        batch = await forgejo_api(
            "GET",
            repo_path(f"issues/{pr_number}/comments?limit=50&page={page}"),
        )
        if not batch:
            break
        for comment in batch:
            text = comment.get("body", "")
            # Skip tier0 validation comments and pipeline ack comments.
            if "TIER0-VALIDATION" in text or "queued for evaluation" in text:
                continue
            # Keep only actual reviewer verdicts / rejections.
            if "VERDICT:" in text or "REJECTION:" in text:
                collected.append(text)
        if len(batch) < 50:
            break
        page += 1
    return "\n\n---\n\n".join(collected)
async def _get_claim_files_from_pr(pr_number: int) -> dict[str, str]:
    """Extract claim file contents (path -> content) from the PR's diff."""
    diff_text = await get_pr_diff(pr_number)
    if not diff_text:
        # No diff available — nothing to fix.
        return {}
    from .validate import extract_claim_files_from_diff

    return extract_claim_files_from_diff(diff_text)
def _get_domain_index(domain: str) -> str | None:
"""Get domain-filtered KB index for near-duplicate resolution."""
index_file = f"/tmp/kb-indexes/{domain}.txt"
if os.path.exists(index_file):
return Path(index_file).read_text()
# Fallback: list domain claim files
domain_dir = config.MAIN_WORKTREE / "domains" / domain
if not domain_dir.is_dir():
return None
lines = []
for f in sorted(domain_dir.glob("*.md")):
if not f.name.startswith("_"):
lines.append(f"- {f.name}: {f.stem.replace('-', ' ')}")
return "\n".join(lines[:150]) if lines else None
# ─── Issue classification ──────────────────────────────────────────────────
def _classify_substantive(issues: list[str]) -> str:
    """Classify an issue-tag list as fixable/convertible/unfixable/droppable.

    Precedence: any unfixable tag dominates; near-duplicate conversion only
    applies when nothing fixable is present; otherwise fixable wins; an
    empty or unrecognized tag set is droppable.
    """
    tags = set(issues)
    if tags & UNFIXABLE_TAGS:
        return "unfixable"
    has_fixable = bool(tags & FIXABLE_TAGS)
    if tags & CONVERTIBLE_TAGS and not has_fixable:
        return "convertible"
    return "fixable" if has_fixable else "droppable"
# ─── Fix execution ────────────────────────────────────────────────────────
async def _fix_pr(conn, pr_number: int) -> dict:
    """Attempt a substantive fix on a single PR. Returns result dict.

    Flow: atomically claim the row (open -> fixing), classify the eval
    issue tags, then either close (unfixable/droppable), auto-convert or
    flag (near-duplicate), or send each claim file to the fix LLM and
    push the result to the PR branch, resetting eval state for re-review.
    Every early-exit path releases the claim by setting status back.
    """
    # Atomic claim: the status guard ensures only one worker transitions
    # this row from 'open' to 'fixing'.
    cursor = conn.execute(
        "UPDATE prs SET status = 'fixing', last_attempt = datetime('now') WHERE number = ? AND status = 'open'",
        (pr_number,),
    )
    if cursor.rowcount == 0:
        return {"pr": pr_number, "skipped": True, "reason": "not_open"}
    # Increment fix attempts
    conn.execute(
        "UPDATE prs SET fix_attempts = COALESCE(fix_attempts, 0) + 1 WHERE number = ?",
        (pr_number,),
    )
    row = conn.execute(
        "SELECT branch, source_path, domain, eval_issues, fix_attempts FROM prs WHERE number = ?",
        (pr_number,),
    ).fetchone()
    branch = row["branch"]
    source_path = row["source_path"]
    domain = row["domain"]
    fix_attempts = row["fix_attempts"] or 0
    # Parse issue tags (stored as a JSON list; tolerate NULL or garbage).
    try:
        issues = json.loads(row["eval_issues"] or "[]")
    except (json.JSONDecodeError, TypeError):
        issues = []
    # Check fix budget (counter was already incremented above, so this
    # compares the post-increment value against MAX_SUBSTANTIVE_FIXES).
    if fix_attempts > MAX_SUBSTANTIVE_FIXES:
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "fix_budget_exhausted"}
    # Classify
    classification = _classify_substantive(issues)
    if classification == "unfixable":
        # Close and re-extract
        logger.info("PR #%d: unfixable (%s) — closing, source re-queued", pr_number, issues)
        await _close_and_reextract(conn, pr_number, issues)
        return {"pr": pr_number, "action": "closed_reextract", "issues": issues}
    if classification == "droppable":
        logger.info("PR #%d: droppable (%s) — closing", pr_number, issues)
        conn.execute(
            "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?",
            (f"droppable: {issues}", pr_number),
        )
        return {"pr": pr_number, "action": "closed_droppable", "issues": issues}
    # Refresh main worktree for source read (Ganymede: ensure freshness)
    await _git("fetch", "origin", "main", cwd=str(config.MAIN_WORKTREE))
    await _git("reset", "--hard", "origin/main", cwd=str(config.MAIN_WORKTREE))
    # Gather context: reviewer feedback, PR claim files, original source,
    # and (only for near-duplicates) the domain KB index.
    review_text = await _get_review_comments(pr_number)
    claim_files = await _get_claim_files_from_pr(pr_number)
    source_content = _read_source_content(source_path)
    domain_index = _get_domain_index(domain) if "near_duplicate" in issues else None
    if not claim_files:
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "no_claim_files"}
    if not review_text:
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "no_review_comments"}
    if classification == "convertible":
        # Near-duplicate: auto-convert to enrichment if high-confidence match (>= 0.90).
        # Below threshold: flag for Leo. (Leo approved: "evidence loss > wrong target risk")
        result = await _auto_convert_near_duplicate(
            conn, pr_number, claim_files, domain,
        )
        if result.get("converted"):
            conn.execute(
                "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?",
                (f"auto-enriched: {result['target_claim']} (sim={result['similarity']:.2f})", pr_number),
            )
            await forgejo_api("PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"})
            await forgejo_api("POST", repo_path(f"issues/{pr_number}/comments"), {
                "body": (
                    f"**Auto-converted:** Evidence from this PR enriched "
                    f"`{result['target_claim']}` (similarity: {result['similarity']:.2f}).\n\n"
                    f"Leo: review if wrong target. Enrichment labeled "
                    f"`### Auto-enrichment (near-duplicate conversion)` in the target file."
                ),
            })
            db.audit(conn, "substantive_fixer", "auto_enrichment", json.dumps({
                "pr": pr_number, "target_claim": result["target_claim"],
                "similarity": round(result["similarity"], 3), "domain": domain,
            }))
            logger.info("PR #%d: auto-enriched on %s (sim=%.2f)",
                        pr_number, result["target_claim"], result["similarity"])
            return {"pr": pr_number, "action": "auto_enriched", "target": result["target_claim"]}
        else:
            # Below 0.90 threshold — flag for Leo
            logger.info("PR #%d: near_duplicate, best match %.2f < 0.90 — flagging Leo",
                        pr_number, result.get("best_similarity", 0))
            await _flag_for_leo_review(conn, pr_number, claim_files, review_text, domain_index)
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "action": "flagged_duplicate", "issues": issues}
    # FIXABLE: send to LLM
    # Fix each claim file individually.
    # NOTE(review): this first pass calls the LLM per file but only uses the
    # result to detect a flag_duplicate JSON response; the fixed content is
    # regenerated by a second LLM call per file in the worktree loop below,
    # i.e. two LLM calls per file per fix. Confirm this duplication is
    # intentional (e.g. to avoid holding a worktree during the probe).
    fixed_any = False
    for filepath, content in claim_files.items():
        prompt = _build_fix_prompt(content, review_text, issues, source_content, domain_index)
        result, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096)
        if not result:
            logger.warning("PR #%d: fix LLM call failed for %s", pr_number, filepath)
            continue
        # Check if result is a duplicate flag (JSON) or fixed content (markdown)
        if result.strip().startswith("{"):
            try:
                parsed = json.loads(result)
                if parsed.get("action") == "flag_duplicate":
                    await _flag_for_leo_review(conn, pr_number, claim_files, review_text, domain_index)
                    conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
                    return {"pr": pr_number, "action": "flagged_duplicate_by_llm"}
            except json.JSONDecodeError:
                pass
        # Mark as fixable; the actual write happens in the worktree loop below.
        fixed_any = True
        logger.info("PR #%d: fixed %s for %s", pr_number, filepath, issues)
    if not fixed_any:
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "no_fixes_applied"}
    # Push fix and reset for re-eval
    # Create worktree, apply fix, commit, push
    worktree_path = str(config.BASE_DIR / "workspaces" / f"subfix-{pr_number}")
    await _git("fetch", "origin", branch, timeout=30)
    rc, out = await _git("worktree", "add", "--detach", worktree_path, f"origin/{branch}")
    if rc != 0:
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "worktree_failed"}
    try:
        rc, out = await _git("checkout", "-B", branch, f"origin/{branch}", cwd=worktree_path)
        if rc != 0:
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "checkout_failed"}
        # Write fixed files (second LLM call per file — see NOTE above).
        # JSON-looking responses are skipped: those are duplicate flags, not content.
        for filepath, content in claim_files.items():
            prompt = _build_fix_prompt(content, review_text, issues, source_content, domain_index)
            fixed_content, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096)
            if fixed_content and not fixed_content.strip().startswith("{"):
                full_path = Path(worktree_path) / filepath
                full_path.parent.mkdir(parents=True, exist_ok=True)
                full_path.write_text(fixed_content)
        # Commit and push
        rc, _ = await _git("add", "-A", cwd=worktree_path)
        commit_msg = f"substantive-fix: address reviewer feedback ({', '.join(issues)})"
        rc, _ = await _git("commit", "-m", commit_msg, cwd=worktree_path)
        if rc != 0:
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "nothing_to_commit"}
        # Reset eval state BEFORE push (same pattern as fixer.py)
        conn.execute(
            """UPDATE prs SET
            status = 'open',
            eval_attempts = 0,
            eval_issues = '[]',
            tier0_pass = NULL,
            domain_verdict = 'pending',
            leo_verdict = 'pending',
            last_error = NULL
            WHERE number = ?""",
            (pr_number,),
        )
        rc, out = await _git("push", "origin", branch, cwd=worktree_path, timeout=30)
        if rc != 0:
            # NOTE(review): eval state was already reset above; on push failure
            # the PR stays 'open' with cleared verdicts but no new commit.
            logger.error("PR #%d: push failed: %s", pr_number, out)
            return {"pr": pr_number, "skipped": True, "reason": "push_failed"}
        db.audit(
            conn, "substantive_fixer", "fixed",
            json.dumps({"pr": pr_number, "issues": issues, "attempt": fix_attempts}),
        )
        logger.info("PR #%d: substantive fix pushed, reset for re-eval", pr_number)
        return {"pr": pr_number, "action": "fixed", "issues": issues}
    finally:
        # Always clean up the temporary worktree, even on failure paths.
        await _git("worktree", "remove", "--force", worktree_path)
async def _auto_convert_near_duplicate(
    conn, pr_number: int, claim_files: dict, domain: str,
) -> dict:
    """Auto-convert a near-duplicate claim into an enrichment on the best-match existing claim.

    Matches on filename-derived titles via difflib ratio.
    Returns {"converted": True, "target_claim": ..., "similarity": ...} on success;
    {"converted": False, "best_similarity": ...} when no match clears the bar.
    Threshold 0.90 (Leo: conservative, lower later based on false-positive rate).
    """
    from difflib import SequenceMatcher

    SIMILARITY_THRESHOLD = 0.90
    worktree_root = str(config.MAIN_WORKTREE)
    # First claim file in the PR is the duplicate under consideration.
    dup_path = next(iter(claim_files.keys()), "")
    dup_content = next(iter(claim_files.values()), "")
    dup_title = Path(dup_path).stem.replace("-", " ").lower()
    # The body (evidence) is what we preserve into the target claim.
    from .post_extract import parse_frontmatter

    _fm, body = parse_frontmatter(dup_content)
    if not body:
        body = dup_content  # fallback: use full content
    # Keep just the argument: drop the H1 heading and anything after the
    # first horizontal rule (Relevant Notes etc.).
    evidence = re.sub(r"^# .+\n*", "", body).strip()
    evidence = re.split(r"\n---\n", evidence)[0].strip()
    if not evidence or len(evidence) < 20:
        return {"converted": False, "best_similarity": 0, "reason": "no_evidence_to_preserve"}
    # Scan the domain for the closest existing claim by title similarity.
    domain_dir = Path(worktree_root) / "domains" / (domain or "")
    best_match: Path | None = None
    best_similarity = 0.0
    if domain_dir.is_dir():
        for candidate in domain_dir.glob("*.md"):
            if candidate.name.startswith("_"):
                continue
            candidate_title = candidate.stem.replace("-", " ").lower()
            score = SequenceMatcher(None, dup_title, candidate_title).ratio()
            if score > best_similarity:
                best_similarity, best_match = score, candidate
    if best_match is None or best_similarity < SIMILARITY_THRESHOLD:
        return {"converted": False, "best_similarity": best_similarity}
    # Queue the enrichment — entity_batch handles the actual write to main.
    # Single writer pattern prevents race conditions. (Ganymede)
    from .entity_queue import queue_enrichment

    try:
        queue_enrichment(
            target_claim=best_match.name,
            evidence=evidence,
            pr_number=pr_number,
            original_title=dup_title,
            similarity=best_similarity,
            domain=domain or "",
        )
    except Exception as e:
        logger.error("PR #%d: failed to queue enrichment: %s", pr_number, e)
        return {"converted": False, "best_similarity": best_similarity, "reason": f"queue_failed: {e}"}
    return {
        "converted": True,
        "target_claim": best_match.name,
        "similarity": best_similarity,
    }
async def _close_and_reextract(conn, pr_number: int, issues: list[str]):
    """Close the PR and mark its source for re-extraction, attaching the issue tags as feedback."""
    close_note = f"unfixable: {', '.join(issues)}"
    feedback_payload = json.dumps({"issues": issues, "pr": pr_number})
    # Close on the forge first, then record locally.
    await forgejo_api(
        "PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"},
    )
    conn.execute(
        "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?",
        (close_note, pr_number),
    )
    conn.execute(
        """UPDATE sources SET status = 'needs_reextraction', feedback = ?,
        updated_at = datetime('now')
        WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
        (feedback_payload, pr_number),
    )
    db.audit(
        conn,
        "substantive_fixer",
        "closed_reextract",
        json.dumps({"pr": pr_number, "issues": issues}),
    )
async def _flag_for_leo_review(
    conn, pr_number: int, claim_files: dict, review_text: str, domain_index: str | None,
):
    """Post a comment asking Leo to pick the enrichment target for a near-duplicate PR."""
    first_claim = next(iter(claim_files.values()), "")
    if not domain_index:
        candidates_text = "No domain index available."
    else:
        # Ask the fix model to shortlist likely duplicate targets.
        prompt = _build_fix_prompt(first_claim, review_text, ["near_duplicate"], None, domain_index)
        llm_result, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=60, max_tokens=1024)
        candidates_text = llm_result or "Could not identify candidates."
    comment = (
        f"**Substantive fixer: near-duplicate detected**\n\n"
        f"This PR's claims may duplicate existing KB content. "
        f"Leo: please pick the enrichment target or close if not worth converting.\n\n"
        f"**Candidate matches:**\n{candidates_text}\n\n"
        f"_Reply with the target claim filename to convert, or close the PR._"
    )
    await forgejo_api(
        "POST", repo_path(f"issues/{pr_number}/comments"), {"body": comment},
    )
    db.audit(conn, "substantive_fixer", "flagged_duplicate",
             json.dumps({"pr": pr_number}))
# ─── Stage entry point ─────────────────────────────────────────────────────
async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one substantive fix cycle. Called by the fixer stage after mechanical fixes.

    Finds PRs with substantive issue tags that haven't exceeded fix budget.
    Processes up to 3 per cycle (Rhea: 180s interval, don't overwhelm eval).
    Returns (fixed_count, error_count).
    """
    candidates = conn.execute(
        """SELECT number, eval_issues FROM prs
        WHERE status = 'open'
        AND tier0_pass = 1
        AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')
        AND COALESCE(fix_attempts, 0) < ?
        AND (last_attempt IS NULL OR last_attempt < datetime('now', '-3 minutes'))
        ORDER BY created_at ASC
        LIMIT 3""",
        (MAX_SUBSTANTIVE_FIXES + config.MAX_FIX_ATTEMPTS,),  # Total budget: mechanical + substantive
    ).fetchall()
    if not candidates:
        return 0, 0
    # Keep only PRs carrying at least one substantive tag (not just mechanical).
    substantive_tags = FIXABLE_TAGS | CONVERTIBLE_TAGS | UNFIXABLE_TAGS
    worklist = []
    for row in candidates:
        try:
            tags = set(json.loads(row["eval_issues"] or "[]"))
        except (json.JSONDecodeError, TypeError):
            continue
        if tags & substantive_tags:
            worklist.append(row)
    if not worklist:
        return 0, 0
    fixed = 0
    errors = 0
    for row in worklist:
        pr_number = row["number"]
        try:
            outcome = await _fix_pr(conn, pr_number)
            if outcome.get("action"):
                fixed += 1
            elif outcome.get("skipped"):
                logger.debug("PR #%d: substantive fix skipped: %s", pr_number, outcome.get("reason"))
        except Exception:
            logger.exception("PR #%d: substantive fix failed", pr_number)
            errors += 1
            # Release the claim so the PR isn't stuck in 'fixing'.
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
    if fixed or errors:
        logger.info("Substantive fix cycle: %d fixed, %d errors", fixed, errors)
    return fixed, errors