# Multi-Agent Systems on WhatsApp with A2A Protocol
## One Agent Isn't Enough Anymore
One AI agent is useful. Three AI agents working together? That's 10x more powerful.
You can't build a single AI that's world-class at support, sales, billing, and logistics. But you can build four specialized agents that each excel at one domain—and let them collaborate seamlessly through the A2A protocol.
Multi-agent systems let you decompose complex problems into focused specialists. MoltFlow's A2A protocol provides the coordination layer that makes this practical.
Here's what this unlocks:
- Specialization: Each agent masters one domain (support, sales, billing) instead of being mediocre at everything
- Parallel processing: Multiple agents can handle different conversations simultaneously
- Fault tolerance: If one agent goes down, others continue working
- Easy scaling: Add new specialist agents without rewriting existing ones
Real example: An e-commerce company replaced a single overloaded chatbot with four specialized agents: order tracking, product recommendations, support, and billing. Customer satisfaction jumped 34% because each agent was excellent at its specific job.
## Multi-Agent Architecture Patterns
Two main patterns emerge when building multi-agent systems:
### Hub-and-Spoke
One orchestrator agent receives all messages and routes them to specialist agents. Think of it as a dispatcher.
When to use:
- You need centralized logging and auditing
- Customer always talks to one consistent "face"
- Simple routing logic (keywords, intent classification)
Pros: Easy to debug, simple message flow, single point of control
Cons: Orchestrator is a single point of failure, can become a bottleneck
### Mesh Network
Agents talk directly to each other. Customer Support agent might forward to Billing agent, which calls Inventory agent, which notifies Shipping agent.
When to use:
- Complex workflows requiring agent-to-agent collaboration
- High throughput requirements
- Each agent needs autonomy to make decisions
Pros: No single point of failure, scales better, agents can optimize their own workflows
Cons: Harder to debug, need robust error handling, potential for circular messaging
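The usual guard against circular messaging in a mesh is a hop counter (or TTL) carried in the message context. This sketch is an illustration, not part of the A2A spec: the "hops" field name and the MAX_HOPS limit are assumptions.

```python
# Guard against circular agent-to-agent forwarding with a hop counter.
# The "hops" context field and MAX_HOPS limit are assumptions, not A2A spec.
MAX_HOPS = 4

def forward_allowed(context: dict) -> bool:
    """Return False once a message has been forwarded too many times."""
    return context.get("hops", 0) < MAX_HOPS

def with_incremented_hops(context: dict) -> dict:
    """Copy the context, bumping the hop count before the next forward."""
    return {**context, "hops": context.get("hops", 0) + 1}
```

Each agent checks forward_allowed() before forwarding and attaches the incremented context, so a message bouncing between agents dies after MAX_HOPS forwards instead of looping forever.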
For most use cases? Start with hub-and-spoke. It's simpler to reason about and easier to debug. You can always evolve to mesh when you need the scalability.
## Building the Orchestrator Agent
The orchestrator is your traffic cop. It receives messages, figures out which specialist can help, and routes accordingly.
Here's a minimal orchestrator in Python:
```python
import asyncio
import time

import httpx
from typing import Optional


class AgentOrchestrator:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://apiv2.waiflow.app"
        # Agent registry: map intents to specialist agents
        self.agents = {
            "support": {"phone": "1234567890", "capabilities": ["troubleshooting", "how-to"]},
            "sales": {"phone": "1234567891", "capabilities": ["pricing", "features", "demo"]},
            "billing": {"phone": "1234567892", "capabilities": ["invoice", "payment", "subscription"]},
        }

    async def route_message(self, message: str, customer_phone: str) -> Optional[str]:
        """Determine which agent should handle this message."""
        message_lower = message.lower()
        # Simple keyword-based routing (use LLM classification for production)
        if any(word in message_lower for word in ["help", "support", "broken", "error"]):
            return "support"
        elif any(word in message_lower for word in ["price", "buy", "demo", "trial"]):
            return "sales"
        elif any(word in message_lower for word in ["invoice", "payment", "bill", "charge"]):
            return "billing"
        else:
            # Default to support for unknown intents
            return "support"

    async def forward_to_agent(self, agent_type: str, message: str, customer_phone: str):
        """Send an A2A message to a specialist agent."""
        agent = self.agents.get(agent_type)
        if not agent:
            print(f"Unknown agent type: {agent_type}")
            return None

        # Construct the JSON-RPC 2.0 request
        rpc_request = {
            "jsonrpc": "2.0",
            "id": f"req-{customer_phone}-{time.time()}",
            "method": "message.handle",
            "params": {
                "customer_phone": customer_phone,
                "message": message,
                "context": {
                    "routed_from": "orchestrator",
                    "original_intent": agent_type,
                },
            },
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/api/v2/a2a",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
                json=rpc_request,
                timeout=10.0,
            )

        if response.status_code == 200:
            result = response.json()
            return result.get("result")
        else:
            print(f"A2A request failed: {response.status_code}")
            return None


# Usage
async def main():
    orchestrator = AgentOrchestrator(api_key="your_api_key_here")
    await orchestrator.forward_to_agent("support", "My account is broken", "16505551234")

asyncio.run(main())
```

What this does:
- Maintains a registry of specialist agents with their phone numbers and capabilities
- Uses simple keyword matching to determine intent (swap this for LLM classification in production)
- Constructs JSON-RPC 2.0 messages following A2A protocol spec
- Forwards messages to the appropriate specialist via POST /api/v2/a2a
The orchestrator doesn't solve problems itself. It just routes intelligently.
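When plain if/elif keyword routing starts misfiring, a middle step short of full LLM classification is to score keyword hits per agent and pick the best match. A minimal sketch; the keyword lists and the zero-score fallback are illustrative assumptions:

```python
# Illustrative keyword lists; in production these would come from config or an LLM.
INTENT_KEYWORDS = {
    "support": ["help", "support", "broken", "error"],
    "sales": ["price", "buy", "demo", "trial"],
    "billing": ["invoice", "payment", "bill", "charge"],
}

def score_route(message: str, default: str = "support") -> str:
    """Pick the agent whose keywords appear most often in the message."""
    text = message.lower()
    scores = {
        agent: sum(text.count(word) for word in words)
        for agent, words in INTENT_KEYWORDS.items()
    }
    best = max(scores, key=scores.get)
    # Fall back to the default agent when nothing matched at all
    return best if scores[best] > 0 else default
```

Unlike the first-match if/elif chain, this handles messages that mention several domains ("my invoice shows the wrong price") by picking the dominant one rather than whichever branch happens to be checked first.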
## Building Specialist Agents
Each specialist agent focuses on one domain. Here's the structure:
### Support Agent
```python
from typing import Optional


class SupportAgent:
    def __init__(self, phone: str, api_key: str):
        self.phone = phone
        self.api_key = api_key
        self.knowledge_base = self.load_knowledge_base()

    async def handle_message(self, customer_phone: str, message: str, context: dict):
        """Handle support-related messages."""
        # Search the knowledge base
        answer = self.search_knowledge_base(message)
        if answer:
            # Send the response via WhatsApp
            await self.send_message(customer_phone, answer)
            return {"status": "resolved", "answer": answer}
        else:
            # Escalate to human support
            await self.escalate_to_human(customer_phone, message)
            return {"status": "escalated"}

    def search_knowledge_base(self, query: str) -> Optional[str]:
        # In production: use RAG with OpenClaw or vector search
        # For demo: simple keyword matching
        for article in self.knowledge_base:
            if any(keyword in query.lower() for keyword in article["keywords"]):
                return article["answer"]
        return None
```

### Sales Agent
```python
class SalesAgent:
    def __init__(self, phone: str, api_key: str):
        self.phone = phone
        self.api_key = api_key
        self.pricing_tiers = self.load_pricing()

    async def handle_message(self, customer_phone: str, message: str, context: dict):
        """Handle sales inquiries."""
        message_lower = message.lower()

        if "pricing" in message_lower or "cost" in message_lower:
            pricing_info = self.format_pricing()
            await self.send_message(customer_phone, pricing_info)
            return {"status": "info_provided", "type": "pricing"}
        elif "demo" in message_lower:
            demo_link = self.schedule_demo(customer_phone)
            await self.send_message(
                customer_phone,
                f"Great! I've scheduled a demo for you: {demo_link}"
            )
            return {"status": "demo_scheduled"}
        else:
            # General sales inquiry
            await self.send_message(
                customer_phone,
                "I can help with pricing, features, or scheduling a demo. What interests you?"
            )
            return {"status": "clarifying"}
```

### Billing Agent
```python
import os

import stripe


class BillingAgent:
    def __init__(self, phone: str, api_key: str):
        self.phone = phone
        self.api_key = api_key
        self.stripe_client = stripe.StripeClient(api_key=os.getenv("STRIPE_KEY"))

    async def handle_message(self, customer_phone: str, message: str, context: dict):
        """Handle billing and payment issues."""
        # Look up the customer in Stripe by phone
        customer = await self.find_customer(customer_phone)

        if "invoice" in message.lower():
            invoices = await self.get_recent_invoices(customer.id)
            invoice_list = self.format_invoices(invoices)
            await self.send_message(customer_phone, invoice_list)
            return {"status": "invoices_sent"}
        elif "payment" in message.lower():
            # Check payment method status
            payment_methods = await self.get_payment_methods(customer.id)
            if not payment_methods:
                payment_link = self.create_payment_link(customer.id)
                await self.send_message(
                    customer_phone,
                    f"Please update your payment method: {payment_link}"
                )
                return {"status": "payment_link_sent"}
        # Anything else (or a payment question with a valid card on file)
        return {"status": "unhandled"}
```

Agent registration:
Each specialist exposes /.well-known/agent.json:
```json
{
  "name": "Support Agent",
  "version": "1.0.0",
  "capabilities": [
    "message.handle",
    "knowledge.search",
    "ticket.create"
  ],
  "authentication": {
    "type": "bearer",
    "endpoint": "/api/v2/a2a/auth"
  },
  "rpc_endpoint": "/api/v2/a2a/rpc"
}
```

## Message Routing and Response Forwarding
Once the orchestrator routes a message, the specialist agent processes it and can optionally respond. Here's the full flow:
```python
async def handle_incoming_message(orchestrator: AgentOrchestrator, message: dict):
    """Process an incoming WhatsApp message and route it to a specialist."""
    customer_phone = message["from"]
    text = message["text"]

    # Step 1: Route to the appropriate agent
    agent_type = await orchestrator.route_message(text, customer_phone)

    # Step 2: Forward via A2A
    result = await orchestrator.forward_to_agent(agent_type, text, customer_phone)

    # Step 3: Log the interaction
    await log_a2a_interaction(
        customer_phone=customer_phone,
        message=text,
        routed_to=agent_type,
        result=result,
    )
    return result
```

Response handling:
Specialist agents respond by sending WhatsApp messages directly to the customer. The orchestrator doesn't need to relay responses—each agent has its own MoltFlow session.
```python
async def send_message(self, phone: str, text: str):
    """Send a WhatsApp message from this agent's session."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{self.base_url}/api/v2/sessions/{self.session_id}/messages",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            },
            json={
                "chatId": f"{phone}@c.us",
                "text": text
            }
        )
        return response.json()
```

This keeps the orchestrator lightweight. It routes, then gets out of the way.
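One detail worth centralizing: the chatId field pairs the phone number with the @c.us suffix, and customer numbers often arrive with "+", spaces, or punctuation. A small normalization helper; the assumption that digits-only E.164 (no "+") is the accepted format is mine, so verify against the MoltFlow docs for your account:

```python
def to_chat_id(phone: str) -> str:
    """Normalize a number like '+1 (650) 555-1234' to WhatsApp chat-ID form.
    Assumes the API expects digits-only E.164 before the @c.us suffix."""
    digits = "".join(ch for ch in phone if ch.isdigit())
    if not digits:
        raise ValueError(f"No digits in phone number: {phone!r}")
    return f"{digits}@c.us"
```

Calling this in one place (e.g. inside send_message) keeps formatting quirks from producing silent delivery failures across four separate agents.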
## Error Handling and Fallbacks
Multi-agent systems need robust error handling. Agents can fail, time out, or return errors.
### Specialist Unavailable
What happens when you route to the Billing agent but it's down?
```python
async def forward_to_agent_with_fallback(
    self,
    agent_type: str,
    message: str,
    customer_phone: str
):
    """Forward to a specialist, with a fallback run by the orchestrator."""
    try:
        result = await asyncio.wait_for(
            self.forward_to_agent(agent_type, message, customer_phone),
            timeout=5.0  # 5 second timeout
        )
        return result
    except asyncio.TimeoutError:
        # Agent didn't respond in time
        print(f"Agent {agent_type} timed out, using fallback")
        return await self.handle_fallback(customer_phone, message)
    except Exception as e:
        # Agent returned an error
        print(f"Agent {agent_type} error: {e}")
        return await self.handle_fallback(customer_phone, message)

async def handle_fallback(self, customer_phone: str, message: str):
    """Fallback when a specialist agent fails."""
    # Option 1: Send a generic response
    await self.send_message(
        customer_phone,
        "I'm having trouble connecting to our specialist. A human agent will follow up shortly."
    )
    # Option 2: Create a support ticket
    await self.create_support_ticket(customer_phone, message)
    return {"status": "fallback", "action": "ticket_created"}
```

### Timeout Configuration
Different messages have different urgency. Billing questions can wait longer than "my account is broken."
```python
TIMEOUT_BY_INTENT = {
    "support": 3.0,   # Fast response for support
    "sales": 10.0,    # Sales can wait for LLM
    "billing": 5.0    # Billing needs to be responsive
}

timeout = TIMEOUT_BY_INTENT.get(agent_type, 5.0)
result = await asyncio.wait_for(forward_to_agent(...), timeout=timeout)
```

### Escalation to Humans
When all else fails, escalate to a human:
```python
async def escalate_to_human(self, customer_phone: str, message: str, reason: str):
    """Escalate to a human agent when the AI can't handle a request."""
    # Create a ticket in the support system
    ticket = await create_zendesk_ticket({
        "customer_phone": customer_phone,
        "message": message,
        "reason": reason,
        "priority": "high"
    })
    # Notify the customer
    await self.send_message(
        customer_phone,
        f"I've created ticket #{ticket.id} and a specialist will respond within 2 hours."
    )
    # Notify the internal team
    await self.send_slack_alert({
        "channel": "#support",
        "text": f"AI escalation: Ticket {ticket.id} - {reason}"
    })
```

## Real-World Example: E-Commerce Multi-Agent System
Let's put it all together. An online store uses four agents:
1. Order Tracking Agent
- Queries Shopify API for order status
- Sends tracking links
- Handles "where's my order?" questions
2. Product Recommendation Agent
- Analyzes purchase history
- Suggests complementary products
- Handles "what should I buy?" questions
3. Support Agent
- Searches knowledge base via OpenClaw
- Handles returns, refunds, damaged items
- Escalates complex issues to humans
4. Billing Agent
- Sends invoices via Stripe
- Handles payment failures
- Updates payment methods
Message flow:
- Customer: "Where's my order for #12345?"
- Orchestrator routes to Order Tracking Agent
- Order Tracking Agent queries Shopify, finds order is shipped
- Order Tracking Agent sends: "Your order shipped yesterday! Track it here: [link]"
- Orchestrator logs the interaction for analytics
Why this works:
- Each agent is less than 200 lines of focused code
- Adding a new "Warranty Agent" doesn't touch existing agents
- If Product Recommendation Agent crashes, orders still get tracked
- Each agent can use different LLMs (GPT-4 for sales, Claude for support, etc.)
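That last point is easy to make concrete: keep the model choice in per-agent config so switching LLMs is a config change, not a code change. The registry shape and the model names below are illustrative assumptions:

```python
# Illustrative per-agent model settings; names and defaults are assumptions.
AGENT_MODELS = {
    "support": {"model": "claude-sonnet", "temperature": 0.2},
    "sales": {"model": "gpt-4", "temperature": 0.7},
}
DEFAULT_MODEL = {"model": "gpt-4", "temperature": 0.5}

def model_config(agent_type: str) -> dict:
    """Look up an agent's LLM settings, falling back to a shared default."""
    return AGENT_MODELS.get(agent_type, DEFAULT_MODEL)
```

A new Warranty Agent then inherits the default model until someone deliberately tunes it, which keeps rollouts low-risk.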
## What's Next
You've seen how to build multi-agent systems using the A2A protocol. Multi-agent orchestration is unique to MoltFlow—no other WhatsApp platform offers this level of agent coordination.
Continue learning:
- A2A Protocol: How AI Agents Communicate — Learn agent discovery and authentication fundamentals
- A2A Security Best Practices — Secure your multi-agent system with authentication and rate limiting
- OpenClaw WhatsApp Support Bot — Build specialized support agents with AI
- The Complete MoltFlow API Guide — Full REST API reference
Ready to implement multi-agent workflows? Follow our step-by-step guide: Multi-Agent Workflows with A2A
Start a free trial and get 100 A2A requests/hour on the Free plan. Multi-agent architectures transform WhatsApp from a messaging app into a distributed AI platform.
> Try MoltFlow Free — 100 messages/month