teleo-infrastructure/lib/entity_batch.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

354 lines
13 KiB
Python

"""Entity batch processor — applies queued entity operations to main.
Reads from entity_queue, applies creates/updates to the main worktree,
commits directly to main. No PR needed for entity timeline appends —
they're factual, commutative, and low-risk.
Entity creates (new entity files) go through PR review like claims.
Entity updates (timeline appends) commit directly — they're additive
and recoverable from source archives if wrong.
Runs as part of the pipeline's ingest stage or as a standalone cron.
Epimetheus owns this module. Leo reviews changes. Rhea deploys.
"""
import asyncio
import json
import logging
import os
import re
from datetime import date
from pathlib import Path
from . import config, db
from .entity_queue import cleanup, dequeue, mark_failed, mark_processed
logger = logging.getLogger("pipeline.entity_batch")
def _read_file(path: str) -> str:
try:
with open(path) as f:
return f.read()
except FileNotFoundError:
return ""
async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]:
"""Run a git command async."""
proc = await asyncio.create_subprocess_exec(
"git", *args,
cwd=cwd or str(config.MAIN_WORKTREE),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return -1, f"git {args[0]} timed out after {timeout}s"
output = (stdout or b"").decode().strip()
if stderr:
output += "\n" + stderr.decode().strip()
return proc.returncode, output
def _apply_timeline_entry(entity_path: str, timeline_entry: str) -> tuple[bool, str]:
    """Append a timeline entry to an existing entity file.

    The entry is inserted at the end of the "## Timeline" section (just
    before the next "## " heading, or at end-of-file). When the file has
    no Timeline section, one is created at the bottom. An entry whose
    stripped text already appears anywhere in the file is rejected as a
    duplicate.

    Returns (success, message).
    """
    if not os.path.exists(entity_path):
        return False, f"entity file not found: {entity_path}"
    content = _read_file(entity_path)
    if not content:
        return False, f"entity file empty: {entity_path}"
    # Exact re-appends are rejected rather than silently applied twice.
    if timeline_entry.strip() in content:
        return False, "duplicate timeline entry"
    if "## Timeline" in content:
        lines = content.split("\n")
        # Default to end-of-file; move up to the heading that follows Timeline.
        insert_at = len(lines)
        seen_timeline = False
        for idx, raw in enumerate(lines):
            stripped = raw.strip()
            if stripped.startswith("## Timeline"):
                seen_timeline = True
            elif seen_timeline and stripped.startswith("## "):
                insert_at = idx
                break
        lines.insert(insert_at, timeline_entry)
        updated = "\n".join(lines)
    else:
        # No Timeline section yet — create one at the bottom of the file.
        updated = content.rstrip() + "\n\n## Timeline\n\n" + timeline_entry + "\n"
    with open(entity_path, "w") as f:
        f.write(updated)
    return True, "timeline entry appended"
def _apply_claim_enrichment(claim_path: str, evidence: str, pr_number: int,
original_title: str, similarity: float) -> tuple[bool, str]:
"""Append auto-enrichment evidence to an existing claim file.
Used for near-duplicate auto-conversion. (Ganymede: route through entity_batch)
"""
if not os.path.exists(claim_path):
return False, f"target claim not found: {claim_path}"
content = _read_file(claim_path)
if not content:
return False, f"target claim empty: {claim_path}"
enrichment_block = (
f"\n\n### Auto-enrichment (near-duplicate conversion, similarity={similarity:.2f})\n"
f"*Source: PR #{pr_number}\"{original_title}\"*\n"
f"*Auto-converted by substantive fixer. Review: revert if this evidence doesn't belong here.*\n\n"
f"{evidence}\n"
)
if "\n---\n" in content:
parts = content.rsplit("\n---\n", 1)
updated = parts[0] + enrichment_block + "\n---\n" + parts[1]
else:
updated = content + enrichment_block
with open(claim_path, "w") as f:
f.write(updated)
return True, "enrichment appended"
def _apply_entity_create(entity_path: str, content: str) -> tuple[bool, str]:
"""Create a new entity file. Returns (success, message)."""
if os.path.exists(entity_path):
return False, f"entity already exists: {entity_path}"
os.makedirs(os.path.dirname(entity_path), exist_ok=True)
with open(entity_path, "w") as f:
f.write(content)
return True, "entity created"
async def apply_batch(conn=None, max_entries: int = 50) -> tuple[int, int]:
    """Process the entity queue. Returns (applied, failed).

    1. Pull latest main
    2. Read pending queue entries
    3. Apply each operation to the main worktree
    4. Commit all changes in one batch commit
    5. Push to origin

    Args:
        conn: optional DB connection; when given, a "batch_applied" audit
            row is written after a successful push.
        max_entries: cap on the number of queue entries dequeued per run.

    Returns:
        (applied, failed). On git fetch/reset failure, (0, 0). On commit
        failure, the counts so far. On pull-rebase or push failure the
        worktree is hard-reset to origin/main and (0, failed + applied)
        is returned — nothing reached origin, and applied entries are
        never mark_processed'd, so they stay queued for the next run.
    """
    main_wt = str(config.MAIN_WORKTREE)
    # Ensure we're on main branch — batch script may have left worktree on an extract branch
    await _git("checkout", "main", cwd=main_wt)
    # Pull latest main (fetch + hard reset rather than merge: the worktree
    # must exactly match origin/main before applying queue operations).
    rc, out = await _git("fetch", "origin", "main", cwd=main_wt)
    if rc != 0:
        logger.error("Failed to fetch main: %s", out)
        return 0, 0
    rc, out = await _git("reset", "--hard", "origin/main", cwd=main_wt)
    if rc != 0:
        logger.error("Failed to reset main: %s", out)
        return 0, 0
    # Read queue
    entries = dequeue(limit=max_entries)
    if not entries:
        return 0, 0
    logger.info("Processing %d entity queue entries", len(entries))
    applied_entries: list[dict] = []  # Track for post-push marking (Ganymede review)
    failed = 0
    files_changed: set[str] = set()  # repo-relative paths to stage
    for entry in entries:
        # Handle enrichments (from substantive fixer near-duplicate conversion)
        if entry.get("type") == "enrichment":
            target = entry.get("target_claim", "")
            evidence = entry.get("evidence", "")
            domain = entry.get("domain", "")
            if not target or not evidence:
                mark_failed(entry, "enrichment missing target or evidence")
                failed += 1
                continue
            # basename() guards against path traversal in queued payloads.
            claim_path = os.path.join(main_wt, "domains", domain, os.path.basename(target))
            rel_path = os.path.join("domains", domain, os.path.basename(target))
            try:
                ok, msg = _apply_claim_enrichment(
                    claim_path, evidence, entry.get("pr_number", 0),
                    entry.get("original_title", ""), entry.get("similarity", 0),
                )
                if ok:
                    files_changed.add(rel_path)
                    applied_entries.append(entry)
                    logger.info("Applied enrichment to %s: %s", target, msg)
                else:
                    mark_failed(entry, msg)
                    failed += 1
            except Exception as e:
                logger.exception("Failed enrichment on %s", target)
                mark_failed(entry, str(e))
                failed += 1
            continue
        # Handle entity operations
        entity = entry.get("entity", {})
        filename = entity.get("filename", "")
        domain = entity.get("domain", "")
        action = entity.get("action", "")
        if not filename or not domain:
            mark_failed(entry, "missing filename or domain")
            failed += 1
            continue
        # Sanitize filename — prevent path traversal (Ganymede review)
        filename = os.path.basename(filename)
        entity_dir = os.path.join(main_wt, "entities", domain)
        entity_path = os.path.join(entity_dir, filename)
        rel_path = os.path.join("entities", domain, filename)
        try:
            if action == "update":
                timeline = entity.get("timeline_entry", "")
                if not timeline:
                    mark_failed(entry, "update with no timeline_entry")
                    failed += 1
                    continue
                ok, msg = _apply_timeline_entry(entity_path, timeline)
                if ok:
                    files_changed.add(rel_path)
                    applied_entries.append(entry)
                    logger.debug("Applied update to %s: %s", filename, msg)
                else:
                    mark_failed(entry, msg)
                    failed += 1
            elif action == "create":
                content = entity.get("content", "")
                if not content:
                    mark_failed(entry, "create with no content")
                    failed += 1
                    continue
                # If entity already exists, try to apply as timeline update instead
                if os.path.exists(entity_path):
                    timeline = entity.get("timeline_entry", "")
                    if timeline:
                        ok, msg = _apply_timeline_entry(entity_path, timeline)
                        if ok:
                            files_changed.add(rel_path)
                            applied_entries.append(entry)
                        else:
                            mark_failed(entry, f"create→update fallback: {msg}")
                            failed += 1
                    else:
                        mark_failed(entry, "entity exists, no timeline to append")
                        failed += 1
                    continue
                ok, msg = _apply_entity_create(entity_path, content)
                if ok:
                    files_changed.add(rel_path)
                    applied_entries.append(entry)
                    logger.debug("Created entity %s", filename)
                else:
                    mark_failed(entry, msg)
                    failed += 1
            else:
                mark_failed(entry, f"unknown action: {action}")
                failed += 1
        except Exception as e:
            logger.exception("Failed to apply entity %s", filename)
            mark_failed(entry, str(e))
            failed += 1
    applied = len(applied_entries)
    # Commit and push if any files changed
    if files_changed:
        # Stage changed files
        for f in files_changed:
            await _git("add", f, cwd=main_wt)
        # Commit
        commit_msg = (
            f"entity-batch: update {len(files_changed)} entities\n\n"
            f"- Applied {applied} entity operations from queue\n"
            f"- Files: {', '.join(sorted(files_changed)[:10])}"
            f"{'...' if len(files_changed) > 10 else ''}\n\n"
            f"Pentagon-Agent: Epimetheus <968B2991-E2DF-4006-B962-F5B0A0CC8ACA>"
        )
        rc, out = await _git("commit", "-m", commit_msg, cwd=main_wt)
        if rc != 0:
            # NOTE: entries already applied to the worktree are still
            # reported as applied here, but mark_processed never runs.
            logger.error("Entity batch commit failed: %s", out)
            return applied, failed
        # Push with retry — main advances frequently from merge module.
        # Pull-rebase before each attempt to catch up with remote.
        push_ok = False
        for attempt in range(3):
            # Always pull-rebase before pushing to catch up with remote main
            rc, out = await _git("pull", "--rebase", "origin", "main", cwd=main_wt, timeout=30)
            if rc != 0:
                # Rebase failed (likely a conflict): abort it, discard the
                # local batch commit, and bail — entries were never
                # mark_processed'd, so they will be retried next run.
                logger.warning("Entity batch pull-rebase failed (attempt %d): %s", attempt + 1, out)
                await _git("rebase", "--abort", cwd=main_wt)
                await _git("reset", "--hard", "origin/main", cwd=main_wt)
                return 0, failed + applied
            rc, out = await _git("push", "origin", "main", cwd=main_wt, timeout=30)
            if rc == 0:
                push_ok = True
                break
            logger.warning("Entity batch push failed (attempt %d), retrying: %s", attempt + 1, out[:100])
            await asyncio.sleep(2)  # Brief pause before retry
        if not push_ok:
            # Give up: drop the local commit so the worktree matches origin.
            logger.error("Entity batch push failed after 3 attempts")
            await _git("reset", "--hard", "origin/main", cwd=main_wt)
            return 0, failed + applied
        # Push succeeded — NOW mark entries as processed (Ganymede review)
        for entry in applied_entries:
            mark_processed(entry)
        logger.info(
            "Entity batch: committed %d file changes (%d applied, %d failed)",
            len(files_changed), applied, failed,
        )
        # Audit
        if conn:
            db.audit(
                conn, "entity_batch", "batch_applied",
                json.dumps({
                    "applied": applied, "failed": failed,
                    "files": sorted(files_changed)[:20],
                }),
            )
    # Cleanup old entries
    cleanup(max_age_hours=24)
    return applied, failed
async def entity_batch_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Pipeline stage entry point. Called by teleo-pipeline.py's ingest stage.

    Args:
        conn: DB connection forwarded to apply_batch for audit logging.
        max_workers: unused here — presumably accepted so this stage matches
            the common pipeline-stage signature; confirm against the caller.

    Returns:
        (applied, failed) from apply_batch.
    """
    return await apply_batch(conn)