teleo-infrastructure/lib/eval_parse.py
m3taversal f46e14dfae
Some checks are pending
CI / lint-and-test (push) Waiting to run
refactor: Phase 4 — extract eval_actions.py, drop underscore prefixes in eval_parse
Three changes:

1. Drop underscore prefixes in eval_parse.py — functions are now the public
   API of the module (filter_diff, parse_verdict, classify_issues, etc.).
   All 12 functions renamed, imports updated in evaluate.py and tests.

2. Extract eval_actions.py from evaluate.py — 3 async PR disposition functions:
   - post_formal_approvals: submit Forgejo reviews from 2 agents
   - terminate_pr: close PR, post rejection comment, requeue source
   - dispose_rejected_pr: disposition logic for rejected PRs on attempt 2+
   evaluate.py drops from ~1140 to 911 lines.

3. 14 new tests in test_eval_actions.py covering all three functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 12:57:51 +01:00

434 lines
17 KiB
Python

"""Pure parsing functions for the eval stage — zero I/O, zero async.
Extracted from evaluate.py to isolate testable parsing logic from
orchestration, DB, and Forgejo API calls.
Contents:
- Diff helpers: filter, classify, tier routing
- Verdict/issue parsing: structured tags + prose inference
- Batch response parsing: fan-out validation
All functions are pure (input → output). The only external dependency
is config.MECHANICAL_ISSUE_TAGS / config.SUBSTANTIVE_ISSUE_TAGS for
classify_issues.
"""
import logging
import re
from . import config
logger = logging.getLogger("pipeline.eval_parse")
# ─── Diff helpers ──────────────────────────────────────────────────────────
def filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
Strips: inbox/, schemas/, skills/, agents/*/musings/
"""
sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE)
skip_patterns = [r"^diff --git a/(inbox/(archive|queue|null-result)|schemas|skills|agents/[^/]+/musings)/"]
core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"}
claim_sections = []
entity_sections = []
for section in sections:
if not section.strip():
continue
if any(re.match(p, section) for p in skip_patterns):
continue
entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section)
if entity_match and entity_match.group(1) not in core_domains:
entity_sections.append(section)
continue
claim_sections.append(section)
return "".join(claim_sections), "".join(entity_sections)
def extract_changed_files(diff: str) -> str:
"""Extract changed file paths from diff."""
return "\n".join(
line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git")
)
def is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
has_other = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
if "agents/" in line and "/musings/" in line:
has_musings = True
else:
has_other = True
return has_musings and not has_other
def diff_contains_claim_type(diff: str) -> bool:
"""Claim-shape detector: check if any file in diff has type: claim in frontmatter.
Mechanical check ($0). If YAML declares type: claim, this is a factual claim —
not an entity update or formatting fix. Must be classified STANDARD minimum
regardless of Haiku triage. Catches factual claims disguised as LIGHT content.
(Theseus: converts semantic problem to mechanical check)
"""
for line in diff.split("\n"):
if line.startswith("+") and not line.startswith("+++"):
stripped = line[1:].strip()
if stripped in ("type: claim", 'type: "claim"', "type: 'claim'"):
return True
return False
def deterministic_tier(diff: str) -> str | None:
"""Deterministic tier routing — skip Haiku triage for obvious cases.
Checks diff file patterns before calling the LLM. Returns tier string
if deterministic, None if Haiku triage is needed.
Rules (Leo-calibrated):
- All files in entities/ only → LIGHT
- All files in inbox/ only (queue, archive, null-result) → LIGHT
- Any file in core/ or foundations/ → DEEP (structural KB changes)
- Has challenged_by field → DEEP (challenges existing claims)
- Modifies existing file (not new) in domains/ → DEEP (enrichment/change)
- Otherwise → None (needs Haiku triage)
NOTE: Cross-domain wiki links are NOT a DEEP signal — most claims link
across domains, that's the whole point of the knowledge graph (Leo).
"""
changed_files = []
for line in diff.split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
changed_files.append(path)
if not changed_files:
return None
# All entities/ only → LIGHT
if all(f.startswith("entities/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in entities/)")
return "LIGHT"
# All inbox/ only (queue, archive, null-result) → LIGHT
if all(f.startswith("inbox/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in inbox/)")
return "LIGHT"
# Any file in core/ or foundations/ → DEEP (structural KB changes)
if any(f.startswith("core/") or f.startswith("foundations/") for f in changed_files):
logger.info("Deterministic tier: DEEP (touches core/ or foundations/)")
return "DEEP"
# Check diff content for DEEP signals
has_challenged_by = False
new_files: set[str] = set()
lines = diff.split("\n")
for i, line in enumerate(lines):
# Detect new files
if line.startswith("--- /dev/null") and i + 1 < len(lines) and lines[i + 1].startswith("+++ b/"):
new_files.add(lines[i + 1][6:])
# Check for challenged_by field
if line.startswith("+") and not line.startswith("+++"):
stripped = line[1:].strip()
if stripped.startswith("challenged_by:"):
has_challenged_by = True
if has_challenged_by:
logger.info("Deterministic tier: DEEP (has challenged_by field)")
return "DEEP"
# NOTE: Modified existing domain claims are NOT auto-DEEP — enrichments
# (appending evidence) are common and should be STANDARD. Let Haiku triage
# distinguish enrichments from structural changes.
return None
# ─── Verdict parsing ──────────────────────────────────────────────────────
def parse_verdict(review_text: str, reviewer: str) -> str:
"""Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
upper = reviewer.upper()
if f"VERDICT:{upper}:APPROVE" in review_text:
return "approve"
elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
return "request_changes"
else:
logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
return "request_changes"
# Map model-invented tags to valid tags. Models consistently ignore the valid
# tag list and invent their own. This normalizes them. (Ganymede, Mar 14)
_TAG_ALIASES: dict[str, str] = {
"schema_violation": "frontmatter_schema",
"missing_schema_fields": "frontmatter_schema",
"missing_schema": "frontmatter_schema",
"schema": "frontmatter_schema",
"missing_frontmatter": "frontmatter_schema",
"redundancy": "near_duplicate",
"duplicate": "near_duplicate",
"missing_confidence": "confidence_miscalibration",
"confidence_error": "confidence_miscalibration",
"vague_claims": "scope_error",
"unfalsifiable": "scope_error",
"unverified_wiki_links": "broken_wiki_links",
"unverified-wiki-links": "broken_wiki_links",
"missing_wiki_links": "broken_wiki_links",
"invalid_wiki_links": "broken_wiki_links",
"wiki_link_errors": "broken_wiki_links",
"overclaiming": "title_overclaims",
"title_overclaim": "title_overclaims",
"date_error": "date_errors",
"factual_error": "factual_discrepancy",
"factual_inaccuracy": "factual_discrepancy",
}
VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims",
"confidence_miscalibration", "date_errors", "factual_discrepancy",
"near_duplicate", "scope_error"}
def normalize_tag(tag: str) -> str | None:
"""Normalize a model-generated tag to a valid tag, or None if unrecognizable."""
tag = tag.strip().lower().replace("-", "_")
if tag in VALID_ISSUE_TAGS:
return tag
if tag in _TAG_ALIASES:
return _TAG_ALIASES[tag]
# Fuzzy: check if any valid tag is a substring or vice versa
for valid in VALID_ISSUE_TAGS:
if valid in tag or tag in valid:
return valid
return None
# ─── Issue parsing ─────────────────────────────────────────────────────────
# Keyword patterns for inferring issue tags from unstructured review prose.
# Conservative: only match unambiguous indicators. Order doesn't matter.
_PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = {
"frontmatter_schema": [
re.compile(r"frontmatter", re.IGNORECASE),
re.compile(r"missing.{0,20}(type|domain|confidence|source|created)\b", re.IGNORECASE),
re.compile(r"yaml.{0,10}(invalid|missing|error|schema)", re.IGNORECASE),
re.compile(r"required field", re.IGNORECASE),
re.compile(r"lacks?.{0,15}(required|yaml|schema|fields)", re.IGNORECASE),
re.compile(r"missing.{0,15}(schema|fields|frontmatter)", re.IGNORECASE),
re.compile(r"schema.{0,10}(compliance|violation|missing|invalid)", re.IGNORECASE),
],
"broken_wiki_links": [
re.compile(r"(broken|dead|invalid).{0,10}(wiki.?)?link", re.IGNORECASE),
re.compile(r"wiki.?link.{0,20}(not found|missing|broken|invalid|resolv|unverif)", re.IGNORECASE),
re.compile(r"\[\[.{1,80}\]\].{0,20}(not found|doesn.t exist|missing)", re.IGNORECASE),
re.compile(r"unverified.{0,10}(wiki|link)", re.IGNORECASE),
],
"factual_discrepancy": [
re.compile(r"factual.{0,10}(error|inaccura|discrepanc|incorrect)", re.IGNORECASE),
re.compile(r"misrepresent", re.IGNORECASE),
],
"confidence_miscalibration": [
re.compile(r"confidence.{0,20}(too high|too low|miscalibrat|overstat|should be)", re.IGNORECASE),
re.compile(r"(overstat|understat).{0,20}confidence", re.IGNORECASE),
],
"scope_error": [
re.compile(r"scope.{0,10}(error|too broad|overscop|unscoped)", re.IGNORECASE),
re.compile(r"unscoped.{0,10}(universal|claim)", re.IGNORECASE),
re.compile(r"(vague|unfalsifiable).{0,15}(claim|assertion)", re.IGNORECASE),
re.compile(r"not.{0,10}(specific|falsifiable|disagreeable).{0,10}enough", re.IGNORECASE),
],
"title_overclaims": [
re.compile(r"title.{0,20}(overclaim|overstat|too broad)", re.IGNORECASE),
re.compile(r"overclaim", re.IGNORECASE),
],
"near_duplicate": [
re.compile(r"near.?duplicate", re.IGNORECASE),
re.compile(r"(very|too) similar.{0,20}(claim|title|existing)", re.IGNORECASE),
re.compile(r"duplicate.{0,20}(of|claim|title|existing|information)", re.IGNORECASE),
re.compile(r"redundan", re.IGNORECASE),
],
}
def parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review.
First tries structured <!-- ISSUES: tag1, tag2 --> comment with tag normalization.
Falls back to keyword inference from prose.
"""
match = re.search(r"<!-- ISSUES: ([^>]+) -->", review_text)
if match:
raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
normalized = []
for tag in raw_tags:
norm = normalize_tag(tag)
if norm and norm not in normalized:
normalized.append(norm)
else:
logger.debug("Unrecognized issue tag '%s' — dropped", tag)
if normalized:
return normalized
# Fallback: infer tags from review prose
return infer_issues_from_prose(review_text)
def infer_issues_from_prose(review_text: str) -> list[str]:
"""Infer issue tags from unstructured review text via keyword matching.
Fallback for reviews that reject without structured <!-- ISSUES: --> tags.
Conservative: requires at least one unambiguous keyword match per tag.
"""
inferred = []
for tag, patterns in _PROSE_TAG_PATTERNS.items():
if any(p.search(review_text) for p in patterns):
inferred.append(tag)
return inferred
def classify_issues(issues: list[str]) -> str:
"""Classify issue tags as 'mechanical', 'substantive', or 'mixed'."""
if not issues:
return "unknown"
mechanical = set(issues) & config.MECHANICAL_ISSUE_TAGS
substantive = set(issues) & config.SUBSTANTIVE_ISSUE_TAGS
if substantive and not mechanical:
return "substantive"
if mechanical and not substantive:
return "mechanical"
if mechanical and substantive:
return "mixed"
return "unknown" # tags not in either set
# ─── Batch response parsing ───────────────────────────────────────────────
def parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
"""Parse batched domain review into per-PR review sections.
Returns {pr_number: review_text} for each PR found in the response.
Missing PRs are omitted — caller handles fallback.
"""
agent_upper = agent.upper()
result: dict[int, str] = {}
# Split by PR verdict markers: <!-- PR:NNN VERDICT:AGENT:... -->
# Each marker terminates the previous PR's section
pattern = re.compile(
r"<!-- PR:(\d+) VERDICT:" + re.escape(agent_upper) + r":(APPROVE|REQUEST_CHANGES) -->"
)
matches = list(pattern.finditer(response))
if not matches:
return result
for i, match in enumerate(matches):
pr_num = int(match.group(1))
marker_end = match.end()
# Find the start of this PR's section by looking for the section header
# or the end of the previous verdict
section_header = f"=== PR #{pr_num}"
header_pos = response.rfind(section_header, 0, match.start())
if header_pos >= 0:
# Extract from header to end of verdict marker
section_text = response[header_pos:marker_end].strip()
else:
# No header found — extract from previous marker end to this marker end
prev_end = matches[i - 1].end() if i > 0 else 0
section_text = response[prev_end:marker_end].strip()
# Re-format as individual review comment
# Strip the batch section header, keep just the review content
# Add batch label for traceability
pr_nums_str = ", ".join(f"#{n}" for n in pr_numbers)
review_text = (
f"*(batch review with PRs {pr_nums_str})*\n\n"
f"{section_text}\n"
)
result[pr_num] = review_text
return result
def validate_batch_fanout(
parsed: dict[int, str],
pr_diffs: list[dict],
agent: str,
) -> tuple[dict[int, str], list[int]]:
"""Validate batch fan-out for completeness and cross-contamination.
Returns (valid_reviews, fallback_pr_numbers).
- valid_reviews: reviews that passed validation
- fallback_pr_numbers: PRs that need individual review (missing or cross-contaminated)
"""
valid: dict[int, str] = {}
fallback: list[int] = []
# Build file map: pr_number → set of path segments for matching.
# Use full paths (e.g., "domains/internet-finance/dao.md") not bare filenames
# to avoid false matches on short names like "dao.md" or "space.md" (Leo note #3).
pr_files: dict[int, set[str]] = {}
for pr in pr_diffs:
files = set()
for line in pr["diff"].split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
files.add(path)
# Also add the last 2 path segments (e.g., "internet-finance/dao.md")
# for models that abbreviate paths
parts = path.split("/")
if len(parts) >= 2:
files.add("/".join(parts[-2:]))
pr_files[pr["number"]] = files
for pr in pr_diffs:
pr_num = pr["number"]
# Completeness check: is there a review for this PR?
if pr_num not in parsed:
logger.warning("Batch fan-out: PR #%d missing from response — fallback to individual", pr_num)
fallback.append(pr_num)
continue
review = parsed[pr_num]
# Cross-contamination check: does review mention at least one file from this PR?
# Use path segments (min 10 chars) to avoid false substring matches on short names.
my_files = pr_files.get(pr_num, set())
mentions_own_file = any(f in review for f in my_files if len(f) >= 10)
if not mentions_own_file and my_files:
# Check if it references files from OTHER PRs (cross-contamination signal)
other_files = set()
for other_pr in pr_diffs:
if other_pr["number"] != pr_num:
other_files.update(pr_files.get(other_pr["number"], set()))
mentions_other = any(f in review for f in other_files if len(f) >= 10)
if mentions_other:
logger.warning(
"Batch fan-out: PR #%d review references files from another PR — cross-contamination, fallback",
pr_num,
)
fallback.append(pr_num)
continue
# If it doesn't mention any files at all, could be a generic review — accept it
# (some PRs have short diffs where the model doesn't reference filenames)
valid[pr_num] = review
return valid, fallback