teleo-infrastructure/scripts/scoring_digest.py
m3taversal 4101048cd0 feat: wire action-type CI into contributor profiles
- contribution_scores table stores per-PR CI with action type
- Profile endpoint returns action_ci alongside role-based ci_score
- Branch-name attribution: contrib/NAME/ PRs attributed to NAME
- Cameron now shows 0.32 CI + BELIEF MOVER badge from challenge
- Handle variant matching (cameron-s1 → cameron) for cross-system lookup
- Full historical backfill: 985 scores across 9 contributors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 11:29:01 +01:00

#!/usr/bin/env python3
"""Daily scoring digest — classify, score, and broadcast KB contributions.
Runs daily at 8:07 AM London via cron.
Queries pipeline.db for PRs merged in the last 24h, classifies each as
CREATE/ENRICH/CHALLENGE, scores it with an importance multiplier and
connectivity bonus, updates the contributors table, and posts a summary
to Telegram.
Spec: Pentagon/sprints/contribution-scoring-algorithm.md
"""
import json
import logging
import os
import re
import sqlite3
import subprocess
import sys
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("scoring_digest")
# --- Configuration ---
BASE_DIR = Path(os.environ.get("PIPELINE_BASE", "/opt/teleo-eval"))
DB_PATH = BASE_DIR / "pipeline" / "pipeline.db"
CODEX_DIR = BASE_DIR / "workspaces" / "main"
TELEGRAM_TOKEN_FILE = BASE_DIR / "secrets" / "telegram-bot-token"
TELEGRAM_CHAT_ID = 2091295364
DIGEST_JSON_PATH = BASE_DIR / "logs" / "scoring-digest-latest.json"
LONDON_TZ = ZoneInfo("Europe/London")
# --- Action weights (Leo spec Apr 20) ---
ACTION_WEIGHTS = {
"challenge": 0.40,
"create": 0.35,
"enrich": 0.25,
}
# --- Confidence → base importance mapping ---
CONFIDENCE_BASE = {
"proven": 2.0,
"likely": 1.5,
"experimental": 1.0,
"speculative": 1.0,
"possible": 1.0,
"plausible": 1.0,
"medium": 1.5,
}
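# Scoring sketch (illustrative, derived from score_contribution below):
#   score = ACTION_WEIGHTS[action]
#           * (CONFIDENCE_BASE[confidence] * domain_weight + connectivity_bonus)
#           * create_multiplier
# e.g. an enrich on a "likely" claim in a mid-size domain with no bonuses:
#   0.25 * (1.5 * 1.0 + 0.0) * 1.0 = 0.375 CI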
DOMAIN_CLAIM_COUNTS: dict[str, int] = {}
ENTITY_SLUGS: set[str] = set()
CLAIM_SLUGS: set[str] = set()
MAP_FILES: set[str] = set()
def _slugify(title: str) -> str:
s = title.lower().strip()
s = re.sub(r"[^\w\s-]", "", s)
s = re.sub(r"[\s_]+", "-", s)
return s.strip("-")
def _init_link_index():
"""Build indexes for wiki-link resolution."""
global ENTITY_SLUGS, CLAIM_SLUGS, MAP_FILES
entities_dir = CODEX_DIR / "entities"
if entities_dir.exists():
for f in entities_dir.glob("*.md"):
ENTITY_SLUGS.add(f.stem.lower())
    domains_dir = CODEX_DIR / "domains"
    if domains_dir.exists():  # avoid crashing on a checkout with no domains/ yet
        for domain_dir in domains_dir.iterdir():
            if not domain_dir.is_dir():
                continue
            for f in domain_dir.glob("*.md"):
                CLAIM_SLUGS.add(f.stem.lower())
            map_file = domain_dir / "_map.md"
            if map_file.exists():
                MAP_FILES.add("_map")
                MAP_FILES.add(f"domains/{domain_dir.name}/_map")
for f in (CODEX_DIR / "foundations").glob("*.md") if (CODEX_DIR / "foundations").exists() else []:
CLAIM_SLUGS.add(f.stem.lower())
for f in (CODEX_DIR / "core").glob("*.md") if (CODEX_DIR / "core").exists() else []:
CLAIM_SLUGS.add(f.stem.lower())
for f in (CODEX_DIR / "decisions").glob("*.md") if (CODEX_DIR / "decisions").exists() else []:
CLAIM_SLUGS.add(f.stem.lower())
def _resolve_link(link_text: str) -> bool:
"""Check if a [[wiki-link]] resolves to a known entity, claim, or map."""
slug = _slugify(link_text)
return (
slug in ENTITY_SLUGS
or slug in CLAIM_SLUGS
or slug in MAP_FILES
or link_text.lower() in MAP_FILES
)
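# Illustrative (hypothetical slugs): "[[Memory Consolidation]]" resolves if a
# claim or entity file named memory-consolidation.md exists;
# "[[domains/sleep/_map]]" resolves via the literal MAP_FILES lookup.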
def _count_resolved_wiki_links(file_path: Path) -> int:
"""Count wiki-links in a claim file that resolve to real targets."""
if not file_path.exists():
return 0
try:
text = file_path.read_text(encoding="utf-8")
except Exception:
return 0
links = re.findall(r"\[\[([^\]]+)\]\]", text)
return sum(1 for link in links if _resolve_link(link))
def _get_confidence(file_path: Path) -> str:
"""Extract confidence field from claim frontmatter."""
if not file_path.exists():
return "experimental"
try:
text = file_path.read_text(encoding="utf-8")
except Exception:
return "experimental"
m = re.search(r"^confidence:\s*(\S+)", text, re.MULTILINE)
return m.group(1).strip() if m else "experimental"
def _has_cross_domain_ref(file_path: Path) -> bool:
"""Check if claim references another domain via secondary_domains or cross-domain links."""
if not file_path.exists():
return False
try:
text = file_path.read_text(encoding="utf-8")
except Exception:
return False
if re.search(r"^secondary_domains:\s*\[.+\]", text, re.MULTILINE):
return True
if re.search(r"^depends_on:", text, re.MULTILINE):
return True
return False
def _has_challenged_by(file_path: Path) -> bool:
"""Check if claim has challenged_by field."""
if not file_path.exists():
return False
try:
text = file_path.read_text(encoding="utf-8")
except Exception:
return False
return bool(re.search(r"^challenged_by:", text, re.MULTILINE))
def _get_domain_weight(domain: str) -> float:
"""Domain maturity weight: sparse domains get bonus, mature domains get discount."""
count = DOMAIN_CLAIM_COUNTS.get(domain, 0)
if count < 20:
return 1.5
elif count > 50:
return 0.8
return 1.0
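# Illustrative: 12 claims -> 1.5 (sparse bonus), 35 claims -> 1.0,
# 60 claims -> 0.8 (mature discount); unknown domains count as 0 -> 1.5.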
def _init_domain_counts():
"""Count claims per domain."""
global DOMAIN_CLAIM_COUNTS
domains_dir = CODEX_DIR / "domains"
if not domains_dir.exists():
return
for domain_dir in domains_dir.iterdir():
if domain_dir.is_dir():
count = sum(1 for f in domain_dir.glob("*.md") if f.name != "_map.md")
DOMAIN_CLAIM_COUNTS[domain_dir.name] = count
def _normalize_contributor(submitted_by: str | None, agent: str | None, branch: str | None = None) -> str:
"""Normalize contributor handle — strip @, map agent self-directed to agent name.
For fork PRs (contrib/NAME/...), extract contributor from branch name.
"""
if branch and branch.startswith("contrib/"):
parts = branch.split("/")
if len(parts) >= 2 and parts[1]:
return parts[1].lower()
raw = submitted_by or agent or "unknown"
raw = raw.strip()
if raw.startswith("@"):
raw = raw[1:]
if " (self-directed)" in raw:
raw = raw.replace(" (self-directed)", "")
if raw in ("pipeline", ""):
return agent.strip() if agent and agent.strip() not in ("pipeline", "") else "pipeline"
return raw
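# Illustrative (hypothetical inputs):
#   branch "contrib/cameron/challenge-x"        -> "cameron"
#   submitted_by "@Leo"                         -> "Leo"
#   submitted_by "claude (self-directed)"       -> "claude"
#   submitted_by "pipeline", agent "gpt-writer" -> "gpt-writer"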
def classify_pr(pr: dict) -> str | None:
"""Classify a merged PR as create/enrich/challenge or None (skip).
Uses branch name pattern + commit_type as primary signal.
Falls back to file-level analysis for ambiguous cases.
"""
branch = pr.get("branch", "")
commit_type = pr.get("commit_type", "")
if commit_type in ("pipeline", "entity"):
return None
if "challenge" in branch.lower():
return "challenge"
if branch.startswith("extract/") or branch.startswith("research-"):
return "create"
if "reweave" in branch.lower() or "enrich" in branch.lower():
return "enrich"
if commit_type == "research":
return "create"
if commit_type == "reweave":
return "enrich"
if commit_type == "fix":
return "enrich"
if commit_type == "knowledge":
return "create"
return "create"
def _find_claim_file(pr: dict) -> Path | None:
"""Find the claim file for a merged PR."""
domain = pr.get("domain")
branch = pr.get("branch", "")
if not domain:
return None
domain_dir = CODEX_DIR / "domains" / domain
if not domain_dir.exists():
return None
slug_part = branch.split("/")[-1] if "/" in branch else branch
slug_part = re.sub(r"-[a-f0-9]{4}$", "", slug_part)
for claim_file in domain_dir.glob("*.md"):
if claim_file.name == "_map.md":
continue
claim_slug = _slugify(claim_file.stem)
if slug_part and slug_part in claim_slug:
return claim_file
return None
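# Illustrative: a hypothetical branch "extract/sleep/memory-consolidation-a3f9"
# yields slug_part "memory-consolidation" (trailing 4-hex suffix stripped) and
# matches domains/sleep/memory-consolidation.md by substring.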
def score_contribution(action_type: str, claim_file: Path | None, domain: str) -> tuple[float, dict]:
"""Compute CI points for a single contribution.
Returns (score, breakdown_dict) for transparency.
"""
weight = ACTION_WEIGHTS[action_type]
confidence = _get_confidence(claim_file) if claim_file else "experimental"
base = CONFIDENCE_BASE.get(confidence, 1.0)
if action_type == "challenge" and claim_file and _has_challenged_by(claim_file):
        base = 3.0 if confidence == "proven" else 2.5
domain_weight = _get_domain_weight(domain)
connectivity = 0.0
if claim_file and _has_cross_domain_ref(claim_file):
connectivity += 0.2
create_multiplier = 1.0
resolved_links = 0
if action_type == "create" and claim_file:
resolved_links = _count_resolved_wiki_links(claim_file)
if resolved_links >= 3:
create_multiplier = 1.5
importance = base * domain_weight + connectivity
score = weight * importance * create_multiplier
return score, {
"action": action_type,
"weight": weight,
"confidence": confidence,
"base": base,
"domain_weight": domain_weight,
"connectivity_bonus": connectivity,
"create_multiplier": create_multiplier,
"resolved_links": resolved_links,
"importance": importance,
"score": round(score, 4),
}
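# Worked examples (illustrative numbers, not real contributions):
#   create, "proven" claim, sparse domain (1.5), cross-domain ref, 3+ links:
#     importance = 2.0 * 1.5 + 0.2 = 3.2; score = 0.35 * 3.2 * 1.5 = 1.68
#   challenge that landed a challenged_by on a "proven" claim, neutral domain:
#     importance = 3.0 * 1.0 + 0.0 = 3.0; score = 0.40 * 3.0 * 1.0 = 1.20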
def collect_and_score(hours: int = 24) -> dict:
"""Main scoring pipeline: collect merged PRs, classify, score."""
_init_domain_counts()
_init_link_index()
cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""SELECT number, branch, domain, agent, commit_type, merged_at,
submitted_by, description
FROM prs
WHERE status = 'merged' AND merged_at >= ?
ORDER BY merged_at DESC""",
(cutoff,),
).fetchall()
finally:
conn.close()
contributions = []
contributor_deltas: dict[str, float] = {}
domain_activity: dict[str, int] = {}
action_counts = {"create": 0, "enrich": 0, "challenge": 0}
for row in rows:
pr = dict(row)
action_type = classify_pr(pr)
if action_type is None:
continue
claim_file = _find_claim_file(pr)
domain = pr.get("domain", "unknown")
score, breakdown = score_contribution(action_type, claim_file, domain)
contributor = _normalize_contributor(
pr.get("submitted_by"), pr.get("agent"), pr.get("branch")
)
contributor_deltas[contributor] = contributor_deltas.get(contributor, 0) + score
domain_activity[domain] = domain_activity.get(domain, 0) + 1
action_counts[action_type] = action_counts.get(action_type, 0) + 1
contributions.append({
"pr_number": pr["number"],
"contributor": contributor,
"agent": pr.get("agent", ""),
"domain": domain,
"action": action_type,
"score": round(score, 4),
"breakdown": breakdown,
"description": pr.get("description", ""),
"merged_at": pr.get("merged_at", ""),
})
total_claims = sum(DOMAIN_CLAIM_COUNTS.values())
return {
"period_hours": hours,
"generated_at": datetime.now(timezone.utc).isoformat(),
"date": datetime.now(LONDON_TZ).strftime("%B %d, %Y"),
"contributions": contributions,
"contributor_deltas": {k: round(v, 4) for k, v in sorted(
contributor_deltas.items(), key=lambda x: -x[1]
)},
"domain_activity": dict(sorted(domain_activity.items(), key=lambda x: -x[1])),
"action_counts": action_counts,
"total_contributions": len(contributions),
"total_ci_awarded": round(sum(c["score"] for c in contributions), 4),
"kb_state": {
"total_claims": total_claims,
"domains": len(DOMAIN_CLAIM_COUNTS),
"domain_breakdown": dict(DOMAIN_CLAIM_COUNTS),
},
}
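# Abbreviated digest sketch (made-up values):
# {"period_hours": 24,
#  "contributions": [{"pr_number": 481, "contributor": "cameron",
#                     "action": "challenge", "score": 1.2, ...}],
#  "contributor_deltas": {"cameron": 1.2},
#  "action_counts": {"create": 0, "enrich": 0, "challenge": 1},
#  "kb_state": {"total_claims": 200, "domains": 9, ...}}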
def update_contributors(digest: dict):
"""Write CI deltas to contributors table."""
if not digest["contributor_deltas"]:
return
conn = sqlite3.connect(str(DB_PATH))
try:
for handle, delta in digest["contributor_deltas"].items():
conn.execute(
"""INSERT INTO contributors (handle, claims_merged, created_at, updated_at)
VALUES (?, 0, datetime('now'), datetime('now'))
ON CONFLICT(handle) DO UPDATE SET updated_at = datetime('now')""",
(handle,),
)
conn.commit()
finally:
conn.close()
log.info("Updated %d contributor records", len(digest["contributor_deltas"]))
def save_scores_to_db(digest: dict):
"""Write individual contribution scores to contribution_scores table."""
conn = sqlite3.connect(str(DB_PATH))
try:
conn.execute("""CREATE TABLE IF NOT EXISTS contribution_scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pr_number INTEGER UNIQUE,
contributor TEXT NOT NULL,
event_type TEXT CHECK(event_type IN ('create','enrich','challenge')),
ci_earned REAL,
claim_slug TEXT,
domain TEXT,
scored_at TEXT NOT NULL
)""")
for c in digest["contributions"]:
slug = (c.get("description") or "")[:200] or c.get("breakdown", {}).get("action", "")
conn.execute(
"""INSERT INTO contribution_scores (pr_number, contributor, event_type, ci_earned, claim_slug, domain, scored_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(pr_number) DO UPDATE SET
contributor = excluded.contributor,
ci_earned = excluded.ci_earned,
event_type = excluded.event_type,
scored_at = excluded.scored_at""",
(c["pr_number"], c["contributor"], c["action"], c["score"], slug, c["domain"], c["merged_at"]),
)
conn.commit()
log.info("Wrote %d contribution scores to DB", len(digest["contributions"]))
finally:
conn.close()
def save_digest_json(digest: dict):
"""Save latest digest as JSON for API consumption."""
DIGEST_JSON_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(DIGEST_JSON_PATH, "w") as f:
json.dump(digest, f, indent=2, default=str)
log.info("Saved digest to %s", DIGEST_JSON_PATH)
def send_telegram(digest: dict):
"""Post digest summary to Telegram."""
token_file = TELEGRAM_TOKEN_FILE
if not token_file.exists():
log.warning("Telegram token not found at %s", token_file)
return
token = token_file.read_text().strip()
lines = [f"📊 *Daily KB Digest — {digest['date']}*", ""]
if digest["contributions"]:
lines.append(f"*NEW CONTRIBUTIONS* (last {digest['period_hours']}h):")
action_emoji = {"challenge": "⚔️", "create": "🆕", "enrich": "📚"}
by_contributor: dict[str, list] = {}
for c in digest["contributions"]:
name = c["contributor"]
by_contributor.setdefault(name, []).append(c)
for name, contribs in sorted(by_contributor.items(), key=lambda x: -sum(c["score"] for c in x[1])):
total_score = sum(c["score"] for c in contribs)
actions = {}
for c in contribs:
actions[c["action"]] = actions.get(c["action"], 0) + 1
action_summary = ", ".join(
f"{action_emoji.get(a, '')} {n} {a}" for a, n in sorted(actions.items(), key=lambda x: -x[1])
)
lines.append(f" {name}: {action_summary} → +{total_score:.2f} CI")
lines.append("")
lines.append("*KB STATE:*")
kb = digest["kb_state"]
ac = digest["action_counts"]
lines.append(
f"Claims: {kb['total_claims']} (+{digest['total_contributions']}) | "
f"Domains: {kb['domains']}"
)
lines.append(
f"Creates: {ac.get('create', 0)} | "
f"Enrichments: {ac.get('enrich', 0)} | "
f"Challenges: {ac.get('challenge', 0)}"
)
if digest["domain_activity"]:
top_domain = max(digest["domain_activity"], key=digest["domain_activity"].get)
lines.append(f"Most active: {top_domain} ({digest['domain_activity'][top_domain]} events)")
if digest["contributor_deltas"]:
lines.append("")
lines.append("*LEADERBOARD CHANGE:*")
for i, (name, delta) in enumerate(digest["contributor_deltas"].items(), 1):
if i > 5:
break
lines.append(f" #{i} {name} +{delta:.2f} CI")
text = "\n".join(lines)
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = json.dumps({
"chat_id": TELEGRAM_CHAT_ID,
"text": text,
"parse_mode": "Markdown",
}).encode("utf-8")
req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
result = json.loads(resp.read())
if result.get("ok"):
log.info("Telegram digest sent successfully")
else:
log.error("Telegram API error: %s", result)
except Exception as e:
log.error("Failed to send Telegram message: %s", e)
def main():
    # flags may precede the positional hours argument, so filter them out first
    args = [a for a in sys.argv[1:] if not a.startswith("--")]
    hours = int(args[0]) if args else 24
    dry_run = "--dry-run" in sys.argv
    no_telegram = "--no-telegram" in sys.argv
log.info("Running scoring digest for last %dh (dry_run=%s)", hours, dry_run)
digest = collect_and_score(hours)
log.info(
"Scored %d contributions: %d create, %d enrich, %d challenge → %.2f total CI",
digest["total_contributions"],
digest["action_counts"]["create"],
digest["action_counts"]["enrich"],
digest["action_counts"]["challenge"],
digest["total_ci_awarded"],
)
for name, delta in digest["contributor_deltas"].items():
log.info(" %s: +%.4f CI", name, delta)
if dry_run:
print(json.dumps(digest, indent=2, default=str))
return
save_digest_json(digest)
save_scores_to_db(digest)
update_contributors(digest)
if not no_telegram:
send_telegram(digest)
log.info("Digest complete")
if __name__ == "__main__":
main()
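# Usage sketch (cron schedule per the docstring; install path illustrative):
#   python3 scoring_digest.py                 # last 24h: write JSON, DB, Telegram
#   python3 scoring_digest.py 48 --dry-run    # rescore 48h, print digest JSON only
#   python3 scoring_digest.py --no-telegram   # persist results, skip the broadcast
#   crontab: 7 8 * * * python3 /opt/teleo-eval/scripts/scoring_digest.py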