A production-grade agent wrapper with: exponential backoff retry on rate limits, per-task and per-session cost tracking with budget enforcement, structured JSON logging of every decision and tool call, model routing (cheap model for simple tasks, expensive model for complex ones), and a background task queue for running agents as services.
What breaks in production that didn't break in dev
- Rate limits: Anthropic's API has rate limits. Burst traffic will hit them. You need retry with backoff.
- Cost overruns: An infinite loop + no budget limit = surprise $500 API bill
- Silent failures: Tool errors that return empty strings instead of raising exceptions confuse Claude and waste tokens
- No observability: Without logging, debugging production agent failures is nearly impossible
- Model mismatch: Using Opus for every call when Haiku would do costs nearly 19x more (per the pricing table below: $15 vs $0.80 per million input tokens)
This lesson wraps everything we've built with production guardrails.
Production agent: full implementation
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import anthropic
# ── Structured logging setup ───────────────────────
class JSONFormatter(logging.Formatter):
    """Format each log record as a single JSON line.

    BUG FIX: logging's ``extra=`` kwarg does NOT create a ``record.extra``
    attribute — it sets each key directly on the record. The original
    ``hasattr(record, "extra")`` check was therefore always False and every
    structured field (model, cost_usd, task_id, ...) was silently dropped.
    We instead emit every attribute that is not a standard LogRecord field.
    """

    # Attribute names present on every LogRecord (plus the ones the base
    # Formatter may add); anything else came from the caller's ``extra=``.
    _RESERVED = frozenset(
        logging.LogRecord("", 0, "", 0, "", (), None).__dict__
    ) | {"message", "asctime", "taskName"}

    def format(self, record):
        log = {
            # timezone-aware UTC; datetime.utcnow() is deprecated and naive
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "msg": record.getMessage(),
        }
        for key, value in record.__dict__.items():
            if key not in self._RESERVED:
                log[key] = value
        return json.dumps(log)
def setup_logger(name: str) -> logging.Logger:
    """Return a logger that writes JSON lines to agent.log.

    BUG FIX: the original added a new FileHandler on every call, so a second
    setup (re-import, second agent in one process) duplicated every log line.
    Now idempotent: the handler is attached only if none exists yet.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.FileHandler("agent.log")
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger

logger = setup_logger("agent")
# ── Cost tracking ──────────────────────────────────
# Prices per million tokens (as of April 2026)
# USD per million tokens, keyed by model id (prices as of April 2026).
MODEL_COSTS = {
    "claude-haiku-4-5": {"input": 0.80, "output": 4.00},
    "claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
    "claude-opus-4-5": {"input": 15.00, "output": 75.00},
}

@dataclass
class CostTracker:
    """Accumulates API spend across calls and enforces a hard budget."""

    budget_usd: float = 1.0   # hard cap for the session
    spent_usd: float = 0.0    # running total across all recorded calls
    calls: int = 0            # number of API calls recorded

    def record(self, model: str, input_tokens: int, output_tokens: int):
        """Add one call's cost to the running total and return that cost.

        Unknown model ids fall back to Sonnet pricing (the middle tier),
        so a typo never makes a call look free.
        """
        pricing = MODEL_COSTS.get(model, MODEL_COSTS["claude-sonnet-4-5"])
        call_cost = (
            input_tokens * pricing["input"]
            + output_tokens * pricing["output"]
        ) / 1_000_000
        self.spent_usd += call_cost
        self.calls += 1
        return call_cost

    def check_budget(self):
        """Raise RuntimeError once cumulative spend reaches the budget."""
        if self.spent_usd < self.budget_usd:
            return
        raise RuntimeError(
            f"Budget exceeded: ${self.spent_usd:.4f} / ${self.budget_usd:.2f}"
        )
# ── Retry wrapper with exponential backoff ─────────
def call_with_retry(client, cost_tracker: CostTracker, **kwargs):
    """Call Claude with budget enforcement and exponential backoff on rate limits.

    Behavior:
      - Raises RuntimeError (via cost_tracker.check_budget) before spending
        anything if the budget is already exhausted.
      - Retries up to 3 times on RateLimitError, waiting 1s then 2s between
        attempts; re-raises other APIErrors immediately.

    BUG FIX: the original slept the full backoff *before* noticing the last
    attempt and re-raising — a wasted 4s wait with no retry to follow. The
    exhaustion check now comes first. Success-path bookkeeping was also moved
    out of the try block so an error in our own logging can't be mistaken for
    an APIError.
    """
    cost_tracker.check_budget()
    max_retries = 3
    for attempt in range(max_retries):
        try:
            resp = client.messages.create(**kwargs)
        except anthropic.RateLimitError:
            if attempt == max_retries - 1:
                raise  # retries exhausted — surface to the caller
            wait = 2 ** attempt  # 1s, 2s
            print(f"Rate limited. Waiting {wait}s (attempt {attempt+1}/{max_retries})")
            time.sleep(wait)
            continue
        except anthropic.APIError as e:
            logger.error(f"API error: {e}", extra={"error": str(e)})
            raise
        cost = cost_tracker.record(
            kwargs["model"],
            resp.usage.input_tokens,
            resp.usage.output_tokens
        )
        logger.info("api_call", extra={
            "model": kwargs["model"],
            "tokens_in": resp.usage.input_tokens,
            "tokens_out": resp.usage.output_tokens,
            "cost_usd": round(cost, 6),
            "total_spent": round(cost_tracker.spent_usd, 6)
        })
        return resp
# ── Model routing ──────────────────────────────────
def route_model(task_complexity: str) -> str:
    """Pick the cheapest model adequate for the stated task complexity.

    Anything other than "simple" or "medium" is treated as complex and
    routed to Opus.
    """
    routing = {
        "simple": "claude-haiku-4-5",   # ~19x cheaper than Opus per token
        "medium": "claude-sonnet-4-5",  # balanced cost/capability
    }
    return routing.get(task_complexity, "claude-opus-4-5")  # complex reasoning
# ── Production agent class ─────────────────────────
class ProductionAgent:
    """Budget-capped agent loop with model routing and structured logging."""

    def __init__(self, budget_usd: float = 1.0, tools: list = None):
        """
        budget_usd: hard spend cap for the whole session.
        tools: Anthropic tool definitions sent on every API call.
        """
        self.client = anthropic.Anthropic()
        self.cost = CostTracker(budget_usd=budget_usd)
        self.tools = tools or []
        # BUG FIX: uuid.uuid4() returns a UUID object, which is not
        # subscriptable — uuid.uuid4()[:8] raised TypeError. Slice the
        # string form instead.
        self.session_id = str(uuid.uuid4())[:8]

    def run(self, task: str,
            tool_executor=None,
            complexity: str = "medium",
            max_steps: int = 10) -> str:
        """Run the agent loop on `task` until Claude stops or max_steps.

        tool_executor: callable (tool_name, tool_input) -> result; its
            exceptions are caught and fed back to the model as text.
        Raises RuntimeError when the cost budget is exhausted mid-task.
        """
        task_id = str(uuid.uuid4())[:8]  # same UUID-slicing fix as __init__
        model = route_model(complexity)
        logger.info("task_start", extra={
            "session": self.session_id, "task_id": task_id,
            "model": model, "task": task[:100]
        })
        messages = [{"role": "user", "content": task}]
        start_time = time.time()
        try:
            for step in range(max_steps):
                resp = call_with_retry(
                    self.client, self.cost,
                    model=model, max_tokens=2048,
                    tools=self.tools, messages=messages
                )
                if resp.stop_reason == "end_turn":
                    # ROBUSTNESS: don't assume content[0] is a text block;
                    # take the first text block (empty string if none).
                    answer = next(
                        (part.text for part in resp.content
                         if part.type == "text"),
                        "",
                    )
                    logger.info("task_complete", extra={
                        "task_id": task_id,
                        "steps": step + 1,
                        "duration_s": round(time.time() - start_time, 2),
                        "total_cost": round(self.cost.spent_usd, 6)
                    })
                    return answer
                # Execute every tool call in this response and feed the
                # results back as the next user turn.
                results = []
                for part in resp.content:
                    if part.type != "tool_use":
                        continue
                    logger.info("tool_call", extra={
                        "task_id": task_id, "tool": part.name,
                        "step": step, "input_keys": list(part.input.keys())
                    })
                    try:
                        result = tool_executor(part.name, part.input)
                    except Exception as e:
                        # Surface the failure to the model as text instead
                        # of crashing the whole task.
                        result = f"Tool error: {e}"
                        logger.warning(
                            f"Tool {part.name} failed",
                            extra={"error": str(e)}
                        )
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": part.id,
                        "content": str(result)
                    })
                messages += [
                    {"role": "assistant", "content": resp.content},
                    {"role": "user", "content": results}
                ]
            return "Max steps reached."
        except RuntimeError:  # budget exceeded (raised by check_budget)
            logger.error(f"Budget exceeded on task {task_id}")
            raise

    def usage_summary(self):
        """Return a snapshot of spend vs. budget for this session."""
        return {
            "calls": self.cost.calls,
            "spent_usd": round(self.cost.spent_usd, 6),
            "budget_usd": self.cost.budget_usd,
            "remaining_usd": round(self.cost.budget_usd - self.cost.spent_usd, 6)
        }
# ── Test it ────────────────────────────────────────
if __name__ == "__main__":
    # Demo run with a ten-cent hard spend cap.
    agent = ProductionAgent(budget_usd=0.10)
    answer = agent.run(
        "Explain the difference between supervised and unsupervised learning.",
        complexity="simple"  # routed to Haiku — the cheapest tier
    )
    print("Answer:", answer)
    print("Usage:", agent.usage_summary())
    # Inspect agent.log afterwards for the structured JSON records.
Read the log file: After running, open agent.log. Every API call, tool call, and task completion is logged as structured JSON. This makes debugging production agents tractable — you can see exactly which step failed, how many tokens it used, and what it cost.
Background deployment with a task queue
For production, you often want agents to run in the background — accepting tasks, executing them asynchronously, and returning results. Here's a minimal pattern:
import queue, threading, uuid, time
from agent_production import ProductionAgent
class AgentService:
    """In-process task queue: worker threads run one ProductionAgent per task.

    Results are kept in a plain dict; individual get/set on a dict is safe
    under CPython's GIL, which is all this pattern relies on.
    """

    def __init__(self, workers=2):
        self.task_queue = queue.Queue()
        self.results = {}  # task_id -> {"status": ..., ...}
        # Daemon threads so the service never blocks interpreter exit.
        for _ in range(workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()

    def _worker(self):
        """Worker loop: pull tasks forever, record a result per task."""
        while True:
            task_id, task, kwargs = self.task_queue.get()
            try:
                # Fresh agent per task: isolates budget and session state.
                agent = ProductionAgent(budget_usd=0.50)
                result = agent.run(task, **kwargs)
                self.results[task_id] = {"status": "done", "result": result}
            except Exception as e:
                self.results[task_id] = {"status": "error", "error": str(e)}
            finally:
                self.task_queue.task_done()

    def submit(self, task: str, **kwargs) -> str:
        """Enqueue a task; returns its id immediately (poll get_result).

        BUG FIX: uuid.uuid4() returns a UUID object, which is not
        subscriptable — the original uuid.uuid4()[:8] raised TypeError on
        every submission. Slice the string form instead.
        """
        task_id = str(uuid.uuid4())[:8]
        self.results[task_id] = {"status": "pending"}
        self.task_queue.put((task_id, task, kwargs))
        return task_id

    def get_result(self, task_id: str) -> dict:
        """Return the task's result record, or a not_found status."""
        return self.results.get(task_id, {"status": "not_found"})
# Usage:
if __name__ == "__main__":
    svc = AgentService(workers=2)
    # Fan out two tasks at once; submit() returns immediately.
    task_ids = [
        svc.submit("What is machine learning?", complexity="simple"),
        svc.submit("Explain transformer architecture.", complexity="medium"),
    ]
    # Poll once a second until nothing is still pending.
    while any(svc.get_result(t)["status"] == "pending" for t in task_ids):
        time.sleep(1)
    for task_id in task_ids:
        print(task_id, svc.get_result(task_id)["status"])
You've completed the AI Agents course.
You've built a basic agent, a 5-tool agent, a memory agent, a multi-agent research system, and a production-grade agent with error handling, cost controls, logging, and background deployment. That's a production-ready foundation. Most people never get here.
Take the Live Bootcamp — $1,490