Compare commits
5 commits
main
...
epimetheus
| Author | SHA1 | Date | |
|---|---|---|---|
| 7d0ae0c8cb | |||
| 6e8aab2dd9 | |||
| b766176259 | |||
| 71b62e6d03 | |||
| eee2b5c78b |
6 changed files with 348 additions and 42 deletions
|
|
@ -100,6 +100,13 @@ for SOURCE in $SOURCES; do
|
||||||
BASENAME=$(basename "$SOURCE" .md)
|
BASENAME=$(basename "$SOURCE" .md)
|
||||||
BRANCH="extract/$BASENAME"
|
BRANCH="extract/$BASENAME"
|
||||||
|
|
||||||
|
# Skip already-processed sources (status set by extraction script, merged to main)
|
||||||
|
if grep -q "^status: processed\|^status: enrichment\|^status: null-result" "$SOURCE" 2>/dev/null; then
|
||||||
|
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (already processed — status in frontmatter)" >> $LOG
|
||||||
|
SKIPPED=$((SKIPPED + 1))
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
|
||||||
# Skip conversation archives — valuable content enters through standalone sources,
|
# Skip conversation archives — valuable content enters through standalone sources,
|
||||||
# inline tags (SOURCE:/CLAIM:), and transcript review. Raw conversations produce
|
# inline tags (SOURCE:/CLAIM:), and transcript review. Raw conversations produce
|
||||||
# low-quality claims with schema failures. (Epimetheus session 4)
|
# low-quality claims with schema failures. (Epimetheus session 4)
|
||||||
|
|
@ -244,6 +251,32 @@ Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>" >> $LOG 2>&1
|
||||||
SUCCESS=$((SUCCESS + 1))
|
SUCCESS=$((SUCCESS + 1))
|
||||||
echo " -> SUCCESS ($CHANGED files)" >> $LOG
|
echo " -> SUCCESS ($CHANGED files)" >> $LOG
|
||||||
|
|
||||||
|
# Move source from queue/ to archive/ on main worktree (prevents re-extraction)
|
||||||
|
DOMAIN=$(grep -m1 "^domain:" "$MAIN_REPO/inbox/queue/$BASENAME.md" 2>/dev/null | sed 's/^domain:\s*//')
|
||||||
|
ARCHIVE_DIR="$MAIN_REPO/inbox/archive/${DOMAIN:-uncategorized}"
|
||||||
|
mkdir -p "$ARCHIVE_DIR"
|
||||||
|
if [ -f "$MAIN_REPO/inbox/queue/$BASENAME.md" ]; then
|
||||||
|
mv "$MAIN_REPO/inbox/queue/$BASENAME.md" "$ARCHIVE_DIR/$BASENAME.md"
|
||||||
|
cd $MAIN_REPO
|
||||||
|
git add "inbox/queue/$BASENAME.md" "inbox/archive/" 2>/dev/null
|
||||||
|
git commit -m "pipeline: archive $BASENAME after extraction
|
||||||
|
|
||||||
|
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>" >> $LOG 2>&1
|
||||||
|
PUSH_OK=0
|
||||||
|
for attempt in 1 2 3; do
|
||||||
|
git pull --rebase origin main >> $LOG 2>&1
|
||||||
|
git push origin main >> $LOG 2>&1 && { PUSH_OK=1; break; }
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
if [ "$PUSH_OK" -eq 0 ]; then
|
||||||
|
echo " -> WARNING: archive commit for $BASENAME failed to push after 3 attempts" >> $LOG
|
||||||
|
fi
|
||||||
|
cd $REPO
|
||||||
|
git fetch origin main >> $LOG 2>&1
|
||||||
|
git reset --hard origin/main >> $LOG 2>&1
|
||||||
|
echo " -> Archived $BASENAME to $ARCHIVE_DIR" >> $LOG
|
||||||
|
fi
|
||||||
|
|
||||||
# Back to main
|
# Back to main
|
||||||
git checkout -f main >> $LOG 2>&1
|
git checkout -f main >> $LOG 2>&1
|
||||||
|
|
||||||
|
|
|
||||||
249
lib/cascade.py
Normal file
249
lib/cascade.py
Normal file
|
|
@ -0,0 +1,249 @@
|
||||||
|
"""Cascade automation — auto-flag dependent beliefs/positions when claims change.
|
||||||
|
|
||||||
|
Hook point: called from merge.py after _embed_merged_claims, before _delete_remote_branch.
|
||||||
|
Uses the same main_sha/branch_sha diff to detect changed claim files, then scans
|
||||||
|
all agent beliefs and positions for depends_on references to those claims.
|
||||||
|
|
||||||
|
Notifications are written to /opt/teleo-eval/agent-state/{agent}/inbox/ using
|
||||||
|
the same atomic-write pattern as lib-state.sh.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import tempfile
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
logger = logging.getLogger("pipeline.cascade")
|
||||||
|
|
||||||
|
AGENT_STATE_DIR = Path("/opt/teleo-eval/agent-state")
|
||||||
|
CLAIM_DIRS = {"domains/", "core/", "foundations/", "decisions/"}
|
||||||
|
AGENT_NAMES = ["rio", "leo", "clay", "astra", "vida", "theseus"]
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_claim_titles_from_diff(diff_files: list[str]) -> set[str]:
    """Derive claim titles (filename stems) from changed file paths.

    Only ``.md`` files under the known claim directories count; meta files
    (underscore-prefixed names and ``directory.md``) are ignored.
    """
    titles: set[str] = set()
    for path_str in diff_files:
        if not path_str.endswith(".md"):
            continue
        if not any(path_str.startswith(prefix) for prefix in CLAIM_DIRS):
            continue
        leaf = os.path.basename(path_str)
        # Underscore files and directory.md are structural, not claims.
        if leaf.startswith("_") or leaf == "directory.md":
            continue
        titles.add(leaf.removesuffix(".md"))
    return titles
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_for_match(text: str) -> str:
|
||||||
|
"""Normalize for fuzzy matching: lowercase, hyphens to spaces, strip punctuation, collapse whitespace."""
|
||||||
|
text = text.lower().strip()
|
||||||
|
text = text.replace("-", " ")
|
||||||
|
text = re.sub(r"[^\w\s]", "", text)
|
||||||
|
text = re.sub(r"\s+", " ", text)
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
def _slug_to_words(slug: str) -> str:
|
||||||
|
"""Convert kebab-case slug to space-separated words."""
|
||||||
|
return slug.replace("-", " ")
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_depends_on(file_path: Path) -> tuple[str, list[str]]:
|
||||||
|
"""Parse a belief or position file's depends_on entries.
|
||||||
|
|
||||||
|
Returns (agent_name, [dependency_titles]).
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
content = file_path.read_text(encoding="utf-8")
|
||||||
|
except (OSError, UnicodeDecodeError):
|
||||||
|
return ("", [])
|
||||||
|
|
||||||
|
agent = ""
|
||||||
|
deps = []
|
||||||
|
in_frontmatter = False
|
||||||
|
in_depends = False
|
||||||
|
|
||||||
|
for line in content.split("\n"):
|
||||||
|
if line.strip() == "---":
|
||||||
|
if not in_frontmatter:
|
||||||
|
in_frontmatter = True
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
if in_frontmatter:
|
||||||
|
if line.startswith("agent:"):
|
||||||
|
agent = line.split(":", 1)[1].strip().strip('"').strip("'")
|
||||||
|
elif line.startswith("depends_on:"):
|
||||||
|
in_depends = True
|
||||||
|
rest = line.split(":", 1)[1].strip()
|
||||||
|
if rest.startswith("["):
|
||||||
|
items = re.findall(r'"([^"]+)"|\'([^\']+)\'', rest)
|
||||||
|
for item in items:
|
||||||
|
dep = item[0] or item[1]
|
||||||
|
dep = dep.strip("[]").replace("[[", "").replace("]]", "")
|
||||||
|
deps.append(dep)
|
||||||
|
in_depends = False
|
||||||
|
elif in_depends:
|
||||||
|
if line.startswith(" - "):
|
||||||
|
dep = line.strip().lstrip("- ").strip('"').strip("'")
|
||||||
|
dep = dep.replace("[[", "").replace("]]", "")
|
||||||
|
deps.append(dep)
|
||||||
|
elif line.strip() and not line.startswith(" "):
|
||||||
|
in_depends = False
|
||||||
|
|
||||||
|
# Also scan body for [[wiki-links]]
|
||||||
|
body_links = re.findall(r"\[\[([^\]]+)\]\]", content)
|
||||||
|
for link in body_links:
|
||||||
|
if link not in deps:
|
||||||
|
deps.append(link)
|
||||||
|
|
||||||
|
return (agent, deps)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_inbox_message(agent: str, subject: str, body: str) -> bool:
    """Write a cascade notification to an agent's inbox.

    Uses the tmp-file + rename pattern so readers never observe a partially
    written message. Returns True on success, False when the agent has no
    inbox directory or the write fails.
    """
    inbox_dir = AGENT_STATE_DIR / agent / "inbox"
    if not inbox_dir.exists():
        logger.warning("cascade: no inbox dir for agent %s, skipping", agent)
        return False

    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    # Truncate the subject so the filename stays within filesystem limits.
    filename = f"cascade-{ts}-{subject[:60]}.md"
    final_path = inbox_dir / filename

    tmp_path = None
    try:
        # mkstemp in the destination directory so the final rename stays on
        # one filesystem (atomic).
        fd, tmp_path = tempfile.mkstemp(dir=str(inbox_dir), suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            f.write("---\n")
            f.write("type: cascade\n")
            f.write("from: pipeline\n")
            f.write(f"to: {agent}\n")
            f.write(f"subject: \"{subject}\"\n")
            f.write(f"created: {datetime.now(timezone.utc).isoformat()}\n")
            f.write("status: unread\n")
            f.write("---\n\n")
            f.write(body)
        # os.replace is atomic and overwrites an existing target, unlike
        # os.rename on some platforms.
        os.replace(tmp_path, str(final_path))
        return True
    except OSError:
        logger.exception("cascade: failed to write inbox message for %s", agent)
        # Bug fix: don't leave an orphaned .tmp file in the inbox on failure.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass  # already renamed or removed — nothing to clean
        return False
|
||||||
|
|
||||||
|
|
||||||
|
def _find_matches(deps: list[str], claim_lookup: dict[str, str]) -> list[str]:
    """Return the changed-claim titles that any dependency refers to.

    Exact normalized match wins first; otherwise a substring containment in
    either direction counts, taking the first hit per dependency.
    """
    hits: list[str] = []
    for dependency in deps:
        key = _normalize_for_match(dependency)
        exact = claim_lookup.get(key)
        if exact is not None:
            hits.append(exact)
            continue
        for candidate_key, candidate_title in claim_lookup.items():
            if candidate_key in key or key in candidate_key:
                hits.append(candidate_title)
                break
    return hits
|
||||||
|
|
||||||
|
|
||||||
|
def _format_cascade_body(
|
||||||
|
file_name: str,
|
||||||
|
file_type: str,
|
||||||
|
matched_claims: list[str],
|
||||||
|
pr_num: int,
|
||||||
|
) -> str:
|
||||||
|
"""Format the cascade notification body."""
|
||||||
|
claims_list = "\n".join(f"- {c}" for c in matched_claims)
|
||||||
|
return (
|
||||||
|
f"# Cascade: upstream claims changed\n\n"
|
||||||
|
f"Your {file_type} **{file_name}** depends on claims that were modified in PR #{pr_num}.\n\n"
|
||||||
|
f"## Changed claims\n\n{claims_list}\n\n"
|
||||||
|
f"## Action needed\n\n"
|
||||||
|
f"Review whether your {file_type}'s confidence, description, or grounding "
|
||||||
|
f"needs updating in light of these changes. If the evidence strengthened, "
|
||||||
|
f"consider increasing confidence. If it weakened or contradicted, flag for "
|
||||||
|
f"re-evaluation.\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def cascade_after_merge(
    main_sha: str,
    branch_sha: str,
    pr_num: int,
    main_worktree: Path,
) -> int:
    """Scan for beliefs/positions affected by claims changed in this merge.

    Diffs main_sha..branch_sha to find modified claim files, then walks each
    agent's beliefs/ and positions/ directories and notifies owners whose
    files depend on a changed claim.

    Returns the number of cascade notifications sent.
    """
    # Step 1: ask git which files this merge touched.
    git_proc = await asyncio.create_subprocess_exec(
        "git", "diff", "--name-only", "--diff-filter=ACMR",
        main_sha, branch_sha,
        cwd=str(main_worktree),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        raw_out, _ = await asyncio.wait_for(git_proc.communicate(), timeout=10)
    except asyncio.TimeoutError:
        git_proc.kill()
        await git_proc.wait()
        logger.warning("cascade: git diff timed out")
        return 0

    if git_proc.returncode != 0:
        logger.warning("cascade: git diff failed (rc=%d)", git_proc.returncode)
        return 0

    changed_paths = [p for p in raw_out.decode().strip().split("\n") if p]

    # Step 2: map changed paths to claim titles; bail early if none.
    changed_claims = _extract_claim_titles_from_diff(changed_paths)
    if not changed_claims:
        return 0

    logger.info("cascade: %d claims changed in PR #%d: %s",
                len(changed_claims), pr_num, list(changed_claims)[:5])

    # Two lookup keys per claim (raw + slug-to-words) for fuzzy matching.
    claim_lookup: dict[str, str] = {}
    for title in changed_claims:
        claim_lookup[_normalize_for_match(title)] = title
        claim_lookup[_normalize_for_match(_slug_to_words(title))] = title

    # Step 3: walk every agent's beliefs and positions.
    sent = 0
    agents_dir = main_worktree / "agents"
    if not agents_dir.exists():
        logger.warning("cascade: no agents/ dir in worktree")
        return 0

    for agent_name in AGENT_NAMES:
        agent_dir = agents_dir / agent_name
        if not agent_dir.exists():
            continue

        for subdir, file_type in (("beliefs", "belief"), ("positions", "position")):
            scan_dir = agent_dir / subdir
            if not scan_dir.exists():
                continue
            for md_file in scan_dir.glob("*.md"):
                _, deps = _parse_depends_on(md_file)
                matched = _find_matches(deps, claim_lookup)
                if not matched:
                    continue
                message = _format_cascade_body(md_file.name, file_type, matched, pr_num)
                if _write_inbox_message(agent_name, f"claim-changed-affects-{file_type}", message):
                    sent += 1
                    logger.info("cascade: notified %s — %s '%s' affected by %s",
                                agent_name, file_type, md_file.stem, matched)

    if sent:
        logger.info("cascade: sent %d notifications for PR #%d", sent, pr_num)
    return sent
|
||||||
|
|
@ -23,6 +23,7 @@ from . import config, db
|
||||||
from .db import classify_branch
|
from .db import classify_branch
|
||||||
from .dedup import dedup_evidence_blocks
|
from .dedup import dedup_evidence_blocks
|
||||||
from .domains import detect_domain_from_branch
|
from .domains import detect_domain_from_branch
|
||||||
|
from .cascade import cascade_after_merge
|
||||||
from .forgejo import api as forgejo_api
|
from .forgejo import api as forgejo_api
|
||||||
|
|
||||||
# Pipeline-owned branch prefixes — only these get auto-merged.
|
# Pipeline-owned branch prefixes — only these get auto-merged.
|
||||||
|
|
@ -1046,6 +1047,14 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
|
||||||
# Embed new/changed claims into Qdrant (non-fatal)
|
# Embed new/changed claims into Qdrant (non-fatal)
|
||||||
await _embed_merged_claims(main_sha, branch_sha)
|
await _embed_merged_claims(main_sha, branch_sha)
|
||||||
|
|
||||||
|
|
||||||
|
# Cascade: notify agents whose beliefs/positions depend on changed claims
|
||||||
|
try:
|
||||||
|
cascaded = await cascade_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE)
|
||||||
|
if cascaded:
|
||||||
|
logger.info("PR #%d: %d cascade notifications sent", pr_num, cascaded)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("PR #%d: cascade check failed (non-fatal)", pr_num)
|
||||||
# Delete remote branch immediately (Ganymede Q4)
|
# Delete remote branch immediately (Ganymede Q4)
|
||||||
await _delete_remote_branch(branch)
|
await _delete_remote_branch(branch)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -163,15 +163,29 @@ def fix_frontmatter(content: str, domain: str, agent: str) -> tuple[str, list[st
|
||||||
|
|
||||||
|
|
||||||
def fix_wiki_links(content: str, existing_claims: set[str]) -> tuple[str, list[str]]:
    """Fix or strip broken wiki links. Resolves slug→space mismatches before stripping.

    The LLM often generates wiki links as slugs (hyphens) but KB filenames use
    spaces. A link is kept verbatim when it names an existing claim, rewritten
    when its normalized form matches one, and unwrapped to plain text otherwise.
    Returns (fixed_content, fixes).
    """
    fixes: list[str] = []

    # normalized (lowercased, hyphens→spaces) → original claim stem
    by_normalized = {stem.lower().replace("-", " "): stem for stem in existing_claims}

    def _repair(match):
        link = match.group(1).strip()
        if link in existing_claims:
            return match.group(0)  # Exact match — keep as-is
        # Try normalizing slug to spaces
        resolved = by_normalized.get(link.lower().replace("-", " "))
        if resolved is not None:
            fixes.append(f"resolved_wiki_link:{link[:40]}->{resolved[:40]}")
            return f"[[{resolved}]]"
        fixes.append(f"stripped_wiki_link:{link[:60]}")
        return link  # Keep text, remove brackets

    return WIKI_LINK_RE.sub(_repair, content), fixes
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ sys.path.insert(0, str(Path(__file__).parent))
|
||||||
|
|
||||||
from lib.extraction_prompt import build_extraction_prompt
|
from lib.extraction_prompt import build_extraction_prompt
|
||||||
from lib.post_extract import (
|
from lib.post_extract import (
|
||||||
|
fix_wiki_links,
|
||||||
load_existing_claims_from_repo,
|
load_existing_claims_from_repo,
|
||||||
validate_and_fix_claims,
|
validate_and_fix_claims,
|
||||||
validate_and_fix_entities,
|
validate_and_fix_entities,
|
||||||
|
|
@ -262,7 +263,7 @@ def reconstruct_claim_content(claim, domain, agent):
|
||||||
]
|
]
|
||||||
for r in related[:5]:
|
for r in related[:5]:
|
||||||
lines.append(f"- [[{r}]]")
|
lines.append(f"- [[{r}]]")
|
||||||
lines.extend(["", "Topics:", "- [[_map]]", ""])
|
lines.extend(["", "Topics:", ""])
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -456,6 +457,22 @@ def main():
|
||||||
written.append(filename)
|
written.append(filename)
|
||||||
print(f" Wrote: {claim_path}")
|
print(f" Wrote: {claim_path}")
|
||||||
|
|
||||||
|
# ── Fix wiki links: resolve slugs→spaces, strip dead links ──
|
||||||
|
if written:
|
||||||
|
existing_stems = {Path(c).stem for c in existing_claims}
|
||||||
|
wiki_fix_count = 0
|
||||||
|
for filename in written:
|
||||||
|
claim_path = os.path.join(domain_dir, filename)
|
||||||
|
with open(claim_path) as f:
|
||||||
|
content = f.read()
|
||||||
|
fixed_content, fixes = fix_wiki_links(content, existing_stems)
|
||||||
|
if fixes:
|
||||||
|
with open(claim_path, "w") as f:
|
||||||
|
f.write(fixed_content)
|
||||||
|
wiki_fix_count += len(fixes)
|
||||||
|
if wiki_fix_count:
|
||||||
|
print(f" Wiki links: {wiki_fix_count} fixed/stripped")
|
||||||
|
|
||||||
# ── Atomic connect: wire new claims to existing KB via vector search ──
|
# ── Atomic connect: wire new claims to existing KB via vector search ──
|
||||||
connect_stats = {"connected": 0, "edges_added": 0}
|
connect_stats = {"connected": 0, "edges_added": 0}
|
||||||
if written:
|
if written:
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,8 @@ from telegram.ext import (
|
||||||
|
|
||||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||||
import json as _json
|
import json as _json
|
||||||
from kb_retrieval import KBIndex, format_context_for_prompt, retrieve_context
|
from kb_retrieval import KBIndex, retrieve_context, retrieve_vector_context
|
||||||
|
from retrieval import orchestrate_retrieval
|
||||||
from market_data import get_token_price, format_price_context
|
from market_data import get_token_price, format_price_context
|
||||||
from worktree_lock import main_worktree_lock
|
from worktree_lock import main_worktree_lock
|
||||||
from x_client import search_tweets, fetch_from_url, check_research_rate_limit, record_research_usage, get_research_remaining
|
from x_client import search_tweets, fetch_from_url, check_research_rate_limit, record_research_usage, get_research_remaining
|
||||||
|
|
@ -994,41 +995,24 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
logger.warning("Query reformulation failed: %s", e)
|
logger.warning("Query reformulation failed: %s", e)
|
||||||
# Fall through — use raw text
|
# Fall through — use raw text
|
||||||
|
|
||||||
# Retrieve full KB context (entity resolution + claim search + agent positions)
|
# Unified retrieval: keyword → decompose → vector → RRF merge
|
||||||
t_kb = time.monotonic()
|
# Both systems search independently; RRF boosts claims found by both.
|
||||||
kb_ctx = retrieve_context(search_query_text, KB_READ_DIR, index=kb_index)
|
def _vector_fn(q):
|
||||||
kb_context_text = format_context_for_prompt(kb_ctx)
|
return retrieve_vector_context(q) # no keyword_paths exclusion — RRF deduplicates
|
||||||
kb_duration = int((time.monotonic() - t_kb) * 1000)
|
retrieval_result = await orchestrate_retrieval(
|
||||||
retrieval_layers = ["keyword"] if (kb_ctx and (kb_ctx.entities or kb_ctx.claims)) else []
|
text=text,
|
||||||
tool_calls.append({
|
search_query=search_query_text,
|
||||||
"tool": "retrieve_context",
|
kb_read_dir=KB_READ_DIR,
|
||||||
"input": {"query": search_query_text[:200], "original_query": text[:200] if search_query_text != text else None},
|
kb_index=kb_index,
|
||||||
"output": {"entities": len(kb_ctx.entities) if kb_ctx else 0,
|
llm_fn=call_openrouter,
|
||||||
"claims": len(kb_ctx.claims) if kb_ctx else 0},
|
triage_model=TRIAGE_MODEL,
|
||||||
"duration_ms": kb_duration,
|
retrieve_context_fn=retrieve_context,
|
||||||
})
|
retrieve_vector_fn=_vector_fn,
|
||||||
|
)
|
||||||
# Layer 1+2: Qdrant vector search + graph expansion (semantic, complements keyword)
|
kb_context_text = retrieval_result["kb_context_text"]
|
||||||
# Pass keyword-matched paths to exclude duplicates at Qdrant query level
|
kb_ctx = retrieval_result["kb_ctx"]
|
||||||
# Normalize: KBIndex stores absolute paths, Qdrant stores repo-relative paths
|
retrieval_layers = retrieval_result["retrieval_layers"]
|
||||||
keyword_paths = []
|
tool_calls.extend(retrieval_result["tool_calls"])
|
||||||
if kb_ctx and kb_ctx.claims:
|
|
||||||
for c in kb_ctx.claims:
|
|
||||||
p = c.path
|
|
||||||
if KB_READ_DIR and p.startswith(KB_READ_DIR):
|
|
||||||
p = p[len(KB_READ_DIR):].lstrip("/")
|
|
||||||
keyword_paths.append(p)
|
|
||||||
from kb_retrieval import retrieve_vector_context
|
|
||||||
vector_context, vector_meta = retrieve_vector_context(search_query_text, keyword_paths=keyword_paths)
|
|
||||||
if vector_context:
|
|
||||||
kb_context_text = kb_context_text + "\n\n" + vector_context
|
|
||||||
retrieval_layers.extend(vector_meta.get("layers_hit", []))
|
|
||||||
tool_calls.append({
|
|
||||||
"tool": "retrieve_qdrant_context", "input": {"query": text[:200]},
|
|
||||||
"output": {"direct_hits": len(vector_meta.get("direct_results", [])),
|
|
||||||
"expanded": len(vector_meta.get("expanded_results", []))},
|
|
||||||
"duration_ms": vector_meta.get("duration_ms", 0),
|
|
||||||
})
|
|
||||||
|
|
||||||
stats = get_db_stats()
|
stats = get_db_stats()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue