Docs
Guides & Tutorials
RAG System

RAG System Monitoring

Observe retrieval-augmented generation systems end-to-end with comprehensive monitoring and optimization.

Overview

Retrieval-Augmented Generation (RAG) systems combine information retrieval with text generation. AgenticAnts provides complete observability for:

  • Retrieval Quality - Monitor vector search performance
  • Generation Performance - Track LLM response quality
  • End-to-End Latency - Measure total system performance
  • Cost Optimization - Track retrieval and generation costs
  • Quality Metrics - Monitor answer relevance and accuracy

RAG Architecture Components

Typical RAG System Flow

import os
import time

from agenticants import AgenticAnts
from agenticants.integrations import langchain
 
# Shared client used by the module-level examples. The API key is read from the
# AGENTICANTS_API_KEY environment variable; os.getenv returns None when it is unset.
ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
# Pass the client to the LangChain integration.
# NOTE(review): presumably this enables automatic tracing of LangChain calls —
# confirm against the SDK documentation.
langchain.auto_instrument(ants)
 
class RAGSystem:
    """RAG pipeline that reports every processing stage as a span of one trace."""

    def __init__(self):
        # Collaborators are created with their default constructors.
        self.vector_store = VectorStore()
        self.retriever = Retriever()
        self.generator = Generator()

    def query(self, question: str, user_id: str = None):
        """Answer ``question`` and record the run on a "rag-system" trace.

        Five spans are opened in order: query-preprocessing, vector-retrieval,
        context-preparation, generation and post-processing. The trace is
        completed with the final answer on success.

        Args:
            question: The user question; it is also stored as the trace input.
            user_id: Optional identifier stored in the trace metadata.

        Returns:
            The post-processed answer.

        Raises:
            Exception: Whatever a stage raises; the error text is reported on
                the trace first and the exception is then re-raised.
        """
        trace_metadata = {
            "user_id": user_id,
            "system_type": "rag",
            "components": ["retrieval", "generation"],
        }
        pipeline_trace = ants.trace.create(
            name="rag-system",
            input=question,
            metadata=trace_metadata,
        )

        # NOTE(review): each stage reads ``duration`` while its ``with`` block is
        # still open — confirm the SDK exposes a meaningful value at that point.
        try:
            # Stage 1: normalise the incoming question.
            with pipeline_trace.span("query-preprocessing") as stage:
                processed_query = self.preprocess_query(question)
                preprocessing_info = {
                    "original_length": len(question),
                    "processed_length": len(processed_query),
                    "preprocessing_time": stage.duration,
                }
                stage.set_metadata(preprocessing_info)

            # Stage 2: fetch candidate documents for the processed query.
            with pipeline_trace.span("vector-retrieval") as stage:
                retrieved_docs = self.retriever.retrieve(processed_query)
                retrieval_info = {
                    "retrieved_count": len(retrieved_docs),
                    "retrieval_time": stage.duration,
                    "vector_db": "pinecone",
                }
                stage.set_metadata(retrieval_info)

            # Stage 3: turn the documents into a prompt context.
            with pipeline_trace.span("context-preparation") as stage:
                context = self.prepare_context(retrieved_docs)
                context_info = {
                    "context_length": len(context),
                    "preparation_time": stage.duration,
                }
                stage.set_metadata(context_info)

            # Stage 4: generate from the original question plus the context.
            with pipeline_trace.span("generation") as stage:
                answer = self.generator.generate(question, context)
                generation_info = {
                    "generation_time": stage.duration,
                    "answer_length": len(answer),
                    "model": "gpt-4",
                }
                stage.set_metadata(generation_info)

            # Stage 5: final clean-up of the generated answer.
            with pipeline_trace.span("post-processing") as stage:
                final_answer = self.post_process(answer)
                post_processing_info = {
                    "post_processing_time": stage.duration,
                    "final_length": len(final_answer),
                }
                stage.set_metadata(post_processing_info)

            summary = {
                "total_components": 5,
                "success": True,
                "total_time": pipeline_trace.duration,
            }
            pipeline_trace.complete(output=final_answer, metadata=summary)

            return final_answer

        except Exception as exc:
            # Report the failure on the trace before propagating it.
            pipeline_trace.error(error=str(exc))
            raise exc

Retrieval Quality Monitoring

Vector Search Performance

class RetrievalMonitor:
    """Trace vector-store similarity searches together with relevance statistics.

    ``calculate_relevance`` is a placeholder that subclasses must override.
    """

    def __init__(self, vector_store=None):
        """Create the monitoring client and remember the vector store to query.

        Args:
            vector_store: Object exposing ``similarity_search(query=..., k=...)``.
                It may also be assigned to ``self.vector_store`` after
                construction; it must be set before ``monitor_retrieval`` runs.
        """
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
        self.vector_store = vector_store

    def monitor_retrieval(self, query: str, top_k: int = 5):
        """Run a similarity search and record timing and relevance on a trace.

        Args:
            query: Search text; also stored as the trace input.
            top_k: Number of results requested from the vector store.

        Returns:
            The results returned by ``self.vector_store.similarity_search``.

        Raises:
            Exception: Any error raised while searching or scoring; it is
                reported on the trace and then re-raised unchanged.
        """
        retrieval_trace = self.ants.trace.create(
            name="vector-retrieval",
            input=query,
            metadata={
                "top_k": top_k,
                "retrieval_type": "semantic_search"
            }
        )

        try:
            # perf_counter is monotonic, so the elapsed time cannot go negative
            # if the system clock is adjusted during the search.
            start_time = time.perf_counter()
            results = self.vector_store.similarity_search(
                query=query,
                k=top_k
            )
            retrieval_time = time.perf_counter() - start_time

            relevance_scores = [
                self.calculate_relevance(query, result.content)
                for result in results
            ]

            # An empty result set is a valid outcome: report the statistics as
            # None instead of failing on a division by zero or on min()/max().
            if relevance_scores:
                avg_relevance = sum(relevance_scores) / len(relevance_scores)
                min_relevance = min(relevance_scores)
                max_relevance = max(relevance_scores)
            else:
                avg_relevance = min_relevance = max_relevance = None

            retrieval_trace.complete(
                output=results,
                metadata={
                    "retrieval_time": retrieval_time,
                    "results_count": len(results),
                    "avg_relevance": avg_relevance,
                    "min_relevance": min_relevance,
                    "max_relevance": max_relevance
                }
            )

            return results

        except Exception as error:
            retrieval_trace.error(error=str(error))
            # Bare raise keeps the original traceback intact.
            raise

    def calculate_relevance(self, query: str, content: str) -> float:
        """Score how relevant ``content`` is to ``query``.

        Placeholder: implement the scoring logic in a subclass (semantic
        similarity, keyword matching, etc.).

        Raises:
            NotImplementedError: Always, until overridden. Failing loudly here
                avoids a confusing TypeError later when scores are summed.
        """
        raise NotImplementedError(
            "calculate_relevance must be implemented with relevance scoring logic"
        )

Retrieval Quality Metrics

/**
 * Records retrieval quality metrics (relevance, diversity, coverage,
 * freshness) for a query on a 'retrieval-quality' trace.
 */
class RetrievalQualityMonitor {
  private ants: AgenticAnts
  
  /**
   * Computes the quality metrics for `retrievedDocs` and completes the trace
   * with them.
   *
   * @param query - The user query; also stored as the trace input.
   * @param retrievedDocs - Documents returned by the retriever.
   * @returns The computed metrics object.
   * @throws Re-throws any error raised while computing or recording metrics,
   *   after reporting it on the trace.
   */
  async monitorRetrievalQuality(query: string, retrievedDocs: any[]) {
    const qualityTrace = await this.ants.trace.create({
      name: 'retrieval-quality',
      input: query,
      metadata: {
        retrievedCount: retrievedDocs.length,
        queryType: this.classifyQuery(query)
      }
    })
    
    try {
      // Calculate quality metrics
      const metrics = {
        relevance: await this.calculateRelevance(query, retrievedDocs),
        diversity: this.calculateDiversity(retrievedDocs),
        coverage: this.calculateCoverage(query, retrievedDocs),
        freshness: this.calculateFreshness(retrievedDocs)
      }
      
      await qualityTrace.complete({
        output: metrics,
        metadata: {
          avgRelevance: metrics.relevance.avg,
          diversityScore: metrics.diversity,
          coverageScore: metrics.coverage,
          freshnessScore: metrics.freshness
        }
      })
      
      return metrics
      
    } catch (error) {
      // The caught value is not guaranteed to be an Error, so only read
      // `.message` when it is one.
      const message = error instanceof Error ? error.message : String(error)
      await qualityTrace.error({ error: message })
      throw error
    }
  }
  
  /**
   * Scores how relevant the documents are to the query. The result exposes
   * the average as `avg`, which is what the trace metadata reads.
   */
  private async calculateRelevance(query: string, docs: any[]): Promise<{ avg: number }> {
    // Calculate relevance of retrieved documents to the query
    return { avg: 0.85 } // Simplified
  }
  
  /** Classifies the query type (factual, analytical, creative, etc.). */
  private classifyQuery(query: string): string {
    return 'factual' // Simplified
  }
  
  /** Scores the diversity of the retrieved documents. */
  private calculateDiversity(docs: any[]): number {
    return 0.8 // Simplified
  }
  
  /** Scores how well the documents cover the query. */
  private calculateCoverage(query: string, docs: any[]): number {
    return 0.9 // Simplified
  }
  
  /** Scores the freshness of the retrieved documents. */
  private calculateFreshness(docs: any[]): number {
    return 0.7 // Simplified
  }
}

Generation Performance Monitoring

LLM Generation Tracking

class GenerationMonitor:
    """Record LLM generation calls, with timing and quality scores, on a trace."""

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))

    def monitor_generation(self, question: str, context: str, model: str = "gpt-4"):
        """Generate a response and complete an "llm-generation" trace with metrics.

        Relies on ``self.generate_response`` and ``self.count_tokens``, which are
        not defined in this class and must be supplied elsewhere.

        Args:
            question: The user question; also stored as the trace input.
            context: Context text passed to the generator.
            model: Model name recorded on the trace and passed to the generator.

        Returns:
            The generated response.

        Raises:
            Exception: Any failure is reported on the trace and re-raised.
        """
        request_info = {
            "model": model,
            "context_length": len(context),
            "question_length": len(question),
        }
        trace = self.ants.trace.create(
            name="llm-generation",
            input=question,
            metadata=request_info,
        )

        try:
            # Time only the generation call itself.
            started_at = time.time()
            response = self.generate_response(question, context, model)
            elapsed = time.time() - started_at

            scores = self.evaluate_response_quality(question, response)

            result_info = {
                "generation_time": elapsed,
                "response_length": len(response),
                "tokens_used": self.count_tokens(response),
                "quality_score": scores["overall_score"],
                "relevance_score": scores["relevance"],
                "coherence_score": scores["coherence"],
                "factual_accuracy": scores["factual_accuracy"],
            }
            trace.complete(output=response, metadata=result_info)

            return response

        except Exception as exc:
            trace.error(error=str(exc))
            raise exc

    def evaluate_response_quality(self, question: str, response: str) -> dict:
        """Return quality scores for ``response``.

        Placeholder implementation: the inputs are ignored and fixed scores are
        returned.
        """
        placeholder_scores = {}
        placeholder_scores["overall_score"] = 0.85
        placeholder_scores["relevance"] = 0.9
        placeholder_scores["coherence"] = 0.8
        placeholder_scores["factual_accuracy"] = 0.85
        return placeholder_scores

Response Quality Evaluation

/**
 * Scores a generated response along four dimensions and records the scores
 * on a 'response-evaluation' trace.
 */
class ResponseQualityEvaluator {
  private ants: AgenticAnts
  
  /**
   * Evaluates relevance, coherence, factual accuracy and completeness
   * concurrently, averages them, and completes the trace with the result.
   *
   * @returns `{ overallScore, evaluations }`, where `evaluations` lists the
   *   four scores in the order named above.
   * @throws Re-throws any evaluation or tracing error after reporting it on
   *   the trace.
   */
  async evaluateResponse(question: string, response: string, context: string) {
    const traceMetadata = {
      questionLength: question.length,
      responseLength: response.length,
      contextLength: context.length
    }
    const evaluationTrace = await this.ants.trace.create({
      name: 'response-evaluation',
      input: { question, response, context },
      metadata: traceMetadata
    })
    
    try {
      // All four dimensions are evaluated in parallel.
      const evaluations = await Promise.all([
        this.evaluateRelevance(question, response),
        this.evaluateCoherence(response),
        this.evaluateFactualAccuracy(response, context),
        this.evaluateCompleteness(question, response)
      ])
      const [relevance, coherence, factualAccuracy, completeness] = evaluations
      
      // Unweighted mean of the individual scores.
      let scoreTotal = 0
      for (const score of evaluations) {
        scoreTotal = scoreTotal + score
      }
      const overallScore = scoreTotal / evaluations.length
      
      await evaluationTrace.complete({
        output: { overallScore, evaluations },
        metadata: { relevance, coherence, factualAccuracy, completeness, overallScore }
      })
      
      return { overallScore, evaluations }
      
    } catch (error) {
      await evaluationTrace.error({ error: error.message })
      throw error
    }
  }
  
  /** Placeholder relevance score for the response against the question. */
  private async evaluateRelevance(question: string, response: string): Promise<number> {
    return 0.9
  }
  
  /** Placeholder coherence score for the response. */
  private async evaluateCoherence(response: string): Promise<number> {
    return 0.8
  }
  
  /** Placeholder factual-accuracy score for the response against the context. */
  private async evaluateFactualAccuracy(response: string, context: string): Promise<number> {
    return 0.85
  }
  
  /** Placeholder completeness score for the response against the question. */
  private async evaluateCompleteness(question: string, response: string): Promise<number> {
    return 0.9
  }
}

End-to-End RAG Monitoring

Complete RAG Pipeline Tracking

class CompleteRAGMonitor:
    """Trace a six-stage RAG pipeline, including an explicit quality-evaluation stage.

    The stage helpers (``analyze_query``, ``retrieve_documents``,
    ``calculate_avg_relevance``, ``prepare_context``, ``generate_response``,
    ``count_tokens``, ``evaluate_response_quality``, ``post_process``) are not
    defined in this class and must be supplied elsewhere.
    """

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))

    def monitor_rag_pipeline(self, question: str, user_id: str = None):
        """Run the full pipeline for ``question`` under one trace.

        Spans are opened in this order: query-analysis, retrieval,
        context-preparation, generation, quality-evaluation, post-processing.

        Args:
            question: The user question; also stored as the trace input.
            user_id: Optional identifier stored in the trace metadata.

        Returns:
            The post-processed response.

        Raises:
            Exception: Any stage failure is reported on the trace and re-raised.
        """
        pipeline_info = {
            "user_id": user_id,
            "pipeline_version": "1.0",
            "components": ["preprocessing", "retrieval", "generation", "evaluation"],
        }
        trace = self.ants.trace.create(
            name="complete-rag-pipeline",
            input=question,
            metadata=pipeline_info,
        )

        # NOTE(review): ``duration`` is read while each span is still open —
        # confirm the SDK reports a meaningful value at that point.
        try:
            # Stage 1: classify the query.
            with trace.span("query-analysis") as stage:
                analysis = self.analyze_query(question)
                stage.set_metadata({
                    "query_type": analysis["type"],
                    "complexity": analysis["complexity"],
                    "keywords": analysis["keywords"],
                })

            # Stage 2: fetch documents for the raw question.
            with trace.span("retrieval") as stage:
                documents = self.retrieve_documents(question)
                retrieval_info = {
                    "retrieved_count": len(documents),
                    "retrieval_time": stage.duration,
                    "avg_relevance": self.calculate_avg_relevance(documents),
                }
                stage.set_metadata(retrieval_info)

            # Stage 3: build the prompt context.
            with trace.span("context-preparation") as stage:
                context = self.prepare_context(documents)
                stage.set_metadata({
                    "context_length": len(context),
                    "preparation_time": stage.duration,
                })

            # Stage 4: generate the response.
            with trace.span("generation") as stage:
                response = self.generate_response(question, context)
                generation_info = {
                    "generation_time": stage.duration,
                    "response_length": len(response),
                    "tokens_used": self.count_tokens(response),
                }
                stage.set_metadata(generation_info)

            # Stage 5: score the raw (not yet post-processed) response.
            with trace.span("quality-evaluation") as stage:
                quality = self.evaluate_response_quality(question, response, context)
                stage.set_metadata({
                    "quality_score": quality["overall_score"],
                    "evaluation_time": stage.duration,
                })

            # Stage 6: final clean-up.
            with trace.span("post-processing") as stage:
                final_response = self.post_process(response)
                stage.set_metadata({
                    "post_processing_time": stage.duration,
                    "final_length": len(final_response),
                })

            summary = {
                "total_components": 6,
                "success": True,
                "total_time": trace.duration,
                "quality_score": quality["overall_score"],
            }
            trace.complete(output=final_response, metadata=summary)

            return final_response

        except Exception as exc:
            trace.error(error=str(exc))
            raise exc

Cost Optimization

RAG Cost Tracking

class RAGCostMonitor:
    """Record the cost of one RAG request (retrieval, generation, evaluation) on a trace.

    ``count_tokens(text)`` is called by this class but not defined in it; a
    subclass or mixin must provide it.
    """

    # Simplified flat rates. They are class attributes so a subclass can
    # override pricing without re-implementing the calculation methods.
    RETRIEVAL_COST_PER_DOC = 0.001
    GENERATION_COST_PER_TOKEN = 0.00003
    EVALUATION_COST = 0.001

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))

    def track_rag_costs(self, question: str, retrieved_docs: list, response: str):
        """Compute the cost breakdown for one request and record it on a trace.

        Args:
            question: The user question; also stored as the trace input.
            retrieved_docs: Documents returned by retrieval.
            response: The generated response text.

        Returns:
            The total cost (retrieval + generation + evaluation).

        Raises:
            Exception: Any failure is reported on the trace and re-raised.
        """
        cost_trace = self.ants.trace.create(
            name="rag-cost-tracking",
            input=question,
            metadata={
                "cost_components": ["retrieval", "generation", "evaluation"]
            }
        )

        try:
            retrieval_cost = self.calculate_retrieval_cost(retrieved_docs)
            generation_cost = self.calculate_generation_cost(response)
            evaluation_cost = self.calculate_evaluation_cost(question, response)

            total_cost = retrieval_cost + generation_cost + evaluation_cost

            # An empty response has zero tokens; report cost_per_token as None
            # rather than failing the whole request with ZeroDivisionError.
            token_count = self.count_tokens(response)
            cost_per_token = total_cost / token_count if token_count else None

            cost_trace.complete(
                output={"total_cost": total_cost},
                metadata={
                    "retrieval_cost": retrieval_cost,
                    "generation_cost": generation_cost,
                    "evaluation_cost": evaluation_cost,
                    "total_cost": total_cost,
                    "cost_per_token": cost_per_token
                }
            )

            return total_cost

        except Exception as error:
            cost_trace.error(error=str(error))
            # Bare raise keeps the original traceback intact.
            raise

    def calculate_retrieval_cost(self, docs: list) -> float:
        """Return the retrieval cost: a flat rate per retrieved document (simplified)."""
        return len(docs) * self.RETRIEVAL_COST_PER_DOC

    def calculate_generation_cost(self, response: str) -> float:
        """Return the generation cost: a flat rate per response token (simplified)."""
        return self.count_tokens(response) * self.GENERATION_COST_PER_TOKEN

    def calculate_evaluation_cost(self, question: str, response: str) -> float:
        """Return the fixed cost of one quality evaluation (simplified)."""
        return self.EVALUATION_COST

Cost Optimization Strategies

/**
 * Prints a RAG cost report with optimization recommendations and applies
 * recommendations by type.
 */
class RAGCostOptimizer {
  private ants: AgenticAnts
  
  /**
   * Fetches the last 30 days of RAG costs and the available optimization
   * recommendations, logs both, and returns the recommendations.
   */
  async optimizeRAGCosts() {
    const costAnalysis = await this.ants.finops.getRAGCosts({ period: 'last_30_days' })
    
    // Cost summary
    console.log('RAG Cost Analysis:')
    console.log(`Total cost: $${costAnalysis.totalCost}`)
    console.log(`Retrieval cost: $${costAnalysis.retrievalCost}`)
    console.log(`Generation cost: $${costAnalysis.generationCost}`)
    
    const recommendations = await this.ants.finops.getRAGOptimizations()
    
    // One two-line entry per recommendation
    console.log('\nOptimization Recommendations:')
    for (const rec of recommendations) {
      console.log(`- ${rec.title}: Save $${rec.savings}/month`)
      console.log(`  Action: ${rec.action}`)
    }
    
    return recommendations
  }
  
  /**
   * Applies each recommendation in order, one at a time. Recommendations
   * with an unrecognized `type` are skipped.
   */
  async implementOptimizations(recommendations: any[]) {
    for (const rec of recommendations) {
      const kind = rec.type
      if (kind === 'reduce_retrieval_k') {
        await this.reduceRetrievalK(rec.optimalK)
      } else if (kind === 'optimize_context_length') {
        await this.optimizeContextLength(rec.optimalLength)
      } else if (kind === 'implement_caching') {
        await this.implementCaching()
      } else if (kind === 'use_smaller_model') {
        await this.useSmallerModel(rec.model)
      }
    }
  }
}

Performance Monitoring

RAG Performance Metrics

class RAGPerformanceMonitor:
    """Print a summary of RAG performance metrics and flag threshold breaches."""

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))

    def monitor_rag_performance(self):
        """Fetch the last seven days of RAG metrics, print a report, and return them.

        Warnings are printed when the average query time exceeds 10000, the
        average response quality is below 0.7, or the error rate exceeds 5.

        Returns:
            The metrics object returned by ``self.ants.metrics.get_rag_metrics``.
        """
        metrics = self.ants.metrics.get_rag_metrics(period="last_7_days")

        # (line template, metrics attribute) pairs, printed in this order.
        report_rows = (
            ("  Average query time: {}ms", "avg_query_time"),
            ("  Average retrieval time: {}ms", "avg_retrieval_time"),
            ("  Average generation time: {}ms", "avg_generation_time"),
            ("  Average response quality: {}", "avg_response_quality"),
            ("  Success rate: {}%", "success_rate"),
            ("  Error rate: {}%", "error_rate"),
        )
        print("RAG Performance Metrics:")
        for template, attribute in report_rows:
            print(template.format(getattr(metrics, attribute)))

        # (breach test, warning line) pairs, evaluated one after another.
        threshold_checks = (
            (lambda m: m.avg_query_time > 10000, "  ⚠️  High query time detected!"),
            (lambda m: m.avg_response_quality < 0.7, "  ⚠️  Low response quality detected!"),
            (lambda m: m.error_rate > 5, "  ⚠️  High error rate detected!"),
        )
        for is_breached, warning in threshold_checks:
            if is_breached(metrics):
                print(warning)

        return metrics

Performance Optimization

/**
 * Lists RAG performance bottlenecks and runs the matching optimization for
 * each one.
 */
class RAGPerformanceOptimizer {
  private ants: AgenticAnts
  
  /**
   * Fetches the last 7 days of bottlenecks, logs every one of them, then
   * applies the optimizations one at a time in the same order.
   */
  async optimizeRAGPerformance() {
    const bottlenecks = await this.ants.sre.findRAGBottlenecks({ period: 'last_7_days' })
    
    // First pass: report all bottlenecks.
    console.log('RAG Performance Bottlenecks:')
    for (const item of bottlenecks) {
      console.log(`- ${item.component}: ${item.avgTime}ms avg`)
      console.log(`  Optimization: ${item.optimization}`)
    }
    
    // Second pass: apply the optimizations sequentially.
    for (const item of bottlenecks) {
      await this.implementOptimization(item)
    }
  }
  
  /**
   * Runs the optimization that matches `bottleneck.component`. Unrecognized
   * components are ignored.
   */
  private async implementOptimization(bottleneck: any) {
    const component = bottleneck.component
    if (component === 'retrieval') {
      await this.optimizeRetrieval()
    } else if (component === 'generation') {
      await this.optimizeGeneration()
    } else if (component === 'context_preparation') {
      await this.optimizeContextPreparation()
    }
  }
}

Quality Monitoring

Response Quality Tracking

class RAGQualityMonitor:
    """Record response-quality scores for a RAG answer on a trace."""

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))

    def monitor_response_quality(self, question: str, response: str, context: str):
        """Evaluate ``response`` and complete a "rag-quality-monitoring" trace.

        Args:
            question: The user question; also stored as the trace input.
            response: The generated answer to evaluate.
            context: The context the answer was generated from.

        Returns:
            The quality metrics dict produced by ``evaluate_response_quality``.

        Raises:
            Exception: Any failure is reported on the trace and re-raised.
        """
        dimensions = ["relevance", "accuracy", "completeness", "coherence"]
        trace = self.ants.trace.create(
            name="rag-quality-monitoring",
            input=question,
            metadata={"quality_dimensions": dimensions},
        )

        try:
            scores = self.evaluate_response_quality(question, response, context)

            # The trace uses "overall_quality" where the scores dict uses "overall_score".
            trace_metadata = {
                "overall_quality": scores["overall_score"],
                "relevance": scores["relevance"],
                "accuracy": scores["accuracy"],
                "completeness": scores["completeness"],
                "coherence": scores["coherence"],
            }
            trace.complete(output=scores, metadata=trace_metadata)

            return scores

        except Exception as exc:
            trace.error(error=str(exc))
            raise exc

    def evaluate_response_quality(self, question: str, response: str, context: str) -> dict:
        """Return quality scores for ``response``.

        Placeholder implementation: the inputs are ignored and fixed scores are
        returned.
        """
        placeholder_scores = {}
        placeholder_scores["overall_score"] = 0.85
        placeholder_scores["relevance"] = 0.9
        placeholder_scores["accuracy"] = 0.8
        placeholder_scores["completeness"] = 0.85
        placeholder_scores["coherence"] = 0.8
        return placeholder_scores

Best Practices

1. Comprehensive Monitoring

class BestPracticeRAGSystem:
    """RAG query flow that traces retrieval, generation and quality evaluation.

    ``retrieve_documents``, ``generate_response`` and ``evaluate_quality`` are
    not defined in this class and must be supplied elsewhere.
    """

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))

    def query_with_monitoring(self, question: str, user_id: str = None):
        """Answer ``question`` under a "best-practice-rag" trace.

        Args:
            question: The user question; also stored as the trace input.
            user_id: Optional identifier stored in the trace metadata.

        Returns:
            The generated response.

        Raises:
            Exception: Any failure is reported on the trace and re-raised.
        """
        tracking_flags = {
            "user_id": user_id,
            "monitoring_level": "comprehensive",
            "quality_tracking": True,
            "cost_tracking": True,
            "performance_tracking": True,
        }
        trace = self.ants.trace.create(
            name="best-practice-rag",
            input=question,
            metadata=tracking_flags,
        )

        # NOTE(review): ``duration`` is read while each span is still open —
        # confirm the SDK reports a meaningful value at that point.
        try:
            # Each component gets its own span.
            with trace.span("retrieval") as stage:
                docs = self.retrieve_documents(question)
                retrieval_info = {
                    "retrieved_count": len(docs),
                    "retrieval_time": stage.duration,
                }
                stage.set_metadata(retrieval_info)

            with trace.span("generation") as stage:
                response = self.generate_response(question, docs)
                generation_info = {
                    "response_length": len(response),
                    "generation_time": stage.duration,
                }
                stage.set_metadata(generation_info)

            with trace.span("quality-evaluation") as stage:
                quality = self.evaluate_quality(question, response, docs)
                evaluation_info = {
                    "quality_score": quality["overall_score"],
                    "evaluation_time": stage.duration,
                }
                stage.set_metadata(evaluation_info)

            outcome = {
                "success": True,
                "quality_score": quality["overall_score"],
            }
            trace.complete(output=response, metadata=outcome)

            return response

        except Exception as exc:
            trace.error(error=str(exc))
            raise exc

2. Error Handling and Recovery

/**
 * Answers questions with a primary RAG system and switches to a fallback
 * system when the primary attempt fails, recording the outcome on a
 * 'resilient-rag' trace.
 */
class ResilientRAGSystem {
  private ants: AgenticAnts
  // Both systems are used through a single `query(question)` method.
  private primaryRAGSystem: { query(question: string): Promise<any> }
  private fallbackRAGSystem: { query(question: string): Promise<any> }
  
  /**
   * Queries the primary system; if that attempt throws, queries the fallback.
   *
   * @param question - The user question; also stored as the trace input.
   * @param userId - Optional identifier stored in the trace metadata.
   * @returns The response of whichever system succeeded.
   * @throws The fallback system's error when both attempts fail.
   */
  async queryWithRecovery(question: string, userId?: string) {
    const ragTrace = await this.ants.trace.create({
      name: 'resilient-rag',
      input: question,
      metadata: {
        userId,
        retryEnabled: true,
        fallbackEnabled: true
      }
    })
    
    try {
      // Try primary RAG system
      const response = await this.primaryRAGSystem.query(question)
      
      await ragTrace.complete({
        output: response,
        metadata: { success: true, system: 'primary' }
      })
      
      return response
      
    } catch (error) {
      // Keep the primary failure so the trace explains why the fallback ran.
      const primaryError = this.describeError(error)
      
      // Try fallback system
      try {
        const fallbackResponse = await this.fallbackRAGSystem.query(question)
        
        await ragTrace.complete({
          output: fallbackResponse,
          metadata: { success: true, system: 'fallback', primaryError }
        })
        
        return fallbackResponse
        
      } catch (fallbackError) {
        await ragTrace.error({
          error: this.describeError(fallbackError),
          metadata: { success: false, systems: ['primary', 'fallback'], primaryError }
        })
        
        throw fallbackError
      }
    }
  }
  
  /** Returns a printable message for any thrown value, Error or not. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
  }
}

Troubleshooting

Common RAG Issues

Issue: Low retrieval quality

def debug_retrieval_quality():
    """Print every "vector-retrieval" trace from the last 24 hours with low relevance.

    A trace counts as low relevance when its ``avg_relevance`` metadata value is
    below 0.7; a missing value is treated as 0.
    """
    trace_filter = {
        "name": "vector-retrieval",
        "period": "last_24h",
    }
    recent_traces = ants.traces.query(trace_filter)

    for trace in recent_traces:
        # Skip traces that are not below the relevance threshold.
        if not trace.metadata.get("avg_relevance", 0) < 0.7:
            continue
        print(f"Low relevance query: {trace.input}")
        print(f"Relevance score: {trace.metadata.get('avg_relevance')}")
        print(f"Retrieved docs: {trace.metadata.get('results_count')}")

Issue: High generation costs

/**
 * Logs cost, token usage and input for the first ten 'llm-generation'
 * traces of the last 24 hours, queried with `sortBy: 'cost'`.
 */
async function debugGenerationCosts() {
  const traceFilter = {
    name: 'llm-generation',
    period: 'last_24h',
    sortBy: 'cost'
  }
  const expensiveTraces = await ants.traces.query(traceFilter)
  
  console.log('Most expensive generations:')
  // Only the first ten entries of the query result are reported.
  for (const trace of expensiveTraces.slice(0, 10)) {
    console.log(`Cost: $${trace.metadata.cost}`)
    console.log(`Tokens: ${trace.metadata.tokens_used}`)
    console.log(`Query: ${trace.input}`)
  }
}

Next Steps

Example Projects


Congratulations! 🎉 You now have comprehensive monitoring for your RAG systems with complete visibility into retrieval quality, generation performance, and end-to-end metrics.