From 7d0ae0c8cb7137105bdb177604e380c614907140 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Wed, 1 Apr 2026 16:39:22 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20add=20cascade=20automation=20=E2=80=94?= =?UTF-8?q?=20auto-flag=20dependent=20beliefs/positions=20on=20claim=20cha?= =?UTF-8?q?nge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After each PR merge, cascade.py scans the diff for changed claim files, then checks all agent beliefs and positions for depends_on references. Affected agents get inbox notifications via agent-state. Non-fatal — cascade failure never blocks merges. Hook point: merge.py after _embed_merged_claims, before _delete_remote_branch. Co-Authored-By: Claude Opus 4.6 (1M context) --- lib/cascade.py | 249 +++++++++++++++++++++++++++++++++++++++++++++++++ lib/merge.py | 9 ++ 2 files changed, 258 insertions(+) create mode 100644 lib/cascade.py diff --git a/lib/cascade.py b/lib/cascade.py new file mode 100644 index 0000000..c40ba08 --- /dev/null +++ b/lib/cascade.py @@ -0,0 +1,249 @@ +"""Cascade automation — auto-flag dependent beliefs/positions when claims change. + +Hook point: called from merge.py after _embed_merged_claims, before _delete_remote_branch. +Uses the same main_sha/branch_sha diff to detect changed claim files, then scans +all agent beliefs and positions for depends_on references to those claims. + +Notifications are written to /opt/teleo-eval/agent-state/{agent}/inbox/ using +the same atomic-write pattern as lib-state.sh. +""" + +import asyncio +import json +import logging +import os +import re +import tempfile +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger("pipeline.cascade") + +AGENT_STATE_DIR = Path("/opt/teleo-eval/agent-state") +CLAIM_DIRS = {"domains/", "core/", "foundations/", "decisions/"} +AGENT_NAMES = ["rio", "leo", "clay", "astra", "vida", "theseus"] + + +def _extract_claim_titles_from_diff(diff_files: list[str]) -> set[str]: + """Extract claim titles from changed file paths.""" + titles = set() + for fpath in diff_files: + if not fpath.endswith(".md"): + continue + if not any(fpath.startswith(d) for d in CLAIM_DIRS): + continue + basename = os.path.basename(fpath) + if basename.startswith("_") or basename == "directory.md": + continue + title = basename.removesuffix(".md") + titles.add(title) + return titles + + +def _normalize_for_match(text: str) -> str: + """Normalize for fuzzy matching: lowercase, hyphens to spaces, strip punctuation, collapse whitespace.""" + text = text.lower().strip() + text = text.replace("-", " ") + text = re.sub(r"[^\w\s]", "", text) + text = re.sub(r"\s+", " ", text) + return text + + +def _slug_to_words(slug: str) -> str: + """Convert kebab-case slug to space-separated words.""" + return slug.replace("-", " ") + + +def _parse_depends_on(file_path: Path) -> tuple[str, list[str]]: + """Parse a belief or position file's depends_on entries. + + Returns (agent_name, [dependency_titles]). + """ + try: + content = file_path.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + return ("", []) + + agent = "" + deps = [] + in_frontmatter = False + in_depends = False + + for line in content.split("\n"): + if line.strip() == "---": + if not in_frontmatter: + in_frontmatter = True + continue + else: + break + + if in_frontmatter: + if line.startswith("agent:"): + agent = line.split(":", 1)[1].strip().strip('"').strip("'") + elif line.startswith("depends_on:"): + in_depends = True + rest = line.split(":", 1)[1].strip() + if rest.startswith("["): + items = re.findall(r'"([^"]+)"|\'([^\']+)\'', rest) + for item in items: + dep = item[0] or item[1] + dep = dep.strip("[]").replace("[[", "").replace("]]", "") + deps.append(dep) + in_depends = False + elif in_depends: + if line.startswith(" - "): + dep = line.strip().lstrip("- ").strip('"').strip("'") + dep = dep.replace("[[", "").replace("]]", "") + deps.append(dep) + elif line.strip() and not line.startswith(" "): + in_depends = False + + # Also scan body for [[wiki-links]] + body_links = re.findall(r"\[\[([^\]]+)\]\]", content) + for link in body_links: + if link not in deps: + deps.append(link) + + return (agent, deps) + + +def _write_inbox_message(agent: str, subject: str, body: str) -> bool: + """Write a cascade notification to an agent's inbox. Atomic tmp+rename.""" + inbox_dir = AGENT_STATE_DIR / agent / "inbox" + if not inbox_dir.exists(): + logger.warning("cascade: no inbox dir for agent %s, skipping", agent) + return False + + ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") + filename = f"cascade-{ts}-{subject[:60]}.md" + final_path = inbox_dir / filename + + try: + fd, tmp_path = tempfile.mkstemp(dir=str(inbox_dir), suffix=".tmp") + with os.fdopen(fd, "w") as f: + f.write(f"---\n") + f.write(f"type: cascade\n") + f.write(f"from: pipeline\n") + f.write(f"to: {agent}\n") + f.write(f"subject: \"{subject}\"\n") + f.write(f"created: {datetime.now(timezone.utc).isoformat()}\n") + f.write(f"status: unread\n") + f.write(f"---\n\n") + f.write(body) + os.rename(tmp_path, str(final_path)) + return True + except OSError: + logger.exception("cascade: failed to write inbox message for %s", agent) + return False + + +def _find_matches(deps: list[str], claim_lookup: dict[str, str]) -> list[str]: + """Check if any dependency matches a changed claim.""" + matched = [] + for dep in deps: + norm = _normalize_for_match(dep) + if norm in claim_lookup: + matched.append(claim_lookup[norm]) + else: + for claim_norm, claim_orig in claim_lookup.items(): + if claim_norm in norm or norm in claim_norm: + matched.append(claim_orig) + break + return matched + + +def _format_cascade_body( + file_name: str, + file_type: str, + matched_claims: list[str], + pr_num: int, +) -> str: + """Format the cascade notification body.""" + claims_list = "\n".join(f"- {c}" for c in matched_claims) + return ( + f"# Cascade: upstream claims changed\n\n" + f"Your {file_type} **{file_name}** depends on claims that were modified in PR #{pr_num}.\n\n" + f"## Changed claims\n\n{claims_list}\n\n" + f"## Action needed\n\n" + f"Review whether your {file_type}'s confidence, description, or grounding " + f"needs updating in light of these changes. If the evidence strengthened, " + f"consider increasing confidence. If it weakened or contradicted, flag for " + f"re-evaluation.\n" + ) + + +async def cascade_after_merge( + main_sha: str, + branch_sha: str, + pr_num: int, + main_worktree: Path, +) -> int: + """Scan for beliefs/positions affected by claims changed in this merge. + + Returns the number of cascade notifications sent. + """ + # 1. Get changed files + proc = await asyncio.create_subprocess_exec( + "git", "diff", "--name-only", "--diff-filter=ACMR", + main_sha, branch_sha, + cwd=str(main_worktree), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + logger.warning("cascade: git diff timed out") + return 0 + + if proc.returncode != 0: + logger.warning("cascade: git diff failed (rc=%d)", proc.returncode) + return 0 + + diff_files = [f for f in stdout.decode().strip().split("\n") if f] + + # 2. Extract claim titles from changed files + changed_claims = _extract_claim_titles_from_diff(diff_files) + if not changed_claims: + return 0 + + logger.info("cascade: %d claims changed in PR #%d: %s", + len(changed_claims), pr_num, list(changed_claims)[:5]) + + # Build normalized lookup for fuzzy matching + claim_lookup = {} + for claim in changed_claims: + claim_lookup[_normalize_for_match(claim)] = claim + claim_lookup[_normalize_for_match(_slug_to_words(claim))] = claim + + # 3. Scan all beliefs and positions + notifications = 0 + agents_dir = main_worktree / "agents" + if not agents_dir.exists(): + logger.warning("cascade: no agents/ dir in worktree") + return 0 + + for agent_name in AGENT_NAMES: + agent_dir = agents_dir / agent_name + if not agent_dir.exists(): + continue + + for subdir, file_type in [("beliefs", "belief"), ("positions", "position")]: + target_dir = agent_dir / subdir + if not target_dir.exists(): + continue + for md_file in target_dir.glob("*.md"): + _, deps = _parse_depends_on(md_file) + matched = _find_matches(deps, claim_lookup) + if matched: + body = _format_cascade_body(md_file.name, file_type, matched, pr_num) + if _write_inbox_message(agent_name, f"claim-changed-affects-{file_type}", body): + notifications += 1 + logger.info("cascade: notified %s — %s '%s' affected by %s", + agent_name, file_type, md_file.stem, matched) + + if notifications: + logger.info("cascade: sent %d notifications for PR #%d", notifications, pr_num) + return notifications diff --git a/lib/merge.py b/lib/merge.py index 6099e29..89261ca 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -23,6 +23,7 @@ from . import config, db from .db import classify_branch from .dedup import dedup_evidence_blocks from .domains import detect_domain_from_branch +from .cascade import cascade_after_merge from .forgejo import api as forgejo_api # Pipeline-owned branch prefixes — only these get auto-merged. @@ -1046,6 +1047,14 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: # Embed new/changed claims into Qdrant (non-fatal) await _embed_merged_claims(main_sha, branch_sha) + + # Cascade: notify agents whose beliefs/positions depend on changed claims + try: + cascaded = await cascade_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE) + if cascaded: + logger.info("PR #%d: %d cascade notifications sent", pr_num, cascaded) + except Exception: + logger.exception("PR #%d: cascade check failed (non-fatal)", pr_num) # Delete remote branch immediately (Ganymede Q4) await _delete_remote_branch(branch)