Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors
Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
206 lines
6.8 KiB
Python
"""Entity enrichment queue — decouple entity writes from extraction branches.
|
|
|
|
Problem: Entity updates on extraction branches cause merge conflicts because
|
|
multiple extraction branches modify the same entity file (e.g., metadao.md).
|
|
83% of near_duplicate false positives come from entity file modifications.
|
|
|
|
Solution: Extraction writes entity operations to a JSON queue file on the VPS.
|
|
A separate batch process reads the queue and applies operations to main.
|
|
Entity operations are commutative (timeline appends are order-independent),
|
|
so parallel extractions never conflict.
|
|
|
|
Flow:
|
|
1. openrouter-extract-v2.py → entity_queue.enqueue() instead of direct file writes
|
|
2. entity_batch.py (cron or pipeline stage) → entity_queue.dequeue() + apply to main
|
|
3. Commit entity changes to main directly (no PR needed for timeline appends)
|
|
|
|
Epimetheus owns this module. Leo reviews changes.
|
|
"""
import json
import logging
import os
import time
from datetime import date, datetime, timezone
from pathlib import Path
# Module-level logger, namespaced under the pipeline logging hierarchy.
logger = logging.getLogger("pipeline.entity_queue")

# Default queue location (VPS); override with the ENTITY_QUEUE_DIR env var.
DEFAULT_QUEUE_DIR = "/opt/teleo-eval/entity-queue"
def _queue_dir() -> Path:
    """Return the queue directory as a Path, creating it on first use.

    The location comes from the ENTITY_QUEUE_DIR environment variable when
    set, falling back to DEFAULT_QUEUE_DIR.
    """
    queue_root = Path(os.environ.get("ENTITY_QUEUE_DIR", DEFAULT_QUEUE_DIR))
    queue_root.mkdir(parents=True, exist_ok=True)
    return queue_root
def enqueue(entity: dict, source_file: str, agent: str) -> str:
    """Add an entity operation to the queue. Returns the queue entry ID.

    Args:
        entity: dict with keys: filename, domain, action (create|update),
            entity_type, content (for creates), timeline_entry (for updates)
        source_file: path to the source that produced this entity
        agent: agent name performing extraction

    Returns:
        Queue entry ID (also the queue filename stem, for tracking)

    Raises:
        ValueError: if entity dict is missing required fields or has invalid action
    """
    # Validate required fields (Ganymede review)
    for field in ("filename", "domain", "action"):
        if not entity.get(field):
            raise ValueError(f"Entity missing required field: {field}")
    if entity["action"] not in ("create", "update"):
        raise ValueError(f"Invalid entity action: {entity['action']}")

    # Sanitize filename — prevent path traversal (Ganymede review)
    entity["filename"] = os.path.basename(entity["filename"])

    # Millisecond timestamp prefix keeps lexicographic filename sort chronological,
    # which dequeue() relies on for oldest-first ordering.
    entry_id = f"{int(time.time() * 1000)}-{entity['filename'].replace('.md', '')}"
    entry = {
        "id": entry_id,
        "entity": entity,
        "source_file": os.path.basename(source_file),
        "agent": agent,
        # Timezone-aware UTC timestamp (was an inline __import__('datetime') hack).
        "enqueued_at": datetime.now(tz=timezone.utc).isoformat(),
        "status": "pending",
    }

    queue_file = _queue_dir() / f"{entry_id}.json"
    with open(queue_file, "w") as f:
        json.dump(entry, f, indent=2)

    logger.info("Enqueued entity operation: %s (%s)", entity["filename"], entity.get("action", "?"))
    return entry_id
def dequeue(limit: int = 50) -> list[dict]:
    """Read pending queue entries, oldest first. Returns list of entry dicts.

    Ordering relies on entry filenames starting with a millisecond timestamp,
    so a plain lexicographic sort is chronological.

    Does NOT remove entries — caller marks them processed after successful apply.

    Args:
        limit: maximum number of pending entries to return.
    """
    qdir = _queue_dir()
    entries: list[dict] = []

    for f in sorted(qdir.glob("*.json")):
        try:
            with open(f) as fh:
                entry = json.load(fh)
            if entry.get("status") == "pending":
                # Internal tracking field so mark_processed() can find the file.
                entry["_queue_path"] = str(f)
                entries.append(entry)
                if len(entries) >= limit:
                    break
        except (OSError, json.JSONDecodeError, KeyError) as e:
            # OSError: the file may vanish between glob() and open() when
            # cleanup() runs concurrently — skip it instead of crashing the batch.
            logger.warning("Skipping malformed queue entry %s: %s", f.name, e)

    return entries
def mark_processed(entry: dict, result: str = "applied"):
    """Mark a queue entry as processed (or failed).

    Args:
        entry: queue entry dict as returned by dequeue() (must carry the
            internal "_queue_path" key; silently no-ops otherwise).
        result: terminal status to record, e.g. "applied" or "failed".

    Uses atomic write (tmp + rename) to prevent race conditions. (Ganymede review)
    """
    queue_path = entry.get("_queue_path")
    if not queue_path or not os.path.exists(queue_path):
        # Entry was never persisted or has already been cleaned up — nothing to do.
        return

    entry["status"] = result
    entry["processed_at"] = datetime.now(tz=timezone.utc).isoformat()
    # Remove internal tracking field before writing
    entry.pop("_queue_path", None)

    # Atomic write: tmp file + replace (Ganymede review — prevents race condition).
    # os.replace (not os.rename) is atomic and overwrite-safe on all platforms.
    tmp_path = queue_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(entry, f, indent=2)
    os.replace(tmp_path, queue_path)
def mark_failed(entry: dict, error: str):
    """Record *error* on the entry, then finalize it with status "failed"."""
    entry.update(last_error=error)
    mark_processed(entry, result="failed")
def queue_enrichment(
    target_claim: str,
    evidence: str,
    pr_number: int,
    original_title: str,
    similarity: float,
    domain: str,
) -> str:
    """Queue an enrichment for an existing claim. Applied by entity_batch alongside entity updates.

    Used by the substantive fixer for near-duplicate auto-conversion.
    Single writer pattern — avoids race conditions with direct main writes. (Ganymede)

    Args:
        target_claim: path of the existing claim file to enrich.
        evidence: evidence text for the enrichment.
        pr_number: PR number that produced the near-duplicate.
        original_title: title of the duplicate claim being converted.
        similarity: similarity score that triggered the conversion.
        domain: knowledge-base domain of the claim.

    Returns:
        Queue entry ID (for tracking).
    """
    # Millisecond timestamp prefix keeps lexicographic filename sort chronological.
    entry_id = f"{int(time.time() * 1000)}-enrichment-{os.path.basename(target_claim).replace('.md', '')}"
    entry = {
        "id": entry_id,
        "type": "enrichment",
        "target_claim": target_claim,
        "evidence": evidence,
        "pr_number": pr_number,
        "original_title": original_title,
        "similarity": similarity,
        "domain": domain,
        # Timezone-aware UTC timestamp (was an inline __import__('datetime') hack).
        "enqueued_at": datetime.now(tz=timezone.utc).isoformat(),
        "status": "pending",
    }

    queue_file = _queue_dir() / f"{entry_id}.json"
    with open(queue_file, "w") as f:
        json.dump(entry, f, indent=2)

    logger.info("Enqueued enrichment: PR #%d → %s (sim=%.2f)", pr_number, target_claim, similarity)
    return entry_id
def cleanup(max_age_hours: int = 24):
    """Remove processed/failed entries older than max_age_hours.

    Args:
        max_age_hours: minimum age (by file mtime) before a terminal entry
            is deleted.

    Returns:
        Number of entries removed.
    """
    qdir = _queue_dir()
    cutoff = time.time() - (max_age_hours * 3600)
    removed = 0

    for f in qdir.glob("*.json"):
        try:
            with open(f) as fh:
                entry = json.load(fh)
            # Only terminal states are eligible; pending entries are never GC'd.
            if entry.get("status") in ("applied", "failed"):
                if f.stat().st_mtime < cutoff:
                    f.unlink()
                    removed += 1
        except (OSError, json.JSONDecodeError) as e:
            # Best-effort GC: a concurrently-removed or malformed file is skipped,
            # but logged so persistent garbage stays visible (was a silent bare pass).
            logger.debug("cleanup: skipping %s: %s", f.name, e)

    if removed:
        logger.info("Cleaned up %d old queue entries", removed)
    return removed
def queue_stats() -> dict:
    """Get queue statistics for health monitoring.

    Returns:
        dict mapping each observed status to its count, plus "total", e.g.
        {"pending": 3, "applied": 10, "failed": 1, "total": 14}. The three
        known statuses are always present (zero-filled).
    """
    qdir = _queue_dir()
    stats = {"pending": 0, "applied": 0, "failed": 0, "total": 0}

    for f in qdir.glob("*.json"):
        try:
            with open(f) as fh:
                entry = json.load(fh)
        except (OSError, json.JSONDecodeError) as e:
            # Unreadable entries are excluded from counts but surfaced at debug level
            # (was a silent bare pass that hid corruption).
            logger.debug("queue_stats: skipping %s: %s", f.name, e)
            continue
        status = entry.get("status", "unknown")
        stats[status] = stats.get(status, 0) + 1
        stats["total"] += 1

    return stats