#!/usr/bin/env python3
"""tier0-gate.py — Tier 0 deterministic validation gate for teleo-codex PRs.

Validates all claim files in a PR against mechanical quality checks.
Runs in two modes:
- shadow: log results + post informational comment, don't block
- gate: log results + post comment + return nonzero if failures (blocks eval dispatch)

Usage:
    python3 tier0-gate.py <PR_NUM> [--mode shadow|gate] [--repo-dir /path/to/repo]

Designed to be called by eval-dispatcher.sh before dispatching eval-worker.
"""

import json
import os
import re
import sys
from datetime import datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# ─── Config ─────────────────────────────────────────────────────────────────

FORGEJO_URL = os.environ.get("FORGEJO_URL", "https://git.livingip.xyz")
FORGEJO_OWNER = os.environ.get("FORGEJO_OWNER", "teleo")
FORGEJO_REPO = os.environ.get("FORGEJO_REPO", "teleo-codex")
FORGEJO_TOKEN_FILE = os.environ.get(
    "FORGEJO_TOKEN_FILE", "/opt/teleo-eval/secrets/forgejo-admin-token"
)
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
LOG_DIR = os.environ.get("LOG_DIR", "/opt/teleo-eval/logs")
DEDUP_THRESHOLD = 0.85
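
# All of the above can be overridden per invocation, e.g. for a local dry run
# against a scratch clone (hypothetical paths and PR number):
#   REPO_DIR=/tmp/teleo-codex LOG_DIR=/tmp/logs python3 tier0-gate.py 42 --mode shadow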

# Import validate_claims from same directory
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from validate_claims import (
    VALID_DOMAINS,
    WIKI_LINK_RE,
    load_existing_claims,
    parse_frontmatter,
    validate_claim,
)


# ─── New Tier 0 checks (beyond existing validate_claims.py) ────────────────


def _normalize_title(raw_title: str) -> str:
    """Normalize a filename-style title to readable form (hyphens → spaces)."""
    return raw_title.replace("-", " ")


# Strong proposition signals (connectives, subordinators, be-verbs, modals)
_STRONG_SIGNALS = re.compile(
    r"\b(because|therefore|however|although|despite|since|"
    r"rather than|instead of|not just|more than|less than|"
    r"by\b|through\b|via\b|without\b|"
    r"when\b|where\b|while\b|if\b|unless\b|"
    r"which\b|that\b|"
    r"is\b|are\b|was\b|were\b|will\b|would\b|"
    r"can\b|could\b|should\b|must\b|"
    r"has\b|have\b|had\b|does\b|did\b)",
    re.IGNORECASE,
)

# Verb-like word endings (past tense, gerund, 3rd person)
_VERB_ENDINGS = re.compile(
    r"\b\w{2,}(ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns|ps|ts|rs|ns|ds)\b",
    re.IGNORECASE,
)

# Universal quantifiers that signal unscoped claims
_UNIVERSAL_QUANTIFIERS = re.compile(
    r"\b(all|every|always|never|no one|nobody|nothing|none of|"
    r"the only|the fundamental|the sole|the single|"
    r"universally|invariably|without exception|in every case)\b",
    re.IGNORECASE,
)

# Scoping language that makes universals acceptable
_SCOPING_LANGUAGE = re.compile(
    r"\b(when|if|under|given|assuming|provided|in cases where|"
    r"for .+ that|among|within|across|during|between|"
    r"approximately|roughly|nearly|most|many|often|typically|"
    r"tends? to|generally|usually|frequently)\b",
    re.IGNORECASE,
)


def validate_proposition(title: str) -> list[str]:
    """Check that the title reads as a proposition, not a label.

    Uses a tiered approach:
    - Short titles (<4 words): almost certainly labels → fail
    - Medium titles (4-7 words): must contain a verb/connective signal
    - Long titles (8+ words): benefit of the doubt (almost always propositions)
    """
    violations = []
    normalized = _normalize_title(title)
    words = normalized.split()
    n = len(words)

    if n < 4:
        violations.append(
            "title_not_proposition:too short to be a disagreeable sentence"
        )
        return violations

    # Check for strong signals (connectives, be-verbs, modals)
    if _STRONG_SIGNALS.search(normalized):
        return violations

    # Check for verb-like endings
    if _VERB_ENDINGS.search(normalized):
        return violations

    # Long titles get benefit of the doubt
    if n >= 8:
        return violations

    violations.append(
        "title_not_proposition:no verb or connective found — "
        "title should be a disagreeable sentence, not a label"
    )
    return violations
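
# Illustrative behavior on hypothetical titles (a sketch, not corpus data):
#   validate_proposition("defi-liquidity")
#       fails: 2 words, too short to be a disagreeable sentence
#   validate_proposition("internet-finance-market-structure")
#       fails: 4 words with no verb or connective, so it reads as a label
#   validate_proposition("stablecoins-are-narrow-banks")
#       passes: "are" is a strong be-verb signal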


def validate_universal_quantifiers(title: str) -> list[str]:
    """Flag unscoped universal quantifiers in title."""
    violations = []
    universals = _UNIVERSAL_QUANTIFIERS.findall(title)
    if universals:
        # Check if there's also scoping language
        has_scope = bool(_SCOPING_LANGUAGE.search(title))
        if not has_scope:
            violations.append(
                f"unscoped_universal:{','.join(universals)} — "
                f"add scoping language or qualify the claim"
            )
    return violations
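
# Hypothetical titles showing the scoping escape hatch:
#   "markets-always-revert"                  flagged (universal "always", no scope)
#   "all-models-overfit-when-data-is-scarce" clean ("when" scopes the "all")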


def validate_domain_directory_match(filepath: str, frontmatter: dict) -> list[str]:
    """Check that the file's directory matches its domain field."""
    violations = []
    domain = frontmatter.get("domain")
    if not domain:
        return violations  # missing_field:domain already caught by schema check

    # Extract directory domain from filepath
    # e.g., domains/internet-finance/foo.md → internet-finance
    parts = Path(filepath).parts
    for i, part in enumerate(parts):
        if part == "domains" and i + 1 < len(parts):
            dir_domain = parts[i + 1]
            if dir_domain != domain:
                # Check secondary_domains before flagging
                secondary = frontmatter.get("secondary_domains", [])
                if isinstance(secondary, str):
                    secondary = [secondary]
                if dir_domain not in (secondary or []):
                    violations.append(
                        f"domain_directory_mismatch:file in domains/{dir_domain}/ "
                        f"but domain field says '{domain}'"
                    )
            break
    return violations
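
# E.g. (hypothetical frontmatter): a file at domains/macro/foo.md whose
# frontmatter says domain: internet-finance is flagged unless its
# secondary_domains list includes "macro".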


def find_near_duplicates(
    title: str, existing_claims: set[str], threshold: float = DEDUP_THRESHOLD
) -> list[str]:
    """Find near-duplicate claim titles using SequenceMatcher with word pre-filter."""
    title_lower = title.lower()
    title_words = set(title_lower.split()[:6])
    duplicates = []
    for existing in existing_claims:
        existing_lower = existing.lower()
        # Quick reject: must share at least 2 words from first 6
        existing_words = set(existing_lower.split()[:6])
        if len(title_words & existing_words) < 2:
            continue
        ratio = SequenceMatcher(None, title_lower, existing_lower).ratio()
        if ratio >= threshold:
            duplicates.append(f"near_duplicate:{existing[:80]} (similarity={ratio:.2f})")
    return duplicates
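
# The shared-word pre-filter exists because SequenceMatcher.ratio() is
# comparatively expensive (roughly quadratic), so it keeps a full-corpus scan
# cheap. For intuition, on a hypothetical pair: "stablecoins are narrow banks"
# vs "stablecoins are narrow banks in practice" gives ratio = 56/68, about
# 0.82, just under the 0.85 default, so it would not be flagged.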


def validate_description_not_title(title: str, description: str) -> list[str]:
    """Check description adds info beyond the title (not just a shorter version)."""
    violations = []
    if not description:
        return violations  # missing field already caught

    title_lower = title.lower().strip()
    desc_lower = description.lower().strip().rstrip(".")

    # Check if description is a substring of title or vice versa
    if desc_lower in title_lower or title_lower in desc_lower:
        violations.append("description_echoes_title:description should add context beyond the title")

    # Check if too similar via SequenceMatcher
    ratio = SequenceMatcher(None, title_lower, desc_lower).ratio()
    if ratio > 0.75:
        violations.append(f"description_too_similar:description is {ratio:.0%} similar to title")

    return violations


# ─── Full Tier 0 validation ────────────────────────────────────────────────

def tier0_validate_claim(
    filepath: str,
    content: str,
    existing_claims: set[str],
) -> dict:
    """Run full Tier 0 validation on a claim file.

    Returns dict with:
    - filepath: str
    - passes: bool
    - violations: list[str]
    - warnings: list[str] (non-blocking issues)
    """
    violations = []
    warnings = []

    # Parse content
    fm, body = parse_frontmatter(content)
    if fm is None:
        return {
            "filepath": filepath,
            "passes": False,
            "violations": ["no_frontmatter"],
            "warnings": [],
        }

    # Run existing validate_claims checks (schema, date, title length, wiki links)
    # We inline this rather than calling validate_claim() because we already have
    # the content parsed and want to separate violations from warnings
    from validate_claims import validate_schema, validate_date, validate_title, validate_wiki_links

    violations.extend(validate_schema(fm))
    violations.extend(validate_date(fm.get("created")))
    violations.extend(validate_title(filepath))
    violations.extend(validate_wiki_links(body, existing_claims))

    # New Tier 0 checks
    title = Path(filepath).stem

    # Proposition heuristic
    violations.extend(validate_proposition(title))

    # Universal quantifier check
    uq_violations = validate_universal_quantifiers(title)
    # Unscoped universals are warnings, not hard failures (judgment call)
    warnings.extend(uq_violations)

    # Domain-directory match
    violations.extend(validate_domain_directory_match(filepath, fm))

    # Description quality
    desc = fm.get("description", "")
    if isinstance(desc, str):
        warnings.extend(validate_description_not_title(title, desc))

    # Near-duplicate detection (warning, not gate — per Ganymede's recommendation)
    dup_results = find_near_duplicates(title, existing_claims)
    warnings.extend(dup_results)

    passes = len(violations) == 0
    return {
        "filepath": filepath,
        "passes": passes,
        "violations": violations,
        "warnings": warnings,
    }
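
# Minimal direct-usage sketch (hypothetical path; in production validate_pr()
# below feeds this function diff-extracted content instead):
#   existing = load_existing_claims(REPO_DIR)
#   path = "domains/internet-finance/stablecoins-are-narrow-banks.md"
#   result = tier0_validate_claim(path, Path(path).read_text(), existing)
#   # result is {"filepath": ..., "passes": bool, "violations": [...], "warnings": [...]}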


# ─── Forgejo API helpers ───────────────────────────────────────────────────

def load_token() -> str:
    return Path(FORGEJO_TOKEN_FILE).read_text().strip()


def api_get(token: str, endpoint: str, accept: str = "application/json"):
    url = f"{FORGEJO_URL}/api/v1/{endpoint}"
    req = Request(url, headers={"Authorization": f"token {token}", "Accept": accept})
    with urlopen(req, timeout=60) as resp:
        data = resp.read().decode("utf-8", errors="replace")
    if accept == "application/json":
        return json.loads(data)
    return data


def api_post(token: str, endpoint: str, body: dict):
    url = f"{FORGEJO_URL}/api/v1/{endpoint}"
    data = json.dumps(body).encode("utf-8")
    req = Request(
        url,
        data=data,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
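
# Both helpers speak the Forgejo REST API with a plain token header. The three
# calls this script actually makes (owner/repo/n are placeholders):
#   api_get(token, f"repos/{owner}/{repo}/pulls/{n}")                            PR metadata
#   api_get(token, f"repos/{owner}/{repo}/pulls/{n}.diff", accept="text/plain")  raw diff
#   api_post(token, f"repos/{owner}/{repo}/issues/{n}/comments", {"body": ...})  new comment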


def get_pr_diff(token: str, pr_num: int) -> str:
    """Fetch PR diff, with 2MB size cap."""
    try:
        diff = api_get(
            token,
            f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/pulls/{pr_num}.diff",
            accept="text/plain",
        )
        if len(diff) > 2_000_000:
            return ""  # Too large for mechanical triage
        return diff
    except (HTTPError, URLError):
        return ""


def extract_claim_files_from_diff(diff: str) -> dict[str, str]:
    """Parse unified diff to extract new/modified claim file contents.

    Returns {filepath: content} for files under domains/, core/, foundations/.
    Skips deleted files (no content to validate).
    """
    claim_dirs = ("domains/", "core/", "foundations/")
    files = {}
    current_file = None
    current_lines = []
    is_deletion = False

    for line in diff.split("\n"):
        if line.startswith("diff --git"):
            # Save previous file (unless it was a deletion)
            if current_file and not is_deletion:
                files[current_file] = "\n".join(current_lines)
            current_file = None
            current_lines = []
            is_deletion = False
        elif line.startswith("deleted file mode") or line.startswith("+++ /dev/null"):
            is_deletion = True
            current_file = None  # Don't validate deleted files
        elif line.startswith("+++ b/") and not is_deletion:
            path = line[6:]
            basename = path.rsplit("/", 1)[-1] if "/" in path else path
            # Only validate claim files — skip _map.md, _index.md, and non-.md files
            if (any(path.startswith(d) for d in claim_dirs)
                    and path.endswith(".md")
                    and not basename.startswith("_")):
                current_file = path
        elif current_file and line.startswith("+") and not line.startswith("+++"):
            current_lines.append(line[1:])  # Strip the leading +

    # Save last file
    if current_file and not is_deletion:
        files[current_file] = "\n".join(current_lines)

    return files
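
# Shape of the input this parser walks (abridged, hypothetical new file):
#   diff --git a/domains/internet-finance/foo.md b/domains/internet-finance/foo.md
#   new file mode 100644
#   --- /dev/null
#   +++ b/domains/internet-finance/foo.md
#   +---
#   +domain: internet-finance
#   +created: 2025-01-01
#   +---
# Only "+" body lines are accumulated, so for a modified file the reconstructed
# content is just its added lines, not the full file on disk.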


def get_pr_head_sha(token: str, pr_num: int) -> str:
    """Get the current HEAD SHA of a PR's branch."""
    try:
        pr_info = api_get(
            token,
            f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/pulls/{pr_num}",
        )
        return pr_info.get("head", {}).get("sha", "")
    except (HTTPError, URLError):
        return ""


def has_tier0_comment(token: str, pr_num: int, head_sha: str) -> bool:
    """Check if we already posted a Tier 0 comment for this exact commit.

    Uses SHA-based marker so force-pushes trigger re-validation.
    """
    if not head_sha:
        return False
    try:
        comments = api_get(
            token,
            f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/issues/{pr_num}/comments?limit=50",
        )
        marker = f"<!-- TIER0-VALIDATION:{head_sha} -->"
        for c in comments:
            if marker in c.get("body", ""):
                return True
    except (HTTPError, URLError):
        pass
    return False


def post_tier0_comment(token: str, pr_num: int, results: list[dict], mode: str, head_sha: str = ""):
    """Post validation results as a Forgejo comment."""
    all_pass = all(r["passes"] for r in results)
    total = len(results)
    passing = sum(1 for r in results if r["passes"])

    # SHA-based marker for idempotency — force-pushes trigger re-validation
    marker = f"<!-- TIER0-VALIDATION:{head_sha} -->" if head_sha else "<!-- TIER0-VALIDATION -->"
    lines = [marker]

    if mode == "shadow":
        lines.append(f"**Tier 0 Validation (shadow mode)** — {passing}/{total} claims pass\n")
    else:
        status = "PASS" if all_pass else "FAIL"
        lines.append(f"**Tier 0 Validation: {status}** — {passing}/{total} claims pass\n")

    for r in results:
        icon = "pass" if r["passes"] else "FAIL"
        short_path = r["filepath"].split("/", 1)[-1] if "/" in r["filepath"] else r["filepath"]
        lines.append(f"**[{icon}]** `{short_path}`")

        if r["violations"]:
            for v in r["violations"]:
                lines.append(f" - {v}")

        if r["warnings"]:
            for w in r["warnings"]:
                lines.append(f" - (warn) {w}")

        lines.append("")

    if not all_pass and mode == "gate":
        lines.append("---")
        lines.append("Fix the violations above and push to trigger re-validation.")
    elif not all_pass and mode == "shadow":
        lines.append("---")
        lines.append("*Shadow mode — these results are informational only. "
                     "This PR will proceed to evaluation regardless.*")

    lines.append(f"\n*tier0-gate v1 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")

    body = "\n".join(lines)

    try:
        api_post(
            token,
            f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/issues/{pr_num}/comments",
            {"body": body},
        )
    except (HTTPError, URLError) as e:
        log(f"WARN: Failed to post Tier 0 comment on PR #{pr_num}: {e}")


# ─── Logging ───────────────────────────────────────────────────────────────

def log(msg: str):
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    line = f"[{ts}] [tier0] {msg}"
    print(line, file=sys.stderr)
    # Also append to log file
    log_file = os.path.join(LOG_DIR, "tier0-gate.log")
    try:
        with open(log_file, "a") as f:
            f.write(line + "\n")
    except OSError:
        pass


# ─── Main ──────────────────────────────────────────────────────────────────

def validate_pr(pr_num: int, mode: str = "shadow") -> dict:
    """Run Tier 0 validation on all claim files in a PR.

    Returns:
        {
            "pr": int,
            "mode": str,
            "all_pass": bool,
            "total": int,
            "passing": int,
            "results": [...],
            "has_claims": bool,
        }
    """
    token = load_token()

    # Get PR HEAD SHA for idempotency (re-validates on force-push)
    head_sha = get_pr_head_sha(token, pr_num)

    # Check if already validated for this exact commit
    if has_tier0_comment(token, pr_num, head_sha):
        log(f"PR #{pr_num}: already validated at {head_sha[:8]}, skipping")
        return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "already_validated"}

    # Get PR diff
    diff = get_pr_diff(token, pr_num)
    if not diff:
        log(f"PR #{pr_num}: empty or oversized diff, skipping Tier 0")
        return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "no_diff"}

    # Extract claim files from diff
    claim_files = extract_claim_files_from_diff(diff)
    if not claim_files:
        log(f"PR #{pr_num}: no claim files in diff, skipping Tier 0")
        return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "no_claims"}

    # Load existing claims index
    existing_claims = load_existing_claims(REPO_DIR)

    # Validate each claim
    results = []
    for filepath, content in claim_files.items():
        result = tier0_validate_claim(filepath, content, existing_claims)
        results.append(result)
        status = "PASS" if result["passes"] else "FAIL"
        log(f"PR #{pr_num}: {status} {filepath} violations={result['violations']} warnings={result['warnings']}")

    all_pass = all(r["passes"] for r in results)
    total = len(results)
    passing = sum(1 for r in results if r["passes"])

    log(f"PR #{pr_num}: Tier 0 {mode} — {passing}/{total} pass, all_pass={all_pass}")

    # Post comment on PR (with SHA marker for idempotency)
    post_tier0_comment(token, pr_num, results, mode, head_sha=head_sha)

    # Log structured result
    output = {
        "pr": pr_num,
        "mode": mode,
        "all_pass": all_pass,
        "total": total,
        "passing": passing,
        "results": results,
        "has_claims": True,
        "ts": datetime.now(timezone.utc).isoformat(),
    }

    # Append to structured log
    try:
        with open(os.path.join(LOG_DIR, "tier0-results.jsonl"), "a") as f:
            f.write(json.dumps(output) + "\n")
    except OSError:
        pass

    return output
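
# One line of tier0-results.jsonl then looks roughly like (hypothetical PR):
#   {"pr": 42, "mode": "shadow", "all_pass": false, "total": 2, "passing": 1,
#    "results": [...], "has_claims": true, "ts": "..."}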


def main():
    import argparse

    parser = argparse.ArgumentParser(description="Tier 0 validation gate for PRs")
    parser.add_argument("pr_num", type=int, help="PR number to validate")
    parser.add_argument("--mode", choices=["shadow", "gate"], default="shadow",
                        help="shadow = log only, gate = block on failure")
    parser.add_argument("--repo-dir", default=None,
                        help="Path to repo clone (for existing claims index)")
    parser.add_argument("--json", action="store_true",
                        help="Output JSON result to stdout")
    args = parser.parse_args()

    if args.repo_dir:
        global REPO_DIR
        REPO_DIR = args.repo_dir

    result = validate_pr(args.pr_num, mode=args.mode)

    if args.json:
        print(json.dumps(result, indent=2))

    # Exit code: 0 = pass or shadow mode, 1 = gate mode + failures
    if args.mode == "gate" and result.get("all_pass") is False:
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()
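
# Example invocation from a dispatcher (hypothetical PR number):
#   python3 tier0-gate.py 123 --mode gate --json
# Shadow mode always exits 0; gate mode exits 1 only when at least one claim
# fails, which is how eval-dispatcher.sh can block eval dispatch on failure.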