Docs
Guides & Tutorials
Multi Agent

Multi-Agent Systems

Monitor complex multi-agent collaborations and workflows with comprehensive observability.

Overview

Multi-agent systems involve multiple AI agents working together to solve complex problems. AgenticAnts provides complete visibility into:

  • Agent Communication - Track messages between agents
  • Workflow Orchestration - Monitor complex workflows
  • Collaboration Patterns - Understand agent interactions
  • Performance Bottlenecks - Identify slow agents
  • Cost Attribution - Track costs per agent

Architecture Patterns

1. Sequential Workflow

Agents work in a pipeline, passing results to the next agent:

import os

from agenticants import AgenticAnts
from agenticants.integrations import langchain

# Shared client used by the examples; the API key is read from the
# AGENTICANTS_API_KEY environment variable instead of being hard-coded.
ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
# Hand the client to the LangChain integration's auto-instrumentation hook.
langchain.auto_instrument(ants)
 
def sequential_workflow(input_text: str):
    """Run three agents one after another under a single workflow trace.

    The text flows through ``preprocess_agent``, ``analyzer_agent`` and
    ``generator_agent`` in that order; each call is wrapped in its own span.

    Args:
        input_text: Text given to the first agent and stored as the trace input.

    Returns:
        The value returned by ``generator_agent.process``.

    Raises:
        Exception: Any error raised while the agents run is reported through
            ``workflow_trace.error`` and then re-raised unchanged.
    """
    # Create main workflow trace
    workflow_trace = ants.trace.create(
        name="sequential-workflow",
        input=input_text,
        metadata={
            "workflow_type": "sequential",
            "total_agents": 3
        }
    )

    try:
        # Agent 1: Text Preprocessor
        with workflow_trace.span("text-preprocessor") as span:
            preprocessed = preprocess_agent.process(input_text)
            span.set_metadata({
                "agent": "preprocessor",
                "output_length": len(preprocessed)
            })

        # Agent 2: Content Analyzer
        with workflow_trace.span("content-analyzer") as span:
            analysis = analyzer_agent.process(preprocessed)
            span.set_metadata({
                "agent": "analyzer",
                "analysis_type": analysis.type
            })

        # Agent 3: Response Generator
        with workflow_trace.span("response-generator") as span:
            response = generator_agent.process(analysis)
            span.set_metadata({
                "agent": "generator",
                "response_length": len(response)
            })

        workflow_trace.complete(
            output=response,
            metadata={
                "total_agents": 3,
                "success": True
            }
        )

        return response

    except Exception as error:
        # Record the failure on the trace before letting it propagate.
        workflow_trace.error(error=str(error))
        raise

2. Parallel Processing

Multiple agents work simultaneously on different tasks:

import { AgenticAnts } from '@agenticants/sdk'
 
// Shared SDK client; the API key is read from the AGENTICANTS_API_KEY environment variable.
const ants = new AgenticAnts({ apiKey: process.env.AGENTICANTS_API_KEY })
 
/**
 * Runs the sentiment, entity and summary agents concurrently under one trace.
 *
 * Each agent call is wrapped in its own span. The three results are merged
 * into a single object that is both stored as the trace output and returned.
 *
 * @param inputData Workflow input; its `text` property is passed to every agent.
 * @returns An object with `sentiment`, `entities` and `summary` fields.
 * @throws Rethrows any agent or tracing failure after reporting it on the trace.
 */
async function parallelWorkflow(inputData: any) {
  // Create main workflow trace
  const workflowTrace = await ants.trace.create({
    name: 'parallel-workflow',
    input: inputData,
    metadata: {
      workflowType: 'parallel',
      totalAgents: 3
    }
  })

  try {
    // Run agents in parallel; Promise.all rejects as soon as any one of them fails.
    const [sentiment, entities, summary] = await Promise.all([
      // Agent 1: Sentiment Analysis
      workflowTrace.span('sentiment-analyzer', async (span) => {
        const result = await sentimentAgent.analyze(inputData.text)
        span.setMetadata({ agent: 'sentiment-analyzer', score: result.score })
        return result
      }),

      // Agent 2: Entity Extraction
      workflowTrace.span('entity-extractor', async (span) => {
        const result = await entityAgent.extract(inputData.text)
        span.setMetadata({ agent: 'entity-extractor', count: result.entities.length })
        return result
      }),

      // Agent 3: Text Summarizer
      workflowTrace.span('text-summarizer', async (span) => {
        const result = await summarizerAgent.summarize(inputData.text)
        span.setMetadata({ agent: 'text-summarizer', summaryLength: result.length })
        return result
      })
    ])

    // Combine results
    const combinedResult = {
      sentiment: sentiment.score,
      entities: entities.entities,
      summary: summary.text
    }

    await workflowTrace.complete({
      output: combinedResult,
      metadata: {
        totalAgents: 3,
        success: true
      }
    })

    return combinedResult

  } catch (error) {
    // A thrown value is not guaranteed to be an Error, so narrow it before
    // reading `.message`; anything else is reported via its string form.
    const reason = error instanceof Error ? error.message : String(error)
    await workflowTrace.error({
      error: reason,
      metadata: { success: false }
    })
    throw error
  }
}

3. Hierarchical Coordination

A coordinator agent manages multiple worker agents:

class HierarchicalMultiAgent:
    """Coordinator-driven workflow in which named workers execute plan steps.

    Each run is recorded as one "hierarchical-workflow" trace containing a
    span for the coordinator's analysis, one span per executed step, and a
    span for the final synthesis.
    """

    def __init__(self):
        self.coordinator = CoordinatorAgent()
        # Workers are looked up by the name a plan step assigns to them.
        self.workers = dict(
            researcher=ResearcherAgent(),
            writer=WriterAgent(),
            reviewer=ReviewerAgent(),
        )

    def process_task(self, task: str):
        """Plan, execute and synthesize ``task`` under a single trace.

        Args:
            task: Task description handed to the coordinator and stored as the
                trace input.

        Returns:
            The value produced by ``self.coordinator.synthesize``.

        Raises:
            Exception: Any failure is reported through ``workflow_trace.error``
                and then re-raised.
        """
        trace_metadata = {
            "workflow_type": "hierarchical",
            "coordinator": "coordinator-agent",
            "workers": list(self.workers),
        }
        workflow_trace = ants.trace.create(
            name="hierarchical-workflow",
            input=task,
            metadata=trace_metadata,
        )

        try:
            # Phase 1: the coordinator turns the task into a plan.
            with workflow_trace.span("coordinator-analysis") as analysis_span:
                plan = self.coordinator.analyze_task(task)
                analysis_span.set_metadata(
                    {"agent": "coordinator", "plan_steps": len(plan.steps)}
                )

            # Phase 2: run every step with its assigned worker. Results are
            # keyed by worker name, so a later step given to the same worker
            # replaces that worker's earlier entry.
            results = {}
            for step in plan.steps:
                worker_name = step.assigned_worker
                with workflow_trace.span(f"worker-{worker_name}") as worker_span:
                    step_result = self.workers[worker_name].execute(step)
                    results[worker_name] = step_result
                    worker_span.set_metadata(
                        {
                            "agent": worker_name,
                            "step": step.name,
                            "result_size": len(str(step_result)),
                        }
                    )

            # Phase 3: the coordinator merges the per-worker results.
            with workflow_trace.span("coordinator-synthesis") as synthesis_span:
                final_result = self.coordinator.synthesize(results)
                synthesis_span.set_metadata(
                    {
                        "agent": "coordinator",
                        "final_result_length": len(str(final_result)),
                    }
                )

            completion_metadata = {"total_steps": len(plan.steps), "success": True}
            workflow_trace.complete(output=final_result, metadata=completion_metadata)

            return final_result

        except Exception as exc:
            workflow_trace.error(error=str(exc))
            raise exc

Agent Communication Monitoring

Message Passing Tracking

class CommunicatingAgents:
    """Delivers messages between named agents and traces every delivery."""

    def __init__(self):
        self.agents = dict(
            agent_a=AgentA(),
            agent_b=AgentB(),
            agent_c=AgentC(),
        )

    def send_message(self, from_agent: str, to_agent: str, message: dict):
        """Deliver ``message`` to ``to_agent`` inside an "agent-communication" trace.

        Args:
            from_agent: Sender name; only recorded in the trace metadata.
            to_agent: Key of the receiving agent in ``self.agents``.
            message: Payload passed to the receiver; its ``"type"`` and ``"id"``
                entries (if present) are copied into the trace metadata.

        Returns:
            Whatever the receiving agent's ``receive_message`` returns.

        Raises:
            Exception: Any failure is reported through ``comm_trace.error``
                and then re-raised.
        """
        # Track inter-agent communication
        comm_metadata = {
            "from_agent": from_agent,
            "to_agent": to_agent,
            "message_type": message.get("type"),
            "communication_id": message.get("id"),
        }
        comm_trace = ants.trace.create(
            name="agent-communication",
            input=message,
            metadata=comm_metadata,
        )

        try:
            recipient = self.agents[to_agent]
            response = recipient.receive_message(message)

            # NOTE(review): duration is read before complete() is called;
            # confirm the SDK exposes a meaningful value at this point.
            outcome = {"response_time": comm_trace.duration, "success": True}
            comm_trace.complete(output=response, metadata=outcome)

            return response

        except Exception as exc:
            comm_trace.error(error=str(exc))
            raise exc

Conversation Flow Tracking

/**
 * Replays a conversation through its agents, tracing each message as a span.
 */
class ConversationFlow {
  // Agents keyed by the id that messages reference through `message.agentId`.
  // NOTE(review): nothing in this class populates the map; confirm how agents get registered.
  private agents: Map<string, any> = new Map()

  /**
   * Processes every message of `conversation` in order under one
   * 'conversation-flow' trace, one span per message.
   *
   * @param conversation Conversation whose `messages` are dispatched to the
   *   agent named by each message's `agentId`.
   * @throws Rethrows any failure after reporting it on the trace, including
   *   the error raised when a message references an unregistered agent.
   */
  async processConversation(conversation: Conversation) {
    const conversationTrace = await ants.trace.create({
      name: 'conversation-flow',
      input: conversation,
      metadata: {
        conversationId: conversation.id,
        participantCount: conversation.participants.length,
        messageCount: conversation.messages.length
      }
    })

    try {
      // Messages are awaited one at a time so they are handled in conversation order.
      for (const message of conversation.messages) {
        await conversationTrace.span(`message-${message.id}`, async (span) => {
          const agent = this.agents.get(message.agentId)
          if (!agent) {
            // Fail with a descriptive error instead of dereferencing undefined.
            throw new Error(`No agent registered for id ${message.agentId}`)
          }
          const response = await agent.processMessage(message)

          span.setMetadata({
            agentId: message.agentId,
            messageType: message.type,
            responseLength: response.length
          })

          return response
        })
      }

      await conversationTrace.complete({
        output: conversation.responses,
        metadata: { success: true }
      })
    } catch (error) {
      // A thrown value is not guaranteed to be an Error, so narrow before reading `.message`.
      const reason = error instanceof Error ? error.message : String(error)
      await conversationTrace.error({
        error: reason,
        metadata: { success: false }
      })
      throw error
    }
  }
}

Workflow Orchestration

Complex Workflow with Decision Points

class ComplexWorkflow:
    """Workflow in which a decision agent picks one of three execution paths."""

    def __init__(self):
        self.decision_agent = DecisionAgent()
        self.execution_agents = {
            'path_a': PathAAgent(),
            'path_b': PathBAgent(),
            'path_c': PathCAgent()
        }

    def execute_workflow(self, input_data: dict):
        """Decide on a path, execute it, and record both steps on one trace.

        Args:
            input_data: Payload passed to the decision agent and to the chosen
                execution agent; also stored as the trace input.

        Returns:
            The value returned by the chosen execution agent's ``execute``.

        Raises:
            Exception: Any failure is reported through ``workflow_trace.error``
                and then re-raised.
        """
        workflow_trace = ants.trace.create(
            name="complex-workflow",
            input=input_data,
            metadata={
                "workflow_type": "complex",
                "has_decision_points": True
            }
        )

        try:
            # Step 1: Decision making
            with workflow_trace.span("decision-making") as span:
                decision = self.decision_agent.decide(input_data)
                span.set_metadata({
                    "agent": "decision-agent",
                    "decision": decision.path,
                    "confidence": decision.confidence
                })

            # Step 2: Execute based on decision. Any path other than "path_a"
            # or "path_b" is handled by the path C agent.
            if decision.path in ("path_a", "path_b"):
                path_key = decision.path
            else:
                path_key = "path_c"
            path_label = path_key.replace("_", "-")  # "path_a" -> "path-a"

            with workflow_trace.span(f"{path_label}-execution") as span:
                result = self.execution_agents[path_key].execute(input_data)
                span.set_metadata({
                    "agent": f"{path_label}-agent",
                    # NOTE(review): duration is read while the span is still
                    # open; confirm the SDK reports a meaningful value here.
                    "execution_time": span.duration
                })

            workflow_trace.complete(
                output=result,
                metadata={
                    "decision_path": decision.path,
                    "success": True
                }
            )

            return result

        except Exception as error:
            # Report the failure, then propagate it so callers still see it.
            workflow_trace.error(error=str(error))
            raise

Performance Monitoring

Agent Performance Comparison

def monitor_agent_performance():
    """Print last-7-days metrics for each agent and flag threshold breaches.

    A warning line is printed when the average latency is above 5000, the
    success rate is below 95, or the total cost is above 100.
    """
    agent_names = ('agent_a', 'agent_b', 'agent_c')

    for name in agent_names:
        # Get performance metrics for this agent
        stats = ants.metrics.get_agent_metrics(
            agent_name=name,
            period="last_7_days",
        )

        print(f"\n{name} Performance:")
        print(f"  Average latency: {stats.avg_latency}ms")
        print(f"  Success rate: {stats.success_rate}%")
        print(f"  Total requests: {stats.total_requests}")
        print(f"  Cost: ${stats.total_cost}")

        # Threshold checks, reported in the same order as the figures above.
        if stats.avg_latency > 5000:
            print("  ⚠️  High latency detected!")

        if stats.success_rate < 95:
            print("  ⚠️  Low success rate detected!")

        if stats.total_cost > 100:
            print("  ⚠️  High cost detected!")

Workflow Bottleneck Analysis

/**
 * Logs, for every workflow of the last 7 days, its three slowest steps and
 * any steps whose error rate is above 5.
 */
async function analyzeWorkflowBottlenecks() {
  const workflows = await ants.metrics.getWorkflowMetrics({
    period: 'last_7_days'
  })

  for (const workflow of workflows) {
    console.log(`\nWorkflow: ${workflow.name}`)

    // Find slowest steps. Sort a copy: Array.prototype.sort reorders in place
    // and would otherwise mutate the metrics object returned by the SDK.
    const slowestSteps = [...workflow.steps]
      .sort((a, b) => b.avgLatency - a.avgLatency)
      .slice(0, 3)

    console.log('Slowest steps:')
    slowestSteps.forEach(step => {
      console.log(`  ${step.name}: ${step.avgLatency}ms avg`)
    })

    // Find error-prone steps (filter already returns a new array, so sorting it is safe).
    const errorProneSteps = workflow.steps
      .filter(step => step.errorRate > 5)
      .sort((a, b) => b.errorRate - a.errorRate)

    if (errorProneSteps.length > 0) {
      console.log('Error-prone steps:')
      errorProneSteps.forEach(step => {
        console.log(`  ${step.name}: ${step.errorRate}% error rate`)
      })
    }
  }
}

Cost Attribution

Track Costs Per Agent

def analyze_agent_costs():
    """Print each agent's last-30-days cost and its share of the total.

    For every agent the total cost, token usage and cost per token are
    printed, followed by the combined total and a per-agent percentage
    breakdown. When the combined total is zero, every share is reported as
    0.0% rather than dividing by zero.
    """
    agents = ['agent_a', 'agent_b', 'agent_c']

    total_cost = 0
    agent_costs = {}

    for agent in agents:
        cost_metrics = ants.finops.get_agent_costs(
            agent_name=agent,
            period="last_30_days"
        )

        agent_costs[agent] = cost_metrics.total_cost
        total_cost += cost_metrics.total_cost

        print(f"{agent}: ${cost_metrics.total_cost:.2f}")
        print(f"  Token usage: {cost_metrics.total_tokens:,}")
        print(f"  Cost per token: ${cost_metrics.cost_per_token:.6f}")

    print(f"\nTotal cost: ${total_cost:.2f}")

    # Cost distribution
    for agent, cost in agent_costs.items():
        # Guard against a period with no spend at all (total of zero).
        percentage = (cost / total_cost) * 100 if total_cost else 0.0
        print(f"{agent}: {percentage:.1f}% of total cost")

Optimize Expensive Agents

/**
 * Logs the last-30-days cost of every agent, in the order returned for
 * `sortBy: 'cost'`, followed by any optimization recommendations for it.
 */
async function optimizeExpensiveAgents() {
  const costQuery = { period: 'last_30_days', sortBy: 'cost' }
  const costReport = await ants.finops.getAgentCosts(costQuery)

  console.log('Agent cost analysis:')

  for (const entry of costReport) {
    console.log(`\n${entry.name}: $${entry.totalCost}`)

    // Recommendations are fetched one agent at a time, in report order.
    const suggestions = await ants.finops.getOptimizations({ agent: entry.name })

    if (suggestions.length > 0) {
      console.log('Optimization opportunities:')
      for (const suggestion of suggestions) {
        console.log(`  - ${suggestion.title}: Save $${suggestion.savings}/month`)
      }
    }
  }
}

Best Practices

1. Design for Observability

class ObservableMultiAgent:
    """Template for a multi-agent workflow that is traced end to end.

    Subclasses supply the agent logic by overriding ``process_with_agents``
    and may register their agents in ``self.agents``.
    """

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
        # Registry of participating agents. execute_workflow reports its size
        # in span metadata, so it has to exist even when nothing is registered.
        self.agents = {}

    def process_with_agents(self, input_data):
        """Run the agent logic for ``input_data``; subclasses must override this.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            "process_with_agents must be implemented by a subclass"
        )

    def execute_workflow(self, input_data):
        """Run ``process_with_agents`` inside an "observable-workflow" trace.

        Args:
            input_data: Payload forwarded to ``process_with_agents`` and stored
                as the trace input.

        Returns:
            The value returned by ``process_with_agents``.

        Raises:
            Exception: Any failure is reported through ``main_trace.error``
                and then re-raised.
        """
        # Always create a main trace
        main_trace = self.ants.trace.create(
            name="observable-workflow",
            input=input_data,
            metadata={
                "workflow_version": "1.0",
                "environment": "production"
            }
        )

        try:
            # Use spans for each agent interaction
            with main_trace.span("agent-interaction") as span:
                result = self.process_with_agents(input_data)
                span.set_metadata({
                    "agents_used": len(self.agents),
                    "result_size": len(str(result))
                })

            main_trace.complete(output=result)
            return result

        except Exception as error:
            main_trace.error(error=str(error))
            raise

2. Implement Circuit Breakers

/**
 * Runs agent operations behind optional per-agent circuit breakers.
 */
class ResilientMultiAgent {
  // One breaker per agent name; agents without an entry run unguarded.
  private circuitBreakers: Map<string, CircuitBreaker> = new Map()

  /**
   * Executes `operation` unless the breaker registered for `agentName` is open,
   * and records the outcome on that breaker when one exists.
   *
   * @typeParam T Result type of the operation, preserved for the caller.
   * @param agentName Key used to look up the agent's circuit breaker.
   * @param operation Async work to run.
   * @returns The value `operation` resolves to.
   * @throws Error when the breaker is open; otherwise rethrows whatever
   *   `operation` rejects with.
   */
  async executeWithCircuitBreaker<T>(agentName: string, operation: () => Promise<T>): Promise<T> {
    const breaker = this.circuitBreakers.get(agentName)

    // Fail fast without invoking the operation while the breaker is open.
    if (breaker?.isOpen()) {
      throw new Error(`Circuit breaker open for ${agentName}`)
    }

    try {
      const result = await operation()
      breaker?.recordSuccess()
      return result
    } catch (error) {
      breaker?.recordFailure()
      throw error
    }
  }
}

3. Monitor Agent Health

def monitor_agent_health():
    """Print the health report of each agent and highlight unhealthy ones.

    Status, uptime and last-check time are printed for every agent; when the
    status is anything other than "healthy", the reported issues follow.
    """
    for agent_name in ('agent_a', 'agent_b', 'agent_c'):
        report = ants.sre.get_agent_health(agent_name)

        print(f"\n{agent_name} Health:")
        print(f"  Status: {report.status}")
        print(f"  Uptime: {report.uptime}%")
        print(f"  Last check: {report.last_check}")

        if report.status != "healthy":
            print("  ⚠️  Health issues detected!")
            print(f"  Issues: {report.issues}")

Troubleshooting

Common Issues

Issue: Agents not communicating properly

# Debug agent communication
def debug_agent_communication():
    """Print the failed "agent-communication" traces of the last 24 hours.

    For each matching trace the error, sender, receiver and original message
    are printed.
    """
    error_filter = {
        "name": "agent-communication",
        "status": "error",
        "period": "last_24h",
    }
    failed_traces = ants.traces.query(error_filter)

    for failed in failed_traces:
        print(f"Communication error: {failed.error}")
        # Sender and receiver are stored in the trace metadata.
        print(f"From: {failed.metadata.get('from_agent')}")
        print(f"To: {failed.metadata.get('to_agent')}")
        print(f"Message: {failed.input}")

Issue: Workflow taking too long

// Debug workflow performance
/**
 * Logs every workflow of the last 24 hours that exceeded the slow threshold,
 * together with its three longest-running spans.
 */
async function debugWorkflowPerformance() {
  const slowThresholdMs = 30000 // 30 seconds
  const slowWorkflows = await ants.metrics.getSlowWorkflows({
    threshold: slowThresholdMs,
    period: 'last_24h'
  })

  for (const slow of slowWorkflows) {
    console.log(`Slow workflow: ${slow.name}`)
    console.log(`Duration: ${slow.duration}ms`)

    // Analyze spans to find bottlenecks: order by duration, longest first.
    const spans = await ants.traces.getSpans(slow.traceId)
    spans.sort((left, right) => right.duration - left.duration)
    const topThree = spans.slice(0, 3)

    console.log('Slowest spans:')
    for (const entry of topThree) {
      console.log(`  ${entry.name}: ${entry.duration}ms`)
    }
  }
}

Next Steps

Example Projects


Congratulations! 🎉 You now have comprehensive monitoring for your multi-agent systems with complete visibility into agent interactions, performance, and costs.