* Wire Leo Telegram x402 smart research * Suppress token-bearing Telegram HTTP logs * Keep Telegram typing visible during Leo proxy calls * Allow Leo Telegram social research spend cap * Route contextual Leo research prompts to smart research * Generalize Leo smart research intent routing * Resume Leo smart research from paid work orders
203 lines
7.8 KiB
Python
203 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Install Telegram smart-research paid gates without printing approval refs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import getpass
|
|
import grp
|
|
import json
|
|
import os
|
|
import pwd
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
APPROVAL_REF_RE = re.compile(r"^[A-Za-z0-9._:@/-]{8,256}$")
|
|
CHAT_ID_RE = re.compile(r"^-?\d+$")
|
|
MAX_SMART_RESEARCH_USD = 0.06
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Install server-side gates for Leo Telegram smart-research paid execution.",
|
|
allow_abbrev=False,
|
|
)
|
|
parser.add_argument("--agent", default="leo-wallet-test", help="Agent instance name for teleo-agent@<agent>")
|
|
parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
|
|
parser.add_argument("--allow-paid", action="store_true", help="Enable paid smart research for one allowed chat")
|
|
parser.add_argument("--allowed-chat-id", help="Telegram chat id allowed to trigger paid smart research")
|
|
parser.add_argument("--max-usd", default="0.01", help="Maximum spend per Telegram smart-research call")
|
|
parser.add_argument(
|
|
"--approval-ref-from-stdin",
|
|
action="store_true",
|
|
help="Read approval ref from stdin instead of a hidden prompt",
|
|
)
|
|
parser.add_argument("--owner", default="teleo", help="Env/approval file owner after write")
|
|
parser.add_argument("--group", default="teleo", help="Env/approval file group after write")
|
|
parser.add_argument("--no-chown", action="store_true", help="Skip chown; useful for local tests")
|
|
parser.add_argument("--dry-run", action="store_true", help="Validate inputs without writing files")
|
|
parser.add_argument("--restart-service", action="store_true", help="Restart teleo-agent@<agent>.service")
|
|
parser.add_argument("--output", help="Optional sanitized JSON proof path")
|
|
|
|
namespace, unknown = parser.parse_known_args(argv)
|
|
if unknown:
|
|
print(
|
|
"ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(2)
|
|
return namespace
|
|
|
|
|
|
def read_approval_ref(*, from_stdin: bool) -> str:
|
|
approval_ref = sys.stdin.read() if from_stdin else getpass.getpass("Leo smart-research approval ref: ")
|
|
return approval_ref.strip()
|
|
|
|
|
|
def validate_agent_name(agent: str) -> None:
|
|
if not re.match(r"^[A-Za-z0-9_.-]+$", agent):
|
|
raise ValueError("agent must contain only letters, numbers, dot, dash, or underscore")
|
|
|
|
|
|
def validate_max_usd(value: str) -> str:
|
|
try:
|
|
parsed = float(value)
|
|
except ValueError:
|
|
raise ValueError("max-usd must be numeric") from None
|
|
if parsed <= 0 or parsed > MAX_SMART_RESEARCH_USD:
|
|
raise ValueError(f"max-usd must be greater than 0 and no more than {MAX_SMART_RESEARCH_USD:.2f}")
|
|
return f"{parsed:.2f}"
|
|
|
|
|
|
def validate_paid_inputs(args: argparse.Namespace) -> str | None:
|
|
if not args.allow_paid:
|
|
return None
|
|
if not args.allowed_chat_id or not CHAT_ID_RE.match(args.allowed_chat_id):
|
|
raise ValueError("--allowed-chat-id is required for --allow-paid and must be an integer")
|
|
approval_ref = read_approval_ref(from_stdin=args.approval_ref_from_stdin)
|
|
if not APPROVAL_REF_RE.match(approval_ref):
|
|
raise ValueError("approval ref shape is invalid")
|
|
return approval_ref
|
|
|
|
|
|
def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]:
|
|
if no_chown:
|
|
return None, None
|
|
return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid
|
|
|
|
|
|
def write_private_file(path: Path, content: str, *, uid: int | None, gid: int | None, dry_run: bool) -> None:
|
|
if dry_run:
|
|
return
|
|
|
|
path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
|
|
tmp_path = path.with_name(f".{path.name}.tmp-{os.getpid()}")
|
|
fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
|
|
try:
|
|
with os.fdopen(fd, "w") as handle:
|
|
handle.write(content)
|
|
if not content.endswith("\n"):
|
|
handle.write("\n")
|
|
handle.flush()
|
|
os.fsync(handle.fileno())
|
|
os.chmod(tmp_path, 0o600)
|
|
if uid is not None or gid is not None:
|
|
os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
|
|
os.replace(tmp_path, path)
|
|
os.chmod(path, 0o600)
|
|
finally:
|
|
if tmp_path.exists():
|
|
tmp_path.unlink()
|
|
|
|
|
|
def run_command(command: list[str], *, dry_run: bool) -> dict:
|
|
if dry_run:
|
|
return {"command": command, "skipped": "dry_run"}
|
|
proc = subprocess.run(command, check=False, text=True, capture_output=True)
|
|
return {
|
|
"command": command,
|
|
"returncode": proc.returncode,
|
|
"stdout": proc.stdout.strip(),
|
|
"stderr": proc.stderr.strip(),
|
|
}
|
|
|
|
|
|
def build_env_content(*, allow_paid: bool, allowed_chat_id: str | None, max_usd: str, approval_ref_path: Path) -> str:
|
|
lines = [
|
|
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID={'1' if allow_paid else '0'}",
|
|
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD={max_usd}",
|
|
f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE={approval_ref_path}",
|
|
]
|
|
if allow_paid and allowed_chat_id:
|
|
lines.insert(1, f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID={allowed_chat_id}")
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
args = parse_args(sys.argv[1:] if argv is None else argv)
|
|
validate_agent_name(args.agent)
|
|
max_usd = validate_max_usd(args.max_usd)
|
|
approval_ref = validate_paid_inputs(args)
|
|
uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)
|
|
|
|
secrets_dir = Path(args.secrets_dir)
|
|
env_path = secrets_dir / f"teleo-agent-{args.agent}.env"
|
|
approval_ref_path = secrets_dir / f"{args.agent}-smart-research-approval-ref"
|
|
env_content = build_env_content(
|
|
allow_paid=args.allow_paid,
|
|
allowed_chat_id=args.allowed_chat_id,
|
|
max_usd=max_usd,
|
|
approval_ref_path=approval_ref_path,
|
|
)
|
|
|
|
write_private_file(env_path, env_content, uid=uid, gid=gid, dry_run=args.dry_run)
|
|
approval_ref_written = False
|
|
if args.allow_paid and approval_ref is not None:
|
|
write_private_file(approval_ref_path, approval_ref, uid=uid, gid=gid, dry_run=args.dry_run)
|
|
approval_ref_written = not args.dry_run
|
|
|
|
unit = f"teleo-agent@{args.agent}.service"
|
|
restart_result = None
|
|
if args.restart_service:
|
|
restart_result = run_command(["systemctl", "restart", unit], dry_run=args.dry_run)
|
|
|
|
proof = {
|
|
"schema": "livingip.telegramSmartResearchGateInstall.v1",
|
|
"generatedAt": datetime.now(timezone.utc).isoformat(),
|
|
"ok": True,
|
|
"agent": args.agent,
|
|
"envPath": str(env_path),
|
|
"envFileWritten": not args.dry_run,
|
|
"approvalRefPath": str(approval_ref_path),
|
|
"approvalRefWritten": approval_ref_written,
|
|
"approvalRefPresent": bool(args.allow_paid),
|
|
"allowedChatIdPresent": bool(args.allowed_chat_id),
|
|
"maxUsd": max_usd,
|
|
"paidEnabled": bool(args.allow_paid),
|
|
"dryRun": args.dry_run,
|
|
"fileMode": "0600",
|
|
"owner": None if args.no_chown else args.owner,
|
|
"group": None if args.no_chown else args.group,
|
|
"serviceUnit": unit,
|
|
"serviceRestart": restart_result,
|
|
"secretValuesIncluded": False,
|
|
}
|
|
|
|
output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
|
|
if args.output:
|
|
output_path = Path(args.output)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
output_path.write_text(output)
|
|
print(output, end="")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
raise SystemExit(main())
|
|
except ValueError as exc:
|
|
print(f"ERROR: {exc}", file=sys.stderr)
|
|
raise SystemExit(1) from None
|