Multi-Agent Systems
Monitor complex multi-agent collaborations and workflows with comprehensive observability.
Overview
Multi-agent systems involve multiple AI agents working together to solve complex problems. AgenticAnts provides complete visibility into:
- Agent Communication - Track messages between agents
- Workflow Orchestration - Monitor complex workflows
- Collaboration Patterns - Understand agent interactions
- Performance Bottlenecks - Identify slow agents
- Cost Attribution - Track costs per agent
Architecture Patterns
1. Sequential Workflow
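Agents work in a pipeline, each passing its result to the next agent. (The later Python examples on this page reuse the `ants` client created in this snippet; the TypeScript examples reuse the client created in the Parallel Processing example.)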
import os

from agenticants import AgenticAnts
from agenticants.integrations import langchain

# Shared client used by the Python examples on this page. os.getenv returns
# None when the variable is unset, so export AGENTICANTS_API_KEY beforehand.
ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
# Turn on the LangChain integration for this client.
langchain.auto_instrument(ants)
def sequential_workflow(input_text: str):
# Create main workflow trace
workflow_trace = ants.trace.create(
name="sequential-workflow",
input=input_text,
metadata={
"workflow_type": "sequential",
"total_agents": 3
}
)
# Agent 1: Text Preprocessor
with workflow_trace.span("text-preprocessor") as span:
preprocessed = preprocess_agent.process(input_text)
span.set_metadata({
"agent": "preprocessor",
"output_length": len(preprocessed)
})
# Agent 2: Content Analyzer
with workflow_trace.span("content-analyzer") as span:
analysis = analyzer_agent.process(preprocessed)
span.set_metadata({
"agent": "analyzer",
"analysis_type": analysis.type
})
# Agent 3: Response Generator
with workflow_trace.span("response-generator") as span:
response = generator_agent.process(analysis)
span.set_metadata({
"agent": "generator",
"response_length": len(response)
})
workflow_trace.complete(
output=response,
metadata={
"total_agents": 3,
"success": True
}
)
return response2. Parallel Processing
Multiple agents work simultaneously on independent tasks, and their results are combined at the end. This example uses the TypeScript SDK:
import { AgenticAnts } from '@agenticants/sdk'

// Shared client used by the TypeScript examples on this page.
const ants = new AgenticAnts({ apiKey: process.env.AGENTICANTS_API_KEY })

/**
 * Run three independent agents concurrently and merge their results.
 *
 * Each agent runs in its own span of a single `parallel-workflow` trace.
 * `Promise.all` starts all three at once, so the spans overlap in time.
 *
 * @param inputData - Object whose `text` field is analyzed by every agent.
 * @returns `{ sentiment, entities, summary }` built from the agent results.
 * @throws Re-throws the first agent failure after recording it on the trace.
 */
async function parallelWorkflow(inputData: any) {
  // Parent trace for the whole fan-out.
  const workflowTrace = await ants.trace.create({
    name: 'parallel-workflow',
    input: inputData,
    metadata: {
      workflowType: 'parallel',
      totalAgents: 3
    }
  })

  try {
    // Start all agents at once; Promise.all rejects on the first failure.
    const [sentiment, entities, summary] = await Promise.all([
      // Agent 1: Sentiment Analysis
      workflowTrace.span('sentiment-analyzer', async (span) => {
        const result = await sentimentAgent.analyze(inputData.text)
        span.setMetadata({ agent: 'sentiment-analyzer', score: result.score })
        return result
      }),
      // Agent 2: Entity Extraction
      workflowTrace.span('entity-extractor', async (span) => {
        const result = await entityAgent.extract(inputData.text)
        span.setMetadata({ agent: 'entity-extractor', count: result.entities.length })
        return result
      }),
      // Agent 3: Text Summarizer
      workflowTrace.span('text-summarizer', async (span) => {
        const result = await summarizerAgent.summarize(inputData.text)
        // The summary string is read from `result.text` below, so measure
        // that rather than the result object (assumes `text` is a string).
        span.setMetadata({ agent: 'text-summarizer', summaryLength: result.text.length })
        return result
      })
    ])

    // Combine results
    const combinedResult = {
      sentiment: sentiment.score,
      entities: entities.entities,
      summary: summary.text
    }

    await workflowTrace.complete({
      output: combinedResult,
      metadata: {
        totalAgents: 3,
        success: true
      }
    })

    return combinedResult
  } catch (error) {
    // Catch variables are `unknown` under strict TypeScript; narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error)
    await workflowTrace.error({
      error: message,
      metadata: { success: false }
    })
    throw error
  }
}

3. Hierarchical Coordination
A coordinator agent manages multiple worker agents:
class HierarchicalMultiAgent:
def __init__(self):
self.coordinator = CoordinatorAgent()
self.workers = {
'researcher': ResearcherAgent(),
'writer': WriterAgent(),
'reviewer': ReviewerAgent()
}
def process_task(self, task: str):
# Create main workflow trace
workflow_trace = ants.trace.create(
name="hierarchical-workflow",
input=task,
metadata={
"workflow_type": "hierarchical",
"coordinator": "coordinator-agent",
"workers": list(self.workers.keys())
}
)
try:
# Coordinator analyzes task
with workflow_trace.span("coordinator-analysis") as span:
plan = self.coordinator.analyze_task(task)
span.set_metadata({
"agent": "coordinator",
"plan_steps": len(plan.steps)
})
# Execute plan with workers
results = {}
for step in plan.steps:
worker_name = step.assigned_worker
with workflow_trace.span(f"worker-{worker_name}") as span:
result = self.workers[worker_name].execute(step)
results[worker_name] = result
span.set_metadata({
"agent": worker_name,
"step": step.name,
"result_size": len(str(result))
})
# Coordinator synthesizes results
with workflow_trace.span("coordinator-synthesis") as span:
final_result = self.coordinator.synthesize(results)
span.set_metadata({
"agent": "coordinator",
"final_result_length": len(str(final_result))
})
workflow_trace.complete(
output=final_result,
metadata={
"total_steps": len(plan.steps),
"success": True
}
)
return final_result
except Exception as error:
workflow_trace.error(error=str(error))
raise errorAgent Communication Monitoring
Message Passing Tracking
class CommunicatingAgents:
def __init__(self):
self.agents = {
'agent_a': AgentA(),
'agent_b': AgentB(),
'agent_c': AgentC()
}
def send_message(self, from_agent: str, to_agent: str, message: dict):
# Track inter-agent communication
comm_trace = ants.trace.create(
name="agent-communication",
input=message,
metadata={
"from_agent": from_agent,
"to_agent": to_agent,
"message_type": message.get("type"),
"communication_id": message.get("id")
}
)
try:
# Send message
response = self.agents[to_agent].receive_message(message)
comm_trace.complete(
output=response,
metadata={
"response_time": comm_trace.duration,
"success": True
}
)
return response
except Exception as error:
comm_trace.error(error=str(error))
raise errorConversation Flow Tracking
/**
 * Replays a conversation through its participating agents, one span per message.
 *
 * Agents must be registered with `registerAgent` under the id that
 * messages reference as `agentId`.
 */
class ConversationFlow {
  private agents: Map<string, any> = new Map()

  /** Register (or replace) the agent that handles messages with this `agentId`. */
  registerAgent(agentId: string, agent: any): void {
    this.agents.set(agentId, agent)
  }

  /**
   * Process every message in order and trace the conversation.
   *
   * @param conversation - Conversation with `id`, `participants`, and `messages`.
   * @returns The agent responses, in message order.
   * @throws If a message references an unregistered agent or an agent fails;
   *   the error is recorded on the trace before being re-thrown.
   */
  async processConversation(conversation: Conversation) {
    const conversationTrace = await ants.trace.create({
      name: 'conversation-flow',
      input: conversation,
      metadata: {
        conversationId: conversation.id,
        participantCount: conversation.participants.length,
        messageCount: conversation.messages.length
      }
    })

    // Each agent's reply, so the trace output reflects what this run produced.
    const responses: any[] = []
    try {
      // Messages are handled one at a time, in conversation order.
      for (const message of conversation.messages) {
        const response = await conversationTrace.span(`message-${message.id}`, async (span) => {
          const agent = this.agents.get(message.agentId)
          if (!agent) {
            throw new Error(`No agent registered for agentId "${message.agentId}"`)
          }
          const result = await agent.processMessage(message)
          span.setMetadata({
            agentId: message.agentId,
            messageType: message.type,
            responseLength: result.length
          })
          return result
        })
        responses.push(response)
      }
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error)
      await conversationTrace.error({
        error: errorMessage,
        metadata: { success: false }
      })
      throw error
    }

    await conversationTrace.complete({
      output: responses,
      metadata: { success: true }
    })
    return responses
  }
}

Workflow Orchestration
Complex Workflow with Decision Points
class ComplexWorkflow:
def __init__(self):
self.decision_agent = DecisionAgent()
self.execution_agents = {
'path_a': PathAAgent(),
'path_b': PathBAgent(),
'path_c': PathCAgent()
}
def execute_workflow(self, input_data: dict):
workflow_trace = ants.trace.create(
name="complex-workflow",
input=input_data,
metadata={
"workflow_type": "complex",
"has_decision_points": True
}
)
try:
# Step 1: Decision making
with workflow_trace.span("decision-making") as span:
decision = self.decision_agent.decide(input_data)
span.set_metadata({
"agent": "decision-agent",
"decision": decision.path,
"confidence": decision.confidence
})
# Step 2: Execute based on decision
if decision.path == "path_a":
with workflow_trace.span("path-a-execution") as span:
result = self.execution_agents['path_a'].execute(input_data)
span.set_metadata({
"agent": "path-a-agent",
"execution_time": span.duration
})
elif decision.path == "path_b":
with workflow_trace.span("path-b-execution") as span:
result = self.execution_agents['path_b'].execute(input_data)
span.set_metadata({
"agent": "path-b-agent",
"execution_time": span.duration
})
else:
with workflow_trace.span("path-c-execution") as span:
result = self.execution_agents['path_c'].execute(input_data)
span.set_metadata({
"agent": "path-c-agent",
"execution_time": span.duration
})
workflow_trace.complete(
output=result,
metadata={
"decision_path": decision.path,
"success": True
}
)
return result
except Exception as error:
workflow_trace.error(error=str(error))
])Performance Monitoring
Agent Performance Comparison
def monitor_agent_performance():
# Get performance metrics for all agents
agents = ['agent_a', 'agent_b', 'agent_c']
for agent in agents:
metrics = ants.metrics.get_agent_metrics(
agent_name=agent,
period="last_7_days"
)
print(f"\n{agent} Performance:")
print(f" Average latency: {metrics.avg_latency}ms")
print(f" Success rate: {metrics.success_rate}%")
print(f" Total requests: {metrics.total_requests}")
print(f" Cost: ${metrics.total_cost}")
# Check for performance issues
if metrics.avg_latency > 5000:
print(f" ⚠️ High latency detected!")
if metrics.success_rate < 95:
print(f" ⚠️ Low success rate detected!")
if metrics.total_cost > 100:
print(f" ⚠️ High cost detected!")Workflow Bottleneck Analysis
/**
 * Print each workflow's slowest and most error-prone steps.
 *
 * @param options.period - Metrics window passed to `getWorkflowMetrics`.
 * @param options.topN - How many of the slowest steps to list per workflow.
 * @param options.errorRateThreshold - List steps whose error rate (%) exceeds this.
 */
async function analyzeWorkflowBottlenecks({
  period = 'last_7_days',
  topN = 3,
  errorRateThreshold = 5
}: { period?: string; topN?: number; errorRateThreshold?: number } = {}) {
  const workflows = await ants.metrics.getWorkflowMetrics({ period })

  for (const workflow of workflows) {
    console.log(`\nWorkflow: ${workflow.name}`)

    // Find slowest steps. Sort a copy: Array.prototype.sort mutates in place,
    // and workflow.steps belongs to the API response.
    const slowestSteps = [...workflow.steps]
      .sort((a, b) => b.avgLatency - a.avgLatency)
      .slice(0, topN)

    console.log('Slowest steps:')
    slowestSteps.forEach((step) => {
      console.log(`  ${step.name}: ${step.avgLatency}ms avg`)
    })

    // Find error-prone steps. filter() already returns a new array, so
    // sorting its result in place is safe.
    const errorProneSteps = workflow.steps
      .filter((step) => step.errorRate > errorRateThreshold)
      .sort((a, b) => b.errorRate - a.errorRate)

    if (errorProneSteps.length > 0) {
      console.log('Error-prone steps:')
      errorProneSteps.forEach((step) => {
        console.log(`  ${step.name}: ${step.errorRate}% error rate`)
      })
    }
  }
}

Cost Attribution
Track Costs Per Agent
def analyze_agent_costs():
agents = ['agent_a', 'agent_b', 'agent_c']
total_cost = 0
agent_costs = {}
for agent in agents:
cost_metrics = ants.finops.get_agent_costs(
agent_name=agent,
period="last_30_days"
)
agent_costs[agent] = cost_metrics.total_cost
total_cost += cost_metrics.total_cost
print(f"{agent}: ${cost_metrics.total_cost:.2f}")
print(f" Token usage: {cost_metrics.total_tokens:,}")
print(f" Cost per token: ${cost_metrics.cost_per_token:.6f}")
print(f"\nTotal cost: ${total_cost:.2f}")
# Cost distribution
for agent, cost in agent_costs.items():
percentage = (cost / total_cost) * 100
print(f"{agent}: {percentage:.1f}% of total cost")Optimize Expensive Agents
/**
 * Print each agent's 30-day cost with its optimization recommendations.
 *
 * Agents are listed in the order returned by `getAgentCosts` (requested
 * with `sortBy: 'cost'`).
 */
async function optimizeExpensiveAgents() {
  const agentCosts = await ants.finops.getAgentCosts({
    period: 'last_30_days',
    sortBy: 'cost'
  })

  // The per-agent lookups are independent, so issue them concurrently rather
  // than awaiting each one inside the loop. Results keep agentCosts' order.
  const recommendationsByAgent = await Promise.all(
    agentCosts.map((agent) => ants.finops.getOptimizations({ agent: agent.name }))
  )

  console.log('Agent cost analysis:')
  agentCosts.forEach((agent, index) => {
    console.log(`\n${agent.name}: $${agent.totalCost}`)

    const recommendations = recommendationsByAgent[index]
    if (recommendations.length > 0) {
      console.log('Optimization opportunities:')
      recommendations.forEach((rec) => {
        console.log(`  - ${rec.title}: Save $${rec.savings}/month`)
      })
    }
  })
}

Best Practices
1. Design for Observability
class ObservableMultiAgent:
def __init__(self):
self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
def execute_workflow(self, input_data):
# Always create a main trace
main_trace = self.ants.trace.create(
name="observable-workflow",
input=input_data,
metadata={
"workflow_version": "1.0",
"environment": "production"
}
)
try:
# Use spans for each agent interaction
with main_trace.span("agent-interaction") as span:
result = self.process_with_agents(input_data)
span.set_metadata({
"agents_used": len(self.agents),
"result_size": len(str(result))
})
main_trace.complete(output=result)
return result
except Exception as error:
main_trace.error(error=str(error))
raise error2. Implement Circuit Breakers
/**
 * Wraps agent calls in per-agent circuit breakers.
 *
 * `CircuitBreaker` is assumed to expose `isOpen()`, `recordSuccess()` and
 * `recordFailure()` (the only members used here). Calls for an agent with no
 * registered breaker run unprotected.
 */
class ResilientMultiAgent {
  private circuitBreakers: Map<string, CircuitBreaker> = new Map()

  /** Attach (or replace) the breaker that guards calls to `agentName`. */
  registerCircuitBreaker(agentName: string, breaker: CircuitBreaker): void {
    this.circuitBreakers.set(agentName, breaker)
  }

  /**
   * Run `operation` unless `agentName`'s breaker is open, recording the outcome.
   *
   * @returns Whatever `operation` resolves to.
   * @throws Error immediately if the breaker is open; otherwise whatever `operation` throws.
   */
  async executeWithCircuitBreaker<T>(agentName: string, operation: () => Promise<T>): Promise<T> {
    const breaker = this.circuitBreakers.get(agentName)

    // Fail fast without calling the agent while its breaker is open.
    if (breaker?.isOpen()) {
      throw new Error(`Circuit breaker open for ${agentName}`)
    }

    try {
      const result = await operation()
      breaker?.recordSuccess()
      return result
    } catch (error) {
      breaker?.recordFailure()
      throw error
    }
  }
}

3. Monitor Agent Health
def monitor_agent_health(agents=('agent_a', 'agent_b', 'agent_c')):
    """Print the health report for each agent and flag any that are not healthy.

    Args:
        agents: Agent names passed to ``ants.sre.get_agent_health``.

    Returns:
        list of the agent names whose status was not ``"healthy"``.
    """
    unhealthy = []
    for agent in agents:
        health = ants.sre.get_agent_health(agent)

        print(f"\n{agent} Health:")
        print(f" Status: {health.status}")
        print(f" Uptime: {health.uptime}%")
        print(f" Last check: {health.last_check}")

        if health.status != "healthy":
            unhealthy.append(agent)
            print(" ⚠️ Health issues detected!")
            print(f" Issues: {health.issues}")
    return unhealthy

Troubleshooting
Common Issues
Issue: Agents not communicating properly
# Debug agent communication
def debug_agent_communication():
traces = ants.traces.query({
"name": "agent-communication",
"status": "error",
"period": "last_24h"
})
for trace in traces:
print(f"Communication error: {trace.error}")
print(f"From: {trace.metadata.get('from_agent')}")
print(f"To: {trace.metadata.get('to_agent')}")
print(f"Message: {trace.input}")Issue: Workflow taking too long
// Debug workflow performance
/**
 * Print recent slow workflows and the slowest spans inside each one.
 *
 * @param options.thresholdMs - Minimum workflow duration (ms) to report; defaults to 30 s.
 * @param options.period - Lookback window passed to `getSlowWorkflows`.
 * @param options.topSpans - How many of the slowest spans to list per workflow.
 */
async function debugWorkflowPerformance({
  thresholdMs = 30000,
  period = 'last_24h',
  topSpans = 3
}: { thresholdMs?: number; period?: string; topSpans?: number } = {}) {
  const slowWorkflows = await ants.metrics.getSlowWorkflows({
    threshold: thresholdMs,
    period
  })

  for (const workflow of slowWorkflows) {
    console.log(`Slow workflow: ${workflow.name}`)
    console.log(`Duration: ${workflow.duration}ms`)

    // Analyze spans to find bottlenecks. Sort a copy so the array returned
    // by getSpans is not reordered in place.
    const spans = await ants.traces.getSpans(workflow.traceId)
    const slowestSpans = [...spans]
      .sort((a, b) => b.duration - a.duration)
      .slice(0, topSpans)

    console.log('Slowest spans:')
    slowestSpans.forEach((span) => {
      console.log(`  ${span.name}: ${span.duration}ms`)
    })
  }
}

Next Steps
- Production Deployment - Learn production best practices
- Cost Optimization - Reduce costs with our optimization guide
- Debugging - Troubleshoot issues with our debugging guide
Example Projects
- Multi-Agent Customer Support - GitHub Repository
- Collaborative Content Creation - GitHub Repository
- Research Assistant Team - GitHub Repository
Congratulations! 🎉 You now have comprehensive monitoring for your multi-agent systems with complete visibility into agent interactions, performance, and costs.