"""Tests for entity queue and batch processor.""" import json import os import tempfile import pytest from lib.entity_queue import cleanup, dequeue, enqueue, mark_failed, mark_processed, queue_stats from lib.entity_batch import _apply_timeline_entry, _apply_entity_create # ─── Fixtures ────────────────────────────────────────────────────────────── @pytest.fixture def queue_dir(tmp_path, monkeypatch): """Temporary queue directory.""" monkeypatch.setenv("ENTITY_QUEUE_DIR", str(tmp_path / "queue")) return tmp_path / "queue" @pytest.fixture def entity_dir(tmp_path): """Temporary entity directory with a sample entity.""" edir = tmp_path / "entities" / "internet-finance" edir.mkdir(parents=True) entity_content = """--- type: entity entity_type: company name: "MetaDAO" domain: internet-finance description: "Futarchy governance platform" status: active --- # MetaDAO Overview. ## Timeline - **2024-01-01** — Launch of Autocrat v0.1 """ (edir / "metadao.md").write_text(entity_content) return tmp_path # ─── Queue tests ─────────────────────────────────────────────────────────── class TestEnqueue: def test_enqueue_creates_file(self, queue_dir): entity = { "filename": "metadao.md", "domain": "internet-finance", "action": "update", "timeline_entry": "- **2026-03-15** — New proposal passed", } entry_id = enqueue(entity, "source.md", "rio") assert entry_id # Queue file should exist files = list(queue_dir.glob("*.json")) assert len(files) == 1 data = json.loads(files[0].read_text()) assert data["status"] == "pending" assert data["entity"]["filename"] == "metadao.md" def test_enqueue_multiple(self, queue_dir): for i in range(3): enqueue( {"filename": f"entity-{i}.md", "domain": "internet-finance", "action": "create"}, "source.md", "rio", ) files = list(queue_dir.glob("*.json")) assert len(files) == 3 class TestDequeue: def test_dequeue_returns_pending(self, queue_dir): enqueue({"filename": "a.md", "domain": "x", "action": "create"}, "s.md", "rio") enqueue({"filename": "b.md", "domain": "x", "action": "update"}, "s.md", "rio") entries = dequeue(limit=10) assert len(entries) == 2 assert entries[0]["entity"]["filename"] == "a.md" def test_dequeue_skips_processed(self, queue_dir): enqueue({"filename": "a.md", "domain": "x", "action": "create"}, "s.md", "rio") entries = dequeue() mark_processed(entries[0]) entries2 = dequeue() assert len(entries2) == 0 def test_dequeue_respects_limit(self, queue_dir): for i in range(5): enqueue({"filename": f"e-{i}.md", "domain": "x", "action": "create"}, "s.md", "rio") entries = dequeue(limit=2) assert len(entries) == 2 class TestMarkProcessed: def test_mark_processed(self, queue_dir): enqueue({"filename": "a.md", "domain": "x", "action": "create"}, "s.md", "rio") entries = dequeue() mark_processed(entries[0]) # Re-read the file files = list(queue_dir.glob("*.json")) data = json.loads(files[0].read_text()) assert data["status"] == "applied" assert "processed_at" in data def test_mark_failed(self, queue_dir): enqueue({"filename": "a.md", "domain": "x", "action": "create"}, "s.md", "rio") entries = dequeue() mark_failed(entries[0], "entity file not found") files = list(queue_dir.glob("*.json")) data = json.loads(files[0].read_text()) assert data["status"] == "failed" assert data["last_error"] == "entity file not found" class TestQueueStats: def test_stats(self, queue_dir): enqueue({"filename": "a.md", "domain": "x", "action": "create"}, "s.md", "rio") enqueue({"filename": "b.md", "domain": "x", "action": "create"}, "s.md", "rio") entries = dequeue() mark_processed(entries[0]) stats = queue_stats() assert stats["pending"] == 1 assert stats["applied"] == 1 assert stats["total"] == 2 # ─── Batch processor tests ──────────────────────────────────────────────── class TestApplyTimelineEntry: def test_append_to_existing_timeline(self, entity_dir): entity_path = str(entity_dir / "entities" / "internet-finance" / "metadao.md") entry = "- **2026-03-15** — New governance proposal passed" ok, msg = _apply_timeline_entry(entity_path, entry) assert ok assert "appended" in msg content = open(entity_path).read() assert "2026-03-15" in content assert "New governance proposal" in content # Original entry should still be there assert "2024-01-01" in content def test_duplicate_entry_rejected(self, entity_dir): entity_path = str(entity_dir / "entities" / "internet-finance" / "metadao.md") entry = "- **2024-01-01** — Launch of Autocrat v0.1" ok, msg = _apply_timeline_entry(entity_path, entry) assert not ok assert "duplicate" in msg def test_missing_file_fails(self, entity_dir): ok, msg = _apply_timeline_entry(str(entity_dir / "nonexistent.md"), "entry") assert not ok assert "not found" in msg def test_creates_timeline_section(self, entity_dir): """Entity without ## Timeline section gets one created.""" no_timeline = entity_dir / "entities" / "internet-finance" / "new-entity.md" no_timeline.write_text("---\ntype: entity\n---\n\n# New Entity\n\nOverview.\n") ok, msg = _apply_timeline_entry(str(no_timeline), "- **2026-03-15** — First event") assert ok content = no_timeline.read_text() assert "## Timeline" in content assert "First event" in content class TestApplyEntityCreate: def test_create_new_entity(self, entity_dir): new_path = str(entity_dir / "entities" / "internet-finance" / "new-project.md") content = "---\ntype: entity\n---\n\n# New Project\n" ok, msg = _apply_entity_create(new_path, content) assert ok assert os.path.exists(new_path) def test_create_existing_fails(self, entity_dir): existing = str(entity_dir / "entities" / "internet-finance" / "metadao.md") ok, msg = _apply_entity_create(existing, "content") assert not ok assert "exists" in msg def test_create_makes_directories(self, entity_dir): deep_path = str(entity_dir / "entities" / "new-domain" / "new-entity.md") ok, msg = _apply_entity_create(deep_path, "content") assert ok assert os.path.exists(deep_path)