teleo-infrastructure/lib/costs.py
m3taversal 681afad506
Some checks failed
CI / lint-and-test (push) Has been cancelled
Consolidate pipeline code from teleo-codex + VPS into single repo
Sources merged:
- teleo-codex/ops/pipeline-v2/ (11 newer lib files, 5 new lib modules)
- teleo-codex/ops/ (agent-state, diagnostics expansion, systemd units, ops scripts)
- VPS /opt/teleo-eval/telegram/ (10 new bot files, agent configs)
- VPS /opt/teleo-eval/pipeline/ops/ (vector-gc, backfill-descriptions)
- VPS /opt/teleo-eval/sync-mirror.sh (Bug 2 + Step 2.5 fixes)

Non-trivial merges:
- connect.py: kept codex threshold (0.65) + added infra domain parameter
- watchdog.py: kept infra version (stale_pr integration, superset of codex)
- deploy.sh: codex rsync version (interim, until VPS git clone migration)
- diagnostics/app.py: codex decomposed dashboard (14 new route modules)

81 files changed, +17105/-200 lines

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 16:52:26 +01:00

110 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Cost tracking — per-model per-day with budget enforcement."""
import logging
from datetime import date
from . import config
logger = logging.getLogger("pipeline.costs")
# Cache-token price multipliers, expressed relative to the model's regular input rate.
_CACHE_READ_MULTIPLIER = 0.1  # cache reads are ~90% cheaper than regular input
_CACHE_WRITE_MULTIPLIER = 1.25  # cache writes are billed at a 25% premium


def record_usage(
    conn,
    model: str,
    stage: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    backend: str = "api",
    duration_ms: int = 0,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
    cost_estimate_usd: float = 0.0,
):
    """Record one model call in the ``costs`` table and return its cost in USD.

    The row is keyed by (today's date, model, stage). A repeated key
    increments ``calls`` and adds the token, duration and cost figures to the
    existing row. This function does not call ``commit`` itself.

    Args:
        conn: Database connection exposing ``execute`` with ``?`` placeholders
            (looks like sqlite3 -- confirm against caller).
        model: Model identifier; looked up in ``config.MODEL_COSTS`` for rates.
        stage: Pipeline stage name. For any backend other than ``"api"`` the
            stored key becomes ``"<stage>:<backend>"`` so the two are tracked
            separately.
        input_tokens: Regular (non-cached) input tokens.
        output_tokens: Output tokens.
        backend: "max" (Claude Max subscription, free) or "api" (paid).
            Claude Max calls are tracked for volume metrics but cost $0. (Ganymede)
        duration_ms: Call duration in milliseconds, accumulated per row.
        cache_read_tokens: Tokens served from the prompt cache.
        cache_write_tokens: Tokens written to the prompt cache.
        cost_estimate_usd: Caller-supplied estimate. It is used only when no
            estimate can be computed from tokens and rates (e.g. the CLI
            reports its own figure).

    Returns:
        The cost recorded in ``cost_usd``: ``0.0`` for the "max" backend,
        otherwise the estimate (never negative).
    """
    # Always compute the estimated cost from tokens x published rates.
    # Rates are divided by 1000 below, so they are presumably USD per 1K
    # tokens -- verify against config.MODEL_COSTS.
    rates = config.MODEL_COSTS.get(model)
    # Cache tokens count as billable usage too: a call that reports only
    # cache reads/writes must still be priced rather than recorded as free.
    has_tokens = bool(
        input_tokens or output_tokens or cache_read_tokens or cache_write_tokens
    )
    if rates and has_tokens:
        estimated = (
            input_tokens * rates["input"] + output_tokens * rates["output"]
        ) / 1000
        if cache_read_tokens:
            estimated += (
                cache_read_tokens * rates["input"] * _CACHE_READ_MULTIPLIER
            ) / 1000
        if cache_write_tokens:
            estimated += (
                cache_write_tokens * rates["input"] * _CACHE_WRITE_MULTIPLIER
            ) / 1000
    else:
        estimated = 0.0

    # Use the caller-provided estimate if we can't compute one ourselves.
    if cost_estimate_usd > 0 and estimated == 0:
        estimated = cost_estimate_usd
    cost_estimate_usd = estimated

    if backend == "max":
        cost = 0.0  # subscription -- no actual spend
    else:
        cost = estimated if estimated > 0 else 0.0

    today = date.today().isoformat()
    # Include backend in the stage key so max vs api are tracked separately.
    stage_key = f"{stage}:{backend}" if backend != "api" else stage
    conn.execute(
        """INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd,
                              duration_ms, cache_read_tokens, cache_write_tokens, cost_estimate_usd)
           VALUES (?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?)
           ON CONFLICT (date, model, stage) DO UPDATE SET
               calls = calls + 1,
               input_tokens = input_tokens + excluded.input_tokens,
               output_tokens = output_tokens + excluded.output_tokens,
               cost_usd = cost_usd + excluded.cost_usd,
               duration_ms = duration_ms + excluded.duration_ms,
               cache_read_tokens = cache_read_tokens + excluded.cache_read_tokens,
               cache_write_tokens = cache_write_tokens + excluded.cache_write_tokens,
               cost_estimate_usd = cost_estimate_usd + excluded.cost_estimate_usd""",
        (today, model, stage_key, input_tokens, output_tokens, cost,
         duration_ms, cache_read_tokens, cache_write_tokens, cost_estimate_usd),
    )
    return cost
def get_daily_spend(conn, day: str = None) -> float:
    """Return the summed ``cost_usd`` of every ``costs`` row for one day.

    The query does not filter by model or stage: all rows recorded for the
    day contribute to the total.

    Args:
        conn: Database connection whose rows support lookup by column name
            (e.g. sqlite3 with ``row_factory = sqlite3.Row`` -- confirm
            against caller).
        day: Date string matched against the ``date`` column. Defaults to
            today's date in ISO format (``YYYY-MM-DD``).

    Returns:
        Total spend for the day as a float; ``0.0`` when the day has no rows.
    """
    if day is None:
        day = date.today().isoformat()
    row = conn.execute(
        "SELECT COALESCE(SUM(cost_usd), 0) as total FROM costs WHERE date = ?",
        (day,),
    ).fetchone()
    # COALESCE falls back to the integer 0 on empty days; normalise so the
    # annotated float return type always holds.
    return float(row["total"])
def get_daily_breakdown(conn, day: str = None) -> list:
    """Return one dict per ``costs`` row for a day, most expensive first.

    ``day`` defaults to today's ISO date. Each dict carries the model, stage,
    call count, token counters, duration and both cost columns of one row.
    """
    target_day = date.today().isoformat() if day is None else day
    query = """SELECT model, stage, calls, input_tokens, output_tokens, cost_usd,
                      duration_ms, cache_read_tokens, cache_write_tokens, cost_estimate_usd
               FROM costs WHERE date = ? ORDER BY cost_usd DESC"""
    cursor = conn.execute(query, (target_day,))
    # Rows are converted to plain dicts so callers are decoupled from the
    # database row type.
    return list(map(dict, cursor.fetchall()))
def check_budget(conn) -> dict:
    """Summarise today's spend against the configured daily budget.

    Returns a dict with these keys:
        ok: True while spend is below 100% of the budget.
        warn: True once the spent fraction reaches
            ``config.OPENROUTER_WARN_THRESHOLD``.
        spend: Today's spend rounded to 4 decimal places.
        budget: The configured ``config.OPENROUTER_DAILY_BUDGET``.
        pct: Spend as a percentage of the budget, rounded to 1 decimal place.

    A budget of zero or less yields a fraction of 0, so ``ok`` stays True.
    """
    spent = get_daily_spend(conn)
    daily_budget = config.OPENROUTER_DAILY_BUDGET
    if daily_budget > 0:
        fraction = spent / daily_budget
    else:
        # No positive budget configured: avoid dividing by zero.
        fraction = 0
    status = {}
    status["ok"] = fraction < 1.0
    status["warn"] = fraction >= config.OPENROUTER_WARN_THRESHOLD
    status["spend"] = round(spent, 4)
    status["budget"] = daily_budget
    status["pct"] = round(fraction * 100, 1)
    return status
def budget_allows(conn) -> bool:
    """Return True while today's spend is still below the daily budget."""
    status = check_budget(conn)
    return status["ok"]