claim_title payloads wrote unresolvable human-readable titles into frontmatter related fields. Switched to claim_path with slug extraction so reciprocal edges in merge.py can resolve targets. Renamed neighbor_titles→neighbor_slugs throughout for consistency. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
201 lines
6.6 KiB
Python
"""Atomic extract-and-connect — wire new claims to the KB at extraction time.
|
|
|
|
After extraction writes claim files to disk, this module:
|
|
1. Embeds each new claim (title + description + body snippet)
|
|
2. Searches Qdrant for semantically similar existing claims
|
|
3. Adds found neighbors as `related` edges on the NEW claim's frontmatter
|
|
|
|
Key design decision: edges are written on the NEW claim, not on existing claims.
|
|
Writing on existing claims would cause merge conflicts (same reason entities are
|
|
queued, not written on branches). When the PR merges, embed-on-merge adds the
|
|
new claim to Qdrant, and reweave can later add reciprocal edges on neighbors.
|
|
|
|
Cost: ~$0.0001 per claim (embedding only). No LLM classification — defaults to
|
|
"related". Reweave handles supports/challenges classification in a separate pass.
|
|
|
|
Owner: Epimetheus
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger("pipeline.connect")

# Similarity threshold for auto-connecting — below reweave's 0.70 but above
# the noise floor (~0.55). "related" still means actually related, not vaguely topical.
CONNECT_THRESHOLD = 0.65
# Cap on edges written per new claim — keeps frontmatter readable.
CONNECT_MAX_NEIGHBORS = 5
|
|
|
|
# --- Import search functions ---
|
|
# This module is called from openrouter-extract-v2.py which may not have lib/ on path
|
|
# via the package, so handle both import paths.
|
|
try:
|
|
from .search import embed_query, search_qdrant
|
|
from .post_extract import parse_frontmatter, _rebuild_content
|
|
except ImportError:
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from search import embed_query, search_qdrant
|
|
from post_extract import parse_frontmatter, _rebuild_content
|
|
|
|
|
|
def _build_search_text(content: str) -> str:
    """Extract title + description + first 500 chars of body for embedding."""
    fm, body = parse_frontmatter(content)
    pieces: list[str] = []

    # Frontmatter description, with any wrapping quote characters peeled off.
    if fm:
        description = fm.get("description", "")
        if isinstance(description, str) and description:
            pieces.append(description.strip('"').strip("'"))

    if body:
        # H1 title, when the body carries one.
        title = re.search(r"^# (.+)$", body, re.MULTILINE)
        if title:
            pieces.append(title.group(1).strip())

        # Body snippet: drop the H1 line, cut at the first horizontal rule
        # (where "Relevant Notes"/"Topics" sections begin), cap at 500 chars.
        snippet = re.sub(r"^# .+\n*", "", body).strip()
        snippet = re.split(r"\n---\n", snippet)[0].strip()
        if snippet:
            pieces.append(snippet[:500])

    return " ".join(pieces)
|
|
|
|
|
|
def _add_related_edges(claim_path: str, neighbor_slugs: list[str]) -> bool:
    """Add related edges to a claim's frontmatter.

    Args:
        claim_path: Path to the claim markdown file to modify in place.
        neighbor_slugs: Filename-stem slugs of neighbor claims to link.

    Returns:
        True if the file was modified; False when it cannot be read, has no
        frontmatter, or every slug is already present in `related`.
    """
    try:
        # Explicit encoding: claim files are markdown and must not depend on
        # the platform's locale default.
        with open(claim_path, encoding="utf-8") as f:
            content = f.read()
    except Exception as e:
        logger.warning("Cannot read %s: %s", claim_path, e)
        return False

    fm, body = parse_frontmatter(content)
    if fm is None:
        # No frontmatter — nowhere to attach edges.
        return False

    # Normalize existing `related` to a list so we can extend it safely.
    existing = fm.get("related", [])
    if isinstance(existing, str):
        existing = [existing]
    elif not isinstance(existing, list):
        existing = []

    # Case-insensitive dedupe set over what's already there.
    existing_lower = {str(e).strip().lower() for e in existing}

    # Add new edges, skipping duplicates. Append the *stripped* slug so the
    # stored value matches the key used for dedupe.
    added = []
    for slug in neighbor_slugs:
        clean = slug.strip()
        if clean.lower() not in existing_lower:
            added.append(clean)
            existing_lower.add(clean.lower())

    if not added:
        return False

    fm["related"] = existing + added

    # Rebuild frontmatter + body and write back in place.
    new_content = _rebuild_content(fm, body)
    with open(claim_path, "w", encoding="utf-8") as f:
        f.write(new_content)

    return True
|
|
|
|
|
|
def connect_new_claims(
    claim_paths: list[str],
    threshold: float = CONNECT_THRESHOLD,
    max_neighbors: int = CONNECT_MAX_NEIGHBORS,
) -> dict:
    """Connect newly-written claims to the existing KB via vector search.

    Args:
        claim_paths: List of file paths to newly-written claim files.
        threshold: Minimum cosine similarity for connection.
        max_neighbors: Maximum edges to add per claim.

    Returns:
        {
            "total": int,
            "connected": int,
            "edges_added": int,
            "skipped_read_failed": int,
            "skipped_embed_failed": int,
            "skipped_no_neighbors": int,
            "connections": [{"claim": str, "neighbors": [str]}],
        }
    """
    stats = {
        "total": len(claim_paths),
        "connected": 0,
        "edges_added": 0,
        "skipped_read_failed": 0,
        "skipped_embed_failed": 0,
        "skipped_no_neighbors": 0,
        "connections": [],
    }

    for claim_path in claim_paths:
        try:
            with open(claim_path, encoding="utf-8") as f:
                content = f.read()
        except Exception as e:
            # Don't swallow silently — otherwise the skip counters don't sum
            # to "total" and failures are invisible in logs.
            logger.warning("Cannot read %s: %s", claim_path, e)
            stats["skipped_read_failed"] += 1
            continue

        # Build search text from claim content; skip near-empty claims.
        search_text = _build_search_text(content)
        if not search_text or len(search_text) < 20:
            stats["skipped_no_neighbors"] += 1
            continue

        # Embed the claim
        vector = embed_query(search_text)
        if vector is None:
            stats["skipped_embed_failed"] += 1
            continue

        # Search Qdrant for neighbors (exclude nothing — new claim isn't in Qdrant yet)
        hits = search_qdrant(
            vector,
            limit=max_neighbors,
            domain=None,  # Cross-domain connections are valuable
            score_threshold=threshold,
        )

        if not hits:
            stats["skipped_no_neighbors"] += 1
            continue

        # Extract neighbor slugs (filename stems, not titles — reciprocal edges
        # need resolvable names). removesuffix() strips only a trailing ".md";
        # str.replace would mangle names containing ".md" in the middle.
        neighbor_slugs = []
        for hit in hits:
            payload = hit.get("payload", {})
            claim_path_qdrant = payload.get("claim_path", "")
            if claim_path_qdrant:
                slug = claim_path_qdrant.rsplit("/", 1)[-1].removesuffix(".md")
                neighbor_slugs.append(slug)

        if not neighbor_slugs:
            stats["skipped_no_neighbors"] += 1
            continue

        # Add edges to the new claim's frontmatter
        if _add_related_edges(claim_path, neighbor_slugs):
            stats["connected"] += 1
            stats["edges_added"] += len(neighbor_slugs)
            stats["connections"].append({
                "claim": os.path.basename(claim_path),
                "neighbors": neighbor_slugs,
            })
            logger.info("Connected %s → %d neighbors", os.path.basename(claim_path), len(neighbor_slugs))
        else:
            stats["skipped_no_neighbors"] += 1

    logger.info(
        "Extract-and-connect: %d/%d claims connected (%d edges added, %d read failed, %d embed failed, %d no neighbors)",
        stats["connected"], stats["total"], stats["edges_added"],
        stats["skipped_read_failed"], stats["skipped_embed_failed"], stats["skipped_no_neighbors"],
    )

    return stats
|