#!/usr/bin/env python3 """tier0-gate.py — Tier 0 deterministic validation gate for teleo-codex PRs. Validates all claim files in a PR against mechanical quality checks. Runs in two modes: - shadow: log results + post informational comment, don't block - gate: log results + post comment + return nonzero if failures (blocks eval dispatch) Usage: python3 tier0-gate.py [--mode shadow|gate] [--repo-dir /path/to/repo] Designed to be called by eval-dispatcher.sh before dispatching eval-worker. """ import json import os import re import sys from datetime import datetime, timezone from difflib import SequenceMatcher from pathlib import Path from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen # ─── Config ───────────────────────────────────────────────────────────────── FORGEJO_URL = os.environ.get("FORGEJO_URL", "https://git.livingip.xyz") FORGEJO_OWNER = os.environ.get("FORGEJO_OWNER", "teleo") FORGEJO_REPO = os.environ.get("FORGEJO_REPO", "teleo-codex") FORGEJO_TOKEN_FILE = os.environ.get( "FORGEJO_TOKEN_FILE", "/opt/teleo-eval/secrets/forgejo-admin-token" ) REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main") LOG_DIR = os.environ.get("LOG_DIR", "/opt/teleo-eval/logs") DEDUP_THRESHOLD = 0.85 # Import validate_claims from same directory sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from validate_claims import ( VALID_DOMAINS, WIKI_LINK_RE, load_existing_claims, parse_frontmatter, validate_claim, ) # ─── New Tier 0 checks (beyond existing validate_claims.py) ──────────────── def _normalize_title(raw_title: str) -> str: """Normalize a filename-style title to readable form (hyphens → spaces).""" return raw_title.replace("-", " ") # Strong proposition signals (connectives, subordinators, be-verbs, modals) _STRONG_SIGNALS = re.compile( r"\b(because|therefore|however|although|despite|since|" r"rather than|instead of|not just|more than|less than|" r"by\b|through\b|via\b|without\b|" r"when\b|where\b|while\b|if\b|unless\b|" r"which\b|that\b|" r"is\b|are\b|was\b|were\b|will\b|would\b|" r"can\b|could\b|should\b|must\b|" r"has\b|have\b|had\b|does\b|did\b)", re.IGNORECASE, ) # Verb-like word endings (past tense, gerund, 3rd person) _VERB_ENDINGS = re.compile( r"\b\w{2,}(ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns|ps|ts|rs|ns|ds)\b", re.IGNORECASE, ) # Universal quantifiers that signal unscoped claims _UNIVERSAL_QUANTIFIERS = re.compile( r"\b(all|every|always|never|no one|nobody|nothing|none of|" r"the only|the fundamental|the sole|the single|" r"universally|invariably|without exception|in every case)\b", re.IGNORECASE, ) # Scoping language that makes universals acceptable _SCOPING_LANGUAGE = re.compile( r"\b(when|if|under|given|assuming|provided|in cases where|" r"for .+ that|among|within|across|during|between|" r"approximately|roughly|nearly|most|many|often|typically|" r"tends? to|generally|usually|frequently)\b", re.IGNORECASE, ) def validate_proposition(title: str) -> list[str]: """Check that the title reads as a proposition, not a label. Uses a tiered approach: - Short titles (<4 words): almost certainly labels → fail - Medium titles (4-7 words): must contain a verb/connective signal - Long titles (8+ words): benefit of the doubt (almost always propositions) """ violations = [] normalized = _normalize_title(title) words = normalized.split() n = len(words) if n < 4: violations.append( "title_not_proposition:too short to be a disagreeable sentence" ) return violations # Check for strong signals (connectives, be-verbs, modals) if _STRONG_SIGNALS.search(normalized): return violations # Check for verb-like endings if _VERB_ENDINGS.search(normalized): return violations # Long titles get benefit of the doubt if n >= 8: return violations violations.append( "title_not_proposition:no verb or connective found — " "title should be a disagreeable sentence, not a label" ) return violations def validate_universal_quantifiers(title: str) -> list[str]: """Flag unscoped universal quantifiers in title.""" violations = [] universals = _UNIVERSAL_QUANTIFIERS.findall(title) if universals: # Check if there's also scoping language has_scope = bool(_SCOPING_LANGUAGE.search(title)) if not has_scope: violations.append( f"unscoped_universal:{','.join(universals)} — " f"add scoping language or qualify the claim" ) return violations def validate_domain_directory_match(filepath: str, frontmatter: dict) -> list[str]: """Check that the file's directory matches its domain field.""" violations = [] domain = frontmatter.get("domain") if not domain: return violations # missing_field:domain already caught by schema check # Extract directory domain from filepath # e.g., domains/internet-finance/foo.md → internet-finance parts = Path(filepath).parts for i, part in enumerate(parts): if part == "domains" and i + 1 < len(parts): dir_domain = parts[i + 1] if dir_domain != domain: # Check secondary_domains before flagging secondary = frontmatter.get("secondary_domains", []) if isinstance(secondary, str): secondary = [secondary] if dir_domain not in (secondary or []): violations.append( f"domain_directory_mismatch:file in domains/{dir_domain}/ " f"but domain field says '{domain}'" ) break return violations def find_near_duplicates( title: str, existing_claims: set[str], threshold: float = DEDUP_THRESHOLD ) -> list[str]: """Find near-duplicate claim titles using SequenceMatcher with word pre-filter.""" title_lower = title.lower() title_words = set(title_lower.split()[:6]) duplicates = [] for existing in existing_claims: existing_lower = existing.lower() # Quick reject: must share at least 2 words from first 6 existing_words = set(existing_lower.split()[:6]) if len(title_words & existing_words) < 2: continue ratio = SequenceMatcher(None, title_lower, existing_lower).ratio() if ratio >= threshold: duplicates.append(f"near_duplicate:{existing[:80]} (similarity={ratio:.2f})") return duplicates def validate_description_not_title(title: str, description: str) -> list[str]: """Check description adds info beyond the title (not just a shorter version).""" violations = [] if not description: return violations # missing field already caught title_lower = title.lower().strip() desc_lower = description.lower().strip().rstrip(".") # Check if description is a substring of title or vice versa if desc_lower in title_lower or title_lower in desc_lower: violations.append("description_echoes_title:description should add context beyond the title") # Check if too similar via SequenceMatcher ratio = SequenceMatcher(None, title_lower, desc_lower).ratio() if ratio > 0.75: violations.append(f"description_too_similar:description is {ratio:.0%} similar to title") return violations # ─── Full Tier 0 validation ──────────────────────────────────────────────── def tier0_validate_claim( filepath: str, content: str, existing_claims: set[str], ) -> dict: """Run full Tier 0 validation on a claim file. Returns dict with: - filepath: str - passes: bool - violations: list[str] - warnings: list[str] (non-blocking issues) """ violations = [] warnings = [] # Parse content fm, body = parse_frontmatter(content) if fm is None: return { "filepath": filepath, "passes": False, "violations": ["no_frontmatter"], "warnings": [], } # Run existing validate_claims checks (schema, date, title length, wiki links) # We inline this rather than calling validate_claim() because we already have # the content parsed and want to separate violations from warnings from validate_claims import validate_schema, validate_date, validate_title, validate_wiki_links violations.extend(validate_schema(fm)) violations.extend(validate_date(fm.get("created"))) violations.extend(validate_title(filepath)) violations.extend(validate_wiki_links(body, existing_claims)) # New Tier 0 checks title = Path(filepath).stem # Proposition heuristic violations.extend(validate_proposition(title)) # Universal quantifier check uq_violations = validate_universal_quantifiers(title) # Unscoped universals are warnings, not hard failures (judgment call) warnings.extend(uq_violations) # Domain-directory match violations.extend(validate_domain_directory_match(filepath, fm)) # Description quality desc = fm.get("description", "") if isinstance(desc, str): warnings.extend(validate_description_not_title(title, desc)) # Near-duplicate detection (warning, not gate — per Ganymede's recommendation) dup_results = find_near_duplicates(title, existing_claims) warnings.extend(dup_results) passes = len(violations) == 0 return { "filepath": filepath, "passes": passes, "violations": violations, "warnings": warnings, } # ─── Forgejo API helpers ─────────────────────────────────────────────────── def load_token() -> str: return Path(FORGEJO_TOKEN_FILE).read_text().strip() def api_get(token: str, endpoint: str, accept: str = "application/json"): url = f"{FORGEJO_URL}/api/v1/{endpoint}" req = Request(url, headers={"Authorization": f"token {token}", "Accept": accept}) with urlopen(req, timeout=60) as resp: data = resp.read().decode("utf-8", errors="replace") if accept == "application/json": return json.loads(data) return data def api_post(token: str, endpoint: str, body: dict): url = f"{FORGEJO_URL}/api/v1/{endpoint}" data = json.dumps(body).encode("utf-8") req = Request( url, data=data, headers={ "Authorization": f"token {token}", "Content-Type": "application/json", }, method="POST", ) with urlopen(req, timeout=30) as resp: return json.loads(resp.read()) def get_pr_diff(token: str, pr_num: int) -> str: """Fetch PR diff, with 2MB size cap.""" try: diff = api_get( token, f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/pulls/{pr_num}.diff", accept="text/plain", ) if len(diff) > 2_000_000: return "" # Too large for mechanical triage return diff except (HTTPError, URLError): return "" def extract_claim_files_from_diff(diff: str) -> dict[str, str]: """Parse unified diff to extract new/modified claim file contents. Returns {filepath: content} for files under domains/, core/, foundations/. Skips deleted files (no content to validate). """ claim_dirs = ("domains/", "core/", "foundations/") files = {} current_file = None current_lines = [] is_deletion = False for line in diff.split("\n"): if line.startswith("diff --git"): # Save previous file (unless it was a deletion) if current_file and not is_deletion: files[current_file] = "\n".join(current_lines) current_file = None current_lines = [] is_deletion = False elif line.startswith("deleted file mode") or line.startswith("+++ /dev/null"): is_deletion = True current_file = None # Don't validate deleted files elif line.startswith("+++ b/") and not is_deletion: path = line[6:] basename = path.rsplit("/", 1)[-1] if "/" in path else path # Only validate claim files — skip _map.md, _index.md, and non-.md files if (any(path.startswith(d) for d in claim_dirs) and path.endswith(".md") and not basename.startswith("_")): current_file = path elif current_file and line.startswith("+") and not line.startswith("+++"): current_lines.append(line[1:]) # Strip the leading + # Save last file if current_file and not is_deletion: files[current_file] = "\n".join(current_lines) return files def get_pr_head_sha(token: str, pr_num: int) -> str: """Get the current HEAD SHA of a PR's branch.""" try: pr_info = api_get( token, f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/pulls/{pr_num}", ) return pr_info.get("head", {}).get("sha", "") except (HTTPError, URLError): return "" def has_tier0_comment(token: str, pr_num: int, head_sha: str) -> bool: """Check if we already posted a Tier 0 comment for this exact commit. Uses SHA-based marker so force-pushes trigger re-validation. """ if not head_sha: return False try: comments = api_get( token, f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/issues/{pr_num}/comments?limit=50", ) marker = f"" for c in comments: if marker in c.get("body", ""): return True except (HTTPError, URLError): pass return False def post_tier0_comment(token: str, pr_num: int, results: list[dict], mode: str, head_sha: str = ""): """Post validation results as a Forgejo comment.""" all_pass = all(r["passes"] for r in results) total = len(results) passing = sum(1 for r in results if r["passes"]) # SHA-based marker for idempotency — force-pushes trigger re-validation marker = f"" if head_sha else "" lines = [marker] if mode == "shadow": lines.append(f"**Tier 0 Validation (shadow mode)** — {passing}/{total} claims pass\n") else: status = "PASS" if all_pass else "FAIL" lines.append(f"**Tier 0 Validation: {status}** — {passing}/{total} claims pass\n") for r in results: icon = "pass" if r["passes"] else "FAIL" short_path = r["filepath"].split("/", 1)[-1] if "/" in r["filepath"] else r["filepath"] lines.append(f"**[{icon}]** `{short_path}`") if r["violations"]: for v in r["violations"]: lines.append(f" - {v}") if r["warnings"]: for w in r["warnings"]: lines.append(f" - (warn) {w}") lines.append("") if not all_pass and mode == "gate": lines.append("---") lines.append("Fix the violations above and push to trigger re-validation.") elif not all_pass and mode == "shadow": lines.append("---") lines.append("*Shadow mode — these results are informational only. " "This PR will proceed to evaluation regardless.*") lines.append(f"\n*tier0-gate v1 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*") body = "\n".join(lines) try: api_post( token, f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/issues/{pr_num}/comments", {"body": body}, ) except (HTTPError, URLError) as e: log(f"WARN: Failed to post Tier 0 comment on PR #{pr_num}: {e}") # ─── Logging ─────────────────────────────────────────────────────────────── def log(msg: str): ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") line = f"[{ts}] [tier0] {msg}" print(line, file=sys.stderr) # Also append to log file log_file = os.path.join(LOG_DIR, "tier0-gate.log") try: with open(log_file, "a") as f: f.write(line + "\n") except OSError: pass # ─── Main ────────────────────────────────────────────────────────────────── def validate_pr(pr_num: int, mode: str = "shadow") -> dict: """Run Tier 0 validation on all claim files in a PR. Returns: { "pr": int, "mode": str, "all_pass": bool, "total": int, "passing": int, "results": [...], "has_claims": bool, } """ token = load_token() # Get PR HEAD SHA for idempotency (re-validates on force-push) head_sha = get_pr_head_sha(token, pr_num) # Check if already validated for this exact commit if has_tier0_comment(token, pr_num, head_sha): log(f"PR #{pr_num}: already validated at {head_sha[:8]}, skipping") return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "already_validated"} # Get PR diff diff = get_pr_diff(token, pr_num) if not diff: log(f"PR #{pr_num}: empty or oversized diff, skipping Tier 0") return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "no_diff"} # Extract claim files from diff claim_files = extract_claim_files_from_diff(diff) if not claim_files: log(f"PR #{pr_num}: no claim files in diff, skipping Tier 0") return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "no_claims"} # Load existing claims index existing_claims = load_existing_claims(REPO_DIR) # Validate each claim results = [] for filepath, content in claim_files.items(): result = tier0_validate_claim(filepath, content, existing_claims) results.append(result) status = "PASS" if result["passes"] else "FAIL" log(f"PR #{pr_num}: {status} {filepath} violations={result['violations']} warnings={result['warnings']}") all_pass = all(r["passes"] for r in results) total = len(results) passing = sum(1 for r in results if r["passes"]) log(f"PR #{pr_num}: Tier 0 {mode} — {passing}/{total} pass, all_pass={all_pass}") # Post comment on PR (with SHA marker for idempotency) post_tier0_comment(token, pr_num, results, mode, head_sha=head_sha) # Log structured result output = { "pr": pr_num, "mode": mode, "all_pass": all_pass, "total": total, "passing": passing, "results": results, "has_claims": True, "ts": datetime.now(timezone.utc).isoformat(), } # Append to structured log try: with open(os.path.join(LOG_DIR, "tier0-results.jsonl"), "a") as f: f.write(json.dumps(output) + "\n") except OSError: pass return output def main(): import argparse parser = argparse.ArgumentParser(description="Tier 0 validation gate for PRs") parser.add_argument("pr_num", type=int, help="PR number to validate") parser.add_argument("--mode", choices=["shadow", "gate"], default="shadow", help="shadow = log only, gate = block on failure") parser.add_argument("--repo-dir", default=None, help="Path to repo clone (for existing claims index)") parser.add_argument("--json", action="store_true", help="Output JSON result to stdout") args = parser.parse_args() if args.repo_dir: global REPO_DIR REPO_DIR = args.repo_dir result = validate_pr(args.pr_num, mode=args.mode) if args.json: print(json.dumps(result, indent=2)) # Exit code: 0 = pass or shadow mode, 1 = gate mode + failures if args.mode == "gate" and result.get("all_pass") is False: sys.exit(1) sys.exit(0) if __name__ == "__main__": main()