- teleo-pipeline.py: async daemon with 4 stage loops (ingest/validate/evaluate/merge) - lib/: config, db, evaluate, validate, merge, breaker, costs, health, log modules - INFRASTRUCTURE.md: comprehensive deep-dive for onboarding - teleo-pipeline.service: systemd unit file Pentagon-Agent: Leo <294C3CA1-0205-4668-82FA-B984D54F48AD>
88 lines
2.8 KiB
Python
88 lines
2.8 KiB
Python
"""Cost tracking — per-model per-day with budget enforcement."""
|
|
|
|
import logging
|
|
from datetime import date
|
|
|
|
from . import config
|
|
|
|
logger = logging.getLogger("pipeline.costs")
|
|
|
|
|
|
def record_usage(
    conn,
    model: str,
    stage: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    backend: str = "api",
) -> float:
    """Record one model call in the ``costs`` table and return its cost in USD.

    The call is upserted into the row keyed by (today's local date, model,
    stage key): the first call of the day inserts the row with ``calls = 1``;
    every later call increments ``calls`` and adds to the token and cost
    totals. The upsert relies on a uniqueness constraint over
    ``(date, model, stage)`` in the ``costs`` table.

    Args:
        conn: DB connection exposing ``execute(sql, params)``. Nothing is
            committed here. NOTE(review): presumably the caller or the
            connection's autocommit mode takes care of that; confirm.
        model: Model identifier, looked up in ``config.MODEL_COSTS``.
        stage: Pipeline stage the call is attributed to.
        input_tokens: Input tokens consumed by the call.
        output_tokens: Output tokens produced by the call.
        backend: "max" (Claude Max subscription, free) or "api" (paid).
            Claude Max calls are tracked for volume metrics but cost $0.
            (Ganymede) Any value other than "max" is priced from
            ``config.MODEL_COSTS``.

    Returns:
        Cost of this single call in USD. It is 0.0 for the "max" backend and
        for models that have no entry in ``config.MODEL_COSTS``.
    """
    if backend == "max":
        cost = 0.0
    else:
        rates = config.MODEL_COSTS.get(model)
        if not rates:
            logger.warning("No cost rates for model %s, recording zero cost", model)
            cost = 0.0
        else:
            # The division by 1000 means rates are applied per 1,000 tokens.
            cost = (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1000

    today = date.today().isoformat()
    # Include backend in the stage key so max vs api are tracked separately;
    # plain "api" calls keep the bare stage name.
    stage_key = f"{stage}:{backend}" if backend != "api" else stage
    conn.execute(
        """INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
           VALUES (?, ?, ?, 1, ?, ?, ?)
           ON CONFLICT (date, model, stage) DO UPDATE SET
               calls = calls + 1,
               input_tokens = input_tokens + excluded.input_tokens,
               output_tokens = output_tokens + excluded.output_tokens,
               cost_usd = cost_usd + excluded.cost_usd""",
        (today, model, stage_key, input_tokens, output_tokens, cost),
    )
    return cost
|
|
|
|
|
|
def get_daily_spend(conn, day: str = None) -> float:
    """Get total recorded spend in USD for a given day (default: today).

    Sums ``cost_usd`` over every ``costs`` row whose ``date`` equals ``day``,
    across all models and stages.

    Args:
        conn: DB connection whose result rows support lookup by column name
            (``row["total"]``), e.g. rows produced with ``sqlite3.Row``.
        day: Date string matched against the ``date`` column; when omitted,
            today's local date in ISO format (``YYYY-MM-DD``) is used.

    Returns:
        The day's total as a float; 0.0 when the day has no rows.
    """
    if day is None:
        day = date.today().isoformat()
    row = conn.execute(
        "SELECT COALESCE(SUM(cost_usd), 0) as total FROM costs WHERE date = ?",
        (day,),
    ).fetchone()
    # COALESCE(..., 0) produces an integer 0 for an empty day; normalise so
    # the declared float return type always holds.
    return float(row["total"])
|
|
|
|
|
|
def get_daily_breakdown(conn, day: str = None) -> list:
    """Get per-model per-stage breakdown for a day.

    Args:
        conn: DB connection whose result rows can be converted with
            ``dict(row)``, e.g. rows produced with ``sqlite3.Row``.
        day: Date string matched against the ``date`` column; when omitted,
            today's local date in ISO format (``YYYY-MM-DD``) is used.

    Returns:
        One dict per ``costs`` row for the day, with keys ``model``,
        ``stage``, ``calls``, ``input_tokens``, ``output_tokens`` and
        ``cost_usd``, most expensive first. Rows with equal cost are ordered
        by model, then stage, so the output is deterministic.
    """
    if day is None:
        day = date.today().isoformat()
    rows = conn.execute(
        # model/stage act as tie-breakers: zero-cost rows are common, and
        # without them equal-cost rows come back in unspecified order.
        """SELECT model, stage, calls, input_tokens, output_tokens, cost_usd
           FROM costs WHERE date = ?
           ORDER BY cost_usd DESC, model, stage""",
        (day,),
    ).fetchall()
    return [dict(r) for r in rows]
|
|
|
|
|
|
def check_budget(conn) -> dict:
    """Check budget status. Returns {ok, warn, spend, budget, pct}.

    Compares today's spend (from ``get_daily_spend``) with
    ``config.OPENROUTER_DAILY_BUDGET``.

    Returns:
        A dict with keys:
            ok: True while spend is below 100% of the budget.
            warn: True once spend / budget reaches
                ``config.OPENROUTER_WARN_THRESHOLD`` (compared as a fraction,
                not a percentage).
            spend: Today's spend in USD, rounded to 4 decimals.
            budget: ``config.OPENROUTER_DAILY_BUDGET`` as configured.
            pct: Spend as a percentage of the budget, rounded to 1 decimal.
    """
    budget = config.OPENROUTER_DAILY_BUDGET
    spend = get_daily_spend(conn)
    # A non-positive budget cannot be used as a divisor; report 0% used,
    # which leaves "ok" True.
    pct = spend / budget if budget > 0 else 0.0
    return {
        "ok": pct < 1.0,
        "warn": pct >= config.OPENROUTER_WARN_THRESHOLD,
        "spend": round(spend, 4),
        "budget": budget,
        "pct": round(pct * 100, 1),
    }
|
|
|
|
|
|
def budget_allows(conn) -> bool:
    """Return whether today's spend is still under the daily budget.

    Thin convenience wrapper: runs ``check_budget`` and hands back only its
    ``"ok"`` flag.
    """
    status = check_budget(conn)
    return status["ok"]
|