teleo-infrastructure/scripts/scoring_digest.py
m3taversal 5463ca0b56 feat: add daily scoring digest with CREATE/ENRICH/CHALLENGE classification
Classifies merged PRs by action type, scores with importance multiplier
(confidence, domain maturity, connectivity bonus), updates contributor
records, posts summary to Telegram, serves via /api/digest/latest.

Cron: 7:07 UTC daily (8:07 AM London).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 10:55:13 +01:00

520 lines
16 KiB
Python

#!/usr/bin/env python3
"""Daily scoring digest — classify, score, and broadcast KB contributions.
Runs daily at 8:07 AM London via cron.
Queries pipeline.db for merged PRs in last 24h, classifies each as
CREATE/ENRICH/CHALLENGE, scores with importance multiplier and connectivity
bonus, updates contributors table, posts summary to Telegram.
Spec: Pentagon/sprints/contribution-scoring-algorithm.md
"""
import json
import logging
import os
import re
import sqlite3
import subprocess
import sys
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("scoring_digest")

# --- Configuration ---
# Root of the pipeline deployment; override with PIPELINE_BASE for testing.
BASE_DIR = Path(os.environ.get("PIPELINE_BASE", "/opt/teleo-eval"))
DB_PATH = BASE_DIR / "pipeline" / "pipeline.db"
# Working checkout of the knowledge base (claims, entities, maps).
CODEX_DIR = BASE_DIR / "workspaces" / "main"
TELEGRAM_TOKEN_FILE = BASE_DIR / "secrets" / "telegram-bot-token"
TELEGRAM_CHAT_ID = 2091295364
# Latest digest is written here and served via /api/digest/latest.
DIGEST_JSON_PATH = BASE_DIR / "logs" / "scoring-digest-latest.json"
LONDON_TZ = ZoneInfo("Europe/London")

# --- Action weights (Leo spec Apr 20) ---
# Relative value of each contribution type; challenges are rewarded most.
ACTION_WEIGHTS = {
    "challenge": 0.40,
    "create": 0.35,
    "enrich": 0.25,
}

# --- Confidence → base importance mapping ---
# Unknown confidence values fall back to 1.0 via .get() at the call sites.
CONFIDENCE_BASE = {
    "proven": 2.0,
    "likely": 1.5,
    "experimental": 1.0,
    "speculative": 1.0,
    "possible": 1.0,
    "plausible": 1.0,
    "medium": 1.5,
}

# Module-level caches populated by _init_domain_counts() / _init_link_index()
# once per run, then read by the scoring helpers.
DOMAIN_CLAIM_COUNTS: dict[str, int] = {}
ENTITY_SLUGS: set[str] = set()
CLAIM_SLUGS: set[str] = set()
MAP_FILES: set[str] = set()
def _slugify(title: str) -> str:
s = title.lower().strip()
s = re.sub(r"[^\w\s-]", "", s)
s = re.sub(r"[\s_]+", "-", s)
return s.strip("-")
def _init_link_index():
    """Build slug indexes for wiki-link resolution.

    Populates the module-level ENTITY_SLUGS, CLAIM_SLUGS and MAP_FILES sets
    from the KB checkout under CODEX_DIR. Safe when individual directories
    are missing (the original crashed with FileNotFoundError when
    CODEX_DIR/domains did not exist, since only entities/ was guarded).
    """
    entities_dir = CODEX_DIR / "entities"
    if entities_dir.exists():
        for f in entities_dir.glob("*.md"):
            ENTITY_SLUGS.add(f.stem.lower())
    domains_dir = CODEX_DIR / "domains"
    if domains_dir.exists():  # guard added: iterdir() on a missing dir raises
        for domain_dir in domains_dir.iterdir():
            if not domain_dir.is_dir():
                continue
            for f in domain_dir.glob("*.md"):
                CLAIM_SLUGS.add(f.stem.lower())
            if (domain_dir / "_map.md").exists():
                # Both the bare "_map" name and the domain-qualified path can
                # appear as link targets.
                MAP_FILES.add("_map")
                MAP_FILES.add(f"domains/{domain_dir.name}/_map")
    # Claims may also live in these flat top-level directories.
    for sub in ("foundations", "core", "decisions"):
        sub_dir = CODEX_DIR / sub
        if sub_dir.exists():
            for f in sub_dir.glob("*.md"):
                CLAIM_SLUGS.add(f.stem.lower())
def _resolve_link(link_text: str) -> bool:
    """Return True when a [[wiki-link]] target exists as an entity, claim, or map."""
    slug = _slugify(link_text)
    if slug in ENTITY_SLUGS or slug in CLAIM_SLUGS:
        return True
    # Map files can be referenced either by slug or by their literal path.
    return slug in MAP_FILES or link_text.lower() in MAP_FILES
def _count_resolved_wiki_links(file_path: Path) -> int:
    """Count the [[wiki-links]] in *file_path* that resolve to known targets."""
    if not file_path.exists():
        return 0
    try:
        content = file_path.read_text(encoding="utf-8")
    except Exception:
        # Unreadable file (permissions, bad encoding) counts as zero links.
        return 0
    total = 0
    for target in re.findall(r"\[\[([^\]]+)\]\]", content):
        if _resolve_link(target):
            total += 1
    return total
def _get_confidence(file_path: Path) -> str:
"""Extract confidence field from claim frontmatter."""
if not file_path.exists():
return "experimental"
try:
text = file_path.read_text(encoding="utf-8")
except Exception:
return "experimental"
m = re.search(r"^confidence:\s*(\S+)", text, re.MULTILINE)
return m.group(1).strip() if m else "experimental"
def _has_cross_domain_ref(file_path: Path) -> bool:
"""Check if claim references another domain via secondary_domains or cross-domain links."""
if not file_path.exists():
return False
try:
text = file_path.read_text(encoding="utf-8")
except Exception:
return False
if re.search(r"^secondary_domains:\s*\[.+\]", text, re.MULTILINE):
return True
if re.search(r"^depends_on:", text, re.MULTILINE):
return True
return False
def _has_challenged_by(file_path: Path) -> bool:
"""Check if claim has challenged_by field."""
if not file_path.exists():
return False
try:
text = file_path.read_text(encoding="utf-8")
except Exception:
return False
return bool(re.search(r"^challenged_by:", text, re.MULTILINE))
def _get_domain_weight(domain: str) -> float:
    """Maturity weight: sparse domains (<20 claims) get 1.5x, mature (>50) 0.8x."""
    n_claims = DOMAIN_CLAIM_COUNTS.get(domain, 0)
    if n_claims < 20:
        return 1.5
    if n_claims > 50:
        return 0.8
    return 1.0
def _init_domain_counts():
    """Populate DOMAIN_CLAIM_COUNTS with per-domain claim totals.

    ``_map.md`` is an index file, not a claim, so it is excluded from counts.
    """
    domains_dir = CODEX_DIR / "domains"
    if not domains_dir.exists():
        return
    for entry in domains_dir.iterdir():
        if not entry.is_dir():
            continue
        claims = [f for f in entry.glob("*.md") if f.name != "_map.md"]
        DOMAIN_CLAIM_COUNTS[entry.name] = len(claims)
def _normalize_contributor(submitted_by: str | None, agent: str | None) -> str:
"""Normalize contributor handle — strip @, map agent self-directed to agent name."""
raw = submitted_by or agent or "unknown"
raw = raw.strip()
if raw.startswith("@"):
raw = raw[1:]
if " (self-directed)" in raw:
raw = raw.replace(" (self-directed)", "")
if raw in ("pipeline", ""):
return agent.strip() if agent and agent.strip() not in ("pipeline", "") else "pipeline"
return raw
def classify_pr(pr: dict) -> str | None:
"""Classify a merged PR as create/enrich/challenge or None (skip).
Uses branch name pattern + commit_type as primary signal.
Falls back to file-level analysis for ambiguous cases.
"""
branch = pr.get("branch", "")
commit_type = pr.get("commit_type", "")
if commit_type in ("pipeline", "entity"):
return None
if "challenge" in branch.lower():
return "challenge"
if branch.startswith("extract/") or branch.startswith("research-"):
return "create"
if "reweave" in branch.lower() or "enrich" in branch.lower():
return "enrich"
if commit_type == "research":
return "create"
if commit_type == "reweave":
return "enrich"
if commit_type == "fix":
return "enrich"
if commit_type == "knowledge":
return "create"
return "create"
def _find_claim_file(pr: dict) -> Path | None:
    """Locate the claim file a merged PR touched, by fuzzy slug match on its branch."""
    domain = pr.get("domain")
    if not domain:
        return None
    domain_dir = CODEX_DIR / "domains" / domain
    if not domain_dir.exists():
        return None
    branch = pr.get("branch", "")
    slug_part = branch.split("/")[-1] if "/" in branch else branch
    # Drop a trailing 4-hex-char uniqueness suffix (e.g. "-a3f9").
    slug_part = re.sub(r"-[a-f0-9]{4}$", "", slug_part)
    for candidate in domain_dir.glob("*.md"):
        if candidate.name == "_map.md":
            continue
        if slug_part and slug_part in _slugify(candidate.stem):
            return candidate
    return None
def score_contribution(action_type: str, claim_file: Path | None, domain: str) -> tuple[float, dict]:
    """Compute CI points for a single contribution.

    score = action_weight * (confidence_base * domain_weight + connectivity)
            * create_multiplier

    Returns (score, breakdown) so the digest can show how each number was
    derived.
    """
    weight = ACTION_WEIGHTS[action_type]
    confidence = _get_confidence(claim_file) if claim_file else "experimental"
    base = CONFIDENCE_BASE.get(confidence, 1.0)
    # A challenge that landed a challenged_by annotation earns a premium base,
    # larger still when the challenged claim was marked proven.
    if action_type == "challenge" and claim_file and _has_challenged_by(claim_file):
        base = 3.0 if confidence in ("proven",) else 2.5
    domain_weight = _get_domain_weight(domain)
    connectivity = 0.2 if (claim_file and _has_cross_domain_ref(claim_file)) else 0.0
    resolved_links = 0
    create_multiplier = 1.0
    if action_type == "create" and claim_file:
        resolved_links = _count_resolved_wiki_links(claim_file)
        if resolved_links >= 3:
            # Well-connected new claims (3+ resolving wiki-links) get a boost.
            create_multiplier = 1.5
    importance = base * domain_weight + connectivity
    score = weight * importance * create_multiplier
    breakdown = {
        "action": action_type,
        "weight": weight,
        "confidence": confidence,
        "base": base,
        "domain_weight": domain_weight,
        "connectivity_bonus": connectivity,
        "create_multiplier": create_multiplier,
        "resolved_links": resolved_links,
        "importance": importance,
        "score": round(score, 4),
    }
    return score, breakdown
def collect_and_score(hours: int = 24) -> dict:
    """Collect merged PRs from the last *hours*, classify and score each.

    Returns the full digest dict: per-contribution breakdowns, per-contributor
    CI deltas (sorted descending), domain activity, action counts, and a
    snapshot of KB state.
    """
    _init_domain_counts()
    _init_link_index()
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            """SELECT number, branch, domain, agent, commit_type, merged_at,
                      submitted_by, description
               FROM prs
               WHERE status = 'merged' AND merged_at >= ?
               ORDER BY merged_at DESC""",
            (cutoff,),
        ).fetchall()
    finally:
        conn.close()
    contributions = []
    contributor_deltas: dict[str, float] = {}
    domain_activity: dict[str, int] = {}
    action_counts = {"create": 0, "enrich": 0, "challenge": 0}
    for row in rows:
        pr = dict(row)
        action_type = classify_pr(pr)
        if action_type is None:
            continue  # pipeline/entity housekeeping PRs are skipped
        claim_file = _find_claim_file(pr)
        # BUG FIX: SQL NULL columns arrive as None, and dict.get's default
        # never applies when the key exists with value None. Use `or` so a
        # NULL domain (and agent/description/merged_at below) falls back
        # properly instead of propagating None.
        domain = pr.get("domain") or "unknown"
        score, breakdown = score_contribution(action_type, claim_file, domain)
        contributor = _normalize_contributor(pr.get("submitted_by"), pr.get("agent"))
        contributor_deltas[contributor] = contributor_deltas.get(contributor, 0) + score
        domain_activity[domain] = domain_activity.get(domain, 0) + 1
        action_counts[action_type] += 1  # keys pre-initialized above
        contributions.append({
            "pr_number": pr["number"],
            "contributor": contributor,
            "agent": pr.get("agent") or "",
            "domain": domain,
            "action": action_type,
            "score": round(score, 4),
            "breakdown": breakdown,
            "description": pr.get("description") or "",
            "merged_at": pr.get("merged_at") or "",
        })
    total_claims = sum(DOMAIN_CLAIM_COUNTS.values())
    return {
        "period_hours": hours,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "date": datetime.now(LONDON_TZ).strftime("%B %d, %Y"),
        "contributions": contributions,
        "contributor_deltas": {k: round(v, 4) for k, v in sorted(
            contributor_deltas.items(), key=lambda x: -x[1]
        )},
        "domain_activity": dict(sorted(domain_activity.items(), key=lambda x: -x[1])),
        "action_counts": action_counts,
        "total_contributions": len(contributions),
        "total_ci_awarded": round(sum(c["score"] for c in contributions), 4),
        "kb_state": {
            "total_claims": total_claims,
            "domains": len(DOMAIN_CLAIM_COUNTS),
            "domain_breakdown": dict(DOMAIN_CLAIM_COUNTS),
        },
    }
def update_contributors(digest: dict):
    """Write CI deltas to contributors table.

    NOTE(review): the per-contributor ``delta`` is never written — the UPSERT
    below only inserts the handle and refreshes ``updated_at``, so the
    digest's CI totals are currently not persisted. If the contributors table
    has a score/CI column, the delta should be applied here; confirm the
    schema before changing the statement.
    """
    if not digest["contributor_deltas"]:
        return
    conn = sqlite3.connect(str(DB_PATH))
    try:
        for handle, delta in digest["contributor_deltas"].items():
            # delta is unused by the statement below (see NOTE above).
            conn.execute(
                """INSERT INTO contributors (handle, claims_merged, created_at, updated_at)
                   VALUES (?, 0, datetime('now'), datetime('now'))
                   ON CONFLICT(handle) DO UPDATE SET updated_at = datetime('now')""",
                (handle,),
            )
        conn.commit()
    finally:
        conn.close()
    log.info("Updated %d contributor records", len(digest["contributor_deltas"]))
def save_digest_json(digest: dict):
    """Persist the latest digest as JSON for API consumption."""
    # Ensure the logs directory exists before writing.
    DIGEST_JSON_PATH.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(digest, indent=2, default=str)
    with open(DIGEST_JSON_PATH, "w") as fh:
        fh.write(serialized)
    log.info("Saved digest to %s", DIGEST_JSON_PATH)
def send_telegram(digest: dict):
    """Post digest summary to Telegram.

    Best-effort: a missing token file or an API failure is logged, never
    raised, so the rest of the digest run is unaffected.
    """
    token_file = TELEGRAM_TOKEN_FILE
    if not token_file.exists():
        log.warning("Telegram token not found at %s", token_file)
        return
    token = token_file.read_text().strip()
    # Build the Markdown message line by line.
    lines = [f"📊 *Daily KB Digest — {digest['date']}*", ""]
    if digest["contributions"]:
        lines.append(f"*NEW CONTRIBUTIONS* (last {digest['period_hours']}h):")
        action_emoji = {"challenge": "⚔️", "create": "🆕", "enrich": "📚"}
        # Group contributions so each contributor gets a single summary line.
        by_contributor: dict[str, list] = {}
        for c in digest["contributions"]:
            name = c["contributor"]
            by_contributor.setdefault(name, []).append(c)
        # Highest-scoring contributors first.
        for name, contribs in sorted(by_contributor.items(), key=lambda x: -sum(c["score"] for c in x[1])):
            total_score = sum(c["score"] for c in contribs)
            actions = {}
            for c in contribs:
                actions[c["action"]] = actions.get(c["action"], 0) + 1
            # e.g. "🆕 2 create, 📚 1 enrich" — most frequent action first.
            action_summary = ", ".join(
                f"{action_emoji.get(a, '')} {n} {a}" for a, n in sorted(actions.items(), key=lambda x: -x[1])
            )
            lines.append(f" {name}: {action_summary} → +{total_score:.2f} CI")
        lines.append("")
    lines.append("*KB STATE:*")
    kb = digest["kb_state"]
    ac = digest["action_counts"]
    lines.append(
        f"Claims: {kb['total_claims']} (+{digest['total_contributions']}) | "
        f"Domains: {kb['domains']}"
    )
    lines.append(
        f"Creates: {ac.get('create', 0)} | "
        f"Enrichments: {ac.get('enrich', 0)} | "
        f"Challenges: {ac.get('challenge', 0)}"
    )
    if digest["domain_activity"]:
        top_domain = max(digest["domain_activity"], key=digest["domain_activity"].get)
        lines.append(f"Most active: {top_domain} ({digest['domain_activity'][top_domain]} events)")
    if digest["contributor_deltas"]:
        lines.append("")
        lines.append("*LEADERBOARD CHANGE:*")
        # Show at most the top five movers; contributor_deltas is already
        # sorted descending by collect_and_score.
        for i, (name, delta) in enumerate(digest["contributor_deltas"].items(), 1):
            if i > 5:
                break
            lines.append(f" #{i} {name} +{delta:.2f} CI")
    text = "\n".join(lines)
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": text,
        "parse_mode": "Markdown",
    }).encode("utf-8")
    req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            result = json.loads(resp.read())
            if result.get("ok"):
                log.info("Telegram digest sent successfully")
            else:
                # Telegram returned an error payload (e.g. bad Markdown).
                log.error("Telegram API error: %s", result)
    except Exception as e:
        # Network failure / timeout — log and move on, never crash the digest.
        log.error("Failed to send Telegram message: %s", e)
def main():
    """CLI entry point.

    Usage: scoring_digest.py [HOURS] [--dry-run] [--no-telegram]

    HOURS defaults to 24. --dry-run prints the digest JSON instead of
    persisting/broadcasting; --no-telegram skips only the Telegram post.
    """
    args = sys.argv[1:]
    # BUG FIX: the original did int(sys.argv[1]) unconditionally, which
    # crashed with ValueError when the first argument was a flag such as
    # --dry-run. Take the first non-flag argument as the hours window.
    hours = 24
    for arg in args:
        if not arg.startswith("-"):
            hours = int(arg)
            break
    dry_run = "--dry-run" in args
    no_telegram = "--no-telegram" in args
    log.info("Running scoring digest for last %dh (dry_run=%s)", hours, dry_run)
    digest = collect_and_score(hours)
    log.info(
        "Scored %d contributions: %d create, %d enrich, %d challenge → %.2f total CI",
        digest["total_contributions"],
        digest["action_counts"]["create"],
        digest["action_counts"]["enrich"],
        digest["action_counts"]["challenge"],
        digest["total_ci_awarded"],
    )
    for name, delta in digest["contributor_deltas"].items():
        log.info(" %s: +%.4f CI", name, delta)
    if dry_run:
        print(json.dumps(digest, indent=2, default=str))
        return
    save_digest_json(digest)
    update_contributors(digest)
    if not no_telegram:
        send_telegram(digest)
    log.info("Digest complete")


if __name__ == "__main__":
    main()