Course Home · Cities · Reserve Your Seat
Day 5 of 5 · 70 minutes

Production Agents — Reliability and Deployment

Error handling, retry logic, budget limits, structured logging, and running agents as background services. ~400 lines of Python. Production-ready.

What you'll build today

A production-grade agent wrapper with: exponential backoff retry on rate limits, per-task and per-session cost tracking with budget enforcement, structured JSON logging of every decision and tool call, model routing (cheap model for simple tasks, expensive model for complex ones), and a background task queue for running agents as services.

1
The Production Gap

What breaks in production that didn't break in dev

  • Rate limits: Anthropic's API has rate limits. Burst traffic will hit them. You need retry with backoff.
  • Cost overruns: An infinite loop + no budget limit = surprise $500 API bill
  • Silent failures: Tool errors that return empty strings instead of raising exceptions confuse Claude and waste tokens
  • No observability: Without logging, debugging production agent failures is nearly impossible
  • Model mismatch: Using Opus for every call when Haiku would do costs nearly 19x more (per the pricing table below: $15.00 vs $0.80 per million input tokens)

This lesson wraps everything we've built with production guardrails.

2
The Code

Production agent: full implementation

Pythonagent_production.py
# Standard library
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Third-party
import anthropic

# ── Structured logging setup ───────────────────────
# Attributes present on every LogRecord; anything beyond these came in
# via logger.*(..., extra={...}).
_STANDARD_ATTRS = set(
    logging.LogRecord("", 0, "", 0, "", (), None).__dict__
) | {"message", "asctime"}

class JSONFormatter(logging.Formatter):
    """Render each log record as one JSON object per line.

    BUG FIX: the logging module merges the ``extra={...}`` dict directly
    into ``record.__dict__`` — there is no ``record.extra`` attribute, so
    the original ``hasattr(record, "extra")`` check never fired and every
    structured field (model, cost, task_id, ...) was silently dropped.
    We now emit every non-standard record attribute instead.
    """

    def format(self, record):
        log = {
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "msg": record.getMessage(),
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                log[key] = value
        # default=str keeps logging resilient to non-JSON-serializable extras.
        return json.dumps(log, default=str)

def setup_logger(name: str) -> logging.Logger:
    """Return a logger that appends JSON lines to agent.log.

    BUG FIX: the original attached a fresh FileHandler on *every* call, so
    re-imports or repeated setup calls doubled (tripled, ...) every log
    line. The handler is now attached only once per named logger.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # guard against duplicate handlers
        h = logging.FileHandler("agent.log")
        h.setFormatter(JSONFormatter())
        logger.addHandler(h)
    return logger

# Module-level logger shared by everything in this file.
logger = setup_logger("agent")

# ── Cost tracking ──────────────────────────────────
# Prices in USD per million tokens (as of April 2026).
MODEL_COSTS = {
    "claude-haiku-4-5":    {"input": 0.80,  "output": 4.00},
    "claude-sonnet-4-5":   {"input": 3.00,  "output": 15.00},
    "claude-opus-4-5":     {"input": 15.00, "output": 75.00},
}

@dataclass
class CostTracker:
    """Accumulates API spend across calls and enforces a hard dollar budget."""

    budget_usd: float = 1.0   # hard ceiling for this tracker's session
    spent_usd: float = 0.0    # running total in USD
    calls: int = 0            # number of API calls recorded

    def record(self, model: str, input_tokens: int, output_tokens: int):
        """Add one call's cost to the running total and return it (USD)."""
        # Unknown model ids fall back to Sonnet (mid-tier) pricing.
        pricing = MODEL_COSTS.get(model, MODEL_COSTS["claude-sonnet-4-5"])
        call_cost = (
            input_tokens * pricing["input"]
            + output_tokens * pricing["output"]
        ) / 1_000_000
        self.spent_usd += call_cost
        self.calls += 1
        return call_cost

    def check_budget(self):
        """Raise RuntimeError once cumulative spend has reached the budget."""
        if self.spent_usd >= self.budget_usd:
            raise RuntimeError(
                f"Budget exceeded: ${self.spent_usd:.4f} / ${self.budget_usd:.2f}"
            )

# ── Retry wrapper with exponential backoff ─────────
def call_with_retry(client, cost_tracker: CostTracker, **kwargs):
    """Call the Messages API, retrying rate limits with exponential backoff.

    Checks the budget *before* spending, records usage after each success,
    and logs every call as structured JSON.

    Raises:
        RuntimeError: budget exhausted (from cost_tracker.check_budget).
        anthropic.RateLimitError: still rate-limited after the final attempt.
        anthropic.APIError: any other API failure (re-raised immediately).
    """
    cost_tracker.check_budget()
    max_retries = 3
    for attempt in range(max_retries):
        try:
            resp = client.messages.create(**kwargs)
        except anthropic.RateLimitError:
            # BUG FIX: the original slept (up to 4s) even on the final
            # attempt, *then* raised. Raise first; only sleep when another
            # attempt remains.
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt  # 1s, then 2s between the 3 attempts
            # Structured log instead of print(), consistent with the rest
            # of this module's observability story.
            logger.warning("rate_limited", extra={
                "wait_s": wait,
                "attempt": attempt + 1,
                "max_retries": max_retries,
            })
            time.sleep(wait)
            continue
        except anthropic.APIError as e:
            logger.error(f"API error: {e}", extra={"error": str(e)})
            raise
        cost = cost_tracker.record(
            kwargs["model"],
            resp.usage.input_tokens,
            resp.usage.output_tokens
        )
        logger.info("api_call", extra={
            "model": kwargs["model"],
            "tokens_in": resp.usage.input_tokens,
            "tokens_out": resp.usage.output_tokens,
            "cost_usd": round(cost, 6),
            "total_spent": round(cost_tracker.spent_usd, 6)
        })
        return resp

# ── Model routing ──────────────────────────────────
def route_model(task_complexity: str) -> str:
    """Pick the cheapest model tier that fits the task.

    "simple" -> Haiku (~19x cheaper than Opus), "medium" -> Sonnet
    (balanced); anything else falls through to Opus for complex reasoning.
    """
    tier_map = {
        "simple": "claude-haiku-4-5",
        "medium": "claude-sonnet-4-5",
    }
    return tier_map.get(task_complexity, "claude-opus-4-5")

# ── Production agent class ─────────────────────────
class ProductionAgent:
    """Claude agent with budget enforcement, retries, routing and JSON logs.

    Each instance owns one API client and one CostTracker; the budget is
    shared across every task run on the instance (a "session").
    """

    def __init__(self, budget_usd: float = 1.0, tools: Optional[list] = None):
        self.client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY
        self.cost = CostTracker(budget_usd=budget_usd)
        self.tools = tools or []             # avoid a shared mutable default
        # BUG FIX: uuid.uuid4() is not subscriptable — the original
        # str(uuid.uuid4()[:8]) raised TypeError. Stringify first, then slice.
        self.session_id = str(uuid.uuid4())[:8]

    def run(self, task: str,
            tool_executor=None,
            complexity: str = "medium",
            max_steps: int = 10) -> str:
        """Run one agentic loop for *task* and return the final text answer.

        Args:
            task: user prompt handed to the model.
            tool_executor: callable (tool_name, input_dict) -> result; used
                for any tool_use blocks the model emits.
            complexity: "simple" | "medium" | anything-else — model tier.
            max_steps: hard cap on model turns to stop runaway loops.

        Raises:
            RuntimeError: session budget exhausted (surfaced by
                call_with_retry via CostTracker.check_budget).
        """
        task_id = str(uuid.uuid4())[:8]  # BUG FIX: stringify before slicing
        model = route_model(complexity)
        logger.info("task_start", extra={
            "session": self.session_id, "task_id": task_id,
            "model": model, "task": task[:100]  # truncated to keep logs small
        })

        messages = [{"role":"user","content":task}]
        start_time = time.time()

        try:
            for step in range(max_steps):
                resp = call_with_retry(
                    self.client, self.cost,
                    model=model, max_tokens=2048,
                    tools=self.tools, messages=messages
                )
                if resp.stop_reason == "end_turn":
                    # Join all text blocks — robust when the reply contains
                    # more than one content block (original indexed [0] and
                    # assumed it was text).
                    answer = "".join(
                        b.text for b in resp.content if b.type == "text"
                    )
                    logger.info("task_complete", extra={
                        "task_id": task_id,
                        "steps": step + 1,
                        "duration_s": round(time.time()-start_time,2),
                        "total_cost": round(self.cost.spent_usd,6)
                    })
                    return answer

                # Model asked for tools: execute each and feed results back.
                results = []
                for b in resp.content:
                    if b.type != "tool_use":
                        continue
                    logger.info("tool_call", extra={
                        "task_id": task_id, "tool": b.name,
                        "step": step, "input_keys": list(b.input.keys())
                    })
                    try:
                        result = tool_executor(b.name, b.input)
                    except Exception as e:
                        # Feed the error back as a tool result so the model
                        # can recover instead of crashing the whole task.
                        result = f"Tool error: {e}"
                        logger.warning(
                            f"Tool {b.name} failed",
                            extra={"error": str(e)}
                        )
                    results.append({
                        "type":"tool_result",
                        "tool_use_id":b.id,
                        "content":str(result)
                    })
                messages += [
                    {"role":"assistant","content":resp.content},
                    {"role":"user","content":results}
                ]

            return "Max steps reached."

        except RuntimeError:  # budget exceeded (raised by CostTracker)
            logger.error(f"Budget exceeded on task {task_id}")
            raise

    def usage_summary(self):
        """Return a snapshot dict of spend vs. budget for this session."""
        return {
            "calls": self.cost.calls,
            "spent_usd": round(self.cost.spent_usd, 6),
            "budget_usd": self.cost.budget_usd,
            "remaining_usd": round(self.cost.budget_usd - self.cost.spent_usd, 6)
        }

# ── Test it ────────────────────────────────────────
if __name__ == "__main__":
    # Smoke test: hard-cap spend at ten cents so a bug can't run up a bill.
    agent = ProductionAgent(budget_usd=0.10)  # $0.10 limit

    # Single-turn task with no tools; "simple" complexity routes to the
    # cheapest model tier.
    result = agent.run(
        "Explain the difference between supervised and unsupervised learning.",
        complexity="simple"  # routes to Haiku (cheap)
    )
    print("Answer:", result)
    print("Usage:", agent.usage_summary())
    # Check agent.log for structured JSON logs

Read the log file: After running, open agent.log. Every API call, tool call, and task completion is logged as structured JSON. This makes debugging production agents tractable — you can see exactly which step failed, how many tokens it used, and what it cost.

3
Running as a Service

Background deployment with a task queue

For production, you often want agents to run in the background — accepting tasks, executing them asynchronously, and returning results. Here's a minimal pattern:

Pythonagent_service.py
import queue, threading, uuid, time
from agent_production import ProductionAgent

class AgentService:
    """Minimal background task queue running agent tasks on worker threads.

    Results live in an in-memory dict keyed by an 8-char task id; callers
    poll get_result() until the status leaves "pending". Workers are daemon
    threads, so they die with the main process.
    """

    def __init__(self, workers=2):
        self.task_queue = queue.Queue()
        self.results = {}
        self.workers = []  # keep handles so threads are inspectable
        for _ in range(workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)

    def _worker(self):
        # Loop forever pulling tasks; every exception is captured per task
        # so one failure never kills the worker thread.
        while True:
            task_id, task, kwargs = self.task_queue.get()
            try:
                # Fresh agent per task: each task gets its own budget.
                agent = ProductionAgent(budget_usd=0.50)
                result = agent.run(task, **kwargs)
                self.results[task_id] = {"status":"done","result":result}
            except Exception as e:
                self.results[task_id] = {"status":"error","error":str(e)}
            finally:
                self.task_queue.task_done()

    def submit(self, task: str, **kwargs) -> str:
        """Queue *task* for execution; returns an id for get_result()."""
        # BUG FIX: uuid.uuid4() is not subscriptable — the original
        # str(uuid.uuid4()[:8]) raised TypeError on every submit.
        task_id = str(uuid.uuid4())[:8]
        self.results[task_id] = {"status":"pending"}
        self.task_queue.put((task_id, task, kwargs))
        return task_id

    def get_result(self, task_id: str) -> dict:
        """Return the task's result dict, or a not_found status."""
        return self.results.get(task_id, {"status":"not_found"})

# Usage:
if __name__ == "__main__":
    svc = AgentService(workers=2)

    # Submit multiple tasks simultaneously
    ids = [
        svc.submit("What is machine learning?", complexity="simple"),
        svc.submit("Explain transformer architecture.", complexity="medium"),
    ]

    # Poll for results
    # NOTE: busy-polling with sleep(1) is fine for a demo; a real service
    # would block on task_queue.join() or use callbacks/futures.
    while any(svc.get_result(i)["status"] == "pending" for i in ids):
        time.sleep(1)

    # Report final status per task ("done" or "error").
    for tid in ids:
        print(tid, svc.get_result(tid)["status"])

You've completed the AI Agents course.

You've built a basic agent, a 5-tool agent, a memory agent, a multi-agent research system, and a production-grade agent with error handling, cost controls, logging, and background deployment. That's a production-ready foundation. Most people never get here.

Take the Live Bootcamp — $1,490