"""Extraction stage — automated claim extraction from queued sources.

Replaces extract-cron.sh with a Python module inside the pipeline daemon.
Processes unprocessed sources in inbox/queue/, extracts claims via LLM,
creates PRs on Forgejo, and archives sources on main.

Flow per source:
  1. Read source frontmatter (domain, author, rationale)
  2. Pre-screen: Haiku identifies themes, Qdrant finds prior art
  3. Build KB index for dedup
  4. Build extraction prompt (extraction_prompt.py)
  5. Call Sonnet via OpenRouter
  6. Parse JSON response
  7. Post-extraction validation (post_extract.py)
  8. Create branch, write claim/entity files, commit, push
  9. Create PR on Forgejo via agent token
  10. Archive source on main (worktree lock)

Sources whose earlier extraction failed evaluation (status
'needs_reextraction' in the sources table, with feedback) are re-extracted
in the same cycle, even when the queue itself is empty.

Design: one source at a time (sequential), up to MAX_SOURCES per cycle.
Uses the main worktree for reading + archival, extract worktree for branches.

Epimetheus owns this module. Leo reviews changes.
"""

import asyncio
import json
import logging
import os
import re
import secrets
import time
from datetime import date
from pathlib import Path

from . import config
from .costs import record_usage
from .domains import agent_for_domain
from .extraction_prompt import build_extraction_prompt
from .forgejo import api as forgejo_api
from .llm import openrouter_call
from .post_extract import load_existing_claims_from_repo, validate_and_fix_claims
from .worktree_lock import async_main_worktree_lock

logger = logging.getLogger("pipeline.extract")

# Extraction worktree (separate from main to avoid conflicts)
EXTRACT_WORKTREE = config.BASE_DIR / "workspaces" / "extract"

# Max sources per cycle
MAX_SOURCES = int(os.environ.get("MAX_EXTRACT_SOURCES", "3"))

# KB index cache (rebuilt once per cycle, not per source)
_kb_index_cache: dict[str, str] = {}
_kb_index_timestamp: float = 0
KB_INDEX_TTL = 300  # 5 minutes

# Paging for the open-PR dedup listing (the original single request used limit=50).
# The page cap bounds the loop even if the server ignores the `page` parameter.
_PR_PAGE_SIZE = 50
_PR_MAX_PAGES = 20

# First frontmatter status line of a queued source, with or without quotes.
_UNPROCESSED_STATUS_RE = re.compile(
    r"^status:[ \t]*[\"']?unprocessed[\"']?(?=\s*$)", re.MULTILINE
)


# ─── Parsing helpers ────────────────────────────────────────────────────────


def _parse_source_frontmatter(content: str) -> dict:
    """Parse a source file's frontmatter into a flat dict.

    Only simple ``key: value`` lines are understood: surrounding quotes are
    stripped, ``null``/empty values become None, and lines without a colon
    (e.g. list items) are ignored. Returns {} when there is no frontmatter.
    """
    if not content.startswith("---"):
        return {}
    # Look for the closing delimiter at the start of a line, so a "---" inside
    # a value (e.g. a title) does not truncate the frontmatter.
    end = content.find("\n---", 3)
    if end == -1:
        return {}
    raw = content[3:end]

    fm = {}
    for line in raw.strip().split("\n"):
        line = line.strip()
        if not line or ":" not in line:
            continue
        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if val.lower() == "null" or val == "":
            val = None
        fm[key] = val
    return fm


def _get_kb_index(domain: str) -> str:
    """Return KB index text for *domain*, cached for KB_INDEX_TTL seconds.

    Prefers a pre-generated ``/tmp/kb-indexes/{domain}.txt`` file; otherwise
    lists claim filenames under ``domains/{domain}/`` in the main worktree
    (files whose names start with "_" are skipped).
    """
    global _kb_index_timestamp

    now = time.time()
    if now - _kb_index_timestamp > KB_INDEX_TTL:
        # The whole cache expires at once, so every domain is refreshed together
        _kb_index_cache.clear()
        _kb_index_timestamp = now

    cached = _kb_index_cache.get(domain)
    if cached is not None:
        return cached

    # Try pre-generated index files first
    index_file = Path(f"/tmp/kb-indexes/{domain}.txt")
    if index_file.exists():
        text = index_file.read_text(encoding="utf-8")
    else:
        # Fallback: build from repo
        domain_dir = config.MAIN_WORKTREE / "domains" / domain
        claims = []
        if domain_dir.is_dir():
            claims = [
                f"- {f.name}" for f in domain_dir.glob("*.md") if not f.name.startswith("_")
            ]
        text = f"## Claims in domains/{domain}/\n" + "\n".join(sorted(claims))

    _kb_index_cache[domain] = text
    return text


async def _git(*args: str, cwd: str | None = None, timeout: int = 60) -> tuple[int, str]:
    """Run a git command asynchronously.

    Args:
        *args: Arguments passed to ``git``.
        cwd: Working directory; defaults to the extract worktree.
        timeout: Seconds before the process is killed.

    Returns:
        (returncode, stdout + stderr). On timeout, returns (-1, message).
    """
    proc = await asyncio.create_subprocess_exec(
        "git", *args,
        cwd=cwd or str(EXTRACT_WORKTREE),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # exited between the timeout firing and the kill
        await proc.wait()
        return -1, f"git {args[0]} timed out after {timeout}s"
    # git output (paths, commit messages) is not guaranteed to be valid UTF-8
    output = (stdout or b"").decode(errors="replace").strip()
    if stderr:
        output += "\n" + stderr.decode(errors="replace").strip()
    return proc.returncode, output


async def _pre_screen(source_content: str, source_title: str) -> str | None:
    """Run pre-screening: identify themes and find prior art.

    Returns formatted prior-art lines ("- [score] title (domain: d)", at most
    15, de-duplicated), or None if pre-screening fails or is unavailable.
    Non-fatal — extraction proceeds without prior art if this fails.
    """
    try:
        from .pre_screen import identify_themes, PRIOR_ART_THRESHOLD
        from .search import search

        key_file = config.SECRETS_DIR / "openrouter-key"
        if not key_file.exists():
            return None
        api_key = key_file.read_text().strip()

        # NOTE(review): identify_themes/search are called synchronously, so they
        # block the event loop while running; consider asyncio.to_thread once
        # they are confirmed safe to run off the main thread.
        themes = identify_themes(source_content, api_key, source_title)
        if not themes:
            return None

        # Search each theme (plus the title) against Qdrant, at most 5 queries
        results = []
        search_queries = list(themes) + ([source_title] if source_title else [])
        for query in search_queries[:5]:
            try:
                hits = search(query, limit=3, score_threshold=PRIOR_ART_THRESHOLD)
                for hit in hits:
                    title = hit.get("title", hit.get("filename", ""))
                    score = hit.get("score", 0)
                    domain = hit.get("domain", "")
                    if title and score >= PRIOR_ART_THRESHOLD:
                        results.append(f"- [{score:.2f}] {title} (domain: {domain})")
            except Exception:
                # One failing query shouldn't lose the others' results
                continue

        if not results:
            return None

        # Deduplicate, preserving first-seen order
        unique = list(dict.fromkeys(results))
        return "\n".join(unique[:15])
    except Exception:
        logger.debug("Pre-screening failed (non-fatal)", exc_info=True)
        return None


def _parse_extraction_json(text: str) -> dict | None:
    """Parse the extraction JSON object from an LLM response.

    Handles markdown code fences and, as a fallback, surrounding prose by
    taking the span from the first "{" to the last "}". Returns None unless
    the parsed value is a JSON object — callers rely on ``.get``.
    """
    if not text:
        return None

    # Strip markdown code fences
    text = text.strip()
    if text.startswith("```"):
        # Remove opening fence (```json or ```)
        first_newline = text.find("\n")
        text = text[first_newline + 1:] if first_newline != -1 else ""
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse extraction JSON: %s", e)
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Try to find JSON object in text
    match = re.search(r"\{[\s\S]+\}", text)
    if match:
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            return None
        if isinstance(parsed, dict):
            return parsed
    return None


# ─── File-content builders ──────────────────────────────────────────────────


def _yaml_quote(value) -> str:
    """Render *value* as a double-quoted YAML scalar (None renders as "").

    JSON string syntax is (for practical purposes) valid YAML double-quoted
    syntax, so json.dumps escapes embedded quotes, backslashes and newlines
    that would otherwise corrupt the frontmatter. Text without such characters
    renders exactly as '"' + text + '"'.
    """
    return json.dumps("" if value is None else str(value), ensure_ascii=False)


def _filename_to_title(filename: str) -> str:
    """Turn a slug filename like "foo-bar.md" into "foo bar"."""
    return filename.removesuffix(".md").replace("-", " ")


def _build_claim_content(claim: dict, agent: str) -> str:
    """Build claim markdown file content (frontmatter + body) from extraction JSON.

    Free-text frontmatter values (title, description, source, sourcer, related
    claims) are emitted as escaped double-quoted YAML strings.
    """
    today = date.today().isoformat()
    domain = claim.get("domain") or ""
    title = claim.get("title") or _filename_to_title(claim.get("filename") or "")
    description = claim.get("description", "")
    confidence = claim.get("confidence") or "experimental"
    source_ref = claim.get("source", "")
    body = claim.get("body", "")
    scope = claim.get("scope", "")
    sourcer = claim.get("sourcer", "")
    related = claim.get("related_claims") or []
    if isinstance(related, str):
        related = [related]  # a bare string would otherwise be iterated per character

    lines = [
        "---",
        "type: claim",
        f"domain: {domain}",
        f"title: {_yaml_quote(title)}",
        f"description: {_yaml_quote(description)}",
        f"confidence: {confidence}",
        f"source: {_yaml_quote(source_ref)}",
        f"created: {today}",
        f"agent: {agent}",
    ]
    if scope:
        lines.append(f"scope: {scope}")
    if sourcer:
        lines.append(f"sourcer: {_yaml_quote(sourcer)}")
    if related:
        lines.append("related_claims:")
        lines.extend(f" - {_yaml_quote(f'[[{r}]]')}" for r in related)
    lines += ["---", "", f"# {title}", ""]
    if body:
        lines += [body, ""]
    return "\n".join(lines)


def _build_entity_content(entity: dict, domain: str) -> str:
    """Build entity markdown file content from extraction JSON.

    If the LLM supplied full ``content`` it is used verbatim; otherwise a
    template with frontmatter and a Timeline section is generated.
    """
    description = entity.get("content", "")
    if description:
        return description

    today = date.today().isoformat()
    entity_type = entity.get("entity_type", "company")
    name = _filename_to_title(entity.get("filename") or "").title()
    timeline = entity.get("timeline_entry", "")
    return f"""---
type: entity
entity_type: {entity_type}
domain: {domain}
description: ""
created: {today}
---

# {name}

## Timeline

{timeline}
"""


def _safe_md_filename(value) -> str:
    """Normalize an LLM-supplied filename to a bare ``*.md`` name, or "" if unusable.

    Directory components are dropped so a value like "../../x.md" cannot write
    outside the target directory.
    """
    if not isinstance(value, str):
        return ""
    name = os.path.basename(value.strip().replace("\\", "/"))
    if name in ("", ".", ".."):
        return ""
    if not name.endswith(".md"):
        name += ".md"
    return name


def _safe_domain(value, default: str) -> str:
    """Return *value* if it is a single non-empty path component, else *default*."""
    if isinstance(value, str):
        value = value.strip()
        if value and value not in (".", "..") and "/" not in value and "\\" not in value:
            return value
    return default


# ─── Per-source extraction ──────────────────────────────────────────────────


def _resolve_agent_token(agent_lower: str) -> str | None:
    """Read the Forgejo token for *agent_lower*, falling back to Leo's token.

    Returns None (after logging) when the token file cannot be read.
    """
    token_file = config.SECRETS_DIR / f"forgejo-{agent_lower}-token"
    if not token_file.exists():
        token_file = config.SECRETS_DIR / "forgejo-leo-token"
    try:
        return token_file.read_text().strip()
    except OSError:
        logger.error("Cannot read Forgejo token %s for agent %s", token_file, agent_lower)
        return None


async def _discard_local_branch(branch: str) -> None:
    """Switch the extract worktree back to main and delete the local *branch*."""
    wt = str(EXTRACT_WORKTREE)
    await _git("checkout", "main", cwd=wt)
    await _git("branch", "-D", branch, cwd=wt)


async def _extract_one_source(
    conn,
    source_path: str,
    source_content: str,
    fm: dict,
    existing_claims: set[str],
    feedback: dict | None = None,
) -> tuple[int, int]:
    """Extract claims from a single source and open a PR for them.

    Args:
        conn: Database connection used for cost accounting (record_usage).
        source_path: Repo-relative path of the source file.
        source_content: Full text of the source file.
        fm: Parsed frontmatter of the source.
        existing_claims: Known claims, passed to post-extraction validation.
        feedback: Evaluator feedback from a previous attempt (re-extraction).

    Returns:
        (succeeded, errors): (1, 0) when a PR was opened and the source archived;
        (0, 0) when nothing valid was extracted (archived as null-result);
        (0, 1) on failure, leaving the source in place for a retry.
    """
    source_file = os.path.basename(source_path)
    # Frontmatter values may be None ("null"/empty), so fall back with `or`
    domain = fm.get("domain") or ""
    agent_name = agent_for_domain(domain)
    agent_lower = agent_name.lower()
    title = fm.get("title") or source_file
    rationale = fm.get("rationale")
    intake_tier = fm.get("intake_tier")
    proposed_by = fm.get("proposed_by")

    logger.info("Extracting: %s (domain: %s, agent: %s)", source_file, domain, agent_name)

    # 1. Pre-screen (non-fatal)
    prior_art = await _pre_screen(source_content, title)
    if prior_art:
        logger.info("Pre-screening found %d prior art items", prior_art.count("\n") + 1)

    # 2. Build KB index
    kb_index = _get_kb_index(domain)

    # 3. Build extraction prompt
    prompt = build_extraction_prompt(
        source_file=source_path,
        source_content=source_content,
        domain=domain,
        agent=agent_name,
        kb_index=kb_index,
        rationale=rationale,
        intake_tier=intake_tier,
        proposed_by=proposed_by,
        prior_art=prior_art,
        previous_feedback=feedback,
    )

    # 4. Call LLM (OpenRouter — not Claude Max CLI)
    # EXTRACT_MODEL is "sonnet" (CLI name), use MODEL_SONNET_OR for OpenRouter
    extract_model = config.MODEL_SONNET_OR
    response, usage = await openrouter_call(
        model=extract_model,
        prompt=prompt,
        timeout_sec=config.EXTRACT_TIMEOUT,
        max_tokens=8192,
    )

    # Record usage (best-effort; never blocks extraction)
    try:
        record_usage(
            conn,
            model=extract_model,
            stage="extract",
            input_tokens=usage.get("prompt_tokens", 0),
            output_tokens=usage.get("completion_tokens", 0),
            backend="api",
        )
    except Exception:
        logger.debug("Failed to record extraction usage", exc_info=True)

    if not response:
        logger.error("LLM extraction failed for %s — no response", source_file)
        return 0, 1

    # 5. Parse JSON
    extraction = _parse_extraction_json(response)
    if not extraction:
        logger.error("Failed to parse extraction JSON for %s", source_file)
        return 0, 1

    # `or` guards against explicit nulls in the LLM output
    claims_raw = extraction.get("claims") or []
    entities_raw = extraction.get("entities") or []
    enrichments = extraction.get("enrichments") or []
    decisions = extraction.get("decisions") or []
    facts = extraction.get("facts") or []
    notes = extraction.get("extraction_notes") or ""

    logger.info(
        "Extraction result for %s: %d claims, %d enrichments, %d entities, %d decisions",
        source_file, len(claims_raw), len(enrichments), len(entities_raw), len(decisions),
    )

    # 6. Build claim file contents (filenames/domains come from the LLM: sanitize)
    claim_files = []
    for c in claims_raw:
        if not isinstance(c, dict):
            continue
        filename = _safe_md_filename(c.get("filename"))
        if not filename:
            continue
        claim_domain = _safe_domain(c.get("domain"), domain)
        # Keep the frontmatter domain consistent with the directory the file lands in
        content = _build_claim_content({**c, "domain": claim_domain}, agent_lower)
        claim_files.append({"filename": filename, "domain": claim_domain, "content": content})

    # Build entity file contents (only action == "create" is written)
    entity_files = []
    for e in entities_raw:
        if not isinstance(e, dict):
            continue
        filename = _safe_md_filename(e.get("filename"))
        if not filename:
            continue
        if e.get("action", "create") == "create":
            entity_files.append({
                "filename": filename,
                "domain": domain,
                "content": _build_entity_content(e, domain),
            })

    # 7. Post-extraction validation
    if claim_files:
        kept_claims, rejected_claims, stats = validate_and_fix_claims(
            claim_files, domain, agent_lower, existing_claims,
            repo_root=str(config.MAIN_WORKTREE),
        )
        if rejected_claims:
            logger.info(
                "Post-extract rejected %d/%d claims for %s: %s",
                len(rejected_claims), len(claim_files), source_file,
                stats.get("rejections", [])[:5],
            )
        claim_files = kept_claims

    if not claim_files and not entity_files:
        logger.info("No valid claims/entities after validation for %s — archiving as null-result", source_file)
        await _archive_source(source_path, domain, "null-result")
        return 0, 0

    # Resolve the PR token before touching git, so a missing token does not
    # leave a pushed branch that no PR will ever reference.
    agent_token = _resolve_agent_token(agent_lower)
    if agent_token is None:
        return 0, 1

    # 8. Create branch, write files, commit, push
    slug = Path(source_file).stem
    branch = f"extract/{slug}-{secrets.token_hex(2)}"
    wt = str(EXTRACT_WORKTREE)

    # Prepare extract worktree: hard-reset main to origin/main, then branch off it
    rc, out = await _git("fetch", "origin", "main", cwd=wt)
    if rc != 0:
        logger.warning("git fetch failed in extract worktree (using last-fetched origin/main): %s", out)
    await _git("checkout", "main", cwd=wt)
    rc, out = await _git("reset", "--hard", "origin/main", cwd=wt)
    if rc != 0:
        logger.warning("git reset --hard origin/main failed in extract worktree: %s", out)
    rc, _ = await _git("checkout", "-b", branch, cwd=wt)
    if rc != 0:
        # Branch might already exist
        await _git("branch", "-D", branch, cwd=wt)
        rc, out = await _git("checkout", "-b", branch, cwd=wt)
        if rc != 0:
            logger.error("Failed to create branch %s: %s", branch, out)
            return 0, 1

    # Write claim and entity files
    worktree = EXTRACT_WORKTREE
    files_written = []
    for cf in claim_files:
        domain_dir = worktree / "domains" / cf["domain"]
        domain_dir.mkdir(parents=True, exist_ok=True)
        (domain_dir / cf["filename"]).write_text(cf["content"], encoding="utf-8")
        files_written.append(f"domains/{cf['domain']}/{cf['filename']}")
    for ef in entity_files:
        entity_dir = worktree / "entities" / domain
        entity_dir.mkdir(parents=True, exist_ok=True)
        (entity_dir / ef["filename"]).write_text(ef["content"], encoding="utf-8")
        files_written.append(f"entities/{domain}/{ef['filename']}")

    # Stage all files in one call so a failure can't yield a silently partial commit
    rc, out = await _git("add", "--", *files_written, cwd=wt)
    if rc != 0:
        logger.error("git add failed for %s: %s", branch, out)
        await _discard_local_branch(branch)
        return 0, 1

    commit_msg = (
        f"{agent_lower}: extract claims from {slug}\n\n"
        f"- Source: {source_path}\n"
        f"- Domain: {domain}\n"
        f"- Claims: {len(claim_files)}, Entities: {len(entity_files)}\n"
        f"- Enrichments: {len(enrichments)}\n"
        f"- Extracted by: pipeline ingest (OpenRouter {extract_model})\n\n"
        f"Pentagon-Agent: {agent_name} "
    )
    rc, out = await _git("commit", "-m", commit_msg, cwd=wt)
    if rc != 0:
        logger.error("Commit failed for %s: %s", branch, out)
        await _discard_local_branch(branch)
        return 0, 1

    # Push branch
    rc, out = await _git("push", "-u", "origin", branch, cwd=wt)
    if rc != 0:
        logger.error("Push failed for %s: %s", branch, out)
        await _discard_local_branch(branch)
        return 0, 1

    # 9. Create PR on Forgejo
    pr_title = f"{agent_lower}: extract claims from {slug}"
    pr_body = (
        f"## Automated Extraction\n\n"
        f"**Source:** `{source_path}`\n"
        f"**Domain:** {domain}\n"
        f"**Agent:** {agent_name}\n"
        f"**Model:** {extract_model}\n\n"
        f"### Extraction Summary\n"
        f"- **Claims:** {len(claim_files)}\n"
        f"- **Entities:** {len(entity_files)}\n"
        f"- **Enrichments:** {len(enrichments)}\n"
        f"- **Decisions:** {len(decisions)}\n"
        f"- **Facts:** {len(facts)}\n\n"
        f"{notes}\n\n"
        f"---\n"
        f"*Extracted by pipeline ingest stage (replaces extract-cron.sh)*"
    )
    pr_result = await forgejo_api(
        "POST",
        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls",
        body={"title": pr_title, "body": pr_body, "base": "main", "head": branch},
        token=agent_token,
    )
    pr_num = pr_result.get("number") if isinstance(pr_result, dict) else None
    if not pr_num:
        # Without a PR the pushed claims are never reviewed. Count it as an error
        # (so the circuit breaker sees e.g. a bad token) and leave the source
        # queued for a retry instead of archiving it as processed.
        # NOTE(review): the remote branch is left in place; if the PR was in fact
        # created despite this response, the open-PR dedup will skip the retry.
        logger.error(
            "PR creation failed for %s (branch %s) — response: %s",
            source_file, branch, pr_result,
        )
        await _discard_local_branch(branch)
        return 0, 1
    logger.info("PR #%d created for %s (%d claims, %d entities)",
                pr_num, source_file, len(claim_files), len(entity_files))

    # Clean up extract worktree
    await _git("checkout", "main", cwd=wt)

    # 10. Archive source on main
    await _archive_source(source_path, domain, "processed", agent_lower)
    return 1, 0


# ─── Archival on main ───────────────────────────────────────────────────────


async def _pull_rebase_main(cwd: str) -> bool:
    """Run ``git pull --rebase origin main`` in *cwd*; abort the rebase on failure.

    Returns True on success. On failure the rebase is aborted so the worktree is
    not left mid-rebase for the next writer (the abort is harmless when no
    rebase was started).
    """
    rc, out = await _git("pull", "--rebase", "origin", "main", cwd=cwd, timeout=30)
    if rc != 0:
        logger.warning("git pull --rebase failed in %s: %s", cwd, out)
        await _git("rebase", "--abort", cwd=cwd)
    return rc == 0


def _mark_source_frontmatter(content: str, status: str, agent: str | None) -> str:
    """Return *content* with its frontmatter updated for archival.

    Rewrites the first ``status: unprocessed`` line (quoted or not) to *status*.
    Adds ``processed_by``/``processed_date`` after the status line when *agent*
    is given and not already present. Adds ``extraction_model`` just before the
    next line starting with ``---`` after the status line, if absent.
    """
    today = date.today().isoformat()
    content = _UNPROCESSED_STATUS_RE.sub(f"status: {status}", content, count=1)
    if agent and "processed_by:" not in content:
        content = re.sub(
            r"(^status: [\w-]+)",  # [\w-] so "null-result" is not split
            rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
            content,
            count=1,
            flags=re.MULTILINE,
        )
    if "extraction_model:" not in content:
        content = re.sub(
            r"(^status: \w+.*?)(\n---)",
            rf'\1\nextraction_model: "{config.MODEL_SONNET_OR}"\2',
            content,
            count=1,
            flags=re.MULTILINE | re.DOTALL,
        )
    return content


async def _archive_source(
    source_path: str,
    domain: str,
    status: str,
    agent: str | None = None,
) -> None:
    """Move a source from inbox/queue/ to its archive location on main.

    ``null-result`` sources go to inbox/null-result/; everything else goes to
    inbox/archive/{domain or "unknown"}/. The frontmatter is updated, then the
    move is committed and pushed (3 attempts, rebasing between them).
    Uses the main worktree lock to avoid conflicts with other main-writing
    processes. Errors are logged, never raised.
    """
    source_file = os.path.basename(source_path)
    main = str(config.MAIN_WORKTREE)

    try:
        async with async_main_worktree_lock():
            # Pull latest
            await _pull_rebase_main(main)

            queue_path = Path(main) / "inbox" / "queue" / source_file
            if not queue_path.exists():
                logger.warning("Source %s not found in queue — may have been archived already", source_file)
                return

            if status == "null-result":
                dest_dir = Path(main) / "inbox" / "null-result"
            else:
                dest_dir = Path(main) / "inbox" / "archive" / (domain or "unknown")
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest_path = dest_dir / source_file

            # Read and update frontmatter, then move the file
            content = _mark_source_frontmatter(
                queue_path.read_text(encoding="utf-8"), status, agent
            )
            dest_path.write_text(content, encoding="utf-8")
            queue_path.unlink()

            # Git add, commit, push
            await _git("add", "inbox/", cwd=main)
            commit_msg = (
                f"source: {source_file} → {status}\n\n"
                f"Pentagon-Agent: Epimetheus "
            )
            await _git("commit", "-m", commit_msg, cwd=main)

            # Push with retry
            for attempt in range(3):
                rc, out = await _git("push", "origin", "main", cwd=main, timeout=30)
                if rc == 0:
                    break
                logger.warning("Push attempt %d failed: %s", attempt + 1, out)
                await _pull_rebase_main(main)
            else:
                logger.error("Failed to push source archival after 3 attempts")

    except Exception:
        logger.exception("Failed to archive source %s", source_file)


# ─── Cycle driver ───────────────────────────────────────────────────────────


def _find_unprocessed_sources(main: Path) -> list[tuple[str, str, dict]]:
    """Return (repo-relative path, content, frontmatter) for each source in
    inbox/queue/ whose frontmatter status is "unprocessed", in filename order."""
    queue_dir = main / "inbox" / "queue"
    if not queue_dir.exists():
        return []
    found = []
    for f in sorted(queue_dir.glob("*.md")):
        try:
            content = f.read_text(encoding="utf-8")
            fm = _parse_source_frontmatter(content)
            if fm.get("status") == "unprocessed":
                found.append((str(f.relative_to(main)), content, fm))
        except Exception:
            logger.debug("Failed to read source %s", f, exc_info=True)
    return found


async def _open_extraction_pr_slugs() -> set[str]:
    """Return source slugs that already have an open ``extract/{slug}-{nonce}`` PR.

    Pages through open PRs. Best-effort: on API failure, the slugs collected
    so far are returned.
    """
    slugs: set[str] = set()
    try:
        for page in range(1, _PR_MAX_PAGES + 1):
            prs = await forgejo_api(
                "GET",
                f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls"
                f"?state=open&limit={_PR_PAGE_SIZE}&page={page}",
            )
            if not isinstance(prs, list) or not prs:
                break
            for pr in prs:
                head = (pr.get("head") or {}).get("ref", "")
                if head.startswith("extract/"):
                    # Extract the source slug from branch name (extract/{slug}-{nonce})
                    slug_part = head[len("extract/"):]
                    # Remove the random suffix (last 5 chars: -{4-hex-chars})
                    if len(slug_part) > 5 and slug_part[-5] == "-":
                        slug_part = slug_part[:-5]
                    slugs.add(slug_part)
            if len(prs) < _PR_PAGE_SIZE:
                break  # last page
    except Exception:
        logger.debug("Failed to check open PRs for dedup", exc_info=True)
    return slugs


def _find_archived_source(main: Path, source_path: str) -> Path | None:
    """Locate a previously archived source for re-extraction.

    Looks for the file's basename in each inbox/archive/* subdirectory (sorted,
    first match wins), then falls back to *source_path* relative to *main*.
    """
    name = Path(source_path).name
    archive_base = main / "inbox" / "archive"
    if archive_base.is_dir():
        for subdir in sorted(archive_base.iterdir()):
            candidate = subdir / name
            if candidate.exists():
                return candidate
    candidate = main / source_path
    return candidate if candidate.exists() else None


def _set_source_status(conn, path: str, status: str, last_error: str | None = None) -> None:
    """Update a row in the ``sources`` table (status, optional last_error) and commit."""
    if last_error is None:
        conn.execute(
            "UPDATE sources SET status = ?, updated_at = datetime('now') WHERE path = ?",
            (status, path),
        )
    else:
        conn.execute(
            "UPDATE sources SET status = ?, last_error = ?, updated_at = datetime('now') WHERE path = ?",
            (status, last_error, path),
        )
    conn.commit()


async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Main extraction cycle — called by the pipeline daemon's ingest stage.

    Finds unprocessed sources in inbox/queue/ (skipping those that already have
    an open extraction PR), plus sources marked 'needs_reextraction' in the
    ``sources`` table, extracts claims, and creates PRs.

    Args:
        conn: Database connection holding the ``sources`` table; rows are
            accessed by column name (presumably sqlite3.Row).
        max_workers: Unused; kept for interface compatibility (processing is
            sequential).

    Returns:
        (succeeded, errors) for circuit breaker tracking.
    """
    main = config.MAIN_WORKTREE

    unprocessed = _find_unprocessed_sources(main)

    # Filter out sources that already have open extraction PRs
    if unprocessed:
        open_pr_slugs = await _open_extraction_pr_slugs()
        if open_pr_slugs:
            before = len(unprocessed)
            unprocessed = [
                (sp, c, f) for sp, c, f in unprocessed
                if Path(sp).stem not in open_pr_slugs
            ]
            skipped = before - len(unprocessed)
            if skipped:
                logger.info("Skipped %d source(s) with existing open PRs", skipped)

    # Re-extraction gets the slots the queue doesn't use (always at least one).
    # Queried even when the queue is empty so re-extraction is never starved.
    reextract_rows = conn.execute(
        """SELECT path, feedback FROM sources
           WHERE status = 'needs_reextraction' AND feedback IS NOT NULL
           ORDER BY updated_at ASC LIMIT ?""",
        (max(1, MAX_SOURCES - len(unprocessed)),),
    ).fetchall()

    if not unprocessed and not reextract_rows:
        return 0, 0

    if unprocessed:
        logger.info("Extract cycle: %d unprocessed source(s) found, processing up to %d",
                    len(unprocessed), MAX_SOURCES)

    # Ensure extract worktree exists
    if not EXTRACT_WORKTREE.exists():
        logger.error("Extract worktree not found at %s", EXTRACT_WORKTREE)
        return 0, 1

    # Load existing claims for dedup
    existing_claims = load_existing_claims_from_repo(str(main))

    total_ok = 0
    total_err = 0

    # ── Re-extraction: pick up sources that failed eval and have feedback ──
    for row in reextract_rows:
        reex_path = row["path"]
        # Source was archived — read from archive location
        reex_file = _find_archived_source(main, reex_path)
        if reex_file is None:
            logger.warning("Re-extraction: source %s not found on disk — skipping", reex_path)
            continue

        try:
            reex_content = reex_file.read_text(encoding="utf-8")
            reex_fm = _parse_source_frontmatter(reex_content)
            reex_feedback = json.loads(row["feedback"]) if row["feedback"] else {}
            logger.info("Re-extracting %s with feedback: %s", reex_path,
                        list(reex_feedback.get("issues", [])))

            _set_source_status(conn, reex_path, "extracting")
            ok, err = await _extract_one_source(
                conn, reex_path, reex_content, reex_fm, existing_claims,
                feedback=reex_feedback,
            )
            total_ok += ok
            total_err += err
            if ok:
                _set_source_status(conn, reex_path, "extracted")
            else:
                _set_source_status(conn, reex_path, "error", "re-extraction failed")
        except Exception:
            logger.exception("Re-extraction failed for %s", reex_path)
            total_err += 1
            # Don't leave the row stuck in 'extracting' (or retried forever)
            try:
                _set_source_status(conn, reex_path, "error", "re-extraction failed")
            except Exception:
                logger.debug("Failed to mark %s as errored", reex_path, exc_info=True)

    for source_path, content, fm in unprocessed[:MAX_SOURCES]:
        try:
            ok, err = await _extract_one_source(conn, source_path, content, fm, existing_claims)
            total_ok += ok
            total_err += err
        except Exception:
            logger.exception("Unhandled error extracting %s", source_path)
            total_err += 1

        # Brief pause between sources
        await asyncio.sleep(2)

    logger.info("Extract cycle complete: %d succeeded, %d errors", total_ok, total_err)
    return total_ok, total_err