A2A - Producer Guide
Producer — Calling a Remote Agent
The producer calls a remote A2A agent. It automatically discovers the agent card, selects the protocol binding, acquires credentials, and wraps/unwraps messages.
Basic Call
YAML

- route:
    from:
      uri: direct:call-agent
      steps:
        - setBody:
            constant: "What is the weather?"
        - to:
            uri: a2a:http://remote-agent:8080
        - log:
            message: "Agent replied: ${a2a:text}"

Java

from("direct:call-agent")
    .setBody(constant("What is the weather?"))
    .to("a2a:http://remote-agent:8080")
    .log("Agent replied: ${a2a:text}");

The producer:
- Fetches the agent card from http://remote-agent:8080/.well-known/agent-card.json
- Wraps the body as an A2A SendMessage request
- Sends the request using the card’s declared protocol binding
- Returns the response as the exchange body: by default (dataFormat=PAYLOAD) as extracted text; with dataFormat=POJO as the full Task or Message Java object
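The discovery step can be pictured with a few lines of plain Java. This is only a sketch: AgentCardLocator and cardUrl are hypothetical names, not part of camel-a2a, and it assumes the card is served from the root of the agent's origin, as in the example URL above.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of camel-a2a. */
final class AgentCardLocator {

    // Well-known path taken from the example URL in the text above
    static final String WELL_KNOWN_PATH = "/.well-known/agent-card.json";

    /** Derives the agent card URL from the agent's base URL (assumption: card lives at the origin root). */
    static URI cardUrl(String agentBaseUrl) {
        URI base = URI.create(agentBaseUrl);
        // Resolving an absolute path keeps scheme, host and port and replaces the path
        return base.resolve(WELL_KNOWN_PATH);
    }

    public static void main(String[] args) {
        System.out.println(cardUrl("http://remote-agent:8080"));
    }
}
```

How the real producer treats a base URL that already carries a path is not stated here, so the sketch should not be read as describing that case.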
Authenticated Calls
OIDC (Client Credentials)
- to:
    uri: a2a:http://remote-agent:8080
    parameters:
      oauthProfile: my-profile

The producer acquires a token via the client-credentials grant, caches it, and refreshes it on expiry.
Producer Streaming
The producer supports two streaming modes, controlled by the dataFormat parameter.
Default (PAYLOAD/POJO): Lazy Iterator
The exchange body is a SseEventIterator (implements Iterator<StreamResponse> + Closeable) that parses SSE events lazily from the remote agent’s response stream. Each call to next() blocks until the next event arrives. Use with the Split EIP for event-by-event processing:
YAML

- setHeader:
    name: CamelA2AOperation
    constant: MESSAGE_STREAM
- to: a2a:https://agent.example.com
- split:
    simple: "${body}"
    streaming: true
    steps:
      - choice:
          when:
            - simple: "${body.statusUpdate} != null"
              steps:
                - log:
                    message: "Progress: ${body.statusUpdate.status.state}"
            - simple: "${body.message} != null"
              steps:
                - log:
                    message: "Final message received"

Java

from("direct:stream")
    .setHeader("CamelA2AOperation", constant("MESSAGE_STREAM"))
    .to("a2a:https://agent.example.com")
    .split(body()).streaming()
        .log("Event: ${body}")
    .end();

The iterator implements Closeable, so Camel’s Split EIP automatically releases the underlying HTTP connection after iteration.
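To make the lazy behaviour concrete, here is a minimal sketch of an iterator that reads SSE events on demand and skips comment lines such as heartbeats. LazySseIterator is a hypothetical stand-in, not the component's SseEventIterator: it yields the raw data payload as a String instead of a StreamResponse, and it ignores the event:, id: and retry: fields.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical illustration of lazy SSE parsing; not the camel-a2a implementation. */
final class LazySseIterator implements Iterator<String>, Closeable {
    private final BufferedReader reader;
    private String nextEvent;   // event read ahead by hasNext(), or null
    private boolean finished;   // true once the stream has ended

    LazySseIterator(InputStream in) {
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    @Override
    public boolean hasNext() {
        if (nextEvent == null && !finished) {
            nextEvent = readEvent();          // blocks until an event or end of stream
            if (nextEvent == null) finished = true;
        }
        return nextEvent != null;
    }

    @Override
    public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        String event = nextEvent;
        nextEvent = null;
        return event;
    }

    // Reads lines until a blank line terminates an event that carries data.
    private String readEvent() {
        StringBuilder data = new StringBuilder();
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.startsWith(":")) continue;            // SSE comment, e.g. a heartbeat
                if (line.isEmpty()) {
                    if (data.length() > 0) return data.toString();
                    continue;                                   // blank line without data
                }
                if (line.startsWith("data:")) {
                    String value = line.substring(5);
                    if (value.startsWith(" ")) value = value.substring(1);
                    if (data.length() > 0) data.append('\n'); // multi-line data is joined
                    data.append(value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null;   // end of stream; an unterminated event is discarded
    }

    @Override
    public void close() {
        try {
            reader.close();   // releases the underlying stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Outside of the Split EIP, a try-with-resources block around such an iterator gives the same guarantee that the stream is released once iteration stops.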
To buffer all events into a List<StreamResponse>:
- to: a2a:https://agent.example.com
- convertBodyTo:
    type: java.util.List

Raw Passthrough (RAW mode)
With dataFormat=RAW, the exchange body is the raw InputStream from the remote agent’s SSE response. No parsing or buffering — bytes flow through untouched. Ideal for proxying SSE streams to a browser:
YAML

- to:
    uri: a2a:https://agent.example.com?dataFormat=RAW
    parameters:
      operation: MESSAGE_STREAM
- setHeader:
    name: Content-Type
    constant: text/event-stream

Java

from("platform-http:/stream")
    .to("a2a:https://agent.example.com?dataFormat=RAW&operation=MESSAGE_STREAM")
    .setHeader("Content-Type", constant("text/event-stream"));

Subscribe to Task Updates
Subscribe to ongoing task updates from a remote agent:
- setHeader:
    name: CamelA2AOperation
    constant: TASK_SUBSCRIBE
- setHeader:
    name: CamelA2ATaskId
    simple: "${exchangeProperty.taskId}"
- to: a2a:https://agent.example.com
- split:
    simple: "${body}"
    streaming: true
    steps:
      - log:
          message: "Task update: ${body.statusUpdate.status.state}"

Streaming requests use asyncTimeout (default 5 minutes) instead of the standard 60-second request timeout. SSE heartbeat comments from the remote agent are automatically filtered during parsing.
Async Task Lifecycle (returnImmediately + GetTask Polling)
# Step 1: Submit the task
- setBody:
    constant: "Start long operation"
- to:
    uri: a2a:http://remote-agent:8083
    parameters:
      oauthProfile: assistant
- setVariable:
    name: taskId
    simple: "${header.CamelA2ATaskId}"
# Step 2: Poll for completion
- removeHeaders:
    pattern: "*"
- setHeader:
    name: CamelA2AOperation
    constant: TASK_GET
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- to:
    uri: a2a:http://remote-agent:8083
    parameters:
      oauthProfile: assistant

Push Notification Registration
# Step 1: Submit task
- setBody:
    constant: "Track my package"
- to: a2a:http://remote-agent:8085
- setVariable:
    name: taskId
    simple: "${header.CamelA2ATaskId}"
# Step 2: Register push notification webhook
- removeHeaders:
    pattern: "*"
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_CREATE
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- setBody:
    simple: "${ref:pushConfig}"
- to: a2a:http://remote-agent:8085

Configure the webhook URL as a bean:
camel.beans.pushConfig = #class:org.apache.camel.component.a2a.model.TaskPushNotificationConfig
camel.beans.pushConfig.url = https://my-server:8090/webhook

The full set of push notification config CRUD operations is also available. PUSH_CONFIG_GET and PUSH_CONFIG_DELETE require both CamelA2ATaskId and CamelA2APushConfigId. Use the JSON-RPC binding for these two producer operations when addressing another camel-a2a agent with separate task and config IDs.
# List push configs for a task
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_LIST
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- to: a2a:https://agent.example.com
# Get a specific push config
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_GET
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- setHeader:
    name: CamelA2APushConfigId
    simple: "${variable.configId}"
- to: "a2a:https://agent.example.com?protocolBinding=JSONRPC"
# Delete a push config
- setHeader:
    name: CamelA2AOperation
    constant: PUSH_CONFIG_DELETE
- setHeader:
    name: CamelA2ATaskId
    simple: "${variable.taskId}"
- setHeader:
    name: CamelA2APushConfigId
    simple: "${variable.configId}"
- to: "a2a:https://agent.example.com?protocolBinding=JSONRPC"

Parallel Multicast
Call multiple agents concurrently using Camel’s multicast EIP:
- multicast:
    parallelProcessing: true
    aggregationStrategy: "#class:MyAggregator"
    steps:
      - to: direct:call-weather
      - to: direct:call-news
      - to: direct:call-fortune

- route:
    id: call-weather
    from:
      uri: direct:call-weather
      steps:
        - removeHeaders:
            pattern: "*"
        - setBody:
            constant: "What is the weather?"
        - to:
            uri: a2a:http://weather-agent:8080
            parameters:
              oauthProfile: my-profile
Address Override
Override the remote agent’s URL from the card using host/port/basePath config:
- to: a2a:https://agent.example.com?host=http://localhost&port=8080

Priority without producer credentials: host config > card’s supportedInterfaces URL > agentCardSource URL. When producer credentials are configured (apiKey, bearerToken, or oauthProfile) and host is not set, the producer sends credentialed requests only to the HTTP(S) agentCardSource origin. This avoids sending credentials to a URL supplied by the remote card’s supportedInterfaces field. The port and basePath producer overrides are applied when host is configured. A host value without a scheme is treated as HTTPS.

The producer does not forward arbitrary inbound Authorization or Cookie headers. Only credentials resolved from the A2A endpoint configuration are applied to outgoing requests.
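The precedence rules above can be sketched as a single decision function. Everything here is illustrative: TargetResolver, its method names, and its parameters are hypothetical, and reducing agentCardSource to scheme, host, and port is one reading of "origin", not confirmed component behaviour.

```java
import java.net.URI;

/** Hypothetical sketch of the documented precedence; not the component's actual code. */
final class TargetResolver {

    static String resolve(String host, Integer port, String basePath,
                          String cardInterfaceUrl, String agentCardSource,
                          boolean hasCredentials) {
        if (host != null && !host.isBlank()) {
            // A host value without a scheme is treated as HTTPS
            StringBuilder url = new StringBuilder(host.contains("://") ? host : "https://" + host);
            // port and basePath overrides only apply when host is configured
            if (port != null) url.append(':').append(port);
            if (basePath != null && !basePath.isBlank()) {
                url.append(basePath.startsWith("/") ? basePath : "/" + basePath);
            }
            return url.toString();
        }
        if (hasCredentials) {
            // Credentials are never sent to a URL supplied by the remote card
            return origin(agentCardSource);
        }
        // No credentials: the card's declared interface URL wins over the card source
        return cardInterfaceUrl != null ? cardInterfaceUrl : agentCardSource;
    }

    /** Assumption: "origin" means scheme, host and optional port. */
    static String origin(String url) {
        URI u = URI.create(url);
        String o = u.getScheme() + "://" + u.getHost();
        return u.getPort() == -1 ? o : o + ":" + u.getPort();
    }
}
```

With host=http://localhost and port=8080, as in the example above, this sketch yields http://localhost:8080 regardless of what the card declares.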
Redirect Handling
By default, the producer does not follow HTTP redirects to prevent credential leakage on cross-origin redirects. Enable only when the remote agent is known to issue redirects:
- to: a2a:https://agent.example.com?followRedirects=true

followRedirects applies to operation requests only. Agent card discovery never follows redirects: a card URL that returns a redirect fails with an error.

Producer operation responses must be 2xx. Redirect and other non-2xx responses fail before response deserialization, so a 3xx response cannot be accidentally parsed as an A2A payload.
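The two rules in this section can be sketched with the JDK's own HTTP client. This is an assumption-laden illustration: ResponseGuard is a hypothetical class, the text does not say which HTTP client the component uses, and mapping followRedirects=true to HttpClient.Redirect.NORMAL is a guess at equivalent behaviour.

```java
import java.net.http.HttpClient;

/** Hypothetical guard illustrating the rules above; names are not from camel-a2a. */
final class ResponseGuard {

    /** Builds a client that ignores redirects unless explicitly enabled. */
    static HttpClient newClient(boolean followRedirects) {
        return HttpClient.newBuilder()
                .followRedirects(followRedirects
                        ? HttpClient.Redirect.NORMAL   // assumption: closest JDK equivalent
                        : HttpClient.Redirect.NEVER)   // default: do not follow redirects
                .build();
    }

    static boolean isSuccess(int status) {
        return status >= 200 && status < 300;
    }

    /** Rejects non-2xx responses before the body reaches any deserializer. */
    static String requireSuccess(int status, String body) {
        if (!isSuccess(status)) {
            throw new IllegalStateException("A2A operation failed with HTTP status " + status);
        }
        return body;
    }
}
```

Checking the status first means a 302 with an HTML body fails with a clear error instead of a confusing JSON parse failure.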
Request Timeouts
The producer uses different timeouts depending on the operation:
- Non-streaming requests: hard-coded at 60 seconds. There is currently no configuration parameter to override this.
- Streaming requests (SendStreamingMessage, SubscribeToTask): use asyncTimeout (default 300,000 ms / 5 minutes).
- Streaming read timeout: streamingReadTimeout (default 300,000 ms / 5 minutes). If no SSE event arrives from the remote agent within this period, the stream is closed with an error. This prevents indefinite blocking when a remote agent stops sending events.
- Streaming event size: each decoded SSE event is limited by maxPayloadSize (default 6 MiB). Oversized events fail the producer stream instead of accumulating unbounded memory.
- TCP connection: connectTimeout (default 30,000 ms / 30 seconds).
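The list above amounts to a small lookup from operation type to timeout, plus a size check per event. The sketch below restates it in plain Java. ProducerTimeouts and its members are hypothetical names, the default values are copied from the list, and MESSAGE_SEND is an assumed constant used only to stand for a non-streaming call.

```java
import java.time.Duration;

/** Hypothetical restatement of the documented defaults; not camel-a2a code. */
final class ProducerTimeouts {

    // MESSAGE_SEND is an assumed name; the other constants appear as header values in this guide
    enum Operation { MESSAGE_SEND, MESSAGE_STREAM, TASK_SUBSCRIBE, TASK_GET }

    static final Duration NON_STREAMING = Duration.ofSeconds(60);        // hard-coded, not configurable
    static final Duration DEFAULT_ASYNC_TIMEOUT = Duration.ofMillis(300_000);
    static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofMillis(30_000);
    static final long DEFAULT_MAX_PAYLOAD_BYTES = 6L * 1024 * 1024;      // 6 MiB

    static boolean isStreaming(Operation op) {
        return op == Operation.MESSAGE_STREAM || op == Operation.TASK_SUBSCRIBE;
    }

    /** Streaming operations use asyncTimeout; everything else uses the fixed 60 seconds. */
    static Duration requestTimeout(Operation op, Duration asyncTimeout) {
        if (!isStreaming(op)) return NON_STREAMING;
        return asyncTimeout != null ? asyncTimeout : DEFAULT_ASYNC_TIMEOUT;
    }

    /** Fails fast on an oversized SSE event instead of buffering it. */
    static void checkEventSize(long eventBytes, long maxPayloadBytes) {
        if (eventBytes > maxPayloadBytes) {
            throw new IllegalStateException(
                    "SSE event of " + eventBytes + " bytes exceeds limit of " + maxPayloadBytes);
        }
    }
}
```

Note that passing a custom asyncTimeout has no effect on a non-streaming operation in this sketch, which mirrors the statement that the 60-second limit cannot be overridden.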