Add Telegram smart research gate installer (#13)
This commit is contained in:
parent
dba8a21e74
commit
595f977a94
2 changed files with 308 additions and 0 deletions
202
scripts/install_telegram_smart_research_gates.py
Normal file
202
scripts/install_telegram_smart_research_gates.py
Normal file
|
|
@ -0,0 +1,202 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Install Telegram smart-research paid gates without printing approval refs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import getpass
|
||||
import grp
|
||||
import json
|
||||
import os
|
||||
import pwd
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
APPROVAL_REF_RE = re.compile(r"^[A-Za-z0-9._:@/-]{8,256}$")
|
||||
CHAT_ID_RE = re.compile(r"^-?\d+$")
|
||||
|
||||
|
||||
def parse_args(argv: list[str]) -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Install server-side gates for Leo Telegram smart-research paid execution.",
|
||||
allow_abbrev=False,
|
||||
)
|
||||
parser.add_argument("--agent", default="leo-wallet-test", help="Agent instance name for teleo-agent@<agent>")
|
||||
parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
|
||||
parser.add_argument("--allow-paid", action="store_true", help="Enable paid smart research for one allowed chat")
|
||||
parser.add_argument("--allowed-chat-id", help="Telegram chat id allowed to trigger paid smart research")
|
||||
parser.add_argument("--max-usd", default="0.01", help="Maximum spend per Telegram smart-research call")
|
||||
parser.add_argument(
|
||||
"--approval-ref-from-stdin",
|
||||
action="store_true",
|
||||
help="Read approval ref from stdin instead of a hidden prompt",
|
||||
)
|
||||
parser.add_argument("--owner", default="teleo", help="Env/approval file owner after write")
|
||||
parser.add_argument("--group", default="teleo", help="Env/approval file group after write")
|
||||
parser.add_argument("--no-chown", action="store_true", help="Skip chown; useful for local tests")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Validate inputs without writing files")
|
||||
parser.add_argument("--restart-service", action="store_true", help="Restart teleo-agent@<agent>.service")
|
||||
parser.add_argument("--output", help="Optional sanitized JSON proof path")
|
||||
|
||||
namespace, unknown = parser.parse_known_args(argv)
|
||||
if unknown:
|
||||
print(
|
||||
"ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
raise SystemExit(2)
|
||||
return namespace
|
||||
|
||||
|
||||
def read_approval_ref(*, from_stdin: bool) -> str:
|
||||
approval_ref = sys.stdin.read() if from_stdin else getpass.getpass("Leo smart-research approval ref: ")
|
||||
return approval_ref.strip()
|
||||
|
||||
|
||||
def validate_agent_name(agent: str) -> None:
|
||||
if not re.match(r"^[A-Za-z0-9_.-]+$", agent):
|
||||
raise ValueError("agent must contain only letters, numbers, dot, dash, or underscore")
|
||||
|
||||
|
||||
def validate_max_usd(value: str) -> str:
|
||||
try:
|
||||
parsed = float(value)
|
||||
except ValueError:
|
||||
raise ValueError("max-usd must be numeric") from None
|
||||
if parsed <= 0 or parsed > 0.01:
|
||||
raise ValueError("max-usd must be greater than 0 and no more than 0.01")
|
||||
return f"{parsed:.2f}"
|
||||
|
||||
|
||||
def validate_paid_inputs(args: argparse.Namespace) -> str | None:
|
||||
if not args.allow_paid:
|
||||
return None
|
||||
if not args.allowed_chat_id or not CHAT_ID_RE.match(args.allowed_chat_id):
|
||||
raise ValueError("--allowed-chat-id is required for --allow-paid and must be an integer")
|
||||
approval_ref = read_approval_ref(from_stdin=args.approval_ref_from_stdin)
|
||||
if not APPROVAL_REF_RE.match(approval_ref):
|
||||
raise ValueError("approval ref shape is invalid")
|
||||
return approval_ref
|
||||
|
||||
|
||||
def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]:
|
||||
if no_chown:
|
||||
return None, None
|
||||
return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid
|
||||
|
||||
|
||||
def write_private_file(path: Path, content: str, *, uid: int | None, gid: int | None, dry_run: bool) -> None:
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
|
||||
tmp_path = path.with_name(f".{path.name}.tmp-{os.getpid()}")
|
||||
fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
|
||||
try:
|
||||
with os.fdopen(fd, "w") as handle:
|
||||
handle.write(content)
|
||||
if not content.endswith("\n"):
|
||||
handle.write("\n")
|
||||
handle.flush()
|
||||
os.fsync(handle.fileno())
|
||||
os.chmod(tmp_path, 0o600)
|
||||
if uid is not None or gid is not None:
|
||||
os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
|
||||
os.replace(tmp_path, path)
|
||||
os.chmod(path, 0o600)
|
||||
finally:
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
|
||||
|
||||
def run_command(command: list[str], *, dry_run: bool) -> dict:
|
||||
if dry_run:
|
||||
return {"command": command, "skipped": "dry_run"}
|
||||
proc = subprocess.run(command, check=False, text=True, capture_output=True)
|
||||
return {
|
||||
"command": command,
|
||||
"returncode": proc.returncode,
|
||||
"stdout": proc.stdout.strip(),
|
||||
"stderr": proc.stderr.strip(),
|
||||
}
|
||||
|
||||
|
||||
def build_env_content(*, allow_paid: bool, allowed_chat_id: str | None, max_usd: str, approval_ref_path: Path) -> str:
|
||||
lines = [
|
||||
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID={'1' if allow_paid else '0'}",
|
||||
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD={max_usd}",
|
||||
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE={approval_ref_path}",
|
||||
]
|
||||
if allow_paid and allowed_chat_id:
|
||||
lines.insert(1, f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID={allowed_chat_id}")
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = parse_args(sys.argv[1:] if argv is None else argv)
|
||||
validate_agent_name(args.agent)
|
||||
max_usd = validate_max_usd(args.max_usd)
|
||||
approval_ref = validate_paid_inputs(args)
|
||||
uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)
|
||||
|
||||
secrets_dir = Path(args.secrets_dir)
|
||||
env_path = secrets_dir / f"teleo-agent-{args.agent}.env"
|
||||
approval_ref_path = secrets_dir / f"{args.agent}-smart-research-approval-ref"
|
||||
env_content = build_env_content(
|
||||
allow_paid=args.allow_paid,
|
||||
allowed_chat_id=args.allowed_chat_id,
|
||||
max_usd=max_usd,
|
||||
approval_ref_path=approval_ref_path,
|
||||
)
|
||||
|
||||
write_private_file(env_path, env_content, uid=uid, gid=gid, dry_run=args.dry_run)
|
||||
approval_ref_written = False
|
||||
if args.allow_paid and approval_ref is not None:
|
||||
write_private_file(approval_ref_path, approval_ref, uid=uid, gid=gid, dry_run=args.dry_run)
|
||||
approval_ref_written = not args.dry_run
|
||||
|
||||
unit = f"teleo-agent@{args.agent}.service"
|
||||
restart_result = None
|
||||
if args.restart_service:
|
||||
restart_result = run_command(["systemctl", "restart", unit], dry_run=args.dry_run)
|
||||
|
||||
proof = {
|
||||
"schema": "livingip.telegramSmartResearchGateInstall.v1",
|
||||
"generatedAt": datetime.now(timezone.utc).isoformat(),
|
||||
"ok": True,
|
||||
"agent": args.agent,
|
||||
"envPath": str(env_path),
|
||||
"envFileWritten": not args.dry_run,
|
||||
"approvalRefPath": str(approval_ref_path),
|
||||
"approvalRefWritten": approval_ref_written,
|
||||
"approvalRefPresent": bool(args.allow_paid),
|
||||
"allowedChatIdPresent": bool(args.allowed_chat_id),
|
||||
"maxUsd": max_usd,
|
||||
"paidEnabled": bool(args.allow_paid),
|
||||
"dryRun": args.dry_run,
|
||||
"fileMode": "0600",
|
||||
"owner": None if args.no_chown else args.owner,
|
||||
"group": None if args.no_chown else args.group,
|
||||
"serviceUnit": unit,
|
||||
"serviceRestart": restart_result,
|
||||
"secretValuesIncluded": False,
|
||||
}
|
||||
|
||||
output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
|
||||
if args.output:
|
||||
output_path = Path(args.output)
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_text(output)
|
||||
print(output, end="")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
raise SystemExit(main())
|
||||
except ValueError as exc:
|
||||
print(f"ERROR: {exc}", file=sys.stderr)
|
||||
raise SystemExit(1) from None
|
||||
106
tests/test_install_telegram_smart_research_gates.py
Normal file
106
tests/test_install_telegram_smart_research_gates.py
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
"""Tests for safe Telegram smart-research gate installation."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
SCRIPT = REPO_ROOT / "scripts" / "install_telegram_smart_research_gates.py"
|
||||
|
||||
|
||||
def run_installer(
|
||||
args: list[str],
|
||||
*,
|
||||
approval_ref: str = "approval_ref_livingip_x402_20260622",
|
||||
) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(
|
||||
[sys.executable, str(SCRIPT), *args],
|
||||
input=approval_ref,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
|
||||
def test_installs_paid_gate_from_stdin_without_echoing_secret_or_chat_id(tmp_path):
|
||||
approval_ref = "approval_ref_livingip_x402_20260622"
|
||||
chat_id = "-1001234567890"
|
||||
proof_path = tmp_path / "proof.json"
|
||||
proc = run_installer(
|
||||
[
|
||||
"--agent",
|
||||
"leo-wallet-test",
|
||||
"--secrets-dir",
|
||||
str(tmp_path / "secrets"),
|
||||
"--allow-paid",
|
||||
"--allowed-chat-id",
|
||||
chat_id,
|
||||
"--approval-ref-from-stdin",
|
||||
"--no-chown",
|
||||
"--output",
|
||||
str(proof_path),
|
||||
],
|
||||
approval_ref=approval_ref,
|
||||
)
|
||||
|
||||
assert proc.returncode == 0, proc.stderr
|
||||
combined_output = proc.stdout + proc.stderr + proof_path.read_text()
|
||||
assert approval_ref not in combined_output
|
||||
assert chat_id not in combined_output
|
||||
|
||||
proof = json.loads(proof_path.read_text())
|
||||
env_path = Path(proof["envPath"])
|
||||
approval_path = Path(proof["approvalRefPath"])
|
||||
assert proof["ok"] is True
|
||||
assert proof["agent"] == "leo-wallet-test"
|
||||
assert proof["paidEnabled"] is True
|
||||
assert proof["approvalRefPresent"] is True
|
||||
assert proof["allowedChatIdPresent"] is True
|
||||
assert proof["secretValuesIncluded"] is False
|
||||
|
||||
env_content = env_path.read_text()
|
||||
assert "LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID=1" in env_content
|
||||
assert f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID={chat_id}" in env_content
|
||||
assert f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE={approval_path}" in env_content
|
||||
assert approval_path.read_text().strip() == approval_ref
|
||||
|
||||
assert stat.S_IMODE(os.stat(env_path).st_mode) == 0o600
|
||||
assert stat.S_IMODE(os.stat(approval_path).st_mode) == 0o600
|
||||
|
||||
|
||||
def test_installs_disabled_gate_without_approval_ref(tmp_path):
|
||||
proof_path = tmp_path / "proof.json"
|
||||
proc = run_installer(
|
||||
[
|
||||
"--agent",
|
||||
"leo-wallet-test",
|
||||
"--secrets-dir",
|
||||
str(tmp_path / "secrets"),
|
||||
"--no-chown",
|
||||
"--output",
|
||||
str(proof_path),
|
||||
],
|
||||
approval_ref="",
|
||||
)
|
||||
|
||||
assert proc.returncode == 0, proc.stderr
|
||||
proof = json.loads(proof_path.read_text())
|
||||
env_path = Path(proof["envPath"])
|
||||
approval_path = Path(proof["approvalRefPath"])
|
||||
assert proof["paidEnabled"] is False
|
||||
assert proof["approvalRefWritten"] is False
|
||||
assert "LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID=0" in env_path.read_text()
|
||||
assert not approval_path.exists()
|
||||
|
||||
|
||||
def test_refuses_cli_approval_ref_argument_without_echoing_secret():
|
||||
approval_ref = "approval_ref_livingip_x402_20260622"
|
||||
proc = run_installer(["--approval-ref", approval_ref], approval_ref="")
|
||||
|
||||
combined_output = proc.stdout + proc.stderr
|
||||
assert proc.returncode == 2
|
||||
assert approval_ref not in combined_output
|
||||
assert "Secret-bearing CLI args are not accepted" in combined_output
|
||||
Loading…
Reference in a new issue