A2A - Producer Guide

Producer — Calling a Remote Agent

The producer calls a remote A2A agent. It automatically discovers the agent card, selects the protocol binding, acquires credentials, and wraps/unwraps messages.

Basic Call

  • YAML

  • Java

- route:
    from:
      uri: direct:call-agent
      steps:
        - setBody:
            constant: "What is the weather?"
        - to:
            uri: a2a:http://remote-agent:8080
        - log:
            message: "Agent replied: ${a2a:text}"
from("direct:call-agent")
    .setBody(constant("What is the weather?"))
    .to("a2a:http://remote-agent:8080")
    .log("Agent replied: ${a2a:text}");

The producer:

  1. Fetches the agent card from http://remote-agent:8080/.well-known/agent-card.json

  2. Wraps the body as an A2A SendMessage request

  3. Sends the request using the card’s declared protocol binding

  4. Returns the response as the exchange body — by default (dataFormat=PAYLOAD) as extracted text; with dataFormat=POJO as the full Task or Message Java object

Authenticated Calls

OIDC (Client Credentials)

- to:
    uri: a2a:http://remote-agent:8080
    parameters:
      oauthProfile: my-profile

Acquires a token via client-credentials grant, caches it, and refreshes on expiry.

API Key

- to:
    uri: a2a:http://remote-agent:8082
    parameters:
      apiKey: "{{my.api.key}}"
      apiKeyHeader: X-API-Key

Bearer Token

- to:
    uri: a2a:http://remote-agent:8080
    parameters:
      bearerToken: "{{my.token}}"

JSON-RPC Producer

- to:
    uri: a2a:http://remote-agent:8081
    parameters:
      protocolBinding: JSONRPC

Producer Streaming

The producer supports two streaming modes, controlled by the dataFormat parameter.

Default (PAYLOAD/POJO): Lazy Iterator

The exchange body is a SseEventIterator (implements Iterator<StreamResponse> + Closeable) that parses SSE events lazily from the remote agent’s response stream. Each call to next() blocks until the next event arrives. Use with the Split EIP for event-by-event processing:

  • YAML

  • Java

- setHeader:
    name: CamelA2AOperation
    constant: MESSAGE_STREAM
- to: a2a:https://agent.example.com
- split:
    simple: "${body}"
    streaming: true
    steps:
      - choice:
          when:
            - simple: "${body.statusUpdate} != null"
              steps:
                - log:
                    message: "Progress: ${body.statusUpdate.status.state}"
            - simple: "${body.message} != null"
              steps:
                - log:
                    message: "Final message received"
from("direct:stream")
    .setHeader("CamelA2AOperation", constant("MESSAGE_STREAM"))
    .to("a2a:https://agent.example.com")
    .split(body()).streaming()
        .log("Event: ${body}")
    .end();

The iterator implements Closeable, so Camel’s Split EIP automatically releases the underlying HTTP connection after iteration.

To buffer all events into a List<StreamResponse>:

- to: a2a:https://agent.example.com
- convertBodyTo:
    type: java.util.List

Raw Passthrough (RAW mode)

With dataFormat=RAW, the exchange body is the raw InputStream from the remote agent’s SSE response. No parsing or buffering — bytes flow through untouched. Ideal for proxying SSE streams to a browser:

  • YAML

  • Java

- to:
    uri: a2a:https://agent.example.com?dataFormat=RAW
    parameters:
      operation: MESSAGE_STREAM
- setHeader:
    name: Content-Type
    constant: text/event-stream
from("platform-http:/stream")
    .to("a2a:https://agent.example.com?dataFormat=RAW&operation=MESSAGE_STREAM")
    .setHeader("Content-Type", constant("text/event-stream"));

Subscribe to Task Updates

Subscribe to ongoing task updates from a remote agent:

- setHeader:
    name: CamelA2AOperation
    constant: TASK_SUBSCRIBE
- setHeader:
    name: CamelA2ATaskId
    simple: "${exchangeProperty.taskId}"
- to: a2a:https://agent.example.com
- split:
    simple: "${body}"
    streaming: true
    steps:
      - log:
          message: "Task update: ${body.statusUpdate.status.state}"

Streaming requests use asyncTimeout (default 5 minutes) instead of the standard 60-second request timeout. SSE heartbeat comments from the remote agent are automatically filtered during parsing.

Async Task Lifecycle (returnImmediately + GetTask Polling)

# Step 1: Submit the task
- setBody:
    constant: "Start long operation"
- to:
    uri: a2a:http://remote-agent:8083
    parameters:
      oauthProfile: assistant
- setVariable:
    name: taskId
    simple: "${header.CamelA2ATaskId}"

# Step 2: Poll for completion
- removeHeaders:
    pattern: "*"
- setHeader:
    name: CamelA2AOperation
    constant: TASK_GET
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- to:
    uri: a2a:http://remote-agent:8083
    parameters:
      oauthProfile: assistant

Push Notification Registration

# Step 1: Submit task
- setBody:
    constant: "Track my package"
- to: a2a:http://remote-agent:8085
- setVariable:
    name: taskId
    simple: "${header.CamelA2ATaskId}"

# Step 2: Register push notification webhook
- removeHeaders:
    pattern: "*"
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_CREATE
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- setBody:
    simple: "${ref:pushConfig}"
- to: a2a:http://remote-agent:8085

Configure the webhook URL as a bean:

camel.beans.pushConfig = #class:org.apache.camel.component.a2a.model.TaskPushNotificationConfig
camel.beans.pushConfig.url = https://my-server:8090/webhook

The full push notification config CRUD is also available. PUSH_CONFIG_GET and PUSH_CONFIG_DELETE require both CamelA2ATaskId and CamelA2APushConfigId. Use JSON-RPC binding for these two producer operations when addressing another camel-a2a agent with separate task and config IDs.

# List push configs for a task
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_LIST
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- to: a2a:https://agent.example.com

# Get a specific push config
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_GET
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- setHeader:
    name: CamelA2APushConfigId
    simple: "${variable.configId}"
- to: "a2a:https://agent.example.com?protocolBinding=JSONRPC"

# Delete a push config
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_DELETE
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- setHeader:
    name: CamelA2APushConfigId
    simple: "${variable.configId}"
- to: "a2a:https://agent.example.com?protocolBinding=JSONRPC"

Parallel Multicast

Call multiple agents concurrently using Camel’s multicast EIP:

- multicast:
    parallelProcessing: true
    aggregationStrategy: "#class:MyAggregator"
    steps:
      - to: direct:call-weather
      - to: direct:call-news
      - to: direct:call-fortune

- route:
    id: call-weather
    from:
      uri: direct:call-weather
      steps:
        - removeHeaders:
            pattern: "*"
        - setBody:
            constant: "What is the weather?"
        - to:
            uri: a2a:http://weather-agent:8080
            parameters:
              oauthProfile: my-profile

Use removeHeaders: "*" before each producer call in a multicast branch to prevent leaking headers (like CamelA2ATaskId) from one branch to another.

Address Override

Override the remote agent’s URL from the card using host/port/basePath config:

- to: a2a:https://agent.example.com?host=http://localhost&port=8080

Priority without producer credentials: host config > card’s supportedInterfaces URL > agentCardSource URL. When producer credentials are configured (apiKey, bearerToken, or oauthProfile) and host is not set, the producer sends credentialed requests only to the HTTP(S) agentCardSource origin. This avoids sending credentials to a URL supplied by the remote card’s supportedInterfaces field. The port and basePath producer overrides are applied when host is configured. A host value without a scheme is treated as HTTPS.

The producer does not forward arbitrary inbound Authorization or Cookie headers. Only credentials resolved from the A2A endpoint configuration are applied to outgoing requests.

Redirect Handling

By default, the producer does not follow HTTP redirects to prevent credential leakage on cross-origin redirects. Enable only when the remote agent is known to issue redirects:

- to: a2a:https://agent.example.com?followRedirects=true
followRedirects applies to operation requests only. Agent card discovery never follows redirects — a card URL that returns a redirect fails with an error.

Producer operation responses must be 2xx. Redirect and other non-2xx responses fail before response deserialization, so a 3xx response cannot be accidentally parsed as an A2A payload.

Request Timeouts

The producer uses different timeouts depending on the operation:

  • Non-streaming requests: hard-coded at 60 seconds. There is currently no configuration parameter to override this.

  • Streaming requests (SendStreamingMessage, SubscribeToTask): uses asyncTimeout (default 300,000 ms / 5 minutes).

  • Streaming read timeout: streamingReadTimeout (default 300,000 ms / 5 minutes) — if no SSE event arrives from the remote agent within this period, the stream is closed with an error. Prevents indefinite blocking when a remote agent stops sending events.

  • Streaming event size: each decoded SSE event is limited by maxPayloadSize (default 6 MiB). Oversized events fail the producer stream instead of accumulating unbounded memory.

  • TCP connection: connectTimeout (default 30,000 ms / 30 seconds).

A2A-Version Header

The producer sends an A2A-Version: 1.0 header on every outgoing operation request, per the A2A v1.0 specification. This is automatic and not configurable.