diff --git a/scripts/apply_proposal.py b/scripts/apply_proposal.py new file mode 100644 index 0000000..8757e81 --- /dev/null +++ b/scripts/apply_proposal.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 +"""Apply an approved kb_stage proposal into canonical public.* rows. + +Stage 2 of the KB apply pipeline: approve -> APPLY -> render -> surface. + +Governance boundary +------------------- +This tool connects to Postgres as the narrow ``kb_apply`` login role, never as +superuser. ``kb_apply`` can write only ``public.strategies``, +``public.strategy_nodes``, ``public.claim_evidence``, ``public.claim_edges`` and +update the ``kb_stage.kb_proposals`` ledger. It holds no DELETE, no DDL, and no +access to any other table (agents, budgets, personas, ...). This enforces +"agents propose, but do not self-apply" at the database boundary rather than by +convention. The tool is an operator, invoked by a human/reviewer after approval; +it is deliberately not part of the agent runtime. + +Flow +---- +1. Read the proposal (must be ``status='approved'``). +2. Dispatch by ``proposal_type`` to build the canonical write from a strict + ``apply_payload`` contract carried on the proposal payload. +3. Emit a single transaction: canonical writes, then a DO block that flips the + ledger to ``applied`` asserting rowcount=1 (the concurrent double-apply + guard), stamps ``applied_by_agent_id`` as a real FK, and verifies invariants + (RAISE on violation -> rollback), COMMIT. + +Prerequisites (one-time, superuser): scripts/kb_apply_prereqs.sql bootstraps the +``kb-apply`` service-agent row, grants ``kb_apply`` SELECT on public.agents, and +ensures the one-active-strategy unique index. Run it before the first apply. + +``--dry-run`` prints the exact SQL and does not connect for writes. + +Contract (v1) +------------- +Freeform Leo eval packets are NOT applied directly. Each proposal must carry a +strict ``apply_payload`` object; a separate normalizer produces it. Shapes: + + revise_strategy: + {"apply_payload": { + "agent_id": "", + "strategy": {"diagnosis": str, "guiding_policy": str, + "proximate_objectives": [ ... ]}, + "strategy_nodes": [{"node_type": "diagnosis|policy|objective", + "title": str, "body": str, "rank": int}, ...]}} + + add_edge: + {"apply_payload": {"from_claim": "", "to_claim": "", + "edge_type": "", "weight": <0..1|null>}} + + attach_evidence: + {"apply_payload": {"evidence": [{"claim_id": "", + "source_id": "", + "role": "", + "weight": <0..1|null>}, ...]}} + +Note: attach_evidence requires an existing ``source_id`` per item. kb_apply has +no INSERT on ``public.sources``, so source creation/dedup is intentionally out of +scope for this tool and handled upstream (a follow-up + explicit grant). +""" + +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional + +DEFAULT_SECRETS_FILE = "/home/teleo/.hermes/profiles/leoclean/secrets/kb-apply.env" +DEFAULT_CONTAINER = "teleo-pg" +DEFAULT_DB = "teleo" +DEFAULT_HOST = "127.0.0.1" +DEFAULT_ROLE = "kb_apply" + +# Handle of the service-agent row that performs canonical writes. Bootstrapped +# once by superuser (scripts/kb_apply_prereqs.sql); kb_apply holds SELECT-only on +# public.agents so it can resolve this into applied_by_agent_id, never INSERT. +SERVICE_AGENT_HANDLE = "kb-apply" + +APPLYABLE_TYPES = ("revise_strategy", "add_edge", "attach_evidence") + + +# --------------------------------------------------------------------------- # +# SQL helpers # +# --------------------------------------------------------------------------- # +def sql_literal(value: Any) -> str: + """Render a Python value as a safe SQL literal.""" + if value is None: + return "null" + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + return repr(value) + return "'" + str(value).replace("'", "''") + "'" + + +def _jsonb(value: Any) -> str: + return sql_literal(json.dumps(value, sort_keys=True)) + "::jsonb" + + +def _ledger_and_verify(proposal_id: str, applied_by: str, extra_checks: str) -> str: + """Flip the proposal to applied and verify invariants, inside the txn. + + The flip runs in a DO block that asserts exactly one row moved from + 'approved' to 'applied' (``GET DIAGNOSTICS ... = row_count``). This closes + the concurrent double-apply race: ``load_proposal`` (read) and this flip + (write) are separate statements, so a ``SELECT ... FOR UPDATE`` row lock + would not span them -- but only one concurrent apply can match + ``status='approved'``, so rowcount=1 is the authoritative guard. A loser + sees 0 rows and RAISEs -> the whole transaction rolls back. + + ``applied_by_agent_id`` is stamped as a real FK resolved from + ``public.agents`` by handle (requires SELECT public.agents; the service-agent + row is bootstrapped once by superuser -- see scripts/kb_apply_prereqs.sql). + The resolve is a hard lookup: an unresolved handle RAISEs -> rollback, never + a silent NULL FK (the column is ON DELETE SET NULL, so NULL is legal and + would otherwise pass unnoticed). + """ + return f""" +do $flip$ +declare + flipped int; + resolved_agent_id uuid; +begin + select id into resolved_agent_id + from public.agents + where handle = {sql_literal(applied_by)}; + if resolved_agent_id is null then + raise exception 'apply_proposal: applied_by handle % does not resolve to a public.agents row; refusing to stamp a NULL applied_by_agent_id', {sql_literal(applied_by)}; + end if; + update kb_stage.kb_proposals + set status = 'applied', + applied_by_handle = {sql_literal(applied_by)}, + applied_by_agent_id = resolved_agent_id, + applied_at = now(), + updated_at = now() + where id = {sql_literal(proposal_id)}::uuid + and status = 'approved'; + get diagnostics flipped = row_count; + if flipped <> 1 then + raise exception 'apply_proposal: ledger flip affected % row(s), expected 1; proposal % was not approved (already applied or a concurrent apply won)', flipped, {sql_literal(proposal_id)}; + end if; +{extra_checks} +end +$flip$; +""".rstrip() + + +# --------------------------------------------------------------------------- # +# Per-type SQL builders (pure; unit-tested without a DB) # +# --------------------------------------------------------------------------- # +def build_revise_strategy_sql( + apply_payload: Dict[str, Any], proposal_id: str, reviewer: Optional[str] +) -> str: + agent_id = apply_payload.get("agent_id") + strategy = apply_payload.get("strategy") or {} + nodes: List[Dict[str, Any]] = apply_payload.get("strategy_nodes") or [] + if not agent_id: + raise ValueError("revise_strategy apply_payload requires 'agent_id'") + for key in ("diagnosis", "guiding_policy", "proximate_objectives"): + if strategy.get(key) is None: + raise ValueError(f"revise_strategy apply_payload.strategy requires '{key}'") + if not nodes: + raise ValueError("revise_strategy apply_payload requires non-empty 'strategy_nodes'") + + node_values = [] + for n in nodes: + nt = n.get("node_type") + if nt not in ("diagnosis", "policy", "objective"): + raise ValueError(f"invalid strategy node_type: {nt!r}") + if not n.get("title") or n.get("body") is None: + raise ValueError("each strategy_node requires 'title' and 'body'") + node_values.append( + f"({sql_literal(agent_id)}::uuid, {sql_literal(nt)}, " + f"{sql_literal(n['title'])}, {sql_literal(n['body'])}, " + f"{int(n.get('rank', 1))})" + ) + + node_values_sql = ",\n ".join(node_values) + canonical = f""" +-- deactivate the current active strategy for this agent (one_active invariant) +update public.strategies + set active = false + where agent_id = {sql_literal(agent_id)}::uuid and active; + +-- insert the new strategy as the next version, active +insert into public.strategies + (agent_id, diagnosis, guiding_policy, proximate_objectives, version, active) +select {sql_literal(agent_id)}::uuid, + {sql_literal(strategy['diagnosis'])}, + {sql_literal(strategy['guiding_policy'])}, + {_jsonb(strategy['proximate_objectives'])}, + coalesce(max(version), 0) + 1, + true + from public.strategies + where agent_id = {sql_literal(agent_id)}::uuid; + +-- retire the agent's current strategy nodes (shared NULL-agent nodes untouched) +update public.strategy_nodes + set status = 'retired', updated_at = now() + where agent_id = {sql_literal(agent_id)}::uuid and status <> 'retired'; + +-- insert the new node set +insert into public.strategy_nodes (agent_id, node_type, title, body, rank) +values + {node_values_sql}; +""".rstrip() + + checks = f""" if (select count(*) from public.strategies + where agent_id = {sql_literal(agent_id)}::uuid and active) <> 1 then + raise exception 'apply_proposal: expected exactly one active strategy for agent %', {sql_literal(agent_id)}; + end if;""" + + return _wrap_txn(canonical, _ledger_and_verify(proposal_id, reviewer, checks)) + + +def build_add_edge_sql( + apply_payload: Dict[str, Any], proposal_id: str, reviewer: Optional[str] +) -> str: + f = apply_payload.get("from_claim") + t = apply_payload.get("to_claim") + et = apply_payload.get("edge_type") + w = apply_payload.get("weight") + if not (f and t and et): + raise ValueError("add_edge apply_payload requires 'from_claim', 'to_claim', 'edge_type'") + if f == t: + raise ValueError("add_edge from_claim and to_claim must differ") + + canonical = f""" +-- insert the edge unless an identical (from,to,type) edge already exists +insert into public.claim_edges (from_claim, to_claim, edge_type, weight) +select {sql_literal(f)}::uuid, {sql_literal(t)}::uuid, + {sql_literal(et)}::edge_type, {sql_literal(w)} + where not exists ( + select 1 from public.claim_edges + where from_claim = {sql_literal(f)}::uuid + and to_claim = {sql_literal(t)}::uuid + and edge_type = {sql_literal(et)}::edge_type); +""".rstrip() + + checks = f""" if not exists (select 1 from public.claim_edges + where from_claim = {sql_literal(f)}::uuid + and to_claim = {sql_literal(t)}::uuid + and edge_type = {sql_literal(et)}::edge_type) then + raise exception 'apply_proposal: claim_edge not present after apply'; + end if;""" + + return _wrap_txn(canonical, _ledger_and_verify(proposal_id, reviewer, checks)) + + +def build_attach_evidence_sql( + apply_payload: Dict[str, Any], proposal_id: str, reviewer: Optional[str] +) -> str: + evidence: List[Dict[str, Any]] = apply_payload.get("evidence") or [] + if not evidence: + raise ValueError("attach_evidence apply_payload requires non-empty 'evidence'") + + statements = [] + checks = [] + for i, ev in enumerate(evidence): + claim_id = ev.get("claim_id") + source_id = ev.get("source_id") + role = ev.get("role") or "grounds" + weight = ev.get("weight") + if not claim_id: + raise ValueError(f"evidence[{i}] requires 'claim_id'") + if not source_id: + raise ValueError( + f"evidence[{i}] requires an existing 'source_id'. " + "kb_apply cannot create public.sources; mint the source upstream first." + ) + statements.append( + f"""insert into public.claim_evidence (claim_id, source_id, role, weight) +select {sql_literal(claim_id)}::uuid, {sql_literal(source_id)}::uuid, + {sql_literal(role)}::evidence_role, {sql_literal(weight)} + where not exists ( + select 1 from public.claim_evidence + where claim_id = {sql_literal(claim_id)}::uuid + and source_id = {sql_literal(source_id)}::uuid + and role = {sql_literal(role)}::evidence_role);""" + ) + checks.append( + f""" if not exists (select 1 from public.claim_evidence + where claim_id = {sql_literal(claim_id)}::uuid + and source_id = {sql_literal(source_id)}::uuid + and role = {sql_literal(role)}::evidence_role) then + raise exception 'apply_proposal: evidence row % not present after apply', {i}; + end if;""" + ) + + canonical = "\n".join(statements) + return _wrap_txn(canonical, _ledger_and_verify(proposal_id, reviewer, "\n".join(checks))) + + +def _wrap_txn(canonical_sql: str, ledger_sql: str) -> str: + return f"begin;\n{canonical_sql}\n{ledger_sql}\ncommit;\n" + + +BUILDERS = { + "revise_strategy": build_revise_strategy_sql, + "add_edge": build_add_edge_sql, + "attach_evidence": build_attach_evidence_sql, +} + + +def build_apply_sql(proposal: Dict[str, Any], applied_by: Optional[str]) -> str: + ptype = proposal["proposal_type"] + if ptype not in BUILDERS: + raise ValueError(f"apply_proposal cannot apply proposal_type {ptype!r}") + payload = proposal.get("payload") or {} + apply_payload = payload.get("apply_payload") + if apply_payload is None: + raise ValueError( + "proposal payload has no 'apply_payload' (contract v1). " + "Run the normalizer to produce apply_payload before applying." + ) + return BUILDERS[ptype](apply_payload, proposal["id"], applied_by or SERVICE_AGENT_HANDLE) + + +def assert_applyable(proposal: Dict[str, Any]) -> None: + status = proposal.get("status") + if status == "applied": + raise SystemExit(f"proposal {proposal['id']} is already applied (idempotent no-op)") + if status != "approved": + raise SystemExit( + f"proposal {proposal['id']} has status {status!r}; only 'approved' proposals apply" + ) + + +# --------------------------------------------------------------------------- # +# Execution (docker exec as the kb_apply role) # +# --------------------------------------------------------------------------- # +def load_password(secrets_file: str) -> str: + path = Path(secrets_file) + if not path.is_file(): + raise SystemExit(f"secrets file not found: {secrets_file}") + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + if key.strip() in ("KB_APPLY_PASSWORD", "PGPASSWORD"): + return val.strip().strip('"').strip("'") + raise SystemExit(f"no KB_APPLY_PASSWORD/PGPASSWORD entry in {secrets_file}") + + +def run_psql(args: argparse.Namespace, sql: str, password: str) -> str: + command = [ + "docker", "exec", "-e", "PGPASSWORD", "-i", args.container, + "psql", "-U", args.role, "-h", args.host, "-d", args.db, + "-v", "ON_ERROR_STOP=1", "-At", "-q", + ] + result = subprocess.run( + command, input=sql, text=True, capture_output=True, + env={"PGPASSWORD": password, "PATH": "/usr/bin:/bin:/usr/local/bin"}, + check=False, + ) + if result.returncode != 0: + raise SystemExit( + f"psql failed ({result.returncode})\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}" + ) + return result.stdout + + +def load_proposal(args: argparse.Namespace, password: str) -> Dict[str, Any]: + sql = f"""select jsonb_build_object( + 'id', id::text, + 'proposal_type', proposal_type, + 'status', status, + 'payload', payload)::text + from kb_stage.kb_proposals + where id = {sql_literal(args.proposal_id)}::uuid;""" + out = run_psql(args, sql, password).strip() + if not out: + raise SystemExit(f"proposal not found: {args.proposal_id}") + return json.loads(out) + + +def parse_args(argv: List[str]) -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("proposal_id", help="UUID of the approved proposal to apply") + p.add_argument( + "--applied-by", + default=SERVICE_AGENT_HANDLE, + help="agent handle recorded as applied_by_handle and resolved to the " + "applied_by_agent_id FK (default: the kb-apply service agent)", + ) + p.add_argument("--dry-run", action="store_true", help="print the SQL, do not execute") + p.add_argument("--secrets-file", default=DEFAULT_SECRETS_FILE) + p.add_argument("--container", default=DEFAULT_CONTAINER) + p.add_argument("--db", default=DEFAULT_DB) + p.add_argument("--host", default=DEFAULT_HOST) + p.add_argument("--role", default=DEFAULT_ROLE) + return p.parse_args(argv) + + +def main(argv: Optional[List[str]] = None) -> int: + args = parse_args(sys.argv[1:] if argv is None else argv) + + if args.dry_run: + # Dry-run still needs the proposal to build SQL; read it as kb_apply. + password = load_password(args.secrets_file) + proposal = load_proposal(args, password) + assert_applyable(proposal) + print(build_apply_sql(proposal, args.applied_by)) + return 0 + + password = load_password(args.secrets_file) + proposal = load_proposal(args, password) + assert_applyable(proposal) + sql = build_apply_sql(proposal, args.applied_by) + run_psql(args, sql, password) + print(f"applied proposal {proposal['id']} ({proposal['proposal_type']})") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/kb_apply_prereqs.sql b/scripts/kb_apply_prereqs.sql new file mode 100644 index 0000000..badde61 --- /dev/null +++ b/scripts/kb_apply_prereqs.sql @@ -0,0 +1,31 @@ +-- kb_apply prerequisites -- run ONCE as the postgres superuser at deploy time. +-- +-- Stage 2 of the KB apply pipeline (scripts/apply_proposal.py) connects as the +-- narrow kb_apply role and stamps applied_by_agent_id as a real FK. The FK is +-- resolved from public.agents by handle, so before the first apply: +-- 1. a 'kb-apply' service-agent row must exist (one-time bootstrap fixture), +-- 2. kb_apply needs SELECT on public.agents to resolve it (never INSERT), +-- 3. the one-active-strategy invariant must be enforced by a unique index. +-- +-- Every statement is idempotent; re-running the file is a no-op. It grants no +-- write on public.agents to kb_apply -- the service-agent row is written here, +-- once, by the superuser, and the runtime role only ever reads it. + +begin; + +-- 1. Service-agent fixture. This INSERT is the one-time superuser bootstrap; +-- kb_apply itself never gains INSERT on public.agents (SELECT only, below). +insert into public.agents (id, handle, kind) +values ('44444444-4444-4444-4444-444444444444', 'kb-apply', 'service') +on conflict (handle) do nothing; + +-- 2. Allow the apply tool to resolve applied_by_agent_id. Read-only, no INSERT. +grant select on public.agents to kb_apply; + +-- 3. Enforce "one active strategy per agent" at the database. Already present on +-- current prod as one_active_strategy_per_agent; IF NOT EXISTS documents and +-- guarantees the dependency for fresh environments (no-op where it exists). +create unique index if not exists one_active_strategy_per_agent + on public.strategies (agent_id) where active; + +commit; diff --git a/tests/test_apply_proposal.py b/tests/test_apply_proposal.py new file mode 100644 index 0000000..5511e48 --- /dev/null +++ b/tests/test_apply_proposal.py @@ -0,0 +1,198 @@ +"""Unit tests for scripts/apply_proposal.py. + +Pure tests: exercise the SQL builders, payload validation, and dispatch without +a live database. Runnable under pytest or standalone (`python3 tests/test_apply_proposal.py`). +""" + +from pathlib import Path +import sys + +try: + import pytest +except ImportError: # standalone fallback so the file runs without pytest installed + import contextlib + import types + + pytest = types.SimpleNamespace() + + @contextlib.contextmanager + def _raises(exc): + try: + yield + except exc: + return + raise AssertionError(f"expected {exc.__name__} to be raised") + + pytest.raises = _raises + +REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(REPO_ROOT / "scripts")) + +import apply_proposal as ap # noqa: E402 + + +# --- literals ------------------------------------------------------------- # +def test_sql_literal_escapes_single_quotes(): + assert ap.sql_literal("O'Brien") == "'O''Brien'" + + +def test_sql_literal_none_is_null(): + assert ap.sql_literal(None) == "null" + + +def test_sql_literal_bool_and_number(): + assert ap.sql_literal(True) == "true" + assert ap.sql_literal(0.5) == "0.5" + + +# --- revise_strategy ------------------------------------------------------ # +def _revise_payload(): + return { + "apply_payload": { + "agent_id": "11111111-1111-1111-1111-111111111111", + "strategy": { + "diagnosis": "Species-level inflection.", + "guiding_policy": "Build Teleo as a multi-agent public intelligence.", + "proximate_objectives": ["route before creating", "keep KB calibrated"], + }, + "strategy_nodes": [ + {"node_type": "diagnosis", "title": "Inflection", "body": "b1", "rank": 1}, + {"node_type": "policy", "title": "Multi-agent PI", "body": "b2", "rank": 1}, + ], + } + } + + +def test_revise_strategy_sql_shape(): + sql = ap.build_revise_strategy_sql(_revise_payload()["apply_payload"], "pid-1", "m3ta") + assert sql.startswith("begin;") + assert sql.rstrip().endswith("commit;") + # deactivate old, insert versioned active, retire nodes, insert new nodes + assert "update public.strategies" in sql + assert "active = false" in sql + assert "coalesce(max(version), 0) + 1" in sql + assert "update public.strategy_nodes" in sql + assert "insert into public.strategy_nodes" in sql + # ledger + invariant + assert "status = 'applied'" in sql + assert "exactly one active strategy" in sql + # proximate_objectives rendered as jsonb + assert "::jsonb" in sql + + +def test_revise_strategy_requires_agent_id(): + payload = _revise_payload()["apply_payload"] + del payload["agent_id"] + with pytest.raises(ValueError): + ap.build_revise_strategy_sql(payload, "pid-1", None) + + +def test_revise_strategy_rejects_bad_node_type(): + payload = _revise_payload()["apply_payload"] + payload["strategy_nodes"][0]["node_type"] = "manifesto" + with pytest.raises(ValueError): + ap.build_revise_strategy_sql(payload, "pid-1", None) + + +# --- add_edge ------------------------------------------------------------- # +def test_add_edge_sql_dedup_guard(): + payload = {"from_claim": "aaaa", "to_claim": "bbbb", "edge_type": "supersedes", "weight": 0.9} + sql = ap.build_add_edge_sql(payload, "pid-2", None) + assert "insert into public.claim_edges" in sql + assert "not exists" in sql + assert "::edge_type" in sql + assert "status = 'applied'" in sql + + +def test_add_edge_rejects_self_loop(): + payload = {"from_claim": "same", "to_claim": "same", "edge_type": "supports"} + with pytest.raises(ValueError): + ap.build_add_edge_sql(payload, "pid-2", None) + + +# --- attach_evidence ------------------------------------------------------ # +def test_attach_evidence_sql_with_source_id(): + payload = {"evidence": [{"claim_id": "cccc", "source_id": "dddd", "role": "grounds", "weight": 0.78}]} + sql = ap.build_attach_evidence_sql(payload, "pid-3", None) + assert "insert into public.claim_evidence" in sql + assert "::evidence_role" in sql + assert "not exists" in sql + + +def test_attach_evidence_requires_source_id(): + payload = {"evidence": [{"claim_id": "cccc"}]} + with pytest.raises(ValueError): + ap.build_attach_evidence_sql(payload, "pid-3", None) + + +# --- dispatch + guards ---------------------------------------------------- # +def test_build_apply_sql_requires_apply_payload(): + proposal = {"id": "pid", "proposal_type": "add_edge", "payload": {"rationale": "x"}} + with pytest.raises(ValueError): + ap.build_apply_sql(proposal, None) + + +def test_build_apply_sql_rejects_unsupported_type(): + proposal = {"id": "pid", "proposal_type": "reject_claim", "payload": {"apply_payload": {}}} + with pytest.raises(ValueError): + ap.build_apply_sql(proposal, None) + + +def test_assert_applyable_blocks_pending(): + with pytest.raises(SystemExit): + ap.assert_applyable({"id": "pid", "status": "pending_review"}) + + +def test_assert_applyable_blocks_already_applied(): + with pytest.raises(SystemExit): + ap.assert_applyable({"id": "pid", "status": "applied"}) + + +def test_assert_applyable_allows_approved(): + ap.assert_applyable({"id": "pid", "status": "approved"}) # no raise + + +# --- ledger flip: concurrency guard + FK stamp --------------------------- # +def test_ledger_flip_asserts_rowcount_one(): + # The flip must guard against a concurrent double-apply by asserting exactly + # one 'approved' row moved, not by re-reading status afterwards. + sql = ap.build_add_edge_sql({"from_claim": "a", "to_claim": "b", "edge_type": "supports"}, "pid", None) + assert "get diagnostics" in sql + assert "flipped <> 1" in sql + assert "and status = 'approved'" in sql + + +def test_ledger_flip_stamps_agent_fk(): + sql = ap.build_add_edge_sql({"from_claim": "a", "to_claim": "b", "edge_type": "supports"}, "pid", "kb-apply") + # Hard resolve into a variable + NOT-NULL assert, never a silent inline + # subselect that would stamp NULL on an unresolved handle. + assert "select id into resolved_agent_id" in sql + assert "resolved_agent_id is null then" in sql + assert "applied_by_agent_id = resolved_agent_id" in sql + assert "applied_by_handle = 'kb-apply'" in sql + + +def test_build_apply_sql_defaults_applied_by_to_service_agent(): + proposal = { + "id": "pid", + "proposal_type": "add_edge", + "payload": {"apply_payload": {"from_claim": "a", "to_claim": "b", "edge_type": "supports"}}, + } + sql = ap.build_apply_sql(proposal, None) + assert f"applied_by_handle = '{ap.SERVICE_AGENT_HANDLE}'" in sql + + +if __name__ == "__main__": + import traceback + + failures = 0 + for name, fn in sorted(globals().items()): + if name.startswith("test_") and callable(fn): + try: + fn() + print(f"PASS {name}") + except Exception: # noqa: BLE001 + failures += 1 + print(f"FAIL {name}") + traceback.print_exc() + sys.exit(1 if failures else 0)