epimetheus: source archive restructure — inbox/queue + inbox/archive/{domain} + inbox/null-result

- config.py: added INBOX_QUEUE, INBOX_NULL_RESULT constants
- evaluate.py: skip patterns + LIGHT tier cover all inbox/ subdirs
- llm.py: eval prompts reference inbox/ generically
- telegram/bot.py: archives to inbox/queue/
- telegram/teleo-telegram.service: ReadWritePaths expanded
- research-prompt-v2.md: paths updated to inbox/queue/
- research-prompt-leo-synthesis.md: paths updated
- migrate-source-archive.py: one-time migration script
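Resulting layout (per the migration script's docstring):

inbox/
  queue/              unprocessed sources (landing zone)
  archive/{domain}/   processed sources with extraction results
  null-result/        reviewed, nothing extractable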

Reviewed by: Ganymede, Rhea, Leo (all approved)

Pentagon-Agent: Epimetheus <968B2991-E2DF-4006-B962-F5B0A0CC8ACA>
m3taversal 2026-03-18 11:50:04 +00:00
parent ffa718e834
commit 090b1411fd
8 changed files with 1914 additions and 91 deletions

config.py

@@ -10,7 +10,9 @@ MAIN_WORKTREE = BASE_DIR / "workspaces" / "main"
SECRETS_DIR = BASE_DIR / "secrets"
LOG_DIR = BASE_DIR / "logs"
DB_PATH = BASE_DIR / "pipeline" / "pipeline.db"
INBOX_QUEUE = "inbox/queue"
INBOX_ARCHIVE = "inbox/archive"
INBOX_NULL_RESULT = "inbox/null-result"
# --- Forgejo ---
FORGEJO_URL = os.environ.get("FORGEJO_URL", "http://localhost:3000")
@@ -27,7 +29,8 @@ OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
MODEL_OPUS = "opus"
MODEL_SONNET = "sonnet"
MODEL_HAIKU = "anthropic/claude-3.5-haiku"
MODEL_GPT4O = "openai/gpt-4o"
MODEL_GPT4O = "openai/gpt-4o" # legacy, kept for reference
MODEL_GEMINI_FLASH = "google/gemini-2.5-flash" # was -preview, removed by OpenRouter
MODEL_SONNET_OR = "anthropic/claude-sonnet-4.5" # OpenRouter Sonnet (paid, not Claude Max)
# --- Model assignment per stage ---
@@ -41,10 +44,10 @@ MODEL_SONNET_OR = "anthropic/claude-sonnet-4.5" # OpenRouter Sonnet (paid, not
# 3. Leo DEEP → Opus (Claude Max) — highest judgment, scarce
EXTRACT_MODEL = MODEL_SONNET # extraction: structured output, volume work (Claude Max)
TRIAGE_MODEL = MODEL_HAIKU # triage: routing decision, cheapest (OpenRouter)
EVAL_DOMAIN_MODEL = MODEL_GPT4O # domain review: OpenRouter GPT-4o
EVAL_DOMAIN_MODEL = MODEL_GEMINI_FLASH # domain review: Gemini 2.5 Flash (was GPT-4o — 16x cheaper, different family from Sonnet)
EVAL_LEO_MODEL = MODEL_OPUS # Leo DEEP review: Claude Max Opus
EVAL_LEO_STANDARD_MODEL = MODEL_SONNET_OR # Leo STANDARD review: OpenRouter Sonnet
EVAL_DEEP_MODEL = MODEL_GPT4O # DEEP cross-family: paid, adversarial
EVAL_DEEP_MODEL = MODEL_GEMINI_FLASH # DEEP cross-family: paid, adversarial
# --- Model backends ---
# Each model can run on Claude Max (subscription, base load) or API (overflow/spikes).
@@ -68,6 +71,7 @@ MODEL_COSTS = {
"sonnet": {"input": 0.003, "output": 0.015},
MODEL_HAIKU: {"input": 0.0008, "output": 0.004},
MODEL_GPT4O: {"input": 0.0025, "output": 0.01},
MODEL_GEMINI_FLASH: {"input": 0.00015, "output": 0.0006},
MODEL_SONNET_OR: {"input": 0.003, "output": 0.015},
}
@@ -78,7 +82,8 @@ MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain
# --- Timeouts (seconds) ---
EXTRACT_TIMEOUT = 600 # 10 min
EVAL_TIMEOUT = 300 # 5 min
EVAL_TIMEOUT = 120 # 2 min — routine Sonnet/Gemini Flash calls (was 600, caused 10-min stalls)
EVAL_TIMEOUT_OPUS = 600 # 10 min — Opus DEEP eval needs more time for complex reasoning
MERGE_TIMEOUT = 300 # 5 min — force-reset to conflict if exceeded (Rhea)
CLAUDE_MAX_PROBE_TIMEOUT = 15
@@ -92,12 +97,70 @@ TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits
SUBSTANTIVE_RETRY_STANDARD = 2 # reviewer request_changes
SUBSTANTIVE_RETRY_DEEP = 3
MAX_EVAL_ATTEMPTS = 3 # Hard cap on eval cycles per PR before terminal
MAX_FIX_ATTEMPTS = 2 # Hard cap on auto-fix cycles per PR before giving up
MAX_FIX_PER_CYCLE = 15 # PRs to fix per cycle — bumped from 5 to clear backlog (Cory, Mar 14)
# Issue tags that can be fixed mechanically (Python fixer or Haiku)
MECHANICAL_ISSUE_TAGS = {"frontmatter_schema", "broken_wiki_links", "near_duplicate"}
# broken_wiki_links removed — downgraded to warning, not a gate. Links to claims
# in other open PRs resolve naturally as the dependency chain merges. (Cory, Mar 14)
MECHANICAL_ISSUE_TAGS = {"frontmatter_schema", "near_duplicate"}
# Issue tags that require re-extraction (substantive quality problems)
SUBSTANTIVE_ISSUE_TAGS = {"factual_discrepancy", "confidence_miscalibration", "scope_error", "title_overclaims"}
# --- Content type schemas ---
# Registry of content types. validate.py branches on type to apply the right
# required fields, confidence rules, and title checks. Adding a new type is a
# dict entry here — no code changes in validate.py needed.
TYPE_SCHEMAS = {
"claim": {
"required": ("type", "domain", "description", "confidence", "source", "created"),
"valid_confidence": ("proven", "likely", "experimental", "speculative"),
"needs_proposition_title": True,
},
"framework": {
"required": ("type", "domain", "description", "source", "created"),
"valid_confidence": None,
"needs_proposition_title": True,
},
"entity": {
"required": ("type", "domain", "description"),
"valid_confidence": None,
"needs_proposition_title": False,
},
"decision": {
"required": ("type", "domain", "description", "parent_entity", "status"),
"valid_confidence": None,
"needs_proposition_title": False,
"valid_status": ("active", "passed", "failed", "expired", "cancelled"),
},
}
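A minimal sketch of how validate.py can branch on this registry (the function and argument names here are illustrative, not the actual validate.py API):

def validate_frontmatter(fm: dict) -> list[str]:
    """Illustrative only: collect schema errors for one parsed frontmatter dict."""
    schema = TYPE_SCHEMAS.get(fm.get("type", ""))
    if schema is None:
        return [f"unknown type: {fm.get('type')!r}"]
    errors = [f"missing required field: {field}"
              for field in schema["required"] if field not in fm]
    valid_conf = schema["valid_confidence"]
    if valid_conf is not None and fm.get("confidence") not in valid_conf:
        errors.append(f"invalid confidence: {fm.get('confidence')!r}")
    return errors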
# --- Content directories ---
ENTITY_DIR_TEMPLATE = "entities/{domain}" # centralized path (Rhea: don't hardcode across 5 files)
DECISION_DIR_TEMPLATE = "decisions/{domain}"
# --- Contributor tiers ---
# Auto-promotion rules. CI is computed, not stored.
CONTRIBUTOR_TIER_RULES = {
"contributor": {
"claims_merged": 1,
},
"veteran": {
"claims_merged": 10,
"min_days_since_first": 30,
"challenges_survived": 1,
},
}
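A sketch of how auto-promotion could read these rules (the helper name and the stats shape are assumptions; the base tier name is not shown in this diff):

def contributor_tier(stats: dict[str, int]) -> str:
    """Illustrative only: resolve the highest tier whose thresholds are all met."""
    tier = "newcomer"  # assumed base tier; not named in this diff
    for name in ("contributor", "veteran"):
        rules = CONTRIBUTOR_TIER_RULES[name]
        if all(stats.get(key, 0) >= threshold for key, threshold in rules.items()):
            tier = name
    return tier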
# Role weights for CI computation (must match schemas/contribution-weights.yaml)
CONTRIBUTION_ROLE_WEIGHTS = {
"sourcer": 0.15,
"extractor": 0.40,
"challenger": 0.20,
"synthesizer": 0.15,
"reviewer": 0.10,
}
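One plausible reading of the CI computation under these weights (the actual formula is not shown in this diff; this is an assumption for illustration):

def contribution_index(role_counts: dict[str, float]) -> float:
    # Assumed: CI is a weighted sum of per-role contribution counts.
    return sum(CONTRIBUTION_ROLE_WEIGHTS[role] * count
               for role, count in role_counts.items())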
# --- Circuit breakers ---
BREAKER_THRESHOLD = 5
BREAKER_COOLDOWN = 900 # 15 min
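A sketch of the check these constants imply (the actual breaker logic lives elsewhere in the pipeline; names are assumptions):

import time

def breaker_open(failure_count: int, last_failure_ts: float) -> bool:
    """Illustrative only: trip after BREAKER_THRESHOLD failures, hold for BREAKER_COOLDOWN seconds."""
    in_cooldown = (time.time() - last_failure_ts) < BREAKER_COOLDOWN
    return failure_count >= BREAKER_THRESHOLD and in_cooldown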
@@ -111,6 +174,12 @@ SAMPLE_AUDIT_RATE = 0.15 # 15% of LIGHT merges get pre-merge promotion to STAND
SAMPLE_AUDIT_DISAGREEMENT_THRESHOLD = 0.10 # 10% disagreement → tighten LIGHT criteria
SAMPLE_AUDIT_MODEL = MODEL_OPUS # Opus for audit — different family from Haiku triage (Leo)
# --- Batch eval ---
# Batch domain review: group STANDARD PRs by domain, one LLM call per batch.
# Leo review stays individual (safety net for cross-contamination).
BATCH_EVAL_MAX_PRS = int(os.environ.get("BATCH_EVAL_MAX_PRS", "5"))
BATCH_EVAL_MAX_DIFF_BYTES = int(os.environ.get("BATCH_EVAL_MAX_DIFF_BYTES", "100000")) # 100KB
# --- Tier logic ---
# LIGHT_SKIP_LLM: when True, LIGHT PRs skip domain+Leo review entirely (auto-approve on Tier 0 pass).
# Set False for shadow mode (domain review runs but logs only). Flip True after 24h validation (Rhea).
@@ -124,6 +193,7 @@ INGEST_INTERVAL = 60
VALIDATE_INTERVAL = 30
EVAL_INTERVAL = 30
MERGE_INTERVAL = 30
FIX_INTERVAL = 60
HEALTH_CHECK_INTERVAL = 60
# --- Health API ---
@@ -133,3 +203,7 @@ HEALTH_PORT = 8080
LOG_FILE = LOG_DIR / "pipeline.jsonl"
LOG_ROTATION_MAX_BYTES = 50 * 1024 * 1024 # 50MB per file
LOG_ROTATION_BACKUP_COUNT = 7 # keep 7 days
# --- Versioning (tracked in metrics_snapshots for chart annotations) ---
PROMPT_VERSION = "v2-lean-directed" # bump on every prompt change
PIPELINE_VERSION = "2.2" # bump on every significant pipeline change

evaluate.py

@@ -28,7 +28,9 @@ from . import config, db
from .domains import agent_for_domain, detect_domain_from_diff
from .forgejo import api as forgejo_api
from .forgejo import get_agent_token, get_pr_diff, repo_path
from .llm import run_domain_review, run_leo_review, triage_pr
from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr
from .feedback import format_rejection_comment
from .validate import load_existing_claims
logger = logging.getLogger("pipeline.evaluate")
@@ -40,10 +42,10 @@ def _filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
Strips: inbox/archive/, schemas/, skills/, agents/*/musings/
Strips: inbox/, schemas/, skills/, agents/*/musings/
"""
sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE)
skip_patterns = [r"^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/"]
skip_patterns = [r"^diff --git a/(inbox/(archive|queue|null-result)|schemas|skills|agents/[^/]+/musings)/"]
core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"}
claim_sections = []
@@ -83,6 +85,12 @@ def _is_musings_only(diff: str) -> bool:
return has_musings and not has_other
# ─── NOTE: Tier 0.5 mechanical pre-check moved to validate.py ────────────
# Tier 0.5 now runs as part of the validate stage (before eval), not inside
# evaluate_pr(). This prevents wasting eval_attempts on mechanically fixable
# PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed.
# ─── Tier overrides ───────────────────────────────────────────────────────
@@ -102,6 +110,74 @@ def _diff_contains_claim_type(diff: str) -> bool:
return False
def _deterministic_tier(diff: str) -> str | None:
"""Deterministic tier routing — skip Haiku triage for obvious cases.
Checks diff file patterns before calling the LLM. Returns tier string
if deterministic, None if Haiku triage is needed.
Rules (Leo-calibrated):
- All files in entities/ only LIGHT
- All files in inbox/ only (queue, archive, null-result) LIGHT
- Any file in core/ or foundations/ DEEP (structural KB changes)
- Has challenged_by field DEEP (challenges existing claims)
- Modifies existing file (not new) in domains/ DEEP (enrichment/change)
- Otherwise None (needs Haiku triage)
NOTE: Cross-domain wiki links are NOT a DEEP signal most claims link
across domains, that's the whole point of the knowledge graph (Leo).
"""
changed_files = []
for line in diff.split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
changed_files.append(path)
if not changed_files:
return None
# All entities/ only → LIGHT
if all(f.startswith("entities/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in entities/)")
return "LIGHT"
# All inbox/ only (queue, archive, null-result) → LIGHT
if all(f.startswith("inbox/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in inbox/)")
return "LIGHT"
# Any file in core/ or foundations/ → DEEP (structural KB changes)
if any(f.startswith("core/") or f.startswith("foundations/") for f in changed_files):
logger.info("Deterministic tier: DEEP (touches core/ or foundations/)")
return "DEEP"
# Check diff content for DEEP signals
has_challenged_by = False
has_modified_claim = False
new_files: set[str] = set()
lines = diff.split("\n")
for i, line in enumerate(lines):
# Detect new files
if line.startswith("--- /dev/null") and i + 1 < len(lines) and lines[i + 1].startswith("+++ b/"):
new_files.add(lines[i + 1][6:])
# Check for challenged_by field
if line.startswith("+") and not line.startswith("+++"):
stripped = line[1:].strip()
if stripped.startswith("challenged_by:"):
has_challenged_by = True
if has_challenged_by:
logger.info("Deterministic tier: DEEP (has challenged_by field)")
return "DEEP"
# NOTE: Modified existing domain claims are NOT auto-DEEP — enrichments
# (appending evidence) are common and should be STANDARD. Let Haiku triage
# distinguish enrichments from structural changes.
return None
# ─── Verdict parsing ──────────────────────────────────────────────────────
@@ -117,12 +193,129 @@ def _parse_verdict(review_text: str, reviewer: str) -> str:
return "request_changes"
# Map model-invented tags to valid tags. Models consistently ignore the valid
# tag list and invent their own. This normalizes them. (Ganymede, Mar 14)
_TAG_ALIASES: dict[str, str] = {
"schema_violation": "frontmatter_schema",
"missing_schema_fields": "frontmatter_schema",
"missing_schema": "frontmatter_schema",
"schema": "frontmatter_schema",
"missing_frontmatter": "frontmatter_schema",
"redundancy": "near_duplicate",
"duplicate": "near_duplicate",
"missing_confidence": "confidence_miscalibration",
"confidence_error": "confidence_miscalibration",
"vague_claims": "scope_error",
"unfalsifiable": "scope_error",
"unverified_wiki_links": "broken_wiki_links",
"unverified-wiki-links": "broken_wiki_links",
"missing_wiki_links": "broken_wiki_links",
"invalid_wiki_links": "broken_wiki_links",
"wiki_link_errors": "broken_wiki_links",
"overclaiming": "title_overclaims",
"title_overclaim": "title_overclaims",
"date_error": "date_errors",
"factual_error": "factual_discrepancy",
"factual_inaccuracy": "factual_discrepancy",
}
VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims",
"confidence_miscalibration", "date_errors", "factual_discrepancy",
"near_duplicate", "scope_error"}
def _normalize_tag(tag: str) -> str | None:
"""Normalize a model-generated tag to a valid tag, or None if unrecognizable."""
tag = tag.strip().lower().replace("-", "_")
if tag in VALID_ISSUE_TAGS:
return tag
if tag in _TAG_ALIASES:
return _TAG_ALIASES[tag]
# Fuzzy: check if any valid tag is a substring or vice versa
for valid in VALID_ISSUE_TAGS:
if valid in tag or tag in valid:
return valid
return None
def _parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review."""
"""Extract issue tags from review.
First tries structured <!-- ISSUES: tag1, tag2 --> comment with tag normalization.
Falls back to keyword inference from prose.
"""
match = re.search(r"<!-- ISSUES: ([^>]+) -->", review_text)
if not match:
return []
return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
if match:
raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
normalized = []
for tag in raw_tags:
norm = _normalize_tag(tag)
if norm and norm not in normalized:
normalized.append(norm)
else:
logger.debug("Unrecognized issue tag '%s' — dropped", tag)
if normalized:
return normalized
# Fallback: infer tags from review prose
return _infer_issues_from_prose(review_text)
# Keyword patterns for inferring issue tags from unstructured review prose.
# Conservative: only match unambiguous indicators. Order doesn't matter.
_PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = {
"frontmatter_schema": [
re.compile(r"frontmatter", re.IGNORECASE),
re.compile(r"missing.{0,20}(type|domain|confidence|source|created)\b", re.IGNORECASE),
re.compile(r"yaml.{0,10}(invalid|missing|error|schema)", re.IGNORECASE),
re.compile(r"required field", re.IGNORECASE),
re.compile(r"lacks?.{0,15}(required|yaml|schema|fields)", re.IGNORECASE),
re.compile(r"missing.{0,15}(schema|fields|frontmatter)", re.IGNORECASE),
re.compile(r"schema.{0,10}(compliance|violation|missing|invalid)", re.IGNORECASE),
],
"broken_wiki_links": [
re.compile(r"(broken|dead|invalid).{0,10}(wiki.?)?link", re.IGNORECASE),
re.compile(r"wiki.?link.{0,20}(not found|missing|broken|invalid|resolv|unverif)", re.IGNORECASE),
re.compile(r"\[\[.{1,80}\]\].{0,20}(not found|doesn.t exist|missing)", re.IGNORECASE),
re.compile(r"unverified.{0,10}(wiki|link)", re.IGNORECASE),
],
"factual_discrepancy": [
re.compile(r"factual.{0,10}(error|inaccura|discrepanc|incorrect)", re.IGNORECASE),
re.compile(r"misrepresent", re.IGNORECASE),
],
"confidence_miscalibration": [
re.compile(r"confidence.{0,20}(too high|too low|miscalibrat|overstat|should be)", re.IGNORECASE),
re.compile(r"(overstat|understat).{0,20}confidence", re.IGNORECASE),
],
"scope_error": [
re.compile(r"scope.{0,10}(error|too broad|overscop|unscoped)", re.IGNORECASE),
re.compile(r"unscoped.{0,10}(universal|claim)", re.IGNORECASE),
re.compile(r"(vague|unfalsifiable).{0,15}(claim|assertion)", re.IGNORECASE),
re.compile(r"not.{0,10}(specific|falsifiable|disagreeable).{0,10}enough", re.IGNORECASE),
],
"title_overclaims": [
re.compile(r"title.{0,20}(overclaim|overstat|too broad)", re.IGNORECASE),
re.compile(r"overclaim", re.IGNORECASE),
],
"near_duplicate": [
re.compile(r"near.?duplicate", re.IGNORECASE),
re.compile(r"(very|too) similar.{0,20}(claim|title|existing)", re.IGNORECASE),
re.compile(r"duplicate.{0,20}(of|claim|title|existing|information)", re.IGNORECASE),
re.compile(r"redundan", re.IGNORECASE),
],
}
def _infer_issues_from_prose(review_text: str) -> list[str]:
"""Infer issue tags from unstructured review text via keyword matching.
Fallback for reviews that reject without structured <!-- ISSUES: --> tags.
Conservative: requires at least one unambiguous keyword match per tag.
"""
inferred = []
for tag, patterns in _PROSE_TAG_PATTERNS.items():
if any(p.search(review_text) for p in patterns):
inferred.append(tag)
return inferred
async def _post_formal_approvals(pr_number: int, pr_author: str):
@@ -151,16 +344,35 @@ async def _post_formal_approvals(pr_number: int, pr_author: str):
async def _terminate_pr(conn, pr_number: int, reason: str):
"""Terminal state: close PR on Forgejo, mark source needs_human."""
# Close PR on Forgejo with explanation
# Get issue tags for structured feedback
row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
issues = []
if row and row["eval_issues"]:
try:
issues = json.loads(row["eval_issues"])
except (json.JSONDecodeError, TypeError):
pass
# Post structured rejection comment with quality gate guidance (Epimetheus)
if issues:
feedback_body = format_rejection_comment(issues, source="eval_terminal")
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback.\n\n"
f"{feedback_body}"
)
else:
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback."
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{
"body": f"**Closed by eval pipeline** — {reason}.\n\n"
f"This PR has been evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source material will be re-queued for extraction with review feedback attached.\n\n"
f"See eval_issues for specific problems."
},
{"body": comment_body},
)
await forgejo_api(
"PATCH",
@@ -223,7 +435,15 @@ async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_iss
Attempt 3+: terminal.
"""
if eval_attempts < 2:
return # Attempt 1: normal retry
# Attempt 1: post structured feedback so agent learns, but don't close
if all_issues:
feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": feedback_body},
)
return
classification = _classify_issues(all_issues)
@@ -317,6 +537,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}
# NOTE: Tier 0.5 mechanical checks now run in validate stage (before eval).
# tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here.
# Filter diff
review_diff, _entity_diff = _filter_diff(diff)
if not review_diff:
@@ -338,8 +561,16 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
# Step 1: Triage (if not already triaged)
# Try deterministic routing first ($0), fall back to Haiku triage ($0.001)
if tier is None:
tier = await triage_pr(diff)
tier = _deterministic_tier(diff)
if tier is not None:
db.audit(
conn, "evaluate", "deterministic_tier",
json.dumps({"pr": pr_number, "tier": tier}),
)
else:
tier = await triage_pr(diff)
# Tier overrides (claim-shape detector + random promotion)
# Order matters: claim-shape catches obvious cases, random promotion catches the rest.
@@ -557,14 +788,470 @@ _rate_limit_backoff_until: datetime | None = None
_RATE_LIMIT_BACKOFF_MINUTES = 15
# ─── Batch domain review ─────────────────────────────────────────────────
def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
"""Parse batched domain review into per-PR review sections.
Returns {pr_number: review_text} for each PR found in the response.
Missing PRs are omitted — caller handles fallback.
"""
agent_upper = agent.upper()
result: dict[int, str] = {}
# Split by PR verdict markers: <!-- PR:NNN VERDICT:AGENT:... -->
# Each marker terminates the previous PR's section
pattern = re.compile(
r"<!-- PR:(\d+) VERDICT:" + re.escape(agent_upper) + r":(APPROVE|REQUEST_CHANGES) -->"
)
matches = list(pattern.finditer(response))
if not matches:
return result
for i, match in enumerate(matches):
pr_num = int(match.group(1))
verdict = match.group(2)
marker_end = match.end()
# Find the start of this PR's section by looking for the section header
# or the end of the previous verdict
section_header = f"=== PR #{pr_num}"
header_pos = response.rfind(section_header, 0, match.start())
if header_pos >= 0:
# Extract from header to end of verdict marker
section_text = response[header_pos:marker_end].strip()
else:
# No header found — extract from previous marker end to this marker end
prev_end = matches[i - 1].end() if i > 0 else 0
section_text = response[prev_end:marker_end].strip()
# Re-format as individual review comment
# Strip the batch section header, keep just the review content
# Add batch label for traceability
pr_nums_str = ", ".join(f"#{n}" for n in pr_numbers)
review_text = (
f"*(batch review with PRs {pr_nums_str})*\n\n"
f"{section_text}\n"
)
result[pr_num] = review_text
return result
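For reference, the response shape this parser expects looks roughly like the following (PR numbers, labels, and the agent name are hypothetical):

=== PR #101: futarchy-signal (2 files) ===
1. Factual accuracy — claims match the cited source. ...
<!-- PR:101 VERDICT:RIO:APPROVE -->
=== PR #102: dao-treasury (1 files) ===
1. Factual accuracy — the TVL figure contradicts the source. ...
<!-- ISSUES: factual_discrepancy -->
<!-- PR:102 VERDICT:RIO:REQUEST_CHANGES -->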
def _validate_batch_fanout(
parsed: dict[int, str],
pr_diffs: list[dict],
agent: str,
) -> tuple[dict[int, str], list[int]]:
"""Validate batch fan-out for completeness and cross-contamination.
Returns (valid_reviews, fallback_pr_numbers).
- valid_reviews: reviews that passed validation
- fallback_pr_numbers: PRs that need individual review (missing or cross-contaminated)
"""
valid: dict[int, str] = {}
fallback: list[int] = []
# Build file map: pr_number → set of path segments for matching.
# Use full paths (e.g., "domains/internet-finance/dao.md") not bare filenames
# to avoid false matches on short names like "dao.md" or "space.md" (Leo note #3).
pr_files: dict[int, set[str]] = {}
for pr in pr_diffs:
files = set()
for line in pr["diff"].split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
files.add(path)
# Also add the last 2 path segments (e.g., "internet-finance/dao.md")
# for models that abbreviate paths
parts = path.split("/")
if len(parts) >= 2:
files.add("/".join(parts[-2:]))
pr_files[pr["number"]] = files
for pr in pr_diffs:
pr_num = pr["number"]
# Completeness check: is there a review for this PR?
if pr_num not in parsed:
logger.warning("Batch fan-out: PR #%d missing from response — fallback to individual", pr_num)
fallback.append(pr_num)
continue
review = parsed[pr_num]
# Cross-contamination check: does review mention at least one file from this PR?
# Use path segments (min 10 chars) to avoid false substring matches on short names.
my_files = pr_files.get(pr_num, set())
mentions_own_file = any(f in review for f in my_files if len(f) >= 10)
if not mentions_own_file and my_files:
# Check if it references files from OTHER PRs (cross-contamination signal)
other_files = set()
for other_pr in pr_diffs:
if other_pr["number"] != pr_num:
other_files.update(pr_files.get(other_pr["number"], set()))
mentions_other = any(f in review for f in other_files if len(f) >= 10)
if mentions_other:
logger.warning(
"Batch fan-out: PR #%d review references files from another PR — cross-contamination, fallback",
pr_num,
)
fallback.append(pr_num)
continue
# If it doesn't mention any files at all, could be a generic review — accept it
# (some PRs have short diffs where the model doesn't reference filenames)
valid[pr_num] = review
return valid, fallback
async def _run_batch_domain_eval(
conn, batch_prs: list[dict], domain: str, agent: str,
) -> tuple[int, int]:
"""Execute batch domain review for a group of same-domain STANDARD PRs.
1. Claim all PRs atomically
2. Run single batch domain review
3. Parse + validate fan-out
4. Post per-PR comments
5. Continue to individual Leo review for each
6. Fall back to individual review for any validation failures
Returns (succeeded, failed).
"""
from .forgejo import get_pr_diff as _get_pr_diff
succeeded = 0
failed = 0
# Step 1: Fetch diffs and build batch
pr_diffs = []
claimed_prs = []
for pr_row in batch_prs:
pr_num = pr_row["number"]
# Atomic claim
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
(pr_num,),
)
if cursor.rowcount == 0:
continue
# Increment eval_attempts
conn.execute(
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1, "
"last_attempt = datetime('now') WHERE number = ?",
(pr_num,),
)
diff = await _get_pr_diff(pr_num)
if not diff:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
continue
# Musings bypass
if _is_musings_only(diff):
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": "Auto-approved: musings bypass eval per collective policy."},
)
conn.execute(
"UPDATE prs SET status = 'approved', leo_verdict = 'skipped', "
"domain_verdict = 'skipped' WHERE number = ?",
(pr_num,),
)
succeeded += 1
continue
review_diff, _ = _filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
# Build label from branch name or first claim filename
branch = pr_row.get("branch", "")
label = branch.split("/")[-1][:60] if branch else f"pr-{pr_num}"
pr_diffs.append({
"number": pr_num,
"label": label,
"diff": review_diff,
"files": files,
"full_diff": diff, # kept for Leo review
"file_count": len([l for l in files.split("\n") if l.strip()]),
})
claimed_prs.append(pr_num)
if not pr_diffs:
return 0, 0
# Enforce BATCH_EVAL_MAX_DIFF_BYTES — split if total diff is too large.
# We only know diff sizes after fetching, so enforce here not in _build_domain_batches.
total_bytes = sum(len(p["diff"].encode()) for p in pr_diffs)
if total_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and len(pr_diffs) > 1:
# Keep PRs up to the byte cap, revert the rest to open for next cycle
kept = []
running_bytes = 0
for p in pr_diffs:
p_bytes = len(p["diff"].encode())
if running_bytes + p_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and kept:
break
kept.append(p)
running_bytes += p_bytes
overflow = [p for p in pr_diffs if p not in kept]
for p in overflow:
conn.execute(
"UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
"WHERE number = ?",
(p["number"],),
)
claimed_prs.remove(p["number"])
logger.info(
"PR #%d: diff too large for batch (%d bytes total), deferring to next cycle",
p["number"], total_bytes,
)
pr_diffs = kept
if not pr_diffs:
return 0, 0
# Detect domain for all PRs (should be same domain)
conn.execute(
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number IN ({})".format(
",".join("?" * len(claimed_prs))
),
[domain, agent] + claimed_prs,
)
# Step 2: Run batch domain review
logger.info(
"Batch domain review: %d PRs in %s domain (PRs: %s)",
len(pr_diffs),
domain,
", ".join(f"#{p['number']}" for p in pr_diffs),
)
batch_response = await run_batch_domain_review(pr_diffs, domain, agent)
if batch_response is None:
# Complete failure — revert all to open
logger.warning("Batch domain review failed — reverting all PRs to open")
for pr_num in claimed_prs:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
return 0, len(claimed_prs)
# Step 3: Parse + validate fan-out
parsed = _parse_batch_response(batch_response, claimed_prs, agent)
valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent)
db.audit(
conn, "evaluate", "batch_domain_review",
json.dumps({
"domain": domain,
"batch_size": len(pr_diffs),
"valid": len(valid_reviews),
"fallback": fallback_prs,
}),
)
# Step 4: Process valid reviews — post comments + continue to Leo
for pr_data in pr_diffs:
pr_num = pr_data["number"]
if pr_num in fallback_prs:
# Revert — will be picked up by individual eval next cycle
conn.execute(
"UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
"WHERE number = ?",
(pr_num,),
)
logger.info("PR #%d: batch fallback — will retry individually", pr_num)
continue
if pr_num not in valid_reviews:
# Should not happen, but safety
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
continue
review_text = valid_reviews[pr_num]
domain_verdict = _parse_verdict(review_text, agent)
# Post domain review comment
agent_tok = get_agent_token(agent)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": review_text},
token=agent_tok,
)
conn.execute(
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num),
)
# Record cost
from . import costs
costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter")
# If domain rejects, handle disposition (same as individual path)
if domain_verdict == "request_changes":
domain_issues = _parse_issues(review_text)
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
conn.execute(
"UPDATE prs SET status = 'open', leo_verdict = 'skipped', "
"last_error = 'domain review requested changes', eval_issues = ? WHERE number = ?",
(json.dumps(domain_issues), pr_num),
)
db.audit(
conn, "evaluate", "domain_rejected",
json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}),
)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues)
succeeded += 1
continue
# Domain approved — continue to individual Leo review
logger.info("PR #%d: batch domain approved, proceeding to individual Leo review", pr_num)
review_diff = pr_data["diff"]
files = pr_data["files"]
leo_review = await run_leo_review(review_diff, files, "STANDARD")
if leo_review is None:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
logger.debug("PR #%d: Leo review failed, will retry next cycle", pr_num)
continue
if leo_review == "RATE_LIMITED":
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num)
continue
leo_verdict = _parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_num))
# Post Leo review
leo_tok = get_agent_token("Leo")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": leo_review},
token=leo_tok,
)
costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter")
# Final verdict
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
if both_approve:
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}"))
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
await _post_formal_approvals(pr_num, pr_author)
conn.execute("UPDATE prs SET status = 'approved' WHERE number = ?", (pr_num,))
db.audit(
conn, "evaluate", "approved",
json.dumps({"pr": pr_num, "tier": "STANDARD", "domain": domain,
"leo": leo_verdict, "domain_agent": agent, "batch": True}),
)
logger.info("PR #%d: APPROVED (batch domain + individual Leo)", pr_num)
else:
all_issues = []
if leo_verdict == "request_changes":
all_issues.extend(_parse_issues(leo_review))
conn.execute(
"UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
(json.dumps(all_issues), pr_num),
)
feedback = {"leo": leo_verdict, "domain": domain_verdict,
"tier": "STANDARD", "issues": all_issues}
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
(json.dumps(feedback), pr_num),
)
db.audit(
conn, "evaluate", "changes_requested",
json.dumps({"pr": pr_num, "tier": "STANDARD", "leo": leo_verdict,
"domain": domain_verdict, "issues": all_issues, "batch": True}),
)
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues)
succeeded += 1
return succeeded, failed
def _build_domain_batches(
rows: list, conn,
) -> tuple[dict[str, list[dict]], list[dict]]:
"""Group STANDARD PRs by domain for batch eval. DEEP and LIGHT stay individual.
Returns (batches_by_domain, individual_prs).
Respects BATCH_EVAL_MAX_PRS and BATCH_EVAL_MAX_DIFF_BYTES.
"""
domain_candidates: dict[str, list[dict]] = {}
individual: list[dict] = []
for row in rows:
pr_num = row["number"]
tier = row["tier"]
# Only batch STANDARD PRs with pending domain review
if tier != "STANDARD":
individual.append(row)
continue
# Check if domain review already done (resuming after Leo rate limit)
existing = conn.execute(
"SELECT domain_verdict, domain FROM prs WHERE number = ?", (pr_num,)
).fetchone()
if existing and existing["domain_verdict"] not in ("pending", None):
individual.append(row)
continue
domain = existing["domain"] if existing and existing["domain"] else "general"
domain_candidates.setdefault(domain, []).append(row)
# Build sized batches per domain
batches: dict[str, list[dict]] = {}
for domain, prs in domain_candidates.items():
if len(prs) == 1:
# Single PR — no batching benefit, process individually
individual.extend(prs)
continue
# Cap at BATCH_EVAL_MAX_PRS
batch = prs[: config.BATCH_EVAL_MAX_PRS]
batches[domain] = batch
# Overflow goes individual
individual.extend(prs[config.BATCH_EVAL_MAX_PRS :])
return batches, individual
# ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one evaluation cycle.
Finds PRs with status='open', tier0_pass=1, and no pending verdicts.
Evaluates in priority order.
Groups eligible STANDARD PRs by domain for batch domain review.
DEEP PRs get individual eval. LIGHT PRs get auto-approved.
Leo review always individual (safety net for batch cross-contamination).
"""
global _rate_limit_backoff_until
@@ -580,29 +1267,17 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
logger.info("Rate limit backoff expired, resuming full eval cycles")
_rate_limit_backoff_until = None
# Find PRs ready for evaluation:
# - status = 'open'
# - tier0_pass = 1 (passed validation)
# - leo_verdict = 'pending' OR domain_verdict = 'pending'
# During Opus backoff: skip DEEP PRs waiting for Leo (they need Opus).
# STANDARD PRs can overflow Leo review to GPT-4o, so let them through.
# Skip PRs attempted within last 10 minutes (backoff during rate limits)
# Find PRs ready for evaluation
if opus_backoff:
verdict_filter = "AND (p.domain_verdict = 'pending' OR (p.leo_verdict = 'pending' AND p.tier != 'DEEP'))"
else:
verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
# Stagger first pass after migration: if there are previously-rejected PRs
# with eval_attempts=0 (freshly migrated), limit batch to avoid OpenRouter spike.
migrated_count = conn.execute(
"""SELECT COUNT(*) as c FROM prs
WHERE status = 'open' AND eval_attempts = 0
AND (domain_verdict NOT IN ('pending') OR leo_verdict NOT IN ('pending'))"""
).fetchone()["c"]
stagger_limit = 5 if migrated_count > 5 else None
# Stagger removed — migration protection no longer needed. Merge is domain-serialized
# and entity conflicts auto-resolve. Safe to let all eligible PRs enter eval. (Cory, Mar 14)
rows = conn.execute(
f"""SELECT p.number, p.tier FROM prs p
f"""SELECT p.number, p.tier, p.branch, p.domain FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.tier0_pass = 1
@@ -611,7 +1286,6 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
AND (p.last_attempt IS NULL
OR p.last_attempt < datetime('now', '-10 minutes'))
ORDER BY
-- Fresh PRs before re-evals: unevaluated PRs have higher chance of passing
CASE WHEN COALESCE(p.eval_attempts, 0) = 0 THEN 0 ELSE 1 END,
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
@@ -622,24 +1296,37 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
END,
p.created_at ASC
LIMIT ?""",
(stagger_limit or max_workers or config.MAX_EVAL_WORKERS,),
(max_workers or config.MAX_EVAL_WORKERS,),
).fetchall()
if stagger_limit and rows:
logger.info(
"Post-migration stagger: limiting eval batch to %d (migrated PRs: %d)", stagger_limit, migrated_count
)
if not rows:
return 0, 0
succeeded = 0
failed = 0
for row in rows:
# Group STANDARD PRs by domain for batch eval
domain_batches, individual_prs = _build_domain_batches(rows, conn)
# Process batch domain reviews first
for domain, batch_prs in domain_batches.items():
try:
agent = agent_for_domain(domain)
b_succeeded, b_failed = await _run_batch_domain_eval(
conn, batch_prs, domain, agent,
)
succeeded += b_succeeded
failed += b_failed
except Exception:
logger.exception("Batch eval failed for domain %s", domain)
# Revert all to open
for pr_row in batch_prs:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_row["number"],))
failed += len(batch_prs)
# Process individual PRs (DEEP, LIGHT, single-domain, fallback)
for row in individual_prs:
try:
# During Opus backoff, skip DEEP PRs that already completed domain review
# (they need Opus which is rate limited). STANDARD PRs can overflow to GPT-4o.
if opus_backoff and row["tier"] == "DEEP":
existing = conn.execute(
"SELECT domain_verdict FROM prs WHERE number = ?",
@@ -661,20 +1348,16 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
from datetime import timedelta
if reason == "opus_rate_limited":
# Opus hit — set backoff but DON'T break. Other PRs
# may still need triage (Haiku) or domain review (Sonnet).
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
opus_backoff = True # Update local flag so in-loop guard kicks in
opus_backoff = True
logger.info(
"Opus rate limited — backing off Opus for %d min, continuing triage+domain",
_RATE_LIMIT_BACKOFF_MINUTES,
)
continue
else:
# Non-Opus rate limit (Sonnet/Haiku) — break the cycle,
# nothing else can proceed either.
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
@@ -687,7 +1370,6 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
except Exception:
logger.exception("Failed to evaluate PR #%d", row["number"])
failed += 1
# Revert to open on unhandled error
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],))
if succeeded or failed:

llm.py

@@ -49,18 +49,20 @@ REVIEW_STYLE_GUIDE = (
TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT.
DEEP — use when ANY of these apply:
- PR adds or modifies claims rated "likely" or higher confidence
- PR touches agent beliefs or creates cross-domain wiki links
- PR challenges an existing claim (has "challenged_by" or contradicts existing)
- PR modifies axiom-level beliefs
- PR is a cross-domain synthesis claim
DEEP — use ONLY when the PR could change the knowledge graph structure:
- PR modifies files in core/ or foundations/ (structural KB changes)
- PR challenges an existing claim (has "challenged_by" field or explicitly argues against an existing claim)
- PR modifies axiom-level beliefs in agents/*/beliefs.md
- PR is a cross-domain synthesis claim that draws conclusions across 2+ domains
STANDARD — use when:
- New claims in established domain areas
- Enrichments to existing claims (confirm/extend)
DEEP is rare — most new claims are STANDARD even if they have high confidence or cross-domain wiki links. Adding a new "likely" claim about futarchy is STANDARD. Arguing that an existing claim is wrong is DEEP.
STANDARD — the DEFAULT for most PRs:
- New claims in any domain at any confidence level
- Enrichments to existing claims (adding evidence, extending arguments)
- New hypothesis-level beliefs
- Source archives with extraction results
- Claims with cross-domain wiki links (this is normal, not exceptional)
LIGHT — use ONLY when ALL changes fit these categories:
- Entity attribute updates (factual corrections, new data points)
@@ -68,7 +70,7 @@ LIGHT — use ONLY when ALL changes fit these categories:
- Formatting fixes, typo corrections
- Status field changes
IMPORTANT: When uncertain, classify UP, not down. Always err toward more review.
IMPORTANT: When uncertain between DEEP and STANDARD, choose STANDARD. Most claims are STANDARD. DEEP is reserved for structural changes to the knowledge base, not for complex or important-sounding claims.
Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line.
@@ -77,17 +79,24 @@ Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, fo
DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
IMPORTANT — This PR may contain different content types:
- **Claims** (type: claim): arguable assertions with confidence levels. Review fully.
- **Entities** (type: entity, files in entities/): descriptive records of projects, people, protocols. Do NOT reject entities for missing confidence or source fields — they have a different schema.
- **Sources** (files in inbox/): archive metadata. Auto-approve these.
Review this PR. For EACH criterion below, write one sentence stating what you found:
1. **Factual accuracy** — Are the claims factually correct? Name any specific errors.
2. **Intra-PR duplicates** — Do multiple changes in THIS PR add the same evidence to different claims with near-identical wording? Only flag if the same paragraph of evidence is copy-pasted across files.
3. **Confidence calibration** — Is the confidence level right for the evidence provided? Name the level and say if it matches.
4. **Wiki links** — Do [[wiki links]] in the diff reference files that exist? Flag any that look broken.
1. **Factual accuracy** — Are the claims/entities factually correct? Name any specific errors.
2. **Intra-PR duplicates** — Do multiple changes in THIS PR add the same evidence to different claims with near-identical wording? Only flag if the same paragraph of evidence is copy-pasted across files. Shared entity files (like metadao.md or futardio.md) appearing in multiple PRs are NOT duplicates — they are expected enrichments.
3. **Confidence calibration** — For claims only. Is the confidence level right for the evidence? Entities don't have confidence levels.
4. **Wiki links** — Note any broken [[wiki links]], but do NOT let them affect your verdict. Broken links are expected — linked claims often exist in other open PRs that haven't merged yet. ALWAYS APPROVE even if wiki links are broken.
VERDICT RULES — read carefully:
- APPROVE if claims are factually correct and evidence supports them, even if minor improvements are possible.
- REQUEST_CHANGES only for BLOCKING issues: factual errors, genuinely broken wiki links, copy-pasted duplicate evidence across files, or confidence that is clearly wrong (e.g. "proven" with no evidence).
- Missing context, style preferences, and "could be better" observations are NOT blocking. Note them but still APPROVE.
- APPROVE entity files (type: entity) unless they contain factual errors.
- APPROVE even if wiki links are broken — this is NEVER a reason to REQUEST_CHANGES.
- REQUEST_CHANGES only for these BLOCKING issues: factual errors, copy-pasted duplicate evidence, or confidence that is clearly wrong (e.g. "proven" with no evidence).
- If the ONLY issues you find are broken wiki links: you MUST APPROVE.
- Do NOT invent problems. If a criterion passes, say it passes.
{style_guide}
@@ -95,7 +104,7 @@ VERDICT RULES — read carefully:
If requesting changes, tag the specific issues using ONLY these tags (do not invent new tags):
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error
Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error
End your review with exactly one of:
<!-- VERDICT:{agent_upper}:APPROVE -->
@@ -109,21 +118,30 @@ End your review with exactly one of:
LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
IMPORTANT — Content types have DIFFERENT schemas:
- **Claims** (type: claim): require type, domain, confidence, source, created, description. Title must be a prose proposition.
- **Entities** (type: entity, files in entities/): require ONLY type, domain, description. NO confidence, NO source, NO created date. Short filenames like "metadao.md" are correct — entities are NOT claims.
- **Sources** (files in inbox/): different schema entirely. Do NOT flag sources for missing claim fields.
Do NOT flag entity files for missing confidence, source, or created fields. Do NOT flag entity filenames for being too short or not prose propositions. These are different content types with different rules.
Review this PR. For EACH criterion below, write one sentence stating what you found:
1. **Schema** — Does YAML frontmatter have type, domain, confidence, source, created? Is the title a prose proposition (not a label)?
1. **Schema** — Does each file have valid frontmatter FOR ITS TYPE? (Claims need full schema. Entities need only type+domain+description.)
2. **Duplicate/redundancy** — Do multiple enrichments in this PR inject the same evidence into different claims? Is the enrichment actually new vs already present in the claim?
3. **Confidence** — Name the confidence level. Does the evidence justify it? (proven needs strong evidence, speculative is fine for theories)
4. **Wiki links** — Do [[links]] in the diff point to real files? Flag any that look invented.
3. **Confidence** — For claims only: name the confidence level. Does the evidence justify it?
4. **Wiki links** — Note any broken [[links]], but do NOT let them affect your verdict. Broken links are expected — linked claims often exist in other open PRs. ALWAYS APPROVE even if wiki links are broken.
5. **Source quality** — Is the source credible for this claim?
6. **Specificity** — Could someone disagree with this claim? If it's too vague to be wrong, flag it.
6. **Specificity** — For claims only: could someone disagree? If it's too vague to be wrong, flag it.
VERDICT: APPROVE if the claims are factually correct and evidence supports them. Broken wiki links are NEVER a reason to REQUEST_CHANGES. If broken links are the ONLY issue, you MUST APPROVE.
{style_guide}
If requesting changes, tag the specific issues using ONLY these tags (do not invent new tags):
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error
Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
@@ -141,7 +159,7 @@ Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check
1. Cross-domain implications — does this claim affect beliefs in other domains?
2. Confidence calibration — is the confidence level justified by the evidence?
3. Contradiction check — does this contradict any existing claims without explicit argument?
4. Wiki link validity — do all wiki links reference real, existing claims?
4. Wiki link validity — note any broken links, but do NOT let them affect your verdict. Broken links are expected (linked claims may be in other PRs). NEVER REQUEST_CHANGES for broken wiki links alone.
5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary?
6. Source quality — is the source credible for the claim being made?
7. Duplicate check — does a substantially similar claim already exist?
@@ -155,7 +173,7 @@ Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check
If requesting changes, tag the specific issues using ONLY these tags (do not invent new tags):
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error
Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
@@ -168,10 +186,45 @@ End your review with exactly one of:
{files}"""
BATCH_DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
You are reviewing {n_prs} PRs in a single batch. For EACH PR, apply all criteria INDEPENDENTLY. Do not mix content between PRs. Each PR is a separate evaluation.
For EACH PR, check these criteria (one sentence each):
1. **Factual accuracy** — Are the claims factually correct? Name any specific errors.
2. **Intra-PR duplicates** — Do multiple changes in THIS PR add the same evidence to different claims with near-identical wording?
3. **Confidence calibration** — Is the confidence level right for the evidence provided?
4. **Wiki links** — Do [[wiki links]] in the diff reference files that exist?
VERDICT RULES — read carefully:
- APPROVE if claims are factually correct and evidence supports them, even if minor improvements are possible.
- REQUEST_CHANGES only for BLOCKING issues: factual errors, genuinely broken wiki links, copy-pasted duplicate evidence across files, or confidence that is clearly wrong.
- Missing context, style preferences, and "could be better" observations are NOT blocking. Note them but still APPROVE.
- Do NOT invent problems. If a criterion passes, say it passes.
{style_guide}
For EACH PR, write your full review, then end that PR's section with the verdict tag.
If requesting changes, tag the specific issues:
<!-- ISSUES: tag1, tag2 -->
Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error
{pr_sections}
IMPORTANT: You MUST provide a verdict for every PR listed above. For each PR, end with exactly one of:
<!-- PR:NUMBER VERDICT:{agent_upper}:APPROVE -->
<!-- PR:NUMBER VERDICT:{agent_upper}:REQUEST_CHANGES -->
where NUMBER is the PR number shown in the section header."""
# ─── API helpers ───────────────────────────────────────────────────────────
async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
async def openrouter_call(
model: str, prompt: str, timeout_sec: int = 120, max_tokens: int = 4096,
) -> str | None:
"""Call OpenRouter API. Returns response text or None on failure."""
key_file = config.SECRETS_DIR / "openrouter-key"
if not key_file.exists():
@@ -182,7 +235,7 @@ async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> st
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 4096,
"max_tokens": max_tokens,
"temperature": 0.2,
}
@@ -270,6 +323,41 @@ async def triage_pr(diff: str) -> str:
return "STANDARD"
async def run_batch_domain_review(
pr_diffs: list[dict], domain: str, agent: str,
) -> str | None:
"""Run batched domain review for multiple PRs in one LLM call.
pr_diffs: list of {"number": int, "label": str, "diff": str, "files": str}
Returns raw response text or None on failure.
"""
# Build per-PR sections with anchoring labels
sections = []
for pr in pr_diffs:
sections.append(
f"=== PR #{pr['number']}: {pr['label']} ({pr['file_count']} files) ===\n"
f"--- PR DIFF ---\n{pr['diff']}\n\n"
f"--- CHANGED FILES ---\n{pr['files']}\n"
)
prompt = BATCH_DOMAIN_PROMPT.format(
agent=agent,
agent_upper=agent.upper(),
domain=domain,
n_prs=len(pr_diffs),
style_guide=REVIEW_STYLE_GUIDE,
pr_sections="\n".join(sections),
)
# Scale max_tokens with batch size: ~3K tokens per PR review
max_tokens = min(3000 * len(pr_diffs), 16384)
result = await openrouter_call(
config.EVAL_DOMAIN_MODEL, prompt,
timeout_sec=config.EVAL_TIMEOUT, max_tokens=max_tokens,
)
return result
async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
"""Run domain review via OpenRouter GPT-4o.
@@ -300,15 +388,20 @@ async def run_leo_review(diff: str, files: str, tier: str) -> str | None:
prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files)
if tier == "DEEP":
# DEEP: Opus only, queue if rate limited. Opus is scarce — reserve for high-stakes.
result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
if result == "RATE_LIMITED":
logger.info("Claude Max Opus rate limited, queuing DEEP Leo review")
return None
return result
else:
# STANDARD/LIGHT: Sonnet via OpenRouter. Different model family from
# domain review (GPT-4o) = no correlated blind spots. Keeps Claude Max
# rate limit untouched for Opus DEEP + overnight research.
# Opus skipped — route all Leo reviews through Sonnet until backlog clears.
# Opus via Claude Max CLI is consistently unavailable (rate limited or hanging).
# Re-enable by removing this block and uncommenting the try-then-overflow below.
# (Cory, Mar 14: "yes lets skip opus")
#
# --- Re-enable Opus later (uses EVAL_TIMEOUT_OPUS for longer reasoning): ---
# result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS)
# if result == "RATE_LIMITED" or result is None:
# logger.info("Opus unavailable for DEEP Leo review — overflowing to Sonnet")
# result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS)
# return result
result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result
else:
# STANDARD/LIGHT: Sonnet via OpenRouter — 120s timeout (routine calls)
result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result

migrate-source-archive.py (new file, 130 lines)

@@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""Migrate source archive from flat inbox/archive/ to organized structure.
inbox/queue/ — unprocessed sources (landing zone)
inbox/archive/{domain}/ — processed sources with extraction results
inbox/null-result/ — reviewed, nothing extractable
One-time migration. Atomic commit. Idempotent (safe to re-run).
Run from repo root:
cd /opt/teleo-eval/workspaces/main
python3 /opt/teleo-eval/pipeline/migrate-source-archive.py [--dry-run]
"""
import argparse
import glob
import os
import re
from pathlib import Path
def get_source_status(filepath: str) -> str:
"""Read status from source frontmatter."""
try:
content = open(filepath).read()
match = re.search(r"^status:\s*(\S+)", content, re.MULTILINE)
if match:
return match.group(1).strip()
except Exception:
pass
return "unknown"
def get_source_domain(filepath: str) -> str:
"""Read domain from source frontmatter."""
try:
content = open(filepath).read()
match = re.search(r"^domain:\s*(\S+)", content, re.MULTILINE)
if match:
return match.group(1).strip()
except Exception:
pass
return "uncategorized"
def migrate(repo_root: str, dry_run: bool = False):
"""Move source files to organized structure."""
archive_dir = os.path.join(repo_root, "inbox", "archive")
queue_dir = os.path.join(repo_root, "inbox", "queue")
null_dir = os.path.join(repo_root, "inbox", "null-result")
if not os.path.isdir(archive_dir):
print(f"ERROR: {archive_dir} not found")
return
# Create target directories
if not dry_run:
os.makedirs(queue_dir, exist_ok=True)
os.makedirs(null_dir, exist_ok=True)
sources = glob.glob(os.path.join(archive_dir, "*.md"))
print(f"Found {len(sources)} source files in inbox/archive/")
moved = {"queue": 0, "null-result": 0, "archive": {}}
skipped = 0
for filepath in sorted(sources):
filename = os.path.basename(filepath)
if filename.startswith("_") or filename.startswith("."):
skipped += 1
continue
status = get_source_status(filepath)
domain = get_source_domain(filepath)
if status == "unprocessed" or status == "processing":
# → queue/
dest = os.path.join(queue_dir, filename)
if not dry_run:
os.rename(filepath, dest)
moved["queue"] += 1
elif status in ("null-result", "null_result"):
# → null-result/
dest = os.path.join(null_dir, filename)
if not dry_run:
os.rename(filepath, dest)
moved["null-result"] += 1
elif status in ("processed", "enrichment"):
# → archive/{domain}/
domain_dir = os.path.join(archive_dir, domain)
if not dry_run:
os.makedirs(domain_dir, exist_ok=True)
dest = os.path.join(domain_dir, filename)
if not dry_run:
os.rename(filepath, dest)
moved["archive"][domain] = moved["archive"].get(domain, 0) + 1
else:
# Unknown status — treat as unprocessed → queue/
dest = os.path.join(queue_dir, filename)
if not dry_run:
os.rename(filepath, dest)
moved["queue"] += 1
# Also move any .extraction-debug/ directory
debug_dir = os.path.join(archive_dir, ".extraction-debug")
if os.path.isdir(debug_dir):
print(f" (keeping .extraction-debug/ in place)")
print(f"\n{'='*60}")
print(f" MIGRATION {'(DRY RUN) ' if dry_run else ''}COMPLETE")
print(f" → queue/ (unprocessed): {moved['queue']}")
print(f" → null-result/: {moved['null-result']}")
print(f" → archive/{{domain}}/:")
for domain, count in sorted(moved["archive"].items()):
print(f" {domain}: {count}")
print(f" Archive total: {sum(moved['archive'].values())}")
print(f" Skipped: {skipped}")
print(f" Grand total: {moved['queue'] + moved['null-result'] + sum(moved['archive'].values()) + skipped}")
print(f"{'='*60}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Migrate source archive to organized structure")
parser.add_argument("--repo-root", default=".", help="Repository root")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
migrate(args.repo_root, args.dry_run)

65
research-prompt-leo-synthesis.md Normal file
View file

@ -0,0 +1,65 @@
# Research Prompt — Leo Synthesis Session
# Fundamentally different from domain agent research.
# Leo runs LAST (08:00 UTC), after all 5 domain agents have researched overnight.
You are Leo, the Teleo collective's lead synthesizer. Domain: grand-strategy.
## Your Task: Overnight Synthesis Session
You run AFTER the 5 domain agents have researched (Rio 22:00, Theseus 00:00, Clay 02:00, Vida 04:00, Astra 06:00). Your job is NOT to find new sources. Your job is to CONNECT what they found.
### Step 1: Read Overnight Output (15 min)
Check what the domain agents produced since yesterday:
- New source archives in inbox/queue/ (look for today's date + yesterday's)
- New musings in agents/*/musings/research-*.md
- ROUTE:leo flags from other agents' research
- Any new claims merged overnight
### Step 2: Cross-Domain Connection Scan (20 min)
Look for patterns across what multiple agents found:
- Did 2+ agents find evidence about the same mechanism in different domains?
- Did anyone find something that contradicts another agent's existing claim?
- Are there structural parallels that neither agent would see from within their domain?
### Step 3: Synthesis Claims (30 min)
Draft 1-3 cross-domain synthesis claims. These go to agents/leo/musings/synthesis-${DATE}.md (not inbox/queue/ — Leo proposes claims, not sources).
For each synthesis:
- Name the specific mechanism that connects domains
- Cite the specific claims/sources from each domain
- Rate confidence honestly (synthesis claims start at speculative or experimental)
- Wiki-link to the domain-specific claims being synthesized
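A minimal skeleton for one synthesis entry (bracketed placeholders; hypothetical structure, adapt as needed):

## Synthesis: [mechanism name]
**Domains connected:** [domain A] + [domain B]
**Mechanism:** [one sentence naming the shared mechanism]
**Evidence:** [[claim-from-domain-A]], [[claim-from-domain-B]]
**Confidence:** speculative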
### Step 4: Falsifiable Prediction (10 min)
Every overnight cycle should produce at least ONE prediction with temporal stakes:
- "By [date], [observable outcome] because [mechanism from synthesis]"
- Performance criteria: what would prove this right or wrong?
- Time horizon: 3 months, 6 months, or 1 year
Write to agents/leo/musings/predictions-${DATE}.md
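A hypothetical example of the shape (illustration only):
- By 2026-09-18, [observable outcome] because [mechanism from synthesis]
- Performance criteria: [metric] crosses [threshold] by that date; falling short means wrong
- Time horizon: 6 months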
### Step 5: Research Priority Flags (5 min)
Based on what you saw overnight, leave suggestions for domain agents:
Write to agents/leo/musings/research-flags-${DATE}.md:
## Overnight Research Flags (${DATE})
**For Rio:** [What to investigate, why]
**For Theseus:** [What to investigate, why]
**For Clay:** [What to investigate, why]
**For Vida:** [What to investigate, why]
**For Astra:** [What to investigate, why]
These are suggestions, not directives. Agents can take them or leave them.
### Step 6: Update Research Journal (5 min)
Append to agents/leo/research-journal.md:
## Synthesis Session ${DATE}
**Agents who produced overnight:** [which agents ran]
**Cross-domain connections found:** [count + brief description]
**Strongest synthesis:** [the most surprising cross-domain finding]
**Prediction made:** [one-line summary]
**Biggest gap in overnight run:** [what nobody researched that should have been covered]
### Step 7: Stop
When finished, STOP. The script handles all git operations.

142
research-prompt-v2.md Normal file
View file

@ -0,0 +1,142 @@
# Research Prompt v2 — Domain Agent Version
# Integrated improvements from Theseus (triage), Leo (quality), Vida (frontier.md)
# This gets embedded in research-session.sh as RESEARCH_PROMPT
You are ${AGENT}, a Teleo knowledge base agent. Domain: ${DOMAIN}.
## Your Task: Self-Directed Research Session
You have ~90 minutes of compute. Target: 5-8 high-quality sources (not 15 thin ones).
### Step 1: Orient (5 min)
Read these files:
- agents/${AGENT}/identity.md (who you are)
- agents/${AGENT}/beliefs.md (what you believe)
- agents/${AGENT}/reasoning.md (how you think)
- domains/${DOMAIN}/_map.md (current claims + gaps)
- agents/${AGENT}/frontier.md (if it exists — your priority research gaps)
### Step 2: Review Recent Tweets (10 min)
Read ${TWEET_FILE} — recent tweets from your domain's X accounts.
Scan for: new claims, evidence, debates, data, counterarguments.
### Step 3: Check Previous Follow-ups (2 min)
Read agents/${AGENT}/musings/ — previous research-*.md files.
Check for NEXT: flags at the bottom. These are threads your past self flagged.
Also read agents/${AGENT}/research-journal.md for cross-session patterns.
Check for ROUTE flags from other agents who found things in your domain.
### Step 4: Pick ONE Research Question (5 min)
Pick ONE research question. Not one topic — one question.
**Direction priority** (active inference — pursue surprise, not confirmation):
1. NEXT flags from previous sessions (your past self flagged these)
2. Frontier.md priority gaps (if exists — structured research agenda)
3. Claims rated 'experimental' or areas with live tensions
4. Evidence that CHALLENGES your beliefs
5. Cross-domain connections flagged by other agents
6. New developments that change the landscape
Write a brief note explaining your choice to: agents/${AGENT}/musings/research-${DATE}.md
### Step 5: Research + Triage (60 min)
As you research, CLASSIFY each finding before archiving:
**[CLAIM]** — Specific, disagreeable proposition with evidence.
Will become a claim. Include: proposed title, confidence, key evidence.
Archive as a source.
**[ENTITY]** — Tracked object with temporal data (company, person, protocol, lab).
Will become an entity file or update. Include: what changed, when.
Archive as a source.
**[CONTEXT]** — Background that informs future work but isn't a proposition.
Goes to memory/research journal ONLY. Do NOT archive as a source.
**[ROUTE:{agent}]** — Finding outside your domain.
Archive the source with flagged_for_{agent} in frontmatter.
Note why it's relevant to that agent.
**[SKIP]** — Interesting but not actionable. Don't archive.
Only archive [CLAIM] and [ENTITY] tagged findings as sources.
[CONTEXT] goes to your research journal. [ROUTE] gets flagged in source frontmatter.
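Example triage lines for your musing (hypothetical findings, illustration only):
[CLAIM] Proposed title: "Prediction-market liquidity concentrates in the final 48h before resolution". Confidence: experimental. Key evidence: [source]. Archive as source.
[CONTEXT] Background on a regulator's rulemaking timeline. Journal only; do not archive.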
### Source Type Evaluation (before archiving):
1. Academic paper → Read Results + Conclusion. Confidence floor by study type.
2. Regulatory/policy → Extract direction claims only. High null-result rate is expected.
3. Journalism → Find the primary source. Downgrade confidence from headline level.
4. Market/industry report → Historical data = proven. Projections: 1-2yr likely, 3-5yr experimental, 5yr+ speculative.
5. Tweet thread or opinion → Signal for research direction, not evidence. Archive only if it cites primary sources.
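Worked example of rule 4 (hypothetical report): a 2026 industry report with 2023-2025 revenue data and a 2033 adoption projection. The historicals enter as proven; the 2033 projection (7 years out) enters as speculative; a 2028 projection (2 years out) would enter as likely.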
### Archiving Format:
Path: inbox/queue/YYYY-MM-DD-{author-handle}-{brief-slug}.md
---
type: source
title: "Descriptive title"
author: "Display Name (@handle)"
url: https://original-url
date: YYYY-MM-DD
domain: ${DOMAIN}
secondary_domains: []
format: tweet | thread | essay | paper | report
status: unprocessed
priority: high | medium | low
triage_tag: claim | entity
tags: [topic1, topic2]
flagged_for_rio: ["reason"]
---
## Content
[Full text of tweet/thread/paper abstract]
## Agent Notes
**Triage:** [CLAIM] or [ENTITY] — why this classification
**Why this matters:** [1-2 sentences]
**What surprised me:** [Unexpected finding — extractor needs this]
**KB connections:** [Which existing claims relate?]
**Extraction hints:** [What claims/entities might the extractor pull?]
## Curator Notes
PRIMARY CONNECTION: [exact claim title this source most relates to]
WHY ARCHIVED: [what pattern or tension this evidences]
### Step 5 Rules:
- Target 5-8 sources per session (quality over volume)
- Archive EVERYTHING tagged [CLAIM] or [ENTITY], not just what supports your views
- Set all sources to status: unprocessed
- Flag cross-domain sources with flagged_for_{agent}
- Do NOT extract claims yourself — the extractor is a separate instance
- Check inbox/queue/ and inbox/archive/ for duplicates before creating new archives
### Step 6: Update Research Journal + Follow-ups (8 min)
Append to agents/${AGENT}/research-journal.md:
## Session ${DATE}
**Question:** [your research question]
**Key finding:** [most important thing you learned]
**Pattern update:** [confirm, challenge, or extend a pattern?]
**Confidence shift:** [any beliefs get stronger or weaker?]
**Extraction yield prediction:** [of the sources you archived, how many do you expect to produce claims vs entities vs null-results?]
At the bottom of your research musing, add:
## Follow-up Directions
### NEXT: (continue next session)
- [Thread]: [What to do next, what to look for]
### COMPLETED: (threads finished this session)
- [Thread]: [What you found, which claims/entities resulted]
### DEAD ENDS: (don't re-run)
- [What you searched for]: [Why it was empty]
### ROUTE: (findings for other agents)
- [Finding] → [Agent]: [Why relevant to their domain]
### Step 7: Stop
When finished, STOP. The script handles all git operations.

615
telegram/bot.py Normal file
View file

@ -0,0 +1,615 @@
#!/usr/bin/env python3
"""Teleo Telegram Bot — Rio as analytical agent in community groups.
Architecture:
- Always-on ingestion: captures all messages, batch triage every N minutes
- Tag-based response: Opus-quality KB-grounded responses when @tagged
- Conversation-window triage: identifies coherent claims across message threads
- Full eval tracing: Rio's responses are logged as KB claims, accountable
Two paths (Ganymede architecture):
- Fast path (read): tag → KB query → Opus response → post to group
- Slow path (write): batch triage → archive to inbox/ → pipeline extracts
Separate systemd service: teleo-telegram.service
Does NOT integrate with pipeline daemon.
Epimetheus owns this module.
"""
import asyncio
import json
import logging
import os
import re
import sqlite3
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
# Add pipeline lib to path for shared modules
sys.path.insert(0, "/opt/teleo-eval/pipeline")
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
# ─── Config ─────────────────────────────────────────────────────────────
BOT_TOKEN_FILE = "/opt/teleo-eval/secrets/telegram-bot-token"
OPENROUTER_KEY_FILE = "/opt/teleo-eval/secrets/openrouter-key"
PIPELINE_DB = "/opt/teleo-eval/pipeline/pipeline.db"
CLAIM_INDEX_PATH = Path.home() / ".pentagon" / "workspace" / "collective" / "claim-index.json"
REPO_DIR = "/opt/teleo-eval/workspaces/extract" # For archiving sources
LOG_FILE = "/opt/teleo-eval/logs/telegram-bot.log"
# Triage interval (seconds)
TRIAGE_INTERVAL = 900 # 15 minutes
# Models
RESPONSE_MODEL = "anthropic/claude-opus-4-6" # Opus for tagged responses
TRIAGE_MODEL = "anthropic/claude-haiku-4.5" # Haiku for batch triage
# Rate limits
MAX_RESPONSE_PER_USER_PER_HOUR = 6
MIN_MESSAGE_LENGTH = 20 # Skip very short messages
# ─── Logging ────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler(),
],
)
logger = logging.getLogger("telegram-bot")
# ─── State ──────────────────────────────────────────────────────────────
# Message buffer for batch triage
message_buffer: list[dict] = []
# Rate limiting
user_response_times: dict[int, list[float]] = defaultdict(list)
# Allowed group IDs (set after first message received, or configure)
allowed_groups: set[int] = set()
# ─── Helpers ────────────────────────────────────────────────────────────
def load_claim_index() -> dict | None:
"""Load the claim-index.json for KB queries."""
try:
if CLAIM_INDEX_PATH.exists():
return json.loads(CLAIM_INDEX_PATH.read_text())
# Fallback: try VPS path
vps_path = Path("/opt/teleo-eval/pipeline/claim-index.json")
if vps_path.exists():
return json.loads(vps_path.read_text())
except Exception as e:
logger.error("Failed to load claim index: %s", e)
return None
def find_relevant_claims(query: str, index: dict, max_results: int = 5) -> list[dict]:
"""Find claims relevant to a query using keyword matching.
Simple for now; upgrade to semantic search later.
"""
query_words = set(query.lower().split())
scored = []
for claim in index.get("claims", []):
title_words = set(claim.get("title", "").lower().split())
overlap = len(query_words & title_words)
if overlap >= 2:
scored.append((overlap, claim))
scored.sort(key=lambda x: x[0], reverse=True)
return [c for _, c in scored[:max_results]]
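# Example (hypothetical titles): the query "prediction markets and governance
# risk" shares {"prediction", "markets", "governance", "risk"} with a claim
# titled "Prediction markets underprice governance risk": overlap 4, well
# above the 2-word threshold, so it ranks first.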
def get_db_stats() -> dict:
"""Get basic KB stats from pipeline DB."""
try:
conn = sqlite3.connect(PIPELINE_DB, timeout=5)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA query_only=ON")
merged = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"]
contributors = conn.execute("SELECT COUNT(*) as n FROM contributors").fetchone()["n"]
conn.close()
return {"merged_claims": merged, "contributors": contributors}
except Exception:
return {"merged_claims": "?", "contributors": "?"}
async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> str | None:
"""Call OpenRouter API."""
import aiohttp
key = Path(OPENROUTER_KEY_FILE).read_text().strip()
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": 0.3,
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
json=payload,
timeout=aiohttp.ClientTimeout(total=120),
) as resp:
if resp.status >= 400:
logger.error("OpenRouter %s%d", model, resp.status)
return None
data = await resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content")
except Exception as e:
logger.error("OpenRouter error: %s", e)
return None
def is_rate_limited(user_id: int) -> bool:
"""Check if a user has exceeded the response rate limit."""
now = time.time()
times = user_response_times[user_id]
# Prune old entries
times[:] = [t for t in times if now - t < 3600]
return len(times) >= MAX_RESPONSE_PER_USER_PER_HOUR
def sanitize_message(text: str) -> str:
"""Sanitize message content before sending to LLM. (Ganymede: security)"""
# Strip code blocks (potential prompt injection)
text = re.sub(r"```.*?```", "[code block removed]", text, flags=re.DOTALL)
# Strip anything that looks like system instructions
text = re.sub(r"(system:|assistant:|human:|<\|.*?\|>)", "", text, flags=re.IGNORECASE)
# Truncate
return text[:2000]
# ─── Message Handlers ───────────────────────────────────────────────────
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle ALL incoming group messages — buffer for triage."""
if not update.message or not update.message.text:
return
msg = update.message
text = msg.text.strip()
# Skip very short messages
if len(text) < MIN_MESSAGE_LENGTH:
return
# Buffer for batch triage
message_buffer.append({
"text": sanitize_message(text),
"user_id": msg.from_user.id if msg.from_user else None,
"username": msg.from_user.username if msg.from_user else None,
"display_name": msg.from_user.full_name if msg.from_user else None,
"chat_id": msg.chat_id,
"message_id": msg.message_id,
"timestamp": msg.date.isoformat() if msg.date else datetime.now(timezone.utc).isoformat(),
"reply_to": msg.reply_to_message.message_id if msg.reply_to_message else None,
})
async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle messages that tag the bot — Rio responds with Opus."""
if not update.message or not update.message.text:
return
msg = update.message
user = msg.from_user
text = sanitize_message(msg.text)
# Rate limit check
if user and is_rate_limited(user.id):
await msg.reply_text("I'm processing other requests — try again in a few minutes.")
return
logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100])
# Send typing indicator
await msg.chat.send_action("typing")
# Load KB
index = load_claim_index()
stats = get_db_stats()
if not index:
await msg.reply_text("KB index unavailable — try again shortly.")
return
# Find relevant claims
relevant = find_relevant_claims(text, index, max_results=5)
claims_context = ""
if relevant:
claims_context = "## Relevant KB Claims\n"
for c in relevant:
claims_context += f"- **{c['title']}** (confidence: {c.get('confidence', '?')}, domain: {c.get('domain', '?')})\n"
# Build Opus prompt — Rio's voice
prompt = f"""You are Rio, the internet finance domain expert for TeleoHumanity's collective knowledge base. You're responding to a message in the ownership community Telegram group.
## Your Voice
- Grounded in KB evidence: cite specific claims and their confidence levels
- State your position when you have one; analyst means grounded, not neutral
- Name uncertainty explicitly: "we don't have data on this yet" is honest
- Never shill; present evidence and risks alongside convictions
- If the message contains a genuine insight the KB doesn't have, say so: "That's something we haven't captured yet — it's worth investigating"
## KB State
- {stats['merged_claims']} merged claims across 14 domains
- {stats['contributors']} contributors tracked
- {index.get('total_claims', '?')} claims in index
{claims_context}
## The Message
From: @{user.username if user else 'unknown'} ({user.full_name if user else 'unknown'})
Message: {text}
## Your Response
Respond substantively. If the message contains a claim or evidence:
1. Connect it to what the KB already knows
2. State where you agree and where the evidence is uncertain
3. If this challenges an existing claim, say so specifically
Keep it conversational: this is Telegram, not a paper. 2-4 paragraphs max.
Do NOT use markdown headers. Light formatting only (bold for claim titles, italics for emphasis)."""
# Call Opus
response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024)
if not response:
await msg.reply_text("Processing error — I'll get back to you.")
return
# Post response
await msg.reply_text(response)
# Record rate limit
if user:
user_response_times[user.id].append(time.time())
# Log the exchange for audit trail
logger.info("Rio responded to @%s (msg_id=%d)", user.username if user else "?", msg.message_id)
# Detect and fetch URLs for pipeline ingestion
urls = _extract_urls(text)
url_content = None
if urls:
logger.info("Fetching URL: %s", urls[0])
url_content = await _fetch_url_content(urls[0])
if url_content:
logger.info("Fetched %d chars from %s", len(url_content), urls[0])
# Archive the exchange as a source for pipeline (slow path)
_archive_exchange(text, response, user, msg, url_content=url_content, urls=urls)
async def _fetch_url_content(url: str) -> str | None:
"""Fetch article/page content from a URL for pipeline ingestion."""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status >= 400:
return None
html = await resp.text()
# Strip HTML tags for plain text (basic — upgrade to readability later)
text = re.sub(r"<script.*?</script>", "", html, flags=re.DOTALL)
text = re.sub(r"<style.*?</style>", "", text, flags=re.DOTALL)
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"\s+", " ", text).strip()
return text[:10000] # Cap at 10K chars
except Exception as e:
logger.warning("Failed to fetch URL %s: %s", url, e)
return None
def _extract_urls(text: str) -> list[str]:
"""Extract URLs from message text."""
return re.findall(r"https?://[^\s<>\"']+", text)
def _archive_exchange(user_text: str, rio_response: str, user, msg,
url_content: str | None = None, urls: list[str] | None = None):
"""Archive a tagged exchange to inbox/queue/ for pipeline processing."""
try:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
username = user.username if user else "anonymous"
slug = re.sub(r"[^a-z0-9]+", "-", user_text[:50].lower()).strip("-")
filename = f"{date_str}-telegram-{username}-{slug}.md"
archive_path = Path(REPO_DIR) / "inbox" / "queue" / filename
archive_path.parent.mkdir(parents=True, exist_ok=True)
# Extract rationale (the user's text minus the @mention and URL)
rationale = re.sub(r"@\w+", "", user_text).strip()
for url in (urls or []):
rationale = rationale.replace(url, "").strip()
# Determine priority — directed contribution with rationale gets high priority
priority = "high" if rationale and len(rationale) > 20 else "medium"
intake_tier = "directed" if rationale and len(rationale) > 20 else "undirected"
url_section = ""
if url_content:
url_section = f"\n## Article Content (fetched)\n\n{url_content[:8000]}\n"
content = f"""---
type: source
source_type: telegram
title: "Telegram: @{username}{slug}"
author: "@{username}"
url: "{urls[0] if urls else ''}"
date: {date_str}
domain: internet-finance
format: conversation
status: unprocessed
priority: {priority}
intake_tier: {intake_tier}
rationale: "{rationale[:200]}"
proposed_by: "@{username}"
tags: [telegram, ownership-community]
---
## Conversation
**@{username}:**
{user_text}
**Rio (response):**
{rio_response}
{url_section}
## Agent Notes
**Why archived:** Tagged exchange in ownership community.
**Rationale from contributor:** {rationale if rationale else 'No rationale provided (bare link or question)'}
**Intake tier:** {intake_tier} ({'fast-tracked, contributor provided reasoning' if intake_tier == 'directed' else 'standard processing'})
**Triage:** Conversation may contain [CLAIM], [ENTITY], or [EVIDENCE] for extraction.
"""
archive_path.write_text(content)
logger.info("Archived exchange to %s (tier: %s, urls: %d)",
filename, intake_tier, len(urls or []))
except Exception as e:
logger.error("Failed to archive exchange: %s", e)
# ─── Batch Triage ───────────────────────────────────────────────────────
async def run_batch_triage(context: ContextTypes.DEFAULT_TYPE):
"""Batch triage of buffered messages every TRIAGE_INTERVAL seconds.
Groups messages into conversation windows, sends to Haiku for classification,
archives substantive findings.
"""
global message_buffer
if not message_buffer:
return
# Grab and clear buffer
messages = message_buffer[:]
message_buffer = []
logger.info("Batch triage: %d messages to process", len(messages))
# Group into conversation windows (messages within 5 min of each other)
windows = _group_into_windows(messages, window_seconds=300)
if not windows:
return
# Build triage prompt
windows_text = ""
for i, window in enumerate(windows):
window_msgs = "\n".join(
f" @{m.get('username', '?')}: {m['text'][:200]}"
for m in window
)
windows_text += f"\n--- Window {i+1} ({len(window)} messages) ---\n{window_msgs}\n"
prompt = f"""Classify each conversation window. For each, respond with ONE tag:
[CLAIM] Contains a specific, disagreeable proposition about how something works
[ENTITY] Contains factual data about a company, protocol, person, or market
[EVIDENCE] Contains data or argument that supports or challenges an existing claim about internet finance, futarchy, prediction markets, or token governance
[SKIP] Casual conversation, not relevant to the knowledge base
Be generous with EVIDENCE; even confirming evidence strengthens the KB.
{windows_text}
Respond with ONLY the window numbers and tags, one per line:
1: [TAG]
2: [TAG]
..."""
result = await call_openrouter(TRIAGE_MODEL, prompt, max_tokens=500)
if not result:
logger.warning("Triage LLM call failed — buffered messages dropped")
return
# Parse triage results
for line in result.strip().split("\n"):
match = re.match(r"(\d+):\s*\[(\w+)\]", line)
if not match:
continue
idx = int(match.group(1)) - 1
tag = match.group(2).upper()
if idx < 0 or idx >= len(windows):
continue
if tag in ("CLAIM", "ENTITY", "EVIDENCE"):
_archive_window(windows[idx], tag)
logger.info("Triage complete: %d windows processed", len(windows))
def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]:
    """Group messages into conversation windows by time proximity."""
    if not messages:
        return []
    # Sort by timestamp (ISO-8601 strings sort chronologically)
    messages.sort(key=lambda m: m.get("timestamp", ""))

    def _epoch(m: dict) -> float:
        """Parse an ISO timestamp to epoch seconds; 0.0 if missing/invalid."""
        try:
            return datetime.fromisoformat(m["timestamp"]).timestamp()
        except (KeyError, ValueError):
            return 0.0

    windows = []
    current_window = [messages[0]]
    for msg in messages[1:]:
        # Start a new window when the gap to the previous message exceeds
        # window_seconds, or when the current window is full (cap at 10)
        gap = _epoch(msg) - _epoch(current_window[-1])
        if gap > window_seconds or len(current_window) >= 10:
            windows.append(current_window)
            current_window = [msg]
        else:
            current_window.append(msg)
    if current_window:
        windows.append(current_window)
    return windows
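# Example (hypothetical times): messages at 10:00, 10:02, and 10:09 yield two
# windows, [10:00, 10:02] and [10:09], since the 7-minute gap exceeds the
# 300-second window.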
def _archive_window(window: list[dict], tag: str):
"""Archive a triaged conversation window to inbox/queue/."""
try:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
first_user = window[0].get("username", "group")
slug = re.sub(r"[^a-z0-9]+", "-", window[0]["text"][:40].lower()).strip("-")
filename = f"{date_str}-telegram-{first_user}-{slug}.md"
archive_path = Path(REPO_DIR) / "inbox" / "queue" / filename
archive_path.parent.mkdir(parents=True, exist_ok=True)
# Build conversation content
conversation = ""
contributors = set()
for msg in window:
username = msg.get("username", "anonymous")
contributors.add(username)
conversation += f"**@{username}:** {msg['text']}\n\n"
content = f"""---
type: source
source_type: telegram
title: "Telegram conversation: {slug}"
author: "{', '.join(contributors)}"
date: {date_str}
domain: internet-finance
format: conversation
status: unprocessed
priority: medium
triage_tag: {tag.lower()}
tags: [telegram, ownership-community]
---
## Conversation ({len(window)} messages, {len(contributors)} participants)
{conversation}
## Agent Notes
**Triage:** [{tag}] classified by batch triage
**Participants:** {', '.join(f'@{u}' for u in contributors)}
"""
archive_path.write_text(content)
logger.info("Archived window [%s]: %s (%d msgs, %d participants)",
tag, filename, len(window), len(contributors))
except Exception as e:
logger.error("Failed to archive window: %s", e)
# ─── Bot Setup ──────────────────────────────────────────────────────────
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /start command."""
await update.message.reply_text(
"I'm Rio, the internet finance agent for TeleoHumanity's collective knowledge base. "
"Tag me with @teleo to ask about futarchy, prediction markets, token governance, "
"or anything in our domain. I'll ground my response in our KB's evidence."
)
async def stats_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /stats command — show KB stats."""
index = load_claim_index()
stats = get_db_stats()
if index:
await update.message.reply_text(
f"📊 KB Stats:\n"
f"{index.get('total_claims', '?')} claims across {len(index.get('domains', {}))} domains\n"
f"{stats['merged_claims']} PRs merged\n"
f"{stats['contributors']} contributors\n"
f"{index.get('orphan_count', '?')} orphan claims ({index.get('orphan_ratio', 0)*100:.0f}%)\n"
f"{index.get('cross_domain_links', '?')} cross-domain connections"
)
else:
await update.message.reply_text("KB index unavailable.")
def main():
"""Start the bot."""
# Load token
token_path = Path(BOT_TOKEN_FILE)
if not token_path.exists():
logger.error("Bot token not found at %s", BOT_TOKEN_FILE)
sys.exit(1)
token = token_path.read_text().strip()
logger.info("Starting Teleo Telegram bot (Rio)...")
# Build application
app = Application.builder().token(token).build()
# Command handlers
app.add_handler(CommandHandler("start", start_command))
app.add_handler(CommandHandler("stats", stats_command))
# Tag handler — messages mentioning the bot
# python-telegram-bot filters.Mention doesn't work for bot mentions in groups
# Use a regex filter for the bot username
app.add_handler(MessageHandler(
filters.TEXT & filters.Regex(r"(?i)@teleo"),
handle_tagged,
))
# All other text messages — buffer for triage
app.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND,
handle_message,
))
# Batch triage job
app.job_queue.run_repeating(
run_batch_triage,
interval=TRIAGE_INTERVAL,
first=TRIAGE_INTERVAL,
)
# Run
logger.info("Bot running. Triage interval: %ds", TRIAGE_INTERVAL)
app.run_polling(drop_pending_updates=True)
if __name__ == "__main__":
main()

22
telegram/teleo-telegram.service Normal file
View file

@ -0,0 +1,22 @@
[Unit]
Description=Teleo Telegram Bot — Rio in ownership community
After=network.target teleo-pipeline.service
Wants=teleo-pipeline.service
[Service]
Type=simple
User=teleo
Group=teleo
WorkingDirectory=/opt/teleo-eval/telegram
ExecStart=/opt/teleo-eval/pipeline/.venv/bin/python3 /opt/teleo-eval/telegram/bot.py
Restart=always
RestartSec=10
Environment=PYTHONUNBUFFERED=1
# Security
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/opt/teleo-eval/logs /opt/teleo-eval/workspaces/extract/inbox/queue /opt/teleo-eval/workspaces/extract/inbox/archive /opt/teleo-eval/workspaces/extract/inbox/null-result
[Install]
WantedBy=multi-user.target