teleo-codex/ops/pipeline-v2/lib/connect.py
m3taversal 5f287ae9c8 epimetheus: fix connect.py title→slug mismatch in vector-search edges
claim_title payloads wrote unresolvable human-readable titles into
frontmatter related fields. Switched to claim_path with slug extraction
so reciprocal edges in merge.py can resolve targets. Renamed
neighbor_titles→neighbor_slugs throughout for consistency.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 12:25:41 +01:00

201 lines
6.6 KiB
Python

"""Atomic extract-and-connect — wire new claims to the KB at extraction time.
After extraction writes claim files to disk, this module:
1. Embeds each new claim (title + description + body snippet)
2. Searches Qdrant for semantically similar existing claims
3. Adds found neighbors as `related` edges on the NEW claim's frontmatter
Key design decision: edges are written on the NEW claim, not on existing claims.
Writing on existing claims would cause merge conflicts (same reason entities are
queued, not written on branches). When the PR merges, embed-on-merge adds the
new claim to Qdrant, and reweave can later add reciprocal edges on neighbors.
Cost: ~$0.0001 per claim (embedding only). No LLM classification — defaults to
"related". Reweave handles supports/challenges classification in a separate pass.
Owner: Epimetheus
"""
import logging
import os
import re
import sys
from pathlib import Path
logger = logging.getLogger("pipeline.connect")
# Similarity threshold for auto-connecting — below reweave's 0.70 but above
# the noise floor (~0.55). "related" still means actually related, not vaguely topical.
CONNECT_THRESHOLD = 0.65
CONNECT_MAX_NEIGHBORS = 5
# --- Import search functions ---
# This module is called from openrouter-extract-v2.py which may not have lib/ on path
# via the package, so handle both import paths.
try:
from .search import embed_query, search_qdrant
from .post_extract import parse_frontmatter, _rebuild_content
except ImportError:
sys.path.insert(0, os.path.dirname(__file__))
from search import embed_query, search_qdrant
from post_extract import parse_frontmatter, _rebuild_content
def _build_search_text(content: str) -> str:
"""Extract title + description + first 500 chars of body for embedding."""
fm, body = parse_frontmatter(content)
parts = []
if fm:
desc = fm.get("description", "")
if isinstance(desc, str) and desc:
parts.append(desc.strip('"').strip("'"))
# Get H1 title from body
h1_match = re.search(r"^# (.+)$", body, re.MULTILINE) if body else None
if h1_match:
parts.append(h1_match.group(1).strip())
# Add body snippet (skip H1 line)
if body:
body_text = re.sub(r"^# .+\n*", "", body).strip()
# Stop at "Relevant Notes" or "Topics" sections
body_text = re.split(r"\n---\n", body_text)[0].strip()
if body_text:
parts.append(body_text[:500])
return " ".join(parts)
def _add_related_edges(claim_path: str, neighbor_slugs: list[str]) -> bool:
"""Add related edges to a claim's frontmatter. Returns True if modified."""
try:
with open(claim_path) as f:
content = f.read()
except Exception as e:
logger.warning("Cannot read %s: %s", claim_path, e)
return False
fm, body = parse_frontmatter(content)
if fm is None:
return False
# Get existing related edges to avoid duplicates
existing = fm.get("related", [])
if isinstance(existing, str):
existing = [existing]
elif not isinstance(existing, list):
existing = []
existing_lower = {str(e).strip().lower() for e in existing}
# Add new edges
added = []
for slug in neighbor_slugs:
if slug.strip().lower() not in existing_lower:
added.append(slug)
existing_lower.add(slug.strip().lower())
if not added:
return False
fm["related"] = existing + added
# Rebuild and write
new_content = _rebuild_content(fm, body)
with open(claim_path, "w") as f:
f.write(new_content)
return True
def connect_new_claims(
claim_paths: list[str],
threshold: float = CONNECT_THRESHOLD,
max_neighbors: int = CONNECT_MAX_NEIGHBORS,
) -> dict:
"""Connect newly-written claims to the existing KB via vector search.
Args:
claim_paths: List of file paths to newly-written claim files.
threshold: Minimum cosine similarity for connection.
max_neighbors: Maximum edges to add per claim.
Returns:
{
"total": int,
"connected": int,
"edges_added": int,
"skipped_embed_failed": int,
"skipped_no_neighbors": int,
"connections": [{"claim": str, "neighbors": [str]}],
}
"""
stats = {
"total": len(claim_paths),
"connected": 0,
"edges_added": 0,
"skipped_embed_failed": 0,
"skipped_no_neighbors": 0,
"connections": [],
}
for claim_path in claim_paths:
try:
with open(claim_path) as f:
content = f.read()
except Exception:
continue
# Build search text from claim content
search_text = _build_search_text(content)
if not search_text or len(search_text) < 20:
stats["skipped_no_neighbors"] += 1
continue
# Embed the claim
vector = embed_query(search_text)
if vector is None:
stats["skipped_embed_failed"] += 1
continue
# Search Qdrant for neighbors (exclude nothing — new claim isn't in Qdrant yet)
hits = search_qdrant(
vector,
limit=max_neighbors,
domain=None, # Cross-domain connections are valuable
score_threshold=threshold,
)
if not hits:
stats["skipped_no_neighbors"] += 1
continue
# Extract neighbor slugs (filename stems, not titles — reciprocal edges need resolvable names)
neighbor_slugs = []
for hit in hits:
payload = hit.get("payload", {})
claim_path_qdrant = payload.get("claim_path", "")
if claim_path_qdrant:
slug = claim_path_qdrant.rsplit("/", 1)[-1].replace(".md", "")
neighbor_slugs.append(slug)
if not neighbor_slugs:
stats["skipped_no_neighbors"] += 1
continue
# Add edges to the new claim's frontmatter
if _add_related_edges(claim_path, neighbor_slugs):
stats["connected"] += 1
stats["edges_added"] += len(neighbor_slugs)
stats["connections"].append({
"claim": os.path.basename(claim_path),
"neighbors": neighbor_slugs,
})
logger.info("Connected %s%d neighbors", os.path.basename(claim_path), len(neighbor_slugs))
else:
stats["skipped_no_neighbors"] += 1
logger.info(
"Extract-and-connect: %d/%d claims connected (%d edges added, %d embed failed, %d no neighbors)",
stats["connected"], stats["total"], stats["edges_added"],
stats["skipped_embed_failed"], stats["skipped_no_neighbors"],
)
return stats