Root cause: decision records have type: decision, but the entity indexer only accepted type: entity and only scanned entities/. The claim indexer scanned decisions/ but filtered out non-claim types. Result: decision records fell through both indexes entirely — invisible to the bot.

Fix: add decisions/ to the entity indexer's scan paths, accept type: decision alongside type: entity, lead the overview excerpt with the decision summary, and add the proposer as a search alias. Remove decisions/ from the claim indexer (it was silently dropping them anyway).

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
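For illustration, a hypothetical decision record like the one below (field values invented) was previously skipped by both indexers; after the fix it is picked up by _index_single_entity, its summary leads the overview excerpt, and its proposer becomes a search alias:

    ---
    type: decision
    name: List OMFG on OmniPair
    summary: Approve listing the OMFG token on the OmniPair AMM
    proposer: Ganymede
    tags: [omnipair, listing]
    domain: defi
    ---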
#!/usr/bin/env python3
"""KB Retrieval for Telegram bot — multi-layer search across the Teleo knowledge base.

Architecture (Ganymede-reviewed):
Layer 1: Entity resolution — query tokens → entity name/aliases/tags → entity file
Layer 2: Claim search — substring + keyword matching on titles AND descriptions
Layer 3: Agent context — positions, beliefs referencing matched entities/claims

Entry point: retrieve_context(query, repo_dir) → KBContext

Epimetheus owns this module.
"""

import logging
import re
import time
from dataclasses import dataclass, field
from pathlib import Path

import yaml

logger = logging.getLogger("kb-retrieval")

# ─── Types ────────────────────────────────────────────────────────────


@dataclass
class EntityMatch:
    """A matched entity with its profile."""
    name: str
    path: str
    entity_type: str
    domain: str
    overview: str  # first ~500 chars of body
    tags: list[str]
    related_claims: list[str]  # wiki-link titles from body


@dataclass
class ClaimMatch:
    """A matched claim."""
    title: str
    path: str
    domain: str
    confidence: str
    description: str
    score: float  # relevance score


@dataclass
class PositionMatch:
    """An agent position on a topic."""
    agent: str
    title: str
    content: str  # first ~500 chars


@dataclass
class KBContext:
    """Full KB context for a query — passed to the LLM prompt."""
    entities: list[EntityMatch] = field(default_factory=list)
    claims: list[ClaimMatch] = field(default_factory=list)
    positions: list[PositionMatch] = field(default_factory=list)
    belief_excerpts: list[str] = field(default_factory=list)
    stats: dict = field(default_factory=dict)


# ─── Index ────────────────────────────────────────────────────────────

class KBIndex:
    """In-memory index of entities, claims, and agent state. Rebuilt when stale (see ensure_fresh)."""

    def __init__(self, repo_dir: str):
        self.repo_dir = Path(repo_dir)
        self._entities: list[dict] = []   # [{name, path, type, domain, tags, handles, aliases, overview, related_claims}]
        self._claims: list[dict] = []     # [{title, path, domain, confidence, description}]
        self._positions: list[dict] = []  # [{agent, title, path, content}]
        self._beliefs: list[dict] = []    # [{agent, path, content}]
        self._entity_alias_map: dict[str, list[int]] = {}  # lowercase alias → indices into _entities
        self._last_build: float = 0

    def ensure_fresh(self, max_age_seconds: int = 300):
        """Rebuild the index if the last build is older than max_age_seconds (default 5 min)."""
        now = time.time()
        if now - self._last_build > max_age_seconds:
            self._build()

    def _build(self):
        """Rebuild all indexes from the filesystem."""
        logger.info("Rebuilding KB index from %s", self.repo_dir)
        start = time.time()

        self._entities = []
        self._claims = []
        self._positions = []
        self._beliefs = []
        self._entity_alias_map = {}

        self._index_entities()
        self._index_claims()
        self._index_agent_state()
        self._last_build = time.time()

        logger.info("KB index built in %.1fs: %d entities, %d claims, %d positions",
                    time.time() - start, len(self._entities), len(self._claims), len(self._positions))

    def _index_entities(self):
        """Scan entities/ and decisions/ for entity and decision files."""
        entity_dirs = [
            self.repo_dir / "entities",
            self.repo_dir / "decisions",
        ]
        for entities_dir in entity_dirs:
            if not entities_dir.exists():
                continue
            for md_file in entities_dir.rglob("*.md"):
                self._index_single_entity(md_file)

    def _index_single_entity(self, md_file: Path):
        """Index a single entity or decision file."""
        try:
            fm, body = _parse_frontmatter(md_file)
            if not fm or fm.get("type") not in ("entity", "decision"):
                return

            name = fm.get("name", md_file.stem)
            handles = fm.get("handles", []) or []
            tags = fm.get("tags", []) or []
            entity_type = fm.get("entity_type", "unknown")
            domain = fm.get("domain", "unknown")

            # Decision records carry a summary (leads the overview excerpt) and a proposer (indexed as an alias)
            summary = fm.get("summary", "")
            proposer = fm.get("proposer", "")

            # Build aliases from multiple sources
            aliases = set()
            aliases.add(name.lower())
            aliases.add(md_file.stem.lower())  # slugified name
            for h in handles:
                aliases.add(h.lower().lstrip("@"))
            for t in tags:
                aliases.add(t.lower())
            # Add proposer name as alias for decision records
            if proposer:
                aliases.add(proposer.lower())

            # Mine body for ticker mentions ($XXXX and standalone ALL-CAPS tokens)
            dollar_tickers = re.findall(r"\$([A-Z]{2,10})", body[:2000])
            for ticker in dollar_tickers:
                aliases.add(ticker.lower())
                aliases.add(f"${ticker.lower()}")
            # Standalone all-caps tokens (likely tickers: OMFG, META, SOL)
            caps_tokens = re.findall(r"\b([A-Z]{2,10})\b", body[:2000])
            for token in caps_tokens:
                # Skip common English words and generic acronyms that happen to be ALL CAPS
                if token not in ("THE", "AND", "FOR", "NOT", "BUT", "HAS", "ARE", "WAS",
                                 "ITS", "ALL", "CAN", "HAD", "HER", "ONE", "OUR", "OUT",
                                 "NEW", "NOW", "OLD", "SEE", "WAY", "MAY", "SAY", "SHE",
                                 "TWO", "HOW", "BOY", "DID", "GET", "PUT", "KEY", "TVL",
                                 "AMM", "CEO", "SDK", "API", "ICO", "APY", "FAQ", "IPO"):
                    aliases.add(token.lower())
                    aliases.add(f"${token.lower()}")

            # Also add aliases field if it exists (future schema)
            for a in (fm.get("aliases", []) or []):
                aliases.add(a.lower())

            # Extract wiki-linked claim references from body
            related_claims = re.findall(r"\[\[([^\]]+)\]\]", body)

            # Body excerpt — for decisions, lead with the summary for better prompt context
            if summary:
                overview = f"{summary} "
                body_lines = [l for l in body.split("\n") if l.strip() and not l.startswith("#")]
                remaining = 500 - len(overview)
                if remaining > 0:
                    overview += " ".join(body_lines[:10])[:remaining]
            else:
                body_lines = [l for l in body.split("\n") if l.strip() and not l.startswith("#")]
                overview = " ".join(body_lines[:10])[:500]

            idx = len(self._entities)
            self._entities.append({
                "name": name,
                "path": str(md_file),
                "type": entity_type,
                "domain": domain,
                "tags": tags,
                "handles": handles,
                "aliases": list(aliases),
                "overview": overview,
                "related_claims": related_claims,
            })

            # Register all aliases in the lookup map
            for alias in aliases:
                self._entity_alias_map.setdefault(alias, []).append(idx)

        except Exception as e:
            logger.warning("Failed to index entity %s: %s", md_file, e)

    def _index_claims(self):
        """Scan domains/, core/, and foundations/ for claim files."""
        claim_dirs = [
            self.repo_dir / "domains",
            self.repo_dir / "core",
            self.repo_dir / "foundations",
        ]
        for claim_dir in claim_dirs:
            if not claim_dir.exists():
                continue
            for md_file in claim_dir.rglob("*.md"):
                # Skip _map.md and other non-claim files
                if md_file.name.startswith("_"):
                    continue
                try:
                    fm, body = _parse_frontmatter(md_file)
                    if not fm:
                        # Many claims lack frontmatter entirely — index them anyway
                        title = md_file.stem.replace("-", " ")
                        self._claims.append({
                            "title": title,
                            "path": str(md_file),
                            "domain": _domain_from_path(md_file, self.repo_dir),
                            "confidence": "unknown",
                            "description": "",
                        })
                        continue

                    # Skip non-claim types if the type is explicit
                    ft = fm.get("type")
                    if ft and ft != "claim":
                        continue

                    title = md_file.stem.replace("-", " ")
                    self._claims.append({
                        "title": title,
                        "path": str(md_file),
                        "domain": fm.get("domain", _domain_from_path(md_file, self.repo_dir)),
                        "confidence": fm.get("confidence", "unknown"),
                        "description": fm.get("description", ""),
                    })
                except Exception as e:
                    logger.warning("Failed to index claim %s: %s", md_file, e)

    def _index_agent_state(self):
        """Scan agents/ for positions and beliefs."""
        agents_dir = self.repo_dir / "agents"
        if not agents_dir.exists():
            return
        for agent_dir in agents_dir.iterdir():
            if not agent_dir.is_dir():
                continue
            agent_name = agent_dir.name

            # Index positions
            positions_dir = agent_dir / "positions"
            if positions_dir.exists():
                for md_file in positions_dir.glob("*.md"):
                    try:
                        fm, body = _parse_frontmatter(md_file)
                        title = fm.get("title", md_file.stem.replace("-", " ")) if fm else md_file.stem.replace("-", " ")
                        content = body[:500] if body else ""
                        self._positions.append({
                            "agent": agent_name,
                            "title": title,
                            "path": str(md_file),
                            "content": content,
                        })
                    except Exception as e:
                        logger.warning("Failed to index position %s: %s", md_file, e)

            # Index beliefs (just the file, we'll excerpt on demand)
            beliefs_file = agent_dir / "beliefs.md"
            if beliefs_file.exists():
                try:
                    content = beliefs_file.read_text()[:3000]
                    self._beliefs.append({
                        "agent": agent_name,
                        "path": str(beliefs_file),
                        "content": content,
                    })
                except Exception as e:
                    logger.warning("Failed to index beliefs %s: %s", beliefs_file, e)


# ─── Retrieval ────────────────────────────────────────────────────────

def retrieve_context(query: str, repo_dir: str, index: KBIndex | None = None,
                     max_claims: int = 8, max_positions: int = 3) -> KBContext:
    """Main entry point: retrieve full KB context for a query.

    Three layers:
    1. Entity resolution — match query tokens to entities
    2. Claim search — substring + keyword matching on titles and descriptions
    3. Agent context — positions and beliefs referencing matched entities/claims
    """
    if index is None:
        index = KBIndex(repo_dir)
    index.ensure_fresh()

    ctx = KBContext()

    # Normalize query
    query_lower = query.lower()
    query_tokens = _tokenize(query_lower)

    # ── Layer 1: Entity Resolution ──
    matched_entity_indices = set()
    for token in query_tokens:
        # Direct alias match
        if token in index._entity_alias_map:
            matched_entity_indices.update(index._entity_alias_map[token])
        # Strip $ prefix for ticker lookup
        if token.startswith("$"):
            bare = token[1:]
            if bare in index._entity_alias_map:
                matched_entity_indices.update(index._entity_alias_map[bare])

    # Also try substring match on entity names (e.g. "omnipair" in "OmniPair Protocol")
    for i, ent in enumerate(index._entities):
        for token in query_tokens:
            if len(token) >= 3 and token in ent["name"].lower():
                matched_entity_indices.add(i)

    for idx in matched_entity_indices:
        ent = index._entities[idx]
        ctx.entities.append(EntityMatch(
            name=ent["name"],
            path=ent["path"],
            entity_type=ent["type"],
            domain=ent["domain"],
            overview=_sanitize_for_prompt(ent["overview"]),
            tags=ent["tags"],
            related_claims=ent["related_claims"],
        ))

    # Collect entity-related claim titles for boosting
    entity_claim_titles = set()
    for em in ctx.entities:
        for rc in em.related_claims:
            entity_claim_titles.add(rc.lower().replace("-", " "))

    # ── Layer 2: Claim Search ──
    scored_claims: list[tuple[float, dict]] = []

    for claim in index._claims:
        score = _score_claim(query_lower, query_tokens, claim, entity_claim_titles)
        if score > 0:
            scored_claims.append((score, claim))

    scored_claims.sort(key=lambda x: x[0], reverse=True)

    for score, claim in scored_claims[:max_claims]:
        ctx.claims.append(ClaimMatch(
            title=claim["title"],
            path=claim["path"],
            domain=claim["domain"],
            confidence=claim["confidence"],
            description=_sanitize_for_prompt(claim.get("description", "")),
            score=score,
        ))

    # ── Layer 3: Agent Context ──
    # Find positions referencing matched entities or claims
    match_terms = set(query_tokens)
    for em in ctx.entities:
        match_terms.add(em.name.lower())
    for cm in ctx.claims:
        # Add key words from matched claim titles
        match_terms.update(t for t in cm.title.lower().split() if len(t) >= 4)

    for pos in index._positions:
        pos_text = (pos["title"] + " " + pos["content"]).lower()
        overlap = sum(1 for t in match_terms if t in pos_text)
        if overlap >= 2:
            ctx.positions.append(PositionMatch(
                agent=pos["agent"],
                title=pos["title"],
                content=_sanitize_for_prompt(pos["content"]),
            ))
            if len(ctx.positions) >= max_positions:
                break

    # Extract relevant belief excerpts
    for belief in index._beliefs:
        belief_text = belief["content"].lower()
        overlap = sum(1 for t in match_terms if t in belief_text)
        if overlap >= 2:
            # Extract relevant paragraphs
            excerpts = _extract_relevant_paragraphs(belief["content"], match_terms, max_paragraphs=2)
            for exc in excerpts:
                ctx.belief_excerpts.append(f"**{belief['agent']}**: {_sanitize_for_prompt(exc)}")

    # Stats
    ctx.stats = {
        "total_claims": len(index._claims),
        "total_entities": len(index._entities),
        "total_positions": len(index._positions),
        "entities_matched": len(ctx.entities),
        "claims_matched": len(ctx.claims),
    }

    return ctx


# ─── Scoring ──────────────────────────────────────────────────────────

def _score_claim(query_lower: str, query_tokens: list[str], claim: dict,
                 entity_claim_titles: set[str]) -> float:
    """Score a claim against a query. Higher = more relevant."""
    title = claim["title"].lower()
    desc = claim.get("description", "").lower()
    searchable = title + " " + desc
    score = 0.0

    # Per-token substring matches (title hits count double)
    for token in query_tokens:
        if len(token) >= 3 and token in searchable:
            score += 2.0 if token in title else 1.0

    # Boost if this claim is wiki-linked from a matched entity
    if any(t in title for t in entity_claim_titles):
        score += 5.0

    # Boost multi-word (bigram) matches
    if len(query_tokens) >= 2:
        bigrams = [f"{query_tokens[i]} {query_tokens[i+1]}" for i in range(len(query_tokens) - 1)]
        for bg in bigrams:
            if bg in searchable:
                score += 3.0

    return score
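
# Worked example (illustrative query and claim title, not from the KB): scoring the
# query "omnipair lending risk" against a claim titled "omnipair lending design"
# with an empty description:
#   "omnipair" appears in the title          -> +2.0
#   "lending" appears in the title           -> +2.0
#   bigram "omnipair lending" in searchable  -> +3.0
#   total = 7.0 (plus 5.0 more if the claim is wiki-linked from a matched entity)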

# ─── Helpers ──────────────────────────────────────────────────────────


def _parse_frontmatter(path: Path) -> tuple[dict | None, str]:
    """Parse YAML frontmatter and body from a markdown file."""
    try:
        text = path.read_text(errors="replace")
    except Exception:
        return None, ""

    if not text.startswith("---"):
        return None, text

    end = text.find("\n---", 3)
    if end == -1:
        return None, text

    try:
        fm = yaml.safe_load(text[3:end])
        if not isinstance(fm, dict):
            return None, text
        body = text[end + 4:].strip()
        return fm, body
    except yaml.YAMLError:
        return None, text


def _domain_from_path(path: Path, repo_dir: Path) -> str:
    """Infer domain from file path."""
    rel = path.relative_to(repo_dir)
    parts = rel.parts
    if len(parts) >= 2 and parts[0] in ("domains", "entities", "decisions"):
        return parts[1]
    if len(parts) >= 1 and parts[0] == "core":
        return "core"
    if len(parts) >= 1 and parts[0] == "foundations":
        return parts[1] if len(parts) >= 2 else "foundations"
    return "unknown"


def _tokenize(text: str) -> list[str]:
    """Split query into searchable tokens."""
    # Keep $ prefix for ticker matching
    tokens = re.findall(r"\$?\w+", text.lower())
    # Filter out very short stop words but keep short tickers
    return [t for t in tokens if len(t) >= 2]


def _sanitize_for_prompt(text: str) -> str:
    """Sanitize content before injecting into LLM prompt (Ganymede: security)."""
    # Strip code blocks
    text = re.sub(r"```.*?```", "[code block removed]", text, flags=re.DOTALL)
    # Strip anything that looks like system instructions
    text = re.sub(r"(system:|assistant:|human:|<\|.*?\|>)", "", text, flags=re.IGNORECASE)
    # Truncate
    return text[:1000]


def _extract_relevant_paragraphs(text: str, terms: set[str], max_paragraphs: int = 2) -> list[str]:
    """Extract paragraphs from text that contain the most matching terms."""
    paragraphs = text.split("\n\n")
    scored = []
    for p in paragraphs:
        p_stripped = p.strip()
        if len(p_stripped) < 20:
            continue
        p_lower = p_stripped.lower()
        overlap = sum(1 for t in terms if t in p_lower)
        if overlap > 0:
            scored.append((overlap, p_stripped[:300]))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [p for _, p in scored[:max_paragraphs]]

def format_context_for_prompt(ctx: KBContext) -> str:
    """Format KBContext as text for injection into the LLM prompt."""
    sections = []

    if ctx.entities:
        sections.append("## Matched Entities")
        for ent in ctx.entities:
            sections.append(f"**{ent.name}** ({ent.entity_type}, {ent.domain})")
            sections.append(ent.overview)
            if ent.related_claims:
                sections.append("Related claims: " + ", ".join(ent.related_claims[:5]))
        sections.append("")

    if ctx.claims:
        sections.append("## Relevant KB Claims")
        for claim in ctx.claims:
            sections.append(f"- **{claim.title}** (confidence: {claim.confidence}, domain: {claim.domain})")
            if claim.description:
                sections.append(f"  {claim.description}")
        sections.append("")

    if ctx.positions:
        sections.append("## Agent Positions")
        for pos in ctx.positions:
            sections.append(f"**{pos.agent}**: {pos.title}")
            sections.append(pos.content[:200])
        sections.append("")

    if ctx.belief_excerpts:
        sections.append("## Relevant Beliefs")
        for exc in ctx.belief_excerpts:
            sections.append(exc)
        sections.append("")

    if not sections:
        return "No relevant KB content found for this query."

    # Add stats footer
    sections.append(f"---\nKB: {ctx.stats.get('total_claims', '?')} claims, "
                    f"{ctx.stats.get('total_entities', '?')} entities. "
                    f"Matched: {ctx.stats.get('entities_matched', 0)} entities, "
                    f"{ctx.stats.get('claims_matched', 0)} claims.")

    return "\n".join(sections)
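
# --- Usage sketch (not part of the original module): run this file directly to
# eyeball retrieval output against a local KB checkout. The repo path and the
# fallback query below are placeholders; adjust for your environment.
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    repo = sys.argv[1] if len(sys.argv) > 1 else "."
    query = " ".join(sys.argv[2:]) or "omnipair lending risk"
    kb_ctx = retrieve_context(query, repo)
    print(format_context_for_prompt(kb_ctx))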