Clustering
Camel offers the following cluster-related SPIs:
-
Cluster Service
A regular Camel service that manages cluster resources such as views (see below)
-
Cluster View
Represents a view of the cluster with its own set of isolated resources. Currently, views provide support for:
-
Leader Election
-
Topology events like members joining/leaving the cluster
-
Cluster Member
Represents a member of the cluster.
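To make the relationship between views, members, leader election, and topology events more concrete, here is a minimal in-memory sketch. It is a toy model and not the Camel SPI: the class `ToyClusterView`, its methods, and the election rule (the longest-standing member leads) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Toy, in-memory model of a cluster view. Hypothetical; NOT the Camel SPI.
final class ToyClusterView {
    private final List<String> members = new ArrayList<>(); // kept in join order
    private final List<Consumer<Optional<String>>> listeners = new ArrayList<>();

    // Register a callback that fires whenever the leader changes.
    void onLeadershipChange(Consumer<Optional<String>> listener) {
        listeners.add(listener);
    }

    // Assumed election rule for this sketch: the longest-standing member leads.
    Optional<String> leader() {
        return members.isEmpty() ? Optional.empty() : Optional.of(members.get(0));
    }

    // Topology event: a member joins the view.
    void join(String memberId) {
        Optional<String> before = leader();
        if (!members.contains(memberId)) {
            members.add(memberId);
        }
        notifyIfChanged(before);
    }

    // Topology event: a member leaves the view.
    void leave(String memberId) {
        Optional<String> before = leader();
        members.remove(memberId);
        notifyIfChanged(before);
    }

    private void notifyIfChanged(Optional<String> before) {
        Optional<String> after = leader();
        if (!before.equals(after)) {
            listeners.forEach(l -> l.accept(after));
        }
    }

    public static void main(String[] args) {
        ToyClusterView view = new ToyClusterView();
        view.onLeadershipChange(l -> System.out.println("leader: " + l.orElse("<none>")));
        view.join("node-1");  // prints "leader: node-1"
        view.join("node-2");  // leader unchanged, nothing printed
        view.leave("node-1"); // prints "leader: node-2"
    }
}
```

Real implementations delegate the election to an external system (Consul, ZooKeeper, Kubernetes, and so on), so the election rule above should not be read as describing any of them.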
Cluster SPI Setup
A Cluster Service is just like any other Camel service, so to set it up you only need to register your implementation with the CamelContext:

```java
MyClusterServiceImpl service = new MyClusterServiceImpl();
context.addService(service);
```
The configuration of the Cluster Service depends on the implementation you have chosen. Out of the box, Camel provides the following implementations:
Type | Module | Class |
---|---|---|
consul | camel-consul | org.apache.camel.component.consul.cluster.ConsulClusterService |
file | camel-file | org.apache.camel.component.file.cluster.FileLockClusterService |
infinispan | camel-infinispan | org.apache.camel.component.infinispan.cluster.InfinispanClusterService |
jgroups-raft | camel-jgroups-raft | org.apache.camel.component.jgroups.raft.cluster.JGroupsRaftClusterService |
kubernetes | camel-kubernetes | org.apache.camel.component.kubernetes.cluster.KubernetesClusterService |
zookeeper | camel-zookeeper | org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService |
Configuration options:
ConsulClusterService
Name | Description | Default | Type |
---|---|---|---|
sessionTtl | The Consul session TTL in seconds | 60 | int |
sessionLockDelay | The Consul session lock delay in seconds | 5 | int |
sessionRefreshInterval | The Consul session refresh interval in seconds | 5 | int |
rootPath | The Consul cluster root directory path | /camel | String |
FileLockClusterService
Name | Description | Default | Type |
---|---|---|---|
acquireLockDelay | The time to wait before starting to try to acquire the cluster lock | 1 | long |
acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | TimeUnit |
acquireLockInterval | The time to wait between attempts to try to acquire the cluster lock | 10 | long |
acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | TimeUnit |
heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader acquireLockInterval to determine how long followers should wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers will tolerate up to 2s * 3 = 6s of silence before declaring the leader unavailable | 5 | int |
rootPath | The file cluster root directory path | | String |
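The staleness window described for heartbeatTimeoutMultiplier is a simple product, which the following sketch spells out. The class and method names (`HeartbeatTimeoutSketch`, `staleAfter`) are hypothetical helpers, not part of FileLockClusterService; only the arithmetic comes from the option description above.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the arithmetic from the option description.
final class HeartbeatTimeoutSketch {

    // How long followers tolerate silence before treating the leader as stale:
    // acquireLockInterval * heartbeatTimeoutMultiplier.
    static Duration staleAfter(long acquireLockInterval, TimeUnit unit, int heartbeatTimeoutMultiplier) {
        return Duration.ofMillis(unit.toMillis(acquireLockInterval))
                .multipliedBy(heartbeatTimeoutMultiplier);
    }

    public static void main(String[] args) {
        // Example from the table: 2s heartbeat, multiplier 3 -> 6 seconds.
        System.out.println(staleAfter(2, TimeUnit.SECONDS, 3).getSeconds());
        // Documented defaults: 10s interval, multiplier 5 -> 50 seconds.
        System.out.println(staleAfter(10, TimeUnit.SECONDS, 5).getSeconds());
    }
}
```

With the documented defaults, a follower would therefore wait roughly 50 seconds before declaring the leader unavailable.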
InfinispanClusterService
Name | Description | Default | Type |
---|---|---|---|
lifespan | The lifespan of the cache entry for the local cluster member registered to the inventory | 30 | long |
lifespanTimeUnit | The TimeUnit of the lifespan | SECONDS | TimeUnit |
JGroupsRaftClusterService
Name | Description | Default | Type |
---|---|---|---|
jgroupsConfig | The path to the JGroups Raft configuration | raft.xml | String |
jgroupsClusterName | The name of the cluster | jgroupsraft-master | String |
raftHandle | The RaftHandle | | org.jgroups.raft.RaftHandle |
raftId | Unique Raft id | | String |
KubernetesClusterService
Name | Description | Default | Type |
---|---|---|---|
leaseResourceType | Kubernetes resource type used to hold the leases | LeaseResourceType.Lease | LeaseResourceType |
kubernetesResourcesNamespace | Kubernetes namespace containing the pods and the ConfigMap used for locking | | String |
kubernetesResourceName | Name of the resource used for locking (or prefix, in case multiple ones are used) | leaders | String |
groupName | Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap | | String |
podName | Name of the current pod (defaults to host name) | | String |
clusterLabels | Labels used to identify the members of the cluster | empty map | Map |
jitterFactor | A jitter factor to apply in order to prevent all pods to call Kubernetes APIs in the same instant | 1.2 | double |
leaseDurationMillis | The default duration of the lease for the current leader | 15000 | long |
renewDeadlineMillis | The deadline after which the leader must stop its services because it may have lost the leadership | 10000 | long |
retryPeriodMillis | The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter factor | 2000 | long |
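As an illustration of how a jitter factor can spread out retries, the sketch below stretches a base delay by a random multiplier between 1 and the jitter factor. The exact formula used by KubernetesClusterService is not stated here, so treat this one as an assumption; `JitterSketch` and `jitter` are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: one common way to apply a jitter factor to a delay.
final class JitterSketch {

    // Returns baseMillis stretched by a random multiplier in [1.0, jitterFactor).
    static long jitter(long baseMillis, double jitterFactor) {
        if (jitterFactor < 1.0) {
            throw new IllegalArgumentException("jitterFactor must be >= 1.0");
        }
        double multiplier = 1.0 + ThreadLocalRandom.current().nextDouble() * (jitterFactor - 1.0);
        return (long) (baseMillis * multiplier);
    }

    public static void main(String[] args) {
        // With the documented defaults (retryPeriodMillis = 2000, jitterFactor = 1.2)
        // each delay falls between 2000 ms (inclusive) and 2400 ms (exclusive).
        for (int i = 0; i < 3; i++) {
            System.out.println(jitter(2000, 1.2) + " ms");
        }
    }
}
```

Because every pod draws its own multiplier, pods that start at the same moment drift apart instead of calling the Kubernetes API in lockstep.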
ZooKeeperClusterService
Name | Description | Default | Type |
---|---|---|---|
nodes | The ZooKeeper server hosts (multiple servers can be separated by comma) | | List |
namespace | ZooKeeper namespace. If a namespace is set here, all paths will get pre-pended with the namespace | | String |
reconnectBaseSleepTime | Initial amount of time to wait between retries | | long |
reconnectBaseSleepTimeUnit | The TimeUnit of reconnectBaseSleepTime | MILLISECONDS | TimeUnit |
reconnectMaxRetries | Max number of times to retry | 3 | int |
sessionTimeout | The session timeout in milliseconds | 60000 | long |
sessionTimeoutUnit | The session timeout TimeUnit | MILLISECONDS | TimeUnit |
connectionTimeout | The connection timeout in milliseconds | 15000 | long |
connectionTimeoutUnit | The connection timeout TimeUnit | MILLISECONDS | TimeUnit |
authInfoList | List of AuthInfo objects with scheme and auth | | List |
maxCloseWait | Time to wait during close to join background threads | 1000 | long |
maxCloseWaitUnit | MaxCloseWait TimeUnit | MILLISECONDS | TimeUnit |
retryPolicy | The retry policy to use | | RetryPolicy |
basePath | The base path to store in ZooKeeper | | String |
Configuration examples:
-
Spring Boot
```properties
camel.cluster.file.enabled = true
camel.cluster.file.id = ${random.uuid}
camel.cluster.file.root = ${java.io.tmpdir}
```
-
Spring XML
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="zx" class="org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService">
    <property name="id" value="node-1"/>
    <property name="basePath" value="/camel/cluster"/>
    <property name="nodes" value="localhost:2181"/>
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false">
    ...
  </camelContext>

</beans>
```
Cluster SPI Usage
The Cluster SPI is leveraged by the following new implementations:
-
ClusteredRoutePolicy
This is an implementation of a RoutePolicy that starts the routes it is associated with when the Cluster View it uses takes the leadership:

```java
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        // Create the route policy
        RoutePolicy policy = ClusteredRoutePolicy.forNamespace("my-ns");

        // Bind the policy to one or more routes
        from("timer:clustered?delay=1000&period=1000")
            .routePolicy(policy)
            .log("Route ${routeId} is running ...");
    }
});
```
To apply the same policy to all the routes, a dedicated RoutePolicyFactory can be used:

```java
// Add the clustered route policy factory to the context
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));

context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        // No explicit routePolicy() call: the factory applies the policy to every route
        from("timer:clustered?delay=1000&period=1000")
            .log("Route ${routeId} is running ...");
    }
});
```
-
ClusteredRouteController
This is an implementation of the RouteController SPI that lets the Camel context start and then starts or stops the routes when the leadership is taken or lost. It is well integrated with Spring Boot applications, so assuming your routes are set up like this:

```java
@Bean
public RouteBuilder routeBuilder() {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:heartbeat?period=10000")
                .routeId("heartbeat")
                .log("HeartBeat route (timer) ...");

            from("timer:clustered?period=5000")
                .routeId("clustered")
                .log("Clustered route (timer) ...");
        }
    };
}
```
You can then leverage Spring Boot configuration to make them clustered:
```properties
# enable the route controller
camel.clustered.controller.enabled = true

# define the default namespace for routes
camel.clustered.controller.namespace = my-ns

# exclude the route with id 'heartbeat' from the clustered ones
camel.clustered.controller.routes[heartbeat].clustered = false
```
-
Master Component
The master component is similar to a ClusteredRoutePolicy, but it works at the consumer level: it ensures that only a single endpoint in the cluster is consuming resources at any point in time. Setting it up is simple; all you need to do is prefix singleton endpoints according to the master component syntax:
master:namespace:delegateUri
A concrete example:
```java
@Bean
public RouteBuilder routeBuilder() {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:heartbeat?period=10000")
                .routeId("heartbeat")
                .log("HeartBeat route (timer) ...");

            // Only the cluster leader for namespace "my-ns" consumes from this timer
            from("master:my-ns:timer:clustered?period=5000")
                .routeId("clustered")
                .log("Clustered route (timer) ...");
        }
    };
}
```
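Because the delegate URI usually contains colons itself, it can help to see how a `master:namespace:delegateUri` string breaks down. The sketch below splits such a URI at the first colon after the `master:` prefix. It only illustrates the syntax; `MasterUriSketch` and `split` are hypothetical names, and this is not how the master component parses its endpoint URIs internally.

```java
// Hypothetical helper illustrating the master:namespace:delegateUri syntax.
final class MasterUriSketch {

    // Returns { namespace, delegateUri } for a URI of the form master:namespace:delegateUri.
    static String[] split(String uri) {
        String prefix = "master:";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a master URI: " + uri);
        }
        String rest = uri.substring(prefix.length());
        // The namespace ends at the first colon; everything after it is the delegate URI.
        int sep = rest.indexOf(':');
        if (sep <= 0 || sep == rest.length() - 1) {
            throw new IllegalArgumentException("Expected master:namespace:delegateUri but got: " + uri);
        }
        return new String[] { rest.substring(0, sep), rest.substring(sep + 1) };
    }

    public static void main(String[] args) {
        String[] parts = split("master:my-ns:timer:clustered?period=5000");
        System.out.println("namespace   = " + parts[0]); // my-ns
        System.out.println("delegateUri = " + parts[1]); // timer:clustered?period=5000
    }
}
```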