RAG System Monitoring
Observe retrieval-augmented generation systems end to end, from vector retrieval through answer generation, and optimize their quality, latency, and cost.
Overview
Retrieval-Augmented Generation (RAG) systems combine information retrieval with text generation. AgenticAnts provides complete observability for:
- Retrieval Quality - Monitor vector search relevance, diversity, and latency
- Generation Performance - Track LLM latency, token usage, and response quality
- End-to-End Latency - Measure total pipeline time across every component
- Cost Optimization - Track and reduce retrieval and generation spend
- Quality Metrics - Monitor answer relevance, accuracy, completeness, and coherence
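All of these areas hang off the same instrumentation pattern: initialize the SDK once, open a trace per query, and record each pipeline stage as a span. As a minimal sketch of that pattern, using only the calls shown in the examples on this page:
import os

from agenticants import AgenticAnts

ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))

trace = ants.trace.create(name="rag-system", input="What is RAG?")
with trace.span("vector-retrieval") as span:
    docs = []  # your retrieval call goes here
    span.set_metadata({"retrieved_count": len(docs)})
trace.complete(output="...", metadata={"success": True})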
RAG Architecture Components
Typical RAG System Flow
import os

from agenticants import AgenticAnts
from agenticants.integrations import langchain
ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
langchain.auto_instrument(ants)
class RAGSystem:
def __init__(self):
self.vector_store = VectorStore()
self.retriever = Retriever()
self.generator = Generator()
def query(self, question: str, user_id: str = None):
# Create main RAG trace
rag_trace = ants.trace.create(
name="rag-system",
input=question,
metadata={
"user_id": user_id,
"system_type": "rag",
"components": ["retrieval", "generation"]
}
)
try:
# Step 1: Query Preprocessing
with rag_trace.span("query-preprocessing") as span:
processed_query = self.preprocess_query(question)
span.set_metadata({
"original_length": len(question),
"processed_length": len(processed_query),
"preprocessing_time": span.duration
})
# Step 2: Vector Retrieval
with rag_trace.span("vector-retrieval") as span:
retrieved_docs = self.retriever.retrieve(processed_query)
span.set_metadata({
"retrieved_count": len(retrieved_docs),
"retrieval_time": span.duration,
"vector_db": "pinecone"
})
# Step 3: Context Preparation
with rag_trace.span("context-preparation") as span:
context = self.prepare_context(retrieved_docs)
span.set_metadata({
"context_length": len(context),
"preparation_time": span.duration
})
# Step 4: Generation
with rag_trace.span("generation") as span:
answer = self.generator.generate(question, context)
span.set_metadata({
"generation_time": span.duration,
"answer_length": len(answer),
"model": "gpt-4"
})
# Step 5: Post-processing
with rag_trace.span("post-processing") as span:
final_answer = self.post_process(answer)
span.set_metadata({
"post_processing_time": span.duration,
"final_length": len(final_answer)
})
rag_trace.complete(
output=final_answer,
metadata={
"total_components": 5,
"success": True,
"total_time": rag_trace.duration
}
)
return final_answer
except Exception as error:
rag_trace.error(error=str(error))
raise error
Retrieval Quality Monitoring
Vector Search Performance
import os
import time

class RetrievalMonitor:
    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
        self.vector_store = VectorStore()  # vector store client queried below
def monitor_retrieval(self, query: str, top_k: int = 5):
retrieval_trace = self.ants.trace.create(
name="vector-retrieval",
input=query,
metadata={
"top_k": top_k,
"retrieval_type": "semantic_search"
}
)
try:
# Track retrieval metrics
start_time = time.time()
# Perform vector search
results = self.vector_store.similarity_search(
query=query,
k=top_k
)
retrieval_time = time.time() - start_time
# Calculate relevance scores
relevance_scores = []
for result in results:
score = self.calculate_relevance(query, result.content)
relevance_scores.append(score)
retrieval_trace.complete(
output=results,
metadata={
"retrieval_time": retrieval_time,
"results_count": len(results),
"avg_relevance": sum(relevance_scores) / len(relevance_scores),
"min_relevance": min(relevance_scores),
"max_relevance": max(relevance_scores)
}
)
return results
except Exception as error:
retrieval_trace.error(error=str(error))
raise error
    def calculate_relevance(self, query: str, content: str) -> float:
        # Implement relevance scoring logic: semantic similarity, keyword matching,
        # or an LLM judge. A simplified keyword-overlap placeholder is shown here.
        query_terms = set(query.lower().split())
        overlap = query_terms & set(content.lower().split())
        return len(overlap) / len(query_terms) if query_terms else 0.0
Retrieval Quality Metrics
class RetrievalQualityMonitor {
private ants: AgenticAnts
async monitorRetrievalQuality(query: string, retrievedDocs: any[]) {
const qualityTrace = await this.ants.trace.create({
name: 'retrieval-quality',
input: query,
metadata: {
retrievedCount: retrievedDocs.length,
queryType: this.classifyQuery(query)
}
})
try {
// Calculate quality metrics
const metrics = {
relevance: await this.calculateRelevance(query, retrievedDocs),
diversity: this.calculateDiversity(retrievedDocs),
coverage: this.calculateCoverage(query, retrievedDocs),
freshness: this.calculateFreshness(retrievedDocs)
}
await qualityTrace.complete({
output: metrics,
metadata: {
avgRelevance: metrics.relevance.avg,
diversityScore: metrics.diversity,
coverageScore: metrics.coverage,
freshnessScore: metrics.freshness
}
})
return metrics
} catch (error) {
await qualityTrace.error({ error: error.message })
throw error
}
}
private classifyQuery(query: string): string {
// Classify query type (factual, analytical, creative, etc.)
return 'factual' // Simplified
}
private calculateDiversity(docs: any[]): number {
// Calculate diversity of retrieved documents
return 0.8 // Simplified
}
private calculateCoverage(query: string, docs: any[]): number {
// Calculate how well documents cover the query
return 0.9 // Simplified
}
private calculateFreshness(docs: any[]): number {
// Calculate freshness of retrieved documents
return 0.7 // Simplified
}
}
Generation Performance Monitoring
LLM Generation Tracking
import os
import time

class GenerationMonitor:
def __init__(self):
self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
def monitor_generation(self, question: str, context: str, model: str = "gpt-4"):
generation_trace = self.ants.trace.create(
name="llm-generation",
input=question,
metadata={
"model": model,
"context_length": len(context),
"question_length": len(question)
}
)
try:
# Track generation metrics
start_time = time.time()
# Generate response
response = self.generate_response(question, context, model)
generation_time = time.time() - start_time
# Calculate quality metrics
quality_metrics = self.evaluate_response_quality(question, response)
generation_trace.complete(
output=response,
metadata={
"generation_time": generation_time,
"response_length": len(response),
"tokens_used": self.count_tokens(response),
"quality_score": quality_metrics["overall_score"],
"relevance_score": quality_metrics["relevance"],
"coherence_score": quality_metrics["coherence"],
"factual_accuracy": quality_metrics["factual_accuracy"]
}
)
return response
except Exception as error:
generation_trace.error(error=str(error))
raise error
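    # The code above calls self.count_tokens, which this page does not define.
    # A minimal sketch, assuming the tiktoken package is installed; swap in your
    # own tokenizer for non-OpenAI models.
    def count_tokens(self, text: str, model: str = "gpt-4") -> int:
        import tiktoken
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))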
def evaluate_response_quality(self, question: str, response: str) -> dict:
# Implement quality evaluation logic
return {
"overall_score": 0.85,
"relevance": 0.9,
"coherence": 0.8,
"factual_accuracy": 0.85
}
Response Quality Evaluation
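The evaluators on this page deliberately leave the individual scoring functions as stubs. One common way to fill them in is to use an LLM as a judge. The sketch below is one possible approach, not part of the AgenticAnts SDK; it assumes the openai Python package and an OPENAI_API_KEY in the environment, and the model name is only an example.
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def judge_relevance(question: str, response: str) -> float:
    # Ask a small judge model to grade relevance on a 0-10 scale, normalized to 0-1.
    prompt = (
        "Rate from 0 to 10 how well the answer addresses the question. "
        "Reply with a single number.\n\n"
        f"Question: {question}\nAnswer: {response}"
    )
    completion = client.chat.completions.create(
        model="gpt-4o-mini",  # example judge model; use whichever you prefer
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    try:
        return float(completion.choices[0].message.content.strip()) / 10
    except (TypeError, ValueError):
        return 0.0
The TypeScript evaluator that follows aggregates several such dimension scores into an overall quality score.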
class ResponseQualityEvaluator {
private ants: AgenticAnts
async evaluateResponse(question: string, response: string, context: string) {
const evaluationTrace = await this.ants.trace.create({
name: 'response-evaluation',
input: { question, response, context },
metadata: {
questionLength: question.length,
responseLength: response.length,
contextLength: context.length
}
})
try {
// Evaluate multiple quality dimensions
const evaluations = await Promise.all([
this.evaluateRelevance(question, response),
this.evaluateCoherence(response),
this.evaluateFactualAccuracy(response, context),
this.evaluateCompleteness(question, response)
])
const overallScore = evaluations.reduce((sum, score) => sum + score, 0) / evaluations.length
await evaluationTrace.complete({
output: { overallScore, evaluations },
metadata: {
relevance: evaluations[0],
coherence: evaluations[1],
factualAccuracy: evaluations[2],
completeness: evaluations[3],
overallScore
}
})
return { overallScore, evaluations }
} catch (error) {
await evaluationTrace.error({ error: error.message })
throw error
}
}
private async evaluateRelevance(question: string, response: string): Promise<number> {
// Implement relevance evaluation
return 0.9
}
private async evaluateCoherence(response: string): Promise<number> {
// Implement coherence evaluation
return 0.8
}
private async evaluateFactualAccuracy(response: string, context: string): Promise<number> {
// Implement factual accuracy evaluation
return 0.85
}
private async evaluateCompleteness(question: string, response: string): Promise<number> {
// Implement completeness evaluation
return 0.9
}
}
End-to-End RAG Monitoring
Complete RAG Pipeline Tracking
class CompleteRAGMonitor:
def __init__(self):
self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
def monitor_rag_pipeline(self, question: str, user_id: str = None):
# Create comprehensive RAG trace
rag_trace = self.ants.trace.create(
name="complete-rag-pipeline",
input=question,
metadata={
"user_id": user_id,
"pipeline_version": "1.0",
"components": ["preprocessing", "retrieval", "generation", "evaluation"]
}
)
try:
# Step 1: Query Analysis
with rag_trace.span("query-analysis") as span:
query_analysis = self.analyze_query(question)
span.set_metadata({
"query_type": query_analysis["type"],
"complexity": query_analysis["complexity"],
"keywords": query_analysis["keywords"]
})
# Step 2: Retrieval
with rag_trace.span("retrieval") as span:
retrieved_docs = self.retrieve_documents(question)
span.set_metadata({
"retrieved_count": len(retrieved_docs),
"retrieval_time": span.duration,
"avg_relevance": self.calculate_avg_relevance(retrieved_docs)
})
# Step 3: Context Preparation
with rag_trace.span("context-preparation") as span:
context = self.prepare_context(retrieved_docs)
span.set_metadata({
"context_length": len(context),
"preparation_time": span.duration
})
# Step 4: Generation
with rag_trace.span("generation") as span:
response = self.generate_response(question, context)
span.set_metadata({
"generation_time": span.duration,
"response_length": len(response),
"tokens_used": self.count_tokens(response)
})
# Step 5: Quality Evaluation
with rag_trace.span("quality-evaluation") as span:
quality_metrics = self.evaluate_response_quality(question, response, context)
span.set_metadata({
"quality_score": quality_metrics["overall_score"],
"evaluation_time": span.duration
})
# Step 6: Post-processing
with rag_trace.span("post-processing") as span:
final_response = self.post_process(response)
span.set_metadata({
"post_processing_time": span.duration,
"final_length": len(final_response)
})
rag_trace.complete(
output=final_response,
metadata={
"total_components": 6,
"success": True,
"total_time": rag_trace.duration,
"quality_score": quality_metrics["overall_score"]
}
)
return final_response
except Exception as error:
rag_trace.error(error=str(error))
raise error
Cost Optimization
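Per-query spend is usually dominated by generation tokens, and in RAG the prompt (question plus retrieved context) often contains far more tokens than the completion. As a rough illustration of the arithmetic behind the tracker below, with placeholder per-token rates rather than published prices:
# Illustrative per-token rates only; substitute your provider's current pricing.
PRICING = {
    "gpt-4": {"prompt": 0.00003, "completion": 0.00006},
}

def estimate_generation_cost(prompt_tokens: int, completion_tokens: int,
                             model: str = "gpt-4") -> float:
    rates = PRICING[model]
    return prompt_tokens * rates["prompt"] + completion_tokens * rates["completion"]

# Example: a 1,200-token prompt (question + retrieved context) and a 300-token answer
print(estimate_generation_cost(1200, 300))  # 0.036 + 0.018 = 0.054
The cost monitor below records the retrieval, generation, and evaluation components on a dedicated trace so they can be aggregated later.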
RAG Cost Tracking
class RAGCostMonitor:
def __init__(self):
self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
def track_rag_costs(self, question: str, retrieved_docs: list, response: str):
cost_trace = self.ants.trace.create(
name="rag-cost-tracking",
input=question,
metadata={
"cost_components": ["retrieval", "generation", "evaluation"]
}
)
try:
# Calculate retrieval costs
retrieval_cost = self.calculate_retrieval_cost(retrieved_docs)
# Calculate generation costs
generation_cost = self.calculate_generation_cost(response)
# Calculate evaluation costs
evaluation_cost = self.calculate_evaluation_cost(question, response)
total_cost = retrieval_cost + generation_cost + evaluation_cost
cost_trace.complete(
output={"total_cost": total_cost},
metadata={
"retrieval_cost": retrieval_cost,
"generation_cost": generation_cost,
"evaluation_cost": evaluation_cost,
"total_cost": total_cost,
"cost_per_token": total_cost / self.count_tokens(response)
}
)
return total_cost
except Exception as error:
cost_trace.error(error=str(error))
raise error
def calculate_retrieval_cost(self, docs: list) -> float:
# Calculate cost based on vector search operations
return len(docs) * 0.001 # Simplified
def calculate_generation_cost(self, response: str) -> float:
# Calculate cost based on token usage
tokens = self.count_tokens(response)
return tokens * 0.00003 # Simplified
def calculate_evaluation_cost(self, question: str, response: str) -> float:
# Calculate cost for quality evaluation
return 0.001  # Simplified
Cost Optimization Strategies
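One of the recommendation types handled by the optimizer below, implement_caching, is not expanded anywhere on this page. A minimal, hedged sketch of query-level caching for retrieval results, assuming the retriever.retrieve interface used in the earlier examples (the CachedRetriever name itself is hypothetical):
class CachedRetriever:
    # Wraps an existing retriever and memoizes results per normalized query,
    # so repeated questions skip the vector search (and its cost) entirely.
    def __init__(self, retriever):
        self.retriever = retriever
        self._cache = {}

    def retrieve(self, query: str):
        key = query.strip().lower()
        if key not in self._cache:
            self._cache[key] = self.retriever.retrieve(query)
        return self._cache[key]
Repeated or near-duplicate questions then bypass the vector store, which lowers both retrieval cost and end-to-end latency.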
class RAGCostOptimizer {
private ants: AgenticAnts
async optimizeRAGCosts() {
// Analyze current costs
const costAnalysis = await this.ants.finops.getRAGCosts({
period: 'last_30_days'
})
console.log('RAG Cost Analysis:')
console.log(`Total cost: $${costAnalysis.totalCost}`)
console.log(`Retrieval cost: $${costAnalysis.retrievalCost}`)
console.log(`Generation cost: $${costAnalysis.generationCost}`)
// Get optimization recommendations
const recommendations = await this.ants.finops.getRAGOptimizations()
console.log('\nOptimization Recommendations:')
recommendations.forEach(rec => {
console.log(`- ${rec.title}: Save $${rec.savings}/month`)
console.log(` Action: ${rec.action}`)
})
return recommendations
}
async implementOptimizations(recommendations: any[]) {
for (const rec of recommendations) {
switch (rec.type) {
case 'reduce_retrieval_k':
await this.reduceRetrievalK(rec.optimalK)
break
case 'optimize_context_length':
await this.optimizeContextLength(rec.optimalLength)
break
case 'implement_caching':
await this.implementCaching()
break
case 'use_smaller_model':
await this.useSmallerModel(rec.model)
break
}
}
}
}
Performance Monitoring
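Averages hide tail latency, which is what users actually feel. As a complement to the dashboard metrics below, here is a hedged sketch that estimates p95 end-to-end latency from recent traces; it reuses the ants.traces.query call shown in the Troubleshooting section and assumes queried traces expose the same duration field (in milliseconds) used elsewhere on this page.
import statistics

def report_p95_latency(ants):
    # Same query API as in the Troubleshooting section below.
    traces = ants.traces.query({
        "name": "complete-rag-pipeline",
        "period": "last_7_days"
    })
    durations = [trace.duration for trace in traces if trace.duration is not None]
    if len(durations) < 2:
        print("Not enough traces to compute percentiles")
        return None
    p95 = statistics.quantiles(durations, n=20)[18]  # 19 cut points; index 18 = p95
    print(f"p95 end-to-end latency: {p95:.0f}ms")
    return p95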
RAG Performance Metrics
class RAGPerformanceMonitor:
def __init__(self):
self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
def monitor_rag_performance(self):
# Get performance metrics for RAG system
metrics = self.ants.metrics.get_rag_metrics(
period="last_7_days"
)
print("RAG Performance Metrics:")
print(f" Average query time: {metrics.avg_query_time}ms")
print(f" Average retrieval time: {metrics.avg_retrieval_time}ms")
print(f" Average generation time: {metrics.avg_generation_time}ms")
print(f" Average response quality: {metrics.avg_response_quality}")
print(f" Success rate: {metrics.success_rate}%")
print(f" Error rate: {metrics.error_rate}%")
# Check for performance issues
if metrics.avg_query_time > 10000:
print(" ⚠️ High query time detected!")
if metrics.avg_response_quality < 0.7:
print(" ⚠️ Low response quality detected!")
if metrics.error_rate > 5:
print(" ⚠️ High error rate detected!")
return metrics
Performance Optimization
class RAGPerformanceOptimizer {
private ants: AgenticAnts
async optimizeRAGPerformance() {
// Analyze performance bottlenecks
const bottlenecks = await this.ants.sre.findRAGBottlenecks({
period: 'last_7_days'
})
console.log('RAG Performance Bottlenecks:')
bottlenecks.forEach(bottleneck => {
console.log(`- ${bottleneck.component}: ${bottleneck.avgTime}ms avg`)
console.log(` Optimization: ${bottleneck.optimization}`)
})
// Implement optimizations
for (const bottleneck of bottlenecks) {
await this.implementOptimization(bottleneck)
}
}
private async implementOptimization(bottleneck: any) {
switch (bottleneck.component) {
case 'retrieval':
await this.optimizeRetrieval()
break
case 'generation':
await this.optimizeGeneration()
break
case 'context_preparation':
await this.optimizeContextPreparation()
break
}
}
}
Quality Monitoring
Response Quality Tracking
class RAGQualityMonitor:
def __init__(self):
self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
def monitor_response_quality(self, question: str, response: str, context: str):
quality_trace = self.ants.trace.create(
name="rag-quality-monitoring",
input=question,
metadata={
"quality_dimensions": ["relevance", "accuracy", "completeness", "coherence"]
}
)
try:
# Evaluate response quality
quality_metrics = self.evaluate_response_quality(question, response, context)
quality_trace.complete(
output=quality_metrics,
metadata={
"overall_quality": quality_metrics["overall_score"],
"relevance": quality_metrics["relevance"],
"accuracy": quality_metrics["accuracy"],
"completeness": quality_metrics["completeness"],
"coherence": quality_metrics["coherence"]
}
)
return quality_metrics
except Exception as error:
quality_trace.error(error=str(error))
raise error
def evaluate_response_quality(self, question: str, response: str, context: str) -> dict:
# Implement comprehensive quality evaluation
return {
"overall_score": 0.85,
"relevance": 0.9,
"accuracy": 0.8,
"completeness": 0.85,
"coherence": 0.8
}
Best Practices
1. Comprehensive Monitoring
class BestPracticeRAGSystem:
def __init__(self):
self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
def query_with_monitoring(self, question: str, user_id: str = None):
# Always create comprehensive traces
rag_trace = self.ants.trace.create(
name="best-practice-rag",
input=question,
metadata={
"user_id": user_id,
"monitoring_level": "comprehensive",
"quality_tracking": True,
"cost_tracking": True,
"performance_tracking": True
}
)
try:
# Monitor all components
with rag_trace.span("retrieval") as span:
docs = self.retrieve_documents(question)
span.set_metadata({
"retrieved_count": len(docs),
"retrieval_time": span.duration
})
with rag_trace.span("generation") as span:
response = self.generate_response(question, docs)
span.set_metadata({
"response_length": len(response),
"generation_time": span.duration
})
with rag_trace.span("quality-evaluation") as span:
quality = self.evaluate_quality(question, response, docs)
span.set_metadata({
"quality_score": quality["overall_score"],
"evaluation_time": span.duration
})
rag_trace.complete(
output=response,
metadata={
"success": True,
"quality_score": quality["overall_score"]
}
)
return response
except Exception as error:
rag_trace.error(error=str(error))
raise error
2. Error Handling and Recovery
class ResilientRAGSystem {
private ants: AgenticAnts
async queryWithRecovery(question: string, userId?: string) {
const ragTrace = await this.ants.trace.create({
name: 'resilient-rag',
input: question,
metadata: {
userId,
retryEnabled: true,
fallbackEnabled: true
}
})
try {
// Try primary RAG system
const response = await this.primaryRAGSystem.query(question)
await ragTrace.complete({
output: response,
metadata: { success: true, system: 'primary' }
})
return response
} catch (error) {
// Try fallback system
try {
const fallbackResponse = await this.fallbackRAGSystem.query(question)
await ragTrace.complete({
output: fallbackResponse,
metadata: { success: true, system: 'fallback' }
})
return fallbackResponse
} catch (fallbackError) {
await ragTrace.error({
error: fallbackError.message,
metadata: { success: false, systems: ['primary', 'fallback'] }
})
throw fallbackError
}
}
}
}
Troubleshooting
Common RAG Issues
Issue: Low retrieval quality
def debug_retrieval_quality():
# Analyze retrieval performance
traces = ants.traces.query({
"name": "vector-retrieval",
"period": "last_24h"
})
for trace in traces:
if trace.metadata.get("avg_relevance", 0) < 0.7:
print(f"Low relevance query: {trace.input}")
print(f"Relevance score: {trace.metadata.get('avg_relevance')}")
print(f"Retrieved docs: {trace.metadata.get('results_count')}")Issue: High generation costs
async function debugGenerationCosts() {
const expensiveTraces = await ants.traces.query({
name: 'llm-generation',
period: 'last_24h',
sortBy: 'cost'
})
console.log('Most expensive generations:')
expensiveTraces.slice(0, 10).forEach(trace => {
console.log(`Cost: $${trace.metadata.cost}`)
console.log(`Tokens: ${trace.metadata.tokens_used}`)
console.log(`Query: ${trace.input}`)
})
}
Next Steps
- Production Deployment - Learn production best practices
- Cost Optimization - Reduce costs with our optimization guide
- Debugging - Troubleshoot issues with our debugging guide
Example Projects
- Document Q&A System - GitHub Repository (opens in a new tab)
- Knowledge Base Chatbot - GitHub Repository (opens in a new tab)
- Research Assistant - GitHub Repository (opens in a new tab)
Congratulations! 🎉 You now have comprehensive monitoring for your RAG systems with complete visibility into retrieval quality, generation performance, and end-to-end metrics.