Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
- What: Unified forgejo_api(), get_pr_diff(), get_agent_token(), repo_path() into lib/forgejo.py. Removed 3 duplicate _forgejo_api functions (evaluate.py, merge.py, validate.py), 2 duplicate _get_pr_diff functions (evaluate.py, validate.py), and 1 _agent_token function (evaluate.py). - Why: Phase 3 structural refactor. Single source of truth for all Forgejo HTTP calls. Eliminates ~90 lines of duplicated code across 3 modules. - Connections: All hardcoded repo paths now use repo_path() helper. Consumer modules no longer reference config.FORGEJO_URL/OWNER/REPO/TOKEN_FILE directly. Pentagon-Agent: Ganymede <F99EBFA6-547B-4096-BEEA-1D59C3E4028A>
561 lines
19 KiB
Python
561 lines
19 KiB
Python
"""Validate stage — Tier 0 deterministic validation gate.
|
|
|
|
Ported from tier0-gate.py + validate_claims.py. Pure Python, no LLM calls.
|
|
Validates claim frontmatter, title format, wiki links, domain-directory match,
|
|
proposition heuristic, universal quantifiers, near-duplicate detection.
|
|
|
|
Runs against PRs with status 'open' that have tier0_pass IS NULL.
|
|
Posts results as PR comments. In gate mode, sets tier0_pass = 0/1.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from datetime import date, datetime, timezone
|
|
from difflib import SequenceMatcher
|
|
from pathlib import Path
|
|
|
|
from . import config, db
|
|
from .forgejo import api as forgejo_api
|
|
from .forgejo import get_pr_diff, repo_path
|
|
|
|
logger = logging.getLogger("pipeline.validate")

# ─── Constants ──────────────────────────────────────────────────────────────

# Closed set of values a claim's `domain` frontmatter field may take
# (enum check in validate_schema; directory placement check in
# validate_domain_directory_match).
VALID_DOMAINS = frozenset(
    {
        "internet-finance",
        "entertainment",
        "health",
        "ai-alignment",
        "space-development",
        "grand-strategy",
        "mechanisms",
        "living-capital",
        "living-agents",
        "teleohumanity",
        "critical-systems",
        "collective-intelligence",
        "teleological-economics",
        "cultural-dynamics",
    }
)

# Allowed values for the `confidence` frontmatter enum.
VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"})
# Allowed values for the `type` frontmatter enum.
VALID_TYPES = frozenset({"claim", "framework"})
# Frontmatter keys that must be present and non-null (validate_schema).
REQUIRED_FIELDS = ("type", "domain", "description", "confidence", "source", "created")
# Claims dated before this are rejected by validate_date.
DATE_MIN = date(2020, 1, 1)
# Matches [[wiki link]] targets in claim bodies (validate_wiki_links).
WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
# SequenceMatcher ratio at or above which two titles count as near-duplicates.
DEDUP_THRESHOLD = 0.85

# Proposition heuristic patterns

# Connectives, auxiliaries, and copulas whose presence suggests a title is a
# full proposition rather than a bare topic label (validate_proposition).
_STRONG_SIGNALS = re.compile(
    r"\b(because|therefore|however|although|despite|since|"
    r"rather than|instead of|not just|more than|less than|"
    r"by\b|through\b|via\b|without\b|"
    r"when\b|where\b|while\b|if\b|unless\b|"
    r"which\b|that\b|"
    r"is\b|are\b|was\b|were\b|will\b|would\b|"
    r"can\b|could\b|should\b|must\b|"
    r"has\b|have\b|had\b|does\b|did\b)",
    re.IGNORECASE,
)

# Crude verb detector: common English verb suffixes on words of 2+ letters.
# NOTE(review): this also matches many plural nouns ("cats", "items"); it is
# deliberately permissive since it only ever *accepts* titles, never rejects.
_VERB_ENDINGS = re.compile(
    r"\b\w{2,}(ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns|ps|ts|rs|ns|ds)\b",
    re.IGNORECASE,
)

# Universal quantifiers that trigger a warning when not accompanied by
# scoping language (validate_universal_quantifiers).
_UNIVERSAL_QUANTIFIERS = re.compile(
    r"\b(all|every|always|never|no one|nobody|nothing|none of|"
    r"the only|the fundamental|the sole|the single|"
    r"universally|invariably|without exception|in every case)\b",
    re.IGNORECASE,
)

# Hedging/scoping phrases that excuse a universal quantifier.
_SCOPING_LANGUAGE = re.compile(
    r"\b(when|if|under|given|assuming|provided|in cases where|"
    r"for .+ that|among|within|across|during|between|"
    r"approximately|roughly|nearly|most|many|often|typically|"
    r"tends? to|generally|usually|frequently)\b",
    re.IGNORECASE,
)
|
|
|
|
|
|
# ─── YAML frontmatter parser ───────────────────────────────────────────────
|
|
|
|
|
|
def parse_frontmatter(text: str) -> tuple[dict | None, str]:
|
|
"""Extract YAML frontmatter and body from markdown text."""
|
|
if not text.startswith("---"):
|
|
return None, text
|
|
end = text.find("---", 3)
|
|
if end == -1:
|
|
return None, text
|
|
raw = text[3:end]
|
|
body = text[end + 3 :].strip()
|
|
|
|
try:
|
|
import yaml
|
|
|
|
fm = yaml.safe_load(raw)
|
|
if not isinstance(fm, dict):
|
|
return None, body
|
|
return fm, body
|
|
except ImportError:
|
|
pass
|
|
except Exception:
|
|
return None, body
|
|
|
|
# Fallback: simple key-value parser
|
|
fm = {}
|
|
for line in raw.strip().split("\n"):
|
|
line = line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
if ":" not in line:
|
|
continue
|
|
key, _, val = line.partition(":")
|
|
key = key.strip()
|
|
val = val.strip().strip('"').strip("'")
|
|
if val.lower() == "null" or val == "":
|
|
val = None
|
|
elif val.startswith("["):
|
|
val = [v.strip().strip('"').strip("'") for v in val.strip("[]").split(",") if v.strip()]
|
|
fm[key] = val
|
|
return fm if fm else None, body
|
|
|
|
|
|
# ─── Validators ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
def validate_schema(fm: dict) -> list[str]:
    """Return violation codes for missing required fields and bad enum values.

    Checks every key in REQUIRED_FIELDS, then validates the type/domain/
    confidence enums and minimum lengths for description and source.
    """
    problems: list[str] = []

    problems.extend(
        f"missing_field:{name}"
        for name in REQUIRED_FIELDS
        if name not in fm or fm[name] is None
    )

    kind = fm.get("type")
    if kind and kind not in VALID_TYPES:
        problems.append(f"invalid_type:{kind}")

    domain = fm.get("domain")
    if domain and domain not in VALID_DOMAINS:
        problems.append(f"invalid_domain:{domain}")

    confidence = fm.get("confidence")
    if confidence and confidence not in VALID_CONFIDENCE:
        problems.append(f"invalid_confidence:{confidence}")

    description = fm.get("description")
    if isinstance(description, str) and len(description.strip()) < 10:
        problems.append("description_too_short")

    source = fm.get("source")
    if isinstance(source, str) and len(source.strip()) < 3:
        problems.append("source_too_short")

    return problems
|
|
|
|
|
|
def validate_date(date_val) -> list[str]:
    """Validate the ``created`` frontmatter value.

    Accepts a datetime.date or a 'YYYY-MM-DD' string; rejects other types,
    unparsable strings, future dates, and dates before DATE_MIN.
    """
    if date_val is None:
        return ["missing_field:created"]

    if isinstance(date_val, date):
        parsed = date_val
    elif isinstance(date_val, str):
        try:
            parsed = datetime.strptime(date_val, "%Y-%m-%d").date()
        except ValueError:
            return [f"invalid_date_format:{date_val}"]
    else:
        return [f"invalid_date_type:{type(date_val).__name__}"]

    problems = []
    if parsed > date.today():
        problems.append(f"future_date:{parsed}")
    if parsed < DATE_MIN:
        problems.append(f"date_before_2020:{parsed}")
    return problems
|
|
|
|
|
|
def validate_title(filepath: str) -> list[str]:
    """Check the filename (stem) follows the prose-as-claim convention.

    Hyphens are treated as word separators; the stem must be at least 20
    characters, at least 4 words, and use only a safe character set.
    """
    problems = []
    stem = Path(filepath).stem
    prose = stem.replace("-", " ")

    if len(prose) < 20:
        problems.append("title_too_short")
    if len(prose.split()) < 4:
        problems.append("title_too_few_words")

    # Anything left after stripping the allowed characters is disallowed.
    leftover = re.sub(r"[a-zA-Z0-9\s\-\.,'()%]", "", stem)
    if leftover:
        problems.append(f"title_special_chars:{leftover[:20]}")

    return problems
|
|
|
|
|
|
def validate_wiki_links(body: str, existing_claims: set[str]) -> list[str]:
    """Flag [[wiki links]] in *body* that don't resolve to a known claim title."""
    broken = []
    for raw in WIKI_LINK_RE.findall(body):
        target = raw.strip()
        if target and target not in existing_claims:
            broken.append(f"broken_wiki_link:{target[:80]}")
    return broken
|
|
|
|
|
|
def validate_proposition(title: str) -> list[str]:
    """Heuristic: does the title read as a proposition rather than a label?

    Titles shorter than 4 words always fail. Otherwise, any connective/copula
    signal, apparent verb ending, or length of 8+ words is accepted.
    """
    prose = title.replace("-", " ")
    word_count = len(prose.split())

    if word_count < 4:
        return ["title_not_proposition:too short to be a disagreeable sentence"]

    # Any single positive signal is sufficient to accept the title.
    looks_propositional = (
        _STRONG_SIGNALS.search(prose) is not None
        or _VERB_ENDINGS.search(prose) is not None
        or word_count >= 8
    )
    if looks_propositional:
        return []
    return ["title_not_proposition:no verb or connective found"]
|
|
|
|
|
|
def validate_universal_quantifiers(title: str) -> list[str]:
    """Warn about universal quantifiers lacking scoping language (non-gating)."""
    hits = _UNIVERSAL_QUANTIFIERS.findall(title)
    if not hits or _SCOPING_LANGUAGE.search(title):
        return []
    return [f"unscoped_universal:{','.join(hits)}"]
|
|
|
|
|
|
def validate_domain_directory_match(filepath: str, fm: dict) -> list[str]:
    """Check the file sits under domains/<domain>/ matching its frontmatter.

    A mismatch is excused when the directory's domain appears in the claim's
    secondary_domains (string or list). Files outside a domains/ tree pass.
    """
    declared = fm.get("domain")
    if not declared:
        return []

    parts = Path(filepath).parts
    for idx, segment in enumerate(parts):
        # Only the first "domains" segment (with a following component) counts.
        if segment != "domains" or idx + 1 >= len(parts):
            continue
        actual_dir = parts[idx + 1]
        if actual_dir != declared:
            secondary = fm.get("secondary_domains", [])
            if isinstance(secondary, str):
                secondary = [secondary]
            if actual_dir not in (secondary or []):
                return [f"domain_directory_mismatch:file in domains/{actual_dir}/ but domain field says '{declared}'"]
        break
    return []
|
|
|
|
|
|
def validate_description_not_title(title: str, description: str) -> list[str]:
    """Warn when the description merely restates the title.

    Flags substring containment either way (after lowercasing and trimming a
    trailing period) and SequenceMatcher similarity above 0.75.
    """
    if not description:
        return []

    t = title.lower().strip()
    d = description.lower().strip().rstrip(".")

    if d in t or t in d:
        return ["description_echoes_title"]

    similarity = SequenceMatcher(None, t, d).ratio()
    if similarity > 0.75:
        return [f"description_too_similar:{similarity:.0%}"]
    return []
|
|
|
|
|
|
def find_near_duplicates(title: str, existing_claims: set[str]) -> list[str]:
    """Return near-duplicate warnings for *title* against known claim titles.

    A cheap prefilter — at least two shared words among each title's first
    six — skips most pairs before the quadratic SequenceMatcher comparison.
    """
    candidate = title.lower()
    candidate_words = set(candidate.split()[:6])
    hits = []
    for known in existing_claims:
        known_lower = known.lower()
        if len(candidate_words & set(known_lower.split()[:6])) < 2:
            continue
        score = SequenceMatcher(None, candidate, known_lower).ratio()
        if score >= DEDUP_THRESHOLD:
            hits.append(f"near_duplicate:{known[:80]} (similarity={score:.2f})")
    return hits
|
|
|
|
|
|
# ─── Full Tier 0 validation ────────────────────────────────────────────────
|
|
|
|
|
|
def tier0_validate_claim(filepath: str, content: str, existing_claims: set[str]) -> dict:
    """Run every Tier 0 check on one claim file.

    Returns {filepath, passes, violations, warnings}. `passes` is True only
    when there are zero violations; warnings never gate.
    """
    fm, body = parse_frontmatter(content)
    if fm is None:
        return {"filepath": filepath, "passes": False, "violations": ["no_frontmatter"], "warnings": []}

    violations: list[str] = []
    warnings: list[str] = []

    violations += validate_schema(fm)
    violations += validate_date(fm.get("created"))
    violations += validate_title(filepath)
    violations += validate_wiki_links(body, existing_claims)

    title = Path(filepath).stem
    violations += validate_proposition(title)
    warnings += validate_universal_quantifiers(title)
    violations += validate_domain_directory_match(filepath, fm)

    desc = fm.get("description", "")
    if isinstance(desc, str):
        warnings += validate_description_not_title(title, desc)

    warnings += find_near_duplicates(title, existing_claims)

    return {
        "filepath": filepath,
        "passes": not violations,
        "violations": violations,
        "warnings": warnings,
    }
|
|
|
|
|
|
# ─── Diff parsing ──────────────────────────────────────────────────────────
|
|
|
|
|
|
def extract_claim_files_from_diff(diff: str) -> dict[str, str]:
    """Reconstruct new/modified claim-file text from a unified diff.

    Only added ('+') lines are collected, so for modified files the result is
    the added content rather than the full file. Deleted files are skipped,
    as are files outside the claim directories, non-.md files, and files
    whose basename starts with an underscore.
    """
    claim_dirs = ("domains/", "core/", "foundations/")

    collected: dict[str, str] = {}
    active_path: str | None = None
    added: list[str] = []
    deleting = False

    def flush() -> None:
        # Commit the accumulated added lines for the current file, if any.
        if active_path and not deleting:
            collected[active_path] = "\n".join(added)

    for raw in diff.split("\n"):
        if raw.startswith("diff --git"):
            flush()
            active_path, added, deleting = None, [], False
        elif raw.startswith(("deleted file mode", "+++ /dev/null")):
            deleting = True
            active_path = None
        elif raw.startswith("+++ b/") and not deleting:
            path = raw[6:]
            name = path.rsplit("/", 1)[-1] if "/" in path else path
            if path.endswith(".md") and not name.startswith("_") and any(path.startswith(d) for d in claim_dirs):
                active_path = path
        elif active_path and raw.startswith("+") and not raw.startswith("+++"):
            added.append(raw[1:])

    flush()
    return collected
|
|
|
|
|
|
async def _get_pr_head_sha(pr_number: int) -> str:
    """Return the HEAD commit SHA of the PR's branch, or '' when unavailable."""
    info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
    if not info:
        return ""
    return info.get("head", {}).get("sha", "")
|
|
|
|
|
|
async def _has_tier0_comment(pr_number: int, head_sha: str) -> bool:
    """Check whether this exact commit was already validated.

    Scans all PR comments (paginated, 50 per page — Ganymede standing rule)
    for the hidden HTML marker that _post_validation_comment embeds.

    Returns False when head_sha is empty, since the commit can't be identified.
    """
    if not head_sha:
        return False

    # Fix: the marker is loop-invariant — build it once instead of
    # reformatting the f-string on every page of the pagination loop.
    marker = f"<!-- TIER0-VALIDATION:{head_sha} -->"
    page = 1
    while True:
        comments = await forgejo_api(
            "GET",
            repo_path(f"issues/{pr_number}/comments?limit=50&page={page}"),
        )
        if not comments:
            break
        if any(marker in c.get("body", "") for c in comments):
            return True
        if len(comments) < 50:
            break  # short page means this was the last page
        page += 1
    return False
|
|
|
|
|
|
async def _post_validation_comment(pr_number: int, results: list[dict], head_sha: str):
    """Format Tier 0 results as markdown and post them as a PR comment.

    The first line is a hidden HTML marker keyed by head_sha, which
    _has_tier0_comment later uses to deduplicate validation runs.
    """
    passing = sum(1 for r in results if r["passes"])
    total = len(results)
    all_pass = passing == total

    marker = f"<!-- TIER0-VALIDATION:{head_sha} -->" if head_sha else "<!-- TIER0-VALIDATION -->"
    status = "PASS" if all_pass else "FAIL"
    body_lines = [
        marker,
        f"**Tier 0 Validation: {status}** — {passing}/{total} claims pass\n",
    ]

    for r in results:
        icon = "pass" if r["passes"] else "FAIL"
        fp = r["filepath"]
        short_path = fp.split("/", 1)[-1] if "/" in fp else fp
        body_lines.append(f"**[{icon}]** `{short_path}`")
        body_lines.extend(f"  - {v}" for v in r["violations"])
        body_lines.extend(f"  - (warn) {w}" for w in r["warnings"])
        body_lines.append("")

    if not all_pass:
        body_lines.append("---")
        body_lines.append("Fix the violations above and push to trigger re-validation.")

    body_lines.append(f"\n*tier0-gate v2 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")

    await forgejo_api(
        "POST",
        repo_path(f"issues/{pr_number}/comments"),
        {"body": "\n".join(body_lines)},
    )
|
|
|
|
|
|
# ─── Existing claims index ─────────────────────────────────────────────────
|
|
|
|
|
|
def load_existing_claims() -> set[str]:
    """Collect known claim titles (markdown file stems) from the main worktree."""
    known: set[str] = set()
    root = config.MAIN_WORKTREE
    for subdir in ["domains", "core", "foundations", "maps", "agents", "schemas"]:
        tree = root / subdir
        if not tree.is_dir():
            continue
        known.update(md.stem for md in tree.rglob("*.md"))
    return known
|
|
|
|
|
|
# ─── Main entry point ──────────────────────────────────────────────────────
|
|
|
|
|
|
async def validate_pr(conn, pr_number: int) -> dict:
    """Run Tier 0 validation on a single PR.

    Returns {pr, all_pass, total, passing, skipped, reason}.
    """
    # Idempotency: identify the exact commit we would be validating.
    head_sha = await _get_pr_head_sha(pr_number)
    if await _has_tier0_comment(pr_number, head_sha):
        logger.debug("PR #%d already validated at %s", pr_number, head_sha[:8])
        return {"pr": pr_number, "skipped": True, "reason": "already_validated"}

    diff = await get_pr_diff(pr_number)
    if not diff:
        logger.debug("PR #%d: empty or oversized diff", pr_number)
        return {"pr": pr_number, "skipped": True, "reason": "no_diff"}

    claim_files = extract_claim_files_from_diff(diff)
    if not claim_files:
        logger.debug("PR #%d: no claim files in diff", pr_number)
        return {"pr": pr_number, "skipped": True, "reason": "no_claims"}

    # Known-title index for wiki-link resolution and dedup checks.
    known_titles = load_existing_claims()

    results = []
    for path, text in claim_files.items():
        outcome = tier0_validate_claim(path, text, known_titles)
        results.append(outcome)
        logger.debug(
            "PR #%d: %s %s v=%s w=%s",
            pr_number,
            "PASS" if outcome["passes"] else "FAIL",
            path,
            outcome["violations"],
            outcome["warnings"],
        )

    passing = sum(1 for r in results if r["passes"])
    total = len(results)
    all_pass = passing == total

    logger.info("PR #%d: Tier 0 — %d/%d pass, all_pass=%s", pr_number, passing, total, all_pass)

    await _post_validation_comment(pr_number, results, head_sha)

    # Record the gate result and leave an audit trail.
    conn.execute(
        "UPDATE prs SET tier0_pass = ? WHERE number = ?",
        (1 if all_pass else 0, pr_number),
    )
    db.audit(
        conn,
        "validate",
        "tier0_complete",
        json.dumps({"pr": pr_number, "pass": all_pass, "passing": passing, "total": total}),
    )

    return {"pr": pr_number, "all_pass": all_pass, "total": total, "passing": passing}
|
|
|
|
|
|
async def validate_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one validation cycle.

    Finds PRs with status='open' and tier0_pass IS NULL (priority-ordered),
    validates each, and returns (succeeded, failed). A PR counts as succeeded
    whenever validation *ran* — a failing gate result is an outcome, not an
    error; only exceptions count as failures.
    """
    # Find unvalidated PRs (priority ordered)
    rows = conn.execute(
        """SELECT p.number FROM prs p
        LEFT JOIN sources s ON p.source_path = s.path
        WHERE p.status = 'open'
        AND p.tier0_pass IS NULL
        ORDER BY
        CASE COALESCE(p.priority, s.priority, 'medium')
        WHEN 'critical' THEN 0
        WHEN 'high' THEN 1
        WHEN 'medium' THEN 2
        WHEN 'low' THEN 3
        ELSE 4
        END,
        p.created_at ASC
        LIMIT ?""",
        (max_workers or 10,),
    ).fetchall()

    if not rows:
        return 0, 0

    succeeded = 0
    failed = 0

    for row in rows:
        try:
            result = await validate_pr(conn, row["number"])
            if result.get("skipped"):
                # Skipped PRs (no diff / no claims / already validated) would
                # otherwise be re-selected forever; mark them as passing.
                conn.execute(
                    "UPDATE prs SET tier0_pass = 1 WHERE number = ? AND tier0_pass IS NULL",
                    (row["number"],),
                )
            # Fix: the original if/elif/else incremented `succeeded` in every
            # branch — collapsed to a single increment with identical behavior.
            succeeded += 1
        except Exception:
            logger.exception("Failed to validate PR #%d", row["number"])
            failed += 1

    if succeeded or failed:
        logger.info("Validate cycle: %d validated, %d errors", succeeded, failed)

    return succeeded, failed
|