teleo-infrastructure/lib/entity_queue.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
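The transient/permanent split with jitter from fix 1 can be sketched as follows (the status-code set, function names, and retry budget are illustrative assumptions, not the pipeline's actual API):

```python
import random
import time

# Assumption: these HTTP statuses are the ones treated as transient.
TRANSIENT_STATUSES = {429, 500, 502, 503, 504}

def merge_with_retry(do_merge, attempts: int = 3, base_delay: float = 1.0) -> bool:
    """Retry do_merge() on transient failures with jittered backoff.

    do_merge() returns an HTTP status code; permanent errors fail fast
    instead of burning the retry budget.
    """
    for attempt in range(1, attempts + 1):
        status = do_merge()
        if status < 400:
            return True
        if status not in TRANSIENT_STATUSES or attempt == attempts:
            return False  # permanent error, or retry budget exhausted
        # Full jitter: sleep a random fraction of the exponential backoff step
        time.sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))
    return False
```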

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors
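The 3-attempt conflict budget with a permanent-conflict handoff might look like this (the state-dict shape and action names are hypothetical; the real pipeline's bookkeeping may differ):

```python
def next_conflict_action(pr_state: dict, budget: int = 3) -> str:
    """Return the next action for a PR whose merge hit a conflict.

    pr_state is a hypothetical per-PR state dict. The PR is retried up to
    `budget` times, then handed to a permanent-conflict path.
    """
    pr_state["conflict_retries"] = pr_state.get("conflict_retries", 0) + 1
    if pr_state["conflict_retries"] <= budget:
        return "rebase-and-retry"
    return "permanent-conflict"
```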

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
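One plausible reading of the per-user-per-chat window semantics (a mention resets the counter; the bot stops replying after 5 unanswered messages — the real handler may differ):

```python
WINDOW = 5  # unanswered messages before the window closes

class ConversationWindows:
    """Track open conversation windows keyed by (chat_id, user_id)."""

    def __init__(self):
        self.unanswered: dict[tuple[int, int], int] = {}

    def should_reply(self, chat_id: int, user_id: int, mentioned: bool) -> bool:
        key = (chat_id, user_id)
        if mentioned:
            self.unanswered[key] = 0  # a tag or reply (re)opens the window
            return True
        if key not in self.unanswered:
            return False  # no window open for this user in this chat
        self.unanswered[key] += 1
        if self.unanswered[key] >= WINDOW:
            del self.unanswered[key]  # window expired
            return False
        return True
```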

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
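A minimal sketch of the flock-based coordination that worktree_lock.py provides (the lock path and context-manager shape here are assumptions, not the module's actual interface):

```python
import fcntl
import os
from contextlib import contextmanager

@contextmanager
def worktree_lock(lock_path: str = "/tmp/worktree.lock"):
    """Exclusive file-based lock via flock(2); the path is illustrative."""
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until the holder releases
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)
```

flock locks die with the process, so a crashed writer never wedges the queue the way a stale lockfile would.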

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00


"""Entity enrichment queue — decouple entity writes from extraction branches.
Problem: Entity updates on extraction branches cause merge conflicts because
multiple extraction branches modify the same entity file (e.g., metadao.md).
83% of near_duplicate false positives come from entity file modifications.
Solution: Extraction writes entity operations to a JSON queue file on the VPS.
A separate batch process reads the queue and applies operations to main.
Entity operations are commutative (timeline appends are order-independent),
so parallel extractions never conflict.
Flow:
1. openrouter-extract-v2.py → entity_queue.enqueue() instead of direct file writes
2. entity_batch.py (cron or pipeline stage) → entity_queue.dequeue() + apply to main
3. Commit entity changes to main directly (no PR needed for timeline appends)
Epimetheus owns this module. Leo reviews changes.
"""
import json
import logging
import os
import time
from datetime import datetime, timezone
from pathlib import Path

logger = logging.getLogger("pipeline.entity_queue")

# Default queue location (VPS)
DEFAULT_QUEUE_DIR = "/opt/teleo-eval/entity-queue"


def _queue_dir() -> Path:
    """Get the queue directory, creating it if needed."""
    d = Path(os.environ.get("ENTITY_QUEUE_DIR", DEFAULT_QUEUE_DIR))
    d.mkdir(parents=True, exist_ok=True)
    return d


def enqueue(entity: dict, source_file: str, agent: str) -> str:
    """Add an entity operation to the queue. Returns the queue entry ID.

    Args:
        entity: dict with keys: filename, domain, action (create|update),
            entity_type, content (for creates), timeline_entry (for updates)
        source_file: path to the source that produced this entity
        agent: agent name performing extraction

    Returns:
        Queue entry filename (for tracking)

    Raises:
        ValueError: if entity dict is missing required fields or has invalid action
    """
    # Validate required fields (Ganymede review)
    for field in ("filename", "domain", "action"):
        if not entity.get(field):
            raise ValueError(f"Entity missing required field: {field}")
    if entity["action"] not in ("create", "update"):
        raise ValueError(f"Invalid entity action: {entity['action']}")
    # Sanitize filename — prevent path traversal (Ganymede review)
    entity["filename"] = os.path.basename(entity["filename"])
    entry_id = f"{int(time.time() * 1000)}-{entity['filename'].replace('.md', '')}"
    entry = {
        "id": entry_id,
        "entity": entity,
        "source_file": os.path.basename(source_file),
        "agent": agent,
        "enqueued_at": datetime.now(tz=timezone.utc).isoformat(),
        "status": "pending",
    }
    queue_file = _queue_dir() / f"{entry_id}.json"
    with open(queue_file, "w") as f:
        json.dump(entry, f, indent=2)
    logger.info("Enqueued entity operation: %s (%s)", entity["filename"], entity["action"])
    return entry_id


def dequeue(limit: int = 50) -> list[dict]:
    """Read pending queue entries, oldest first. Returns list of entry dicts.

    Does NOT remove entries — caller marks them processed after successful apply.
    """
    qdir = _queue_dir()
    entries = []
    for f in sorted(qdir.glob("*.json")):
        try:
            with open(f) as fh:
                entry = json.load(fh)
            if entry.get("status") == "pending":
                entry["_queue_path"] = str(f)
                entries.append(entry)
                if len(entries) >= limit:
                    break
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning("Skipping malformed queue entry %s: %s", f.name, e)
    return entries


def mark_processed(entry: dict, result: str = "applied"):
    """Mark a queue entry as processed (or failed).

    Uses atomic write (tmp + rename) to prevent race conditions. (Ganymede review)
    """
    queue_path = entry.get("_queue_path")
    if not queue_path or not os.path.exists(queue_path):
        return
    entry["status"] = result
    entry["processed_at"] = datetime.now(tz=timezone.utc).isoformat()
    # Remove internal tracking field before writing
    entry.pop("_queue_path", None)
    # Atomic write: tmp file + rename (Ganymede review — prevents race condition)
    tmp_path = queue_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(entry, f, indent=2)
    os.rename(tmp_path, queue_path)


def mark_failed(entry: dict, error: str):
    """Mark a queue entry as failed with error message."""
    entry["last_error"] = error
    mark_processed(entry, result="failed")


def queue_enrichment(
    target_claim: str,
    evidence: str,
    pr_number: int,
    original_title: str,
    similarity: float,
    domain: str,
) -> str:
    """Queue an enrichment for an existing claim. Applied by entity_batch alongside entity updates.

    Used by the substantive fixer for near-duplicate auto-conversion.
    Single writer pattern — avoids race conditions with direct main writes. (Ganymede)
    """
    entry_id = f"{int(time.time() * 1000)}-enrichment-{os.path.basename(target_claim).replace('.md', '')}"
    entry = {
        "id": entry_id,
        "type": "enrichment",
        "target_claim": target_claim,
        "evidence": evidence,
        "pr_number": pr_number,
        "original_title": original_title,
        "similarity": similarity,
        "domain": domain,
        "enqueued_at": datetime.now(tz=timezone.utc).isoformat(),
        "status": "pending",
    }
    queue_file = _queue_dir() / f"{entry_id}.json"
    with open(queue_file, "w") as f:
        json.dump(entry, f, indent=2)
    logger.info("Enqueued enrichment: PR #%d → %s (sim=%.2f)", pr_number, target_claim, similarity)
    return entry_id


def cleanup(max_age_hours: int = 24):
    """Remove processed/failed entries older than max_age_hours."""
    qdir = _queue_dir()
    cutoff = time.time() - (max_age_hours * 3600)
    removed = 0
    for f in qdir.glob("*.json"):
        try:
            with open(f) as fh:
                entry = json.load(fh)
            if entry.get("status") in ("applied", "failed") and f.stat().st_mtime < cutoff:
                f.unlink()
                removed += 1
        except Exception:
            logger.debug("Skipping unreadable queue entry during cleanup: %s", f.name)
    if removed:
        logger.info("Cleaned up %d old queue entries", removed)
    return removed


def queue_stats() -> dict:
    """Get queue statistics for health monitoring."""
    qdir = _queue_dir()
    stats = {"pending": 0, "applied": 0, "failed": 0, "total": 0}
    for f in qdir.glob("*.json"):
        try:
            with open(f) as fh:
                entry = json.load(fh)
            status = entry.get("status", "unknown")
            stats[status] = stats.get(status, 0) + 1
            stats["total"] += 1
        except Exception:
            continue
    return stats