"""Pure parsing functions for the eval stage — zero I/O, zero async. Extracted from evaluate.py to isolate testable parsing logic from orchestration, DB, and Forgejo API calls. Contents: - Diff helpers: filter, classify, tier routing - Verdict/issue parsing: structured tags + prose inference - Batch response parsing: fan-out validation All functions are pure (input → output). The only external dependency is config.MECHANICAL_ISSUE_TAGS / config.SUBSTANTIVE_ISSUE_TAGS for classify_issues. """ import logging import re from . import config logger = logging.getLogger("pipeline.eval_parse") # ─── Diff helpers ────────────────────────────────────────────────────────── def filter_diff(diff: str) -> tuple[str, str]: """Filter diff to only review-relevant files. Returns (review_diff, entity_diff). Strips: inbox/, schemas/, skills/, agents/*/musings/ """ sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE) skip_patterns = [r"^diff --git a/(inbox/(archive|queue|null-result)|schemas|skills|agents/[^/]+/musings)/"] core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"} claim_sections = [] entity_sections = [] for section in sections: if not section.strip(): continue if any(re.match(p, section) for p in skip_patterns): continue entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section) if entity_match and entity_match.group(1) not in core_domains: entity_sections.append(section) continue claim_sections.append(section) return "".join(claim_sections), "".join(entity_sections) def extract_changed_files(diff: str) -> str: """Extract changed file paths from diff.""" return "\n".join( line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git") ) def is_musings_only(diff: str) -> bool: """Check if PR only modifies musing files.""" has_musings = False has_other = False for line in diff.split("\n"): if line.startswith("diff --git"): if "agents/" in line and "/musings/" in line: has_musings = True else: has_other = True return has_musings and not has_other def diff_contains_claim_type(diff: str) -> bool: """Claim-shape detector: check if any file in diff has type: claim in frontmatter. Mechanical check ($0). If YAML declares type: claim, this is a factual claim — not an entity update or formatting fix. Must be classified STANDARD minimum regardless of Haiku triage. Catches factual claims disguised as LIGHT content. (Theseus: converts semantic problem to mechanical check) """ for line in diff.split("\n"): if line.startswith("+") and not line.startswith("+++"): stripped = line[1:].strip() if stripped in ("type: claim", 'type: "claim"', "type: 'claim'"): return True return False def deterministic_tier(diff: str) -> str | None: """Deterministic tier routing — skip Haiku triage for obvious cases. Checks diff file patterns before calling the LLM. Returns tier string if deterministic, None if Haiku triage is needed. Rules (Leo-calibrated): - All files in entities/ only → LIGHT - All files in inbox/ only (queue, archive, null-result) → LIGHT - Any file in core/ or foundations/ → DEEP (structural KB changes) - Has challenged_by field → DEEP (challenges existing claims) - Modifies existing file (not new) in domains/ → DEEP (enrichment/change) - Otherwise → None (needs Haiku triage) NOTE: Cross-domain wiki links are NOT a DEEP signal — most claims link across domains, that's the whole point of the knowledge graph (Leo). 
""" changed_files = [] for line in diff.split("\n"): if line.startswith("diff --git a/"): path = line.replace("diff --git a/", "").split(" b/")[0] changed_files.append(path) if not changed_files: return None # All entities/ only → LIGHT if all(f.startswith("entities/") for f in changed_files): logger.info("Deterministic tier: LIGHT (all files in entities/)") return "LIGHT" # All inbox/ only (queue, archive, null-result) → LIGHT if all(f.startswith("inbox/") for f in changed_files): logger.info("Deterministic tier: LIGHT (all files in inbox/)") return "LIGHT" # Any file in core/ or foundations/ → DEEP (structural KB changes) if any(f.startswith("core/") or f.startswith("foundations/") for f in changed_files): logger.info("Deterministic tier: DEEP (touches core/ or foundations/)") return "DEEP" # Check diff content for DEEP signals has_challenged_by = False new_files: set[str] = set() lines = diff.split("\n") for i, line in enumerate(lines): # Detect new files if line.startswith("--- /dev/null") and i + 1 < len(lines) and lines[i + 1].startswith("+++ b/"): new_files.add(lines[i + 1][6:]) # Check for challenged_by field if line.startswith("+") and not line.startswith("+++"): stripped = line[1:].strip() if stripped.startswith("challenged_by:"): has_challenged_by = True if has_challenged_by: logger.info("Deterministic tier: DEEP (has challenged_by field)") return "DEEP" # NOTE: Modified existing domain claims are NOT auto-DEEP — enrichments # (appending evidence) are common and should be STANDARD. Let Haiku triage # distinguish enrichments from structural changes. return None # ─── Verdict parsing ────────────────────────────────────────────────────── def parse_verdict(review_text: str, reviewer: str) -> str: """Parse VERDICT tag from review. Returns 'approve' or 'request_changes'.""" upper = reviewer.upper() if f"VERDICT:{upper}:APPROVE" in review_text: return "approve" elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text: return "request_changes" else: logger.warning("No parseable verdict from %s — treating as request_changes", reviewer) return "request_changes" # Map model-invented tags to valid tags. Models consistently ignore the valid # tag list and invent their own. This normalizes them. 
# Map model-invented tags to valid tags. Models consistently ignore the valid
# tag list and invent their own. This normalizes them. (Ganymede, Mar 14)
_TAG_ALIASES: dict[str, str] = {
    "schema_violation": "frontmatter_schema",
    "missing_schema_fields": "frontmatter_schema",
    "missing_schema": "frontmatter_schema",
    "schema": "frontmatter_schema",
    "missing_frontmatter": "frontmatter_schema",
    "redundancy": "near_duplicate",
    "duplicate": "near_duplicate",
    "missing_confidence": "confidence_miscalibration",
    "confidence_error": "confidence_miscalibration",
    "vague_claims": "scope_error",
    "unfalsifiable": "scope_error",
    "unverified_wiki_links": "broken_wiki_links",
    "unverified-wiki-links": "broken_wiki_links",
    "missing_wiki_links": "broken_wiki_links",
    "invalid_wiki_links": "broken_wiki_links",
    "wiki_link_errors": "broken_wiki_links",
    "overclaiming": "title_overclaims",
    "title_overclaim": "title_overclaims",
    "date_error": "date_errors",
    "factual_error": "factual_discrepancy",
    "factual_inaccuracy": "factual_discrepancy",
}

VALID_ISSUE_TAGS = {
    "broken_wiki_links",
    "frontmatter_schema",
    "title_overclaims",
    "confidence_miscalibration",
    "date_errors",
    "factual_discrepancy",
    "near_duplicate",
    "scope_error",
}


def normalize_tag(tag: str) -> str | None:
    """Normalize a model-generated tag to a valid tag, or None if unrecognizable."""
    tag = tag.strip().lower().replace("-", "_")
    if tag in VALID_ISSUE_TAGS:
        return tag
    if tag in _TAG_ALIASES:
        return _TAG_ALIASES[tag]
    # Fuzzy: check if any valid tag is a substring or vice versa
    for valid in VALID_ISSUE_TAGS:
        if valid in tag or tag in valid:
            return valid
    return None
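
# Illustration of tag normalization: exact match, then alias lookup, then
# substring fuzzing (inputs are made-up model output).
#
#     >>> normalize_tag("Schema-Violation")
#     'frontmatter_schema'
#     >>> normalize_tag("broken_wiki_link")   # substring of a valid tag
#     'broken_wiki_links'
#     >>> normalize_tag("tone") is None
#     True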
# ─── Issue parsing ─────────────────────────────────────────────────────────

# Keyword patterns for inferring issue tags from unstructured review prose.
# Conservative: only match unambiguous indicators. Order doesn't matter.
_PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = {
    "frontmatter_schema": [
        re.compile(r"frontmatter", re.IGNORECASE),
        re.compile(r"missing.{0,20}(type|domain|confidence|source|created)\b", re.IGNORECASE),
        re.compile(r"yaml.{0,10}(invalid|missing|error|schema)", re.IGNORECASE),
        re.compile(r"required field", re.IGNORECASE),
        re.compile(r"lacks?.{0,15}(required|yaml|schema|fields)", re.IGNORECASE),
        re.compile(r"missing.{0,15}(schema|fields|frontmatter)", re.IGNORECASE),
        re.compile(r"schema.{0,10}(compliance|violation|missing|invalid)", re.IGNORECASE),
    ],
    "broken_wiki_links": [
        re.compile(r"(broken|dead|invalid).{0,10}(wiki.?)?link", re.IGNORECASE),
        re.compile(r"wiki.?link.{0,20}(not found|missing|broken|invalid|resolv|unverif)", re.IGNORECASE),
        re.compile(r"\[\[.{1,80}\]\].{0,20}(not found|doesn.t exist|missing)", re.IGNORECASE),
        re.compile(r"unverified.{0,10}(wiki|link)", re.IGNORECASE),
    ],
    "factual_discrepancy": [
        re.compile(r"factual.{0,10}(error|inaccura|discrepanc|incorrect)", re.IGNORECASE),
        re.compile(r"misrepresent", re.IGNORECASE),
    ],
    "confidence_miscalibration": [
        re.compile(r"confidence.{0,20}(too high|too low|miscalibrat|overstat|should be)", re.IGNORECASE),
        re.compile(r"(overstat|understat).{0,20}confidence", re.IGNORECASE),
    ],
    "scope_error": [
        re.compile(r"scope.{0,10}(error|too broad|overscop|unscoped)", re.IGNORECASE),
        re.compile(r"unscoped.{0,10}(universal|claim)", re.IGNORECASE),
        re.compile(r"(vague|unfalsifiable).{0,15}(claim|assertion)", re.IGNORECASE),
        re.compile(r"not.{0,10}(specific|falsifiable|disagreeable).{0,10}enough", re.IGNORECASE),
    ],
    "title_overclaims": [
        re.compile(r"title.{0,20}(overclaim|overstat|too broad)", re.IGNORECASE),
        re.compile(r"overclaim", re.IGNORECASE),
    ],
    "near_duplicate": [
        re.compile(r"near.?duplicate", re.IGNORECASE),
        re.compile(r"(very|too) similar.{0,20}(claim|title|existing)", re.IGNORECASE),
        re.compile(r"duplicate.{0,20}(of|claim|title|existing|information)", re.IGNORECASE),
        re.compile(r"redundan", re.IGNORECASE),
    ],
}


def parse_issues(review_text: str) -> list[str]:
    """Extract issue tags from review.

    First tries structured comment with tag normalization.
    Falls back to keyword inference from prose.
    """
    # Structured tag block. The marker format below is an assumption: an HTML
    # comment of the form <!-- issues: tag_one, tag_two -->; verify against
    # the reviewer prompt template.
    match = re.search(r"<!--\s*issues:\s*(.+?)\s*-->", review_text, re.IGNORECASE)
    if match:
        raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
        normalized = []
        for tag in raw_tags:
            norm = normalize_tag(tag)
            if norm is None:
                logger.debug("Unrecognized issue tag '%s' — dropped", tag)
            elif norm not in normalized:
                normalized.append(norm)
        if normalized:
            return normalized

    # Fallback: infer tags from review prose
    return infer_issues_from_prose(review_text)


def infer_issues_from_prose(review_text: str) -> list[str]:
    """Infer issue tags from unstructured review text via keyword matching.

    Fallback for reviews that reject without structured tags.
    Conservative: requires at least one unambiguous keyword match per tag.
    """
    inferred = []
    for tag, patterns in _PROSE_TAG_PATTERNS.items():
        if any(p.search(review_text) for p in patterns):
            inferred.append(tag)
    return inferred


def classify_issues(issues: list[str]) -> str:
    """Classify issue tags as 'mechanical', 'substantive', or 'mixed'."""
    if not issues:
        return "unknown"
    mechanical = set(issues) & config.MECHANICAL_ISSUE_TAGS
    substantive = set(issues) & config.SUBSTANTIVE_ISSUE_TAGS
    if substantive and not mechanical:
        return "substantive"
    if mechanical and not substantive:
        return "mechanical"
    if mechanical and substantive:
        return "mixed"
    return "unknown"  # tags not in either set


# ─── Batch response parsing ───────────────────────────────────────────────


def parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
    """Parse batched domain review into per-PR review sections.

    Returns {pr_number: review_text} for each PR found in the response.
    Missing PRs are omitted — caller handles fallback.
    """
    agent_upper = agent.upper()
    result: dict[int, str] = {}

    # Split by PR verdict markers; each marker terminates the previous PR's
    # section. The marker format below is an assumption, extended from
    # parse_verdict's VERDICT:<AGENT>:... convention with the PR number;
    # verify against the batch prompt template.
    pattern = re.compile(
        rf"VERDICT:{re.escape(agent_upper)}:PR\s*#?(\d+):(?:APPROVE|REQUEST_CHANGES)"
    )
    matches = list(pattern.finditer(response))
    if not matches:
        return result

    for i, match in enumerate(matches):
        pr_num = int(match.group(1))
        marker_end = match.end()

        # Find the start of this PR's section by looking for the section header
        # or the end of the previous verdict
        section_header = f"=== PR #{pr_num}"
        header_pos = response.rfind(section_header, 0, match.start())
        if header_pos >= 0:
            # Extract from header to end of verdict marker
            section_text = response[header_pos:marker_end].strip()
        else:
            # No header found — extract from previous marker end to this marker end
            prev_end = matches[i - 1].end() if i > 0 else 0
            section_text = response[prev_end:marker_end].strip()

        # Re-format as an individual review comment: keep the section text and
        # prepend a batch label for traceability
        pr_nums_str = ", ".join(f"#{n}" for n in pr_numbers)
        review_text = (
            f"*(batch review with PRs {pr_nums_str})*\n\n"
            f"{section_text}\n"
        )
        result[pr_num] = review_text

    return result
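
# Illustration of the fan-out validation below, with hypothetical PR numbers
# and paths. pr_diffs entries are dicts with "number" and "diff" keys (as read
# by the code); a PR absent from `parsed` falls back to individual review.
#
#     >>> parsed = {12: "Review of domains/example/dao-governance.md ..."}
#     >>> pr_diffs = [
#     ...     {"number": 12, "diff": "diff --git a/domains/example/dao-governance.md b/domains/example/dao-governance.md\n"},
#     ...     {"number": 13, "diff": "diff --git a/domains/example/other-claim.md b/domains/example/other-claim.md\n"},
#     ... ]
#     >>> validate_batch_fanout(parsed, pr_diffs, "demo")
#     ({12: 'Review of domains/example/dao-governance.md ...'}, [13])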
def validate_batch_fanout(
    parsed: dict[int, str],
    pr_diffs: list[dict],
    agent: str,
) -> tuple[dict[int, str], list[int]]:
    """Validate batch fan-out for completeness and cross-contamination.

    Returns (valid_reviews, fallback_pr_numbers).
    - valid_reviews: reviews that passed validation
    - fallback_pr_numbers: PRs that need individual review (missing or cross-contaminated)
    """
    valid: dict[int, str] = {}
    fallback: list[int] = []

    # Build file map: pr_number → set of path segments for matching.
    # Use full paths (e.g., "domains/internet-finance/dao.md") not bare filenames
    # to avoid false matches on short names like "dao.md" or "space.md" (Leo note #3).
    pr_files: dict[int, set[str]] = {}
    for pr in pr_diffs:
        files = set()
        for line in pr["diff"].split("\n"):
            if line.startswith("diff --git a/"):
                path = line.replace("diff --git a/", "").split(" b/")[0]
                files.add(path)
                # Also add the last 2 path segments (e.g., "internet-finance/dao.md")
                # for models that abbreviate paths
                parts = path.split("/")
                if len(parts) >= 2:
                    files.add("/".join(parts[-2:]))
        pr_files[pr["number"]] = files

    for pr in pr_diffs:
        pr_num = pr["number"]

        # Completeness check: is there a review for this PR?
        if pr_num not in parsed:
            logger.warning("Batch fan-out: PR #%d missing from response — fallback to individual", pr_num)
            fallback.append(pr_num)
            continue

        review = parsed[pr_num]

        # Cross-contamination check: does review mention at least one file from this PR?
        # Use path segments (min 10 chars) to avoid false substring matches on short names.
        my_files = pr_files.get(pr_num, set())
        mentions_own_file = any(f in review for f in my_files if len(f) >= 10)

        if not mentions_own_file and my_files:
            # Check if it references files from OTHER PRs (cross-contamination signal)
            other_files = set()
            for other_pr in pr_diffs:
                if other_pr["number"] != pr_num:
                    other_files.update(pr_files.get(other_pr["number"], set()))
            mentions_other = any(f in review for f in other_files if len(f) >= 10)
            if mentions_other:
                logger.warning(
                    "Batch fan-out: PR #%d review references files from another PR — cross-contamination, fallback",
                    pr_num,
                )
                fallback.append(pr_num)
                continue
            # If it doesn't mention any files at all, could be a generic review — accept it
            # (some PRs have short diffs where the model doesn't reference filenames)

        valid[pr_num] = review

    return valid, fallback
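

if __name__ == "__main__":
    # Minimal smoke-check sketch over the pure helpers, using hypothetical
    # inputs. Because this module imports config relatively, run it from
    # within its package (e.g. `python -m <package>.eval_parse`), not as a
    # loose script.
    demo_diff = (
        "diff --git a/entities/people/jane-doe.md b/entities/people/jane-doe.md\n"
        "+++ b/entities/people/jane-doe.md\n"
        "+role: researcher\n"
    )
    print("tier:", deterministic_tier(demo_diff))           # LIGHT (entities/ only)

    demo_review = (
        "Frontmatter is missing the required confidence field.\n"
        "VERDICT:DEMO:REQUEST_CHANGES"
    )
    print("verdict:", parse_verdict(demo_review, "demo"))    # request_changes
    print("issues:", infer_issues_from_prose(demo_review))   # ['frontmatter_schema']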