Provider cost dashboards are designed for accounting, not for engineering: they tell you what you spent after the billing period closes, not before a single call goes over budget. Teams running LLM features in production need enforcement that fires before the spend happens — at the request level, not the invoice level. This post covers how to build that: response-level cost tracking, per-user and per-project thresholds, and alerts to Slack, email, or PagerDuty that trip before a threshold is crossed rather than after.
The problem with provider-side alerts
OpenAI lets you set a "soft limit" in the dashboard. When your spend crosses that amount for the calendar month, they send one email. One. If your usage spikes between that email and the end of the month, you find out on the invoice. Anthropic's console shows a usage graph with no native alerting at all.
"After the fact" is the design. Neither provider exposes a webhook or a real-time event stream you can subscribe to. Where a billing API exists, it returns cumulative totals for a time window, so proactive alerting means measuring spend yourself and firing your own notifications.
This article walks through exactly that: tracking spend from API responses in real time, comparing it against thresholds on a schedule, and sending alerts to Slack, PagerDuty, or email before the threshold is crossed rather than after.
Prerequisites
- An OpenAI API key with billing read access (Organization > API Keys, enable "Read billing" scope on the key — this is separate from your completions key)
- For Anthropic: spend tracking directly from API responses, since Anthropic does not expose a public usage API
- Python 3.10+, with the openai and anthropic SDK packages installed
- A Slack incoming webhook URL, PagerDuty Events API v2 routing key, or SMTP credentials, depending on which channels you want
- Persistent storage between process restarts: SQLite, Redis, or any key-value store works
Step 1: Track spend from API responses
The most reliable source of spend data is not the billing API. It is the usage object returned in every API response. Each completion response includes prompt_tokens and completion_tokens. Multiply those by the per-token price for that model and you have the cost for that call.
This gives you real-time cost data rather than batched billing totals, and it works identically for both OpenAI and Anthropic.
Here is a Python helper that wraps the OpenAI client, computes cost per call, and writes it to SQLite:
import sqlite3
import time
from openai import OpenAI

# Current pricing as of 2026-06. Check https://openai.com/pricing for updates.
MODEL_PRICES = {
    "gpt-4o": {"input": 1.25 / 1_000_000, "output": 5.00 / 1_000_000},
    "gpt-4.1-mini": {"input": 0.40 / 1_000_000, "output": 1.60 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    # gpt-4-turbo is deprecated as of 2025; use gpt-4.1 for equivalent capability.
    "gpt-4.1": {"input": 2.00 / 1_000_000, "output": 8.00 / 1_000_000},
}


def compute_cost(model: str, usage) -> float:
    prices = MODEL_PRICES.get(model, {"input": 0.0, "output": 0.0})
    return (
        usage.prompt_tokens * prices["input"]
        + usage.completion_tokens * prices["output"]
    )


def init_db(path="costs.db"):
    conn = sqlite3.connect(path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS usage (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts INTEGER NOT NULL,
            user_id TEXT NOT NULL,
            project TEXT NOT NULL,
            model TEXT NOT NULL,
            cost_usd REAL NOT NULL
        )
    """)
    conn.commit()
    return conn


def tracked_completion(client, conn, user_id, project, **kwargs):
    response = client.chat.completions.create(**kwargs)
    cost = compute_cost(kwargs["model"], response.usage)
    conn.execute(
        "INSERT INTO usage (ts, user_id, project, model, cost_usd) VALUES (?,?,?,?,?)",
        (int(time.time()), user_id, project, kwargs["model"], cost),
    )
    conn.commit()
    return response


# Usage:
client = OpenAI()
conn = init_db()
response = tracked_completion(
    client, conn,
    user_id="user_42",
    project="support-bot",
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello"}],
)

The same pattern works for Anthropic. Replace usage.prompt_tokens / usage.completion_tokens with usage.input_tokens / usage.output_tokens from the Anthropic response object and update the model price table accordingly.
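As a minimal sketch of that Anthropic variant, the cost function might look like the following. The model name and the prices in ANTHROPIC_PRICES are placeholders invented for illustration, not real Anthropic prices, and the SimpleNamespace object stands in for response.usage so the example runs without an API key.

```python
from types import SimpleNamespace

# Placeholder prices in USD per token. These are NOT real Anthropic prices;
# fill them in from the provider's pricing page.
ANTHROPIC_PRICES = {
    "example-claude-model": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
}


def compute_anthropic_cost(model: str, usage) -> float:
    """Cost of one call from a usage object with input_tokens and output_tokens."""
    prices = ANTHROPIC_PRICES[model]  # raises KeyError for a model with no price entry
    return (
        usage.input_tokens * prices["input"]
        + usage.output_tokens * prices["output"]
    )


# Stand-in for response.usage from a real Anthropic response.
fake_usage = SimpleNamespace(input_tokens=1_000, output_tokens=200)
cost = compute_anthropic_cost("example-claude-model", fake_usage)
print(f"${cost:.6f}")
```

Unlike the OpenAI helper, this sketch raises on a model that has no price entry instead of recording the call at $0. Either choice is defensible, but a silent zero can hide spend on a newly adopted model.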
Step 2: Query cumulative token counts from the OpenAI usage API
Response-level tracking covers calls your code makes. If you also want to cross-check against OpenAI's own numbers — or reconcile usage across multiple services — you can poll their usage endpoint.
Important: /v1/usage returns token counts only, not dollar amounts. To get USD spend, multiply token counts by your per-model price. The /dashboard/billing/usage endpoint returns aggregate dollar totals but offers no per-model or per-user breakdown.
Additional caveat: /dashboard/billing/usage requires an organization-scoped API key, not a project key. The usage fields shown reflect OpenAI's API shape as of mid-2026 — field names may vary; always test against the live API.
import httpx
from datetime import date, timedelta


def fetch_openai_token_counts(api_key: str, days: int = 1) -> dict:
    """
    Return aggregated token counts for the last `days` days from /v1/usage.
    Returns a dict with 'context_tokens' and 'generated_tokens'.
    NOTE: this endpoint returns token counts, not USD. Call tokens_to_usd() to convert.
    """
    end = date.today()
    start = end - timedelta(days=days)
    resp = httpx.get(
        "https://api.openai.com/v1/usage",
        headers={"Authorization": f"Bearer {api_key}"},
        params={"start_date": start.isoformat(), "end_date": end.isoformat()},
    )
    resp.raise_for_status()
    data = resp.json()
    context_tokens = sum(
        day.get("n_context_tokens_total", 0) for day in data.get("data", [])
    )
    generated_tokens = sum(
        day.get("n_generated_tokens_total", 0) for day in data.get("data", [])
    )
    return {"context_tokens": context_tokens, "generated_tokens": generated_tokens}


def tokens_to_usd(token_counts: dict, model: str) -> float:
    """
    Convert token counts returned by fetch_openai_token_counts() into USD.
    Uses the same MODEL_PRICES table defined in Step 1.
    'context_tokens' maps to input price; 'generated_tokens' maps to output price.
    """
    prices = MODEL_PRICES.get(model, {"input": 0.0, "output": 0.0})
    return (
        token_counts["context_tokens"] * prices["input"]
        + token_counts["generated_tokens"] * prices["output"]
    )


# Example: estimate total spend for the past day if you only use gpt-4o-mini
token_counts = fetch_openai_token_counts(api_key="sk-...", days=1)
estimated_usd = tokens_to_usd(token_counts, model="gpt-4o-mini")
print(f"Estimated spend: ${estimated_usd:.4f}")
# Note: /v1/usage aggregates across all models in one bucket, so this estimate
# is only accurate if your traffic uses a single model. For mixed-model usage,
# rely on per-call tracking from Step 1, which records the model on every call.


def fetch_openai_billing_usd(org_api_key: str, days: int = 30) -> float:
    """
    Return the actual USD total from OpenAI's billing endpoint.
    Requires an organization-scoped API key (not a project key).
    The response value is in cents; divide by 100 to get dollars.
    Use this for end-of-period reconciliation, not real-time alerting.
    """
    end = date.today()
    start = end - timedelta(days=days)
    resp = httpx.get(
        "https://api.openai.com/dashboard/billing/usage",
        headers={"Authorization": f"Bearer {org_api_key}"},
        params={"start_date": start.isoformat(), "end_date": end.isoformat()},
    )
    resp.raise_for_status()
    return resp.json().get("total_usage", 0) / 100  # cents → dollars

In practice, the cleanest pattern is: use response-level tracking from Step 1 as your primary source for real-time alerting (it gives you per-call, per-model, per-user cost immediately), and use fetch_openai_billing_usd once a day or once a week to reconcile your running totals against OpenAI's authoritative number. Skip fetch_openai_token_counts unless you specifically need to audit token volume independent of cost.
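The reconciliation step itself can be a small comparison. The sketch below assumes you already have two numbers for the same period: your locally tracked total and the provider's billed total. The reconcile function and its 5% default tolerance are illustrative choices, not something either provider prescribes.

```python
def reconcile(local_usd: float, billed_usd: float, tolerance: float = 0.05) -> dict:
    """Compare locally tracked spend with the provider's billed total."""
    drift = billed_usd - local_usd
    if billed_usd > 0:
        relative = abs(drift) / billed_usd
    else:
        # Nothing billed: any locally tracked spend counts as unbounded drift.
        relative = 0.0 if local_usd == 0 else float("inf")
    return {"drift_usd": drift, "relative": relative, "ok": relative <= tolerance}


within = reconcile(local_usd=97.0, billed_usd=100.0)
outside = reconcile(local_usd=80.0, billed_usd=100.0)
print(within["ok"], outside["ok"])
```

A positive drift means the provider billed more than you tracked, which usually points to calls that bypass the tracking wrapper or to a stale price table.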
Step 3: Set thresholds — daily, per-user, per-project
Hard-coded monthly limits miss the real failure modes: a single user running a loop, one project misconfigured to call GPT-4o instead of GPT-4o-mini, or a daily spike that will blow the monthly budget by the 5th.
Define thresholds at multiple granularities and store them alongside your cost data:
THRESHOLDS = {
    "daily_total_usd": 50.0,
    "per_user_daily_usd": 5.0,
    "per_project_daily_usd": 20.0,
    "monthly_total_usd": 800.0,
}


def check_thresholds(conn, thresholds: dict) -> list[dict]:
    """Returns a list of threshold violations. Empty list means all clear."""
    violations = []
    today_start = int(time.mktime(date.today().timetuple()))

    # Daily total
    row = conn.execute(
        "SELECT SUM(cost_usd) FROM usage WHERE ts >= ?", (today_start,)
    ).fetchone()
    daily_total = row[0] or 0.0
    if daily_total >= thresholds["daily_total_usd"]:
        violations.append({
            "type": "daily_total",
            "current": daily_total,
            "limit": thresholds["daily_total_usd"],
            "message": f"Daily spend ${daily_total:.2f} hit limit ${thresholds['daily_total_usd']:.2f}",
        })

    # Per-user daily
    rows = conn.execute(
        "SELECT user_id, SUM(cost_usd) FROM usage WHERE ts >= ? GROUP BY user_id",
        (today_start,),
    ).fetchall()
    for user_id, user_total in rows:
        if user_total >= thresholds["per_user_daily_usd"]:
            violations.append({
                "type": "per_user_daily",
                "user_id": user_id,
                "current": user_total,
                "limit": thresholds["per_user_daily_usd"],
                "message": f"User {user_id} daily spend ${user_total:.2f} hit limit",
            })

    # Per-project daily
    rows = conn.execute(
        "SELECT project, SUM(cost_usd) FROM usage WHERE ts >= ? GROUP BY project",
        (today_start,),
    ).fetchall()
    for project, proj_total in rows:
        if proj_total >= thresholds["per_project_daily_usd"]:
            violations.append({
                "type": "per_project_daily",
                "project": project,
                "current": proj_total,
                "limit": thresholds["per_project_daily_usd"],
                "message": f"Project {project} daily spend ${proj_total:.2f} hit limit",
            })

    return violations

Run check_thresholds on a cron schedule; every five minutes is usually enough for daily thresholds. For per-call limits you can call it inline after each tracked_completion.
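The THRESHOLDS table defines monthly_total_usd, but check_thresholds never reads it. A monthly check could follow the same shape as the daily one. The sketch below is one way to do it: month_start_ts and check_monthly_total are hypothetical helper names, and the demo uses a trimmed in-memory version of the usage table with only the columns the query needs.

```python
import sqlite3
import time
from datetime import date
from typing import Optional


def month_start_ts(today: date) -> int:
    """Unix timestamp for local midnight on the first day of today's month."""
    return int(time.mktime(today.replace(day=1).timetuple()))


def check_monthly_total(conn, limit_usd: float, today: Optional[date] = None) -> list[dict]:
    """Return a one-item violation list if month-to-date spend has reached the limit."""
    today = today or date.today()
    row = conn.execute(
        "SELECT SUM(cost_usd) FROM usage WHERE ts >= ?", (month_start_ts(today),)
    ).fetchone()
    monthly_total = row[0] or 0.0
    if monthly_total < limit_usd:
        return []
    return [{
        "type": "monthly_total",
        "current": monthly_total,
        "limit": limit_usd,
        "message": f"Monthly spend ${monthly_total:.2f} hit limit ${limit_usd:.2f}",
    }]


# Demo: one row recorded a minute after the month began.
today = date.today()
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE usage (ts INTEGER NOT NULL, cost_usd REAL NOT NULL)")
conn.execute(
    "INSERT INTO usage (ts, cost_usd) VALUES (?, ?)",
    (month_start_ts(today) + 60, 12.5),
)
violations = check_monthly_total(conn, limit_usd=10.0, today=today)
print(violations)
```

The returned dicts use the same keys as the daily violations, so they can be appended to the same list and routed through the same alert functions.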
Step 4: Send alerts — Slack, email, and PagerDuty
Once you have a violations list, routing it to the right channel is straightforward.
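One practical wrinkle: a check that runs every few minutes will report the same violation on every pass until the day rolls over. A minimal sketch of a cooldown filter, assuming in-memory state is acceptable (it is lost on restart), might look like this. AlertCooldown and its one-hour default are illustrative, not part of any alerting library.

```python
import time
from typing import Optional


class AlertCooldown:
    """Remember when each violation last fired and suppress repeats for a while."""

    def __init__(self, cooldown_seconds: int = 3600):
        self.cooldown_seconds = cooldown_seconds
        self._last_sent = {}  # violation key -> timestamp of the last alert

    @staticmethod
    def key(v: dict) -> str:
        # Violation type plus the user or project it concerns, if any.
        subject = v.get("user_id", v.get("project", "total"))
        return f"{v['type']}:{subject}"

    def filter(self, violations: list[dict], now: Optional[float] = None) -> list[dict]:
        """Return only the violations that are not inside their cooldown window."""
        now = time.time() if now is None else now
        fresh = []
        for v in violations:
            k = self.key(v)
            last = self._last_sent.get(k)
            if last is None or now - last >= self.cooldown_seconds:
                self._last_sent[k] = now
                fresh.append(v)
        return fresh


cooldown = AlertCooldown(cooldown_seconds=3600)
v = {"type": "per_user_daily", "user_id": "user_42"}
first = cooldown.filter([v], now=0.0)      # first sighting: passes through
second = cooldown.filter([v], now=300.0)   # five minutes later: suppressed
third = cooldown.filter([v], now=3600.0)   # an hour later: passes again
print(len(first), len(second), len(third))
```

Pass the filtered list to the channel functions instead of the raw violations list. For anything that must survive a restart, the last-sent timestamps would need to live in the same persistent store as the cost data.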
Slack (incoming webhook)
import httpx


def alert_slack(webhook_url: str, violations: list[dict]) -> None:
    if not violations:
        return
    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": "LLM spend alert"},
        }
    ]
    for v in violations:
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f":warning: *{v['type']}*\n{v['message']}\nCurrent: `${v['current']:.4f}` / Limit: `${v['limit']:.2f}`",
            },
        })
    httpx.post(webhook_url, json={"blocks": blocks}).raise_for_status()

Create the webhook at api.slack.com/apps, add it to a channel, and pass the URL as webhook_url.
Email (SMTP)
import smtplib
from email.message import EmailMessage


def alert_email(
    smtp_host: str,
    smtp_port: int,
    username: str,
    password: str,
    from_addr: str,
    to_addrs: list[str],
    violations: list[dict],
) -> None:
    if not violations:
        return
    body_lines = ["LLM spend threshold violations:\n"]
    for v in violations:
        body_lines.append(
            f" [{v['type']}] {v['message']} "
            f"(current ${v['current']:.4f}, limit ${v['limit']:.2f})"
        )
    msg = EmailMessage()
    msg["Subject"] = f"LLM spend alert — {len(violations)} violation(s)"
    msg["From"] = from_addr
    msg["To"] = ", ".join(to_addrs)
    msg.set_content("\n".join(body_lines))
    with smtplib.SMTP_SSL(smtp_host, smtp_port) as server:
        server.login(username, password)
        server.send_message(msg)

PagerDuty
For on-call escalation, use the PagerDuty Events API v2. Send a trigger event with severity set to "critical" or "warning" depending on how far over the threshold you are:
def alert_pagerduty(routing_key: str, violations: list[dict]) -> None:
    if not violations:
        return
    for v in violations:
        # How far past the limit the spend is, as a percentage of the limit.
        pct_over = (v["current"] - v["limit"]) / v["limit"] * 100
        severity = "critical" if pct_over > 20 else "warning"
        httpx.post(
            "https://events.pagerduty.com/v2/enqueue",
            json={
                "routing_key": routing_key,
                "event_action": "trigger",
                "dedup_key": f"llm-spend-{v['type']}-{v.get('user_id', v.get('project', 'total'))}",
                "payload": {
                    "summary": v["message"],
                    "severity": severity,
                    "source": "llm-cost-monitor",
                    "custom_details": v,
                },
            },
        ).raise_for_status()

The dedup_key prevents duplicate pages for the same ongoing violation. The incident does not clear on its own: PagerDuty resolves it when you send a resolve event with the same key.
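The resolve event is a smaller payload than the trigger. The sketch below only builds the JSON body, so it runs without network access. build_resolve_event is a hypothetical helper name, and the routing key shown is a placeholder.

```python
def build_resolve_event(routing_key: str, violation: dict) -> dict:
    """Build the Events API v2 body that resolves a previously triggered alert."""
    subject = violation.get("user_id", violation.get("project", "total"))
    return {
        "routing_key": routing_key,
        "event_action": "resolve",
        # Must match the dedup_key used when the alert was triggered.
        "dedup_key": f"llm-spend-{violation['type']}-{subject}",
    }


event = build_resolve_event(
    "YOUR_ROUTING_KEY",  # placeholder, not a real key
    {"type": "per_user_daily", "user_id": "user_42"},
)
print(event)
```

POST the body to the same https://events.pagerduty.com/v2/enqueue endpoint used for triggers, for example from the scheduled check once a previously violated threshold no longer appears in the violations list.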
Step 5: Make it proactive — check before the call, not after
Alerting after the call is still reactive. You are notified that a threshold was crossed, but the call that crossed it already ran. For hard budget enforcement, check the accumulated spend before each call and skip it if the budget is exhausted.
def budget_remaining(conn, user_id: str, daily_limit: float) -> float:
    today_start = int(time.mktime(date.today().timetuple()))
    # NOTE: In production, replace this inline SUM query with a cached counter
    # (Redis INCRBYFLOAT) updated on write; querying on every request adds
    # 5-20ms latency at scale.
    row = conn.execute(
        "SELECT SUM(cost_usd) FROM usage WHERE ts >= ? AND user_id = ?",
        (today_start, user_id),
    ).fetchone()
    spent = row[0] or 0.0
    return max(0.0, daily_limit - spent)


def tracked_completion_with_guard(
    client, conn, user_id, project, daily_limit_per_user, **kwargs
):
    remaining = budget_remaining(conn, user_id, daily_limit_per_user)
    if remaining <= 0:
        raise RuntimeError(
            f"User {user_id} has exhausted their daily LLM budget of ${daily_limit_per_user:.2f}"
        )
    # Optionally: estimate token cost before the call and check if it fits within remaining budget.
    # A rough heuristic is ~4 characters per token (about 0.25 tokens per character) for English text.
    # This estimate works for English prose. For code, non-English text, or multi-turn context
    # windows, use tiktoken or the provider's tokenizer for accurate pre-flight counts.
    # It covers input tokens only; the output cost is unknown until the response arrives.
    estimated_input_tokens = sum(
        len(m["content"]) / 4 for m in kwargs.get("messages", [])
    )
    model = kwargs.get("model", "gpt-4o-mini")
    price = MODEL_PRICES.get(model, {"input": 0.0, "output": 0.0})
    estimated_cost = estimated_input_tokens * price["input"]
    if estimated_cost > remaining:
        raise RuntimeError(
            f"Estimated call cost ${estimated_cost:.4f} would exceed remaining budget ${remaining:.4f}"
        )
    return tracked_completion(client, conn, user_id, project, **kwargs)

This is the pattern that makes LLM cost monitoring actually useful at the per-user level. Dashboard alerts tell you something went wrong. Pre-call guards stop it from going wrong in the first place.
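The guard's budget lookup runs a SUM query on every request, and the code comment suggests replacing it with a cached counter. The sketch below shows the shape of such a counter using a plain dictionary as a stand-in, so it runs without a Redis server. DailySpendCounter and its key format are assumptions for illustration; with Redis, add would map to an INCRBYFLOAT on the same key.

```python
from collections import defaultdict
from datetime import date, timedelta
from typing import Optional


class DailySpendCounter:
    """In-memory stand-in for a per-user daily spend counter."""

    def __init__(self):
        self._totals = defaultdict(float)

    @staticmethod
    def _key(user_id: str, day: date) -> str:
        # One key per user per calendar day, so totals reset naturally at midnight.
        return f"spend:{user_id}:{day.isoformat()}"

    def add(self, user_id: str, cost_usd: float, day: Optional[date] = None) -> float:
        """Record the cost of one call and return the user's running total for the day."""
        key = self._key(user_id, day or date.today())
        self._totals[key] += cost_usd
        return self._totals[key]

    def remaining(self, user_id: str, daily_limit: float, day: Optional[date] = None) -> float:
        """Budget left for the day, never negative."""
        spent = self._totals.get(self._key(user_id, day or date.today()), 0.0)
        return max(0.0, daily_limit - spent)


counter = DailySpendCounter()
day = date(2026, 6, 1)
counter.add("user_42", 1.25, day)
total = counter.add("user_42", 0.75, day)
left_today = counter.remaining("user_42", daily_limit=5.0, day=day)
left_tomorrow = counter.remaining("user_42", daily_limit=5.0, day=day + timedelta(days=1))
print(total, left_today, left_tomorrow)
```

Call add wherever the usage row is written and remaining wherever the guard checks the budget. The SQLite table stays the source of truth for reporting; the counter only makes the pre-call check cheap.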
Frequently asked questions
Do I need to poll the OpenAI billing endpoint for accurate spend data?
For per-call cost tracking, no. Calculate cost from the usage object in each API response — prompt_tokens × input_price + completion_tokens × output_price. The billing endpoint gives you aggregate dollar totals but no per-user or per-model breakdown. Use the response-level calculation for metering and the billing endpoint as a weekly reconciliation check.
What is the right alerting granularity — per-call, per-day, or per-month?
Per-call monitoring catches runaway agents. Per-day thresholds catch sustained overuse before it compounds. Per-month caps match your billing cycle. In practice, run all three: flag any call that costs more than a configurable threshold, alert at 80% of the daily budget, and hard-block at the monthly cap.
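A warning level below the hard limit is what lets an alert fire before the budget is gone. As a rough sketch, the three outcomes can be expressed as one function. classify_spend is a hypothetical helper, and the 0.8 default mirrors the 80% figure above.

```python
def classify_spend(current_usd: float, limit_usd: float, warn_ratio: float = 0.8) -> str:
    """Return 'ok', 'warn', or 'block' for a spend total measured against its limit."""
    if limit_usd <= 0:
        raise ValueError("limit_usd must be positive")
    ratio = current_usd / limit_usd
    if ratio >= 1.0:
        return "block"
    if ratio >= warn_ratio:
        return "warn"
    return "ok"


states = [classify_spend(spent, 50.0) for spent in (30.0, 42.0, 50.0)]
print(states)
```

Route "warn" results to a low-urgency channel such as Slack and reserve paging for "block", so that the early signal does not become noise.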
Should my pre-call estimate block or just warn?
Block for external API endpoints (users spending real money). Warn for internal tools (developers testing). The pre-call estimate has a margin of error — typically 10-15% due to output-length uncertainty — so set your block threshold 15% below the actual budget to absorb that variance.
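The Step 5 guard estimates input tokens only, so one way to tighten it is to price the worst case: the estimated input plus the output cap you pass to the API. The sketch below combines that with the 15% headroom. worst_case_cost and fits_budget are hypothetical helpers, and the prices are the gpt-4o-mini figures from the Step 1 table.

```python
def worst_case_cost(
    input_tokens: float,
    max_output_tokens: int,
    input_price: float,
    output_price: float,
) -> float:
    """Upper bound on one call's cost if the model uses its whole output allowance."""
    return input_tokens * input_price + max_output_tokens * output_price


def fits_budget(estimated_cost_usd: float, remaining_usd: float, safety_margin: float = 0.15) -> bool:
    """True if the estimate fits inside the remaining budget minus the safety margin."""
    return estimated_cost_usd <= remaining_usd * (1.0 - safety_margin)


# gpt-4o-mini prices from the Step 1 table, in USD per token.
INPUT_PRICE = 0.15 / 1_000_000
OUTPUT_PRICE = 0.60 / 1_000_000

estimate = worst_case_cost(2_000, 500, INPUT_PRICE, OUTPUT_PRICE)
allowed = fits_budget(estimate, remaining_usd=0.001)
print(f"${estimate:.6f}", allowed)
```

Pricing the full output cap is deliberately pessimistic. It will reject some calls that would have fit, which is usually the right trade for external users and too strict for internal tooling.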
How do I handle multi-tenant spend attribution?
Tag every API call with a user or tenant identifier. Aggregate by that tag in your cost store. The challenge is that provider responses return token usage but not your identifier, so your tracking layer has to attach it before the call and link it to the response afterward.
Conclusion
These five steps form a complete, production-ready cost-control loop: response-level metering for accuracy, polling for aggregate reconciliation, threshold logic for alerting, pre-call estimation for enforcement, and a single guard function that every call routes through. SQLite handles low-to-moderate traffic; Redis counters replace it when per-request latency starts to matter.
The bill should never be the first signal that something is wrong.
For teams that would rather skip the plumbing, noburn.dev handles metering, threshold enforcement, and alerting out of the box — including automatic pricing-table updates when providers cut rates, so you never touch a price constant again. Free tier at noburn.dev/docs.