teleo-infrastructure/telegram/x-ingest.py
m3taversal 670c50f384
Some checks are pending
CI / lint-and-test (push) Waiting to run
fix: add telegram/ and tests/ to deploy pipeline, remove hardcoded API key
deploy.sh was missing telegram/ and tests/ directories — code existed in
repo but never synced to VPS. Also removes hardcoded twitterapi.io key
from x-ingest.py (reads from secrets file like all other modules).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 17:15:55 +01:00

96 lines
3 KiB
Python

#!/usr/bin/env python3
"""Pull all tweets from specified X accounts and save as JSON archives."""
import json
import os
import sys
import time
import urllib.parse
import urllib.request
from pathlib import Path
# Location of the twitterapi.io key on disk — never hardcoded in this file.
API_KEY_FILE = "/opt/teleo-eval/secrets/twitterapi-io-key"


def _load_api_key():
    """Read the API key from the secrets file, exiting with status 1 if absent."""
    try:
        contents = Path(API_KEY_FILE).read_text()
    except FileNotFoundError:
        print(f"ERROR: API key not found at {API_KEY_FILE}", file=sys.stderr)
        sys.exit(1)
    return contents.strip()


API_KEY = _load_api_key()

# twitterapi.io endpoint returning a user's most recent tweets (paginated).
BASE = "https://api.twitterapi.io/twitter/user/last_tweets"
# Directory where one JSON archive per account is written.
OUT_DIR = "/opt/teleo-eval/x-archives"
# X/Twitter handles to archive — each gets its own <handle>-tweets.json file.
ACCOUNTS = [
    "m3taversal",
    "Living_IP",
    "teLEOhuman",
    "aiCLAYno",
    "futaRdIO_ai",
]
# Ensure the archive directory exists before any account is processed.
# (The `import os` that previously sat here belongs with the top-of-file imports.)
os.makedirs(OUT_DIR, exist_ok=True)
def fetch_page(username, cursor=None):
    """Fetch one page of tweets for *username* from the twitterapi.io API.

    Query parameters are built with urllib.parse.urlencode so usernames or
    pagination cursors containing reserved characters (&, =, +, spaces, ...)
    cannot corrupt the request URL.

    Returns the decoded JSON payload (a dict), or None on any network/HTTP
    error. Errors are printed rather than raised — the caller treats a None
    page as "stop paginating", matching the script's best-effort style.
    """
    params = {"userName": username}
    if cursor:
        params["cursor"] = cursor
    url = f"{BASE}?{urllib.parse.urlencode(params)}"
    req = urllib.request.Request(url, headers={"X-API-Key": API_KEY})
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except Exception as e:
        print(f" ERROR fetching {username}: {e}")
        return None
def pull_all_tweets(username):
    """Page through the API and return every unique tweet for *username*.

    Tweets are deduplicated by id across pages (the API can return
    overlapping pages). Pagination stops when the API reports failure,
    when no next_cursor is provided, or when a page contributes nothing
    new — the last guard prevents an endless loop on a repeating page.

    Returns a list of tweet dicts in first-seen order.
    """
    all_tweets = []
    seen_ids = set()  # ids collected so far; maintained incrementally
    cursor = None
    page = 0
    while True:
        page += 1
        print(f" Page {page} (cursor: {'yes' if cursor else 'start'})...", end=" ")
        data = fetch_page(username, cursor)
        if not data or data.get("status") != "success":
            print(f"FAILED: {data}")
            break
        payload = data.get("data", {})
        tweets = payload.get("tweets", [])
        next_cursor = payload.get("next_cursor")
        # Deduplicate against all previously seen pages. Updating the set
        # incrementally avoids rebuilding it from all_tweets each page
        # (which was O(n^2) over the whole archive).
        new_tweets = [t for t in tweets if t["id"] not in seen_ids]
        seen_ids.update(t["id"] for t in new_tweets)
        all_tweets.extend(new_tweets)
        print(f"{len(new_tweets)} new tweets (total: {len(all_tweets)})")
        if not next_cursor or not new_tweets:
            break
        cursor = next_cursor
        time.sleep(1)  # Rate limit courtesy
    return all_tweets
# Archive every configured account, then print a per-account breakdown.
for account in ACCOUNTS:
    print(f"\n=== @{account} ===")
    tweets = pull_all_tweets(account)
    # Persist the raw payload plus a count for quick inspection.
    outfile = os.path.join(OUT_DIR, f"{account}-tweets.json")
    with open(outfile, "w") as f:
        json.dump({"account": account, "tweet_count": len(tweets), "tweets": tweets}, f, indent=2)
    print(f" Saved {len(tweets)} tweets to {outfile}")
    # Quick stats. NOTE: a retweeted reply is counted in both `replies`
    # and `rts` — the buckets deliberately overlap, matching the breakdown
    # the script has always printed.
    rts = [tw for tw in tweets if tw.get("text", "").startswith("RT @")]
    replies = [tw for tw in tweets if tw.get("isReply")]
    originals = [
        tw for tw in tweets
        if not tw.get("text", "").startswith("RT @") and not tw.get("isReply")
    ]
    print(f" Breakdown: {len(originals)} original, {len(replies)} replies, {len(rts)} RTs")
    if originals:
        def _views(tw):
            # viewCount may be missing or None; treat both as zero.
            return int(tw.get("viewCount", 0) or 0)

        top_five = sorted(originals, key=_views, reverse=True)[:5]
        print(f" Top 5 by views:")
        for tw in top_five:
            snippet = tw["text"][:80].replace("\n", " ")
            print(f" {tw.get('viewCount', '?')} views | {tw.get('likeCount', '?')} likes | {snippet}...")
print("\n=== DONE ===")