An application needs to consume Messages, but it wants to control when it consumes each message.
How can an application consume a message when the application is ready?
The application should use a Polling Consumer, one that explicitly makes a call when it wants to receive a message.
In Camel, the Polling Consumer pattern is represented by the PollingConsumer interface.
You can get hold of a PollingConsumer in several ways in Camel:
If you need to use Polling Consumer from within a route, then the Poll Enrich EIP can be used.
On the other hand if you need to use Polling Consumer programmatically, then using ConsumerTemplate is a good choice.
And if you want to use the lower-level Camel APIs, then you can create the PollingConsumer instance yourself.
You can programmatically create an instance of PollingConsumer from any endpoint as shown below:

    Endpoint endpoint = context.getEndpoint("activemq:my.queue");
    PollingConsumer consumer = endpoint.createPollingConsumer();
    Exchange exchange = consumer.receive();
There are three main polling methods on PollingConsumer:
receive() - Waits until a message is available and then returns it; potentially blocking forever.
receive(long) - Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available.
receiveNoWait() - Attempts to receive a message exchange immediately without waiting, returning null if a message exchange is not available yet.
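
The difference between the three polling styles can be pictured with a plain JDK queue. The following is only a minimal sketch of the blocking semantics, not Camel code: the class QueuePoller and its deliver method are hypothetical names introduced for illustration, and returning null on interruption is a choice made for this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a polling consumer; this is not a Camel class.
class QueuePoller<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    // Hypothetical helper: whatever produces messages hands them over here.
    void deliver(T message) {
        queue.add(message);
    }

    // Blocks until a message is available, potentially forever.
    // In this sketch an interrupt ends the wait and yields null.
    T receive() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Waits up to timeoutMillis and returns null if nothing arrived in time.
    T receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Returns immediately; null means no message is available yet.
    T receiveNoWait() {
        return queue.poll();
    }
}
```

With this model, a caller that must not stall would use receiveNoWait() or the timed variant and treat null as "nothing yet", while the untimed receive() suits a dedicated worker thread.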
In Camel there are two kinds of PollingConsumer implementations:
Custom - Some components have their own custom implementation of PollingConsumer which is optimized for the given component.
Default - EventDrivenPollingConsumer is the default implementation otherwise.
EventDrivenPollingConsumer supports the following options:
| Option | Default | Description |
|---|---|---|
| pollingConsumerQueueSize | 1000 | The queue size for the internal hand-off queue between the polling consumer and producers sending data into the queue. |
| pollingConsumerBlockWhenFull | true | Whether to block any producer if the internal queue is full. |
| pollingConsumerBlockTimeout | 0 | To use a timeout (in milliseconds) when the producer is blocked if the internal queue is full. If the value is … |
You can configure these options in endpoint URIs, as shown below:

    Endpoint endpoint = context.getEndpoint("file:inbox?pollingConsumerQueueSize=50");
    PollingConsumer consumer = endpoint.createPollingConsumer();
    Exchange exchange = consumer.receive(5000);
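
To make the role of the hand-off queue options more concrete, here is a rough model built on a bounded JDK queue. It is a sketch under stated assumptions, not Camel's implementation: HandOffQueue, handOff and poll are hypothetical names, the boolean return value is a simplification chosen here, and treating a non-positive timeout as "wait indefinitely" is an assumption of this model.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a bounded hand-off queue; not Camel's implementation.
class HandOffQueue<T> {
    private final BlockingQueue<T> queue;
    private final boolean blockWhenFull;
    private final long blockTimeoutMillis;

    HandOffQueue(int queueSize, boolean blockWhenFull, long blockTimeoutMillis) {
        this.queue = new ArrayBlockingQueue<>(queueSize); // bounded by the queue size
        this.blockWhenFull = blockWhenFull;
        this.blockTimeoutMillis = blockTimeoutMillis;
    }

    // Producer side: returns true if the message was queued, false otherwise.
    boolean handOff(T message) {
        try {
            if (!blockWhenFull) {
                return queue.offer(message); // never wait; fails when full
            }
            if (blockTimeoutMillis <= 0) {
                queue.put(message); // assumption: no timeout means wait indefinitely
                return true;
            }
            // Wait for free space, but give up after the timeout.
            return queue.offer(message, blockTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Consumer side: waits up to timeoutMillis, returns null if nothing arrived.
    T poll(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

In this model a small queue size with blocking enabled applies back-pressure to producers whenever the polling side falls behind, whereas disabling blocking makes a full queue visible to the producer immediately.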