Evaluate RAG quality with faithfulness and relevance metrics, add guardrails against hallucination, implement caching and streaming, and deploy as a FastAPI service with monitoring.
Build a production-grade RAG API with automated evaluation, hallucination guardrails, response caching, streaming output, and deployment via FastAPI. This is the capstone lesson — everything from Days 1–4 comes together into a system you can actually ship.
Building a RAG prototype takes a day. Shipping a RAG system that your organization trusts takes weeks of evaluation, guardrails, and operational infrastructure. The gap between "works on my laptop" and "works in production" is where most RAG projects die. Today we close that gap. You will learn how to measure whether your RAG system is actually good, how to prevent it from hallucinating in production, how to make it fast, and how to deploy it as a real API.
You cannot improve what you do not measure. RAG evaluation has three dimensions, and you need to track all three: retrieval quality (did we fetch the right chunks?), faithfulness (is the answer grounded in those chunks?), and answer relevance (does the answer actually address the question?).
"It seems to work." Ship it. Users report wrong answers. You have no data. You cannot diagnose whether the problem is retrieval, generation, or both. Iterate blindly.
Evaluation suite with 50+ test questions. Automated scoring on every code change. You know exactly which questions fail and why. Improve systematically.
```python
# pip install ragas
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from datasets import Dataset

# Create an evaluation dataset
eval_data = {
    "question": [
        "What internet speed is required for remote work?",
        "Can contractors work remotely?",
        "What are the core working hours?",
        "What happens if performance declines?",
    ],
    "answer": [],      # Will be filled by our RAG chain
    "contexts": [],    # Retrieved chunks for each question
    "ground_truth": [  # Human-written correct answers
        "At least 50 Mbps download speed.",
        "Contractors must obtain written approval from their department head.",
        "10 AM to 3 PM Eastern Time.",
        "The employee may be required to return to the office for a 30-day improvement period.",
    ],
}

# Run our RAG chain on each question and collect answers + contexts
for question in eval_data["question"]:
    # Get retrieved docs
    docs = retriever.invoke(question)
    contexts = [d.page_content for d in docs]
    eval_data["contexts"].append(contexts)

    # Get the RAG answer
    answer = rag_chain.invoke(question)
    eval_data["answer"].append(answer)

# Run RAGAS evaluation
dataset = Dataset.from_dict(eval_data)
results = evaluate(
    dataset,
    metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
)

print("Overall scores:")
print(f"  Faithfulness:      {results['faithfulness']:.3f}")
print(f"  Answer Relevancy:  {results['answer_relevancy']:.3f}")
print(f"  Context Precision: {results['context_precision']:.3f}")
print(f"  Context Recall:    {results['context_recall']:.3f}")

# Drill into per-question scores
df = results.to_pandas()
print("\nPer-question breakdown:")
print(df[["question", "faithfulness", "answer_relevancy"]].to_string())
```
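The per-question dataframe is where regressions actually show up: aggregate scores can stay flat while individual questions silently break. A minimal triage sketch over a dataframe shaped like `results.to_pandas()` output — the scores and the 0.8 passing bar here are made-up assumptions, not RAGAS defaults:

```python
import pandas as pd

# Hypothetical per-question scores, shaped like results.to_pandas() output
df = pd.DataFrame({
    "question": [
        "What internet speed is required for remote work?",
        "Can contractors work remotely?",
        "What are the core working hours?",
    ],
    "faithfulness": [0.95, 0.60, 1.00],
    "answer_relevancy": [0.90, 0.85, 0.55],
})

THRESHOLD = 0.8  # assumed passing bar; tune for your domain
metrics = ["faithfulness", "answer_relevancy"]

# A question fails if any tracked metric falls below the threshold
failing = df[(df[metrics] < THRESHOLD).any(axis=1)]

print(f"{len(failing)}/{len(df)} questions below threshold:")
for _, row in failing.iterrows():
    worst = min(metrics, key=lambda m: row[m])
    print(f"  {row['question']!r} — weakest metric: {worst} ({row[worst]:.2f})")
```

Run a report like this on every code change; a question that newly enters the failing set tells you exactly which retrieval or prompt tweak caused the regression.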
Even with good retrieval, LLMs can still hallucinate. Production systems need explicit guardrails. Here are three practical approaches.
```python
from langchain_core.prompts import ChatPromptTemplate

# Production-grade RAG prompt with explicit guardrails
production_prompt = ChatPromptTemplate.from_template("""You are a helpful assistant that answers questions using ONLY the provided context.

Follow these rules strictly:
1. ONLY use information from the context below to answer.
2. If the context does not contain enough information, say: "I don't have enough information to answer that question."
3. NEVER make up information, infer beyond what's stated, or use your training knowledge.
4. If the question is ambiguous, state the ambiguity and ask for clarification.
5. Quote the relevant part of the context when possible.
6. If multiple chunks are relevant, synthesize them but stay faithful to the source text.

Context:
{context}

Question: {question}

Answer (remember: ONLY use the context above):""")

# An explicit, rule-based prompt like this is far more effective at preventing
# hallucination than a bare "Answer based on the context" instruction.
```
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Verification chain: check if the answer is grounded
verify_prompt = ChatPromptTemplate.from_template("""Analyze whether the Answer is fully supported by the Context.

Context: {context}

Answer: {answer}

Return a JSON object with:
- "is_grounded": true if every claim in the answer can be found in the context
- "unsupported_claims": list of any claims not found in the context
- "confidence": a score from 0.0 to 1.0

JSON:""")

verify_chain = verify_prompt | model | JsonOutputParser()

# Use it as a post-generation check
def safe_rag_answer(question, rag_chain, retriever, verify_chain):
    # Get the answer and its context
    docs = retriever.invoke(question)
    context = "\n\n".join(d.page_content for d in docs)
    answer = rag_chain.invoke(question)

    # Verify grounding
    verification = verify_chain.invoke({
        "context": context,
        "answer": answer,
    })

    if verification["is_grounded"] and verification["confidence"] > 0.8:
        return {"answer": answer, "verified": True, "sources": docs}
    else:
        return {
            "answer": "I'm not confident in my answer. Please verify.",
            "verified": False,
            "issues": verification["unsupported_claims"],
            "original_answer": answer,
        }

result = safe_rag_answer("What is the VPN policy?", rag_chain, retriever, verify_chain)
print(result)
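A third guardrail, commonly used alongside the two above, is to refuse *before* generation when retrieval itself is weak: if no chunk scores above a similarity floor, there is nothing trustworthy to ground an answer in. The sketch below is an assumption-laden illustration — the `should_answer` helper, the score values, and both thresholds are invented for this example. In real code the scores would come from something like `vectorstore.similarity_search_with_score`; note that some stores return a *distance* (lower is better) rather than a similarity, so check your store's convention before picking a threshold:

```python
# Guardrail 3: refuse before generation when retrieval is weak.
# All names and thresholds here are illustrative assumptions.

MIN_SCORE = 0.75   # similarity floor (tune per embedding model)
MIN_DOCS = 2       # require at least this many strong hits

def should_answer(scored_docs, min_score=MIN_SCORE, min_docs=MIN_DOCS):
    """scored_docs: list of (doc_text, similarity) pairs, best first."""
    strong = [d for d, score in scored_docs if score >= min_score]
    return len(strong) >= min_docs, strong

# Example with made-up retrieval scores
hits = [("VPN access requires MFA.", 0.82),
        ("Remote workers get a laptop.", 0.78),
        ("Parking policy...", 0.41)]

ok, strong = should_answer(hits)
if ok:
    print(f"Answering from {len(strong)} strong chunks")
else:
    print("I don't have enough information to answer that question.")
```

This check costs nothing (no extra LLM call) and catches the worst failure mode: confidently answering a question the knowledge base simply does not cover.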
Many RAG queries are repeated. Caching saves both time and money. LangChain supports both exact-match caching and semantic caching (similar questions return cached answers).
```python
from langchain_core.globals import set_llm_cache
from langchain_community.cache import SQLiteCache

# Exact-match cache: same prompt → same response (no LLM call)
set_llm_cache(SQLiteCache(database_path=".langchain_cache.db"))

import time

# First call: hits the LLM (~800ms)
start = time.time()
answer1 = rag_chain.invoke("What is the VPN policy?")
print(f"First call: {time.time()-start:.2f}s")

# Second call: cache hit (~5ms)
start = time.time()
answer2 = rag_chain.invoke("What is the VPN policy?")
print(f"Cached call: {time.time()-start:.2f}s")

# For embedding-level caching (avoid re-embedding the same text)
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_openai import OpenAIEmbeddings

underlying = OpenAIEmbeddings(model="text-embedding-3-small")
store = LocalFileStore("./embedding_cache/")
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    underlying_embeddings=underlying,
    document_embedding_cache=store,
    namespace=underlying.model,  # Separate caches by model
)
# Use cached_embeddings instead of raw embeddings when creating vector stores
# Re-indexing the same documents is instant (cache hits)
```
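Exact-match caching only helps when the question is repeated verbatim. The semantic caching mentioned above matches *similar* questions instead; LangChain ships backends for this (e.g. a Redis-based semantic cache, which needs a running Redis instance). To make the idea concrete without external services, here is a minimal in-process sketch — the `toy_embed` function, the vocabulary, and the 0.75 threshold are all stand-in assumptions; real code would use something like `embeddings.embed_query`:

```python
import math

def cosine(a, b):
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    if na == 0 or nb == 0:
        return 0.0
    return sum(x * y for x, y in zip(a, b)) / (na * nb)

class SemanticCache:
    """Return a cached answer when a new question embeds close to an old one."""

    def __init__(self, embed_fn, threshold=0.75):
        self.embed_fn = embed_fn    # e.g. embeddings.embed_query in real code
        self.threshold = threshold  # assumed similarity cutoff; tune carefully
        self.entries = []           # list of (embedding, answer)

    def get(self, question):
        q = self.embed_fn(question)
        for emb, answer in self.entries:
            if cosine(q, emb) >= self.threshold:
                return answer
        return None

    def put(self, question, answer):
        self.entries.append((self.embed_fn(question), answer))

# Toy bag-of-words embedder for demonstration only
def toy_embed(text):
    words = text.lower().replace("?", "").split()
    vocab = ["vpn", "policy", "what", "is", "the", "our", "how"]
    return [float(words.count(w)) for w in vocab]

cache = SemanticCache(toy_embed, threshold=0.75)
cache.put("What is the VPN policy?", "VPN requires MFA.")
print(cache.get("What is our VPN policy?"))  # similar wording → cache hit
print(cache.get("How do I file expenses?"))  # unrelated → miss (None)
```

The threshold is the critical design choice: too low and distinct questions get each other's answers, too high and you never hit the cache. Evaluate it against real query logs before trusting it in production.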
Users expect to see tokens appearing as the model generates them. Streaming is essential for production RAG applications. Here is how to stream both the chain and the final API.
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)

def format_docs(docs):
    return "\n\n".join(d.page_content for d in docs)

rag_prompt = ChatPromptTemplate.from_template("""Answer based ONLY on:

{context}

Question: {question}

Answer:""")

chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | model
    | StrOutputParser()
)

# Stream tokens as they arrive
print("Streaming answer:")
for chunk in chain.stream("What equipment does the company provide?"):
    print(chunk, end="", flush=True)
print()  # Newline at end

# Async streaming (for web applications)
import asyncio

async def stream_answer(question):
    async for chunk in chain.astream(question):
        print(chunk, end="", flush=True)
        yield chunk

# stream_answer is an async generator, so drive it from a coroutine
# (asyncio.run cannot consume an async generator directly):
async def main():
    async for _ in stream_answer("What are the core hours?"):
        pass

# asyncio.run(main())
```
Here is the complete FastAPI application that packages your RAG system as a production API with streaming, error handling, and health checks.
```python
# pip install fastapi uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
import json, time, logging

app = FastAPI(title="RAG API", version="1.0")
logger = logging.getLogger("rag-api")

# Initialize RAG components at startup
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
    collection_name="company-policies",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
model = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)

rag_prompt = ChatPromptTemplate.from_template("""Answer based ONLY on:

{context}

If the context doesn't contain the answer, say "I don't have that information."

Question: {question}

Answer:""")

def format_docs(docs):
    return "\n\n".join(d.page_content for d in docs)

chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | model
    | StrOutputParser()
)

class Query(BaseModel):
    question: str
    stream: bool = False

@app.get("/health")
def health():
    count = vectorstore._collection.count()
    return {"status": "healthy", "vectors": count}

@app.post("/ask")
async def ask(query: Query):
    start = time.time()

    if query.stream:
        async def generate():
            async for chunk in chain.astream(query.question):
                yield f"data: {json.dumps({'token': chunk})}\n\n"
            yield "data: [DONE]\n\n"
        return StreamingResponse(generate(), media_type="text/event-stream")

    # Non-streaming: return full answer with sources
    # (Note: this retrieves twice — once here for sources, once inside the chain)
    docs = retriever.invoke(query.question)
    answer = await chain.ainvoke(query.question)  # async call keeps the event loop free
    elapsed = time.time() - start
    logger.info(f"Question: {query.question} | Time: {elapsed:.2f}s")

    return {
        "answer": answer,
        "sources": [
            {"content": d.page_content[:200], "metadata": d.metadata}
            for d in docs
        ],
        "latency_ms": round(elapsed * 1000),
    }

# Run: uvicorn api:app --host 0.0.0.0 --port 8000
```
```bash
curl -X POST http://localhost:8000/ask \
  -H "Content-Type: application/json" \
  -d '{"question": "What equipment does the company provide?", "stream": false}'
```
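With `"stream": true` the endpoint emits Server-Sent Events, one `data:` line per token, terminated by `data: [DONE]`. A client has to parse that framing itself. Here is a small sketch of the parsing logic, exercised against sample lines rather than a live server (the commented `requests` usage and the question text are assumptions):

```python
import json

def parse_sse_tokens(lines):
    """Extract tokens from Server-Sent Event lines in the /ask format."""
    tokens = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # skip blank keep-alive lines
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        tokens.append(json.loads(payload)["token"])
    return tokens

# Sample of what the streaming endpoint emits
sample = [
    'data: {"token": "Laptops"}',
    'data: {"token": " and"}',
    'data: {"token": " monitors."}',
    'data: [DONE]',
]
print("".join(parse_sse_tokens(sample)))

# Against a live server (assumes the API above is running locally):
# import requests
# with requests.post("http://localhost:8000/ask", stream=True,
#                    json={"question": "What equipment?", "stream": True}) as r:
#     for raw in r.iter_lines(decode_unicode=True):
#         # feed each raw line through the same data:/[DONE] handling as above
#         ...
```

In a browser frontend the equivalent framing is handled by `EventSource` or by reading the fetch response body incrementally.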
Once deployed, you need visibility into how your RAG system is performing. Track these metrics:
```python
import json
from datetime import datetime
from pathlib import Path

class RAGMonitor:
    """Simple file-based monitoring for RAG queries."""

    def __init__(self, log_path="rag_queries.jsonl"):
        self.log_path = Path(log_path)

    def log_query(self, question, answer, sources, latency_ms,
                  num_chunks, user_feedback=None):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "question": question,
            "answer_preview": answer[:200],
            "num_sources": len(sources),
            "latency_ms": latency_ms,
            "num_chunks_retrieved": num_chunks,
            "is_refusal": "don't have" in answer.lower(),
            "feedback": user_feedback,
        }
        with open(self.log_path, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def summary(self):
        """Print monitoring summary from log file."""
        entries = []
        with open(self.log_path) as f:
            for line in f:
                entries.append(json.loads(line))

        latencies = [e["latency_ms"] for e in entries]
        refusals = sum(1 for e in entries if e["is_refusal"])
        print(f"Total queries: {len(entries)}")
        print(f"Avg latency: {sum(latencies)/len(latencies):.0f}ms")
        print(f"P95 latency: {sorted(latencies)[int(len(latencies)*0.95)]:.0f}ms")
        print(f"Refusal rate: {refusals/len(entries)*100:.1f}%")

monitor = RAGMonitor()
# Call monitor.log_query(...) after each API response
# Call monitor.summary() for a dashboard view
```
Before shipping your RAG system, walk through this checklist:
You have now built a complete RAG system from scratch. Confirm you can do all of the following: