teleo-codex/ops/pipeline-v2/lib/entity_queue.py
m3taversal 05d74d5e32 sync: import all VPS pipeline + diagnostics code as baseline
Imports 67 files from VPS (/opt/teleo-eval/) into repo as the single source
of truth. Previously only 8 of 67 files existed in repo — the rest were
deployed directly to VPS via SCP, causing massive drift.

Includes:
- pipeline/lib/: 33 Python modules (daemon core, extraction, evaluation, merge, cascade, cross-domain, costs, attribution, etc.)
- pipeline/: main daemon (teleo-pipeline.py), reweave.py, batch-extract-50.sh
- diagnostics/: 19 files (4-page dashboard, alerting, daily digest, review queue, tier1 metrics)
- agent-state/: bootstrap, lib-state, cascade inbox processor, schema
- systemd/: service unit files for reference
- deploy.sh: rsync-based deploy with --dry-run, syntax checks, dirty-tree gate
- research-session.sh: updated with Step 8.5 digest + cascade inbox processing

No new code written — all files are exact copies from VPS as of 2026-04-06.
From this point forward: edit in repo, commit, then deploy.sh.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 00:00:00 +01:00

206 lines
6.8 KiB
Python

"""Entity enrichment queue — decouple entity writes from extraction branches.
Problem: Entity updates on extraction branches cause merge conflicts because
multiple extraction branches modify the same entity file (e.g., metadao.md).
83% of near_duplicate false positives come from entity file modifications.
Solution: Extraction writes entity operations to a JSON queue file on the VPS.
A separate batch process reads the queue and applies operations to main.
Entity operations are commutative (timeline appends are order-independent),
so parallel extractions never conflict.
Flow:
1. openrouter-extract-v2.py → entity_queue.enqueue() instead of direct file writes
2. entity_batch.py (cron or pipeline stage) → entity_queue.dequeue() + apply to main
3. Commit entity changes to main directly (no PR needed for timeline appends)
Epimetheus owns this module. Leo reviews changes.
"""
import json
import logging
import os
import time
from datetime import date, datetime, timezone
from pathlib import Path
# Module logger — named under the "pipeline" hierarchy so daemon-level config applies.
logger = logging.getLogger("pipeline.entity_queue")
# Default queue location (VPS); overridable via the ENTITY_QUEUE_DIR env var.
DEFAULT_QUEUE_DIR = "/opt/teleo-eval/entity-queue"
def _queue_dir() -> Path:
    """Resolve the queue directory, creating it on first access.

    Honors the ENTITY_QUEUE_DIR environment variable; falls back to
    DEFAULT_QUEUE_DIR (the VPS location) when unset.
    """
    queue_root = os.environ.get("ENTITY_QUEUE_DIR", DEFAULT_QUEUE_DIR)
    path = Path(queue_root)
    path.mkdir(parents=True, exist_ok=True)
    return path
def enqueue(entity: dict, source_file: str, agent: str) -> str:
    """Add an entity operation to the queue. Returns the queue entry ID.

    Args:
        entity: dict with keys: filename, domain, action (create|update),
            entity_type, content (for creates), timeline_entry (for updates)
        source_file: path to the source that produced this entity
        agent: agent name performing extraction
    Returns:
        Queue entry filename (for tracking)
    Raises:
        ValueError: if entity dict is missing required fields or has invalid action
    """
    # Validate required fields (Ganymede review)
    for field in ("filename", "domain", "action"):
        if not entity.get(field):
            raise ValueError(f"Entity missing required field: {field}")
    if entity["action"] not in ("create", "update"):
        raise ValueError(f"Invalid entity action: {entity['action']}")
    # Work on a shallow copy so the sanitization below does not mutate the
    # caller's dict (previously the caller's filename was rewritten in place).
    entity = dict(entity)
    # Sanitize filename — prevent path traversal (Ganymede review)
    entity["filename"] = os.path.basename(entity["filename"])
    # Millisecond timestamp keeps entries sortable oldest-first by filename.
    entry_id = f"{int(time.time() * 1000)}-{entity['filename'].replace('.md', '')}"
    entry = {
        "id": entry_id,
        "entity": entity,
        "source_file": os.path.basename(source_file),
        "agent": agent,
        "enqueued_at": datetime.now(tz=timezone.utc).isoformat(),
        "status": "pending",
    }
    queue_file = _queue_dir() / f"{entry_id}.json"
    with open(queue_file, "w") as f:
        json.dump(entry, f, indent=2)
    logger.info("Enqueued entity operation: %s (%s)", entity["filename"], entity.get("action", "?"))
    return entry_id
def dequeue(limit: int = 50) -> list[dict]:
    """Return up to *limit* pending queue entries, oldest first.

    Entries stay on disk; the caller marks them processed after a
    successful apply. Each returned dict carries a "_queue_path" key
    pointing back at its JSON file. Malformed entries are skipped with
    a warning.
    """
    pending: list[dict] = []
    for path in sorted(_queue_dir().glob("*.json")):
        try:
            with open(path) as fh:
                entry = json.load(fh)
        except (json.JSONDecodeError, KeyError) as err:
            logger.warning("Skipping malformed queue entry %s: %s", path.name, err)
            continue
        if entry.get("status") != "pending":
            continue
        entry["_queue_path"] = str(path)
        pending.append(entry)
        if len(pending) >= limit:
            break
    return pending
def mark_processed(entry: dict, result: str = "applied"):
    """Persist a queue entry's final status back to its file on disk.

    Args:
        entry: dict previously returned by dequeue() — must carry the
            internal "_queue_path" key pointing at its JSON file.
        result: final status to record, "applied" (default) or "failed".

    Returns silently if the entry has no path or the file is already gone.
    Uses atomic write (tmp + rename) to prevent race conditions. (Ganymede review)
    """
    queue_path = entry.get("_queue_path")
    if not queue_path or not os.path.exists(queue_path):
        return
    entry["status"] = result
    entry["processed_at"] = datetime.now(tz=timezone.utc).isoformat()
    # Remove internal tracking field before writing (path already captured above).
    entry.pop("_queue_path", None)
    # Atomic write: tmp file + rename (Ganymede review — prevents race condition)
    tmp_path = queue_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(entry, f, indent=2)
    os.rename(tmp_path, queue_path)
def mark_failed(entry: dict, error: str):
    """Mark a queue entry as failed, recording the error message on it."""
    # Attach the error first so mark_processed persists it with the entry.
    entry.update(last_error=error)
    mark_processed(entry, result="failed")
def queue_enrichment(
    target_claim: str,
    evidence: str,
    pr_number: int,
    original_title: str,
    similarity: float,
    domain: str,
) -> str:
    """Queue an enrichment for an existing claim. Applied by entity_batch alongside entity updates.

    Used by the substantive fixer for near-duplicate auto-conversion.
    Single writer pattern — avoids race conditions with direct main writes. (Ganymede)

    Args:
        target_claim: path of the existing claim file to enrich
        evidence: evidence text to append
        pr_number: PR that produced the near-duplicate
        original_title: title of the original (duplicate) claim
        similarity: near-duplicate similarity score
        domain: claim domain
    Returns:
        Queue entry ID (filename stem) for tracking.
    """
    entry_id = f"{int(time.time() * 1000)}-enrichment-{os.path.basename(target_claim).replace('.md', '')}"
    entry = {
        "id": entry_id,
        "type": "enrichment",
        "target_claim": target_claim,
        "evidence": evidence,
        "pr_number": pr_number,
        "original_title": original_title,
        "similarity": similarity,
        "domain": domain,
        "enqueued_at": datetime.now(tz=timezone.utc).isoformat(),
        "status": "pending",
    }
    queue_file = _queue_dir() / f"{entry_id}.json"
    with open(queue_file, "w") as f:
        json.dump(entry, f, indent=2)
    # Fixed format string: was "PR #%d%s", which ran the number and path together.
    logger.info("Enqueued enrichment: PR #%d -> %s (sim=%.2f)", pr_number, target_claim, similarity)
    return entry_id
def cleanup(max_age_hours: int = 24):
    """Remove processed/failed entries older than max_age_hours.

    Best-effort sweep: unreadable or concurrently-removed files are skipped
    (previously a bare ``except Exception: pass`` also hid programming errors).

    Returns:
        Number of entries removed.
    """
    qdir = _queue_dir()
    cutoff = time.time() - (max_age_hours * 3600)
    removed = 0
    for f in qdir.glob("*.json"):
        try:
            with open(f) as fh:
                entry = json.load(fh)
            if entry.get("status") in ("applied", "failed") and f.stat().st_mtime < cutoff:
                f.unlink()
                removed += 1
        except (OSError, json.JSONDecodeError) as e:
            # Keep the sweep best-effort: skip this file, continue with the rest.
            logger.debug("cleanup skipped %s: %s", f.name, e)
    if removed:
        logger.info("Cleaned up %d old queue entries", removed)
    return removed
def queue_stats() -> dict:
    """Count queue entries by status for health monitoring.

    Returns a dict with at least the keys "pending", "applied", "failed",
    and "total"; any other status found on disk gets its own bucket.
    """
    stats = {"pending": 0, "applied": 0, "failed": 0, "total": 0}
    for path in _queue_dir().glob("*.json"):
        try:
            with open(path) as fh:
                data = json.load(fh)
        except Exception:
            # Stats are best-effort — unreadable entries are simply ignored.
            continue
        status = data.get("status", "unknown")
        stats[status] = stats.get(status, 0) + 1
        stats["total"] += 1
    return stats