From 090b1411fdead83e8d2f94faf53e64f03f04bd48 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Wed, 18 Mar 2026 11:50:04 +0000 Subject: [PATCH] =?UTF-8?q?epimetheus:=20source=20archive=20restructure=20?= =?UTF-8?q?=E2=80=94=20inbox/queue=20+=20inbox/archive/{domain}=20+=20inbo?= =?UTF-8?q?x/null-result?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - config.py: added INBOX_QUEUE, INBOX_NULL_RESULT constants - evaluate.py: skip patterns + LIGHT tier cover all inbox/ subdirs - llm.py: eval prompts reference inbox/ generically - telegram/bot.py: archives to inbox/queue/ - telegram/teleo-telegram.service: ReadWritePaths expanded - research-prompt-v2.md: paths updated to inbox/queue/ - research-prompt-leo-synthesis.md: paths updated - migrate-source-archive.py: one-time migration script Reviewed by: Ganymede, Rhea, Leo (all approved) Pentagon-Agent: Epimetheus <968B2991-E2DF-4006-B962-F5B0A0CC8ACA> --- lib/config.py | 84 +++- lib/evaluate.py | 782 +++++++++++++++++++++++++++++-- lib/llm.py | 165 +++++-- migrate-source-archive.py | 130 +++++ research-prompt-leo-synthesis.md | 65 +++ research-prompt-v2.md | 142 ++++++ telegram/bot.py | 615 ++++++++++++++++++++++++ telegram/teleo-telegram.service | 22 + 8 files changed, 1914 insertions(+), 91 deletions(-) create mode 100644 migrate-source-archive.py create mode 100644 research-prompt-leo-synthesis.md create mode 100644 research-prompt-v2.md create mode 100644 telegram/bot.py create mode 100644 telegram/teleo-telegram.service diff --git a/lib/config.py b/lib/config.py index fc95bdc..f621e1a 100644 --- a/lib/config.py +++ b/lib/config.py @@ -10,7 +10,9 @@ MAIN_WORKTREE = BASE_DIR / "workspaces" / "main" SECRETS_DIR = BASE_DIR / "secrets" LOG_DIR = BASE_DIR / "logs" DB_PATH = BASE_DIR / "pipeline" / "pipeline.db" +INBOX_QUEUE = "inbox/queue" INBOX_ARCHIVE = "inbox/archive" +INBOX_NULL_RESULT = "inbox/null-result" # --- Forgejo --- FORGEJO_URL = os.environ.get("FORGEJO_URL", "http://localhost:3000") @@ -27,7 +29,8 @@ OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" MODEL_OPUS = "opus" MODEL_SONNET = "sonnet" MODEL_HAIKU = "anthropic/claude-3.5-haiku" -MODEL_GPT4O = "openai/gpt-4o" +MODEL_GPT4O = "openai/gpt-4o" # legacy, kept for reference +MODEL_GEMINI_FLASH = "google/gemini-2.5-flash" # was -preview, removed by OpenRouter MODEL_SONNET_OR = "anthropic/claude-sonnet-4.5" # OpenRouter Sonnet (paid, not Claude Max) # --- Model assignment per stage --- @@ -41,10 +44,10 @@ MODEL_SONNET_OR = "anthropic/claude-sonnet-4.5" # OpenRouter Sonnet (paid, not # 3. Leo DEEP → Opus (Claude Max) — highest judgment, scarce EXTRACT_MODEL = MODEL_SONNET # extraction: structured output, volume work (Claude Max) TRIAGE_MODEL = MODEL_HAIKU # triage: routing decision, cheapest (OpenRouter) -EVAL_DOMAIN_MODEL = MODEL_GPT4O # domain review: OpenRouter GPT-4o +EVAL_DOMAIN_MODEL = MODEL_GEMINI_FLASH # domain review: Gemini 2.5 Flash (was GPT-4o — 16x cheaper, different family from Sonnet) EVAL_LEO_MODEL = MODEL_OPUS # Leo DEEP review: Claude Max Opus EVAL_LEO_STANDARD_MODEL = MODEL_SONNET_OR # Leo STANDARD review: OpenRouter Sonnet -EVAL_DEEP_MODEL = MODEL_GPT4O # DEEP cross-family: paid, adversarial +EVAL_DEEP_MODEL = MODEL_GEMINI_FLASH # DEEP cross-family: paid, adversarial # --- Model backends --- # Each model can run on Claude Max (subscription, base load) or API (overflow/spikes). @@ -68,6 +71,7 @@ MODEL_COSTS = { "sonnet": {"input": 0.003, "output": 0.015}, MODEL_HAIKU: {"input": 0.0008, "output": 0.004}, MODEL_GPT4O: {"input": 0.0025, "output": 0.01}, + MODEL_GEMINI_FLASH: {"input": 0.00015, "output": 0.0006}, MODEL_SONNET_OR: {"input": 0.003, "output": 0.015}, } @@ -78,7 +82,8 @@ MAX_MERGE_WORKERS = 1 # domain-serialized, but one merge at a time per domain # --- Timeouts (seconds) --- EXTRACT_TIMEOUT = 600 # 10 min -EVAL_TIMEOUT = 300 # 5 min +EVAL_TIMEOUT = 120 # 2 min — routine Sonnet/Gemini Flash calls (was 600, caused 10-min stalls) +EVAL_TIMEOUT_OPUS = 600 # 10 min — Opus DEEP eval needs more time for complex reasoning MERGE_TIMEOUT = 300 # 5 min — force-reset to conflict if exceeded (Rhea) CLAUDE_MAX_PROBE_TIMEOUT = 15 @@ -92,12 +97,70 @@ TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits SUBSTANTIVE_RETRY_STANDARD = 2 # reviewer request_changes SUBSTANTIVE_RETRY_DEEP = 3 MAX_EVAL_ATTEMPTS = 3 # Hard cap on eval cycles per PR before terminal +MAX_FIX_ATTEMPTS = 2 # Hard cap on auto-fix cycles per PR before giving up +MAX_FIX_PER_CYCLE = 15 # PRs to fix per cycle — bumped from 5 to clear backlog (Cory, Mar 14) # Issue tags that can be fixed mechanically (Python fixer or Haiku) -MECHANICAL_ISSUE_TAGS = {"frontmatter_schema", "broken_wiki_links", "near_duplicate"} +# broken_wiki_links removed — downgraded to warning, not a gate. Links to claims +# in other open PRs resolve naturally as the dependency chain merges. (Cory, Mar 14) +MECHANICAL_ISSUE_TAGS = {"frontmatter_schema", "near_duplicate"} # Issue tags that require re-extraction (substantive quality problems) SUBSTANTIVE_ISSUE_TAGS = {"factual_discrepancy", "confidence_miscalibration", "scope_error", "title_overclaims"} +# --- Content type schemas --- +# Registry of content types. validate.py branches on type to apply the right +# required fields, confidence rules, and title checks. Adding a new type is a +# dict entry here — no code changes in validate.py needed. +TYPE_SCHEMAS = { + "claim": { + "required": ("type", "domain", "description", "confidence", "source", "created"), + "valid_confidence": ("proven", "likely", "experimental", "speculative"), + "needs_proposition_title": True, + }, + "framework": { + "required": ("type", "domain", "description", "source", "created"), + "valid_confidence": None, + "needs_proposition_title": True, + }, + "entity": { + "required": ("type", "domain", "description"), + "valid_confidence": None, + "needs_proposition_title": False, + }, + "decision": { + "required": ("type", "domain", "description", "parent_entity", "status"), + "valid_confidence": None, + "needs_proposition_title": False, + "valid_status": ("active", "passed", "failed", "expired", "cancelled"), + }, +} + +# --- Content directories --- +ENTITY_DIR_TEMPLATE = "entities/{domain}" # centralized path (Rhea: don't hardcode across 5 files) +DECISION_DIR_TEMPLATE = "decisions/{domain}" + +# --- Contributor tiers --- +# Auto-promotion rules. CI is computed, not stored. +CONTRIBUTOR_TIER_RULES = { + "contributor": { + "claims_merged": 1, + }, + "veteran": { + "claims_merged": 10, + "min_days_since_first": 30, + "challenges_survived": 1, + }, +} + +# Role weights for CI computation (must match schemas/contribution-weights.yaml) +CONTRIBUTION_ROLE_WEIGHTS = { + "sourcer": 0.15, + "extractor": 0.40, + "challenger": 0.20, + "synthesizer": 0.15, + "reviewer": 0.10, +} + # --- Circuit breakers --- BREAKER_THRESHOLD = 5 BREAKER_COOLDOWN = 900 # 15 min @@ -111,6 +174,12 @@ SAMPLE_AUDIT_RATE = 0.15 # 15% of LIGHT merges get pre-merge promotion to STAND SAMPLE_AUDIT_DISAGREEMENT_THRESHOLD = 0.10 # 10% disagreement → tighten LIGHT criteria SAMPLE_AUDIT_MODEL = MODEL_OPUS # Opus for audit — different family from Haiku triage (Leo) +# --- Batch eval --- +# Batch domain review: group STANDARD PRs by domain, one LLM call per batch. +# Leo review stays individual (safety net for cross-contamination). +BATCH_EVAL_MAX_PRS = int(os.environ.get("BATCH_EVAL_MAX_PRS", "5")) +BATCH_EVAL_MAX_DIFF_BYTES = int(os.environ.get("BATCH_EVAL_MAX_DIFF_BYTES", "100000")) # 100KB + # --- Tier logic --- # LIGHT_SKIP_LLM: when True, LIGHT PRs skip domain+Leo review entirely (auto-approve on Tier 0 pass). # Set False for shadow mode (domain review runs but logs only). Flip True after 24h validation (Rhea). @@ -124,6 +193,7 @@ INGEST_INTERVAL = 60 VALIDATE_INTERVAL = 30 EVAL_INTERVAL = 30 MERGE_INTERVAL = 30 +FIX_INTERVAL = 60 HEALTH_CHECK_INTERVAL = 60 # --- Health API --- @@ -133,3 +203,7 @@ HEALTH_PORT = 8080 LOG_FILE = LOG_DIR / "pipeline.jsonl" LOG_ROTATION_MAX_BYTES = 50 * 1024 * 1024 # 50MB per file LOG_ROTATION_BACKUP_COUNT = 7 # keep 7 days + +# --- Versioning (tracked in metrics_snapshots for chart annotations) --- +PROMPT_VERSION = "v2-lean-directed" # bump on every prompt change +PIPELINE_VERSION = "2.2" # bump on every significant pipeline change diff --git a/lib/evaluate.py b/lib/evaluate.py index 8662f16..5cb91b9 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -28,7 +28,9 @@ from . import config, db from .domains import agent_for_domain, detect_domain_from_diff from .forgejo import api as forgejo_api from .forgejo import get_agent_token, get_pr_diff, repo_path -from .llm import run_domain_review, run_leo_review, triage_pr +from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr +from .feedback import format_rejection_comment +from .validate import load_existing_claims logger = logging.getLogger("pipeline.evaluate") @@ -40,10 +42,10 @@ def _filter_diff(diff: str) -> tuple[str, str]: """Filter diff to only review-relevant files. Returns (review_diff, entity_diff). - Strips: inbox/archive/, schemas/, skills/, agents/*/musings/ + Strips: inbox/, schemas/, skills/, agents/*/musings/ """ sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE) - skip_patterns = [r"^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/"] + skip_patterns = [r"^diff --git a/(inbox/(archive|queue|null-result)|schemas|skills|agents/[^/]+/musings)/"] core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"} claim_sections = [] @@ -83,6 +85,12 @@ def _is_musings_only(diff: str) -> bool: return has_musings and not has_other +# ─── NOTE: Tier 0.5 mechanical pre-check moved to validate.py ──────────── +# Tier 0.5 now runs as part of the validate stage (before eval), not inside +# evaluate_pr(). This prevents wasting eval_attempts on mechanically fixable +# PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed. + + # ─── Tier overrides ─────────────────────────────────────────────────────── @@ -102,6 +110,74 @@ def _diff_contains_claim_type(diff: str) -> bool: return False +def _deterministic_tier(diff: str) -> str | None: + """Deterministic tier routing — skip Haiku triage for obvious cases. + + Checks diff file patterns before calling the LLM. Returns tier string + if deterministic, None if Haiku triage is needed. + + Rules (Leo-calibrated): + - All files in entities/ only → LIGHT + - All files in inbox/ only (queue, archive, null-result) → LIGHT + - Any file in core/ or foundations/ → DEEP (structural KB changes) + - Has challenged_by field → DEEP (challenges existing claims) + - Modifies existing file (not new) in domains/ → DEEP (enrichment/change) + - Otherwise → None (needs Haiku triage) + + NOTE: Cross-domain wiki links are NOT a DEEP signal — most claims link + across domains, that's the whole point of the knowledge graph (Leo). + """ + changed_files = [] + for line in diff.split("\n"): + if line.startswith("diff --git a/"): + path = line.replace("diff --git a/", "").split(" b/")[0] + changed_files.append(path) + + if not changed_files: + return None + + # All entities/ only → LIGHT + if all(f.startswith("entities/") for f in changed_files): + logger.info("Deterministic tier: LIGHT (all files in entities/)") + return "LIGHT" + + # All inbox/ only (queue, archive, null-result) → LIGHT + if all(f.startswith("inbox/") for f in changed_files): + logger.info("Deterministic tier: LIGHT (all files in inbox/)") + return "LIGHT" + + # Any file in core/ or foundations/ → DEEP (structural KB changes) + if any(f.startswith("core/") or f.startswith("foundations/") for f in changed_files): + logger.info("Deterministic tier: DEEP (touches core/ or foundations/)") + return "DEEP" + + # Check diff content for DEEP signals + has_challenged_by = False + has_modified_claim = False + new_files: set[str] = set() + + lines = diff.split("\n") + for i, line in enumerate(lines): + # Detect new files + if line.startswith("--- /dev/null") and i + 1 < len(lines) and lines[i + 1].startswith("+++ b/"): + new_files.add(lines[i + 1][6:]) + # Check for challenged_by field + if line.startswith("+") and not line.startswith("+++"): + stripped = line[1:].strip() + if stripped.startswith("challenged_by:"): + has_challenged_by = True + + if has_challenged_by: + logger.info("Deterministic tier: DEEP (has challenged_by field)") + return "DEEP" + + # NOTE: Modified existing domain claims are NOT auto-DEEP — enrichments + # (appending evidence) are common and should be STANDARD. Let Haiku triage + # distinguish enrichments from structural changes. + + return None + + # ─── Verdict parsing ────────────────────────────────────────────────────── @@ -117,12 +193,129 @@ def _parse_verdict(review_text: str, reviewer: str) -> str: return "request_changes" +# Map model-invented tags to valid tags. Models consistently ignore the valid +# tag list and invent their own. This normalizes them. (Ganymede, Mar 14) +_TAG_ALIASES: dict[str, str] = { + "schema_violation": "frontmatter_schema", + "missing_schema_fields": "frontmatter_schema", + "missing_schema": "frontmatter_schema", + "schema": "frontmatter_schema", + "missing_frontmatter": "frontmatter_schema", + "redundancy": "near_duplicate", + "duplicate": "near_duplicate", + "missing_confidence": "confidence_miscalibration", + "confidence_error": "confidence_miscalibration", + "vague_claims": "scope_error", + "unfalsifiable": "scope_error", + "unverified_wiki_links": "broken_wiki_links", + "unverified-wiki-links": "broken_wiki_links", + "missing_wiki_links": "broken_wiki_links", + "invalid_wiki_links": "broken_wiki_links", + "wiki_link_errors": "broken_wiki_links", + "overclaiming": "title_overclaims", + "title_overclaim": "title_overclaims", + "date_error": "date_errors", + "factual_error": "factual_discrepancy", + "factual_inaccuracy": "factual_discrepancy", +} + +VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims", + "confidence_miscalibration", "date_errors", "factual_discrepancy", + "near_duplicate", "scope_error"} + + +def _normalize_tag(tag: str) -> str | None: + """Normalize a model-generated tag to a valid tag, or None if unrecognizable.""" + tag = tag.strip().lower().replace("-", "_") + if tag in VALID_ISSUE_TAGS: + return tag + if tag in _TAG_ALIASES: + return _TAG_ALIASES[tag] + # Fuzzy: check if any valid tag is a substring or vice versa + for valid in VALID_ISSUE_TAGS: + if valid in tag or tag in valid: + return valid + return None + + def _parse_issues(review_text: str) -> list[str]: - """Extract issue tags from review.""" + """Extract issue tags from review. + + First tries structured comment with tag normalization. + Falls back to keyword inference from prose. + """ match = re.search(r"", review_text) - if not match: - return [] - return [tag.strip() for tag in match.group(1).split(",") if tag.strip()] + if match: + raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()] + normalized = [] + for tag in raw_tags: + norm = _normalize_tag(tag) + if norm and norm not in normalized: + normalized.append(norm) + else: + logger.debug("Unrecognized issue tag '%s' — dropped", tag) + if normalized: + return normalized + # Fallback: infer tags from review prose + return _infer_issues_from_prose(review_text) + + +# Keyword patterns for inferring issue tags from unstructured review prose. +# Conservative: only match unambiguous indicators. Order doesn't matter. +_PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = { + "frontmatter_schema": [ + re.compile(r"frontmatter", re.IGNORECASE), + re.compile(r"missing.{0,20}(type|domain|confidence|source|created)\b", re.IGNORECASE), + re.compile(r"yaml.{0,10}(invalid|missing|error|schema)", re.IGNORECASE), + re.compile(r"required field", re.IGNORECASE), + re.compile(r"lacks?.{0,15}(required|yaml|schema|fields)", re.IGNORECASE), + re.compile(r"missing.{0,15}(schema|fields|frontmatter)", re.IGNORECASE), + re.compile(r"schema.{0,10}(compliance|violation|missing|invalid)", re.IGNORECASE), + ], + "broken_wiki_links": [ + re.compile(r"(broken|dead|invalid).{0,10}(wiki.?)?link", re.IGNORECASE), + re.compile(r"wiki.?link.{0,20}(not found|missing|broken|invalid|resolv|unverif)", re.IGNORECASE), + re.compile(r"\[\[.{1,80}\]\].{0,20}(not found|doesn.t exist|missing)", re.IGNORECASE), + re.compile(r"unverified.{0,10}(wiki|link)", re.IGNORECASE), + ], + "factual_discrepancy": [ + re.compile(r"factual.{0,10}(error|inaccura|discrepanc|incorrect)", re.IGNORECASE), + re.compile(r"misrepresent", re.IGNORECASE), + ], + "confidence_miscalibration": [ + re.compile(r"confidence.{0,20}(too high|too low|miscalibrat|overstat|should be)", re.IGNORECASE), + re.compile(r"(overstat|understat).{0,20}confidence", re.IGNORECASE), + ], + "scope_error": [ + re.compile(r"scope.{0,10}(error|too broad|overscop|unscoped)", re.IGNORECASE), + re.compile(r"unscoped.{0,10}(universal|claim)", re.IGNORECASE), + re.compile(r"(vague|unfalsifiable).{0,15}(claim|assertion)", re.IGNORECASE), + re.compile(r"not.{0,10}(specific|falsifiable|disagreeable).{0,10}enough", re.IGNORECASE), + ], + "title_overclaims": [ + re.compile(r"title.{0,20}(overclaim|overstat|too broad)", re.IGNORECASE), + re.compile(r"overclaim", re.IGNORECASE), + ], + "near_duplicate": [ + re.compile(r"near.?duplicate", re.IGNORECASE), + re.compile(r"(very|too) similar.{0,20}(claim|title|existing)", re.IGNORECASE), + re.compile(r"duplicate.{0,20}(of|claim|title|existing|information)", re.IGNORECASE), + re.compile(r"redundan", re.IGNORECASE), + ], +} + + +def _infer_issues_from_prose(review_text: str) -> list[str]: + """Infer issue tags from unstructured review text via keyword matching. + + Fallback for reviews that reject without structured tags. + Conservative: requires at least one unambiguous keyword match per tag. + """ + inferred = [] + for tag, patterns in _PROSE_TAG_PATTERNS.items(): + if any(p.search(review_text) for p in patterns): + inferred.append(tag) + return inferred async def _post_formal_approvals(pr_number: int, pr_author: str): @@ -151,16 +344,35 @@ async def _post_formal_approvals(pr_number: int, pr_author: str): async def _terminate_pr(conn, pr_number: int, reason: str): """Terminal state: close PR on Forgejo, mark source needs_human.""" - # Close PR on Forgejo with explanation + # Get issue tags for structured feedback + row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone() + issues = [] + if row and row["eval_issues"]: + try: + issues = json.loads(row["eval_issues"]) + except (json.JSONDecodeError, TypeError): + pass + + # Post structured rejection comment with quality gate guidance (Epimetheus) + if issues: + feedback_body = format_rejection_comment(issues, source="eval_terminal") + comment_body = ( + f"**Closed by eval pipeline** — {reason}.\n\n" + f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " + f"Source will be re-queued with feedback.\n\n" + f"{feedback_body}" + ) + else: + comment_body = ( + f"**Closed by eval pipeline** — {reason}.\n\n" + f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " + f"Source will be re-queued with feedback." + ) + await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), - { - "body": f"**Closed by eval pipeline** — {reason}.\n\n" - f"This PR has been evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " - f"Source material will be re-queued for extraction with review feedback attached.\n\n" - f"See eval_issues for specific problems." - }, + {"body": comment_body}, ) await forgejo_api( "PATCH", @@ -223,7 +435,15 @@ async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_iss Attempt 3+: terminal. """ if eval_attempts < 2: - return # Attempt 1: normal retry + # Attempt 1: post structured feedback so agent learns, but don't close + if all_issues: + feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1") + await forgejo_api( + "POST", + repo_path(f"issues/{pr_number}/comments"), + {"body": feedback_body}, + ) + return classification = _classify_issues(all_issues) @@ -317,6 +537,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"} + # NOTE: Tier 0.5 mechanical checks now run in validate stage (before eval). + # tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here. + # Filter diff review_diff, _entity_diff = _filter_diff(diff) if not review_diff: @@ -338,8 +561,16 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) # Step 1: Triage (if not already triaged) + # Try deterministic routing first ($0), fall back to Haiku triage ($0.001) if tier is None: - tier = await triage_pr(diff) + tier = _deterministic_tier(diff) + if tier is not None: + db.audit( + conn, "evaluate", "deterministic_tier", + json.dumps({"pr": pr_number, "tier": tier}), + ) + else: + tier = await triage_pr(diff) # Tier overrides (claim-shape detector + random promotion) # Order matters: claim-shape catches obvious cases, random promotion catches the rest. @@ -557,14 +788,470 @@ _rate_limit_backoff_until: datetime | None = None _RATE_LIMIT_BACKOFF_MINUTES = 15 +# ─── Batch domain review ───────────────────────────────────────────────── + + +def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]: + """Parse batched domain review into per-PR review sections. + + Returns {pr_number: review_text} for each PR found in the response. + Missing PRs are omitted — caller handles fallback. + """ + agent_upper = agent.upper() + result: dict[int, str] = {} + + # Split by PR verdict markers: + # Each marker terminates the previous PR's section + pattern = re.compile( + r"" + ) + + matches = list(pattern.finditer(response)) + if not matches: + return result + + for i, match in enumerate(matches): + pr_num = int(match.group(1)) + verdict = match.group(2) + marker_end = match.end() + + # Find the start of this PR's section by looking for the section header + # or the end of the previous verdict + section_header = f"=== PR #{pr_num}" + header_pos = response.rfind(section_header, 0, match.start()) + + if header_pos >= 0: + # Extract from header to end of verdict marker + section_text = response[header_pos:marker_end].strip() + else: + # No header found — extract from previous marker end to this marker end + prev_end = matches[i - 1].end() if i > 0 else 0 + section_text = response[prev_end:marker_end].strip() + + # Re-format as individual review comment + # Strip the batch section header, keep just the review content + # Add batch label for traceability + pr_nums_str = ", ".join(f"#{n}" for n in pr_numbers) + review_text = ( + f"*(batch review with PRs {pr_nums_str})*\n\n" + f"{section_text}\n" + ) + result[pr_num] = review_text + + return result + + +def _validate_batch_fanout( + parsed: dict[int, str], + pr_diffs: list[dict], + agent: str, +) -> tuple[dict[int, str], list[int]]: + """Validate batch fan-out for completeness and cross-contamination. + + Returns (valid_reviews, fallback_pr_numbers). + - valid_reviews: reviews that passed validation + - fallback_pr_numbers: PRs that need individual review (missing or cross-contaminated) + """ + valid: dict[int, str] = {} + fallback: list[int] = [] + + # Build file map: pr_number → set of path segments for matching. + # Use full paths (e.g., "domains/internet-finance/dao.md") not bare filenames + # to avoid false matches on short names like "dao.md" or "space.md" (Leo note #3). + pr_files: dict[int, set[str]] = {} + for pr in pr_diffs: + files = set() + for line in pr["diff"].split("\n"): + if line.startswith("diff --git a/"): + path = line.replace("diff --git a/", "").split(" b/")[0] + files.add(path) + # Also add the last 2 path segments (e.g., "internet-finance/dao.md") + # for models that abbreviate paths + parts = path.split("/") + if len(parts) >= 2: + files.add("/".join(parts[-2:])) + pr_files[pr["number"]] = files + + for pr in pr_diffs: + pr_num = pr["number"] + + # Completeness check: is there a review for this PR? + if pr_num not in parsed: + logger.warning("Batch fan-out: PR #%d missing from response — fallback to individual", pr_num) + fallback.append(pr_num) + continue + + review = parsed[pr_num] + + # Cross-contamination check: does review mention at least one file from this PR? + # Use path segments (min 10 chars) to avoid false substring matches on short names. + my_files = pr_files.get(pr_num, set()) + mentions_own_file = any(f in review for f in my_files if len(f) >= 10) + + if not mentions_own_file and my_files: + # Check if it references files from OTHER PRs (cross-contamination signal) + other_files = set() + for other_pr in pr_diffs: + if other_pr["number"] != pr_num: + other_files.update(pr_files.get(other_pr["number"], set())) + mentions_other = any(f in review for f in other_files if len(f) >= 10) + + if mentions_other: + logger.warning( + "Batch fan-out: PR #%d review references files from another PR — cross-contamination, fallback", + pr_num, + ) + fallback.append(pr_num) + continue + # If it doesn't mention any files at all, could be a generic review — accept it + # (some PRs have short diffs where the model doesn't reference filenames) + + valid[pr_num] = review + + return valid, fallback + + +async def _run_batch_domain_eval( + conn, batch_prs: list[dict], domain: str, agent: str, +) -> tuple[int, int]: + """Execute batch domain review for a group of same-domain STANDARD PRs. + + 1. Claim all PRs atomically + 2. Run single batch domain review + 3. Parse + validate fan-out + 4. Post per-PR comments + 5. Continue to individual Leo review for each + 6. Fall back to individual review for any validation failures + + Returns (succeeded, failed). + """ + from .forgejo import get_pr_diff as _get_pr_diff + + succeeded = 0 + failed = 0 + + # Step 1: Fetch diffs and build batch + pr_diffs = [] + claimed_prs = [] + for pr_row in batch_prs: + pr_num = pr_row["number"] + + # Atomic claim + cursor = conn.execute( + "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'", + (pr_num,), + ) + if cursor.rowcount == 0: + continue + + # Increment eval_attempts + conn.execute( + "UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1, " + "last_attempt = datetime('now') WHERE number = ?", + (pr_num,), + ) + + diff = await _get_pr_diff(pr_num) + if not diff: + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,)) + continue + + # Musings bypass + if _is_musings_only(diff): + await forgejo_api( + "POST", + repo_path(f"issues/{pr_num}/comments"), + {"body": "Auto-approved: musings bypass eval per collective policy."}, + ) + conn.execute( + "UPDATE prs SET status = 'approved', leo_verdict = 'skipped', " + "domain_verdict = 'skipped' WHERE number = ?", + (pr_num,), + ) + succeeded += 1 + continue + + review_diff, _ = _filter_diff(diff) + if not review_diff: + review_diff = diff + files = _extract_changed_files(diff) + + # Build label from branch name or first claim filename + branch = pr_row.get("branch", "") + label = branch.split("/")[-1][:60] if branch else f"pr-{pr_num}" + + pr_diffs.append({ + "number": pr_num, + "label": label, + "diff": review_diff, + "files": files, + "full_diff": diff, # kept for Leo review + "file_count": len([l for l in files.split("\n") if l.strip()]), + }) + claimed_prs.append(pr_num) + + if not pr_diffs: + return 0, 0 + + # Enforce BATCH_EVAL_MAX_DIFF_BYTES — split if total diff is too large. + # We only know diff sizes after fetching, so enforce here not in _build_domain_batches. + total_bytes = sum(len(p["diff"].encode()) for p in pr_diffs) + if total_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and len(pr_diffs) > 1: + # Keep PRs up to the byte cap, revert the rest to open for next cycle + kept = [] + running_bytes = 0 + for p in pr_diffs: + p_bytes = len(p["diff"].encode()) + if running_bytes + p_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and kept: + break + kept.append(p) + running_bytes += p_bytes + overflow = [p for p in pr_diffs if p not in kept] + for p in overflow: + conn.execute( + "UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 " + "WHERE number = ?", + (p["number"],), + ) + claimed_prs.remove(p["number"]) + logger.info( + "PR #%d: diff too large for batch (%d bytes total), deferring to next cycle", + p["number"], total_bytes, + ) + pr_diffs = kept + + if not pr_diffs: + return 0, 0 + + # Detect domain for all PRs (should be same domain) + conn.execute( + "UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number IN ({})".format( + ",".join("?" * len(claimed_prs)) + ), + [domain, agent] + claimed_prs, + ) + + # Step 2: Run batch domain review + logger.info( + "Batch domain review: %d PRs in %s domain (PRs: %s)", + len(pr_diffs), + domain, + ", ".join(f"#{p['number']}" for p in pr_diffs), + ) + batch_response = await run_batch_domain_review(pr_diffs, domain, agent) + + if batch_response is None: + # Complete failure — revert all to open + logger.warning("Batch domain review failed — reverting all PRs to open") + for pr_num in claimed_prs: + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,)) + return 0, len(claimed_prs) + + # Step 3: Parse + validate fan-out + parsed = _parse_batch_response(batch_response, claimed_prs, agent) + valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent) + + db.audit( + conn, "evaluate", "batch_domain_review", + json.dumps({ + "domain": domain, + "batch_size": len(pr_diffs), + "valid": len(valid_reviews), + "fallback": fallback_prs, + }), + ) + + # Step 4: Process valid reviews — post comments + continue to Leo + for pr_data in pr_diffs: + pr_num = pr_data["number"] + + if pr_num in fallback_prs: + # Revert — will be picked up by individual eval next cycle + conn.execute( + "UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 " + "WHERE number = ?", + (pr_num,), + ) + logger.info("PR #%d: batch fallback — will retry individually", pr_num) + continue + + if pr_num not in valid_reviews: + # Should not happen, but safety + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,)) + continue + + review_text = valid_reviews[pr_num] + domain_verdict = _parse_verdict(review_text, agent) + + # Post domain review comment + agent_tok = get_agent_token(agent) + await forgejo_api( + "POST", + repo_path(f"issues/{pr_num}/comments"), + {"body": review_text}, + token=agent_tok, + ) + + conn.execute( + "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?", + (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num), + ) + + # Record cost + from . import costs + costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter") + + # If domain rejects, handle disposition (same as individual path) + if domain_verdict == "request_changes": + domain_issues = _parse_issues(review_text) + eval_attempts = (conn.execute( + "SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,) + ).fetchone()["eval_attempts"] or 0) + + conn.execute( + "UPDATE prs SET status = 'open', leo_verdict = 'skipped', " + "last_error = 'domain review requested changes', eval_issues = ? WHERE number = ?", + (json.dumps(domain_issues), pr_num), + ) + db.audit( + conn, "evaluate", "domain_rejected", + json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}), + ) + await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues) + succeeded += 1 + continue + + # Domain approved — continue to individual Leo review + logger.info("PR #%d: batch domain approved, proceeding to individual Leo review", pr_num) + + review_diff = pr_data["diff"] + files = pr_data["files"] + + leo_review = await run_leo_review(review_diff, files, "STANDARD") + + if leo_review is None: + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,)) + logger.debug("PR #%d: Leo review failed, will retry next cycle", pr_num) + continue + + if leo_review == "RATE_LIMITED": + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,)) + logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num) + continue + + leo_verdict = _parse_verdict(leo_review, "LEO") + conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_num)) + + # Post Leo review + leo_tok = get_agent_token("Leo") + await forgejo_api( + "POST", + repo_path(f"issues/{pr_num}/comments"), + {"body": leo_review}, + token=leo_tok, + ) + + costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter") + + # Final verdict + both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped") + + if both_approve: + pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}")) + pr_author = pr_info.get("user", {}).get("login", "") if pr_info else "" + await _post_formal_approvals(pr_num, pr_author) + conn.execute("UPDATE prs SET status = 'approved' WHERE number = ?", (pr_num,)) + db.audit( + conn, "evaluate", "approved", + json.dumps({"pr": pr_num, "tier": "STANDARD", "domain": domain, + "leo": leo_verdict, "domain_agent": agent, "batch": True}), + ) + logger.info("PR #%d: APPROVED (batch domain + individual Leo)", pr_num) + else: + all_issues = [] + if leo_verdict == "request_changes": + all_issues.extend(_parse_issues(leo_review)) + conn.execute( + "UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?", + (json.dumps(all_issues), pr_num), + ) + feedback = {"leo": leo_verdict, "domain": domain_verdict, + "tier": "STANDARD", "issues": all_issues} + conn.execute( + "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)", + (json.dumps(feedback), pr_num), + ) + db.audit( + conn, "evaluate", "changes_requested", + json.dumps({"pr": pr_num, "tier": "STANDARD", "leo": leo_verdict, + "domain": domain_verdict, "issues": all_issues, "batch": True}), + ) + eval_attempts = (conn.execute( + "SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,) + ).fetchone()["eval_attempts"] or 0) + await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues) + + succeeded += 1 + + return succeeded, failed + + +def _build_domain_batches( + rows: list, conn, +) -> tuple[dict[str, list[dict]], list[dict]]: + """Group STANDARD PRs by domain for batch eval. DEEP and LIGHT stay individual. + + Returns (batches_by_domain, individual_prs). + Respects BATCH_EVAL_MAX_PRS and BATCH_EVAL_MAX_DIFF_BYTES. + """ + domain_candidates: dict[str, list[dict]] = {} + individual: list[dict] = [] + + for row in rows: + pr_num = row["number"] + tier = row["tier"] + + # Only batch STANDARD PRs with pending domain review + if tier != "STANDARD": + individual.append(row) + continue + + # Check if domain review already done (resuming after Leo rate limit) + existing = conn.execute( + "SELECT domain_verdict, domain FROM prs WHERE number = ?", (pr_num,) + ).fetchone() + if existing and existing["domain_verdict"] not in ("pending", None): + individual.append(row) + continue + + domain = existing["domain"] if existing and existing["domain"] else "general" + domain_candidates.setdefault(domain, []).append(row) + + # Build sized batches per domain + batches: dict[str, list[dict]] = {} + for domain, prs in domain_candidates.items(): + if len(prs) == 1: + # Single PR — no batching benefit, process individually + individual.extend(prs) + continue + # Cap at BATCH_EVAL_MAX_PRS + batch = prs[: config.BATCH_EVAL_MAX_PRS] + batches[domain] = batch + # Overflow goes individual + individual.extend(prs[config.BATCH_EVAL_MAX_PRS :]) + + return batches, individual + + # ─── Main entry point ────────────────────────────────────────────────────── async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: """Run one evaluation cycle. - Finds PRs with status='open', tier0_pass=1, and no pending verdicts. - Evaluates in priority order. + Groups eligible STANDARD PRs by domain for batch domain review. + DEEP PRs get individual eval. LIGHT PRs get auto-approved. + Leo review always individual (safety net for batch cross-contamination). """ global _rate_limit_backoff_until @@ -580,29 +1267,17 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: logger.info("Rate limit backoff expired, resuming full eval cycles") _rate_limit_backoff_until = None - # Find PRs ready for evaluation: - # - status = 'open' - # - tier0_pass = 1 (passed validation) - # - leo_verdict = 'pending' OR domain_verdict = 'pending' - # During Opus backoff: skip DEEP PRs waiting for Leo (they need Opus). - # STANDARD PRs can overflow Leo review to GPT-4o, so let them through. - # Skip PRs attempted within last 10 minutes (backoff during rate limits) + # Find PRs ready for evaluation if opus_backoff: verdict_filter = "AND (p.domain_verdict = 'pending' OR (p.leo_verdict = 'pending' AND p.tier != 'DEEP'))" else: verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')" - # Stagger first pass after migration: if there are previously-rejected PRs - # with eval_attempts=0 (freshly migrated), limit batch to avoid OpenRouter spike. - migrated_count = conn.execute( - """SELECT COUNT(*) as c FROM prs - WHERE status = 'open' AND eval_attempts = 0 - AND (domain_verdict NOT IN ('pending') OR leo_verdict NOT IN ('pending'))""" - ).fetchone()["c"] - stagger_limit = 5 if migrated_count > 5 else None + # Stagger removed — migration protection no longer needed. Merge is domain-serialized + # and entity conflicts auto-resolve. Safe to let all eligible PRs enter eval. (Cory, Mar 14) rows = conn.execute( - f"""SELECT p.number, p.tier FROM prs p + f"""SELECT p.number, p.tier, p.branch, p.domain FROM prs p LEFT JOIN sources s ON p.source_path = s.path WHERE p.status = 'open' AND p.tier0_pass = 1 @@ -611,7 +1286,6 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: AND (p.last_attempt IS NULL OR p.last_attempt < datetime('now', '-10 minutes')) ORDER BY - -- Fresh PRs before re-evals: unevaluated PRs have higher chance of passing CASE WHEN COALESCE(p.eval_attempts, 0) = 0 THEN 0 ELSE 1 END, CASE COALESCE(p.priority, s.priority, 'medium') WHEN 'critical' THEN 0 @@ -622,24 +1296,37 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: END, p.created_at ASC LIMIT ?""", - (stagger_limit or max_workers or config.MAX_EVAL_WORKERS,), + (max_workers or config.MAX_EVAL_WORKERS,), ).fetchall() - if stagger_limit and rows: - logger.info( - "Post-migration stagger: limiting eval batch to %d (migrated PRs: %d)", stagger_limit, migrated_count - ) - if not rows: return 0, 0 succeeded = 0 failed = 0 - for row in rows: + # Group STANDARD PRs by domain for batch eval + domain_batches, individual_prs = _build_domain_batches(rows, conn) + + # Process batch domain reviews first + for domain, batch_prs in domain_batches.items(): + try: + agent = agent_for_domain(domain) + b_succeeded, b_failed = await _run_batch_domain_eval( + conn, batch_prs, domain, agent, + ) + succeeded += b_succeeded + failed += b_failed + except Exception: + logger.exception("Batch eval failed for domain %s", domain) + # Revert all to open + for pr_row in batch_prs: + conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_row["number"],)) + failed += len(batch_prs) + + # Process individual PRs (DEEP, LIGHT, single-domain, fallback) + for row in individual_prs: try: - # During Opus backoff, skip DEEP PRs that already completed domain review - # (they need Opus which is rate limited). STANDARD PRs can overflow to GPT-4o. if opus_backoff and row["tier"] == "DEEP": existing = conn.execute( "SELECT domain_verdict FROM prs WHERE number = ?", @@ -661,20 +1348,16 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: from datetime import timedelta if reason == "opus_rate_limited": - # Opus hit — set backoff but DON'T break. Other PRs - # may still need triage (Haiku) or domain review (Sonnet). _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( minutes=_RATE_LIMIT_BACKOFF_MINUTES ) - opus_backoff = True # Update local flag so in-loop guard kicks in + opus_backoff = True logger.info( "Opus rate limited — backing off Opus for %d min, continuing triage+domain", _RATE_LIMIT_BACKOFF_MINUTES, ) continue else: - # Non-Opus rate limit (Sonnet/Haiku) — break the cycle, - # nothing else can proceed either. _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( minutes=_RATE_LIMIT_BACKOFF_MINUTES ) @@ -687,7 +1370,6 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: except Exception: logger.exception("Failed to evaluate PR #%d", row["number"]) failed += 1 - # Revert to open on unhandled error conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],)) if succeeded or failed: diff --git a/lib/llm.py b/lib/llm.py index 0d10f42..dc3a09d 100644 --- a/lib/llm.py +++ b/lib/llm.py @@ -49,18 +49,20 @@ REVIEW_STYLE_GUIDE = ( TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT. -DEEP — use when ANY of these apply: -- PR adds or modifies claims rated "likely" or higher confidence -- PR touches agent beliefs or creates cross-domain wiki links -- PR challenges an existing claim (has "challenged_by" or contradicts existing) -- PR modifies axiom-level beliefs -- PR is a cross-domain synthesis claim +DEEP — use ONLY when the PR could change the knowledge graph structure: +- PR modifies files in core/ or foundations/ (structural KB changes) +- PR challenges an existing claim (has "challenged_by" field or explicitly argues against an existing claim) +- PR modifies axiom-level beliefs in agents/*/beliefs.md +- PR is a cross-domain synthesis claim that draws conclusions across 2+ domains -STANDARD — use when: -- New claims in established domain areas -- Enrichments to existing claims (confirm/extend) +DEEP is rare — most new claims are STANDARD even if they have high confidence or cross-domain wiki links. Adding a new "likely" claim about futarchy is STANDARD. Arguing that an existing claim is wrong is DEEP. + +STANDARD — the DEFAULT for most PRs: +- New claims in any domain at any confidence level +- Enrichments to existing claims (adding evidence, extending arguments) - New hypothesis-level beliefs - Source archives with extraction results +- Claims with cross-domain wiki links (this is normal, not exceptional) LIGHT — use ONLY when ALL changes fit these categories: - Entity attribute updates (factual corrections, new data points) @@ -68,7 +70,7 @@ LIGHT — use ONLY when ALL changes fit these categories: - Formatting fixes, typo corrections - Status field changes -IMPORTANT: When uncertain, classify UP, not down. Always err toward more review. +IMPORTANT: When uncertain between DEEP and STANDARD, choose STANDARD. Most claims are STANDARD. DEEP is reserved for structural changes to the knowledge base, not for complex or important-sounding claims. Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line. @@ -77,17 +79,24 @@ Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, fo DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base. +IMPORTANT — This PR may contain different content types: +- **Claims** (type: claim): arguable assertions with confidence levels. Review fully. +- **Entities** (type: entity, files in entities/): descriptive records of projects, people, protocols. Do NOT reject entities for missing confidence or source fields — they have a different schema. +- **Sources** (files in inbox/): archive metadata. Auto-approve these. + Review this PR. For EACH criterion below, write one sentence stating what you found: -1. **Factual accuracy** — Are the claims factually correct? Name any specific errors. -2. **Intra-PR duplicates** — Do multiple changes in THIS PR add the same evidence to different claims with near-identical wording? Only flag if the same paragraph of evidence is copy-pasted across files. -3. **Confidence calibration** — Is the confidence level right for the evidence provided? Name the level and say if it matches. -4. **Wiki links** — Do [[wiki links]] in the diff reference files that exist? Flag any that look broken. +1. **Factual accuracy** — Are the claims/entities factually correct? Name any specific errors. +2. **Intra-PR duplicates** — Do multiple changes in THIS PR add the same evidence to different claims with near-identical wording? Only flag if the same paragraph of evidence is copy-pasted across files. Shared entity files (like metadao.md or futardio.md) appearing in multiple PRs are NOT duplicates — they are expected enrichments. +3. **Confidence calibration** — For claims only. Is the confidence level right for the evidence? Entities don't have confidence levels. +4. **Wiki links** — Note any broken [[wiki links]], but do NOT let them affect your verdict. Broken links are expected — linked claims often exist in other open PRs that haven't merged yet. ALWAYS APPROVE even if wiki links are broken. VERDICT RULES — read carefully: - APPROVE if claims are factually correct and evidence supports them, even if minor improvements are possible. -- REQUEST_CHANGES only for BLOCKING issues: factual errors, genuinely broken wiki links, copy-pasted duplicate evidence across files, or confidence that is clearly wrong (e.g. "proven" with no evidence). -- Missing context, style preferences, and "could be better" observations are NOT blocking. Note them but still APPROVE. +- APPROVE entity files (type: entity) unless they contain factual errors. +- APPROVE even if wiki links are broken — this is NEVER a reason to REQUEST_CHANGES. +- REQUEST_CHANGES only for these BLOCKING issues: factual errors, copy-pasted duplicate evidence, or confidence that is clearly wrong (e.g. "proven" with no evidence). +- If the ONLY issues you find are broken wiki links: you MUST APPROVE. - Do NOT invent problems. If a criterion passes, say it passes. {style_guide} @@ -95,7 +104,7 @@ VERDICT RULES — read carefully: If requesting changes, tag the specific issues using ONLY these tags (do not invent new tags): -Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error +Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error End your review with exactly one of: @@ -109,21 +118,30 @@ End your review with exactly one of: LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base. +IMPORTANT — Content types have DIFFERENT schemas: +- **Claims** (type: claim): require type, domain, confidence, source, created, description. Title must be a prose proposition. +- **Entities** (type: entity, files in entities/): require ONLY type, domain, description. NO confidence, NO source, NO created date. Short filenames like "metadao.md" are correct — entities are NOT claims. +- **Sources** (files in inbox/): different schema entirely. Do NOT flag sources for missing claim fields. + +Do NOT flag entity files for missing confidence, source, or created fields. Do NOT flag entity filenames for being too short or not prose propositions. These are different content types with different rules. + Review this PR. For EACH criterion below, write one sentence stating what you found: -1. **Schema** — Does YAML frontmatter have type, domain, confidence, source, created? Is the title a prose proposition (not a label)? +1. **Schema** — Does each file have valid frontmatter FOR ITS TYPE? (Claims need full schema. Entities need only type+domain+description.) 2. **Duplicate/redundancy** — Do multiple enrichments in this PR inject the same evidence into different claims? Is the enrichment actually new vs already present in the claim? -3. **Confidence** — Name the confidence level. Does the evidence justify it? (proven needs strong evidence, speculative is fine for theories) -4. **Wiki links** — Do [[links]] in the diff point to real files? Flag any that look invented. +3. **Confidence** — For claims only: name the confidence level. Does the evidence justify it? +4. **Wiki links** — Note any broken [[links]], but do NOT let them affect your verdict. Broken links are expected — linked claims often exist in other open PRs. ALWAYS APPROVE even if wiki links are broken. 5. **Source quality** — Is the source credible for this claim? -6. **Specificity** — Could someone disagree with this claim? If it's too vague to be wrong, flag it. +6. **Specificity** — For claims only: could someone disagree? If it's too vague to be wrong, flag it. + +VERDICT: APPROVE if the claims are factually correct and evidence supports them. Broken wiki links are NEVER a reason to REQUEST_CHANGES. If broken links are the ONLY issue, you MUST APPROVE. {style_guide} If requesting changes, tag the specific issues using ONLY these tags (do not invent new tags): -Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error +Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error End your review with exactly one of: @@ -141,7 +159,7 @@ Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check 1. Cross-domain implications — does this claim affect beliefs in other domains? 2. Confidence calibration — is the confidence level justified by the evidence? 3. Contradiction check — does this contradict any existing claims without explicit argument? -4. Wiki link validity — do all wiki links reference real, existing claims? +4. Wiki link validity — note any broken links, but do NOT let them affect your verdict. Broken links are expected (linked claims may be in other PRs). NEVER REQUEST_CHANGES for broken wiki links alone. 5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary? 6. Source quality — is the source credible for the claim being made? 7. Duplicate check — does a substantially similar claim already exist? @@ -155,7 +173,7 @@ Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check If requesting changes, tag the specific issues using ONLY these tags (do not invent new tags): -Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error +Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error End your review with exactly one of: @@ -168,10 +186,45 @@ End your review with exactly one of: {files}""" +BATCH_DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base. + +You are reviewing {n_prs} PRs in a single batch. For EACH PR, apply all criteria INDEPENDENTLY. Do not mix content between PRs. Each PR is a separate evaluation. + +For EACH PR, check these criteria (one sentence each): + +1. **Factual accuracy** — Are the claims factually correct? Name any specific errors. +2. **Intra-PR duplicates** — Do multiple changes in THIS PR add the same evidence to different claims with near-identical wording? +3. **Confidence calibration** — Is the confidence level right for the evidence provided? +4. **Wiki links** — Do [[wiki links]] in the diff reference files that exist? + +VERDICT RULES — read carefully: +- APPROVE if claims are factually correct and evidence supports them, even if minor improvements are possible. +- REQUEST_CHANGES only for BLOCKING issues: factual errors, genuinely broken wiki links, copy-pasted duplicate evidence across files, or confidence that is clearly wrong. +- Missing context, style preferences, and "could be better" observations are NOT blocking. Note them but still APPROVE. +- Do NOT invent problems. If a criterion passes, say it passes. + +{style_guide} + +For EACH PR, write your full review, then end that PR's section with the verdict tag. +If requesting changes, tag the specific issues: + + +Valid tags: frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error + +{pr_sections} + +IMPORTANT: You MUST provide a verdict for every PR listed above. For each PR, end with exactly one of: + + +where NUMBER is the PR number shown in the section header.""" + + # ─── API helpers ─────────────────────────────────────────────────────────── -async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None: +async def openrouter_call( + model: str, prompt: str, timeout_sec: int = 120, max_tokens: int = 4096, +) -> str | None: """Call OpenRouter API. Returns response text or None on failure.""" key_file = config.SECRETS_DIR / "openrouter-key" if not key_file.exists(): @@ -182,7 +235,7 @@ async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> st payload = { "model": model, "messages": [{"role": "user", "content": prompt}], - "max_tokens": 4096, + "max_tokens": max_tokens, "temperature": 0.2, } @@ -270,6 +323,41 @@ async def triage_pr(diff: str) -> str: return "STANDARD" +async def run_batch_domain_review( + pr_diffs: list[dict], domain: str, agent: str, +) -> str | None: + """Run batched domain review for multiple PRs in one LLM call. + + pr_diffs: list of {"number": int, "label": str, "diff": str, "files": str} + Returns raw response text or None on failure. + """ + # Build per-PR sections with anchoring labels + sections = [] + for pr in pr_diffs: + sections.append( + f"=== PR #{pr['number']}: {pr['label']} ({pr['file_count']} files) ===\n" + f"--- PR DIFF ---\n{pr['diff']}\n\n" + f"--- CHANGED FILES ---\n{pr['files']}\n" + ) + + prompt = BATCH_DOMAIN_PROMPT.format( + agent=agent, + agent_upper=agent.upper(), + domain=domain, + n_prs=len(pr_diffs), + style_guide=REVIEW_STYLE_GUIDE, + pr_sections="\n".join(sections), + ) + + # Scale max_tokens with batch size: ~3K tokens per PR review + max_tokens = min(3000 * len(pr_diffs), 16384) + result = await openrouter_call( + config.EVAL_DOMAIN_MODEL, prompt, + timeout_sec=config.EVAL_TIMEOUT, max_tokens=max_tokens, + ) + return result + + async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None: """Run domain review via OpenRouter GPT-4o. @@ -300,15 +388,20 @@ async def run_leo_review(diff: str, files: str, tier: str) -> str | None: prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files) if tier == "DEEP": - # DEEP: Opus only, queue if rate limited. Opus is scarce — reserve for high-stakes. - result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) - if result == "RATE_LIMITED": - logger.info("Claude Max Opus rate limited, queuing DEEP Leo review") - return None - return result - else: - # STANDARD/LIGHT: Sonnet via OpenRouter. Different model family from - # domain review (GPT-4o) = no correlated blind spots. Keeps Claude Max - # rate limit untouched for Opus DEEP + overnight research. + # Opus skipped — route all Leo reviews through Sonnet until backlog clears. + # Opus via Claude Max CLI is consistently unavailable (rate limited or hanging). + # Re-enable by removing this block and uncommenting the try-then-overflow below. + # (Cory, Mar 14: "yes lets skip opus") + # + # --- Re-enable Opus later (uses EVAL_TIMEOUT_OPUS for longer reasoning): --- + # result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS) + # if result == "RATE_LIMITED" or result is None: + # logger.info("Opus unavailable for DEEP Leo review — overflowing to Sonnet") + # result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS) + # return result + result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) + return result + else: + # STANDARD/LIGHT: Sonnet via OpenRouter — 120s timeout (routine calls) result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) return result diff --git a/migrate-source-archive.py b/migrate-source-archive.py new file mode 100644 index 0000000..bd05cfd --- /dev/null +++ b/migrate-source-archive.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +"""Migrate source archive from flat inbox/archive/ to organized structure. + +inbox/queue/ — unprocessed sources (landing zone) +inbox/archive/{domain}/ — processed sources with extraction results +inbox/null-result/ — reviewed, nothing extractable + +One-time migration. Atomic commit. Idempotent (safe to re-run). + +Run from repo root: + cd /opt/teleo-eval/workspaces/main + python3 /opt/teleo-eval/pipeline/migrate-source-archive.py [--dry-run] +""" + +import argparse +import glob +import os +import re +from pathlib import Path + + +def get_source_status(filepath: str) -> str: + """Read status from source frontmatter.""" + try: + content = open(filepath).read() + match = re.search(r"^status:\s*(\S+)", content, re.MULTILINE) + if match: + return match.group(1).strip() + except Exception: + pass + return "unknown" + + +def get_source_domain(filepath: str) -> str: + """Read domain from source frontmatter.""" + try: + content = open(filepath).read() + match = re.search(r"^domain:\s*(\S+)", content, re.MULTILINE) + if match: + return match.group(1).strip() + except Exception: + pass + return "uncategorized" + + +def migrate(repo_root: str, dry_run: bool = False): + """Move source files to organized structure.""" + archive_dir = os.path.join(repo_root, "inbox", "archive") + queue_dir = os.path.join(repo_root, "inbox", "queue") + null_dir = os.path.join(repo_root, "inbox", "null-result") + + if not os.path.isdir(archive_dir): + print(f"ERROR: {archive_dir} not found") + return + + # Create target directories + if not dry_run: + os.makedirs(queue_dir, exist_ok=True) + os.makedirs(null_dir, exist_ok=True) + + sources = glob.glob(os.path.join(archive_dir, "*.md")) + print(f"Found {len(sources)} source files in inbox/archive/") + + moved = {"queue": 0, "null-result": 0, "archive": {}} + skipped = 0 + + for filepath in sorted(sources): + filename = os.path.basename(filepath) + if filename.startswith("_") or filename.startswith("."): + skipped += 1 + continue + + status = get_source_status(filepath) + domain = get_source_domain(filepath) + + if status == "unprocessed" or status == "processing": + # → queue/ + dest = os.path.join(queue_dir, filename) + if not dry_run: + os.rename(filepath, dest) + moved["queue"] += 1 + + elif status in ("null-result", "null_result"): + # → null-result/ + dest = os.path.join(null_dir, filename) + if not dry_run: + os.rename(filepath, dest) + moved["null-result"] += 1 + + elif status in ("processed", "enrichment"): + # → archive/{domain}/ + domain_dir = os.path.join(archive_dir, domain) + if not dry_run: + os.makedirs(domain_dir, exist_ok=True) + dest = os.path.join(domain_dir, filename) + if not dry_run: + os.rename(filepath, dest) + moved["archive"][domain] = moved["archive"].get(domain, 0) + 1 + + else: + # Unknown status — treat as unprocessed → queue/ + dest = os.path.join(queue_dir, filename) + if not dry_run: + os.rename(filepath, dest) + moved["queue"] += 1 + + # Also move any .extraction-debug/ directory + debug_dir = os.path.join(archive_dir, ".extraction-debug") + if os.path.isdir(debug_dir): + print(f" (keeping .extraction-debug/ in place)") + + print(f"\n{'='*60}") + print(f" MIGRATION {'(DRY RUN) ' if dry_run else ''}COMPLETE") + print(f" → queue/ (unprocessed): {moved['queue']}") + print(f" → null-result/: {moved['null-result']}") + print(f" → archive/{{domain}}/:") + for domain, count in sorted(moved["archive"].items()): + print(f" {domain}: {count}") + print(f" Archive total: {sum(moved['archive'].values())}") + print(f" Skipped: {skipped}") + print(f" Grand total: {moved['queue'] + moved['null-result'] + sum(moved['archive'].values()) + skipped}") + print(f"{'='*60}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Migrate source archive to organized structure") + parser.add_argument("--repo-root", default=".", help="Repository root") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + migrate(args.repo_root, args.dry_run) diff --git a/research-prompt-leo-synthesis.md b/research-prompt-leo-synthesis.md new file mode 100644 index 0000000..93e905d --- /dev/null +++ b/research-prompt-leo-synthesis.md @@ -0,0 +1,65 @@ +# Research Prompt — Leo Synthesis Session +# Fundamentally different from domain agent research. +# Leo runs LAST (08:00 UTC), after all 5 domain agents have researched overnight. + +You are Leo, the Teleo collective's lead synthesizer. Domain: grand-strategy. + +## Your Task: Overnight Synthesis Session + +You run AFTER the 5 domain agents have researched (Rio 22:00, Theseus 00:00, Clay 02:00, Vida 04:00, Astra 06:00). Your job is NOT to find new sources. Your job is to CONNECT what they found. + +### Step 1: Read Overnight Output (15 min) +Check what the domain agents produced since yesterday: +- New source archives in inbox/queue/ (look for today's date + yesterday's) +- New musings in agents/*/musings/research-*.md +- ROUTE:leo flags from other agents' research +- Any new claims merged overnight + +### Step 2: Cross-Domain Connection Scan (20 min) +Look for patterns across what multiple agents found: +- Did 2+ agents find evidence about the same mechanism in different domains? +- Did anyone find something that contradicts another agent's existing claim? +- Are there structural parallels that neither agent would see from within their domain? + +### Step 3: Synthesis Claims (30 min) +Draft 1-3 cross-domain synthesis claims. These go to agents/leo/musings/synthesis-${DATE}.md (not inbox/queue/ — Leo proposes claims, not sources). + +For each synthesis: +- Name the specific mechanism that connects domains +- Cite the specific claims/sources from each domain +- Rate confidence honestly (synthesis claims start at speculative or experimental) +- Wiki-link to the domain-specific claims being synthesized + +### Step 4: Falsifiable Prediction (10 min) +Every overnight cycle should produce at least ONE prediction with temporal stakes: +- "By [date], [observable outcome] because [mechanism from synthesis]" +- Performance criteria: what would prove this right or wrong? +- Time horizon: 3 months, 6 months, or 1 year + +Write to agents/leo/musings/predictions-${DATE}.md + +### Step 5: Research Priority Flags (5 min) +Based on what you saw overnight, leave suggestions for domain agents: +Write to agents/leo/musings/research-flags-${DATE}.md: + +## Overnight Research Flags (${DATE}) +**For Rio:** [What to investigate, why] +**For Theseus:** [What to investigate, why] +**For Clay:** [What to investigate, why] +**For Vida:** [What to investigate, why] +**For Astra:** [What to investigate, why] + +These are suggestions, not directives. Agents can take them or leave them. + +### Step 6: Update Research Journal (5 min) +Append to agents/leo/research-journal.md: + +## Synthesis Session ${DATE} +**Agents who produced overnight:** [which agents ran] +**Cross-domain connections found:** [count + brief description] +**Strongest synthesis:** [the most surprising cross-domain finding] +**Prediction made:** [one-line summary] +**Biggest gap in overnight run:** [what nobody researched that should have been covered] + +### Step 7: Stop +When finished, STOP. The script handles all git operations. diff --git a/research-prompt-v2.md b/research-prompt-v2.md new file mode 100644 index 0000000..76c6126 --- /dev/null +++ b/research-prompt-v2.md @@ -0,0 +1,142 @@ +# Research Prompt v2 — Domain Agent Version +# Integrated improvements from Theseus (triage), Leo (quality), Vida (frontier.md) +# This gets embedded in research-session.sh as RESEARCH_PROMPT + +You are ${AGENT}, a Teleo knowledge base agent. Domain: ${DOMAIN}. + +## Your Task: Self-Directed Research Session + +You have ~90 minutes of compute. Target: 5-8 high-quality sources (not 15 thin ones). + +### Step 1: Orient (5 min) +Read these files: +- agents/${AGENT}/identity.md (who you are) +- agents/${AGENT}/beliefs.md (what you believe) +- agents/${AGENT}/reasoning.md (how you think) +- domains/${DOMAIN}/_map.md (current claims + gaps) +- agents/${AGENT}/frontier.md (if it exists — your priority research gaps) + +### Step 2: Review Recent Tweets (10 min) +Read ${TWEET_FILE} — recent tweets from your domain's X accounts. +Scan for: new claims, evidence, debates, data, counterarguments. + +### Step 3: Check Previous Follow-ups (2 min) +Read agents/${AGENT}/musings/ — previous research-*.md files. +Check for NEXT: flags at the bottom. These are threads your past self flagged. +Also read agents/${AGENT}/research-journal.md for cross-session patterns. +Check for ROUTE flags from other agents who found things in your domain. + +### Step 4: Pick ONE Research Question (5 min) +Pick ONE research question. Not one topic — one question. + +**Direction priority** (active inference — pursue surprise, not confirmation): +1. NEXT flags from previous sessions (your past self flagged these) +2. Frontier.md priority gaps (if exists — structured research agenda) +3. Claims rated 'experimental' or areas with live tensions +4. Evidence that CHALLENGES your beliefs +5. Cross-domain connections flagged by other agents +6. New developments that change the landscape + +Write a brief note explaining your choice to: agents/${AGENT}/musings/research-${DATE}.md + +### Step 5: Research + Triage (60 min) + +As you research, CLASSIFY each finding before archiving: + +**[CLAIM]** — Specific, disagreeable proposition with evidence. + Will become a claim. Include: proposed title, confidence, key evidence. + Archive as a source. + +**[ENTITY]** — Tracked object with temporal data (company, person, protocol, lab). + Will become an entity file or update. Include: what changed, when. + Archive as a source. + +**[CONTEXT]** — Background that informs future work but isn't a proposition. + Goes to memory/research journal ONLY. Do NOT archive as a source. + +**[ROUTE:{agent}]** — Finding outside your domain. + Archive the source with flagged_for_{agent} in frontmatter. + Note why it's relevant to that agent. + +**[SKIP]** — Interesting but not actionable. Don't archive. + +Only archive [CLAIM] and [ENTITY] tagged findings as sources. +[CONTEXT] goes to your research journal. [ROUTE] gets flagged in source frontmatter. + +### Source Type Evaluation (before archiving): +1. Academic paper → Read Results + Conclusion. Confidence floor by study type. +2. Regulatory/policy → Extract direction claims only. High null-result rate is expected. +3. Journalism → Find the primary source. Downgrade confidence from headline level. +4. Market/industry report → Historical data = proven. Projections: 1-2yr likely, 3-5yr experimental, 5yr+ speculative. +5. Tweet thread or opinion → Signal for research direction, not evidence. Archive only if it cites primary sources. + +### Archiving Format: +Path: inbox/queue/YYYY-MM-DD-{author-handle}-{brief-slug}.md + +--- +type: source +title: "Descriptive title" +author: "Display Name (@handle)" +url: https://original-url +date: YYYY-MM-DD +domain: ${DOMAIN} +secondary_domains: [] +format: tweet | thread | essay | paper | report +status: unprocessed +priority: high | medium | low +triage_tag: claim | entity +tags: [topic1, topic2] +flagged_for_rio: ["reason"] +--- + +## Content +[Full text of tweet/thread/paper abstract] + +## Agent Notes +**Triage:** [CLAIM] or [ENTITY] — why this classification +**Why this matters:** [1-2 sentences] +**What surprised me:** [Unexpected finding — extractor needs this] +**KB connections:** [Which existing claims relate?] +**Extraction hints:** [What claims/entities might the extractor pull?] + +## Curator Notes +PRIMARY CONNECTION: [exact claim title this source most relates to] +WHY ARCHIVED: [what pattern or tension this evidences] + +### Step 5 Rules: +- Target 5-8 sources per session (quality over volume) +- Archive EVERYTHING tagged [CLAIM] or [ENTITY], not just what supports your views +- Set all sources to status: unprocessed +- Flag cross-domain sources with flagged_for_{agent} +- Do NOT extract claims yourself — the extractor is a separate instance +- Check inbox/queue/ and inbox/archive/ for duplicates before creating new archives + +### Step 6: Update Research Journal + Follow-ups (8 min) + +Append to agents/${AGENT}/research-journal.md: + +## Session ${DATE} +**Question:** [your research question] +**Key finding:** [most important thing you learned] +**Pattern update:** [confirm, challenge, or extend a pattern?] +**Confidence shift:** [any beliefs get stronger or weaker?] +**Extraction yield prediction:** [of the sources you archived, how many do you expect to produce claims vs entities vs null-results?] + +At the bottom of your research musing, add: + +## Follow-up Directions + +### NEXT: (continue next session) +- [Thread]: [What to do next, what to look for] + +### COMPLETED: (threads finished this session) +- [Thread]: [What you found, which claims/entities resulted] + +### DEAD ENDS: (don't re-run) +- [What you searched for]: [Why it was empty] + +### ROUTE: (findings for other agents) +- [Finding] → [Agent]: [Why relevant to their domain] + +### Step 7: Stop +When finished, STOP. The script handles all git operations. diff --git a/telegram/bot.py b/telegram/bot.py new file mode 100644 index 0000000..bd67c18 --- /dev/null +++ b/telegram/bot.py @@ -0,0 +1,615 @@ +#!/usr/bin/env python3 +"""Teleo Telegram Bot — Rio as analytical agent in community groups. + +Architecture: +- Always-on ingestion: captures all messages, batch triage every N minutes +- Tag-based response: Opus-quality KB-grounded responses when @tagged +- Conversation-window triage: identifies coherent claims across message threads +- Full eval tracing: Rio's responses are logged as KB claims, accountable + +Two paths (Ganymede architecture): +- Fast path (read): tag → KB query → Opus response → post to group +- Slow path (write): batch triage → archive to inbox/ → pipeline extracts + +Separate systemd service: teleo-telegram.service +Does NOT integrate with pipeline daemon. + +Epimetheus owns this module. +""" + +import asyncio +import json +import logging +import os +import re +import sqlite3 +import sys +import time +from collections import defaultdict +from datetime import datetime, timezone +from pathlib import Path + +# Add pipeline lib to path for shared modules +sys.path.insert(0, "/opt/teleo-eval/pipeline") + +from telegram import Update +from telegram.ext import ( + Application, + CommandHandler, + ContextTypes, + MessageHandler, + filters, +) + +# ─── Config ───────────────────────────────────────────────────────────── + +BOT_TOKEN_FILE = "/opt/teleo-eval/secrets/telegram-bot-token" +OPENROUTER_KEY_FILE = "/opt/teleo-eval/secrets/openrouter-key" +PIPELINE_DB = "/opt/teleo-eval/pipeline/pipeline.db" +CLAIM_INDEX_PATH = Path.home() / ".pentagon" / "workspace" / "collective" / "claim-index.json" +REPO_DIR = "/opt/teleo-eval/workspaces/extract" # For archiving sources +LOG_FILE = "/opt/teleo-eval/logs/telegram-bot.log" + +# Triage interval (seconds) +TRIAGE_INTERVAL = 900 # 15 minutes + +# Models +RESPONSE_MODEL = "anthropic/claude-opus-4-6" # Opus for tagged responses +TRIAGE_MODEL = "anthropic/claude-haiku-4.5" # Haiku for batch triage + +# Rate limits +MAX_RESPONSE_PER_USER_PER_HOUR = 6 +MIN_MESSAGE_LENGTH = 20 # Skip very short messages + +# ─── Logging ──────────────────────────────────────────────────────────── + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s [%(levelname)s] %(message)s", + handlers=[ + logging.FileHandler(LOG_FILE), + logging.StreamHandler(), + ], +) +logger = logging.getLogger("telegram-bot") + +# ─── State ────────────────────────────────────────────────────────────── + +# Message buffer for batch triage +message_buffer: list[dict] = [] + +# Rate limiting +user_response_times: dict[int, list[float]] = defaultdict(list) + +# Allowed group IDs (set after first message received, or configure) +allowed_groups: set[int] = set() + + +# ─── Helpers ──────────────────────────────────────────────────────────── + + +def load_claim_index() -> dict | None: + """Load the claim-index.json for KB queries.""" + try: + if CLAIM_INDEX_PATH.exists(): + return json.loads(CLAIM_INDEX_PATH.read_text()) + # Fallback: try VPS path + vps_path = Path("/opt/teleo-eval/pipeline/claim-index.json") + if vps_path.exists(): + return json.loads(vps_path.read_text()) + except Exception as e: + logger.error("Failed to load claim index: %s", e) + return None + + +def find_relevant_claims(query: str, index: dict, max_results: int = 5) -> list[dict]: + """Find claims relevant to a query using keyword matching. + + Simple for now — upgrade to semantic search later. + """ + query_words = set(query.lower().split()) + scored = [] + for claim in index.get("claims", []): + title_words = set(claim.get("title", "").lower().split()) + overlap = len(query_words & title_words) + if overlap >= 2: + scored.append((overlap, claim)) + scored.sort(key=lambda x: x[0], reverse=True) + return [c for _, c in scored[:max_results]] + + +def get_db_stats() -> dict: + """Get basic KB stats from pipeline DB.""" + try: + conn = sqlite3.connect(PIPELINE_DB, timeout=5) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA query_only=ON") + merged = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"] + contributors = conn.execute("SELECT COUNT(*) as n FROM contributors").fetchone()["n"] + conn.close() + return {"merged_claims": merged, "contributors": contributors} + except Exception: + return {"merged_claims": "?", "contributors": "?"} + + +async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> str | None: + """Call OpenRouter API.""" + import aiohttp + + key = Path(OPENROUTER_KEY_FILE).read_text().strip() + payload = { + "model": model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": max_tokens, + "temperature": 0.3, + } + try: + async with aiohttp.ClientSession() as session: + async with session.post( + "https://openrouter.ai/api/v1/chat/completions", + headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"}, + json=payload, + timeout=aiohttp.ClientTimeout(total=120), + ) as resp: + if resp.status >= 400: + logger.error("OpenRouter %s → %d", model, resp.status) + return None + data = await resp.json() + return data.get("choices", [{}])[0].get("message", {}).get("content") + except Exception as e: + logger.error("OpenRouter error: %s", e) + return None + + +def is_rate_limited(user_id: int) -> bool: + """Check if a user has exceeded the response rate limit.""" + now = time.time() + times = user_response_times[user_id] + # Prune old entries + times[:] = [t for t in times if now - t < 3600] + return len(times) >= MAX_RESPONSE_PER_USER_PER_HOUR + + +def sanitize_message(text: str) -> str: + """Sanitize message content before sending to LLM. (Ganymede: security)""" + # Strip code blocks (potential prompt injection) + text = re.sub(r"```.*?```", "[code block removed]", text, flags=re.DOTALL) + # Strip anything that looks like system instructions + text = re.sub(r"(system:|assistant:|human:|<\|.*?\|>)", "", text, flags=re.IGNORECASE) + # Truncate + return text[:2000] + + +# ─── Message Handlers ─────────────────────────────────────────────────── + + +async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handle ALL incoming group messages — buffer for triage.""" + if not update.message or not update.message.text: + return + + msg = update.message + text = msg.text.strip() + + # Skip very short messages + if len(text) < MIN_MESSAGE_LENGTH: + return + + # Buffer for batch triage + message_buffer.append({ + "text": sanitize_message(text), + "user_id": msg.from_user.id if msg.from_user else None, + "username": msg.from_user.username if msg.from_user else None, + "display_name": msg.from_user.full_name if msg.from_user else None, + "chat_id": msg.chat_id, + "message_id": msg.message_id, + "timestamp": msg.date.isoformat() if msg.date else datetime.now(timezone.utc).isoformat(), + "reply_to": msg.reply_to_message.message_id if msg.reply_to_message else None, + }) + + +async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handle messages that tag the bot — Rio responds with Opus.""" + if not update.message or not update.message.text: + return + + msg = update.message + user = msg.from_user + text = sanitize_message(msg.text) + + # Rate limit check + if user and is_rate_limited(user.id): + await msg.reply_text("I'm processing other requests — try again in a few minutes.") + return + + logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100]) + + # Send typing indicator + await msg.chat.send_action("typing") + + # Load KB + index = load_claim_index() + stats = get_db_stats() + + if not index: + await msg.reply_text("KB index unavailable — try again shortly.") + return + + # Find relevant claims + relevant = find_relevant_claims(text, index, max_results=5) + + claims_context = "" + if relevant: + claims_context = "## Relevant KB Claims\n" + for c in relevant: + claims_context += f"- **{c['title']}** (confidence: {c.get('confidence', '?')}, domain: {c.get('domain', '?')})\n" + + # Build Opus prompt — Rio's voice + prompt = f"""You are Rio, the internet finance domain expert for TeleoHumanity's collective knowledge base. You're responding to a message in the ownership community Telegram group. + +## Your Voice +- Grounded in KB evidence — cite specific claims and their confidence levels +- State your position when you have one — analyst means grounded, not neutral +- Name uncertainty explicitly — "we don't have data on this yet" is honest +- Never shill — present evidence and risks alongside convictions +- If the message contains a genuine insight the KB doesn't have, say so: "That's something we haven't captured yet — it's worth investigating" + +## KB State +- {stats['merged_claims']} merged claims across 14 domains +- {stats['contributors']} contributors tracked +- {index.get('total_claims', '?')} claims in index + +{claims_context} + +## The Message +From: @{user.username if user else 'unknown'} ({user.full_name if user else 'unknown'}) +Message: {text} + +## Your Response +Respond substantively. If the message contains a claim or evidence: +1. Connect it to what the KB already knows +2. State where you agree and where the evidence is uncertain +3. If this challenges an existing claim, say so specifically + +Keep it conversational — this is Telegram, not a paper. 2-4 paragraphs max. +Do NOT use markdown headers. Light formatting only (bold for claim titles, italics for emphasis).""" + + # Call Opus + response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024) + + if not response: + await msg.reply_text("Processing error — I'll get back to you.") + return + + # Post response + await msg.reply_text(response) + + # Record rate limit + if user: + user_response_times[user.id].append(time.time()) + + # Log the exchange for audit trail + logger.info("Rio responded to @%s (msg_id=%d)", user.username if user else "?", msg.message_id) + + # Detect and fetch URLs for pipeline ingestion + urls = _extract_urls(text) + url_content = None + if urls: + logger.info("Fetching URL: %s", urls[0]) + url_content = await _fetch_url_content(urls[0]) + if url_content: + logger.info("Fetched %d chars from %s", len(url_content), urls[0]) + + # Archive the exchange as a source for pipeline (slow path) + _archive_exchange(text, response, user, msg, url_content=url_content, urls=urls) + + +async def _fetch_url_content(url: str) -> str | None: + """Fetch article/page content from a URL for pipeline ingestion.""" + import aiohttp + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp: + if resp.status >= 400: + return None + html = await resp.text() + # Strip HTML tags for plain text (basic — upgrade to readability later) + text = re.sub(r"", "", html, flags=re.DOTALL) + text = re.sub(r"", "", text, flags=re.DOTALL) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text).strip() + return text[:10000] # Cap at 10K chars + except Exception as e: + logger.warning("Failed to fetch URL %s: %s", url, e) + return None + + +def _extract_urls(text: str) -> list[str]: + """Extract URLs from message text.""" + return re.findall(r"https?://[^\s<>\"']+", text) + + +def _archive_exchange(user_text: str, rio_response: str, user, msg, + url_content: str | None = None, urls: list[str] | None = None): + """Archive a tagged exchange to inbox/queue/ for pipeline processing.""" + try: + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + username = user.username if user else "anonymous" + slug = re.sub(r"[^a-z0-9]+", "-", user_text[:50].lower()).strip("-") + filename = f"{date_str}-telegram-{username}-{slug}.md" + + archive_path = Path(REPO_DIR) / "inbox" / "queue" / filename + archive_path.parent.mkdir(parents=True, exist_ok=True) + + # Extract rationale (the user's text minus the @mention and URL) + rationale = re.sub(r"@\w+", "", user_text).strip() + for url in (urls or []): + rationale = rationale.replace(url, "").strip() + + # Determine priority — directed contribution with rationale gets high priority + priority = "high" if rationale and len(rationale) > 20 else "medium" + intake_tier = "directed" if rationale and len(rationale) > 20 else "undirected" + + url_section = "" + if url_content: + url_section = f"\n## Article Content (fetched)\n\n{url_content[:8000]}\n" + + content = f"""--- +type: source +source_type: telegram +title: "Telegram: @{username} — {slug}" +author: "@{username}" +url: "{urls[0] if urls else ''}" +date: {date_str} +domain: internet-finance +format: conversation +status: unprocessed +priority: {priority} +intake_tier: {intake_tier} +rationale: "{rationale[:200]}" +proposed_by: "@{username}" +tags: [telegram, ownership-community] +--- + +## Conversation + +**@{username}:** +{user_text} + +**Rio (response):** +{rio_response} +{url_section} +## Agent Notes +**Why archived:** Tagged exchange in ownership community. +**Rationale from contributor:** {rationale if rationale else 'No rationale provided (bare link or question)'} +**Intake tier:** {intake_tier} — {'fast-tracked, contributor provided reasoning' if intake_tier == 'directed' else 'standard processing'} +**Triage:** Conversation may contain [CLAIM], [ENTITY], or [EVIDENCE] for extraction. +""" + archive_path.write_text(content) + logger.info("Archived exchange to %s (tier: %s, urls: %d)", + filename, intake_tier, len(urls or [])) + except Exception as e: + logger.error("Failed to archive exchange: %s", e) + + +# ─── Batch Triage ─────────────────────────────────────────────────────── + + +async def run_batch_triage(context: ContextTypes.DEFAULT_TYPE): + """Batch triage of buffered messages every TRIAGE_INTERVAL seconds. + + Groups messages into conversation windows, sends to Haiku for classification, + archives substantive findings. + """ + global message_buffer + + if not message_buffer: + return + + # Grab and clear buffer + messages = message_buffer[:] + message_buffer = [] + + logger.info("Batch triage: %d messages to process", len(messages)) + + # Group into conversation windows (messages within 5 min of each other) + windows = _group_into_windows(messages, window_seconds=300) + + if not windows: + return + + # Build triage prompt + windows_text = "" + for i, window in enumerate(windows): + window_msgs = "\n".join( + f" @{m.get('username', '?')}: {m['text'][:200]}" + for m in window + ) + windows_text += f"\n--- Window {i+1} ({len(window)} messages) ---\n{window_msgs}\n" + + prompt = f"""Classify each conversation window. For each, respond with ONE tag: + +[CLAIM] — Contains a specific, disagreeable proposition about how something works +[ENTITY] — Contains factual data about a company, protocol, person, or market +[EVIDENCE] — Contains data or argument that supports or challenges an existing claim about internet finance, futarchy, prediction markets, or token governance +[SKIP] — Casual conversation, not relevant to the knowledge base + +Be generous with EVIDENCE — even confirming evidence strengthens the KB. + +{windows_text} + +Respond with ONLY the window numbers and tags, one per line: +1: [TAG] +2: [TAG] +...""" + + result = await call_openrouter(TRIAGE_MODEL, prompt, max_tokens=500) + + if not result: + logger.warning("Triage LLM call failed — buffered messages dropped") + return + + # Parse triage results + for line in result.strip().split("\n"): + match = re.match(r"(\d+):\s*\[(\w+)\]", line) + if not match: + continue + idx = int(match.group(1)) - 1 + tag = match.group(2).upper() + + if idx < 0 or idx >= len(windows): + continue + + if tag in ("CLAIM", "ENTITY", "EVIDENCE"): + _archive_window(windows[idx], tag) + + logger.info("Triage complete: %d windows processed", len(windows)) + + +def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]: + """Group messages into conversation windows by time proximity.""" + if not messages: + return [] + + # Sort by timestamp + messages.sort(key=lambda m: m.get("timestamp", "")) + + windows = [] + current_window = [messages[0]] + + for msg in messages[1:]: + # Simple grouping: if within window_seconds of previous message, same window + current_window.append(msg) + if len(current_window) >= 10: # Cap window size + windows.append(current_window) + current_window = [] + + if current_window: + windows.append(current_window) + + return windows + + +def _archive_window(window: list[dict], tag: str): + """Archive a triaged conversation window to inbox/queue/.""" + try: + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + first_user = window[0].get("username", "group") + slug = re.sub(r"[^a-z0-9]+", "-", window[0]["text"][:40].lower()).strip("-") + filename = f"{date_str}-telegram-{first_user}-{slug}.md" + + archive_path = Path(REPO_DIR) / "inbox" / "queue" / filename + archive_path.parent.mkdir(parents=True, exist_ok=True) + + # Build conversation content + conversation = "" + contributors = set() + for msg in window: + username = msg.get("username", "anonymous") + contributors.add(username) + conversation += f"**@{username}:** {msg['text']}\n\n" + + content = f"""--- +type: source +source_type: telegram +title: "Telegram conversation: {slug}" +author: "{', '.join(contributors)}" +date: {date_str} +domain: internet-finance +format: conversation +status: unprocessed +priority: medium +triage_tag: {tag.lower()} +tags: [telegram, ownership-community] +--- + +## Conversation ({len(window)} messages, {len(contributors)} participants) + +{conversation} + +## Agent Notes +**Triage:** [{tag}] — classified by batch triage +**Participants:** {', '.join(f'@{u}' for u in contributors)} +""" + archive_path.write_text(content) + logger.info("Archived window [%s]: %s (%d msgs, %d participants)", + tag, filename, len(window), len(contributors)) + except Exception as e: + logger.error("Failed to archive window: %s", e) + + +# ─── Bot Setup ────────────────────────────────────────────────────────── + + +async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handle /start command.""" + await update.message.reply_text( + "I'm Rio, the internet finance agent for TeleoHumanity's collective knowledge base. " + "Tag me with @teleo to ask about futarchy, prediction markets, token governance, " + "or anything in our domain. I'll ground my response in our KB's evidence." + ) + + +async def stats_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handle /stats command — show KB stats.""" + index = load_claim_index() + stats = get_db_stats() + if index: + await update.message.reply_text( + f"📊 KB Stats:\n" + f"• {index.get('total_claims', '?')} claims across {len(index.get('domains', {}))} domains\n" + f"• {stats['merged_claims']} PRs merged\n" + f"• {stats['contributors']} contributors\n" + f"• {index.get('orphan_count', '?')} orphan claims ({index.get('orphan_ratio', 0)*100:.0f}%)\n" + f"• {index.get('cross_domain_links', '?')} cross-domain connections" + ) + else: + await update.message.reply_text("KB index unavailable.") + + +def main(): + """Start the bot.""" + # Load token + token_path = Path(BOT_TOKEN_FILE) + if not token_path.exists(): + logger.error("Bot token not found at %s", BOT_TOKEN_FILE) + sys.exit(1) + token = token_path.read_text().strip() + + logger.info("Starting Teleo Telegram bot (Rio)...") + + # Build application + app = Application.builder().token(token).build() + + # Command handlers + app.add_handler(CommandHandler("start", start_command)) + app.add_handler(CommandHandler("stats", stats_command)) + + # Tag handler — messages mentioning the bot + # python-telegram-bot filters.Mention doesn't work for bot mentions in groups + # Use a regex filter for the bot username + app.add_handler(MessageHandler( + filters.TEXT & filters.Regex(r"(?i)@teleo"), + handle_tagged, + )) + + # All other text messages — buffer for triage + app.add_handler(MessageHandler( + filters.TEXT & ~filters.COMMAND, + handle_message, + )) + + # Batch triage job + app.job_queue.run_repeating( + run_batch_triage, + interval=TRIAGE_INTERVAL, + first=TRIAGE_INTERVAL, + ) + + # Run + logger.info("Bot running. Triage interval: %ds", TRIAGE_INTERVAL) + app.run_polling(drop_pending_updates=True) + + +if __name__ == "__main__": + main() diff --git a/telegram/teleo-telegram.service b/telegram/teleo-telegram.service new file mode 100644 index 0000000..71f3810 --- /dev/null +++ b/telegram/teleo-telegram.service @@ -0,0 +1,22 @@ +[Unit] +Description=Teleo Telegram Bot — Rio in ownership community +After=network.target teleo-pipeline.service +Wants=teleo-pipeline.service + +[Service] +Type=simple +User=teleo +Group=teleo +WorkingDirectory=/opt/teleo-eval/telegram +ExecStart=/opt/teleo-eval/pipeline/.venv/bin/python3 /opt/teleo-eval/telegram/bot.py +Restart=always +RestartSec=10 +Environment=PYTHONUNBUFFERED=1 + +# Security +NoNewPrivileges=true +ProtectSystem=strict +ReadWritePaths=/opt/teleo-eval/logs /opt/teleo-eval/workspaces/extract/inbox/queue /opt/teleo-eval/workspaces/extract/inbox/archive /opt/teleo-eval/workspaces/extract/inbox/null-result + +[Install] +WantedBy=multi-user.target