teleo-infrastructure/tier0-gate.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination (sketched after this list)
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
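
A minimal sketch of the flock pattern behind worktree_lock.py (the lock path and
context-manager shape here are illustrative assumptions, not the deployed code):

    import fcntl
    import os
    from contextlib import contextmanager

    @contextmanager
    def worktree_lock(path="/opt/teleo-eval/locks/main-worktree.lock"):
        """Serialize access to the main worktree across processes."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        fd = os.open(path, os.O_CREAT | os.O_RDWR)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until the holder releases
            yield
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)
            os.close(fd)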

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

#!/usr/bin/env python3
"""tier0-gate.py — Tier 0 deterministic validation gate for teleo-codex PRs.
Validates all claim files in a PR against mechanical quality checks.
Runs in two modes:
- shadow: log results + post informational comment, don't block
- gate: log results + post comment + return nonzero if failures (blocks eval dispatch)
Usage:
python3 tier0-gate.py <PR_NUM> [--mode shadow|gate] [--repo-dir /path/to/repo]
Designed to be called by eval-dispatcher.sh before dispatching eval-worker.
"""
import json
import os
import re
import sys
from datetime import datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# ─── Config ─────────────────────────────────────────────────────────────────
FORGEJO_URL = os.environ.get("FORGEJO_URL", "https://git.livingip.xyz")
FORGEJO_OWNER = os.environ.get("FORGEJO_OWNER", "teleo")
FORGEJO_REPO = os.environ.get("FORGEJO_REPO", "teleo-codex")
FORGEJO_TOKEN_FILE = os.environ.get(
"FORGEJO_TOKEN_FILE", "/opt/teleo-eval/secrets/forgejo-admin-token"
)
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
LOG_DIR = os.environ.get("LOG_DIR", "/opt/teleo-eval/logs")
DEDUP_THRESHOLD = 0.85
# Import validate_claims from same directory
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from validate_claims import (
VALID_DOMAINS,
WIKI_LINK_RE,
load_existing_claims,
parse_frontmatter,
validate_claim,
)
# ─── New Tier 0 checks (beyond existing validate_claims.py) ────────────────
def _normalize_title(raw_title: str) -> str:
"""Normalize a filename-style title to readable form (hyphens → spaces)."""
return raw_title.replace("-", " ")
# Strong proposition signals (connectives, subordinators, be-verbs, modals)
_STRONG_SIGNALS = re.compile(
r"\b(because|therefore|however|although|despite|since|"
r"rather than|instead of|not just|more than|less than|"
r"by\b|through\b|via\b|without\b|"
r"when\b|where\b|while\b|if\b|unless\b|"
r"which\b|that\b|"
r"is\b|are\b|was\b|were\b|will\b|would\b|"
r"can\b|could\b|should\b|must\b|"
r"has\b|have\b|had\b|does\b|did\b)",
re.IGNORECASE,
)
# Verb-like word endings (past tense, gerund, 3rd person)
_VERB_ENDINGS = re.compile(
r"\b\w{2,}(ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns|ps|ts|rs|ns|ds)\b",
re.IGNORECASE,
)
# Universal quantifiers that signal unscoped claims
_UNIVERSAL_QUANTIFIERS = re.compile(
r"\b(all|every|always|never|no one|nobody|nothing|none of|"
r"the only|the fundamental|the sole|the single|"
r"universally|invariably|without exception|in every case)\b",
re.IGNORECASE,
)
# Scoping language that makes universals acceptable
_SCOPING_LANGUAGE = re.compile(
r"\b(when|if|under|given|assuming|provided|in cases where|"
r"for .+ that|among|within|across|during|between|"
r"approximately|roughly|nearly|most|many|often|typically|"
r"tends? to|generally|usually|frequently)\b",
re.IGNORECASE,
)
def validate_proposition(title: str) -> list[str]:
"""Check that the title reads as a proposition, not a label.
Uses a tiered approach:
- Short titles (<4 words): almost certainly labels → fail
- Medium titles (4-7 words): must contain a verb/connective signal
- Long titles (8+ words): benefit of the doubt (almost always propositions)
"""
violations = []
normalized = _normalize_title(title)
words = normalized.split()
n = len(words)
if n < 4:
violations.append(
"title_not_proposition:too short to be a disagreeable sentence"
)
return violations
# Check for strong signals (connectives, be-verbs, modals)
if _STRONG_SIGNALS.search(normalized):
return violations
# Check for verb-like endings
if _VERB_ENDINGS.search(normalized):
return violations
# Long titles get benefit of the doubt
if n >= 8:
return violations
violations.append(
"title_not_proposition:no verb or connective found — "
"title should be a disagreeable sentence, not a label"
)
return violations
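# Ad-hoc illustration of the tiers above, using hypothetical titles; not wired
# into the gate, just a manual sanity check for the heuristics.
def _demo_validate_proposition() -> None:
    assert validate_proposition("liquidity-premium")  # <4 words: label
    assert not validate_proposition("stablecoins-are-deposit-substitutes")  # "are" signal
    assert not validate_proposition(
        "american-monetary-policy-under-global-dollar-demand-pressure"
    )  # 8+ words: benefit of the doubt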
def validate_universal_quantifiers(title: str) -> list[str]:
    """Flag unscoped universal quantifiers in title."""
    violations = []
    # Normalize first so multi-word patterns ("no one", "the only") can match
    # filename-style hyphenated titles
    normalized = _normalize_title(title)
    universals = _UNIVERSAL_QUANTIFIERS.findall(normalized)
    if universals:
        # Check if there's also scoping language
        has_scope = bool(_SCOPING_LANGUAGE.search(normalized))
        if not has_scope:
            violations.append(
                f"unscoped_universal:{','.join(universals)} — "
                f"add scoping language or qualify the claim"
            )
    return violations
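# Ad-hoc illustration (hypothetical titles): a bare universal is flagged, while
# scoping language ("when ...") rescues it.
def _demo_validate_universal_quantifiers() -> None:
    assert validate_universal_quantifiers("all-stablecoins-fail")
    assert not validate_universal_quantifiers(
        "all-stablecoins-fail-when-reserves-are-fractional"
    )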
def validate_domain_directory_match(filepath: str, frontmatter: dict) -> list[str]:
"""Check that the file's directory matches its domain field."""
violations = []
domain = frontmatter.get("domain")
if not domain:
return violations # missing_field:domain already caught by schema check
# Extract directory domain from filepath
# e.g., domains/internet-finance/foo.md → internet-finance
parts = Path(filepath).parts
for i, part in enumerate(parts):
if part == "domains" and i + 1 < len(parts):
dir_domain = parts[i + 1]
if dir_domain != domain:
# Check secondary_domains before flagging
secondary = frontmatter.get("secondary_domains", [])
if isinstance(secondary, str):
secondary = [secondary]
if dir_domain not in (secondary or []):
violations.append(
f"domain_directory_mismatch:file in domains/{dir_domain}/ "
f"but domain field says '{domain}'"
)
break
return violations
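# Ad-hoc illustration with hypothetical paths and domains: a mismatched
# directory is flagged unless it appears in secondary_domains.
def _demo_validate_domain_directory_match() -> None:
    assert validate_domain_directory_match(
        "domains/internet-finance/foo.md", {"domain": "macro"}
    )
    assert not validate_domain_directory_match(
        "domains/internet-finance/foo.md",
        {"domain": "macro", "secondary_domains": ["internet-finance"]},
    )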
def find_near_duplicates(
title: str, existing_claims: set[str], threshold: float = DEDUP_THRESHOLD
) -> list[str]:
"""Find near-duplicate claim titles using SequenceMatcher with word pre-filter."""
    title_lower = _normalize_title(title).lower()  # hyphens → spaces so the word filter works on stems
title_words = set(title_lower.split()[:6])
duplicates = []
for existing in existing_claims:
        existing_lower = _normalize_title(existing).lower()
# Quick reject: must share at least 2 words from first 6
existing_words = set(existing_lower.split()[:6])
if len(title_words & existing_words) < 2:
continue
ratio = SequenceMatcher(None, title_lower, existing_lower).ratio()
if ratio >= threshold:
duplicates.append(f"near_duplicate:{existing[:80]} (similarity={ratio:.2f})")
return duplicates
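# Ad-hoc illustration (hypothetical stems): the pair shares enough leading
# words to pass the pre-filter, then scores above DEDUP_THRESHOLD (~0.91).
def _demo_find_near_duplicates() -> None:
    existing = {"stablecoins-displace-retail-bank-deposits"}
    hits = find_near_duplicates("stablecoins-displace-bank-deposits", existing)
    assert hits and hits[0].startswith("near_duplicate:")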
def validate_description_not_title(title: str, description: str) -> list[str]:
"""Check description adds info beyond the title (not just a shorter version)."""
violations = []
if not description:
return violations # missing field already caught
    title_lower = _normalize_title(title).lower().strip()
desc_lower = description.lower().strip().rstrip(".")
# Check if description is a substring of title or vice versa
if desc_lower in title_lower or title_lower in desc_lower:
violations.append("description_echoes_title:description should add context beyond the title")
# Check if too similar via SequenceMatcher
ratio = SequenceMatcher(None, title_lower, desc_lower).ratio()
if ratio > 0.75:
violations.append(f"description_too_similar:description is {ratio:.0%} similar to title")
return violations
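# Ad-hoc illustration (hypothetical title/description): a description that
# merely restates the title trips the echo check.
def _demo_validate_description_not_title() -> None:
    out = validate_description_not_title(
        "stablecoins-are-deposit-substitutes",
        "Stablecoins are deposit substitutes.",
    )
    assert any(v.startswith("description_echoes_title") for v in out)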
# ─── Full Tier 0 validation ────────────────────────────────────────────────
def tier0_validate_claim(
filepath: str,
content: str,
existing_claims: set[str],
) -> dict:
"""Run full Tier 0 validation on a claim file.
Returns dict with:
- filepath: str
- passes: bool
- violations: list[str]
- warnings: list[str] (non-blocking issues)
"""
violations = []
warnings = []
# Parse content
fm, body = parse_frontmatter(content)
if fm is None:
return {
"filepath": filepath,
"passes": False,
"violations": ["no_frontmatter"],
"warnings": [],
}
# Run existing validate_claims checks (schema, date, title length, wiki links)
# We inline this rather than calling validate_claim() because we already have
# the content parsed and want to separate violations from warnings
from validate_claims import validate_schema, validate_date, validate_title, validate_wiki_links
violations.extend(validate_schema(fm))
violations.extend(validate_date(fm.get("created")))
violations.extend(validate_title(filepath))
violations.extend(validate_wiki_links(body, existing_claims))
# New Tier 0 checks
title = Path(filepath).stem
# Proposition heuristic
violations.extend(validate_proposition(title))
# Universal quantifier check
uq_violations = validate_universal_quantifiers(title)
# Unscoped universals are warnings, not hard failures (judgment call)
warnings.extend(uq_violations)
# Domain-directory match
violations.extend(validate_domain_directory_match(filepath, fm))
# Description quality
desc = fm.get("description", "")
if isinstance(desc, str):
warnings.extend(validate_description_not_title(title, desc))
# Near-duplicate detection (warning, not gate — per Ganymede's recommendation)
dup_results = find_near_duplicates(title, existing_claims)
warnings.extend(dup_results)
passes = len(violations) == 0
return {
"filepath": filepath,
"passes": passes,
"violations": violations,
"warnings": warnings,
}
# ─── Forgejo API helpers ───────────────────────────────────────────────────
def load_token() -> str:
return Path(FORGEJO_TOKEN_FILE).read_text().strip()
def api_get(token: str, endpoint: str, accept: str = "application/json"):
url = f"{FORGEJO_URL}/api/v1/{endpoint}"
req = Request(url, headers={"Authorization": f"token {token}", "Accept": accept})
with urlopen(req, timeout=60) as resp:
data = resp.read().decode("utf-8", errors="replace")
if accept == "application/json":
return json.loads(data)
return data
def api_post(token: str, endpoint: str, body: dict):
url = f"{FORGEJO_URL}/api/v1/{endpoint}"
data = json.dumps(body).encode("utf-8")
req = Request(
url,
data=data,
headers={
"Authorization": f"token {token}",
"Content-Type": "application/json",
},
method="POST",
)
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
def get_pr_diff(token: str, pr_num: int) -> str:
"""Fetch PR diff, with 2MB size cap."""
try:
diff = api_get(
token,
f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/pulls/{pr_num}.diff",
accept="text/plain",
)
if len(diff) > 2_000_000:
return "" # Too large for mechanical triage
return diff
except (HTTPError, URLError):
return ""
def extract_claim_files_from_diff(diff: str) -> dict[str, str]:
"""Parse unified diff to extract new/modified claim file contents.
Returns {filepath: content} for files under domains/, core/, foundations/.
Skips deleted files (no content to validate).
"""
claim_dirs = ("domains/", "core/", "foundations/")
files = {}
current_file = None
current_lines = []
is_deletion = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
# Save previous file (unless it was a deletion)
if current_file and not is_deletion:
files[current_file] = "\n".join(current_lines)
current_file = None
current_lines = []
is_deletion = False
elif line.startswith("deleted file mode") or line.startswith("+++ /dev/null"):
is_deletion = True
current_file = None # Don't validate deleted files
elif line.startswith("+++ b/") and not is_deletion:
path = line[6:]
basename = path.rsplit("/", 1)[-1] if "/" in path else path
# Only validate claim files — skip _map.md, _index.md, and non-.md files
if (any(path.startswith(d) for d in claim_dirs)
and path.endswith(".md")
and not basename.startswith("_")):
current_file = path
elif current_file and line.startswith("+") and not line.startswith("+++"):
current_lines.append(line[1:]) # Strip the leading +
# Save last file
if current_file and not is_deletion:
files[current_file] = "\n".join(current_lines)
return files
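# Ad-hoc illustration: a minimal synthetic diff yields one claim file whose
# content is rebuilt from added lines only (deletions and _-files are skipped).
def _demo_extract_claim_files_from_diff() -> None:
    diff = "\n".join([
        "diff --git a/domains/internet-finance/foo-is-bar.md "
        "b/domains/internet-finance/foo-is-bar.md",
        "new file mode 100644",
        "--- /dev/null",
        "+++ b/domains/internet-finance/foo-is-bar.md",
        "@@ -0,0 +1,2 @@",
        "+---",
        "+domain: internet-finance",
    ])
    files = extract_claim_files_from_diff(diff)
    assert files == {
        "domains/internet-finance/foo-is-bar.md": "---\ndomain: internet-finance"
    }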
def get_pr_head_sha(token: str, pr_num: int) -> str:
"""Get the current HEAD SHA of a PR's branch."""
try:
pr_info = api_get(
token,
f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/pulls/{pr_num}",
)
return pr_info.get("head", {}).get("sha", "")
except (HTTPError, URLError):
return ""
def has_tier0_comment(token: str, pr_num: int, head_sha: str) -> bool:
"""Check if we already posted a Tier 0 comment for this exact commit.
Uses SHA-based marker so force-pushes trigger re-validation.
"""
if not head_sha:
return False
try:
comments = api_get(
token,
f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/issues/{pr_num}/comments?limit=50",
)
marker = f"<!-- TIER0-VALIDATION:{head_sha} -->"
for c in comments:
if marker in c.get("body", ""):
return True
except (HTTPError, URLError):
pass
return False
def post_tier0_comment(token: str, pr_num: int, results: list[dict], mode: str, head_sha: str = ""):
"""Post validation results as a Forgejo comment."""
all_pass = all(r["passes"] for r in results)
total = len(results)
passing = sum(1 for r in results if r["passes"])
# SHA-based marker for idempotency — force-pushes trigger re-validation
marker = f"<!-- TIER0-VALIDATION:{head_sha} -->" if head_sha else "<!-- TIER0-VALIDATION -->"
lines = [marker]
if mode == "shadow":
lines.append(f"**Tier 0 Validation (shadow mode)** — {passing}/{total} claims pass\n")
else:
status = "PASS" if all_pass else "FAIL"
lines.append(f"**Tier 0 Validation: {status}** — {passing}/{total} claims pass\n")
for r in results:
icon = "pass" if r["passes"] else "FAIL"
short_path = r["filepath"].split("/", 1)[-1] if "/" in r["filepath"] else r["filepath"]
lines.append(f"**[{icon}]** `{short_path}`")
if r["violations"]:
for v in r["violations"]:
lines.append(f" - {v}")
if r["warnings"]:
for w in r["warnings"]:
lines.append(f" - (warn) {w}")
lines.append("")
if not all_pass and mode == "gate":
lines.append("---")
lines.append("Fix the violations above and push to trigger re-validation.")
elif not all_pass and mode == "shadow":
lines.append("---")
lines.append("*Shadow mode — these results are informational only. "
"This PR will proceed to evaluation regardless.*")
lines.append(f"\n*tier0-gate v1 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")
body = "\n".join(lines)
try:
api_post(
token,
f"repos/{FORGEJO_OWNER}/{FORGEJO_REPO}/issues/{pr_num}/comments",
{"body": body},
)
except (HTTPError, URLError) as e:
log(f"WARN: Failed to post Tier 0 comment on PR #{pr_num}: {e}")
# ─── Logging ───────────────────────────────────────────────────────────────
def log(msg: str):
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
line = f"[{ts}] [tier0] {msg}"
print(line, file=sys.stderr)
# Also append to log file
log_file = os.path.join(LOG_DIR, "tier0-gate.log")
try:
with open(log_file, "a") as f:
f.write(line + "\n")
except OSError:
pass
# ─── Main ──────────────────────────────────────────────────────────────────
def validate_pr(pr_num: int, mode: str = "shadow") -> dict:
"""Run Tier 0 validation on all claim files in a PR.
Returns:
{
"pr": int,
"mode": str,
"all_pass": bool,
"total": int,
"passing": int,
"results": [...],
"has_claims": bool,
}
"""
token = load_token()
# Get PR HEAD SHA for idempotency (re-validates on force-push)
head_sha = get_pr_head_sha(token, pr_num)
# Check if already validated for this exact commit
if has_tier0_comment(token, pr_num, head_sha):
log(f"PR #{pr_num}: already validated at {head_sha[:8]}, skipping")
return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "already_validated"}
# Get PR diff
diff = get_pr_diff(token, pr_num)
if not diff:
log(f"PR #{pr_num}: empty or oversized diff, skipping Tier 0")
return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "no_diff"}
# Extract claim files from diff
claim_files = extract_claim_files_from_diff(diff)
if not claim_files:
log(f"PR #{pr_num}: no claim files in diff, skipping Tier 0")
return {"pr": pr_num, "mode": mode, "skipped": True, "reason": "no_claims"}
# Load existing claims index
existing_claims = load_existing_claims(REPO_DIR)
# Validate each claim
results = []
for filepath, content in claim_files.items():
result = tier0_validate_claim(filepath, content, existing_claims)
results.append(result)
status = "PASS" if result["passes"] else "FAIL"
log(f"PR #{pr_num}: {status} {filepath} violations={result['violations']} warnings={result['warnings']}")
all_pass = all(r["passes"] for r in results)
total = len(results)
passing = sum(1 for r in results if r["passes"])
log(f"PR #{pr_num}: Tier 0 {mode}{passing}/{total} pass, all_pass={all_pass}")
# Post comment on PR (with SHA marker for idempotency)
post_tier0_comment(token, pr_num, results, mode, head_sha=head_sha)
# Log structured result
output = {
"pr": pr_num,
"mode": mode,
"all_pass": all_pass,
"total": total,
"passing": passing,
"results": results,
"has_claims": True,
"ts": datetime.now(timezone.utc).isoformat(),
}
# Append to structured log
try:
with open(os.path.join(LOG_DIR, "tier0-results.jsonl"), "a") as f:
f.write(json.dumps(output) + "\n")
except OSError:
pass
return output
def main():
import argparse
parser = argparse.ArgumentParser(description="Tier 0 validation gate for PRs")
parser.add_argument("pr_num", type=int, help="PR number to validate")
parser.add_argument("--mode", choices=["shadow", "gate"], default="shadow",
help="shadow = log only, gate = block on failure")
parser.add_argument("--repo-dir", default=None,
help="Path to repo clone (for existing claims index)")
parser.add_argument("--json", action="store_true",
help="Output JSON result to stdout")
args = parser.parse_args()
if args.repo_dir:
global REPO_DIR
REPO_DIR = args.repo_dir
result = validate_pr(args.pr_num, mode=args.mode)
if args.json:
print(json.dumps(result, indent=2))
# Exit code: 0 = pass or shadow mode, 1 = gate mode + failures
if args.mode == "gate" and result.get("all_pass") is False:
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()