"""Validate stage — Tier 0 deterministic validation gate. Ported from tier0-gate.py + validate_claims.py. Pure Python, no LLM calls. Validates claim frontmatter, title format, wiki links, domain-directory match, proposition heuristic, universal quantifiers, near-duplicate detection. Runs against PRs with status 'open' that have tier0_pass IS NULL. Posts results as PR comments. In gate mode, sets tier0_pass = 0/1. """ import json import logging import re from datetime import date, datetime, timezone from difflib import SequenceMatcher from pathlib import Path from . import config, db logger = logging.getLogger("pipeline.validate") # ─── Constants ────────────────────────────────────────────────────────────── VALID_DOMAINS = frozenset( { "internet-finance", "entertainment", "health", "ai-alignment", "space-development", "grand-strategy", "mechanisms", "living-capital", "living-agents", "teleohumanity", "critical-systems", "collective-intelligence", "teleological-economics", "cultural-dynamics", } ) VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"}) VALID_TYPES = frozenset({"claim", "framework"}) REQUIRED_FIELDS = ("type", "domain", "description", "confidence", "source", "created") DATE_MIN = date(2020, 1, 1) WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]") DEDUP_THRESHOLD = 0.85 # Proposition heuristic patterns _STRONG_SIGNALS = re.compile( r"\b(because|therefore|however|although|despite|since|" r"rather than|instead of|not just|more than|less than|" r"by\b|through\b|via\b|without\b|" r"when\b|where\b|while\b|if\b|unless\b|" r"which\b|that\b|" r"is\b|are\b|was\b|were\b|will\b|would\b|" r"can\b|could\b|should\b|must\b|" r"has\b|have\b|had\b|does\b|did\b)", re.IGNORECASE, ) _VERB_ENDINGS = re.compile( r"\b\w{2,}(ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns|ps|ts|rs|ns|ds)\b", re.IGNORECASE, ) _UNIVERSAL_QUANTIFIERS = re.compile( r"\b(all|every|always|never|no one|nobody|nothing|none of|" r"the only|the fundamental|the sole|the single|" r"universally|invariably|without exception|in every case)\b", re.IGNORECASE, ) _SCOPING_LANGUAGE = re.compile( r"\b(when|if|under|given|assuming|provided|in cases where|" r"for .+ that|among|within|across|during|between|" r"approximately|roughly|nearly|most|many|often|typically|" r"tends? 
# ─── YAML frontmatter parser ─────────────────────────────────────────────────

def parse_frontmatter(text: str) -> tuple[dict | None, str]:
    """Extract YAML frontmatter and body from markdown text."""
    if not text.startswith("---"):
        return None, text
    end = text.find("---", 3)
    if end == -1:
        return None, text
    raw = text[3:end]
    body = text[end + 3 :].strip()
    try:
        import yaml

        fm = yaml.safe_load(raw)
        if not isinstance(fm, dict):
            return None, body
        return fm, body
    except ImportError:
        pass  # PyYAML unavailable; fall through to the simple parser below
    except Exception:
        return None, body
    # Fallback: simple key-value parser
    fm = {}
    for line in raw.strip().split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if ":" not in line:
            continue
        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if val.lower() == "null" or val == "":
            val = None
        elif val.startswith("["):
            val = [v.strip().strip('"').strip("'") for v in val.strip("[]").split(",") if v.strip()]
        fm[key] = val
    return (fm if fm else None), body
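# Example (hypothetical claim file). parse_frontmatter returns the mapping and
# the remaining body text:
#
#   ---
#   type: claim
#   domain: mechanisms
#   description: Auctions reveal private valuations under the right rules.
#   confidence: likely
#   source: personal-synthesis
#   created: 2024-06-01
#   ---
#   Body prose with optional [[wiki links]] follows the closing delimiter.
#
# result: ({"type": "claim", "domain": "mechanisms", ...}, "Body prose ...")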
# ─── Validators ──────────────────────────────────────────────────────────────

def validate_schema(fm: dict) -> list[str]:
    """Check required fields and valid enums."""
    violations = []
    for field in REQUIRED_FIELDS:
        if field not in fm or fm[field] is None:
            violations.append(f"missing_field:{field}")
    ftype = fm.get("type")
    if ftype and ftype not in VALID_TYPES:
        violations.append(f"invalid_type:{ftype}")
    domain = fm.get("domain")
    if domain and domain not in VALID_DOMAINS:
        violations.append(f"invalid_domain:{domain}")
    confidence = fm.get("confidence")
    if confidence and confidence not in VALID_CONFIDENCE:
        violations.append(f"invalid_confidence:{confidence}")
    desc = fm.get("description")
    if isinstance(desc, str) and len(desc.strip()) < 10:
        violations.append("description_too_short")
    source = fm.get("source")
    if isinstance(source, str) and len(source.strip()) < 3:
        violations.append("source_too_short")
    return violations


def validate_date(date_val) -> list[str]:
    """Validate created date."""
    violations = []
    if date_val is None:
        return ["missing_field:created"]
    parsed = None
    if isinstance(date_val, date):
        parsed = date_val
    elif isinstance(date_val, str):
        try:
            parsed = datetime.strptime(date_val, "%Y-%m-%d").date()
        except ValueError:
            return [f"invalid_date_format:{date_val}"]
    else:
        return [f"invalid_date_type:{type(date_val).__name__}"]
    today = date.today()
    if parsed > today:
        violations.append(f"future_date:{parsed}")
    if parsed < DATE_MIN:
        violations.append(f"date_before_2020:{parsed}")
    return violations


def validate_title(filepath: str) -> list[str]:
    """Check filename follows prose-as-claim convention."""
    violations = []
    name = Path(filepath).stem
    normalized = name.replace("-", " ")
    if len(normalized) < 20:
        violations.append("title_too_short")
    words = normalized.split()
    if len(words) < 4:
        violations.append("title_too_few_words")
    # Strip every allowed character; anything left over is a special character.
    cleaned = re.sub(r"[a-zA-Z0-9\s\-\.,'()%]", "", name)
    if cleaned:
        violations.append(f"title_special_chars:{cleaned[:20]}")
    return violations


def validate_wiki_links(body: str, existing_claims: set[str]) -> list[str]:
    """Check that [[wiki links]] resolve to known claims."""
    violations = []
    for link in WIKI_LINK_RE.findall(body):
        if link.strip() and link.strip() not in existing_claims:
            violations.append(f"broken_wiki_link:{link.strip()[:80]}")
    return violations


def validate_proposition(title: str) -> list[str]:
    """Check title reads as a proposition, not a label."""
    normalized = title.replace("-", " ")
    words = normalized.split()
    n = len(words)
    if n < 4:
        return ["title_not_proposition:too short to be a disagreeable sentence"]
    if _STRONG_SIGNALS.search(normalized):
        return []
    if _VERB_ENDINGS.search(normalized):
        return []
    if n >= 8:
        return []
    return ["title_not_proposition:no verb or connective found"]


def validate_universal_quantifiers(title: str) -> list[str]:
    """Flag unscoped universal quantifiers (warning, not gate)."""
    universals = _UNIVERSAL_QUANTIFIERS.findall(title)
    if universals and not _SCOPING_LANGUAGE.search(title):
        return [f"unscoped_universal:{','.join(universals)}"]
    return []


def validate_domain_directory_match(filepath: str, fm: dict) -> list[str]:
    """Check file's directory matches its domain field."""
    domain = fm.get("domain")
    if not domain:
        return []
    parts = Path(filepath).parts
    for i, part in enumerate(parts):
        if part == "domains" and i + 1 < len(parts):
            dir_domain = parts[i + 1]
            if dir_domain != domain:
                secondary = fm.get("secondary_domains", [])
                if isinstance(secondary, str):
                    secondary = [secondary]
                if dir_domain not in (secondary or []):
                    return [
                        f"domain_directory_mismatch:file in domains/{dir_domain}/ "
                        f"but domain field says '{domain}'"
                    ]
            break
    return []


def validate_description_not_title(title: str, description: str) -> list[str]:
    """Check description adds info beyond the title."""
    if not description:
        return []
    # Normalize the hyphenated filename stem so it compares against prose.
    title_lower = title.lower().replace("-", " ").strip()
    desc_lower = description.lower().strip().rstrip(".")
    if desc_lower in title_lower or title_lower in desc_lower:
        return ["description_echoes_title"]
    ratio = SequenceMatcher(None, title_lower, desc_lower).ratio()
    if ratio > 0.75:
        return [f"description_too_similar:{ratio:.0%}"]
    return []


def find_near_duplicates(title: str, existing_claims: set[str]) -> list[str]:
    """Find near-duplicate titles using SequenceMatcher with word pre-filter."""
    # Titles arrive as hyphenated filename stems; normalize to spaces so the
    # word-overlap pre-filter actually has words to compare.
    title_lower = title.lower().replace("-", " ")
    title_words = set(title_lower.split()[:6])
    warnings = []
    for existing in existing_claims:
        existing_lower = existing.lower().replace("-", " ")
        # Cheap pre-filter: require at least two shared words among the first
        # six before paying for a full SequenceMatcher comparison.
        if len(title_words & set(existing_lower.split()[:6])) < 2:
            continue
        ratio = SequenceMatcher(None, title_lower, existing_lower).ratio()
        if ratio >= DEDUP_THRESHOLD:
            warnings.append(f"near_duplicate:{existing[:80]} (similarity={ratio:.2f})")
    return warnings
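# Illustration (hypothetical stems): "markets-price-risk-through-open-bets" and
# "markets-price-risk-through-public-bets" share well over two of their first
# six words once hyphens are normalized, so the pair reaches the
# SequenceMatcher stage, where a ratio at or above 0.85 yields a
# near_duplicate warning.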
# ─── Full Tier 0 validation ──────────────────────────────────────────────────

def tier0_validate_claim(filepath: str, content: str, existing_claims: set[str]) -> dict:
    """Run full Tier 0 validation.

    Returns {filepath, passes, violations, warnings}.
    """
    violations = []
    warnings = []
    fm, body = parse_frontmatter(content)
    if fm is None:
        return {"filepath": filepath, "passes": False, "violations": ["no_frontmatter"], "warnings": []}
    violations.extend(validate_schema(fm))
    violations.extend(validate_date(fm.get("created")))
    violations.extend(validate_title(filepath))
    violations.extend(validate_wiki_links(body, existing_claims))
    title = Path(filepath).stem
    violations.extend(validate_proposition(title))
    warnings.extend(validate_universal_quantifiers(title))
    violations.extend(validate_domain_directory_match(filepath, fm))
    desc = fm.get("description", "")
    if isinstance(desc, str):
        warnings.extend(validate_description_not_title(title, desc))
    warnings.extend(find_near_duplicates(title, existing_claims))
    return {
        "filepath": filepath,
        "passes": len(violations) == 0,
        "violations": violations,
        "warnings": warnings,
    }


# ─── Diff parsing ────────────────────────────────────────────────────────────

def extract_claim_files_from_diff(diff: str) -> dict[str, str]:
    """Parse unified diff to extract new/modified claim file contents."""
    claim_dirs = ("domains/", "core/", "foundations/")
    files = {}
    current_file = None
    current_lines = []
    is_deletion = False
    for line in diff.split("\n"):
        if line.startswith("diff --git"):
            if current_file and not is_deletion:
                files[current_file] = "\n".join(current_lines)
            current_file = None
            current_lines = []
            is_deletion = False
        elif line.startswith("deleted file mode") or line.startswith("+++ /dev/null"):
            is_deletion = True
            current_file = None
        elif line.startswith("+++ b/") and not is_deletion:
            path = line[6:]
            basename = path.rsplit("/", 1)[-1] if "/" in path else path
            if (
                any(path.startswith(d) for d in claim_dirs)
                and path.endswith(".md")
                and not basename.startswith("_")
            ):
                current_file = path
        elif current_file and line.startswith("+") and not line.startswith("+++"):
            current_lines.append(line[1:])
    if current_file and not is_deletion:
        files[current_file] = "\n".join(current_lines)
    return files
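# Illustration: given a (hypothetical) diff hunk like
#
#   diff --git a/domains/mechanisms/some-claim.md b/domains/mechanisms/some-claim.md
#   +++ b/domains/mechanisms/some-claim.md
#   +---
#   +type: claim
#   +...
#
# extract_claim_files_from_diff returns
# {"domains/mechanisms/some-claim.md": "---\ntype: claim\n..."}.
# Only added ("+") lines are kept; deletions and files outside
# domains/, core/, foundations/ are ignored.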
# ─── Forgejo API (using merge module's helper) ──────────────────────────────

async def _forgejo_api(method: str, path: str, body: dict = None):
    """Call Forgejo API.

    Reuses merge module pattern.
    """
    import aiohttp

    url = f"{config.FORGEJO_URL}/api/v1{path}"
    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=30)
            ) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200])
                    return None
                if resp.status == 204:
                    return {}
                return await resp.json()
    except Exception as e:
        logger.error("Forgejo API error: %s %s → %s", method, path, e)
        return None


async def _get_pr_diff(pr_number: int) -> str:
    """Fetch PR diff via Forgejo API."""
    import aiohttp

    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    headers = {"Authorization": f"token {token}", "Accept": "text/plain"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp:
                if resp.status >= 400:
                    return ""
                diff = await resp.text()
                if len(diff) > 2_000_000:
                    return ""  # Too large
                return diff
    except Exception as e:
        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
        return ""


async def _get_pr_head_sha(pr_number: int) -> str:
    """Get HEAD SHA of PR's branch."""
    pr_info = await _forgejo_api(
        "GET",
        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
    )
    if pr_info:
        return pr_info.get("head", {}).get("sha", "")
    return ""


async def _has_tier0_comment(pr_number: int, head_sha: str) -> bool:
    """Check if we already validated this exact commit."""
    if not head_sha:
        return False
    # Hidden HTML-comment marker embedded in our own comments; it carries the
    # commit SHA so re-runs can detect a prior validation of the same commit.
    marker = f"<!-- tier0-gate:{head_sha} -->"
    # Paginate comments (Ganymede standing rule)
    page = 1
    while True:
        comments = await _forgejo_api(
            "GET",
            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments?limit=50&page={page}",
        )
        if not comments:
            break
        for c in comments:
            if marker in c.get("body", ""):
                return True
        if len(comments) < 50:
            break
        page += 1
    return False


async def _post_validation_comment(pr_number: int, results: list[dict], head_sha: str):
    """Post Tier 0 validation results as PR comment."""
    all_pass = all(r["passes"] for r in results)
    total = len(results)
    passing = sum(1 for r in results if r["passes"])
    # Same hidden marker that _has_tier0_comment looks for.
    marker = f"<!-- tier0-gate:{head_sha} -->" if head_sha else ""
    status = "PASS" if all_pass else "FAIL"
    lines = [
        marker,
        f"**Tier 0 Validation: {status}** — {passing}/{total} claims pass\n",
    ]
    for r in results:
        icon = "pass" if r["passes"] else "FAIL"
        short_path = r["filepath"].split("/", 1)[-1] if "/" in r["filepath"] else r["filepath"]
        lines.append(f"**[{icon}]** `{short_path}`")
        for v in r["violations"]:
            lines.append(f"  - {v}")
        for w in r["warnings"]:
            lines.append(f"  - (warn) {w}")
        lines.append("")
    if not all_pass:
        lines.append("---")
        lines.append("Fix the violations above and push to trigger re-validation.")
    lines.append(f"\n*tier0-gate v2 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")
    await _forgejo_api(
        "POST",
        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
        {"body": "\n".join(lines)},
    )
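# Rendered comment (illustrative paths and timestamp):
#
#   **Tier 0 Validation: FAIL** — 1/2 claims pass
#   **[pass]** `mechanisms/some-claim.md`
#   **[FAIL]** `mechanisms/another-claim.md`
#     - missing_field:source
#     - (warn) description_echoes_title
#   ---
#   Fix the violations above and push to trigger re-validation.
#   *tier0-gate v2 | 2024-06-01 12:00 UTC*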
# ─── Existing claims index ───────────────────────────────────────────────────

def load_existing_claims() -> set[str]:
    """Build set of known claim titles from the main worktree."""
    claims: set[str] = set()
    base = config.MAIN_WORKTREE
    for subdir in ["domains", "core", "foundations", "maps", "agents", "schemas"]:
        full = base / subdir
        if not full.is_dir():
            continue
        for f in full.rglob("*.md"):
            claims.add(f.stem)
    return claims


# ─── Main entry point ────────────────────────────────────────────────────────

async def validate_pr(conn, pr_number: int) -> dict:
    """Run Tier 0 validation on a single PR.

    Returns {pr, all_pass, total, passing, skipped, reason}.
    """
    # Get HEAD SHA for idempotency
    head_sha = await _get_pr_head_sha(pr_number)

    # Skip if already validated for this commit
    if await _has_tier0_comment(pr_number, head_sha):
        logger.debug("PR #%d already validated at %s", pr_number, head_sha[:8])
        return {"pr": pr_number, "skipped": True, "reason": "already_validated"}

    # Fetch diff
    diff = await _get_pr_diff(pr_number)
    if not diff:
        logger.debug("PR #%d: empty or oversized diff", pr_number)
        return {"pr": pr_number, "skipped": True, "reason": "no_diff"}

    # Extract claim files
    claim_files = extract_claim_files_from_diff(diff)
    if not claim_files:
        logger.debug("PR #%d: no claim files in diff", pr_number)
        return {"pr": pr_number, "skipped": True, "reason": "no_claims"}

    # Load existing claims index
    existing_claims = load_existing_claims()

    # Validate each claim
    results = []
    for filepath, content in claim_files.items():
        result = tier0_validate_claim(filepath, content, existing_claims)
        results.append(result)
        status = "PASS" if result["passes"] else "FAIL"
        logger.debug(
            "PR #%d: %s %s v=%s w=%s",
            pr_number, status, filepath, result["violations"], result["warnings"],
        )

    all_pass = all(r["passes"] for r in results)
    total = len(results)
    passing = sum(1 for r in results if r["passes"])
    logger.info("PR #%d: Tier 0 — %d/%d pass, all_pass=%s", pr_number, passing, total, all_pass)

    # Post comment
    await _post_validation_comment(pr_number, results, head_sha)

    # Update PR record
    conn.execute(
        "UPDATE prs SET tier0_pass = ? WHERE number = ?",
        (1 if all_pass else 0, pr_number),
    )
    db.audit(
        conn,
        "validate",
        "tier0_complete",
        json.dumps({"pr": pr_number, "pass": all_pass, "passing": passing, "total": total}),
    )
    return {"pr": pr_number, "all_pass": all_pass, "total": total, "passing": passing}


async def validate_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one validation cycle.

    Finds PRs with status='open' and tier0_pass IS NULL, validates them.
    """
    # Find unvalidated PRs (priority ordered)
    rows = conn.execute(
        """SELECT p.number FROM prs p
           LEFT JOIN sources s ON p.source_path = s.path
           WHERE p.status = 'open' AND p.tier0_pass IS NULL
           ORDER BY CASE COALESCE(p.priority, s.priority, 'medium')
                        WHEN 'critical' THEN 0
                        WHEN 'high' THEN 1
                        WHEN 'medium' THEN 2
                        WHEN 'low' THEN 3
                        ELSE 4
                    END,
                    p.created_at ASC
           LIMIT ?""",
        (max_workers or 10,),
    ).fetchall()
    if not rows:
        return 0, 0

    succeeded = 0
    failed = 0
    for row in rows:
        try:
            result = await validate_pr(conn, row["number"])
            if result.get("skipped"):
                # Mark as validated even if skipped (no claims = pass)
                conn.execute(
                    "UPDATE prs SET tier0_pass = 1 WHERE number = ? AND tier0_pass IS NULL",
                    (row["number"],),
                )
            succeeded += 1  # Validation ran successfully, even if claims failed
        except Exception:
            logger.exception("Failed to validate PR #%d", row["number"])
            failed += 1

    if succeeded or failed:
        logger.info("Validate cycle: %d validated, %d errors", succeeded, failed)
    return succeeded, failed
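
if __name__ == "__main__":
    # Minimal local smoke test (a sketch with hypothetical content): exercises
    # the pure validators only. No Forgejo, no database, no event loop. Run via
    # `python -m <package>.validate`; the relative imports above require
    # package context, so it cannot be run as a bare script.
    sample_path = "domains/mechanisms/auctions-reveal-private-valuations-when-bids-are-binding.md"
    sample_content = (
        "---\n"
        "type: claim\n"
        "domain: mechanisms\n"
        "description: Binding bids force bidders to expose what they actually value.\n"
        "confidence: likely\n"
        "source: personal-synthesis\n"
        "created: 2024-06-01\n"
        "---\n"
        "Body text with a [[known-claim]] link.\n"
    )
    result = tier0_validate_claim(sample_path, sample_content, {"known-claim"})
    print(json.dumps(result, indent=2))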