From f5b27ccd732fbb4c6632b5f3016de8e98297fa3e Mon Sep 17 00:00:00 2001 From: m3taversal Date: Thu, 26 Mar 2026 17:44:34 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Qdrant=20vector=20search=20=E2=80=94=20?= =?UTF-8?q?bulk=20embed=20script=20+=20OpenRouter=20embeddings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - embed-claims.py: bulk embeds all claims/decisions/entities into Qdrant via OpenRouter (openai/text-embedding-3-small, 1536 dims) - diagnostics/app.py: search endpoint switched from OpenAI direct to OpenRouter (same key as LLM calls, no new credentials) - Qdrant running on VPS (Docker, port 6333, persistent storage) - Collection: teleo-claims, cosine distance, 1536 dims 854 files to embed. Bulk backfill running. Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70> --- diagnostics/app.py | 251 ++++++++++++++++++++++++++++++++++++++++++++- embed-claims.py | 244 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 490 insertions(+), 5 deletions(-) create mode 100644 embed-claims.py diff --git a/diagnostics/app.py b/diagnostics/app.py index 0234f9f..04bb2f3 100644 --- a/diagnostics/app.py +++ b/diagnostics/app.py @@ -1,10 +1,11 @@ -"""Argus — Diagnostics dashboard for the Teleo pipeline. +"""Argus — Diagnostics dashboard + search API for the Teleo pipeline. Separate aiohttp service (port 8081) that reads pipeline.db read-only. -Provides Chart.js operational dashboard, quality vital signs, and contributor analytics. +Provides Chart.js operational dashboard, quality vital signs, contributor analytics, +semantic search via Qdrant, and claim usage logging. -Owner: Argus <0ECBE5A7-EFAD-4A59-B491-635A1AEDF5DE> -Data source: Epimetheus's pipeline.db (read-only SQLite) +Owner: Argus <69AF7290-758F-464B-B472-04AFCA4AB340> +Data source: Epimetheus's pipeline.db (read-only SQLite), Qdrant vector DB """ import json @@ -26,6 +27,20 @@ PORT = int(os.environ.get("ARGUS_PORT", "8081")) REPO_DIR = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")) CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim-index") +# Search config +QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333") +QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION", "teleo-claims") +OPENROUTER_KEY_FILE = Path(os.environ.get("OPENROUTER_KEY_FILE", "/opt/teleo-eval/secrets/openrouter-key")) +EMBEDDING_MODEL = "text-embedding-3-small" +EMBEDDING_DIMS = 1536 + +# Auth config +API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secrets/argus-api-key")) + +# Endpoints that skip auth (dashboard is public for now, can lock later) +_PUBLIC_PATHS = frozenset({"/", "/api/metrics", "/api/snapshots", "/api/vital-signs", + "/api/contributors", "/api/domains"}) + def _get_db() -> sqlite3.Connection: """Open read-only connection to pipeline.db.""" @@ -441,6 +456,140 @@ def _compute_vital_signs(conn) -> dict: } +# ─── Auth ──────────────────────────────────────────────────────────────────── + + +def _load_secret(path: Path) -> str | None: + """Load a secret from a file. Returns None if missing.""" + try: + return path.read_text().strip() + except Exception: + return None + + +@web.middleware +async def auth_middleware(request, handler): + """API key check. Public paths skip auth. Protected paths require X-Api-Key header.""" + if request.path in _PUBLIC_PATHS: + return await handler(request) + expected = request.app.get("api_key") + if not expected: + # No key configured — all endpoints open (development mode) + return await handler(request) + provided = request.headers.get("X-Api-Key", "") + if provided != expected: + return web.json_response({"error": "unauthorized"}, status=401) + return await handler(request) + + +# ─── Embedding + Search ────────────────────────────────────────────────────── + + +def _get_embedding_key() -> str | None: + """Load OpenRouter API key for embeddings.""" + return _load_secret(OPENROUTER_KEY_FILE) + + +def _embed_query(text: str, api_key: str) -> list[float] | None: + """Embed a query string via OpenRouter (OpenAI-compatible endpoint). + + Uses urllib to avoid adding httpx/openai as dependencies. + """ + payload = json.dumps({ + "model": f"openai/{EMBEDDING_MODEL}", + "input": text, + }).encode() + req = urllib.request.Request( + "https://openrouter.ai/api/v1/embeddings", + data=payload, + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read()) + return data["data"][0]["embedding"] + except Exception as e: + logger.error("Embedding failed: %s", e) + return None + + +def _search_qdrant(vector: list[float], limit: int = 10, + domain: str | None = None, confidence: str | None = None, + exclude: list[str] | None = None) -> list[dict]: + """Search Qdrant collection for nearest claims. + + Uses urllib for zero-dependency Qdrant access (REST API). + """ + must_filters = [] + if domain: + must_filters.append({"key": "domain", "match": {"value": domain}}) + if confidence: + must_filters.append({"key": "confidence", "match": {"value": confidence}}) + + must_not_filters = [] + if exclude: + for path in exclude: + must_not_filters.append({"key": "claim_path", "match": {"value": path}}) + + payload = { + "vector": vector, + "limit": limit, + "with_payload": True, + "score_threshold": 0.3, + } + if must_filters or must_not_filters: + payload["filter"] = {} + if must_filters: + payload["filter"]["must"] = must_filters + if must_not_filters: + payload["filter"]["must_not"] = must_not_filters + + req = urllib.request.Request( + f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points/search", + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read()) + return data.get("result", []) + except Exception as e: + logger.error("Qdrant search failed: %s", e) + return [] + + +# ─── Usage logging ─────────────────────────────────────────────────────────── + + +def _get_write_db() -> sqlite3.Connection | None: + """Open read-write connection for usage logging only. + + Separate from the main read-only connection. Returns None if DB unavailable. + """ + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=10000") + # Ensure claim_usage table exists (Epimetheus creates it, but be safe) + conn.execute(""" + CREATE TABLE IF NOT EXISTS claim_usage ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + claim_path TEXT NOT NULL, + agent TEXT, + context TEXT, + ts TEXT DEFAULT (datetime('now')) + ) + """) + conn.commit() + return conn + except Exception as e: + logger.warning("Failed to open write DB for usage logging: %s", e) + return None + + # ─── Route handlers ───────────────────────────────────────────────────────── @@ -551,6 +700,91 @@ async def handle_api_domains(request): return web.json_response({"domains": breakdown}) +async def handle_api_search(request): + """GET /api/search — semantic search over claims via Qdrant. + + Query params: + q: search query (required) + domain: filter by domain (optional) + confidence: filter by confidence level (optional) + limit: max results, default 10 (optional) + exclude: comma-separated claim paths to exclude (optional) + """ + query = request.query.get("q", "").strip() + if not query: + return web.json_response({"error": "q parameter required"}, status=400) + + domain = request.query.get("domain") + confidence = request.query.get("confidence") + limit = min(int(request.query.get("limit", "10")), 50) + exclude_raw = request.query.get("exclude", "") + exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None + + # Embed the query + api_key = _get_embedding_key() + if not api_key: + return web.json_response({"error": "embedding service unavailable"}, status=503) + + vector = _embed_query(query, api_key) + if vector is None: + return web.json_response({"error": "embedding failed"}, status=502) + + # Search Qdrant + results = _search_qdrant(vector, limit=limit, domain=domain, + confidence=confidence, exclude=exclude) + + # Format response + claims = [] + for hit in results: + payload = hit.get("payload", {}) + claims.append({ + "claim_title": payload.get("claim_title", ""), + "claim_path": payload.get("claim_path", ""), + "similarity_score": round(hit.get("score", 0), 4), + "domain": payload.get("domain", ""), + "confidence": payload.get("confidence", ""), + "snippet": payload.get("snippet", "")[:200], + "depends_on": payload.get("depends_on", []), + "challenged_by": payload.get("challenged_by", []), + }) + + return web.json_response(claims) + + +async def handle_api_usage(request): + """POST /api/usage — log claim usage for analytics. + + Body: {"claim_path": "...", "agent": "rio", "context": "telegram-response"} + Fire-and-forget — returns 200 immediately. + """ + try: + body = await request.json() + except Exception: + return web.json_response({"error": "invalid JSON"}, status=400) + + claim_path = body.get("claim_path", "").strip() + if not claim_path: + return web.json_response({"error": "claim_path required"}, status=400) + + agent = body.get("agent", "unknown") + context = body.get("context", "") + + # Fire-and-forget write — don't block the response + try: + write_conn = _get_write_db() + if write_conn: + write_conn.execute( + "INSERT INTO claim_usage (claim_path, agent, context) VALUES (?, ?, ?)", + (claim_path, agent, context), + ) + write_conn.commit() + write_conn.close() + except Exception as e: + logger.warning("Usage log failed (non-fatal): %s", e) + + return web.json_response({"status": "ok"}) + + # ─── Dashboard HTML ────────────────────────────────────────────────────────── @@ -1093,14 +1327,21 @@ function toggleContribView(view) {{ def create_app() -> web.Application: - app = web.Application() + app = web.Application(middlewares=[auth_middleware]) app["db"] = _get_db() + app["api_key"] = _load_secret(API_KEY_FILE) + if app["api_key"]: + logger.info("API key auth enabled (protected endpoints require X-Api-Key)") + else: + logger.info("No API key configured — all endpoints open") app.router.add_get("/", handle_dashboard) app.router.add_get("/api/metrics", handle_api_metrics) app.router.add_get("/api/snapshots", handle_api_snapshots) app.router.add_get("/api/vital-signs", handle_api_vital_signs) app.router.add_get("/api/contributors", handle_api_contributors) app.router.add_get("/api/domains", handle_api_domains) + app.router.add_get("/api/search", handle_api_search) + app.router.add_post("/api/usage", handle_api_usage) app.on_cleanup.append(_cleanup) return app diff --git a/embed-claims.py b/embed-claims.py new file mode 100644 index 0000000..f0a45d3 --- /dev/null +++ b/embed-claims.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +# ONE-SHOT BACKFILL + ongoing embed-on-merge utility. +"""Embed KB claims/decisions/entities into Qdrant for vector search. + +Reads markdown files, embeds title+body via OpenAI text-embedding-3-small, +upserts into Qdrant with minimal metadata (path, title, domain, confidence, type). + +Usage: + python3 embed-claims.py # Bulk embed all + python3 embed-claims.py --file path.md # Embed single file + python3 embed-claims.py --dry-run # Count without embedding + +Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70> +""" + +import argparse +import json +import os +import re +import sys +import time +import urllib.request +from pathlib import Path + +import yaml + +REPO_DIR = Path("/opt/teleo-eval/workspaces/main") +QDRANT_URL = "http://localhost:6333" +COLLECTION = "teleo-claims" +EMBEDDING_MODEL = "text-embedding-3-small" + +# Directories to embed +EMBED_DIRS = ["domains", "core", "foundations", "decisions", "entities"] + + +def _get_api_key() -> str: + """Load OpenRouter API key (same key used for LLM calls).""" + for path in ["/opt/teleo-eval/secrets/openrouter-key"]: + if os.path.exists(path): + return open(path).read().strip() + key = os.environ.get("OPENROUTER_API_KEY", "") + if key: + return key + print("ERROR: No OpenRouter API key found") + sys.exit(1) + + +def embed_text(text: str, api_key: str) -> list[float] | None: + """Embed text via OpenRouter (OpenAI-compatible embeddings endpoint).""" + payload = json.dumps({"model": f"openai/{EMBEDDING_MODEL}", "input": text[:8000]}).encode() + req = urllib.request.Request( + "https://openrouter.ai/api/v1/embeddings", + data=payload, + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + data = json.loads(resp.read()) + return data["data"][0]["embedding"] + except Exception as e: + print(f" Embedding failed: {e}") + return None + + +def parse_frontmatter(path: Path) -> tuple[dict | None, str]: + """Parse YAML frontmatter and body.""" + text = path.read_text(errors="replace") + if not text.startswith("---"): + return None, text + end = text.find("\n---", 3) + if end == -1: + return None, text + try: + fm = yaml.safe_load(text[3:end]) + if not isinstance(fm, dict): + return None, text + return fm, text[end + 4:].strip() + except Exception: + return None, text + + +def upsert_to_qdrant(point_id: str, vector: list[float], payload: dict): + """Upsert a single point to Qdrant.""" + data = json.dumps({ + "points": [{ + "id": point_id, + "vector": vector, + "payload": payload, + }] + }).encode() + req = urllib.request.Request( + f"{QDRANT_URL}/collections/{COLLECTION}/points", + data=data, + headers={"Content-Type": "application/json"}, + method="PUT", + ) + with urllib.request.urlopen(req, timeout=10) as resp: + return json.loads(resp.read()) + + +def make_point_id(path: str) -> str: + """Create a deterministic UUID from file path.""" + import hashlib + return str(hashlib.md5(path.encode()).hexdigest()) + + +def classify_file(fm: dict, path: Path) -> tuple[str, str, str, str]: + """Extract type, domain, confidence, title from frontmatter + path.""" + ft = fm.get("type", "") + if ft == "decision": + file_type = "decision" + elif ft == "entity": + file_type = "entity" + else: + file_type = "claim" + + domain = fm.get("domain", "") + if not domain: + # Infer from path + rel = path.relative_to(REPO_DIR) + parts = rel.parts + if len(parts) >= 2 and parts[0] in ("domains", "entities", "decisions"): + domain = parts[1] + elif parts[0] == "core": + domain = "core" + elif parts[0] == "foundations" and len(parts) >= 2: + domain = parts[1] + + confidence = fm.get("confidence", "unknown") + title = fm.get("name", fm.get("title", path.stem.replace("-", " "))) + + return file_type, domain, confidence, str(title) + + +def embed_file(path: Path, api_key: str, dry_run: bool = False) -> bool: + """Embed a single file into Qdrant. Returns True if successful.""" + fm, body = parse_frontmatter(path) + if not fm: + return False + + # Skip non-knowledge files + ft = fm.get("type", "") + if ft in ("source", "musing"): + return False + if path.name.startswith("_"): + return False + + file_type, domain, confidence, title = classify_file(fm, path) + rel_path = str(path.relative_to(REPO_DIR)) + + # Build embed text: title + first ~2000 chars of body + embed_text_str = f"{title}\n\n{body[:2000]}" if body else title + + if dry_run: + print(f" [{file_type}] {rel_path}: {title[:60]}") + return True + + # Embed + vector = embed_text(embed_text_str, api_key) + if not vector: + return False + + # Upsert to Qdrant + point_id = make_point_id(rel_path) + payload = { + "claim_path": rel_path, + "claim_title": title, + "domain": domain, + "confidence": confidence, + "type": file_type, + "snippet": body[:200] if body else "", + } + + try: + upsert_to_qdrant(point_id, vector, payload) + return True + except Exception as e: + print(f" Qdrant upsert failed for {rel_path}: {e}") + return False + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--file", type=str, help="Embed a single file") + args = parser.parse_args() + + api_key = _get_api_key() + + if args.file: + path = Path(args.file) + if not path.exists(): + print(f"File not found: {path}") + sys.exit(1) + ok = embed_file(path, api_key, dry_run=args.dry_run) + print("OK" if ok else "SKIP") + return + + # Bulk embed + files = [] + for d in EMBED_DIRS: + base = REPO_DIR / d + if not base.exists(): + continue + for md in base.rglob("*.md"): + if not md.name.startswith("_"): + files.append(md) + + print(f"Found {len(files)} files to process") + + embedded = 0 + skipped = 0 + failed = 0 + + for i, path in enumerate(files): + if i % 50 == 0 and i > 0: + print(f" Progress: {i}/{len(files)} ({embedded} embedded, {skipped} skipped)") + if not args.dry_run: + time.sleep(0.5) # Rate limit courtesy + + ok = embed_file(path, api_key, dry_run=args.dry_run) + if ok: + embedded += 1 + else: + skipped += 1 + + if not args.dry_run and embedded % 20 == 0 and embedded > 0: + time.sleep(1) # Batch rate limit + + print(f"\nDone: {embedded} embedded, {skipped} skipped, {failed} failed") + + if not args.dry_run: + # Verify + try: + resp = urllib.request.urlopen(f"{QDRANT_URL}/collections/{COLLECTION}") + data = json.loads(resp.read()) + count = data["result"]["points_count"] + print(f"Qdrant collection: {count} vectors") + except Exception as e: + print(f"Verification failed: {e}") + + +if __name__ == "__main__": + main()