Multicast

The Multicast EIP allows routing the same message to a number of endpoints and process them in a different way.

image

The Multicast EIP has many features and is also used as a baseline for the Recipient List and Split EIPs. For example, the Multicast EIP is capable of aggregating each multicasted message into a single response message as the result after the Multicast EIP.

Options

The Multicast eip supports 3 options, which are listed below.

Name Description Default Type

note

The note for this node.

String

description

The description for this node.

String

disabled

Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime.

false

Boolean

aggregationStrategy

Reference to the AggregationStrategy to assemble the replies from the multicasts into a single outgoing message. By default Camel uses the last reply as the outgoing message.

AggregationStrategy

aggregationStrategyMethodName

The method name to use when using a POJO as the AggregationStrategy.

String

aggregationStrategyMethodAllowNull

If true then null is used as the oldExchange when there is no data to aggregate, when using POJOs as the AggregationStrategy.

false

Boolean

parallelAggregate

Deprecated If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.

false

Boolean

parallelProcessing

If enabled then sending messages to the multicasts occurs concurrently. The caller thread still waits until all messages are fully processed before it continues.

false

Boolean

synchronous

When enabled then the same thread is used to continue routing after the multicast is complete, even if parallel processing is enabled.

false

Boolean

streaming

If enabled then Camel will process replies out-of-order, in the order they come back. If disabled, Camel will process replies in the same order as defined by the multicast.

false

Boolean

stopOnException

If enabled then stops further multicast processing if an exception or failure occurred during processing of an exchange, and the caused exception will be thrown. The default behavior is to not stop but continue processing till the end.

false

Boolean

timeout

Total timeout in millis when using parallel processing. If the multicast has not been able to process all replies within the given timeframe, then the timeout triggers and the multicast breaks out and continues.

0

String

executorService

Reference to a custom thread pool to use for parallel processing. Setting this option implies parallel processing.

ExecutorService

onPrepare

Reference to a processor for preparing the exchange to be sent. Can be used to deep-clone messages that should be sent.

Processor

shareUnitOfWork

Shares the unit of work with the parent and each of the multicast exchanges. By default each multicast exchange has its own individual unit of work.

false

Boolean

outputs

Required

List

Exchange properties

The Multicast eip supports 3 exchange properties, which are listed below.

The exchange properties are set on the Exchange by the EIP, unless otherwise specified in the description. This means those properties are available after this EIP has completed processing the Exchange.

Name Description Default Type

CamelMulticastIndex

An index counter that increases for each Exchange being multicasted. The counter starts from 0.

int

CamelMulticastComplete

Whether this Exchange is the last.

boolean

CamelToEndpoint

Endpoint URI where this Exchange is being sent to.

String

Using Multicast

The following example shows how to take a request from the direct:a endpoint, then multicast these requests to direct:x, direct:y, and direct:z.

  • Java

  • XML

  • YAML

from("direct:a")
  .multicast()
    .to("direct:x")
    .to("direct:y")
    .to("direct:z");
<route>
    <from uri="direct:a"/>
    <multicast>
        <to uri="direct:b"/>
        <to uri="direct:c"/>
        <to uri="direct:d"/>
    </multicast>
</route>
- route:
    from:
      uri: direct:a
      steps:
        - multicast:
            steps:
              - to:
                  uri: direct:b
              - to:
                  uri: direct:c
              - to:
                  uri: direct:d

By default, Multicast EIP runs in single threaded mode, which means that the next multicasted message is processed only when the previous is finished. This means that direct:b must be done before Camel will call direct:c and so on.

Multicasting with parallel processing

You can enable parallel processing with Multicast EIP so each multicasted message is processed by its own thread in parallel.

The example below enabled parallel mode:

  • Java

  • XML

  • YAML

from("direct:a")
  .multicast().parallelProcessing()
    .to("direct:x")
    .to("direct:y")
    .to("direct:z");
<route>
    <from uri="direct:a"/>
    <multicast parallelProcessing="true">
        <to uri="direct:b"/>
        <to uri="direct:c"/>
        <to uri="direct:d"/>
    </multicast>
</route>
- route:
    from:
      uri: direct:a
      steps:
        - multicast:
            parallelProcessing: "true"
            steps:
              - to:
                  uri: direct:b
              - to:
                  uri: direct:c
              - to:
                  uri: direct:d

When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thread that called the multicast, then make sure to enable the synchronous option as well.

Ending a Multicast block

You may want to continue routing the exchange after the Multicast EIP.

In the example below, then sending to mock:result happens after the Multicast EIP has finished. In other words, direct:x, direct:y, and direct:z should be completed first, before the message continues.

  • Java

  • XML

  • YAML

from("direct:a")
  .multicast().parallelProcessing()
    .to("direct:x")
    .to("direct:y")
    .to("direct:z")
  .end()
  .to("mock:result");

Note that you need to use end() to mark where multicast ends, and where other EIPs can be added to continue the route.

<route>
    <from uri="direct:a"/>
    <multicast parallelProcessing="true">
        <to uri="direct:b"/>
        <to uri="direct:c"/>
        <to uri="direct:d"/>
    </multicast>
    <to uri="mock:result"/>
</route>
- route:
    from:
      uri: direct:a
      steps:
        - multicast:
            parallelProcessing: "true"
            steps:
              - to:
                  uri: direct:b
              - to:
                  uri: direct:c
              - to:
                  uri: direct:d
        - to:
            uri: mock:result

Aggregating

The AggregationStrategy is used for aggregating all the multicasted exchanges together as a single response exchange, that becomes the outgoing exchange after the Multicast EIP block.

The example now aggregates with the MyAggregationStrategy class:

  • Java

  • XML

  • YAML

from("direct:start")
  .multicast(new MyAggregationStrategy()).parallelProcessing().timeout(500)
    .to("direct:x")
    .to("direct:y")
    .to("direct:z")
  .end()
  .to("mock:result");

We can refer to the FQN class name with #class: syntax as shown below:

<route>
    <from uri="direct:a"/>
    <multicast parallelProcessing="true" timeout="5000"
               aggregationStrategy="#class:com.foo.MyAggregationStrategy">
        <to uri="direct:b"/>
        <to uri="direct:c"/>
        <to uri="direct:d"/>
    </multicast>
    <to uri="mock:result"/>
</route>
- route:
    from:
      uri: direct:a
      steps:
        - multicast:
            aggregationStrategy: "#class:com.foo.MyAggregationStrategy"
            timeout: 5000
            parallelProcessing: "true"
            steps:
              - to:
                  uri: direct:b
              - to:
                  uri: direct:c
              - to:
                  uri: direct:d
        - to:
            uri: mock:result

The Multicast, Recipient List, and Splitter EIPs have special support for using AggregationStrategy with access to the original input exchange. You may want to use this when you aggregate messages and there has been a failure in one of the messages, which you then want to enrich on the original input message and return as response; it’s the aggregate method with three exchange parameters.

Stop processing in case of exception

The Multicast EIP will by default continue to process the entire exchange even in case one of the multicasted messages will throw an exception during routing.

For example, if you want to multicast to three destinations and the second destination fails by an exception. What Camel does by default is to process the remainder destinations. You have the chance to deal with the exception when aggregating using an AggregationStrategy.

But sometimes you want Camel to stop and let the exception be propagated back, and let the Camel Error Handler handle it. You can do this by specifying that it should stop in case of an exception occurred. This is done by the stopOnException option as shown below:

  • Java

  • XML

  • YAML

from("direct:start")
    .multicast()
        .stopOnException().to("direct:foo", "direct:bar", "direct:baz")
    .end()
    .to("mock:result");

    from("direct:foo").to("mock:foo");

    from("direct:bar").process(new MyProcessor()).to("mock:bar");

    from("direct:baz").to("mock:baz");
<routes>
    <route>
        <from uri="direct:start"/>
        <multicast stopOnException="true">
            <to uri="direct:foo"/>
            <to uri="direct:bar"/>
            <to uri="direct:baz"/>
        </multicast>
        <to uri="mock:result"/>
    </route>

    <route>
        <from uri="direct:foo"/>
        <to uri="mock:foo"/>
    </route>

    <route>
        <from uri="direct:bar"/>
        <process ref="myProcessor"/>
        <to uri="mock:bar"/>
    </route>

    <route>
        <from uri="direct:baz"/>
        <to uri="mock:baz"/>
    </route>
</routes>
- route:
    from:
      uri: direct:start
      steps:
        - multicast:
            stopOnException: "true"
            steps:
              - to:
                  uri: direct:foo
              - to:
                  uri: direct:bar
              - to:
                  uri: direct:baz
        - to:
            uri: mock:result
- route:
    from:
      uri: direct:foo
      steps:
        - to:
            uri: mock:foo
- route:
    from:
      uri: direct:bar
      steps:
        - process:
            ref: myProcessor
        - to:
            uri: mock:bar
- route:
    from:
      uri: direct:baz
      steps:
        - to:
            uri: mock:baz

In the example above, then MyProcessor is causing a failure and throws an exception. This means the Multicast EIP will stop after this, and not the last route (direct:baz).

Preparing the message by deep copying before multicasting

The multicast EIP will copy the source exchange and multicast each copy. However, the copy is a shallow copy, so in case you have mutable message bodies, then any changes will be visible by the other copied messages. If you want to use a deep clone copy, then you need to use a custom onPrepare which allows you to create a deep copy of the message body in the Processor.

Notice the onPrepare can be used for any kind of custom logic that you would like to execute before the Exchange is being multicasted.

See Also

Because Multicast EIP is a baseline for the Recipient List and Split EIPs, then you can find more information in those EIPs about features that are also available with Multicast EIP.