
#A2A Security: Authentication, Trust & Rate Limiting

Unsecured AI Agents Are a Liability

AI agents talking to each other without security? That's a disaster waiting to happen.

A human might notice a phishing attempt. An AI agent that skips verification simply executes the commands it receives. A single compromised agent can then drive the rest of your network: issuing refunds, leaking customer data, or triggering unauthorized actions.

MoltFlow's A2A protocol has security built in from day one. You just need to configure it properly.

Think about it: your Billing agent has access to Stripe. Your Support agent can create refunds. Your Sales agent can send quotes. An unprotected A2A system lets any malicious agent pretend to be your Orchestrator and issue commands.

The good news: the controls you need (authentication, trust levels, rate limits, and validation) are already part of the protocol. The sections below show how to configure each one.

Authentication Between Agents

Every A2A request requires authentication. No exceptions.

Bearer Token Authentication

The standard approach: each agent authenticates with your API and receives a JWT token.

Step 1: Agent registers and gets credentials

bash
curl -X POST https://apiv2.waiflow.app/api/v2/a2a/agents/register \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Support Agent",
    "phone": "1234567890",
    "capabilities": ["message.handle", "ticket.create"]
  }'

Response:

json
{
  "agent_id": "agent_abc123",
  "api_key": "mf_live_xyz789...",
  "secret": "secret_key_for_jwt_signing"
}

Step 2: Agent exchanges credentials for JWT

bash
curl -X POST https://apiv2.waiflow.app/api/v2/a2a/auth \
  -H "Content-Type: application/json" \
  -d '{
    "agent_id": "agent_abc123",
    "api_key": "mf_live_xyz789..."
  }'

Response:

json
{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 3600
}

Step 3: Use token for all A2A requests

python
import httpx


class AuthenticationError(Exception):
    """Raised when the API rejects the token (HTTP 401)."""

async def send_a2a_message(access_token: str, target_phone: str, method: str, params: dict):
    """Send authenticated A2A message."""
    rpc_request = {
        "jsonrpc": "2.0",
        "id": "req-123",
        "method": method,
        "params": params
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://apiv2.waiflow.app/api/v2/a2a",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json"
            },
            json=rpc_request,
            timeout=10.0
        )

        if response.status_code == 401:
            raise AuthenticationError("Token expired or invalid")

        return response.json()
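The hardcoded "req-123" works for a single call, but JSON-RPC 2.0 uses the id to match each response to its request, so concurrent calls need distinct values. A minimal sketch, assuming UUID strings are acceptable as ids (the source does not say what MoltFlow expects); `build_rpc_request` is a hypothetical helper:

```python
import uuid


def build_rpc_request(method: str, params: dict) -> dict:
    """Build a JSON-RPC 2.0 request with a unique id (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        # A fresh UUID per call lets the caller match each response to its request.
        "id": str(uuid.uuid4()),
        "method": method,
        "params": params,
    }


first = build_rpc_request("status.check", {})
second = build_rpc_request("status.check", {})
print(first["id"] != second["id"])
```

The returned dictionary can be passed as the `json=` argument in place of the hand-written `rpc_request`.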

Token expiration:

JWTs expire after 1 hour by default. Your agent needs to refresh before expiration:

python
import time

import httpx


class A2AClient:
    def __init__(self, agent_id: str, api_key: str):
        self.agent_id = agent_id
        self.api_key = api_key
        self.access_token = None
        self.token_expires_at = None

    async def ensure_authenticated(self):
        """Refresh token if expired."""
        now = time.time()

        if not self.access_token or now >= self.token_expires_at:
            await self.refresh_token()

    async def refresh_token(self):
        """Get new access token."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://apiv2.waiflow.app/api/v2/a2a/auth",
                json={
                    "agent_id": self.agent_id,
                    "api_key": self.api_key
                }
            )

            # Fail fast on 4xx/5xx instead of reading a missing "access_token"
            response.raise_for_status()
            data = response.json()
            self.access_token = data["access_token"]
            # Set expiration 5 minutes early to avoid edge cases
            self.token_expires_at = time.time() + data["expires_in"] - 300
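The client above relies on `expires_in` from the auth response. If only the token itself is at hand, the expiry can usually be read from the JWT payload. A sketch, assuming the token carries the standard `exp` claim (the source does not confirm which claims MoltFlow includes). `jwt_expiry` is a hypothetical helper and does not verify the signature, so it is suitable for scheduling refreshes only, never for trust decisions:

```python
import base64
import json
from typing import Optional


def jwt_expiry(token: str) -> Optional[int]:
    """Return the `exp` claim (Unix seconds) from a JWT without verifying it."""
    parts = token.split(".")
    if len(parts) != 3:
        return None
    payload_b64 = parts[1]
    # JWTs use unpadded base64url; restore the padding before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    try:
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    except ValueError:
        return None
    exp = payload.get("exp") if isinstance(payload, dict) else None
    return exp if isinstance(exp, int) else None


def _b64(data: dict) -> str:
    """Encode a dict the way JWT segments are encoded (demo only)."""
    raw = json.dumps(data).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Unsigned demo token built locally; the third segment is a placeholder.
demo_token = ".".join([_b64({"alg": "HS256", "typ": "JWT"}), _b64({"exp": 1700000000}), "signature"])
print(jwt_expiry(demo_token))  # 1700000000
```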

API Key Authentication

For simpler setups or long-lived agents, you can skip JWT and use API keys directly:

python
async def send_a2a_message_with_api_key(api_key: str, method: str, params: dict):
    """Send A2A message using API key authentication."""
    rpc_request = {
        "jsonrpc": "2.0",
        "id": "req-123",
        "method": method,
        "params": params
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://apiv2.waiflow.app/api/v2/a2a",
            headers={
                "X-API-Key": api_key,
                "Content-Type": "application/json"
            },
            json=rpc_request
        )

        return response.json()

When to use each:

| Auth Method | Use Case | Pros | Cons |
|---|---|---|---|
| JWT Bearer | Production multi-agent systems | Short-lived tokens, better security | Need refresh logic |
| API Key | Simple scripts, single agents | No expiration, easy to use | Leaked key = full access forever |

For anything beyond a proof of concept, use JWT. The refresh logic is about 20 lines of code, and with the default expiry a leaked token stops working within the hour.
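Whichever method you choose, keeping credentials out of source code limits the damage a leak can do. A small sketch, assuming the key is supplied through an environment variable named `MOLTFLOW_API_KEY` (a hypothetical name, not taken from MoltFlow's documentation); both helpers are illustrative:

```python
import os


def load_api_key(env_var: str = "MOLTFLOW_API_KEY") -> str:
    """Read the API key from the environment; fail loudly if it is missing."""
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        raise RuntimeError(f"{env_var} is not set; refusing to start without credentials")
    return api_key


def mask_key(api_key: str) -> str:
    """Return a log-safe form that shows only the last four characters."""
    return "*" * max(len(api_key) - 4, 0) + api_key[-4:]


print(mask_key("mf_live_xyz789"))  # **********z789
```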

Trust Levels

Not all agents are created equal. The A2A protocol supports three trust levels:

  • untrusted (default): Agent can send messages but you verify everything
  • trusted: Agent's requests are processed with minimal validation
  • blocked: Agent is denied all access
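The three levels can be modeled as an enumeration so that unknown agents always fall back to the default. A sketch, assuming trust is tracked in a local dictionary; `TrustLevel` and `TrustRegistry` are illustrative names, not part of the MoltFlow API:

```python
from enum import Enum


class TrustLevel(Enum):
    UNTRUSTED = "untrusted"
    TRUSTED = "trusted"
    BLOCKED = "blocked"


class TrustRegistry:
    """In-memory map of agent id to trust level (illustrative only)."""

    def __init__(self) -> None:
        self._levels: dict[str, TrustLevel] = {}

    def set_level(self, agent_id: str, level: TrustLevel) -> None:
        self._levels[agent_id] = level

    def get_level(self, agent_id: str) -> TrustLevel:
        # Agents that were never registered are treated as untrusted.
        return self._levels.get(agent_id, TrustLevel.UNTRUSTED)


registry = TrustRegistry()
registry.set_level("agent_abc123", TrustLevel.TRUSTED)
print(registry.get_level("agent_abc123").value)   # trusted
print(registry.get_level("agent_unknown").value)  # untrusted
```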

Setting Trust Levels

bash
# Trust an agent after verifying its identity
curl -X PATCH https://apiv2.waiflow.app/api/v2/agents/peers/agent_abc123/trust \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "trust_level": "trusted"
  }'

Trust-Based Request Handling

Your agent should handle different trust levels appropriately:

python
class AgentRequestHandler:
    def __init__(self):
        self.trusted_agents = set()
        self.blocked_agents = set()

    async def handle_a2a_request(self, request: dict, sender_agent_id: str):
        """Process A2A request based on sender's trust level."""

        # Check if sender is blocked
        if sender_agent_id in self.blocked_agents:
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "error": {
                    "code": -32001,
                    "message": "Agent is blocked"
                }
            }

        # Trusted agents get fast-tracked
        if sender_agent_id in self.trusted_agents:
            return await self.execute_method(request)

        # Untrusted agents get validated
        if not await self.validate_request(request):
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "error": {
                    "code": -32600,
                    "message": "Invalid request from untrusted agent"
                }
            }

        return await self.execute_method(request)

    async def validate_request(self, request: dict) -> bool:
        """Validate requests from untrusted agents."""
        # Check request structure
        if not all(key in request for key in ["jsonrpc", "method", "params"]):
            return False

        # Check method is allowed
        allowed_methods = ["message.handle", "status.check"]
        if request["method"] not in allowed_methods:
            return False

        # Validate parameters
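        if request["method"] == "message.handle":
            params = request["params"]
            # params must be an object before its fields can be inspected
            if not isinstance(params, dict):
                return False
            message = params.get("message")
            if not isinstance(message, str):
                return False
            if len(message) > 4096:  # Max message length
                return False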

        return True

Automatic Trust Demotion

If a trusted agent starts behaving suspiciously, demote it automatically:

python
async def monitor_agent_behavior(agent_id: str, request: dict):
    """Track agent behavior and demote if suspicious."""
    suspicious_signals = []

    # Check rate of requests
    recent_requests = await get_recent_requests(agent_id, window_seconds=60)
    if len(recent_requests) > 100:
        suspicious_signals.append("high_request_rate")

    # Check for unusual methods
    if request["method"] not in await get_typical_methods(agent_id):
        suspicious_signals.append("unusual_method")

    # Check parameter patterns
    if await contains_sql_injection_pattern(request["params"]):
        suspicious_signals.append("injection_attempt")

    # Demote if multiple signals
    if len(suspicious_signals) >= 2:
        await demote_agent_trust(agent_id, from_level="trusted", to_level="untrusted")
        await send_alert(f"Agent {agent_id} demoted due to: {suspicious_signals}")
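`get_recent_requests` is left undefined above. One way to back it is an in-memory sliding window per agent. The sketch below is an assumption about how such a helper might look; it is a synchronous stand-in with an injectable clock, and a production system would more likely use a shared store:

```python
import time
from collections import defaultdict, deque
from typing import Optional


class RequestWindow:
    """Track request timestamps per agent in memory (illustrative only).

    Assumes a single fixed window size, because old entries are discarded on read.
    """

    def __init__(self) -> None:
        self._timestamps: dict[str, deque] = defaultdict(deque)

    def record(self, agent_id: str, now: Optional[float] = None) -> None:
        """Store the time of one request from `agent_id`."""
        self._timestamps[agent_id].append(time.time() if now is None else now)

    def recent(self, agent_id: str, window_seconds: float, now: Optional[float] = None) -> list:
        """Return timestamps from the last `window_seconds`, dropping older ones."""
        now = time.time() if now is None else now
        stamps = self._timestamps[agent_id]
        while stamps and stamps[0] <= now - window_seconds:
            stamps.popleft()
        return list(stamps)


window = RequestWindow()
for second in range(150):
    # One request per second, with explicit timestamps for a repeatable demo.
    window.record("agent_abc123", now=1000.0 + second)
print(len(window.recent("agent_abc123", window_seconds=60, now=1150.0)))
```

Passing `now` explicitly keeps the logic testable; in real use both methods fall back to the system clock.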

Rate Limiting Per Agent

Rogue agents can spam your system. Rate limits prevent this.

Default Rate Limits

MoltFlow applies rate limits based on your plan:

| Plan | A2A Requests/Hour | Burst Limit | Per-Agent Limit |
|---|---|---|---|
| Free | 100 | 10 | 50/hour |
| Starter | 1,000 | 50 | 200/hour |
| Business | 10,000 | 200 | 1,000/hour |
| Enterprise | Custom | Custom | Custom |

Reading Rate Limit Headers

Every A2A response includes rate limit headers:

http
HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 847
X-RateLimit-Reset: 1643723400
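A sketch of turning these headers into a wait time. The header names come from the example above; `RateLimitInfo` and `parse_rate_limit_headers` are hypothetical helpers, and the code assumes `X-RateLimit-Reset` is a Unix timestamp in seconds, which the sample value suggests but the source does not state:

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset_timestamp: int

    def seconds_until_reset(self, now: float) -> float:
        """Time to wait before the window resets; never negative."""
        return max(0.0, self.reset_timestamp - now)


def parse_rate_limit_headers(headers: Mapping[str, str]) -> Optional[RateLimitInfo]:
    """Return parsed rate limit data, or None if a header is missing or malformed."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    try:
        return RateLimitInfo(
            limit=int(lowered["x-ratelimit-limit"]),
            remaining=int(lowered["x-ratelimit-remaining"]),
            reset_timestamp=int(lowered["x-ratelimit-reset"]),
        )
    except (KeyError, ValueError):
        return None


info = parse_rate_limit_headers({
    "X-RateLimit-Limit": "1000",
    "X-RateLimit-Remaining": "847",
    "X-RateLimit-Reset": "1643723400",
})
print(info.remaining, info.seconds_until_reset(now=1643723340.0))  # 847 60.0
```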

Handle rate limits in your client:

python
import asyncio
import time


async def send_with_rate_limit_handling(client: A2AClient, method: str, params: dict):
    """Send A2A request with automatic retry on rate limit.

    Assumes the client exposes send_message() and raises RateLimitError
    with a reset_timestamp attribute; neither is defined in this article.
    """
    try:
        return await client.send_message(method, params)

    except RateLimitError as e:
        # Reset time taken from X-RateLimit-Reset (Unix seconds)
        reset_at = e.reset_timestamp
        # Clamp at zero in case the reset time has already passed
        wait_seconds = max(0.0, reset_at - time.time())

        print(f"Rate limited. Waiting {wait_seconds:.0f}s until {reset_at}")
        await asyncio.sleep(wait_seconds)

        # Retry once after the rate limit resets
        return await client.send_message(method, params)

Custom Per-Agent Limits

You can set stricter limits for specific agents:

bash
curl -X POST https://apiv2.waiflow.app/api/v2/agents/peers/agent_abc123/limits \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "requests_per_hour": 50,
    "burst_limit": 5
  }'

Use case: You trust a new agent, but want to limit its blast radius while you monitor behavior.
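If you also want to enforce such a limit inside your own agent, a token bucket is one common fit for a "sustained rate plus burst" pair. This is a sketch of that general technique, not a description of how MoltFlow enforces limits server-side; `TokenBucket` and its parameters are illustrative:

```python
import time
from typing import Optional


class TokenBucket:
    """Allow `burst_limit` requests at once, refilled at `requests_per_hour`."""

    def __init__(self, requests_per_hour: int, burst_limit: int, now: Optional[float] = None) -> None:
        self.refill_per_second = requests_per_hour / 3600.0
        self.capacity = float(burst_limit)
        self.tokens = float(burst_limit)
        self.updated_at = time.monotonic() if now is None else now

    def allow(self, now: Optional[float] = None) -> bool:
        """Consume one token if available; return False when the agent should wait."""
        now = time.monotonic() if now is None else now
        elapsed = max(0.0, now - self.updated_at)
        # Refill in proportion to elapsed time, capped at the burst size.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated_at = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


bucket = TokenBucket(requests_per_hour=50, burst_limit=5, now=0.0)
burst = [bucket.allow(now=0.0) for _ in range(6)]
print(burst)                   # five True values, then False
print(bucket.allow(now=80.0))  # enough time has passed for one token to refill
```

The `now` argument exists so the behavior can be checked without waiting; omit it in real use and the bucket reads the monotonic clock.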

Message Validation

Invalid or malicious messages can crash your agents. Validate everything.

JSON-RPC Schema Validation

python
from jsonschema import validate, ValidationError

A2A_REQUEST_SCHEMA = {
    "type": "object",
    "required": ["jsonrpc", "method", "params"],
    "properties": {
        "jsonrpc": {"type": "string", "enum": ["2.0"]},
        "id": {"type": ["string", "number"]},
        "method": {"type": "string", "pattern": "^[a-z]+\\.[a-z]+$"},
        "params": {"type": "object"}
    }
}

async def validate_a2a_request(request: dict) -> bool:
    """Validate incoming A2A request structure."""
    try:
        validate(instance=request, schema=A2A_REQUEST_SCHEMA)
        return True
    except ValidationError as e:
        print(f"Invalid A2A request: {e.message}")
        return False
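When validation fails, the caller should get a well-formed JSON-RPC error rather than a bare `False`. The codes below are the standard ones reserved by JSON-RPC 2.0; `make_error` is a hypothetical helper, not part of the A2A API:

```python
from typing import Any

# Error codes reserved by the JSON-RPC 2.0 specification.
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602


def make_error(request: Any, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error response for a rejected request."""
    # The id may be unreadable if the request is not even an object;
    # the specification says to use null (None) in that case.
    request_id = request.get("id") if isinstance(request, dict) else None
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


print(make_error({"id": "req-123", "method": "Bad Method"}, INVALID_REQUEST, "Invalid request"))
```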

Content Validation

Even valid JSON-RPC can contain malicious content:

python
import re

async def validate_message_content(message: str) -> tuple[bool, str]:
    """Validate message content for common attacks."""

    # Check length
    if len(message) > 4096:
        return False, "Message too long"

    # Check for SQL injection patterns
    sql_patterns = [
        r"(?i)(union\s+select)",
        r"(?i)(drop\s+table)",
        r"(?i)(insert\s+into)",
        # Case-insensitive like the others, so "OR 1=1" is caught too
        r"(?i)'?\s*or\s+'?1'?\s*=\s*'?1"
    ]

    for pattern in sql_patterns:
        if re.search(pattern, message):
            return False, "Potential SQL injection detected"

    # Check for XSS patterns
    xss_patterns = [
        r"<script",
        r"javascript:",
        r"onerror\s*=",
        r"onclick\s*="
    ]

    for pattern in xss_patterns:
        if re.search(pattern, message, re.IGNORECASE):
            return False, "Potential XSS detected"

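    # Check for shell metacharacters. This check is strict: it also rejects
    # ordinary text such as "Q&A" or "a; b", so relax it if messages are free-form.
    if any(token in message for token in [";", "|", "&", "`", "$("]):
        return False, "Potential command injection detected"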

    return True, "OK"

Method Whitelisting

Only allow specific methods from external agents:

python
ALLOWED_METHODS = {
    "message.send": {"params": ["chatId", "text"]},
    "message.handle": {"params": ["customer_phone", "message"]},
    "status.check": {"params": []},
    "capability.list": {"params": []}
}

async def validate_method(method: str, params: dict) -> bool:
    """Validate method is allowed and has correct parameters."""
    if method not in ALLOWED_METHODS:
        return False

    required_params = ALLOWED_METHODS[method]["params"]
    return all(param in params for param in required_params)
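The check above accepts any extra keys in `params`. If you would rather reject unexpected parameters as well, a stricter variant might look like this. It is a sketch that assumes every listed parameter is required and nothing else is permitted, which may be tighter than your agents need; `validate_method_strict` is a hypothetical name:

```python
ALLOWED_METHODS = {
    "message.send": {"params": ["chatId", "text"]},
    "message.handle": {"params": ["customer_phone", "message"]},
    "status.check": {"params": []},
    "capability.list": {"params": []},
}


def validate_method_strict(method: str, params: dict) -> bool:
    """Accept a call only if its parameter names match the allowlist exactly."""
    spec = ALLOWED_METHODS.get(method)
    if spec is None or not isinstance(params, dict):
        return False
    return set(params) == set(spec["params"])


print(validate_method_strict("message.send", {"chatId": "123", "text": "hi"}))                 # True
print(validate_method_strict("message.send", {"chatId": "123", "text": "hi", "admin": True}))  # False
```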

Content Policy Configuration

Define rules for what agents can and can't do.

Creating a Content Policy

bash
curl -X POST https://apiv2.waiflow.app/api/v2/a2a/policy \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "policy_name": "strict_agent_policy",
    "rules": [
      {
        "type": "max_message_length",
        "value": 2000
      },
      {
        "type": "allowed_methods",
        "value": ["message.send", "status.check"]
      },
      {
        "type": "blocked_content_patterns",
        "value": ["password", "credit card", "ssn"]
      },
      {
        "type": "rate_limit_per_agent",
        "value": 100
      }
    ]
  }'

Enforcing Policies

python
class PolicyEnforcer:
    def __init__(self, policy: dict):
        self.policy = policy

    async def enforce(self, agent_id: str, request: dict) -> tuple[bool, str]:
        """Check if request complies with policy."""

        # Check method allowed (this sketch treats a missing rule as "no restriction")
        allowed_methods = self.get_rule_value("allowed_methods")
        if allowed_methods is not None and request["method"] not in allowed_methods:
            return False, f"Method {request['method']} not allowed"

        # Check message length
        if "message" in request.get("params", {}):
            max_length = self.get_rule_value("max_message_length")
            if max_length is not None and len(request["params"]["message"]) > max_length:
                return False, f"Message exceeds {max_length} characters"

        # Check blocked content (case-insensitive substring match over all params)
        blocked_patterns = self.get_rule_value("blocked_content_patterns") or []
        message = str(request.get("params", {}))
        for pattern in blocked_patterns:
            if pattern.lower() in message.lower():
                return False, f"Blocked content detected: {pattern}"

        return True, "Policy check passed"

    def get_rule_value(self, rule_type: str):
        """Extract rule value from policy, or None if the rule is absent."""
        for rule in self.policy.get("rules", []):
            if rule["type"] == rule_type:
                return rule["value"]
        return None
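Plain substring matching can misfire: the pattern "ssn" also matches "classname". One option is to compile each blocked term with word boundaries. A sketch, assuming blocked terms are literal words or phrases rather than regular expressions; `compile_blocked_patterns` and `find_blocked` are hypothetical helpers:

```python
import re
from typing import Optional


def compile_blocked_patterns(terms: list) -> list:
    """Compile literal terms into case-insensitive whole-word patterns."""
    return [
        (term, re.compile(r"\b" + re.escape(term) + r"\b", re.IGNORECASE))
        for term in terms
    ]


def find_blocked(text: str, compiled: list) -> Optional[str]:
    """Return the first blocked term found in `text`, or None."""
    for term, pattern in compiled:
        if pattern.search(text):
            return term
    return None


patterns = compile_blocked_patterns(["password", "credit card", "ssn"])
print(find_blocked("My SSN is on file", patterns))        # ssn
print(find_blocked("Set the classname field", patterns))  # None
```

Whole-word matching trades recall for precision: it no longer flags "classname", but it also misses "passwords", so pick the behavior that suits your policy.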

Monitoring and Alerting

You can't secure what you can't see. Monitor everything.

Request Logging

python
import logging
from datetime import datetime, timezone

async def log_a2a_request(
    agent_id: str,
    method: str,
    params: dict,
    result: dict,
    duration_ms: float
):
    """Log all A2A requests for audit trail."""
    log_entry = {
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent_id": agent_id,
        "method": method,
        "params": params,  # Be careful with PII here
        "result_status": "success" if "result" in result else "error",
        "duration_ms": duration_ms
    }

    # Send to your logging system
    logging.info(f"A2A request: {log_entry}")

    # For high-value operations, also store in DB
    if method in ["billing.charge", "user.delete", "data.export"]:
        await store_audit_log(log_entry)
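The PII warning in the comment can be handled by redacting params before they reach the log. A sketch, assuming sensitive values can be identified by key name; the key list and the `redact_params` helper are illustrative and should be adapted to your own payloads:

```python
# Hypothetical list of field names to hide; adjust to your own payloads.
SENSITIVE_KEYS = {"customer_phone", "phone", "message", "text", "api_key", "access_token"}


def redact_params(value, sensitive_keys=SENSITIVE_KEYS):
    """Return a copy of `value` with sensitive fields replaced by a placeholder."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if key in sensitive_keys else redact_params(item, sensitive_keys)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact_params(item, sensitive_keys) for item in value]
    return value


params = {"customer_phone": "1234567890", "meta": {"text": "hello", "priority": 2}}
print(redact_params(params))
```

Logging `redact_params(params)` instead of `params` keeps the audit trail useful for debugging while leaving message bodies and phone numbers out of it.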

Anomaly Detection

Detect unusual patterns:

python
async def detect_anomalies(agent_id: str):
    """Check for suspicious agent behavior."""
    recent_window = 3600  # 1 hour

    # Get agent's recent activity
    activity = await get_agent_activity(agent_id, window_seconds=recent_window)

    anomalies = []

    # Check request rate spike
    avg_rate = activity["avg_requests_per_hour"]
    current_rate = activity["current_requests_per_hour"]
    if current_rate > avg_rate * 3:
        anomalies.append({
            "type": "rate_spike",
            "severity": "medium",
            "details": f"Rate {current_rate}/hr vs avg {avg_rate}/hr"
        })

    # Check for unusual methods
    typical_methods = activity["typical_methods"]
    recent_methods = activity["recent_methods"]
    unusual = set(recent_methods) - set(typical_methods)
    if unusual:
        anomalies.append({
            "type": "unusual_methods",
            "severity": "high",
            "details": f"Unexpected methods: {unusual}"
        })

    # Check error rate
    if activity["error_rate"] > 0.5:  # >50% errors
        anomalies.append({
            "type": "high_error_rate",
            "severity": "high",
            "details": f"Error rate: {activity['error_rate']*100:.1f}%"
        })

    # Alert if anomalies detected
    if anomalies:
        await send_security_alert(agent_id, anomalies)

    return anomalies

Security Alerts

python
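# Explicit ordering; comparing the strings directly would rank "medium" above "high"
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2, "critical": 3}

async def send_security_alert(agent_id: str, anomalies: list):
    """Send alert about suspicious agent behavior."""
    severity = max((a["severity"] for a in anomalies), key=SEVERITY_RANK.__getitem__)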

    # Send to Slack
    await send_slack_message(
        channel="#security",
        text=f"🚨 Security alert for agent {agent_id}",
        blocks=[
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Agent:* {agent_id}\n*Severity:* {severity}\n*Anomalies:* {len(anomalies)}"
                }
            },
            *[
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"• {a['type']}: {a['details']}"
                    }
                }
                for a in anomalies
            ]
        ]
    )

    # Auto-block on critical severity
    if severity == "critical":
        await block_agent(agent_id, reason="Automatic block due to critical anomaly")

A2A Security Checklist

Use this checklist when deploying A2A-enabled agents:

| Security Control | Status | Notes |
|---|---|---|
| ✅ JWT authentication enabled | [ ] | Never use unauthenticated A2A in production |
| ✅ Trust levels configured | [ ] | Default all new agents to "untrusted" |
| ✅ Rate limits set per agent | [ ] | Start conservative, increase based on behavior |
| ✅ JSON-RPC schema validation | [ ] | Reject malformed requests immediately |
| ✅ Content validation enabled | [ ] | Check for SQL injection, XSS, command injection |
| ✅ Method whitelist enforced | [ ] | Only allow necessary methods |
| ✅ Content policy defined | [ ] | Document allowed/blocked patterns |
| ✅ Request logging enabled | [ ] | Keep audit trail of all A2A activity |
| ✅ Anomaly detection running | [ ] | Alert on unusual patterns |
| ✅ Token rotation scheduled | [ ] | Rotate JWT secrets quarterly |
| ✅ Incident response plan | [ ] | Know how to block agents quickly |

What's Next

You now have the tools to secure your A2A agent network. MoltFlow provides field-level encryption, anti-spam protection, and built-in rate limiting—security features no other WhatsApp platform offers for agent systems.

Ready to implement secure A2A? Follow our step-by-step guide: A2A Content Policy for Safe Communication

Sign up for MoltFlow and get free A2A authentication and rate limiting on all plans. Security isn't optional for AI agents—it's the foundation that lets you scale with confidence.
