feat: add daily scoring digest with CREATE/ENRICH/CHALLENGE classification
Classifies merged PRs by action type, scores with importance multiplier (confidence, domain maturity, connectivity bonus), updates contributor records, posts summary to Telegram, serves via /api/digest/latest. Cron: 7:07 UTC daily (8:07 AM London). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent e043cf98dc
commit 5463ca0b56
1 changed file with 520 additions and 0 deletions
scripts/scoring_digest.py (new file)
@@ -0,0 +1,520 @@
#!/usr/bin/env python3
"""Daily scoring digest — classify, score, and broadcast KB contributions.

Runs daily at 8:07 AM London (7:07 UTC) via cron. Queries pipeline.db for
merged PRs in the last 24h, classifies each as CREATE/ENRICH/CHALLENGE,
scores it with an importance multiplier and connectivity bonus, updates the
contributors table, and posts a summary to Telegram.

Spec: Pentagon/sprints/contribution-scoring-algorithm.md
"""
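
# Suggested crontab entry (illustrative: the interpreter path is an assumption,
# and 7:07 UTC only lines up with 8:07 London while BST is in effect):
#   7 7 * * * /usr/bin/python3 /opt/teleo-eval/scripts/scoring_digest.py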

import json
import logging
import os
import re
import sqlite3
import sys
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("scoring_digest")

# --- Configuration ---
BASE_DIR = Path(os.environ.get("PIPELINE_BASE", "/opt/teleo-eval"))
DB_PATH = BASE_DIR / "pipeline" / "pipeline.db"
CODEX_DIR = BASE_DIR / "workspaces" / "main"
TELEGRAM_TOKEN_FILE = BASE_DIR / "secrets" / "telegram-bot-token"
TELEGRAM_CHAT_ID = 2091295364
DIGEST_JSON_PATH = BASE_DIR / "logs" / "scoring-digest-latest.json"
LONDON_TZ = ZoneInfo("Europe/London")

# --- Action weights (Leo spec Apr 20) ---
ACTION_WEIGHTS = {
    "challenge": 0.40,
    "create": 0.35,
    "enrich": 0.25,
}

# --- Confidence → base importance mapping ---
CONFIDENCE_BASE = {
    "proven": 2.0,
    "likely": 1.5,
    "experimental": 1.0,
    "speculative": 1.0,
    "possible": 1.0,
    "plausible": 1.0,
    "medium": 1.5,
}
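
# Scoring formula, as implemented in score_contribution() below:
#   score = action_weight * (confidence_base * domain_weight + connectivity_bonus)
#           * create_multiplier
# Worked example (illustrative): a CREATE of a "proven" claim (base 2.0) in a
# sparse domain (<20 claims, weight 1.5) with a cross-domain ref (+0.2) and
# >=3 resolved wiki-links (x1.5) earns 0.35 * (2.0 * 1.5 + 0.2) * 1.5 = 1.68 CI.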

DOMAIN_CLAIM_COUNTS: dict[str, int] = {}
ENTITY_SLUGS: set[str] = set()
CLAIM_SLUGS: set[str] = set()
MAP_FILES: set[str] = set()


def _slugify(title: str) -> str:
    s = title.lower().strip()
    s = re.sub(r"[^\w\s-]", "", s)
    s = re.sub(r"[\s_]+", "-", s)
    return s.strip("-")
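
# _slugify example (illustrative): "Gut-Brain Axis (v2)" -> "gut-brain-axis-v2"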


def _init_link_index():
    """Build indexes for wiki-link resolution."""
    global ENTITY_SLUGS, CLAIM_SLUGS, MAP_FILES

    entities_dir = CODEX_DIR / "entities"
    if entities_dir.exists():
        for f in entities_dir.glob("*.md"):
            ENTITY_SLUGS.add(f.stem.lower())

    domains_dir = CODEX_DIR / "domains"
    if domains_dir.exists():
        for domain_dir in domains_dir.iterdir():
            if not domain_dir.is_dir():
                continue
            for f in domain_dir.glob("*.md"):
                CLAIM_SLUGS.add(f.stem.lower())
            map_file = domain_dir / "_map.md"
            if map_file.exists():
                MAP_FILES.add("_map")
                MAP_FILES.add(f"domains/{domain_dir.name}/_map")

    for extra in ("foundations", "core", "decisions"):
        extra_dir = CODEX_DIR / extra
        if extra_dir.exists():
            for f in extra_dir.glob("*.md"):
                CLAIM_SLUGS.add(f.stem.lower())


def _resolve_link(link_text: str) -> bool:
    """Check if a [[wiki-link]] resolves to a known entity, claim, or map."""
    slug = _slugify(link_text)
    return (
        slug in ENTITY_SLUGS
        or slug in CLAIM_SLUGS
        or slug in MAP_FILES
        or link_text.lower() in MAP_FILES
    )


def _count_resolved_wiki_links(file_path: Path) -> int:
    """Count wiki-links in a claim file that resolve to real targets."""
    if not file_path.exists():
        return 0
    try:
        text = file_path.read_text(encoding="utf-8")
    except Exception:
        return 0

    links = re.findall(r"\[\[([^\]]+)\]\]", text)
    return sum(1 for link in links if _resolve_link(link))
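
# _count_resolved_wiki_links example (illustrative): a claim body containing
# "[[Gut-Brain Axis]] extends [[some-unknown-idea]]" counts 1 if only the
# first link resolves against the entity/claim/map indexes.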


def _get_confidence(file_path: Path) -> str:
    """Extract confidence field from claim frontmatter."""
    if not file_path.exists():
        return "experimental"
    try:
        text = file_path.read_text(encoding="utf-8")
    except Exception:
        return "experimental"

    m = re.search(r"^confidence:\s*(\S+)", text, re.MULTILINE)
    return m.group(1).strip() if m else "experimental"
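
# _get_confidence expects frontmatter like (illustrative):
#   confidence: proven
# A missing or unreadable field falls back to "experimental".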


def _has_cross_domain_ref(file_path: Path) -> bool:
    """Check if claim references another domain via secondary_domains or cross-domain links."""
    if not file_path.exists():
        return False
    try:
        text = file_path.read_text(encoding="utf-8")
    except Exception:
        return False

    if re.search(r"^secondary_domains:\s*\[.+\]", text, re.MULTILINE):
        return True
    if re.search(r"^depends_on:", text, re.MULTILINE):
        return True
    return False
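
# _has_cross_domain_ref matches frontmatter like (illustrative values):
#   secondary_domains: [sleep, metabolism]
# or the mere presence of a depends_on: field.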


def _has_challenged_by(file_path: Path) -> bool:
    """Check if claim has challenged_by field."""
    if not file_path.exists():
        return False
    try:
        text = file_path.read_text(encoding="utf-8")
    except Exception:
        return False
    return bool(re.search(r"^challenged_by:", text, re.MULTILINE))


def _get_domain_weight(domain: str) -> float:
    """Domain maturity weight: sparse domains get bonus, mature domains get discount."""
    count = DOMAIN_CLAIM_COUNTS.get(domain, 0)
    if count < 20:
        return 1.5
    elif count > 50:
        return 0.8
    return 1.0


def _init_domain_counts():
    """Count claims per domain."""
    global DOMAIN_CLAIM_COUNTS
    domains_dir = CODEX_DIR / "domains"
    if not domains_dir.exists():
        return
    for domain_dir in domains_dir.iterdir():
        if domain_dir.is_dir():
            count = sum(1 for f in domain_dir.glob("*.md") if f.name != "_map.md")
            DOMAIN_CLAIM_COUNTS[domain_dir.name] = count


def _normalize_contributor(submitted_by: str | None, agent: str | None) -> str:
    """Normalize contributor handle — strip @, map agent self-directed to agent name."""
    raw = (submitted_by or agent or "unknown").strip()
    if raw.startswith("@"):
        raw = raw[1:]
    raw = raw.replace(" (self-directed)", "")
    if raw in ("pipeline", ""):
        return agent.strip() if agent and agent.strip() not in ("pipeline", "") else "pipeline"
    return raw


def classify_pr(pr: dict) -> str | None:
    """Classify a merged PR as create/enrich/challenge or None (skip).

    Uses branch name pattern + commit_type as the primary signal;
    anything unmatched defaults to create.
    """
    branch = pr.get("branch", "")
    commit_type = pr.get("commit_type", "")

    if commit_type in ("pipeline", "entity"):
        return None

    if "challenge" in branch.lower():
        return "challenge"

    if branch.startswith(("extract/", "research-")):
        return "create"

    if "reweave" in branch.lower() or "enrich" in branch.lower():
        return "enrich"

    if commit_type == "research":
        return "create"
    if commit_type == "reweave":
        return "enrich"
    if commit_type == "fix":
        return "enrich"
    if commit_type == "knowledge":
        return "create"

    return "create"
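
# classify_pr precedence (first match wins): pipeline/entity commits are
# skipped; "challenge" anywhere in the branch name beats everything else;
# extract/ and research- branches are creates; reweave/enrich branches are
# enrichments; then commit_type decides; anything left defaults to create.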


def _find_claim_file(pr: dict) -> Path | None:
    """Find the claim file for a merged PR."""
    domain = pr.get("domain")
    branch = pr.get("branch", "")

    if not domain:
        return None

    domain_dir = CODEX_DIR / "domains" / domain
    if not domain_dir.exists():
        return None

    slug_part = branch.split("/")[-1]
    slug_part = re.sub(r"-[a-f0-9]{4}$", "", slug_part)

    for claim_file in domain_dir.glob("*.md"):
        if claim_file.name == "_map.md":
            continue
        claim_slug = _slugify(claim_file.stem)
        if slug_part and slug_part in claim_slug:
            return claim_file

    return None
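
# _find_claim_file example (illustrative branch name): for branch
# "extract/gut-brain-axis-3f2a" in domain "nutrition", the trailing 4-char hex
# suffix is stripped and "gut-brain-axis" is substring-matched against the
# slugs of nutrition/*.md (skipping _map.md).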


def score_contribution(action_type: str, claim_file: Path | None, domain: str) -> tuple[float, dict]:
    """Compute CI points for a single contribution.

    Returns (score, breakdown_dict) for transparency.
    """
    weight = ACTION_WEIGHTS[action_type]

    confidence = _get_confidence(claim_file) if claim_file else "experimental"
    base = CONFIDENCE_BASE.get(confidence, 1.0)

    if action_type == "challenge" and claim_file and _has_challenged_by(claim_file):
        base = 3.0 if confidence == "proven" else 2.5

    domain_weight = _get_domain_weight(domain)

    connectivity = 0.0
    if claim_file and _has_cross_domain_ref(claim_file):
        connectivity += 0.2

    create_multiplier = 1.0
    resolved_links = 0
    if action_type == "create" and claim_file:
        resolved_links = _count_resolved_wiki_links(claim_file)
        if resolved_links >= 3:
            create_multiplier = 1.5

    importance = base * domain_weight + connectivity
    score = weight * importance * create_multiplier

    return score, {
        "action": action_type,
        "weight": weight,
        "confidence": confidence,
        "base": base,
        "domain_weight": domain_weight,
        "connectivity_bonus": connectivity,
        "create_multiplier": create_multiplier,
        "resolved_links": resolved_links,
        "importance": importance,
        "score": round(score, 4),
    }
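
# score_contribution worked example (illustrative): a CHALLENGE that lands a
# challenged_by on a "proven" claim (base escalated to 3.0) in a mature domain
# (>50 claims, weight 0.8) with no cross-domain ref scores
# 0.40 * (3.0 * 0.8 + 0.0) * 1.0 = 0.96 CI.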


def collect_and_score(hours: int = 24) -> dict:
    """Main scoring pipeline: collect merged PRs, classify, score."""
    _init_domain_counts()
    _init_link_index()

    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()

    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            """SELECT number, branch, domain, agent, commit_type, merged_at,
                      submitted_by, description
               FROM prs
               WHERE status = 'merged' AND merged_at >= ?
               ORDER BY merged_at DESC""",
            (cutoff,),
        ).fetchall()
    finally:
        conn.close()

    contributions = []
    contributor_deltas: dict[str, float] = {}
    domain_activity: dict[str, int] = {}
    action_counts = {"create": 0, "enrich": 0, "challenge": 0}

    for row in rows:
        pr = dict(row)
        action_type = classify_pr(pr)
        if action_type is None:
            continue

        claim_file = _find_claim_file(pr)
        # NULL columns come through as None (the key exists), so "or" is
        # needed rather than a .get() default.
        domain = pr.get("domain") or "unknown"
        score, breakdown = score_contribution(action_type, claim_file, domain)

        contributor = _normalize_contributor(
            pr.get("submitted_by"), pr.get("agent")
        )
        contributor_deltas[contributor] = contributor_deltas.get(contributor, 0) + score
        domain_activity[domain] = domain_activity.get(domain, 0) + 1
        action_counts[action_type] = action_counts.get(action_type, 0) + 1

        contributions.append({
            "pr_number": pr["number"],
            "contributor": contributor,
            "agent": pr.get("agent") or "",
            "domain": domain,
            "action": action_type,
            "score": round(score, 4),
            "breakdown": breakdown,
            "description": pr.get("description") or "",
            "merged_at": pr.get("merged_at") or "",
        })

    total_claims = sum(DOMAIN_CLAIM_COUNTS.values())

    return {
        "period_hours": hours,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "date": datetime.now(LONDON_TZ).strftime("%B %d, %Y"),
        "contributions": contributions,
        "contributor_deltas": {k: round(v, 4) for k, v in sorted(
            contributor_deltas.items(), key=lambda x: -x[1]
        )},
        "domain_activity": dict(sorted(domain_activity.items(), key=lambda x: -x[1])),
        "action_counts": action_counts,
        "total_contributions": len(contributions),
        "total_ci_awarded": round(sum(c["score"] for c in contributions), 4),
        "kb_state": {
            "total_claims": total_claims,
            "domains": len(DOMAIN_CLAIM_COUNTS),
            "domain_breakdown": dict(DOMAIN_CLAIM_COUNTS),
        },
    }


def update_contributors(digest: dict):
    """Upsert contributor records for handles that earned CI this period.

    Note: this only touches updated_at; the CI deltas themselves live in the
    digest JSON written by save_digest_json().
    """
    if not digest["contributor_deltas"]:
        return

    conn = sqlite3.connect(str(DB_PATH))
    try:
        for handle in digest["contributor_deltas"]:
            conn.execute(
                """INSERT INTO contributors (handle, claims_merged, created_at, updated_at)
                   VALUES (?, 0, datetime('now'), datetime('now'))
                   ON CONFLICT(handle) DO UPDATE SET updated_at = datetime('now')""",
                (handle,),
            )
        conn.commit()
    finally:
        conn.close()

    log.info("Updated %d contributor records", len(digest["contributor_deltas"]))


def save_digest_json(digest: dict):
    """Save latest digest as JSON for API consumption."""
    DIGEST_JSON_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(DIGEST_JSON_PATH, "w") as f:
        json.dump(digest, f, indent=2, default=str)
    log.info("Saved digest to %s", DIGEST_JSON_PATH)


def send_telegram(digest: dict):
    """Post digest summary to Telegram."""
    token_file = TELEGRAM_TOKEN_FILE
    if not token_file.exists():
        log.warning("Telegram token not found at %s", token_file)
        return

    token = token_file.read_text().strip()

    lines = [f"📊 *Daily KB Digest — {digest['date']}*", ""]

    if digest["contributions"]:
        lines.append(f"*NEW CONTRIBUTIONS* (last {digest['period_hours']}h):")
        action_emoji = {"challenge": "⚔️", "create": "🆕", "enrich": "📚"}

        by_contributor: dict[str, list] = {}
        for c in digest["contributions"]:
            by_contributor.setdefault(c["contributor"], []).append(c)

        for name, contribs in sorted(by_contributor.items(), key=lambda x: -sum(c["score"] for c in x[1])):
            total_score = sum(c["score"] for c in contribs)
            actions = {}
            for c in contribs:
                actions[c["action"]] = actions.get(c["action"], 0) + 1

            action_summary = ", ".join(
                f"{action_emoji.get(a, '•')} {n} {a}" for a, n in sorted(actions.items(), key=lambda x: -x[1])
            )
            lines.append(f"  {name}: {action_summary} → +{total_score:.2f} CI")

        lines.append("")

    lines.append("*KB STATE:*")
    kb = digest["kb_state"]
    ac = digest["action_counts"]
    lines.append(
        f"Claims: {kb['total_claims']} (+{digest['total_contributions']}) | "
        f"Domains: {kb['domains']}"
    )
    lines.append(
        f"Creates: {ac.get('create', 0)} | "
        f"Enrichments: {ac.get('enrich', 0)} | "
        f"Challenges: {ac.get('challenge', 0)}"
    )

    if digest["domain_activity"]:
        top_domain = max(digest["domain_activity"], key=digest["domain_activity"].get)
        lines.append(f"Most active: {top_domain} ({digest['domain_activity'][top_domain]} events)")

    if digest["contributor_deltas"]:
        lines.append("")
        lines.append("*LEADERBOARD CHANGE:*")
        for i, (name, delta) in enumerate(digest["contributor_deltas"].items(), 1):
            if i > 5:
                break
            lines.append(f"  #{i} {name} +{delta:.2f} CI")

    text = "\n".join(lines)

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": text,
        "parse_mode": "Markdown",
    }).encode("utf-8")

    req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            result = json.loads(resp.read())
        if result.get("ok"):
            log.info("Telegram digest sent successfully")
        else:
            log.error("Telegram API error: %s", result)
    except Exception as e:
        log.error("Failed to send Telegram message: %s", e)
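
# Rendered message shape (illustrative values, not real output):
#   📊 *Daily KB Digest — April 21, 2025*
#
#   *NEW CONTRIBUTIONS* (last 24h):
#     alice: 🆕 2 create, 📚 1 enrich → +2.41 CI
#
#   *KB STATE:*
#   Claims: 312 (+3) | Domains: 14
#   Creates: 2 | Enrichments: 1 | Challenges: 0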


def main():
    # Flags are filtered out before parsing the positional hours argument so
    # that e.g. "--dry-run" is never handed to int().
    positional = [a for a in sys.argv[1:] if not a.startswith("--")]
    hours = int(positional[0]) if positional else 24
    dry_run = "--dry-run" in sys.argv
    no_telegram = "--no-telegram" in sys.argv

    log.info("Running scoring digest for last %dh (dry_run=%s)", hours, dry_run)

    digest = collect_and_score(hours)

    log.info(
        "Scored %d contributions: %d create, %d enrich, %d challenge → %.2f total CI",
        digest["total_contributions"],
        digest["action_counts"]["create"],
        digest["action_counts"]["enrich"],
        digest["action_counts"]["challenge"],
        digest["total_ci_awarded"],
    )

    for name, delta in digest["contributor_deltas"].items():
        log.info("  %s: +%.4f CI", name, delta)

    if dry_run:
        print(json.dumps(digest, indent=2, default=str))
        return

    save_digest_json(digest)
    update_contributors(digest)

    if not no_telegram:
        send_telegram(digest)

    log.info("Digest complete")


if __name__ == "__main__":
    main()
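
# Usage (illustrative):
#   python3 scripts/scoring_digest.py                  # last 24h, full run
#   python3 scripts/scoring_digest.py 48 --dry-run     # last 48h, print JSON only
#   python3 scripts/scoring_digest.py --no-telegram    # write digest + DB, skip Telegram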