teleo-infrastructure/scripts/install_telegram_smart_research_gates.py
twentyOne2x bfc28e084b
Wire Leo Telegram x402 smart research (#17)
* Wire Leo Telegram x402 smart research

* Suppress token-bearing Telegram HTTP logs

* Keep Telegram typing visible during Leo proxy calls

* Allow Leo Telegram social research spend cap

* Route contextual Leo research prompts to smart research

* Generalize Leo smart research intent routing

* Resume Leo smart research from paid work orders
2026-06-23 18:37:33 +02:00

203 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""Install Telegram smart-research paid gates without printing approval refs."""
from __future__ import annotations

import argparse
import getpass
import grp
import json
import math
import os
import pwd
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# Accepted approval-ref shape: 8 to 256 characters drawn from ASCII letters,
# digits and the punctuation . _ : @ / -
# Anchored with \Z rather than $: "$" also matches just before a trailing
# newline, so a "$"-anchored pattern used with match() accepts "value\n".
APPROVAL_REF_RE = re.compile(r"^[A-Za-z0-9._:@/-]{8,256}\Z")
# Accepted chat-id shape: an optional leading minus sign followed by digits.
CHAT_ID_RE = re.compile(r"^-?\d+\Z")
# Largest value (in USD) that --max-usd may be set to.
MAX_SMART_RESEARCH_USD = 0.06
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the installer's command line, refusing anything unrecognised.

    Option-name abbreviation is disabled. If any argument is not known to the
    parser, a generic error is printed to stderr and the process exits with
    status 2; the error text does not echo the offending arguments.

    Args:
        argv: Command-line arguments, without the program name.

    Returns:
        The populated ``argparse.Namespace``.

    Raises:
        SystemExit: with code 2 when unsupported arguments are present.
    """
    cli = argparse.ArgumentParser(
        description="Install server-side gates for Leo Telegram smart-research paid execution.",
        allow_abbrev=False,
    )
    add = cli.add_argument
    add("--agent", default="leo-wallet-test", help="Agent instance name for teleo-agent@<agent>")
    add("--secrets-dir", default="/opt/teleo-eval/secrets")
    add("--allow-paid", action="store_true", help="Enable paid smart research for one allowed chat")
    add("--allowed-chat-id", help="Telegram chat id allowed to trigger paid smart research")
    add("--max-usd", default="0.01", help="Maximum spend per Telegram smart-research call")
    add(
        "--approval-ref-from-stdin",
        action="store_true",
        help="Read approval ref from stdin instead of a hidden prompt",
    )
    add("--owner", default="teleo", help="Env/approval file owner after write")
    add("--group", default="teleo", help="Env/approval file group after write")
    add("--no-chown", action="store_true", help="Skip chown; useful for local tests")
    add("--dry-run", action="store_true", help="Validate inputs without writing files")
    add("--restart-service", action="store_true", help="Restart teleo-agent@<agent>.service")
    add("--output", help="Optional sanitized JSON proof path")

    parsed, leftovers = cli.parse_known_args(argv)
    if not leftovers:
        return parsed

    # Anything left over is rejected outright rather than ignored.
    print(
        "ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
        file=sys.stderr,
    )
    raise SystemExit(2)
def read_approval_ref(*, from_stdin: bool) -> str:
    """Return the approval ref with surrounding whitespace removed.

    Args:
        from_stdin: When true, read all of stdin; otherwise ask for the value
            with a ``getpass`` prompt.

    Returns:
        The stripped approval ref (possibly empty).
    """
    if from_stdin:
        raw_ref = sys.stdin.read()
    else:
        raw_ref = getpass.getpass("Leo smart-research approval ref: ")
    return raw_ref.strip()
def validate_agent_name(agent: str) -> None:
    """Reject agent names containing anything but a small safe character set.

    Args:
        agent: Agent instance name taken from ``--agent``.

    Raises:
        ValueError: if ``agent`` is empty or contains a character other than
            an ASCII letter, digit, dot, dash, or underscore.
    """
    # fullmatch requires the whole string to match. The previous
    # match() + "$" form also accepted a name followed by a newline, because
    # "$" matches just before a trailing "\n".
    if not re.fullmatch(r"[A-Za-z0-9_.-]+", agent):
        raise ValueError("agent must contain only letters, numbers, dot, dash, or underscore")
def validate_max_usd(value: str) -> str:
    """Validate the per-call spend cap and return it formatted to two decimals.

    Args:
        value: Raw ``--max-usd`` text.

    Returns:
        The cap formatted with two decimal places, e.g. ``"0.05"``.

    Raises:
        ValueError: if ``value`` is not a number (including NaN), or is not
            greater than 0 and at most ``MAX_SMART_RESEARCH_USD``, or would
            round to ``0.00``.
    """
    try:
        parsed = float(value)
    except ValueError:
        raise ValueError("max-usd must be numeric") from None
    # float() accepts "nan", and every ordering comparison with NaN is false,
    # so NaN would otherwise pass the range check below and be written out.
    if math.isnan(parsed):
        raise ValueError("max-usd must be numeric")
    formatted = f"{parsed:.2f}"
    # Check the rounded value as well: a positive input below half a cent
    # formats as "0.00", which is not a cap greater than 0.
    if parsed <= 0 or parsed > MAX_SMART_RESEARCH_USD or float(formatted) <= 0:
        raise ValueError(f"max-usd must be greater than 0 and no more than {MAX_SMART_RESEARCH_USD:.2f}")
    return formatted
def validate_paid_inputs(args: argparse.Namespace) -> str | None:
if not args.allow_paid:
return None
if not args.allowed_chat_id or not CHAT_ID_RE.match(args.allowed_chat_id):
raise ValueError("--allowed-chat-id is required for --allow-paid and must be an integer")
approval_ref = read_approval_ref(from_stdin=args.approval_ref_from_stdin)
if not APPROVAL_REF_RE.match(approval_ref):
raise ValueError("approval ref shape is invalid")
return approval_ref
def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]:
if no_chown:
return None, None
return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid
def write_private_file(path: Path, content: str, *, uid: int | None, gid: int | None, dry_run: bool) -> None:
if dry_run:
return
path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
tmp_path = path.with_name(f".{path.name}.tmp-{os.getpid()}")
fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
try:
with os.fdopen(fd, "w") as handle:
handle.write(content)
if not content.endswith("\n"):
handle.write("\n")
handle.flush()
os.fsync(handle.fileno())
os.chmod(tmp_path, 0o600)
if uid is not None or gid is not None:
os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
os.replace(tmp_path, path)
os.chmod(path, 0o600)
finally:
if tmp_path.exists():
tmp_path.unlink()
def run_command(command: list[str], *, dry_run: bool) -> dict:
    """Run ``command`` and summarise the outcome as a plain dict.

    Args:
        command: Program and arguments, passed to ``subprocess.run`` as a list.
        dry_run: When true, nothing is executed.

    Returns:
        ``{"command", "skipped"}`` in dry-run mode; otherwise
        ``{"command", "returncode", "stdout", "stderr"}`` with both output
        streams captured as text and stripped. A non-zero exit status is
        reported in the dict rather than raised.
    """
    summary: dict = {"command": command}
    if dry_run:
        summary["skipped"] = "dry_run"
        return summary

    completed = subprocess.run(command, check=False, text=True, capture_output=True)
    summary["returncode"] = completed.returncode
    summary["stdout"] = completed.stdout.strip()
    summary["stderr"] = completed.stderr.strip()
    return summary
def build_env_content(*, allow_paid: bool, allowed_chat_id: str | None, max_usd: str, approval_ref_path: Path) -> str:
lines = [
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID={'1' if allow_paid else '0'}",
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD={max_usd}",
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE={approval_ref_path}",
]
if allow_paid and allowed_chat_id:
lines.insert(1, f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID={allowed_chat_id}")
return "\n".join(lines) + "\n"
def main(argv: list[str] | None = None) -> int:
    """Validate inputs, write the gate files, and print a sanitized JSON proof.

    Steps, in order: parse and validate arguments, resolve owner/group ids,
    write the env file, write the approval-ref file when paid mode is on,
    optionally restart the agent's systemd unit, then emit the proof to
    stdout (and to ``--output`` when given). The proof records paths and
    flags only; the approval ref value itself is never included.

    Args:
        argv: Argument list to parse; ``sys.argv[1:]`` is used when ``None``.

    Returns:
        ``0`` on success, or ``1`` when a requested service restart exited
        with a non-zero status.

    Raises:
        ValueError: propagated from the input validators.
        SystemExit: raised by argument parsing on unsupported arguments.
    """
    args = parse_args(sys.argv[1:] if argv is None else argv)
    validate_agent_name(args.agent)
    max_usd = validate_max_usd(args.max_usd)
    approval_ref = validate_paid_inputs(args)
    uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)

    secrets_dir = Path(args.secrets_dir)
    env_path = secrets_dir / f"teleo-agent-{args.agent}.env"
    approval_ref_path = secrets_dir / f"{args.agent}-smart-research-approval-ref"

    env_content = build_env_content(
        allow_paid=args.allow_paid,
        allowed_chat_id=args.allowed_chat_id,
        max_usd=max_usd,
        approval_ref_path=approval_ref_path,
    )
    write_private_file(env_path, env_content, uid=uid, gid=gid, dry_run=args.dry_run)

    approval_ref_written = False
    if args.allow_paid and approval_ref is not None:
        write_private_file(approval_ref_path, approval_ref, uid=uid, gid=gid, dry_run=args.dry_run)
        approval_ref_written = not args.dry_run

    unit = f"teleo-agent@{args.agent}.service"
    restart_result = None
    if args.restart_service:
        restart_result = run_command(["systemctl", "restart", unit], dry_run=args.dry_run)
    # A failed restart must not be reported as success. No restart request,
    # or a dry-run result (which has no "returncode" key), counts as success.
    restart_ok = restart_result is None or restart_result.get("returncode", 0) == 0

    proof = {
        "schema": "livingip.telegramSmartResearchGateInstall.v1",
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "ok": restart_ok,
        "agent": args.agent,
        "envPath": str(env_path),
        "envFileWritten": not args.dry_run,
        "approvalRefPath": str(approval_ref_path),
        "approvalRefWritten": approval_ref_written,
        "approvalRefPresent": bool(args.allow_paid),
        "allowedChatIdPresent": bool(args.allowed_chat_id),
        "maxUsd": max_usd,
        "paidEnabled": bool(args.allow_paid),
        "dryRun": args.dry_run,
        "fileMode": "0600",
        "owner": None if args.no_chown else args.owner,
        "group": None if args.no_chown else args.group,
        "serviceUnit": unit,
        "serviceRestart": restart_result,
        "secretValuesIncluded": False,
    }
    output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(output)
    # The proof is printed even on restart failure so the captured
    # systemctl output is visible to the operator.
    print(output, end="")
    return 0 if restart_ok else 1
if __name__ == "__main__":
    # Validation failures surface as ValueError; report them as a single
    # ERROR line on stderr and exit with status 1 instead of a traceback.
    try:
        exit_code = main()
    except ValueError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        exit_code = 1
    raise SystemExit(exit_code)