"""Pluggable approval architecture — extensible voting stages for content approval. Design constraint from m3ta: the approval step must be a pipeline stage, not hardcoded. Current stage: 1 human approves via Telegram. Future stages (interface designed, not implemented): - Agent pre-screening votes (weighted by CI score) - Multi-human approval - Domain-agent substance checks - Futarchy-style decision markets on high-stakes content Adding a new approval stage = implementing ApprovalStage and registering it. Threshold logic aggregates votes across all stages. Epimetheus owns this module. """ import logging import sqlite3 from dataclasses import dataclass, field from enum import Enum from typing import Callable, Optional logger = logging.getLogger("approval-stages") class Vote(Enum): APPROVE = "approve" REJECT = "reject" ABSTAIN = "abstain" @dataclass class StageResult: """Result from a single approval stage.""" stage_name: str vote: Vote weight: float # 0.0 - 1.0, how much this stage's vote counts reason: str = "" metadata: dict = field(default_factory=dict) @dataclass class AggregateResult: """Aggregated result across all approval stages.""" approved: bool total_weight_approve: float total_weight_reject: float total_weight_abstain: float stage_results: list[StageResult] threshold: float # what threshold was used @property def summary(self) -> str: status = "APPROVED" if self.approved else "REJECTED" return ( f"{status} (approve={self.total_weight_approve:.2f}, " f"reject={self.total_weight_reject:.2f}, " f"threshold={self.threshold:.2f})" ) class ApprovalStage: """Base class for approval stages. Implement check() to add a new approval stage. The method receives the approval request and returns a StageResult. Stages run in priority order (lower = earlier). A stage can short-circuit by returning a REJECT with weight >= threshold. """ name: str = "unnamed" priority: int = 100 # lower = runs earlier weight: float = 1.0 # default weight of this stage's vote def check(self, request: dict) -> StageResult: """Evaluate the approval request. Must be overridden.""" raise NotImplementedError # ─── Built-in Stages ───────────────────────────────────────────────── class OutputGateStage(ApprovalStage): """Stage 0: Deterministic output gate. Blocks system content.""" name = "output_gate" priority = 0 weight = 1.0 # absolute veto — if gate blocks, nothing passes def check(self, request: dict) -> StageResult: from output_gate import gate_for_tweet_queue content = request.get("content", "") agent = request.get("originating_agent", "") gate = gate_for_tweet_queue(content, agent) if gate: return StageResult(self.name, Vote.APPROVE, self.weight, "Content passed output gate") else: return StageResult(self.name, Vote.REJECT, self.weight, f"Blocked: {', '.join(gate.blocked_reasons)}", {"blocked_reasons": gate.blocked_reasons}) class OpsecStage(ApprovalStage): """Stage 1: OPSEC content filter. Blocks sensitive content.""" name = "opsec_filter" priority = 1 weight = 1.0 # absolute veto def check(self, request: dict) -> StageResult: from approvals import check_opsec content = request.get("content", "") violation = check_opsec(content) if violation: return StageResult(self.name, Vote.REJECT, self.weight, violation) else: return StageResult(self.name, Vote.APPROVE, self.weight, "No OPSEC violations") class HumanApprovalStage(ApprovalStage): """Stage 10: Human approval via Telegram. Currently the final gate. This stage is async — it doesn't return immediately. Instead, it sets up the Telegram notification and returns ABSTAIN. The actual vote comes later when Cory taps Approve/Reject. """ name = "human_approval" priority = 10 weight = 1.0 def check(self, request: dict) -> StageResult: # Human approval is handled asynchronously via Telegram # This stage just validates the request is properly formatted if not request.get("content"): return StageResult(self.name, Vote.REJECT, self.weight, "No content to approve") return StageResult(self.name, Vote.ABSTAIN, self.weight, "Awaiting human approval via Telegram", {"async": True}) # ─── Stage Registry ────────────────────────────────────────────────── # Default stages — these run for every approval request _DEFAULT_STAGES: list[ApprovalStage] = [ OutputGateStage(), OpsecStage(), HumanApprovalStage(), ] # Custom stages added by agents or plugins _CUSTOM_STAGES: list[ApprovalStage] = [] def register_stage(stage: ApprovalStage): """Register a custom approval stage.""" _CUSTOM_STAGES.append(stage) _CUSTOM_STAGES.sort(key=lambda s: s.priority) logger.info("Registered approval stage: %s (priority=%d, weight=%.2f)", stage.name, stage.priority, stage.weight) def get_all_stages() -> list[ApprovalStage]: """Get all stages sorted by priority.""" all_stages = _DEFAULT_STAGES + _CUSTOM_STAGES all_stages.sort(key=lambda s: s.priority) return all_stages # ─── Aggregation ───────────────────────────────────────────────────── def run_sync_stages(request: dict, threshold: float = 0.5) -> AggregateResult: """Run all synchronous approval stages and aggregate results. Stages with async=True in metadata are skipped (handled separately). Short-circuits on any REJECT with weight >= threshold. Args: request: dict with at minimum {content, originating_agent, type} threshold: weighted approve score needed to pass (0.0-1.0) Returns: AggregateResult with the decision. """ stages = get_all_stages() results = [] total_approve = 0.0 total_reject = 0.0 total_abstain = 0.0 for stage in stages: try: result = stage.check(request) except Exception as e: logger.error("Stage %s failed: %s — treating as ABSTAIN", stage.name, e) result = StageResult(stage.name, Vote.ABSTAIN, 0.0, f"Error: {e}") results.append(result) if result.vote == Vote.APPROVE: total_approve += result.weight elif result.vote == Vote.REJECT: total_reject += result.weight # Short-circuit: absolute veto if result.weight >= threshold: return AggregateResult( approved=False, total_weight_approve=total_approve, total_weight_reject=total_reject, total_weight_abstain=total_abstain, stage_results=results, threshold=threshold, ) else: total_abstain += result.weight # Final decision based on non-abstain votes active_weight = total_approve + total_reject if active_weight == 0: # All abstain — pass to async stages (human approval) approved = False # not yet approved, awaiting human else: approved = (total_approve / active_weight) >= threshold return AggregateResult( approved=approved, total_weight_approve=total_approve, total_weight_reject=total_reject, total_weight_abstain=total_abstain, stage_results=results, threshold=threshold, )