Some checks failed
CI / lint-and-test (push) Has been cancelled
Sources merged: - teleo-codex/ops/pipeline-v2/ (11 newer lib files, 5 new lib modules) - teleo-codex/ops/ (agent-state, diagnostics expansion, systemd units, ops scripts) - VPS /opt/teleo-eval/telegram/ (10 new bot files, agent configs) - VPS /opt/teleo-eval/pipeline/ops/ (vector-gc, backfill-descriptions) - VPS /opt/teleo-eval/sync-mirror.sh (Bug 2 + Step 2.5 fixes) Non-trivial merges: - connect.py: kept codex threshold (0.65) + added infra domain parameter - watchdog.py: kept infra version (stale_pr integration, superset of codex) - deploy.sh: codex rsync version (interim, until VPS git clone migration) - diagnostics/app.py: codex decomposed dashboard (14 new route modules) 81 files changed, +17105/-200 lines Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
202 lines
6.6 KiB
Python
202 lines
6.6 KiB
Python
"""Atomic extract-and-connect — wire new claims to the KB at extraction time.
|
|
|
|
After extraction writes claim files to disk, this module:
|
|
1. Embeds each new claim (title + description + body snippet)
|
|
2. Searches Qdrant for semantically similar existing claims
|
|
3. Adds found neighbors as `related` edges on the NEW claim's frontmatter
|
|
|
|
Key design decision: edges are written on the NEW claim, not on existing claims.
|
|
Writing on existing claims would cause merge conflicts (same reason entities are
|
|
queued, not written on branches). When the PR merges, embed-on-merge adds the
|
|
new claim to Qdrant, and reweave can later add reciprocal edges on neighbors.
|
|
|
|
Cost: ~$0.0001 per claim (embedding only). No LLM classification — defaults to
|
|
"related". Reweave handles supports/challenges classification in a separate pass.
|
|
|
|
Owner: Epimetheus
|
|
"""
|
|
|
|
import logging
import os
import re
import sys
from pathlib import Path

# Module-level logger; handlers/levels are configured by the pipeline entry point.
logger = logging.getLogger("pipeline.connect")
|
|
|
|
# Similarity threshold for auto-connecting — below reweave's 0.70 but above
# the noise floor (~0.55). "related" still means actually related, not vaguely topical.
CONNECT_THRESHOLD = 0.65

# Cap on edges added per new claim — bounds frontmatter growth per extraction.
CONNECT_MAX_NEIGHBORS = 5
|
|
|
|
# --- Import search functions ---
# This module is called from openrouter-extract-v2.py which may not have lib/ on path
# via the package, so handle both import paths.
try:
    # Normal case: lib/ is imported as a package, use relative imports.
    from .search import embed_query, search_qdrant
    from .post_extract import parse_frontmatter, _rebuild_content
except ImportError:
    # Script case: put this directory on sys.path and import the modules flat.
    sys.path.insert(0, os.path.dirname(__file__))
    from search import embed_query, search_qdrant
    from post_extract import parse_frontmatter, _rebuild_content
|
|
|
|
|
|
def _build_search_text(content: str) -> str:
    """Assemble the embedding text for a claim: description, H1 title, body snippet.

    Pulls the frontmatter description, the first Markdown H1 title, and up to
    500 characters of body text (with the H1 line removed and anything after
    the first `---` rule — "Relevant Notes"/"Topics" sections — dropped).
    """
    frontmatter, body = parse_frontmatter(content)
    pieces: list[str] = []

    if frontmatter:
        description = frontmatter.get("description", "")
        if isinstance(description, str) and description:
            # Strip YAML-style surrounding quotes.
            pieces.append(description.strip('"').strip("'"))

    if body:
        # H1 title, if present.
        heading = re.search(r"^# (.+)$", body, re.MULTILINE)
        if heading:
            pieces.append(heading.group(1).strip())

        # Body snippet: drop the H1 line, then everything after the first rule.
        snippet = re.sub(r"^# .+\n*", "", body).strip()
        snippet = re.split(r"\n---\n", snippet)[0].strip()
        if snippet:
            pieces.append(snippet[:500])

    return " ".join(pieces)
|
|
|
|
|
|
def _add_related_edges(claim_path: str, neighbor_titles: list[str]) -> bool:
    """Add `related` edges to a claim's frontmatter. Returns True if modified.

    Args:
        claim_path: Path to the claim markdown file on disk.
        neighbor_titles: Candidate neighbor titles to link as `related` edges.

    Returns:
        True if the file was rewritten; False if the file is unreadable, has
        no frontmatter, or every title was already present (case-insensitive).
    """
    try:
        # Explicit encoding: claim files are UTF-8 regardless of the platform's
        # default codec (previously relied on locale, which breaks on Windows).
        with open(claim_path, encoding="utf-8") as f:
            content = f.read()
    except Exception as e:
        # Best-effort: log and skip rather than abort the whole batch.
        logger.warning("Cannot read %s: %s", claim_path, e)
        return False

    fm, body = parse_frontmatter(content)
    if fm is None:
        return False

    # Normalize existing `related` to a list so a scalar YAML value doesn't break us.
    existing = fm.get("related", [])
    if isinstance(existing, str):
        existing = [existing]
    elif not isinstance(existing, list):
        existing = []

    # Case-insensitive dedup set covering both existing edges and this batch.
    existing_lower = {str(e).strip().lower() for e in existing}

    added = []
    for title in neighbor_titles:
        key = title.strip().lower()
        if key not in existing_lower:
            added.append(title)
            existing_lower.add(key)

    if not added:
        return False

    fm["related"] = existing + added

    # Rebuild the document and write it back (same encoding as the read).
    new_content = _rebuild_content(fm, body)
    with open(claim_path, "w", encoding="utf-8") as f:
        f.write(new_content)

    return True
|
|
|
|
|
|
def connect_new_claims(
    claim_paths: list[str],
    domain: str | None = None,
    threshold: float = CONNECT_THRESHOLD,
    max_neighbors: int = CONNECT_MAX_NEIGHBORS,
) -> dict:
    """Connect newly-written claims to the existing KB via vector search.

    For each claim file: build search text, embed it, query Qdrant for similar
    existing claims above `threshold`, and record them as `related` edges on
    the NEW claim's frontmatter (never on the neighbors — see module docstring).

    Args:
        claim_paths: List of file paths to newly-written claim files.
        domain: Optional domain filter for Qdrant search. Default None searches
            across all domains (cross-domain connections are valuable).
        threshold: Minimum cosine similarity for connection.
        max_neighbors: Maximum edges to add per claim.

    Returns:
        {
            "total": int,
            "connected": int,
            "edges_added": int,
            "skipped_embed_failed": int,
            "skipped_no_neighbors": int,
            "connections": [{"claim": str, "neighbors": [str]}],
        }
    """
    stats = {
        "total": len(claim_paths),
        "connected": 0,
        "edges_added": 0,
        "skipped_embed_failed": 0,
        "skipped_no_neighbors": 0,
        "connections": [],
    }

    for claim_path in claim_paths:
        try:
            with open(claim_path, encoding="utf-8") as f:
                content = f.read()
        except Exception:
            # Best-effort: an unreadable/vanished file simply isn't connected.
            continue

        # Build search text from claim content; skip claims too thin to embed
        # meaningfully (< 20 chars).
        search_text = _build_search_text(content)
        if not search_text or len(search_text) < 20:
            stats["skipped_no_neighbors"] += 1
            continue

        # Embed the claim.
        vector = embed_query(search_text)
        if vector is None:
            stats["skipped_embed_failed"] += 1
            continue

        # Search Qdrant for neighbors. No self-match exclusion needed — the new
        # claim isn't in Qdrant yet. Bug fix: `domain` was previously accepted
        # but ignored (hardcoded None); it is now honored. The default of None
        # preserves the old cross-domain behavior.
        hits = search_qdrant(
            vector,
            limit=max_neighbors,
            domain=domain,
            score_threshold=threshold,
        )

        if not hits:
            stats["skipped_no_neighbors"] += 1
            continue

        # Extract neighbor titles from hit payloads.
        neighbor_titles = []
        for hit in hits:
            payload = hit.get("payload", {})
            title = payload.get("claim_title", "")
            if title:
                neighbor_titles.append(title)

        if not neighbor_titles:
            stats["skipped_no_neighbors"] += 1
            continue

        # Add edges to the new claim's frontmatter.
        if _add_related_edges(claim_path, neighbor_titles):
            stats["connected"] += 1
            stats["edges_added"] += len(neighbor_titles)
            stats["connections"].append({
                "claim": os.path.basename(claim_path),
                "neighbors": neighbor_titles,
            })
            logger.info("Connected %s → %d neighbors", os.path.basename(claim_path), len(neighbor_titles))
        else:
            # Edges already present (or no frontmatter) — count as no-op.
            stats["skipped_no_neighbors"] += 1

    logger.info(
        "Extract-and-connect: %d/%d claims connected (%d edges added, %d embed failed, %d no neighbors)",
        stats["connected"], stats["total"], stats["edges_added"],
        stats["skipped_embed_failed"], stats["skipped_no_neighbors"],
    )

    return stats
|