feat(kb): apply_proposal engine (stage 2 of KB apply pipeline) (#35)
Some checks are pending
CI / lint-and-test (push) Waiting to run

* feat(kb): apply_proposal engine to land approved proposals into canonical

Stage 2 of the KB apply pipeline (approve -> APPLY -> render -> surface).
Turns an approved kb_stage.kb_proposals row into canonical public.* rows and
flips the ledger to 'applied' in one verified transaction.

- Connects as the narrow kb_apply role (never superuser): writes only
  strategies, strategy_nodes, claim_evidence, claim_edges + kb_proposals ledger.
  Enforces "agents propose, do not self-apply" at the DB boundary.
- Per-type handlers: revise_strategy (versioned strategy + node replace),
  add_edge, attach_evidence (requires existing source_id; source minting is
  intentionally out of scope for kb_apply's grants).
- Strict apply_payload contract (v1); freeform eval packets are normalized
  upstream, not applied directly.
- --dry-run prints exact SQL; idempotent (refuses non-approved / already-applied);
  transactional with an in-txn DO-block invariant check that rolls back on failure.
- Unit tests cover SQL builders, validation, dispatch, and status guards.

* fix(kb): rowcount=1 apply guard + real applied_by FK stamp

Closes the three draft-exit review items on the apply engine:

- Ledger flip now runs in a DO block asserting exactly one 'approved'
  row moved to 'applied' (GET DIAGNOSTICS row_count). Closes the
  concurrent double-apply race — load_proposal (read) and the flip
  (write) are separate statements, so a row lock cannot span them; only
  one concurrent apply can match status='approved', so rowcount=1 is the
  authoritative guard. A loser RAISEs and the whole txn rolls back.
- applied_by_agent_id is stamped as a real FK resolved from public.agents
  by handle, defaulting to the kb-apply service agent — no more NULL FK,
  no backfill needed.
- scripts/kb_apply_prereqs.sql: one-time superuser bootstrap — inserts the
  kb-apply service-agent row (kb_apply never gets INSERT on agents), grants
  kb_apply SELECT on public.agents, and ensures the one-active-strategy
  unique index (idempotent; already present on prod).

18/18 unit tests pass.

* fix(kb): hard-resolve applied_by handle, RAISE on NULL FK

Resolve applied_by into a variable and assert NOT NULL before the ledger
flip, instead of an inline subselect that silently stamps a NULL
applied_by_agent_id on an unresolved handle. Since the FK is ON DELETE SET
NULL, a bad handle (typo/unseeded agent) was a legal silent NULL -- the
perpetually-NULL FK we eliminated. Unresolved handle now hard-fails ->
rollback. Non-default --applied-by (operator, future drafters) is the path
that goes through the lookup and could strand NULL.
This commit is contained in:
Fawaz 2026-07-04 19:57:49 -04:00 committed by GitHub
parent 66ecbf316e
commit 7bb6fc417b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 648 additions and 0 deletions

419
scripts/apply_proposal.py Normal file
View file

@ -0,0 +1,419 @@
#!/usr/bin/env python3
"""Apply an approved kb_stage proposal into canonical public.* rows.
Stage 2 of the KB apply pipeline: approve -> APPLY -> render -> surface.
Governance boundary
-------------------
This tool connects to Postgres as the narrow ``kb_apply`` login role, never as
superuser. ``kb_apply`` can write only ``public.strategies``,
``public.strategy_nodes``, ``public.claim_evidence``, ``public.claim_edges`` and
update the ``kb_stage.kb_proposals`` ledger. It holds no DELETE, no DDL, and no
access to any other table (agents, budgets, personas, ...). This enforces
"agents propose, but do not self-apply" at the database boundary rather than by
convention. The tool is an operator, invoked by a human/reviewer after approval;
it is deliberately not part of the agent runtime.
Flow
----
1. Read the proposal (must be ``status='approved'``).
2. Dispatch by ``proposal_type`` to build the canonical write from a strict
``apply_payload`` contract carried on the proposal payload.
3. Emit a single transaction: canonical writes, then a DO block that flips the
ledger to ``applied`` asserting rowcount=1 (the concurrent double-apply
guard), stamps ``applied_by_agent_id`` as a real FK, and verifies invariants
(RAISE on violation -> rollback), COMMIT.
Prerequisites (one-time, superuser): scripts/kb_apply_prereqs.sql bootstraps the
``kb-apply`` service-agent row, grants ``kb_apply`` SELECT on public.agents, and
ensures the one-active-strategy unique index. Run it before the first apply.
``--dry-run`` prints the exact SQL and does not connect for writes.
Contract (v1)
-------------
Freeform Leo eval packets are NOT applied directly. Each proposal must carry a
strict ``apply_payload`` object; a separate normalizer produces it. Shapes:
revise_strategy:
{"apply_payload": {
"agent_id": "<uuid>",
"strategy": {"diagnosis": str, "guiding_policy": str,
"proximate_objectives": [ ... ]},
"strategy_nodes": [{"node_type": "diagnosis|policy|objective",
"title": str, "body": str, "rank": int}, ...]}}
add_edge:
{"apply_payload": {"from_claim": "<uuid>", "to_claim": "<uuid>",
"edge_type": "<edge_type>", "weight": <0..1|null>}}
attach_evidence:
{"apply_payload": {"evidence": [{"claim_id": "<uuid>",
"source_id": "<uuid>",
"role": "<evidence_role>",
"weight": <0..1|null>}, ...]}}
Note: attach_evidence requires an existing ``source_id`` per item. kb_apply has
no INSERT on ``public.sources``, so source creation/dedup is intentionally out of
scope for this tool and handled upstream (a follow-up + explicit grant).
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
DEFAULT_SECRETS_FILE = "/home/teleo/.hermes/profiles/leoclean/secrets/kb-apply.env"
DEFAULT_CONTAINER = "teleo-pg"
DEFAULT_DB = "teleo"
DEFAULT_HOST = "127.0.0.1"
DEFAULT_ROLE = "kb_apply"
# Handle of the service-agent row that performs canonical writes. Bootstrapped
# once by superuser (scripts/kb_apply_prereqs.sql); kb_apply holds SELECT-only on
# public.agents so it can resolve this into applied_by_agent_id, never INSERT.
SERVICE_AGENT_HANDLE = "kb-apply"
APPLYABLE_TYPES = ("revise_strategy", "add_edge", "attach_evidence")
# --------------------------------------------------------------------------- #
# SQL helpers #
# --------------------------------------------------------------------------- #
def sql_literal(value: Any) -> str:
"""Render a Python value as a safe SQL literal."""
if value is None:
return "null"
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, (int, float)):
return repr(value)
return "'" + str(value).replace("'", "''") + "'"
def _jsonb(value: Any) -> str:
return sql_literal(json.dumps(value, sort_keys=True)) + "::jsonb"
def _ledger_and_verify(proposal_id: str, applied_by: str, extra_checks: str) -> str:
"""Flip the proposal to applied and verify invariants, inside the txn.
The flip runs in a DO block that asserts exactly one row moved from
'approved' to 'applied' (``GET DIAGNOSTICS ... = row_count``). This closes
the concurrent double-apply race: ``load_proposal`` (read) and this flip
(write) are separate statements, so a ``SELECT ... FOR UPDATE`` row lock
would not span them -- but only one concurrent apply can match
``status='approved'``, so rowcount=1 is the authoritative guard. A loser
sees 0 rows and RAISEs -> the whole transaction rolls back.
``applied_by_agent_id`` is stamped as a real FK resolved from
``public.agents`` by handle (requires SELECT public.agents; the service-agent
row is bootstrapped once by superuser -- see scripts/kb_apply_prereqs.sql).
The resolve is a hard lookup: an unresolved handle RAISEs -> rollback, never
a silent NULL FK (the column is ON DELETE SET NULL, so NULL is legal and
would otherwise pass unnoticed).
"""
return f"""
do $flip$
declare
flipped int;
resolved_agent_id uuid;
begin
select id into resolved_agent_id
from public.agents
where handle = {sql_literal(applied_by)};
if resolved_agent_id is null then
raise exception 'apply_proposal: applied_by handle % does not resolve to a public.agents row; refusing to stamp a NULL applied_by_agent_id', {sql_literal(applied_by)};
end if;
update kb_stage.kb_proposals
set status = 'applied',
applied_by_handle = {sql_literal(applied_by)},
applied_by_agent_id = resolved_agent_id,
applied_at = now(),
updated_at = now()
where id = {sql_literal(proposal_id)}::uuid
and status = 'approved';
get diagnostics flipped = row_count;
if flipped <> 1 then
raise exception 'apply_proposal: ledger flip affected % row(s), expected 1; proposal % was not approved (already applied or a concurrent apply won)', flipped, {sql_literal(proposal_id)};
end if;
{extra_checks}
end
$flip$;
""".rstrip()
# --------------------------------------------------------------------------- #
# Per-type SQL builders (pure; unit-tested without a DB) #
# --------------------------------------------------------------------------- #
def build_revise_strategy_sql(
apply_payload: Dict[str, Any], proposal_id: str, reviewer: Optional[str]
) -> str:
agent_id = apply_payload.get("agent_id")
strategy = apply_payload.get("strategy") or {}
nodes: List[Dict[str, Any]] = apply_payload.get("strategy_nodes") or []
if not agent_id:
raise ValueError("revise_strategy apply_payload requires 'agent_id'")
for key in ("diagnosis", "guiding_policy", "proximate_objectives"):
if strategy.get(key) is None:
raise ValueError(f"revise_strategy apply_payload.strategy requires '{key}'")
if not nodes:
raise ValueError("revise_strategy apply_payload requires non-empty 'strategy_nodes'")
node_values = []
for n in nodes:
nt = n.get("node_type")
if nt not in ("diagnosis", "policy", "objective"):
raise ValueError(f"invalid strategy node_type: {nt!r}")
if not n.get("title") or n.get("body") is None:
raise ValueError("each strategy_node requires 'title' and 'body'")
node_values.append(
f"({sql_literal(agent_id)}::uuid, {sql_literal(nt)}, "
f"{sql_literal(n['title'])}, {sql_literal(n['body'])}, "
f"{int(n.get('rank', 1))})"
)
node_values_sql = ",\n ".join(node_values)
canonical = f"""
-- deactivate the current active strategy for this agent (one_active invariant)
update public.strategies
set active = false
where agent_id = {sql_literal(agent_id)}::uuid and active;
-- insert the new strategy as the next version, active
insert into public.strategies
(agent_id, diagnosis, guiding_policy, proximate_objectives, version, active)
select {sql_literal(agent_id)}::uuid,
{sql_literal(strategy['diagnosis'])},
{sql_literal(strategy['guiding_policy'])},
{_jsonb(strategy['proximate_objectives'])},
coalesce(max(version), 0) + 1,
true
from public.strategies
where agent_id = {sql_literal(agent_id)}::uuid;
-- retire the agent's current strategy nodes (shared NULL-agent nodes untouched)
update public.strategy_nodes
set status = 'retired', updated_at = now()
where agent_id = {sql_literal(agent_id)}::uuid and status <> 'retired';
-- insert the new node set
insert into public.strategy_nodes (agent_id, node_type, title, body, rank)
values
{node_values_sql};
""".rstrip()
checks = f""" if (select count(*) from public.strategies
where agent_id = {sql_literal(agent_id)}::uuid and active) <> 1 then
raise exception 'apply_proposal: expected exactly one active strategy for agent %', {sql_literal(agent_id)};
end if;"""
return _wrap_txn(canonical, _ledger_and_verify(proposal_id, reviewer, checks))
def build_add_edge_sql(
apply_payload: Dict[str, Any], proposal_id: str, reviewer: Optional[str]
) -> str:
f = apply_payload.get("from_claim")
t = apply_payload.get("to_claim")
et = apply_payload.get("edge_type")
w = apply_payload.get("weight")
if not (f and t and et):
raise ValueError("add_edge apply_payload requires 'from_claim', 'to_claim', 'edge_type'")
if f == t:
raise ValueError("add_edge from_claim and to_claim must differ")
canonical = f"""
-- insert the edge unless an identical (from,to,type) edge already exists
insert into public.claim_edges (from_claim, to_claim, edge_type, weight)
select {sql_literal(f)}::uuid, {sql_literal(t)}::uuid,
{sql_literal(et)}::edge_type, {sql_literal(w)}
where not exists (
select 1 from public.claim_edges
where from_claim = {sql_literal(f)}::uuid
and to_claim = {sql_literal(t)}::uuid
and edge_type = {sql_literal(et)}::edge_type);
""".rstrip()
checks = f""" if not exists (select 1 from public.claim_edges
where from_claim = {sql_literal(f)}::uuid
and to_claim = {sql_literal(t)}::uuid
and edge_type = {sql_literal(et)}::edge_type) then
raise exception 'apply_proposal: claim_edge not present after apply';
end if;"""
return _wrap_txn(canonical, _ledger_and_verify(proposal_id, reviewer, checks))
def build_attach_evidence_sql(
apply_payload: Dict[str, Any], proposal_id: str, reviewer: Optional[str]
) -> str:
evidence: List[Dict[str, Any]] = apply_payload.get("evidence") or []
if not evidence:
raise ValueError("attach_evidence apply_payload requires non-empty 'evidence'")
statements = []
checks = []
for i, ev in enumerate(evidence):
claim_id = ev.get("claim_id")
source_id = ev.get("source_id")
role = ev.get("role") or "grounds"
weight = ev.get("weight")
if not claim_id:
raise ValueError(f"evidence[{i}] requires 'claim_id'")
if not source_id:
raise ValueError(
f"evidence[{i}] requires an existing 'source_id'. "
"kb_apply cannot create public.sources; mint the source upstream first."
)
statements.append(
f"""insert into public.claim_evidence (claim_id, source_id, role, weight)
select {sql_literal(claim_id)}::uuid, {sql_literal(source_id)}::uuid,
{sql_literal(role)}::evidence_role, {sql_literal(weight)}
where not exists (
select 1 from public.claim_evidence
where claim_id = {sql_literal(claim_id)}::uuid
and source_id = {sql_literal(source_id)}::uuid
and role = {sql_literal(role)}::evidence_role);"""
)
checks.append(
f""" if not exists (select 1 from public.claim_evidence
where claim_id = {sql_literal(claim_id)}::uuid
and source_id = {sql_literal(source_id)}::uuid
and role = {sql_literal(role)}::evidence_role) then
raise exception 'apply_proposal: evidence row % not present after apply', {i};
end if;"""
)
canonical = "\n".join(statements)
return _wrap_txn(canonical, _ledger_and_verify(proposal_id, reviewer, "\n".join(checks)))
def _wrap_txn(canonical_sql: str, ledger_sql: str) -> str:
return f"begin;\n{canonical_sql}\n{ledger_sql}\ncommit;\n"
BUILDERS = {
"revise_strategy": build_revise_strategy_sql,
"add_edge": build_add_edge_sql,
"attach_evidence": build_attach_evidence_sql,
}
def build_apply_sql(proposal: Dict[str, Any], applied_by: Optional[str]) -> str:
ptype = proposal["proposal_type"]
if ptype not in BUILDERS:
raise ValueError(f"apply_proposal cannot apply proposal_type {ptype!r}")
payload = proposal.get("payload") or {}
apply_payload = payload.get("apply_payload")
if apply_payload is None:
raise ValueError(
"proposal payload has no 'apply_payload' (contract v1). "
"Run the normalizer to produce apply_payload before applying."
)
return BUILDERS[ptype](apply_payload, proposal["id"], applied_by or SERVICE_AGENT_HANDLE)
def assert_applyable(proposal: Dict[str, Any]) -> None:
status = proposal.get("status")
if status == "applied":
raise SystemExit(f"proposal {proposal['id']} is already applied (idempotent no-op)")
if status != "approved":
raise SystemExit(
f"proposal {proposal['id']} has status {status!r}; only 'approved' proposals apply"
)
# --------------------------------------------------------------------------- #
# Execution (docker exec as the kb_apply role) #
# --------------------------------------------------------------------------- #
def load_password(secrets_file: str) -> str:
path = Path(secrets_file)
if not path.is_file():
raise SystemExit(f"secrets file not found: {secrets_file}")
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
if key.strip() in ("KB_APPLY_PASSWORD", "PGPASSWORD"):
return val.strip().strip('"').strip("'")
raise SystemExit(f"no KB_APPLY_PASSWORD/PGPASSWORD entry in {secrets_file}")
def run_psql(args: argparse.Namespace, sql: str, password: str) -> str:
command = [
"docker", "exec", "-e", "PGPASSWORD", "-i", args.container,
"psql", "-U", args.role, "-h", args.host, "-d", args.db,
"-v", "ON_ERROR_STOP=1", "-At", "-q",
]
result = subprocess.run(
command, input=sql, text=True, capture_output=True,
env={"PGPASSWORD": password, "PATH": "/usr/bin:/bin:/usr/local/bin"},
check=False,
)
if result.returncode != 0:
raise SystemExit(
f"psql failed ({result.returncode})\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
)
return result.stdout
def load_proposal(args: argparse.Namespace, password: str) -> Dict[str, Any]:
sql = f"""select jsonb_build_object(
'id', id::text,
'proposal_type', proposal_type,
'status', status,
'payload', payload)::text
from kb_stage.kb_proposals
where id = {sql_literal(args.proposal_id)}::uuid;"""
out = run_psql(args, sql, password).strip()
if not out:
raise SystemExit(f"proposal not found: {args.proposal_id}")
return json.loads(out)
def parse_args(argv: List[str]) -> argparse.Namespace:
p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
p.add_argument("proposal_id", help="UUID of the approved proposal to apply")
p.add_argument(
"--applied-by",
default=SERVICE_AGENT_HANDLE,
help="agent handle recorded as applied_by_handle and resolved to the "
"applied_by_agent_id FK (default: the kb-apply service agent)",
)
p.add_argument("--dry-run", action="store_true", help="print the SQL, do not execute")
p.add_argument("--secrets-file", default=DEFAULT_SECRETS_FILE)
p.add_argument("--container", default=DEFAULT_CONTAINER)
p.add_argument("--db", default=DEFAULT_DB)
p.add_argument("--host", default=DEFAULT_HOST)
p.add_argument("--role", default=DEFAULT_ROLE)
return p.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
args = parse_args(sys.argv[1:] if argv is None else argv)
if args.dry_run:
# Dry-run still needs the proposal to build SQL; read it as kb_apply.
password = load_password(args.secrets_file)
proposal = load_proposal(args, password)
assert_applyable(proposal)
print(build_apply_sql(proposal, args.applied_by))
return 0
password = load_password(args.secrets_file)
proposal = load_proposal(args, password)
assert_applyable(proposal)
sql = build_apply_sql(proposal, args.applied_by)
run_psql(args, sql, password)
print(f"applied proposal {proposal['id']} ({proposal['proposal_type']})")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,31 @@
-- kb_apply prerequisites -- run ONCE as the postgres superuser at deploy time.
--
-- Stage 2 of the KB apply pipeline (scripts/apply_proposal.py) connects as the
-- narrow kb_apply role and stamps applied_by_agent_id as a real FK. The FK is
-- resolved from public.agents by handle, so before the first apply:
-- 1. a 'kb-apply' service-agent row must exist (one-time bootstrap fixture),
-- 2. kb_apply needs SELECT on public.agents to resolve it (never INSERT),
-- 3. the one-active-strategy invariant must be enforced by a unique index.
--
-- Every statement is idempotent; re-running the file is a no-op. It grants no
-- write on public.agents to kb_apply -- the service-agent row is written here,
-- once, by the superuser, and the runtime role only ever reads it.
begin;
-- 1. Service-agent fixture. This INSERT is the one-time superuser bootstrap;
-- kb_apply itself never gains INSERT on public.agents (SELECT only, below).
insert into public.agents (id, handle, kind)
values ('44444444-4444-4444-4444-444444444444', 'kb-apply', 'service')
on conflict (handle) do nothing;
-- 2. Allow the apply tool to resolve applied_by_agent_id. Read-only, no INSERT.
grant select on public.agents to kb_apply;
-- 3. Enforce "one active strategy per agent" at the database. Already present on
-- current prod as one_active_strategy_per_agent; IF NOT EXISTS documents and
-- guarantees the dependency for fresh environments (no-op where it exists).
create unique index if not exists one_active_strategy_per_agent
on public.strategies (agent_id) where active;
commit;

View file

@ -0,0 +1,198 @@
"""Unit tests for scripts/apply_proposal.py.
Pure tests: exercise the SQL builders, payload validation, and dispatch without
a live database. Runnable under pytest or standalone (`python3 tests/test_apply_proposal.py`).
"""
from pathlib import Path
import sys
try:
import pytest
except ImportError: # standalone fallback so the file runs without pytest installed
import contextlib
import types
pytest = types.SimpleNamespace()
@contextlib.contextmanager
def _raises(exc):
try:
yield
except exc:
return
raise AssertionError(f"expected {exc.__name__} to be raised")
pytest.raises = _raises
REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(REPO_ROOT / "scripts"))
import apply_proposal as ap # noqa: E402
# --- literals ------------------------------------------------------------- #
def test_sql_literal_escapes_single_quotes():
assert ap.sql_literal("O'Brien") == "'O''Brien'"
def test_sql_literal_none_is_null():
assert ap.sql_literal(None) == "null"
def test_sql_literal_bool_and_number():
assert ap.sql_literal(True) == "true"
assert ap.sql_literal(0.5) == "0.5"
# --- revise_strategy ------------------------------------------------------ #
def _revise_payload():
return {
"apply_payload": {
"agent_id": "11111111-1111-1111-1111-111111111111",
"strategy": {
"diagnosis": "Species-level inflection.",
"guiding_policy": "Build Teleo as a multi-agent public intelligence.",
"proximate_objectives": ["route before creating", "keep KB calibrated"],
},
"strategy_nodes": [
{"node_type": "diagnosis", "title": "Inflection", "body": "b1", "rank": 1},
{"node_type": "policy", "title": "Multi-agent PI", "body": "b2", "rank": 1},
],
}
}
def test_revise_strategy_sql_shape():
sql = ap.build_revise_strategy_sql(_revise_payload()["apply_payload"], "pid-1", "m3ta")
assert sql.startswith("begin;")
assert sql.rstrip().endswith("commit;")
# deactivate old, insert versioned active, retire nodes, insert new nodes
assert "update public.strategies" in sql
assert "active = false" in sql
assert "coalesce(max(version), 0) + 1" in sql
assert "update public.strategy_nodes" in sql
assert "insert into public.strategy_nodes" in sql
# ledger + invariant
assert "status = 'applied'" in sql
assert "exactly one active strategy" in sql
# proximate_objectives rendered as jsonb
assert "::jsonb" in sql
def test_revise_strategy_requires_agent_id():
payload = _revise_payload()["apply_payload"]
del payload["agent_id"]
with pytest.raises(ValueError):
ap.build_revise_strategy_sql(payload, "pid-1", None)
def test_revise_strategy_rejects_bad_node_type():
payload = _revise_payload()["apply_payload"]
payload["strategy_nodes"][0]["node_type"] = "manifesto"
with pytest.raises(ValueError):
ap.build_revise_strategy_sql(payload, "pid-1", None)
# --- add_edge ------------------------------------------------------------- #
def test_add_edge_sql_dedup_guard():
payload = {"from_claim": "aaaa", "to_claim": "bbbb", "edge_type": "supersedes", "weight": 0.9}
sql = ap.build_add_edge_sql(payload, "pid-2", None)
assert "insert into public.claim_edges" in sql
assert "not exists" in sql
assert "::edge_type" in sql
assert "status = 'applied'" in sql
def test_add_edge_rejects_self_loop():
payload = {"from_claim": "same", "to_claim": "same", "edge_type": "supports"}
with pytest.raises(ValueError):
ap.build_add_edge_sql(payload, "pid-2", None)
# --- attach_evidence ------------------------------------------------------ #
def test_attach_evidence_sql_with_source_id():
payload = {"evidence": [{"claim_id": "cccc", "source_id": "dddd", "role": "grounds", "weight": 0.78}]}
sql = ap.build_attach_evidence_sql(payload, "pid-3", None)
assert "insert into public.claim_evidence" in sql
assert "::evidence_role" in sql
assert "not exists" in sql
def test_attach_evidence_requires_source_id():
payload = {"evidence": [{"claim_id": "cccc"}]}
with pytest.raises(ValueError):
ap.build_attach_evidence_sql(payload, "pid-3", None)
# --- dispatch + guards ---------------------------------------------------- #
def test_build_apply_sql_requires_apply_payload():
proposal = {"id": "pid", "proposal_type": "add_edge", "payload": {"rationale": "x"}}
with pytest.raises(ValueError):
ap.build_apply_sql(proposal, None)
def test_build_apply_sql_rejects_unsupported_type():
proposal = {"id": "pid", "proposal_type": "reject_claim", "payload": {"apply_payload": {}}}
with pytest.raises(ValueError):
ap.build_apply_sql(proposal, None)
def test_assert_applyable_blocks_pending():
with pytest.raises(SystemExit):
ap.assert_applyable({"id": "pid", "status": "pending_review"})
def test_assert_applyable_blocks_already_applied():
with pytest.raises(SystemExit):
ap.assert_applyable({"id": "pid", "status": "applied"})
def test_assert_applyable_allows_approved():
ap.assert_applyable({"id": "pid", "status": "approved"}) # no raise
# --- ledger flip: concurrency guard + FK stamp --------------------------- #
def test_ledger_flip_asserts_rowcount_one():
# The flip must guard against a concurrent double-apply by asserting exactly
# one 'approved' row moved, not by re-reading status afterwards.
sql = ap.build_add_edge_sql({"from_claim": "a", "to_claim": "b", "edge_type": "supports"}, "pid", None)
assert "get diagnostics" in sql
assert "flipped <> 1" in sql
assert "and status = 'approved'" in sql
def test_ledger_flip_stamps_agent_fk():
sql = ap.build_add_edge_sql({"from_claim": "a", "to_claim": "b", "edge_type": "supports"}, "pid", "kb-apply")
# Hard resolve into a variable + NOT-NULL assert, never a silent inline
# subselect that would stamp NULL on an unresolved handle.
assert "select id into resolved_agent_id" in sql
assert "resolved_agent_id is null then" in sql
assert "applied_by_agent_id = resolved_agent_id" in sql
assert "applied_by_handle = 'kb-apply'" in sql
def test_build_apply_sql_defaults_applied_by_to_service_agent():
proposal = {
"id": "pid",
"proposal_type": "add_edge",
"payload": {"apply_payload": {"from_claim": "a", "to_claim": "b", "edge_type": "supports"}},
}
sql = ap.build_apply_sql(proposal, None)
assert f"applied_by_handle = '{ap.SERVICE_AGENT_HANDLE}'" in sql
if __name__ == "__main__":
import traceback
failures = 0
for name, fn in sorted(globals().items()):
if name.startswith("test_") and callable(fn):
try:
fn()
print(f"PASS {name}")
except Exception: # noqa: BLE001
failures += 1
print(f"FAIL {name}")
traceback.print_exc()
sys.exit(1 if failures else 0)