Poll Enrich

Camel supports the Content Enricher from the EIP patterns.

image

In Camel the Content Enricher can be done in several ways:

The most natural Camel approach is using Enrich EIP, which comes as two kinds:

  • Enrich EIP: This is the most common content enricher that uses a Producer to obtain the data. It is usually used for Request Reply messaging, for instance, to invoke an external web service.

  • Poll Enrich EIP: Uses a Polling Consumer to obtain the additional data. It is usually used for Event Message messaging, for instance, to read a file or download a file using FTP.

This page documents the Poll Enrich EIP.

Options

The Poll Enrich eip supports 1 options, which are listed below.

Name Description Default Type

note

The note for this node.

String

description

The description for this node.

String

disabled

Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime.

false

Boolean

expression

Required The expression to compute the endpoint URI to poll-enrich from.

ExpressionDefinition

variableReceive

To use a variable to store the received message body (only body, not headers). This makes it handy to use variables for user data and to easily control what data to use for sending and receiving.

String

aggregationStrategy

Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. By default Camel will use the reply from the external service as outgoing message.

AggregationStrategy

aggregationStrategyMethodName

This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy.

String

aggregationStrategyMethodAllowNull

If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy.

String

aggregateOnException

If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method.

false

Boolean

timeout

Timeout in millis when polling from the external service. A negative value waits until a message is available (could block indefinitely). Zero attempts to receive immediately without waiting. A positive value waits up to the given timeout.

-1

String

cacheSize

Sets the maximum size used by the ConsumerCache which is used to cache and reuse consumers when uris are reused. Use 0 for default cache size, or -1 to turn cache off.

Integer

ignoreInvalidEndpoint

Whether to ignore an invalid endpoint URI when trying to create a consumer with that endpoint.

false

Boolean

allowOptimisedComponents

Whether to allow components to optimise if they are PollDynamicAware.

true

Boolean

autoStartComponents

Whether to auto startup components when poll enricher is starting up.

true

Boolean

Exchange properties

The Poll Enrich eip supports 1 exchange properties, which are listed below.

The exchange properties are set on the Exchange by the EIP, unless otherwise specified in the description. This means those properties are available after this EIP has completed processing the Exchange.

Name Description Default Type

CamelToEndpoint

Endpoint URI where this Exchange is being sent to.

String

Content enrichment using Poll Enrich EIP

pollEnrich uses a Polling Consumer to obtain the additional data. It is usually used for Event Message messaging, for instance, to read a file or download a file using FTP.

The pollEnrich works just the same as enrich, however, as it uses a Polling Consumer, we have three methods when polling:

  • receive: Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.

  • receiveNoWait: Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet.

  • receive(timeout): Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns the message or null if the timeout expired.

Poll Enrich with timeout

It is good practice to use timeout value.

By default, Camel will use the receive which may block until there is a message available. It is therefore recommended to always provide a timeout value, to make this clear that we may wait for a message until the timeout is hit.

You can pass in a timeout value that determines which method to use:

  • if timeout is -1 or other negative number then receive is selected (Important: the receive method may block if there is no message)

  • if timeout is 0 then receiveNoWait is selected

  • otherwise, receive(timeout) is selected

The timeout values are in milliseconds.

Using Poll Enrich

The content enricher (pollEnrich) retrieves additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange).

An AggregationStrategy is used to combine the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange, the second parameter the resource exchange.

Here’s an example for implementing an AggregationStrategy, which merges the two data together as a String with colon separator:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        // this is just an example, for real-world use-cases the
        // aggregation strategy would be specific to the use-case

        if (resource == null) {
            return original;
        }
        Object oldBody = original.getIn().getBody();
        Object newBody = resource.getIn().getBody();
        original.getIn().setBody(oldBody + ":" + newBody);
        return original;
    }

}

You then use the AggregationStrategy with the pollEnrich in the following example, where Camel will poll a file (timeout 10 seconds). The AggregationStrategy is then used to merge the file with the existing Exchange.

  • Java

  • XML

  • YAML

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .pollEnrich("file:inbox?fileName=data.txt", 10000, aggregationStrategy)
  .to("mock:result");
<bean id="myStrategy" class="com.foo.ExampleAggregationStrategy"/>

<route>
  <from uri="direct:start"/>
  <pollEnrich timeout="10000" aggregationStrategy="myStrategy">
    <constant>file:inbox?fileName=data.txt</constant>
  </pollEnrich>
  <to uri="mock:result"/>
</route>
- route:
    from:
      uri: direct:start
      steps:
        - pollEnrich:
            aggregationStrategy: myStrategy
            timeout: 10000
            expression:
              constant:
                expression: file:inbox?fileName=data.txt
        - to:
            uri: mock:result

Using Poll Enrich with Rest DSL

You can also use pollEnrich with Rest DSL to, for example, download a file from AWS S3 as the response of an API call.

  • Java

  • XML

  • YAML

rest("/report")
  .get("/{id}/report")
    .to("direct:report");

from("direct:report")
  .pollEnrich("aws-s3:xavier-dev?amazonS3Client=#s3client&amp;deleteAfterRead=false&amp;fileName=report-file.pdf");
<rest path="/report">
    <get path="/{id}/report">
      <to uri="direct:report"/>
    </get>
</rest>

<route>
  <from uri="direct:report"/>
    <pollEnrich>
        <constant>aws-s3:xavier-dev?amazonS3Client=#s3client&amp;deleteAfterRead=false&amp;fileName=report-file.pdf</constant>
    </pollEnrich>
</route>
- rest:
    path: /report
    get:
      - path: /{id}/report
        to:
          uri: direct:report

- route:
    from:
      uri: direct:report
      steps:
        - pollEnrich:
            expression:
              constant:
                expression: "aws-s3:xavier-dev?amazonS3Client=#s3client&deleteAfterRead=false&fileName=report-file.pdf"

Notice that the enriched endpoint is a constant, however, Camel also supports dynamic endpoints which is covered next.

Poll Enrich with Dynamic Endpoints

Both enrich and pollEnrich supports using dynamic uris computed based on information from the current Exchange.

For example to pollEnrich from an endpoint that uses a header to indicate a SEDA queue name:

  • Java

  • XML

  • YAML

from("direct:start")
  .pollEnrich().simple("seda:${header.queueName}")
  .to("direct:result");
<route>
  <from uri="direct:start"/>
  <pollEnrich>
    <simple>seda:${header.queueName}</simple>
  </pollEnrich>
  <to uri="direct:result"/>
</route>
- route:
    from:
      uri: direct:start
      steps:
        - pollEnrich:
            expression:
              simple:
                expression: "seda:${header.queueName}"
        - to:
            uri: direct:result
See the cacheSize option for more details on how much cache to use depending on how many or few unique endpoints are used.

Using Poll Enrich with file based components

When using poll or pollEnrich with the file based components, then the eagerLimitMaxMessagesPerPoll option has changed default from false to true from Camel 4.13 onwards. Only use-cases where you need to sort the files first, requires to explicit set the option eagerLimitMaxMessagesPerPoll=false to make Camel scan for all files first before sorting, and then poll or pollEnrich will then pick the top file after the sorting.

This improves performance for use-cases without need for sorting first.

See More