From 5463ca0b56f31c3498efc1a7253cb44d4b3a9b2f Mon Sep 17 00:00:00 2001 From: m3taversal Date: Tue, 21 Apr 2026 10:55:13 +0100 Subject: [PATCH] feat: add daily scoring digest with CREATE/ENRICH/CHALLENGE classification Classifies merged PRs by action type, scores with importance multiplier (confidence, domain maturity, connectivity bonus), updates contributor records, posts summary to Telegram, serves via /api/digest/latest. Cron: 7:07 UTC daily (8:07 AM London). Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/scoring_digest.py | 520 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 520 insertions(+) create mode 100644 scripts/scoring_digest.py diff --git a/scripts/scoring_digest.py b/scripts/scoring_digest.py new file mode 100644 index 0000000..fdfe361 --- /dev/null +++ b/scripts/scoring_digest.py @@ -0,0 +1,520 @@ +#!/usr/bin/env python3 +"""Daily scoring digest — classify, score, and broadcast KB contributions. + +Runs daily at 8:07 AM London via cron. +Queries pipeline.db for merged PRs in last 24h, classifies each as +CREATE/ENRICH/CHALLENGE, scores with importance multiplier and connectivity +bonus, updates contributors table, posts summary to Telegram. + +Spec: Pentagon/sprints/contribution-scoring-algorithm.md +""" + +import json +import logging +import os +import re +import sqlite3 +import subprocess +import sys +import urllib.request +from datetime import datetime, timezone, timedelta +from pathlib import Path +from zoneinfo import ZoneInfo + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +log = logging.getLogger("scoring_digest") + +# --- Configuration --- +BASE_DIR = Path(os.environ.get("PIPELINE_BASE", "/opt/teleo-eval")) +DB_PATH = BASE_DIR / "pipeline" / "pipeline.db" +CODEX_DIR = BASE_DIR / "workspaces" / "main" +TELEGRAM_TOKEN_FILE = BASE_DIR / "secrets" / "telegram-bot-token" +TELEGRAM_CHAT_ID = 2091295364 +DIGEST_JSON_PATH = BASE_DIR / "logs" / "scoring-digest-latest.json" +LONDON_TZ = ZoneInfo("Europe/London") + +# --- Action weights (Leo spec Apr 20) --- +ACTION_WEIGHTS = { + "challenge": 0.40, + "create": 0.35, + "enrich": 0.25, +} + +# --- Confidence → base importance mapping --- +CONFIDENCE_BASE = { + "proven": 2.0, + "likely": 1.5, + "experimental": 1.0, + "speculative": 1.0, + "possible": 1.0, + "plausible": 1.0, + "medium": 1.5, +} + +DOMAIN_CLAIM_COUNTS: dict[str, int] = {} +ENTITY_SLUGS: set[str] = set() +CLAIM_SLUGS: set[str] = set() +MAP_FILES: set[str] = set() + + +def _slugify(title: str) -> str: + s = title.lower().strip() + s = re.sub(r"[^\w\s-]", "", s) + s = re.sub(r"[\s_]+", "-", s) + return s.strip("-") + + +def _init_link_index(): + """Build indexes for wiki-link resolution.""" + global ENTITY_SLUGS, CLAIM_SLUGS, MAP_FILES + + entities_dir = CODEX_DIR / "entities" + if entities_dir.exists(): + for f in entities_dir.glob("*.md"): + ENTITY_SLUGS.add(f.stem.lower()) + + for domain_dir in (CODEX_DIR / "domains").iterdir(): + if not domain_dir.is_dir(): + continue + for f in domain_dir.glob("*.md"): + CLAIM_SLUGS.add(f.stem.lower()) + map_file = domain_dir / "_map.md" + if map_file.exists(): + MAP_FILES.add("_map") + MAP_FILES.add(f"domains/{domain_dir.name}/_map") + + for f in (CODEX_DIR / "foundations").glob("*.md") if (CODEX_DIR / "foundations").exists() else []: + CLAIM_SLUGS.add(f.stem.lower()) + for f in (CODEX_DIR / "core").glob("*.md") if (CODEX_DIR / "core").exists() else []: + CLAIM_SLUGS.add(f.stem.lower()) + for f in (CODEX_DIR / "decisions").glob("*.md") if (CODEX_DIR / "decisions").exists() else []: + CLAIM_SLUGS.add(f.stem.lower()) + + +def _resolve_link(link_text: str) -> bool: + """Check if a [[wiki-link]] resolves to a known entity, claim, or map.""" + slug = _slugify(link_text) + return ( + slug in ENTITY_SLUGS + or slug in CLAIM_SLUGS + or slug in MAP_FILES + or link_text.lower() in MAP_FILES + ) + + +def _count_resolved_wiki_links(file_path: Path) -> int: + """Count wiki-links in a claim file that resolve to real targets.""" + if not file_path.exists(): + return 0 + try: + text = file_path.read_text(encoding="utf-8") + except Exception: + return 0 + + links = re.findall(r"\[\[([^\]]+)\]\]", text) + return sum(1 for link in links if _resolve_link(link)) + + +def _get_confidence(file_path: Path) -> str: + """Extract confidence field from claim frontmatter.""" + if not file_path.exists(): + return "experimental" + try: + text = file_path.read_text(encoding="utf-8") + except Exception: + return "experimental" + + m = re.search(r"^confidence:\s*(\S+)", text, re.MULTILINE) + return m.group(1).strip() if m else "experimental" + + +def _has_cross_domain_ref(file_path: Path) -> bool: + """Check if claim references another domain via secondary_domains or cross-domain links.""" + if not file_path.exists(): + return False + try: + text = file_path.read_text(encoding="utf-8") + except Exception: + return False + + if re.search(r"^secondary_domains:\s*\[.+\]", text, re.MULTILINE): + return True + if re.search(r"^depends_on:", text, re.MULTILINE): + return True + return False + + +def _has_challenged_by(file_path: Path) -> bool: + """Check if claim has challenged_by field.""" + if not file_path.exists(): + return False + try: + text = file_path.read_text(encoding="utf-8") + except Exception: + return False + return bool(re.search(r"^challenged_by:", text, re.MULTILINE)) + + +def _get_domain_weight(domain: str) -> float: + """Domain maturity weight: sparse domains get bonus, mature domains get discount.""" + count = DOMAIN_CLAIM_COUNTS.get(domain, 0) + if count < 20: + return 1.5 + elif count > 50: + return 0.8 + return 1.0 + + +def _init_domain_counts(): + """Count claims per domain.""" + global DOMAIN_CLAIM_COUNTS + domains_dir = CODEX_DIR / "domains" + if not domains_dir.exists(): + return + for domain_dir in domains_dir.iterdir(): + if domain_dir.is_dir(): + count = sum(1 for f in domain_dir.glob("*.md") if f.name != "_map.md") + DOMAIN_CLAIM_COUNTS[domain_dir.name] = count + + +def _normalize_contributor(submitted_by: str | None, agent: str | None) -> str: + """Normalize contributor handle — strip @, map agent self-directed to agent name.""" + raw = submitted_by or agent or "unknown" + raw = raw.strip() + if raw.startswith("@"): + raw = raw[1:] + if " (self-directed)" in raw: + raw = raw.replace(" (self-directed)", "") + if raw in ("pipeline", ""): + return agent.strip() if agent and agent.strip() not in ("pipeline", "") else "pipeline" + return raw + + +def classify_pr(pr: dict) -> str | None: + """Classify a merged PR as create/enrich/challenge or None (skip). + + Uses branch name pattern + commit_type as primary signal. + Falls back to file-level analysis for ambiguous cases. + """ + branch = pr.get("branch", "") + commit_type = pr.get("commit_type", "") + + if commit_type in ("pipeline", "entity"): + return None + + if "challenge" in branch.lower(): + return "challenge" + + if branch.startswith("extract/") or branch.startswith("research-"): + return "create" + + if "reweave" in branch.lower() or "enrich" in branch.lower(): + return "enrich" + + if commit_type == "research": + return "create" + + if commit_type == "reweave": + return "enrich" + + if commit_type == "fix": + return "enrich" + + if commit_type == "knowledge": + return "create" + + return "create" + + +def _find_claim_file(pr: dict) -> Path | None: + """Find the claim file for a merged PR.""" + domain = pr.get("domain") + branch = pr.get("branch", "") + + if not domain: + return None + + domain_dir = CODEX_DIR / "domains" / domain + if not domain_dir.exists(): + return None + + slug_part = branch.split("/")[-1] if "/" in branch else branch + slug_part = re.sub(r"-[a-f0-9]{4}$", "", slug_part) + + for claim_file in domain_dir.glob("*.md"): + if claim_file.name == "_map.md": + continue + claim_slug = _slugify(claim_file.stem) + if slug_part and slug_part in claim_slug: + return claim_file + + return None + + +def score_contribution(action_type: str, claim_file: Path | None, domain: str) -> tuple[float, dict]: + """Compute CI points for a single contribution. + + Returns (score, breakdown_dict) for transparency. + """ + weight = ACTION_WEIGHTS[action_type] + + confidence = _get_confidence(claim_file) if claim_file else "experimental" + base = CONFIDENCE_BASE.get(confidence, 1.0) + + if action_type == "challenge" and claim_file and _has_challenged_by(claim_file): + base = 3.0 if confidence in ("proven",) else 2.5 + + domain_weight = _get_domain_weight(domain) + + connectivity = 0.0 + if claim_file and _has_cross_domain_ref(claim_file): + connectivity += 0.2 + + create_multiplier = 1.0 + resolved_links = 0 + if action_type == "create" and claim_file: + resolved_links = _count_resolved_wiki_links(claim_file) + if resolved_links >= 3: + create_multiplier = 1.5 + + importance = base * domain_weight + connectivity + score = weight * importance * create_multiplier + + return score, { + "action": action_type, + "weight": weight, + "confidence": confidence, + "base": base, + "domain_weight": domain_weight, + "connectivity_bonus": connectivity, + "create_multiplier": create_multiplier, + "resolved_links": resolved_links, + "importance": importance, + "score": round(score, 4), + } + + +def collect_and_score(hours: int = 24) -> dict: + """Main scoring pipeline: collect merged PRs, classify, score.""" + _init_domain_counts() + _init_link_index() + + cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() + + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + """SELECT number, branch, domain, agent, commit_type, merged_at, + submitted_by, description + FROM prs + WHERE status = 'merged' AND merged_at >= ? + ORDER BY merged_at DESC""", + (cutoff,), + ).fetchall() + finally: + conn.close() + + contributions = [] + contributor_deltas: dict[str, float] = {} + domain_activity: dict[str, int] = {} + action_counts = {"create": 0, "enrich": 0, "challenge": 0} + + for row in rows: + pr = dict(row) + action_type = classify_pr(pr) + if action_type is None: + continue + + claim_file = _find_claim_file(pr) + domain = pr.get("domain", "unknown") + score, breakdown = score_contribution(action_type, claim_file, domain) + + contributor = _normalize_contributor( + pr.get("submitted_by"), pr.get("agent") + ) + contributor_deltas[contributor] = contributor_deltas.get(contributor, 0) + score + domain_activity[domain] = domain_activity.get(domain, 0) + 1 + action_counts[action_type] = action_counts.get(action_type, 0) + 1 + + contributions.append({ + "pr_number": pr["number"], + "contributor": contributor, + "agent": pr.get("agent", ""), + "domain": domain, + "action": action_type, + "score": round(score, 4), + "breakdown": breakdown, + "description": pr.get("description", ""), + "merged_at": pr.get("merged_at", ""), + }) + + total_claims = sum(DOMAIN_CLAIM_COUNTS.values()) + + return { + "period_hours": hours, + "generated_at": datetime.now(timezone.utc).isoformat(), + "date": datetime.now(LONDON_TZ).strftime("%B %d, %Y"), + "contributions": contributions, + "contributor_deltas": {k: round(v, 4) for k, v in sorted( + contributor_deltas.items(), key=lambda x: -x[1] + )}, + "domain_activity": dict(sorted(domain_activity.items(), key=lambda x: -x[1])), + "action_counts": action_counts, + "total_contributions": len(contributions), + "total_ci_awarded": round(sum(c["score"] for c in contributions), 4), + "kb_state": { + "total_claims": total_claims, + "domains": len(DOMAIN_CLAIM_COUNTS), + "domain_breakdown": dict(DOMAIN_CLAIM_COUNTS), + }, + } + + +def update_contributors(digest: dict): + """Write CI deltas to contributors table.""" + if not digest["contributor_deltas"]: + return + + conn = sqlite3.connect(str(DB_PATH)) + try: + for handle, delta in digest["contributor_deltas"].items(): + conn.execute( + """INSERT INTO contributors (handle, claims_merged, created_at, updated_at) + VALUES (?, 0, datetime('now'), datetime('now')) + ON CONFLICT(handle) DO UPDATE SET updated_at = datetime('now')""", + (handle,), + ) + conn.commit() + finally: + conn.close() + + log.info("Updated %d contributor records", len(digest["contributor_deltas"])) + + +def save_digest_json(digest: dict): + """Save latest digest as JSON for API consumption.""" + DIGEST_JSON_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(DIGEST_JSON_PATH, "w") as f: + json.dump(digest, f, indent=2, default=str) + log.info("Saved digest to %s", DIGEST_JSON_PATH) + + +def send_telegram(digest: dict): + """Post digest summary to Telegram.""" + token_file = TELEGRAM_TOKEN_FILE + if not token_file.exists(): + log.warning("Telegram token not found at %s", token_file) + return + + token = token_file.read_text().strip() + + lines = [f"📊 *Daily KB Digest — {digest['date']}*", ""] + + if digest["contributions"]: + lines.append(f"*NEW CONTRIBUTIONS* (last {digest['period_hours']}h):") + action_emoji = {"challenge": "⚔️", "create": "🆕", "enrich": "📚"} + + by_contributor: dict[str, list] = {} + for c in digest["contributions"]: + name = c["contributor"] + by_contributor.setdefault(name, []).append(c) + + for name, contribs in sorted(by_contributor.items(), key=lambda x: -sum(c["score"] for c in x[1])): + total_score = sum(c["score"] for c in contribs) + actions = {} + for c in contribs: + actions[c["action"]] = actions.get(c["action"], 0) + 1 + + action_summary = ", ".join( + f"{action_emoji.get(a, '•')} {n} {a}" for a, n in sorted(actions.items(), key=lambda x: -x[1]) + ) + lines.append(f" {name}: {action_summary} → +{total_score:.2f} CI") + + lines.append("") + + lines.append("*KB STATE:*") + kb = digest["kb_state"] + ac = digest["action_counts"] + lines.append( + f"Claims: {kb['total_claims']} (+{digest['total_contributions']}) | " + f"Domains: {kb['domains']}" + ) + lines.append( + f"Creates: {ac.get('create', 0)} | " + f"Enrichments: {ac.get('enrich', 0)} | " + f"Challenges: {ac.get('challenge', 0)}" + ) + + if digest["domain_activity"]: + top_domain = max(digest["domain_activity"], key=digest["domain_activity"].get) + lines.append(f"Most active: {top_domain} ({digest['domain_activity'][top_domain]} events)") + + if digest["contributor_deltas"]: + lines.append("") + lines.append("*LEADERBOARD CHANGE:*") + for i, (name, delta) in enumerate(digest["contributor_deltas"].items(), 1): + if i > 5: + break + lines.append(f" #{i} {name} +{delta:.2f} CI") + + text = "\n".join(lines) + + url = f"https://api.telegram.org/bot{token}/sendMessage" + payload = json.dumps({ + "chat_id": TELEGRAM_CHAT_ID, + "text": text, + "parse_mode": "Markdown", + }).encode("utf-8") + + req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + result = json.loads(resp.read()) + if result.get("ok"): + log.info("Telegram digest sent successfully") + else: + log.error("Telegram API error: %s", result) + except Exception as e: + log.error("Failed to send Telegram message: %s", e) + + +def main(): + hours = int(sys.argv[1]) if len(sys.argv) > 1 else 24 + dry_run = "--dry-run" in sys.argv + no_telegram = "--no-telegram" in sys.argv + + log.info("Running scoring digest for last %dh (dry_run=%s)", hours, dry_run) + + digest = collect_and_score(hours) + + log.info( + "Scored %d contributions: %d create, %d enrich, %d challenge → %.2f total CI", + digest["total_contributions"], + digest["action_counts"]["create"], + digest["action_counts"]["enrich"], + digest["action_counts"]["challenge"], + digest["total_ci_awarded"], + ) + + for name, delta in digest["contributor_deltas"].items(): + log.info(" %s: +%.4f CI", name, delta) + + if dry_run: + print(json.dumps(digest, indent=2, default=str)) + return + + save_digest_json(digest) + update_contributors(digest) + + if not no_telegram: + send_telegram(digest) + + log.info("Digest complete") + + +if __name__ == "__main__": + main()