5 patterns were too broad — matched common English words: - "extraction" (concept) matched pipeline extraction pattern - "class X" (English) matched Python class definition pattern - ".md " (product name) matched file extension pattern - "threshold" (concept) matched internal metrics pattern Fixes: - extraction: require pipeline context words (queue/PR/branch/cron) - class/def/import: require line-start (actual code, not prose) - .py/.yaml/.json: require path-like prefix (not bare .md) - threshold: require pipeline context (cosine/vector/Qdrant) All 3 Hermes dry-run drafts now pass. 18/18 tests pass. 11/11 system content regression tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
166 lines
6.7 KiB
Python
166 lines
6.7 KiB
Python
"""Output gate — classifies content as system/internal vs public-facing.
|
|
|
|
Blocks pipeline messages (extraction logs, merge notifications, diagnostics)
from ever reaching the tweet queue or any public-facing output.
|
|
|
|
This is a deterministic classifier — no LLM calls, only pattern matching on content.
|
|
|
|
Epimetheus owns this module.
|
|
"""
|
|
|
|
import re
|
|
|
|
# ─── System Message Patterns ─────────────────────────────────────────
|
|
# Content matching ANY of these is classified as system/internal.
|
|
|
|
_SYSTEM_PATTERNS = [
    re.compile(expr, flags)
    for expr, flags in (
        # Pipeline operations (PRs, merges, extraction jobs, schedulers).
        (r"\b(PR\s*#\d+|pull request|merge|rebase|cherry.?pick)\b", re.I),
        (r"\b(batch.?extract|extract/|extractor)\b", re.I),
        # "extracted"/"extraction" only counts when followed by pipeline context.
        (r"\bextract(?:ed|ion)\b.*\b(pipeline|queue|PR|branch|source|cron)\b", re.I),
        (r"\b(pipeline|cron|batch.?extract|systemd|teleo-pipeline)\b", re.I),
        (r"\b(conflict.?permanent|conflict.?closed|merge.?conflict)\b", re.I),

        # Infrastructure / ops vocabulary.
        (r"\b(schema\s*v\d+|migration\s*v\d+|SCHEMA_VERSION)\b", re.I),
        (r"\b(deploy|VPS|ssh|scp|systemctl|journalctl)\b", re.I),
        (r"\b(Qdrant|embed.?on.?merge|vector.?gc|backfill)\b", re.I),
        (r"\b(ReadWritePaths|ProtectSystem|ExecStartPre)\b", re.I),

        # Diagnostics terminology.
        (r"\b(vital.?signs|queue.?staleness|orphan.?ratio)\b", re.I),
        (r"\b(approval.?rate|throughput|PRs?.?per.?hour)\b", re.I),
        (r"\b(reviewer_count|reviewer.?backfill)\b", re.I),

        # Agent names: matched case-sensitively as standalone words.
        # "Leo" and "Rio" are only matched when followed by an action verb,
        # because on their own they are ordinary words.
        (r"\b(Epimetheus|Ganymede|Rhea|Oberon|Hermes|Theseus|Argus|Vida|Astra|Clay)\b", 0),
        (r"\b(Leo|Rio)\s+(review|approv|reject|said|flagged|owns?|confirm)", re.I),
        (r"\bPentagon\b", 0),
        (r"\bm3ta\b", re.I),

        # Agent coordination internals.
        (r"\b(Ganymede|Rhea|Oberon)\s+(review(?:ed)?|approv(?:ed|es?)|reject(?:ed|s)?)\b", re.I),
        (r"\b(PIPELINE_OWNED_PREFIXES|AGENT_NAMES)\b", 0),
        (r"\b(worktree|bare.?repo|forgejo|git\.livingip)\b", re.I),

        # Coordination language.
        (r"\b(craft.?review|substance.?review|m3ta.?approv|skill.?graph|eval.?rubric)\b", re.I),

        # Infrastructure domains.
        (r"\bteleo.?codex\b", re.I),
        (r"\blivingip\.xyz\b", re.I),

        # UUID-shaped tokens (8-4-4-4-12 hex digits).
        (r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.I),

        # Code / technical: keywords must open a line so that prose such as
        # "a world class team" is not treated as a class definition.
        (r"^\s*(def|import|class)\s+\w+", re.M),
        (r"[\w/]+\.(py|yaml|json)\b", re.I),
        (r"\b(sqlite3?|pipeline\.db|response_audit)\b", re.I),

        # Internal metrics / debugging: bare "threshold" needs retrieval context.
        (r"\b(cosine.?sim|PRIOR_ART_THRESHOLD|SCHEMA_VERSION)\b", re.I),
        (r"\bthreshold\b.*\b(cosine|vector|Qdrant|embedding|pre.?screen)\b", re.I),
        (r"\b(pre.?screen|Layer\s*[01234]|RRF|entity.?boost)\b", re.I),

        # Filesystem paths (case-sensitive).
        (r"/opt/teleo-eval/", 0),
        (r"/Users/\w+/", 0),
        (r"\.pentagon/", 0),
    )
]
|
|
|
|
# ─── Public Content Signals ──────────────────────────────────────────
|
|
# Content matching these is MORE LIKELY to be public-facing.
|
|
# These don't override system classification — they're tiebreakers.
|
|
|
|
_PUBLIC_SIGNALS = [
    # An explicit "thread:", "tweet:" or "post:" label at the start of a line.
    re.compile(r"^(thread|tweet|post):", flags=re.I | re.M),
    # Vocabulary typical of commentary.
    re.compile(r"\b(insight|analysis|take|perspective|argument)\b", flags=re.I),
    # Vocabulary about an audience.
    re.compile(r"\b(audience|followers|engagement|impression)\b", flags=re.I),
]
|
|
|
|
|
|
class GateResult:
    """Outcome of an output-gate check.

    Stores three values exactly as passed to the constructor:

    - ``is_public``: whether the content may go to public outputs
    - ``blocked_reasons``: list of strings explaining a block
    - ``confidence``: float, formatted with two decimals in ``repr``

    An instance is truthy exactly when ``is_public`` is true, so callers can
    write ``if result:``.
    """

    # Fixed attribute set; instances carry no per-instance __dict__.
    __slots__ = ("is_public", "blocked_reasons", "confidence")

    def __init__(self, is_public: bool, blocked_reasons: list[str], confidence: float):
        self.is_public, self.blocked_reasons, self.confidence = (
            is_public,
            blocked_reasons,
            confidence,
        )

    def __bool__(self):
        # Truthiness mirrors the public/blocked verdict.
        return self.is_public

    def __repr__(self):
        if self.is_public:
            label = "PUBLIC"
        else:
            label = "BLOCKED"
        return f"GateResult({label}, reasons={self.blocked_reasons}, conf={self.confidence:.2f})"
|
|
|
|
|
|
def classify(content: str) -> GateResult:
    """Decide whether content is safe for public output or is system/internal.

    Each pattern in ``_SYSTEM_PATTERNS`` contributes at most one hit (its
    first match in the text); each pattern in ``_PUBLIC_SIGNALS`` that matches
    counts as one public signal.

    Returns a GateResult:
    - empty or whitespace-only content: blocked, reason "empty content", conf 1.0
    - no system hits: public, conf 0.90 with a public signal, else 0.70
    - three or more system hits: blocked, first five hits as reasons, conf 0.95
    - one or two system hits, no public signal: blocked, conf 0.75
    - one or two system hits plus a public signal: blocked, conf 0.50
    """
    if not (content and content.strip()):
        return GateResult(False, ["empty content"], 1.0)

    # First match (if any) of every system pattern, in pattern order.
    found = (rx.search(content) for rx in _SYSTEM_PATTERNS)
    system_hits = [m.group() for m in found if m]

    # Number of public-signal patterns that fire at least once.
    public_hits = 0
    for signal in _PUBLIC_SIGNALS:
        if signal.search(content):
            public_hits += 1

    if not system_hits:
        # Nothing internal detected; a public signal only raises confidence.
        return GateResult(True, [], 0.90 if public_hits > 0 else 0.70)

    if len(system_hits) >= 3:
        # Strong internal signal; public signals cannot rescue it.
        return GateResult(False, system_hits[:5], 0.95)

    if public_hits == 0:
        # Some internal signal and nothing suggesting public intent.
        return GateResult(False, system_hits, 0.75)

    # Mixed signals: block anyway, since leaking system info is the worse
    # failure than dropping a borderline tweet.
    return GateResult(False, system_hits, 0.50)
|
|
|
|
|
|
def gate_for_tweet_queue(content: str, agent: str = None) -> GateResult:
    """Gate content for the tweet queue; stricter than plain ``classify``.

    Runs ``classify`` first and returns its result unchanged when the content
    is already blocked. Otherwise two tweet-specific checks are applied:

    - the stripped content must be at least 20 characters long
    - the content must not contain an http(s) URL whose host starts with
      ``localhost``, ``127.0.0.1`` or the hard-coded internal IP address
      (matched case-insensitively)

    Args:
        content: Candidate tweet text.
        agent: Accepted for interface compatibility; no check in this
            function reads it.

    Returns:
        The ``classify`` result when it blocked the content or when every
        extra check passed; otherwise a blocked GateResult with confidence
        0.85 whose reasons list the failed tweet-specific checks.

    NOTE(review): earlier documentation listed an OPSEC filter and an agent
    attribution check for this gate; neither is performed in this function.
    Confirm whether they are enforced elsewhere.
    """
    result = classify(content)
    if not result.is_public:
        return result

    blocked = []

    # Very short text is probably a fragment or a command, not a tweet.
    stripped = content.strip()
    if len(stripped) < 20:
        blocked.append("content too short for tweet (<20 chars)")

    # URI schemes and host names are case-insensitive, so "HTTP://LOCALHOST"
    # must be caught just like the lowercase spelling.
    if re.search(
        r"https?://(?:localhost|127\.0\.0\.1|77\.42\.65\.182)",
        stripped,
        re.IGNORECASE,
    ):
        blocked.append("contains internal URL")

    if blocked:
        return GateResult(False, blocked, 0.85)

    return result
|