#A2A Security: Authentication, Trust & Rate Limiting
Unsecured AI Agents Are a Liability
AI agents talking to each other without security? That's a disaster waiting to happen.
Unlike humans who spot phishing attempts, AI agents blindly execute commands they receive. One compromised agent can control your entire network—issuing refunds, leaking customer data, or triggering unauthorized actions.
MoltFlow's A2A protocol has security built in from day one. You just need to configure it properly.
Think about it: your Billing agent has access to Stripe. Your Support agent can create refunds. Your Sales agent can send quotes. An unprotected A2A system lets any malicious agent pretend to be your Orchestrator and issue commands.
The good news? The A2A protocol has security built in from the ground up. You just need to configure it properly.
Authentication Between Agents
Every A2A request requires authentication. No exceptions.
Bearer Token Authentication
The standard approach: each agent authenticates with your API and receives a JWT token.
Step 1: Agent registers and gets credentials
curl -X POST https://apiv2.waiflow.app/api/v2/a2a/agents/register \
-H "Content-Type: application/json" \
-d '{
"name": "Support Agent",
"phone": "1234567890",
"capabilities": ["message.handle", "ticket.create"]
}'Response:
{
"agent_id": "agent_abc123",
"api_key": "mf_live_xyz789...",
"secret": "secret_key_for_jwt_signing"
}Step 2: Agent exchanges credentials for JWT
curl -X POST https://apiv2.waiflow.app/api/v2/a2a/auth \
-H "Content-Type: application/json" \
-d '{
"agent_id": "agent_abc123",
"api_key": "mf_live_xyz789..."
}'Response:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 3600
}Step 3: Use token for all A2A requests
import httpx
async def send_a2a_message(access_token: str, target_phone: str, method: str, params: dict):
"""Send authenticated A2A message."""
rpc_request = {
"jsonrpc": "2.0",
"id": "req-123",
"method": method,
"params": params
}
async with httpx.AsyncClient() as client:
response = await client.post(
"https://apiv2.waiflow.app/api/v2/a2a",
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
},
json=rpc_request,
timeout=10.0
)
if response.status_code == 401:
raise AuthenticationError("Token expired or invalid")
return response.json()Token expiration:
JWTs expire after 1 hour by default. Your agent needs to refresh before expiration:
class A2AClient:
def __init__(self, agent_id: str, api_key: str):
self.agent_id = agent_id
self.api_key = api_key
self.access_token = None
self.token_expires_at = None
async def ensure_authenticated(self):
"""Refresh token if expired."""
now = time.time()
if not self.access_token or now >= self.token_expires_at:
await self.refresh_token()
async def refresh_token(self):
"""Get new access token."""
async with httpx.AsyncClient() as client:
response = await client.post(
"https://apiv2.waiflow.app/api/v2/a2a/auth",
json={
"agent_id": self.agent_id,
"api_key": self.api_key
}
)
data = response.json()
self.access_token = data["access_token"]
# Set expiration 5 minutes early to avoid edge cases
self.token_expires_at = time.time() + data["expires_in"] - 300API Key Authentication
For simpler setups or long-lived agents, you can skip JWT and use API keys directly:
async def send_a2a_message_with_api_key(api_key: str, method: str, params: dict):
"""Send A2A message using API key authentication."""
rpc_request = {
"jsonrpc": "2.0",
"id": "req-123",
"method": method,
"params": params
}
async with httpx.AsyncClient() as client:
response = await client.post(
"https://apiv2.waiflow.app/api/v2/a2a",
headers={
"X-API-Key": api_key,
"Content-Type": "application/json"
},
json=rpc_request
)
return response.json()When to use each:
| Auth Method | Use Case | Pros | Cons |
|---|---|---|---|
| JWT Bearer | Production multi-agent systems | Short-lived tokens, better security | Need refresh logic |
| API Key | Simple scripts, single agents | No expiration, easy to use | Leaked key = full access forever |
For anything beyond a proof-of-concept, use JWT. The refresh logic is 20 lines of code and saves you from catastrophic API key leaks.
Trust Levels
Not all agents are created equal. The A2A protocol supports three trust levels:
- untrusted (default): Agent can send messages but you verify everything
- trusted: Agent's requests are processed with minimal validation
- blocked: Agent is denied all access
Setting Trust Levels
# Trust an agent after verifying its identity
curl -X PATCH https://apiv2.waiflow.app/api/v2/agents/peers/agent_abc123/trust \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"trust_level": "trusted"
}'Trust-Based Request Handling
Your agent should handle different trust levels appropriately:
class AgentRequestHandler:
def __init__(self):
self.trusted_agents = set()
self.blocked_agents = set()
async def handle_a2a_request(self, request: dict, sender_agent_id: str):
"""Process A2A request based on sender's trust level."""
# Check if sender is blocked
if sender_agent_id in self.blocked_agents:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32001,
"message": "Agent is blocked"
}
}
# Trusted agents get fast-tracked
if sender_agent_id in self.trusted_agents:
return await self.execute_method(request)
# Untrusted agents get validated
if not await self.validate_request(request):
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32600,
"message": "Invalid request from untrusted agent"
}
}
return await self.execute_method(request)
async def validate_request(self, request: dict) -> bool:
"""Validate requests from untrusted agents."""
# Check request structure
if not all(key in request for key in ["jsonrpc", "method", "params"]):
return False
# Check method is allowed
allowed_methods = ["message.handle", "status.check"]
if request["method"] not in allowed_methods:
return False
# Validate parameters
if request["method"] == "message.handle":
params = request["params"]
if not isinstance(params.get("message"), str):
return False
if len(params["message"]) > 4096: # Max message length
return False
return TrueAutomatic Trust Demotion
If a trusted agent starts behaving suspiciously, demote it automatically:
async def monitor_agent_behavior(agent_id: str, request: dict):
"""Track agent behavior and demote if suspicious."""
suspicious_signals = []
# Check rate of requests
recent_requests = await get_recent_requests(agent_id, window_seconds=60)
if len(recent_requests) > 100:
suspicious_signals.append("high_request_rate")
# Check for unusual methods
if request["method"] not in await get_typical_methods(agent_id):
suspicious_signals.append("unusual_method")
# Check parameter patterns
if await contains_sql_injection_pattern(request["params"]):
suspicious_signals.append("injection_attempt")
# Demote if multiple signals
if len(suspicious_signals) >= 2:
await demote_agent_trust(agent_id, from_level="trusted", to_level="untrusted")
await send_alert(f"Agent {agent_id} demoted due to: {suspicious_signals}")Rate Limiting Per Agent
Rogue agents can spam your system. Rate limits prevent this.
Default Rate Limits
MoltFlow applies rate limits based on your plan:
| Plan | A2A Requests/Hour | Burst Limit | Per-Agent Limit |
|---|---|---|---|
| Free | 100 | 10 | 50/hour |
| Starter | 1,000 | 50 | 200/hour |
| Business | 10,000 | 200 | 1,000/hour |
| Enterprise | Custom | Custom | Custom |
Reading Rate Limit Headers
Every A2A response includes rate limit headers:
HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 847
X-RateLimit-Reset: 1643723400Handle rate limits in your client:
async def send_with_rate_limit_handling(client: A2AClient, method: str, params: dict):
"""Send A2A request with automatic retry on rate limit."""
try:
response = await client.send_message(method, params)
return response
except RateLimitError as e:
# Extract reset time from headers
reset_at = e.reset_timestamp
wait_seconds = reset_at - time.time()
print(f"Rate limited. Waiting {wait_seconds}s until {reset_at}")
await asyncio.sleep(wait_seconds)
# Retry after rate limit resets
return await client.send_message(method, params)Custom Per-Agent Limits
You can set stricter limits for specific agents:
curl -X POST https://apiv2.waiflow.app/api/v2/agents/peers/agent_abc123/limits \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"requests_per_hour": 50,
"burst_limit": 5
}'Use case: You trust a new agent, but want to limit its blast radius while you monitor behavior.
Message Validation
Invalid or malicious messages can crash your agents. Validate everything.
JSON-RPC Schema Validation
from jsonschema import validate, ValidationError
A2A_REQUEST_SCHEMA = {
"type": "object",
"required": ["jsonrpc", "method", "params"],
"properties": {
"jsonrpc": {"type": "string", "enum": ["2.0"]},
"id": {"type": ["string", "number"]},
"method": {"type": "string", "pattern": "^[a-z]+\\.[a-z]+$"},
"params": {"type": "object"}
}
}
async def validate_a2a_request(request: dict) -> bool:
"""Validate incoming A2A request structure."""
try:
validate(instance=request, schema=A2A_REQUEST_SCHEMA)
return True
except ValidationError as e:
print(f"Invalid A2A request: {e.message}")
return FalseContent Validation
Even valid JSON-RPC can contain malicious content:
import re
async def validate_message_content(message: str) -> tuple[bool, str]:
"""Validate message content for common attacks."""
# Check length
if len(message) > 4096:
return False, "Message too long"
# Check for SQL injection patterns
sql_patterns = [
r"(?i)(union\s+select)",
r"(?i)(drop\s+table)",
r"(?i)(insert\s+into)",
r"'?\s*or\s+'?1'?\s*=\s*'?1"
]
for pattern in sql_patterns:
if re.search(pattern, message):
return False, "Potential SQL injection detected"
# Check for XSS patterns
xss_patterns = [
r"<script",
r"javascript:",
r"onerror\s*=",
r"onclick\s*="
]
for pattern in xss_patterns:
if re.search(pattern, message, re.IGNORECASE):
return False, "Potential XSS detected"
# Check for command injection
if any(char in message for char in [";", "|", "&", "`", "$("]):
return False, "Potential command injection detected"
return True, "OK"Method Whitelisting
Only allow specific methods from external agents:
ALLOWED_METHODS = {
"message.send": {"params": ["chatId", "text"]},
"message.handle": {"params": ["customer_phone", "message"]},
"status.check": {"params": []},
"capability.list": {"params": []}
}
async def validate_method(method: str, params: dict) -> bool:
"""Validate method is allowed and has correct parameters."""
if method not in ALLOWED_METHODS:
return False
required_params = ALLOWED_METHODS[method]["params"]
return all(param in params for param in required_params)Content Policy Configuration
Define rules for what agents can and can't do.
Creating a Content Policy
curl -X POST https://apiv2.waiflow.app/api/v2/a2a/policy \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"policy_name": "strict_agent_policy",
"rules": [
{
"type": "max_message_length",
"value": 2000
},
{
"type": "allowed_methods",
"value": ["message.send", "status.check"]
},
{
"type": "blocked_content_patterns",
"value": ["password", "credit card", "ssn"]
},
{
"type": "rate_limit_per_agent",
"value": 100
}
]
}'Enforcing Policies
class PolicyEnforcer:
def __init__(self, policy: dict):
self.policy = policy
async def enforce(self, agent_id: str, request: dict) -> tuple[bool, str]:
"""Check if request complies with policy."""
# Check method allowed
allowed_methods = self.get_rule_value("allowed_methods")
if request["method"] not in allowed_methods:
return False, f"Method {request['method']} not allowed"
# Check message length
if "message" in request.get("params", {}):
max_length = self.get_rule_value("max_message_length")
if len(request["params"]["message"]) > max_length:
return False, f"Message exceeds {max_length} characters"
# Check blocked content
blocked_patterns = self.get_rule_value("blocked_content_patterns")
message = str(request.get("params", {}))
for pattern in blocked_patterns:
if pattern.lower() in message.lower():
return False, f"Blocked content detected: {pattern}"
return True, "Policy check passed"
def get_rule_value(self, rule_type: str):
"""Extract rule value from policy."""
for rule in self.policy.get("rules", []):
if rule["type"] == rule_type:
return rule["value"]
return NoneMonitoring and Alerting
You can't secure what you can't see. Monitor everything.
Request Logging
import logging
from datetime import datetime
async def log_a2a_request(
agent_id: str,
method: str,
params: dict,
result: dict,
duration_ms: float
):
"""Log all A2A requests for audit trail."""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"agent_id": agent_id,
"method": method,
"params": params, # Be careful with PII here
"result_status": "success" if "result" in result else "error",
"duration_ms": duration_ms
}
# Send to your logging system
logging.info(f"A2A request: {log_entry}")
# For high-value operations, also store in DB
if method in ["billing.charge", "user.delete", "data.export"]:
await store_audit_log(log_entry)Anomaly Detection
Detect unusual patterns:
async def detect_anomalies(agent_id: str):
"""Check for suspicious agent behavior."""
recent_window = 3600 # 1 hour
# Get agent's recent activity
activity = await get_agent_activity(agent_id, window_seconds=recent_window)
anomalies = []
# Check request rate spike
avg_rate = activity["avg_requests_per_hour"]
current_rate = activity["current_requests_per_hour"]
if current_rate > avg_rate * 3:
anomalies.append({
"type": "rate_spike",
"severity": "medium",
"details": f"Rate {current_rate}/hr vs avg {avg_rate}/hr"
})
# Check for unusual methods
typical_methods = activity["typical_methods"]
recent_methods = activity["recent_methods"]
unusual = set(recent_methods) - set(typical_methods)
if unusual:
anomalies.append({
"type": "unusual_methods",
"severity": "high",
"details": f"Unexpected methods: {unusual}"
})
# Check error rate
if activity["error_rate"] > 0.5: # >50% errors
anomalies.append({
"type": "high_error_rate",
"severity": "high",
"details": f"Error rate: {activity['error_rate']*100:.1f}%"
})
# Alert if anomalies detected
if anomalies:
await send_security_alert(agent_id, anomalies)
return anomaliesSecurity Alerts
async def send_security_alert(agent_id: str, anomalies: list):
"""Send alert about suspicious agent behavior."""
severity = max(a["severity"] for a in anomalies)
# Send to Slack
await send_slack_message(
channel="#security",
text=f"🚨 Security alert for agent {agent_id}",
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Agent:* {agent_id}\n*Severity:* {severity}\n*Anomalies:* {len(anomalies)}"
}
},
*[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"• {a['type']}: {a['details']}"
}
}
for a in anomalies
]
]
)
# Auto-block on critical severity
if severity == "critical":
await block_agent(agent_id, reason="Automatic block due to critical anomaly")A2A Security Checklist
Use this checklist when deploying A2A-enabled agents:
| Security Control | Status | Notes |
|---|---|---|
| ✅ JWT authentication enabled | [ ] | Never use unauthenticated A2A in production |
| ✅ Trust levels configured | [ ] | Default all new agents to "untrusted" |
| ✅ Rate limits set per agent | [ ] | Start conservative, increase based on behavior |
| ✅ JSON-RPC schema validation | [ ] | Reject malformed requests immediately |
| ✅ Content validation enabled | [ ] | Check for SQL injection, XSS, command injection |
| ✅ Method whitelist enforced | [ ] | Only allow necessary methods |
| ✅ Content policy defined | [ ] | Document allowed/blocked patterns |
| ✅ Request logging enabled | [ ] | Keep audit trail of all A2A activity |
| ✅ Anomaly detection running | [ ] | Alert on unusual patterns |
| ✅ Token rotation scheduled | [ ] | Rotate JWT secrets quarterly |
| ✅ Incident response plan | [ ] | Know how to block agents quickly |
What's Next
You now have the tools to secure your A2A agent network. MoltFlow provides field-level encryption, anti-spam protection, and built-in rate limiting—security features no other WhatsApp platform offers for agent systems.
Continue learning:
- A2A Protocol: How AI Agents Communicate — Understand agent discovery and message formats
- Building Multi-Agent Systems — Orchestrate multiple AI agents securely
- The Complete MoltFlow API Guide — Full API reference with security best practices
Ready to implement secure A2A? Follow our step-by-step guide: A2A Content Policy for Safe Communication
Sign up for MoltFlow and get free A2A authentication and rate limiting on all plans. Security isn't optional for AI agents—it's the foundation that lets you scale with confidence.
> Try MoltFlow Free — 100 messages/month