#!/usr/bin/env python3
"""Install Telegram smart-research paid gates without printing approval refs."""

from __future__ import annotations

import argparse
import getpass
import grp
import json
import math
import os
import pwd
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

# Accepted shape of an approval ref: 8-256 characters from a conservative set.
APPROVAL_REF_RE = re.compile(r"^[A-Za-z0-9._:@/-]{8,256}$")
# Chat ids are optionally-negative ASCII integers; re.ASCII stops ``\d`` from
# accepting non-ASCII Unicode digits.
CHAT_ID_RE = re.compile(r"^-?\d+$", re.ASCII)
# Upper bound accepted for --max-usd.
MAX_SMART_RESEARCH_USD = 0.06


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command line, rejecting any argument the parser does not know.

    Abbreviated options are disabled, and unknown arguments terminate the
    process with exit status 2 after printing a generic message to stderr
    (the offending arguments themselves are not echoed).

    Args:
        argv: Argument list, without the program name.

    Returns:
        The populated namespace.

    Raises:
        SystemExit: With code 2 when unknown arguments are present.
    """
    parser = argparse.ArgumentParser(
        description="Install server-side gates for Leo Telegram smart-research paid execution.",
        allow_abbrev=False,
    )
    parser.add_argument("--agent", default="leo-wallet-test", help="Agent instance name for teleo-agent@")
    parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
    parser.add_argument("--allow-paid", action="store_true", help="Enable paid smart research for one allowed chat")
    parser.add_argument("--allowed-chat-id", help="Telegram chat id allowed to trigger paid smart research")
    parser.add_argument("--max-usd", default="0.01", help="Maximum spend per Telegram smart-research call")
    parser.add_argument(
        "--approval-ref-from-stdin",
        action="store_true",
        help="Read approval ref from stdin instead of a hidden prompt",
    )
    parser.add_argument("--owner", default="teleo", help="Env/approval file owner after write")
    parser.add_argument("--group", default="teleo", help="Env/approval file group after write")
    parser.add_argument("--no-chown", action="store_true", help="Skip chown; useful for local tests")
    parser.add_argument("--dry-run", action="store_true", help="Validate inputs without writing files")
    parser.add_argument("--restart-service", action="store_true", help="Restart teleo-agent@.service")
    parser.add_argument("--output", help="Optional sanitized JSON proof path")
    namespace, unknown = parser.parse_known_args(argv)
    if unknown:
        print(
            "ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
            file=sys.stderr,
        )
        raise SystemExit(2)
    return namespace


def read_approval_ref(*, from_stdin: bool) -> str:
    """Read the approval ref from stdin or a hidden prompt and strip whitespace.

    Args:
        from_stdin: When true, read all of stdin; otherwise prompt via getpass.

    Returns:
        The approval ref with surrounding whitespace removed.
    """
    approval_ref = sys.stdin.read() if from_stdin else getpass.getpass("Leo smart-research approval ref: ")
    return approval_ref.strip()


def validate_agent_name(agent: str) -> None:
    """Ensure the agent name only uses letters, digits, dot, dash, or underscore.

    Raises:
        ValueError: If the name is empty or contains any other character.
    """
    # fullmatch, not match: with match(), "$" also matches just before a
    # trailing newline, which would let "name\n" through into file and unit names.
    if not re.fullmatch(r"^[A-Za-z0-9_.-]+$", agent):
        raise ValueError("agent must contain only letters, numbers, dot, dash, or underscore")


def validate_max_usd(value: str) -> str:
    """Validate the per-call spend cap and return it formatted to two decimals.

    Args:
        value: The raw --max-usd string.

    Returns:
        The cap rendered as ``"{:.2f}"``.

    Raises:
        ValueError: If the value is not numeric, is not finite, is outside
            ``(0, MAX_SMART_RESEARCH_USD]``, or rounds to 0.00.
    """
    try:
        parsed = float(value)
    except ValueError:
        raise ValueError("max-usd must be numeric") from None
    # NaN compares false against everything, so it must be rejected explicitly
    # or it would slip through both range checks.
    if not math.isfinite(parsed) or parsed <= 0 or parsed > MAX_SMART_RESEARCH_USD:
        raise ValueError(f"max-usd must be greater than 0 and no more than {MAX_SMART_RESEARCH_USD:.2f}")
    formatted = f"{parsed:.2f}"
    # A positive value below half a cent would otherwise be written as "0.00",
    # contradicting the "greater than 0" rule enforced above.
    if float(formatted) <= 0:
        raise ValueError("max-usd must be at least 0.01 after rounding to cents")
    return formatted


def validate_paid_inputs(args: argparse.Namespace) -> str | None:
    """Validate the paid-mode inputs and obtain the approval ref.

    When ``--allow-paid`` is not set nothing is read and ``None`` is returned.
    The chat id is checked before the approval ref is read.

    Returns:
        The validated approval ref, or ``None`` when paid mode is off.

    Raises:
        ValueError: If the chat id is missing or not an integer, or the
            approval ref does not match ``APPROVAL_REF_RE``.
    """
    if not args.allow_paid:
        return None
    # fullmatch rejects a trailing newline that "$" alone would tolerate.
    if not args.allowed_chat_id or not CHAT_ID_RE.fullmatch(args.allowed_chat_id):
        raise ValueError("--allowed-chat-id is required for --allow-paid and must be an integer")
    approval_ref = read_approval_ref(from_stdin=args.approval_ref_from_stdin)
    if not APPROVAL_REF_RE.fullmatch(approval_ref):
        raise ValueError("approval ref shape is invalid")
    return approval_ref


def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]:
    """Look up the numeric uid/gid for the requested owner and group.

    Returns:
        ``(uid, gid)``, or ``(None, None)`` when ``no_chown`` is set.

    Raises:
        KeyError: If the user or group name is unknown to the system.
    """
    if no_chown:
        return None, None
    return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid


def write_private_file(path: Path, content: str, *, uid: int | None, gid: int | None, dry_run: bool) -> None:
    """Write ``content`` to ``path`` via a temp file in the same directory, mode 0600.

    The content is written to a sibling temp file created with ``O_EXCL``,
    flushed and fsynced, chmod/chown applied, and then moved over ``path``
    with ``os.replace``. A trailing newline is appended if missing. Nothing
    is done when ``dry_run`` is true.

    Args:
        path: Destination file; parent directories are created (mode 0700).
        content: Text to write.
        uid: Owner uid to apply, or ``None`` to leave unchanged.
        gid: Group gid to apply, or ``None`` to leave unchanged.
        dry_run: Skip all filesystem work.
    """
    if dry_run:
        return
    path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
    tmp_path = path.with_name(f".{path.name}.tmp-{os.getpid()}")
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(content)
            if not content.endswith("\n"):
                handle.write("\n")
            handle.flush()
            os.fsync(handle.fileno())
        # Re-apply the mode explicitly; the mode passed to os.open is subject
        # to the process umask.
        os.chmod(tmp_path, 0o600)
        if uid is not None or gid is not None:
            # -1 tells chown to leave that id unchanged.
            os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
        os.replace(tmp_path, path)
        os.chmod(path, 0o600)
    finally:
        # Only present if something failed before the replace.
        if tmp_path.exists():
            tmp_path.unlink()


def run_command(command: list[str], *, dry_run: bool) -> dict:
    """Run ``command`` and return a summary dict of its outcome.

    Returns:
        ``{"command", "skipped": "dry_run"}`` in dry-run mode; otherwise
        ``{"command", "returncode", "stdout", "stderr"}`` with the captured
        output stripped. A non-zero exit status does not raise.
    """
    if dry_run:
        return {"command": command, "skipped": "dry_run"}
    proc = subprocess.run(command, check=False, text=True, capture_output=True)
    return {
        "command": command,
        "returncode": proc.returncode,
        "stdout": proc.stdout.strip(),
        "stderr": proc.stderr.strip(),
    }


def build_env_content(*, allow_paid: bool, allowed_chat_id: str | None, max_usd: str, approval_ref_path: Path) -> str:
    """Render the env-file text for the smart-research gate.

    The allowed-chat-id line is emitted (as the second line) only when paid
    mode is on and a chat id was supplied. The approval ref itself is never
    included, only the path of the file that holds it.

    Returns:
        Newline-terminated ``KEY=value`` lines.
    """
    lines = [
        f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID={'1' if allow_paid else '0'}",
        f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD={max_usd}",
        f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE={approval_ref_path}",
    ]
    if allow_paid and allowed_chat_id:
        lines.insert(1, f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID={allowed_chat_id}")
    return "\n".join(lines) + "\n"


def main(argv: list[str] | None = None) -> int:
    """Validate inputs, write the gate files, optionally restart, and print a proof.

    Args:
        argv: Argument list without the program name; defaults to ``sys.argv[1:]``.

    Returns:
        0 on success; 1 when a requested service restart exited non-zero
        (the proof is still emitted, with ``"ok": false``).

    Raises:
        ValueError: On invalid agent name, max-usd, chat id, or approval ref.
        KeyError: When the owner or group cannot be resolved.
        SystemExit: From argument parsing.
    """
    args = parse_args(sys.argv[1:] if argv is None else argv)
    validate_agent_name(args.agent)
    max_usd = validate_max_usd(args.max_usd)
    approval_ref = validate_paid_inputs(args)
    uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)

    secrets_dir = Path(args.secrets_dir)
    env_path = secrets_dir / f"teleo-agent-{args.agent}.env"
    approval_ref_path = secrets_dir / f"{args.agent}-smart-research-approval-ref"
    env_content = build_env_content(
        allow_paid=args.allow_paid,
        allowed_chat_id=args.allowed_chat_id,
        max_usd=max_usd,
        approval_ref_path=approval_ref_path,
    )
    write_private_file(env_path, env_content, uid=uid, gid=gid, dry_run=args.dry_run)

    approval_ref_written = False
    if args.allow_paid and approval_ref is not None:
        write_private_file(approval_ref_path, approval_ref, uid=uid, gid=gid, dry_run=args.dry_run)
        approval_ref_written = not args.dry_run

    unit = f"teleo-agent@{args.agent}.service"
    restart_result = None
    if args.restart_service:
        restart_result = run_command(["systemctl", "restart", unit], dry_run=args.dry_run)
    # A skipped (dry-run) or unrequested restart counts as success; a real
    # restart must have exited 0 for the install to be reported as ok.
    restart_ok = restart_result is None or restart_result.get("returncode", 0) == 0

    proof = {
        "schema": "livingip.telegramSmartResearchGateInstall.v1",
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "ok": restart_ok,
        "agent": args.agent,
        "envPath": str(env_path),
        "envFileWritten": not args.dry_run,
        "approvalRefPath": str(approval_ref_path),
        "approvalRefWritten": approval_ref_written,
        "approvalRefPresent": bool(args.allow_paid),
        "allowedChatIdPresent": bool(args.allowed_chat_id),
        "maxUsd": max_usd,
        "paidEnabled": bool(args.allow_paid),
        "dryRun": args.dry_run,
        "fileMode": "0600",
        "owner": None if args.no_chown else args.owner,
        "group": None if args.no_chown else args.group,
        "serviceUnit": unit,
        "serviceRestart": restart_result,
        "secretValuesIncluded": False,
    }
    output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(output)
    print(output, end="")
    return 0 if restart_ok else 1


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except (ValueError, KeyError) as exc:
        # KeyError covers an unknown --owner/--group from pwd/grp lookups.
        print(f"ERROR: {exc}", file=sys.stderr)
        raise SystemExit(1) from None