Three targeted fixes from Ganymede's review of commit 469cb7f:
BUG #1 — Success path now updates sources.status='extracting' before PR
creation, so queue scan's DB-authoritative filter catches sources between
PR creation and merge. Previously the cooldown gate was load-bearing for
this window, not belt-and-suspenders as claimed.
BUG #2 — Second null-result path (line 573, triggered when enrichments
existed but all targets were missing in worktree) now updates DB. Without
this, that path created no PR, no DB mark, and would have re-entered the
runaway loop 4h later when the cooldown window expired.
NIT #6 — 4h cooldown moved to config.EXTRACTION_COOLDOWN_HOURS. Tunable
without code change. Log format now shows the configured hours.
Also backfilled 59 pre-existing zombie queue-path rows where the file
was already archived but DB status said 'unprocessed' — these would have
leaked past the DB filter once the 4h cooldown expired.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
"""Extraction stage — automated claim extraction from queued sources.
|
|
|
|
Replaces extract-cron.sh with a Python module inside the pipeline daemon.
|
|
Processes unprocessed sources in inbox/queue/, extracts claims via LLM,
|
|
creates PRs on Forgejo, and archives sources on main.
|
|
|
|
Flow per source:
|
|
1. Read source frontmatter (domain, author, rationale)
|
|
2. Pre-screen: Haiku identifies themes, Qdrant finds prior art
|
|
3. Build KB index for dedup
|
|
4. Build extraction prompt (extraction_prompt.py)
|
|
5. Call Sonnet via OpenRouter
|
|
6. Parse JSON response
|
|
7. Post-extraction validation (post_extract.py)
|
|
8. Create branch, write claim/entity files, commit, push
|
|
9. Create PR on Forgejo via agent token
|
|
10. Archive source on main (worktree lock)
|
|
|
|
Design: one source at a time (sequential), up to MAX_SOURCES per cycle.
|
|
Uses the main worktree for reading + archival, extract worktree for branches.
|
|
|
|
Epimetheus owns this module. Leo reviews changes.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import secrets
|
|
from datetime import date
|
|
from pathlib import Path
|
|
|
|
from . import config
|
|
from .costs import record_usage
|
|
from .db import classify_source_channel
|
|
from .domains import agent_for_domain
|
|
from .extraction_prompt import build_extraction_prompt
|
|
from .forgejo import api as forgejo_api
|
|
from .llm import openrouter_call
|
|
from .connect import connect_new_claims
|
|
from .post_extract import load_existing_claims_from_repo, validate_and_fix_claims
|
|
from .worktree_lock import async_main_worktree_lock
|
|
|
|
logger = logging.getLogger("pipeline.extract")
|
|
|
|
# Extraction worktree (separate from main to avoid conflicts)
|
|
EXTRACT_WORKTREE = config.BASE_DIR / "workspaces" / "extract"
|
|
|
|
# Max sources per cycle
|
|
MAX_SOURCES = int(os.environ.get("MAX_EXTRACT_SOURCES", "3"))
|
|
|
|
# KB index cache (rebuilt once per cycle, not per source)
|
|
_kb_index_cache: dict[str, str] = {}
|
|
_kb_index_timestamp: float = 0
|
|
KB_INDEX_TTL = 300 # 5 minutes
|
|
|
|
|
|
def _parse_source_frontmatter(content: str) -> dict:
    """Parse source file frontmatter. Returns dict of fields."""
    if not content.startswith("---"):
        return {}
    end = content.find("---", 3)
    if end == -1:
        return {}
    raw = content[3:end]

    fm = {}
    for line in raw.strip().split("\n"):
        line = line.strip()
        if not line or ":" not in line:
            continue
        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if val.lower() == "null" or val == "":
            val = None
        fm[key] = val
    return fm

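# Illustrative source frontmatter, limited to the fields this module reads later
# (domain, title, rationale, intake_tier, proposed_by, format, status). The
# values shown are hypothetical, not taken from a real queue file:
#
#   ---
#   status: unprocessed
#   domain: ai-infra
#   title: "Example source title"
#   proposed_by: "@someone"
#   intake_tier: directed
#   format: article
#   rationale: "Why this source was queued"
#   ---
#
# _parse_source_frontmatter() on the block above yields
# {"status": "unprocessed", "domain": "ai-infra", "title": "Example source title", ...}.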
def _get_kb_index(domain: str) -> str:
    """Get KB index text for a domain. Uses cached /tmp/kb-indexes/ files."""
    import time

    global _kb_index_cache, _kb_index_timestamp

    now = time.time()
    if now - _kb_index_timestamp > KB_INDEX_TTL:
        _kb_index_cache.clear()
        _kb_index_timestamp = now

    if domain in _kb_index_cache:
        return _kb_index_cache[domain]

    # Try pre-generated index files first
    index_file = Path(f"/tmp/kb-indexes/{domain}.txt")
    if index_file.exists():
        text = index_file.read_text(encoding="utf-8")
        _kb_index_cache[domain] = text
        return text

    # Fallback: build from repo
    main = config.MAIN_WORKTREE
    sections = []

    # Domain claims
    claims = []
    domain_dir = main / "domains" / domain
    if domain_dir.is_dir():
        for f in domain_dir.glob("*.md"):
            if not f.name.startswith("_"):
                claims.append(f"- {f.stem}")
    sections.append(f"## Claims in domains/{domain}/\n" + "\n".join(sorted(claims)))

    # Domain entities — so the LLM knows what entities exist for connections
    entities = []
    entity_dir = main / "entities" / domain
    if entity_dir.is_dir():
        for f in entity_dir.glob("*.md"):
            if not f.name.startswith("_"):
                entities.append(f"- {f.stem}")
    if entities:
        sections.append(f"## Entities in entities/{domain}/\n" + "\n".join(sorted(entities)))

    text = "\n\n".join(sections)
    _kb_index_cache[domain] = text
    return text

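# The fallback index text produced above has this shape (the domain and the
# claim/entity stems below are hypothetical):
#
#   ## Claims in domains/ai-infra/
#   - gpu-scheduling-reduces-queue-time
#   - spot-instances-cut-training-cost
#
#   ## Entities in entities/ai-infra/
#   - example-vendor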
async def _git(*args, cwd: str | None = None, timeout: int = 60) -> tuple[int, str]:
    """Run a git command async. Returns (returncode, stdout+stderr)."""
    proc = await asyncio.create_subprocess_exec(
        "git", *args,
        cwd=cwd or str(EXTRACT_WORKTREE),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        return -1, f"git {args[0]} timed out after {timeout}s"
    output = (stdout or b"").decode().strip()
    if stderr:
        output += "\n" + stderr.decode().strip()
    return proc.returncode, output

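# Calling convention sketch (the specific git arguments here are only an
# illustration, not a call made elsewhere in this module):
#
#   rc, out = await _git("status", "--porcelain", cwd=str(EXTRACT_WORKTREE))
#   if rc != 0:
#       logger.warning("git status failed: %s", out)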
async def _pre_screen(source_content: str, source_title: str) -> str | None:
    """Run pre-screening: identify themes and find prior art.

    Returns formatted prior art text, or None if pre-screening fails/unavailable.
    Non-fatal — extraction proceeds without prior art if this fails.
    """
    try:
        from .pre_screen import identify_themes, PRIOR_ART_THRESHOLD
        from .search import search

        key_file = config.SECRETS_DIR / "openrouter-key"
        if not key_file.exists():
            return None

        api_key = key_file.read_text().strip()
        themes = identify_themes(source_content, api_key, source_title)
        if not themes:
            return None

        # Search each theme against Qdrant
        results = []
        search_queries = themes + ([source_title] if source_title else [])

        for query in search_queries[:5]:
            try:
                hits = search(query, limit=3, score_threshold=PRIOR_ART_THRESHOLD)
                for hit in hits:
                    title = hit.get("title", hit.get("filename", ""))
                    score = hit.get("score", 0)
                    domain = hit.get("domain", "")
                    if title and score >= PRIOR_ART_THRESHOLD:
                        results.append(f"- [{score:.2f}] {title} (domain: {domain})")
            except Exception:
                continue

        if not results:
            return None

        # Deduplicate
        seen = set()
        unique = []
        for r in results:
            if r not in seen:
                seen.add(r)
                unique.append(r)

        return "\n".join(unique[:15])

    except Exception:
        logger.debug("Pre-screening failed (non-fatal)", exc_info=True)
        return None

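# When prior art is found, the returned text is a newline-joined list such as
# the following (scores, titles, and domains are hypothetical):
#
#   - [0.81] gpu-scheduling-reduces-queue-time (domain: ai-infra)
#   - [0.74] spot-instances-cut-training-cost (domain: ai-infra)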
def _parse_extraction_json(text: str) -> dict | None:
    """Parse extraction JSON from LLM response. Handles markdown fencing."""
    if not text:
        return None

    # Strip markdown code fences
    text = text.strip()
    if text.startswith("```"):
        # Remove opening fence (```json or ```)
        first_newline = text.index("\n") if "\n" in text else len(text)
        text = text[first_newline + 1:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse extraction JSON: %s", e)
        # Try to find JSON object in text
        match = re.search(r"\{[\s\S]+\}", text)
        if match:
            try:
                return json.loads(match.group())
            except json.JSONDecodeError:
                pass
    return None

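# Expected top-level shape of the parsed extraction JSON, as consumed by
# _extract_one_source below. Values are hypothetical; only keys actually read
# in this module are listed:
#
#   {
#     "claims": [{"filename": "...", "domain": "...", "title": "...",
#                 "description": "...", "confidence": "likely", "source": "...",
#                 "body": "...", "scope": "...", "sourcer": "...",
#                 "related_claims": [],
#                 "connections": [{"target": "...", "relationship": "supports"}]}],
#     "entities": [{"filename": "...", "action": "create", "entity_type": "company",
#                   "content": "...", "timeline_entry": "..."}],
#     "enrichments": [{"target_file": "...", "type": "extend",
#                      "evidence": "...", "source_ref": "..."}],
#     "decisions": [],
#     "facts": [],
#     "extraction_notes": "..."
#   }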
def _build_claim_content(claim: dict, agent: str, source_format: str | None = None, source_file: str = "") -> str:
    """Build claim markdown file content from extraction JSON."""
    today = date.today().isoformat()
    domain = claim.get("domain", "")
    title = claim.get("title", claim.get("filename", "").replace("-", " ").replace(".md", ""))
    description = claim.get("description", "")
    raw_confidence = claim.get("confidence", "experimental")
    _CONFIDENCE_MAP = {
        "proven": "proven", "likely": "likely", "experimental": "experimental",
        "speculative": "speculative", "high": "likely", "medium": "experimental",
        "low": "speculative", "very high": "proven", "moderate": "experimental",
    }
    confidence = _CONFIDENCE_MAP.get(raw_confidence.lower().strip(), "experimental") if isinstance(raw_confidence, str) else "experimental"
    source_ref = claim.get("source", "")
    body = claim.get("body", "")
    scope = claim.get("scope", "")
    sourcer = claim.get("sourcer", "")
    related_claims = claim.get("related_claims", [])
    connections = claim.get("connections", [])

    edge_fields = {"supports": [], "challenges": [], "related": []}
    for conn in connections:
        target = conn.get("target", "")
        rel = conn.get("relationship", "related")
        if target and rel in edge_fields:
            target = target.replace(".md", "")
            if target not in edge_fields[rel]:
                edge_fields[rel].append(target)
    for r in related_claims[:5]:
        r_clean = r.replace(".md", "").strip("[]").strip()
        if r_clean and r_clean not in edge_fields["related"]:
            edge_fields["related"].append(r_clean)

    edge_lines = []
    for edge_type in ("supports", "challenges", "related"):
        targets = edge_fields[edge_type]
        if targets:
            edge_lines.append(f"{edge_type}:")
            for t in targets:
                edge_lines.append(f"  - {t}")

    lines = [
        "---",
        "type: claim",
        f"domain: {domain}",
        f'title: "{title}"',
        f'description: "{description}"',
        f"confidence: {confidence}",
        f'source: "{source_ref}"',
        f"created: {today}",
        f"agent: {agent}",
    ]
    if source_file:
        lines.append(f"sourced_from: {source_file}")
    if scope:
        lines.append(f"scope: {scope}")
    if sourcer:
        lines.append(f'sourcer: "{sourcer}"')
    if source_format and source_format.lower() == "conversation":
        lines.append("verified: false")
        lines.append("source_type: conversation")
    lines.extend(edge_lines)
    lines.append("---")
    lines.append("")
    lines.append(f"# {title}")
    lines.append("")
    if body:
        lines.append(body)
        lines.append("")

    return "\n".join(lines)

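# A generated claim file therefore looks roughly like this (field values are
# hypothetical; ordering follows the `lines` list above):
#
#   ---
#   type: claim
#   domain: ai-infra
#   title: "Example claim title"
#   description: "One-sentence summary"
#   confidence: likely
#   source: "https://example.com/article"
#   created: 2025-01-01
#   agent: leo
#   sourced_from: ai-infra/example-source.md
#   supports:
#     - some-existing-claim
#   ---
#
#   # Example claim title
#
#   Body text extracted from the source.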
def _build_entity_content(entity: dict, domain: str) -> str:
    """Build entity markdown file content from extraction JSON."""
    today = date.today().isoformat()
    entity_type = entity.get("entity_type", "company")
    description = entity.get("content", "")

    if description:
        # Strip code fences the LLM may have wrapped the content in
        description = description.strip()
        if description.startswith("```"):
            first_nl = description.find("\n")
            if first_nl != -1:
                description = description[first_nl + 1:]
            if description.endswith("```"):
                description = description[:-3].rstrip()
        return description

    name = entity.get("filename", "").replace("-", " ").replace(".md", "").title()
    return f"""---
type: entity
entity_type: {entity_type}
domain: {domain}
description: ""
created: {today}
---

# {name}

## Timeline

{entity.get("timeline_entry", "")}
"""

async def _extract_one_source(
    conn,
    source_path: str,
    source_content: str,
    fm: dict,
    existing_claims: set[str],
    feedback: dict | None = None,
) -> tuple[int, int]:
    """Extract claims from a single source. Returns (succeeded, errors)."""
    source_file = os.path.basename(source_path)
    domain = fm.get("domain", "")
    agent_name = agent_for_domain(domain)
    agent_lower = agent_name.lower()
    title = fm.get("title", source_file)
    rationale = fm.get("rationale")
    intake_tier = fm.get("intake_tier")
    proposed_by = fm.get("proposed_by")
    source_format = fm.get("format")

    logger.info("Extracting: %s (domain: %s, agent: %s)", source_file, domain, agent_name)

    # 1. Pre-screen (non-fatal)
    prior_art = await _pre_screen(source_content, title)
    if prior_art:
        logger.info("Pre-screening found %d prior art items", prior_art.count("\n") + 1)

    # 2. Build KB index
    kb_index = _get_kb_index(domain)

    # 3. Build extraction prompt
    prompt = build_extraction_prompt(
        source_file=source_path,
        source_content=source_content,
        domain=domain,
        agent=agent_name,
        kb_index=kb_index,
        rationale=rationale,
        intake_tier=intake_tier,
        proposed_by=proposed_by,
        prior_art=prior_art,
        previous_feedback=feedback,
        source_format=source_format,
    )

    # 4. Call LLM (OpenRouter — not Claude Max CLI)
    # EXTRACT_MODEL is "sonnet" (CLI name), use MODEL_SONNET_OR for OpenRouter
    extract_model = config.MODEL_SONNET_OR
    response, usage = await openrouter_call(
        model=extract_model,
        prompt=prompt,
        timeout_sec=config.EXTRACT_TIMEOUT,
        max_tokens=8192,
    )

    # Record usage
    try:
        record_usage(
            conn,
            model=extract_model,
            stage="extract",
            input_tokens=usage.get("prompt_tokens", 0),
            output_tokens=usage.get("completion_tokens", 0),
            backend="api",
        )
    except Exception:
        logger.debug("Failed to record extraction usage", exc_info=True)

    if not response:
        logger.error("LLM extraction failed for %s — no response", source_file)
        return 0, 1

    # 5. Parse JSON
    extraction = _parse_extraction_json(response)
    if not extraction:
        logger.error("Failed to parse extraction JSON for %s", source_file)
        return 0, 1

    claims_raw = extraction.get("claims", [])
    entities_raw = extraction.get("entities", [])
    enrichments = extraction.get("enrichments", [])
    decisions = extraction.get("decisions", [])
    facts = extraction.get("facts", [])
    notes = extraction.get("extraction_notes", "")

    logger.info(
        "Extraction result for %s: %d claims, %d enrichments, %d entities, %d decisions",
        source_file, len(claims_raw), len(enrichments), len(entities_raw), len(decisions),
    )

    # 6. Build claim file contents
    claim_files = []
    for c in claims_raw:
        filename = c.get("filename", "")
        if not filename:
            continue
        filename = Path(filename).name  # Strip directory components — LLM output may contain path traversal
        if not filename.endswith(".md"):
            filename += ".md"
        content = _build_claim_content(c, agent_lower, source_format=source_format, source_file=f"{domain}/{source_file}" if domain else source_file)
        claim_files.append({"filename": filename, "domain": c.get("domain", domain), "content": content})

    # Build entity file contents
    entity_files = []
    for e in entities_raw:
        filename = e.get("filename", "")
        if not filename:
            continue
        filename = Path(filename).name  # Strip directory components — LLM output may contain path traversal
        if not filename.endswith(".md"):
            filename += ".md"
        action = e.get("action", "create")
        if action == "create":
            content = _build_entity_content(e, domain)
            entity_files.append({"filename": filename, "domain": domain, "content": content})

    # 6.5. Pre-filter near-duplicates BEFORE post-extract validation
    # Uses same SequenceMatcher threshold as tier0. Catches duplicates cheaply ($0)
    # before they create PRs and burn eval cycles.
    if claim_files and existing_claims:
        from difflib import SequenceMatcher as _SM
        _DEDUP_THRESHOLD = 0.85
        filtered = []
        for cf in claim_files:
            title_lower = Path(cf["filename"]).stem.replace("-", " ").lower()
            title_words = set(title_lower.split()[:6])
            is_dup = False
            for existing in existing_claims:
                existing_lower = existing.replace("-", " ").lower()
                if len(title_words & set(existing_lower.split()[:6])) < 2:
                    continue
                if _SM(None, title_lower, existing_lower).ratio() >= _DEDUP_THRESHOLD:
                    logger.info("Extract-dedup: skipping near-duplicate '%s' (matches '%s')", cf["filename"], existing)
                    is_dup = True
                    break
            if not is_dup:
                filtered.append(cf)
        if len(filtered) < len(claim_files):
            logger.info("Extract-dedup: filtered %d/%d near-duplicates", len(claim_files) - len(filtered), len(claim_files))
        claim_files = filtered

    # 7. Post-extraction validation
    if claim_files:
        kept_claims, rejected_claims, stats = validate_and_fix_claims(
            claim_files, domain, agent_lower, existing_claims,
            repo_root=str(config.MAIN_WORKTREE),
        )
        if rejected_claims:
            logger.info(
                "Post-extract rejected %d/%d claims for %s: %s",
                len(rejected_claims), len(claim_files), source_file,
                stats.get("rejections", [])[:5],
            )
        claim_files = kept_claims

    if not claim_files and not entity_files and not enrichments:
        logger.info("No valid claims/entities/enrichments after validation for %s — archiving as null-result", source_file)
        # Mark DB as null_result so queue scan won't re-extract even if file stays in queue
        # (the main-worktree push in _archive_source frequently fails — DB is authoritative).
        try:
            conn.execute(
                """INSERT INTO sources (path, status, updated_at) VALUES (?, 'null_result', datetime('now'))
                   ON CONFLICT(path) DO UPDATE SET status='null_result', updated_at=datetime('now')""",
                (source_path,),
            )
            conn.commit()
        except Exception:
            logger.debug("Failed to mark source as null_result in DB", exc_info=True)
        await _archive_source(source_path, domain, "null-result")
        return 0, 0

    # 8. Create branch, write files, commit, push
    slug = Path(source_file).stem
    branch = f"extract/{slug}-{secrets.token_hex(2)}"

    # Prepare extract worktree
    rc, _ = await _git("fetch", "origin", "main", cwd=str(EXTRACT_WORKTREE))
    rc, _ = await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))
    rc, _ = await _git("reset", "--hard", "origin/main", cwd=str(EXTRACT_WORKTREE))
    rc, _ = await _git("checkout", "-b", branch, cwd=str(EXTRACT_WORKTREE))
    if rc != 0:
        # Branch might already exist
        await _git("branch", "-D", branch, cwd=str(EXTRACT_WORKTREE))
        rc, out = await _git("checkout", "-b", branch, cwd=str(EXTRACT_WORKTREE))
        if rc != 0:
            logger.error("Failed to create branch %s: %s", branch, out)
            return 0, 1

    # Write claim files
    worktree = EXTRACT_WORKTREE
    files_written = []
    for cf in claim_files:
        domain_dir = worktree / "domains" / cf["domain"]
        domain_dir.mkdir(parents=True, exist_ok=True)
        fpath = domain_dir / cf["filename"]
        fpath.write_text(cf["content"], encoding="utf-8")
        files_written.append(f"domains/{cf['domain']}/{cf['filename']}")

    for ef in entity_files:
        entity_dir = worktree / "entities" / domain
        entity_dir.mkdir(parents=True, exist_ok=True)
        fpath = entity_dir / ef["filename"]
        fpath.write_text(ef["content"], encoding="utf-8")
        files_written.append(f"entities/{domain}/{ef['filename']}")

    # Write enrichments as modifications to existing claim files
    for enr in enrichments:
        target = enr.get("target_file", "")
        evidence = enr.get("evidence", "")
        enr_type = enr.get("type", "extend")  # confirm|challenge|extend
        source_ref = enr.get("source_ref", source_file)
        if not target or not evidence:
            continue
        # Find the target claim file in the worktree (search domains/)
        target_stem = Path(target.replace(".md", "")).name
        found = None
        for domain_dir in (worktree / "domains").iterdir():
            candidate = domain_dir / f"{target_stem}.md"
            if candidate.exists():
                found = candidate
                break
        if not found:
            logger.debug("Enrichment target %s not found in worktree", target)
            continue
        # Append enrichment evidence to the claim file
        existing = found.read_text(encoding="utf-8")
        label = {"confirm": "Supporting", "challenge": "Challenging", "extend": "Extending"}.get(enr_type, "Additional")
        enrichment_block = f"\n\n## {label} Evidence\n\n**Source:** {source_ref}\n\n{evidence}\n"
        found.write_text(existing + enrichment_block, encoding="utf-8")
        rel_path = str(found.relative_to(worktree))
        if rel_path not in files_written:
            files_written.append(rel_path)
        logger.info("Enrichment applied to %s (%s)", target, enr_type)

    if not files_written:
        logger.info("No files written for %s — cleaning up", source_file)
        # Path B null-result: enrichments existed but all targets missing in worktree.
        # No PR, no cooldown match — without DB update this re-extracts every 60s.
        # (Ganymede review, commit 469cb7f follow-up.)
        try:
            conn.execute(
                """INSERT INTO sources (path, status, updated_at) VALUES (?, 'null_result', datetime('now'))
                   ON CONFLICT(path) DO UPDATE SET status='null_result', updated_at=datetime('now')""",
                (source_path,),
            )
            conn.commit()
        except Exception:
            logger.debug("Failed to mark source as null_result (path B)", exc_info=True)
        await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))
        await _git("branch", "-D", branch, cwd=str(EXTRACT_WORKTREE))
        await _archive_source(source_path, domain, "null-result")
        return 0, 0

    # Post-write: connect new claims to existing KB via vector search (non-fatal)
    claim_paths = [str(worktree / f) for f in files_written if f.startswith("domains/")]
    if claim_paths:
        try:
            connect_stats = connect_new_claims(claim_paths)
            if connect_stats["connected"] > 0:
                logger.info(
                    "Extract-connect: %d/%d claims → %d edges",
                    connect_stats["connected"], len(claim_paths), connect_stats["edges_added"],
                )
        except Exception:
            logger.warning("Extract-connect failed (non-fatal)", exc_info=True)

    # Archive the source WITHIN the extract branch (not via separate push on main).
    # Prevents the runaway-extraction race: when archive-to-main push fails (non-FF,
    # non-pushable worktree state), file returns to queue and gets re-extracted every
    # cycle. Moving the archive into the extract branch makes it atomic with the PR
    # merge — when the PR merges, the source is archived automatically.
    try:
        archive_rel = _archive_source_in_worktree(
            worktree, source_path, domain, "processed", agent_lower, extract_model,
        )
        if archive_rel:
            files_written.append(archive_rel["new"])
            # The queue file was deleted; git add handles the removal
            await _git("add", "inbox/queue/", cwd=str(EXTRACT_WORKTREE))
    except Exception:
        logger.exception("In-branch archive failed for %s (continuing)", source_file)

    # Stage and commit
    for f in files_written:
        await _git("add", f, cwd=str(EXTRACT_WORKTREE))

    commit_msg = (
        f"{agent_lower}: extract claims from {slug}\n\n"
        f"- Source: {source_path}\n"
        f"- Domain: {domain}\n"
        f"- Claims: {len(claim_files)}, Entities: {len(entity_files)}\n"
        f"- Enrichments: {len(enrichments)}\n"
        f"- Extracted by: pipeline ingest (OpenRouter {extract_model})\n\n"
        f"Pentagon-Agent: {agent_name} <PIPELINE>"
    )

    rc, out = await _git("commit", "-m", commit_msg, cwd=str(EXTRACT_WORKTREE))
    if rc != 0:
        logger.error("Commit failed for %s: %s", branch, out)
        await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))
        await _git("branch", "-D", branch, cwd=str(EXTRACT_WORKTREE))
        return 0, 1

    # Push branch
    rc, out = await _git("push", "-u", "origin", branch, cwd=str(EXTRACT_WORKTREE))
    if rc != 0:
        logger.error("Push failed for %s: %s", branch, out)
        await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))
        await _git("branch", "-D", branch, cwd=str(EXTRACT_WORKTREE))
        return 0, 1

    # 9. Create PR on Forgejo
    agent_token_file = config.SECRETS_DIR / f"forgejo-{agent_lower}-token"
    if not agent_token_file.exists():
        agent_token_file = config.SECRETS_DIR / "forgejo-leo-token"
    agent_token = agent_token_file.read_text().strip()

    pr_title = f"{agent_lower}: extract claims from {slug}"
    pr_body = (
        f"## Automated Extraction\n\n"
        f"**Source:** `{source_path}`\n"
        f"**Domain:** {domain}\n"
        f"**Agent:** {agent_name}\n"
        f"**Model:** {extract_model}\n\n"
        f"### Extraction Summary\n"
        f"- **Claims:** {len(claim_files)}\n"
        f"- **Entities:** {len(entity_files)}\n"
        f"- **Enrichments:** {len(enrichments)}\n"
        f"- **Decisions:** {len(decisions)}\n"
        f"- **Facts:** {len(facts)}\n\n"
        f"{notes}\n\n"
        f"---\n"
        f"*Extracted by pipeline ingest stage (replaces extract-cron.sh)*"
    )

    pr_result = await forgejo_api(
        "POST",
        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls",
        body={"title": pr_title, "body": pr_body, "base": "main", "head": branch},
        token=agent_token,
    )

    if pr_result and pr_result.get("number"):
        pr_num = pr_result["number"]
        logger.info("PR #%d created for %s (%d claims, %d entities)", pr_num, source_file, len(claim_files), len(entity_files))

        # Store contributor attribution: who submitted this source?
        # Priority: proposed_by field → intake_tier inference → "unknown"
        if proposed_by:
            contributor = proposed_by.strip().strip('"').strip("'")
        elif intake_tier == "research-task":
            contributor = f"{agent_name} (self-directed)"
        elif intake_tier == "directed":
            contributor = "@m3taversal"
        else:
            # Default: if no proposed_by and not a research task, Cory submitted it
            contributor = "@m3taversal"

        # Build pipe-separated claim titles for the description field
        claim_titles = " | ".join(
            c.get("title", c.get("filename", "").replace("-", " ").replace(".md", ""))
            for c in claims_raw if c.get("title") or c.get("filename")
        )

        # Success path: mark source as 'extracting' so queue scan's DB-status filter
        # skips it between PR creation and merge. Without this, cooldown is load-bearing
        # (Ganymede review, commit 469cb7f follow-up).
        try:
            conn.execute(
                """INSERT INTO sources (path, status, updated_at) VALUES (?, 'extracting', datetime('now'))
                   ON CONFLICT(path) DO UPDATE SET status='extracting', updated_at=datetime('now')""",
                (source_path,),
            )
            conn.commit()
        except Exception:
            logger.debug("Failed to mark source as extracting", exc_info=True)

        # Upsert: if discover_external_prs already created the row, update it;
        # if not, create a partial row that discover will complete.
        source_channel = classify_source_channel(branch)
        try:
            conn.execute(
                """INSERT INTO prs (number, branch, status, submitted_by, source_path, description, source_channel)
                   VALUES (?, ?, 'open', ?, ?, ?, ?)
                   ON CONFLICT(number) DO UPDATE SET
                       submitted_by = excluded.submitted_by,
                       source_path = excluded.source_path,
                       description = COALESCE(excluded.description, prs.description),
                       source_channel = COALESCE(prs.source_channel, excluded.source_channel)""",
                (pr_num, branch, contributor, source_path, claim_titles, source_channel),
            )
            conn.commit()
        except Exception:
            logger.debug("Failed to upsert submitted_by for PR #%d", pr_num, exc_info=True)

        # Also store on source record
        try:
            conn.execute(
                "UPDATE sources SET submitted_by = ? WHERE path = ?",
                (contributor, source_path),
            )
            conn.commit()
        except Exception:
            logger.debug("Failed to update source submitted_by", exc_info=True)
    else:
        logger.warning("PR creation may have failed for %s — response: %s", source_file, pr_result)

    # Clean up extract worktree
    await _git("checkout", "main", cwd=str(EXTRACT_WORKTREE))

    # Note: source archival happened in-branch before commit (see _archive_source_in_worktree).
    # Do NOT call _archive_source() here — the broken main-worktree-push path caused the
    # runaway extraction bug. Archive is now atomic with PR merge.

    return 1, 0

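# Net effect of a successful _extract_one_source run (summarizing the code above,
# with placeholder values): a branch named extract/<slug>-<4-hex-nonce> is pushed,
# a PR titled "<agent>: extract claims from <slug>" is opened on Forgejo,
# sources.status is set to 'extracting' for the source path, and the prs row is
# upserted with submitted_by, source_path, the pipe-separated claim titles as
# description, and source_channel.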
def _archive_source_in_worktree(
    worktree: Path,
    source_path: str,
    domain: str,
    status: str,
    agent: str | None,
    extraction_model: str,
) -> dict | None:
    """Move source file from inbox/queue/ to inbox/archive/<domain>/ WITHIN extract worktree.

    Updates frontmatter (status, processed_by, processed_date, extraction_model) and
    returns {"old": old_rel_path, "new": new_rel_path} or None if not found.

    The caller commits this change as part of the extract branch, so the archive lands
    atomically with the PR merge — no separate push on main required.
    """
    queue_path = worktree / source_path
    if not queue_path.exists():
        logger.warning("Source %s not found in worktree queue — skipping in-branch archive", source_path)
        return None

    if status == "null-result":
        dest_dir = worktree / "inbox" / "null-result"
    else:
        dest_dir = worktree / "inbox" / "archive" / (domain or "unknown")
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest_path = dest_dir / queue_path.name

    content = queue_path.read_text(encoding="utf-8")
    today = date.today().isoformat()
    content = re.sub(r"^status: unprocessed", f"status: {status}", content, flags=re.MULTILINE)
    if agent and "processed_by:" not in content:
        content = re.sub(
            r"(^status: \w+)",
            rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
            content,
            count=1,
            flags=re.MULTILINE,
        )
    if "extraction_model:" not in content:
        content = re.sub(
            r"(^status: \w+.*?)(\n---)",
            rf'\1\nextraction_model: "{extraction_model}"\2',
            content,
            count=1,
            flags=re.MULTILINE | re.DOTALL,
        )

    dest_path.write_text(content, encoding="utf-8")
    queue_path.unlink()

    old_rel = str(queue_path.relative_to(worktree))
    new_rel = str(dest_path.relative_to(worktree))
    return {"old": old_rel, "new": new_rel}

async def _archive_source(
    source_path: str,
    domain: str,
    status: str,
    agent: str | None = None,
) -> None:
    """Move source from inbox/queue/ to archive (or null-result) on main.

    Uses worktree lock to avoid conflicts with other main-writing processes.
    """
    source_file = os.path.basename(source_path)
    main = str(config.MAIN_WORKTREE)

    try:
        async with async_main_worktree_lock():
            # Pull latest
            await _git("pull", "--rebase", "origin", "main", cwd=main, timeout=30)

            queue_path = Path(main) / "inbox" / "queue" / source_file
            if not queue_path.exists():
                logger.warning("Source %s not found in queue — may have been archived already", source_file)
                return

            if status == "null-result":
                dest_dir = Path(main) / "inbox" / "null-result"
            else:
                dest_dir = Path(main) / "inbox" / "archive" / (domain or "unknown")

            dest_dir.mkdir(parents=True, exist_ok=True)
            dest_path = dest_dir / source_file

            # Read and update frontmatter
            content = queue_path.read_text(encoding="utf-8")
            today = date.today().isoformat()

            content = re.sub(r"^status: unprocessed", f"status: {status}", content, flags=re.MULTILINE)
            if agent and "processed_by:" not in content:
                content = re.sub(
                    r"(^status: \w+)",
                    rf"\1\nprocessed_by: {agent}\nprocessed_date: {today}",
                    content,
                    count=1,
                    flags=re.MULTILINE,
                )
            if "extraction_model:" not in content:
                content = re.sub(
                    r"(^status: \w+.*?)(\n---)",
                    rf'\1\nextraction_model: "{config.MODEL_SONNET_OR}"\2',
                    content,
                    count=1,
                    flags=re.MULTILINE | re.DOTALL,
                )

            dest_path.write_text(content, encoding="utf-8")
            queue_path.unlink()

            # Git add, commit, push
            await _git("add", "inbox/", cwd=main)
            commit_msg = (
                f"source: {source_file} → {status}\n\n"
                f"Pentagon-Agent: Epimetheus <PIPELINE>"
            )
            await _git("commit", "-m", commit_msg, cwd=main)

            # Push with retry
            for attempt in range(3):
                rc, out = await _git("push", "origin", "main", cwd=main, timeout=30)
                if rc == 0:
                    break
                logger.warning("Push attempt %d failed: %s", attempt + 1, out)
                await _git("pull", "--rebase", "origin", "main", cwd=main, timeout=30)
            else:
                logger.error("Failed to push source archival after 3 attempts")

    except Exception:
        logger.exception("Failed to archive source %s", source_file)

async def extract_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Main extraction cycle — called by the pipeline daemon's ingest stage.

    Finds unprocessed sources in inbox/queue/, extracts claims, creates PRs.
    Returns (succeeded, errors) for circuit breaker tracking.
    """
    main = config.MAIN_WORKTREE

    # Find unprocessed sources
    queue_dir = main / "inbox" / "queue"
    if not queue_dir.exists():
        return 0, 0

    # DB-authoritative status filter: exclude sources where DB records non-unprocessed state.
    # File frontmatter alone isn't reliable — archive pushes can fail, leaving stale file state.
    # The sources table is the authoritative record of whether a source has been processed.
    db_non_unprocessed = {
        r["path"] for r in conn.execute(
            "SELECT path FROM sources WHERE status != 'unprocessed'"
        ).fetchall()
    }

    unprocessed = []
    for f in sorted(queue_dir.glob("*.md")):
        try:
            content = f.read_text(encoding="utf-8")
            fm = _parse_source_frontmatter(content)
            if fm.get("status") != "unprocessed":
                continue
            rel_path = str(f.relative_to(main))
            if rel_path in db_non_unprocessed:
                continue
            unprocessed.append((rel_path, content, fm))
        except Exception:
            logger.debug("Failed to read source %s", f, exc_info=True)

    # Don't early-return here — re-extraction sources may exist even when queue is empty
    # (the re-extraction check runs after open-PR filtering below)

    # Filter out sources that already have open extraction PRs
    open_pr_slugs = set()
    try:
        prs = await forgejo_api(
            "GET",
            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls?state=open&limit=50",
        )
        if prs:
            for pr in prs:
                head = pr.get("head", {}).get("ref", "")
                if head.startswith("extract/"):
                    # Extract the source slug from branch name (extract/{slug}-{nonce})
                    slug_part = head[len("extract/"):]
                    # Remove the random suffix (last 5 chars: -{4-hex-chars})
                    if len(slug_part) > 5 and slug_part[-5] == "-":
                        slug_part = slug_part[:-5]
                    open_pr_slugs.add(slug_part)
    except Exception:
        logger.debug("Failed to check open PRs for dedup", exc_info=True)

    if open_pr_slugs:
        before = len(unprocessed)
        unprocessed = [
            (sp, c, f) for sp, c, f in unprocessed
            if Path(sp).stem not in open_pr_slugs
        ]
        skipped = before - len(unprocessed)
        if skipped:
            logger.info("Skipped %d source(s) with existing open PRs", skipped)

    # Cooldown: skip sources with ANY PR in last EXTRACTION_COOLDOWN_HOURS.
    # Defense-in-depth for DB-status filter — catches the window between PR
    # creation and DB status update if anything races.
    if unprocessed:
        cooldown_hours = config.EXTRACTION_COOLDOWN_HOURS
        recent_source_paths = {
            r["source_path"] for r in conn.execute(
                """SELECT DISTINCT source_path FROM prs
                   WHERE source_path IS NOT NULL
                   AND created_at > datetime('now', ? || ' hours')""",
                (f"-{cooldown_hours}",),
            ).fetchall() if r["source_path"]
        }
        if recent_source_paths:
            before = len(unprocessed)
            unprocessed = [
                (sp, c, f) for sp, c, f in unprocessed
                if sp not in recent_source_paths
            ]
            cooled = before - len(unprocessed)
            if cooled:
                logger.info("Cooldown: skipped %d source(s) with PRs in last %dh", cooled, cooldown_hours)

    # ── Check for re-extraction sources (must run even when queue is empty) ──
    reextract_rows = conn.execute(
        """SELECT path, feedback FROM sources
           WHERE status = 'needs_reextraction' AND feedback IS NOT NULL
           ORDER BY updated_at ASC LIMIT ?""",
        (max(1, MAX_SOURCES - len(unprocessed)),),
    ).fetchall()

    if not unprocessed and not reextract_rows:
        return 0, 0

    if unprocessed:
        logger.info("Extract cycle: %d unprocessed source(s) found, processing up to %d", len(unprocessed), MAX_SOURCES)
    if reextract_rows:
        logger.info("Extract cycle: %d source(s) queued for re-extraction", len(reextract_rows))

    # Load existing claims for dedup
    existing_claims = load_existing_claims_from_repo(str(main))

    # Ensure extract worktree exists and is clean
    if not EXTRACT_WORKTREE.exists():
        logger.error("Extract worktree not found at %s", EXTRACT_WORKTREE)
        return 0, 1

    total_ok = 0
    total_err = 0

    for row in reextract_rows:
        reex_path = row["path"]
        # Source was archived — read from archive location
        archive_base = main / "inbox" / "archive"
        # Try to find the file in archive subdirs
        reex_file = None
        for subdir in archive_base.iterdir():
            candidate = subdir / Path(reex_path).name
            if candidate.exists():
                reex_file = candidate
                break
        if not reex_file:
            # Try original path as fallback
            candidate = main / reex_path
            if candidate.exists():
                reex_file = candidate

        if not reex_file:
            logger.warning("Re-extraction: source %s not found on disk — skipping", reex_path)
            continue

        try:
            reex_content = reex_file.read_text(encoding="utf-8")
            reex_fm = _parse_source_frontmatter(reex_content)
            reex_feedback = json.loads(row["feedback"]) if row["feedback"] else {}

            logger.info("Re-extracting %s with feedback: %s", reex_path, list(reex_feedback.get("issues", [])))

            conn.execute(
                "UPDATE sources SET status = 'extracting', updated_at = datetime('now') WHERE path = ?",
                (reex_path,),
            )
            conn.commit()

            ok, err = await _extract_one_source(conn, reex_path, reex_content, reex_fm, existing_claims, feedback=reex_feedback)
            total_ok += ok
            total_err += err

            if ok:
                conn.execute(
                    "UPDATE sources SET status = 'extracted', updated_at = datetime('now') WHERE path = ?",
                    (reex_path,),
                )
            else:
                conn.execute(
                    "UPDATE sources SET status = 'error', last_error = 're-extraction failed', updated_at = datetime('now') WHERE path = ?",
                    (reex_path,),
                )
            conn.commit()
        except Exception:
            logger.exception("Re-extraction failed for %s", reex_path)
            total_err += 1

    for source_path, content, fm in unprocessed[:MAX_SOURCES]:
        try:
            ok, err = await _extract_one_source(conn, source_path, content, fm, existing_claims)
            total_ok += ok
            total_err += err
        except Exception:
            logger.exception("Unhandled error extracting %s", source_path)
            total_err += 1

        # Brief pause between sources
        await asyncio.sleep(2)

    logger.info("Extract cycle complete: %d succeeded, %d errors", total_ok, total_err)
    return total_ok, total_err
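
# Minimal manual-run sketch. The daemon normally drives this module via its
# ingest stage; this block only illustrates the calling convention. The DB
# filename below is an assumption, not a path defined elsewhere in this module.
if __name__ == "__main__":
    import sqlite3

    logging.basicConfig(level=logging.INFO)
    _conn = sqlite3.connect(str(config.BASE_DIR / "pipeline.db"))  # hypothetical DB location
    _conn.row_factory = sqlite3.Row  # queries above index rows by column name
    _ok, _err = asyncio.run(extract_cycle(_conn))
    logger.info("Manual extract cycle: %d succeeded, %d errors", _ok, _err)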