Clustering
Camel offers the following cluster-related SPIs:
- Cluster Service
  A regular Camel service that manages cluster resources such as views (see below).
- Cluster View
  Represents a view of the cluster with its own set of isolated resources. Views currently provide support for:
  - Leader election
  - Topology events, such as members joining or leaving the cluster
- Cluster Member
  Represents a member of the cluster.
Cluster SPI Setup
A Cluster Service is just like any other Camel service, so to set it up you only need to register your implementation with the CamelContext:

    MyClusterServiceImpl service = new MyClusterServiceImpl();
    context.addService(service);

The configuration of the Cluster Service depends on the implementation you have chosen. Out of the box, Camel provides the following implementations:
| Type | Module | Class |
|---|---|---|
| consul | camel-consul | org.apache.camel.component.consul.cluster.ConsulClusterService |
| file | camel-file | org.apache.camel.component.file.cluster.FileLockClusterService |
| infinispan | camel-infinispan | org.apache.camel.component.infinispan.cluster.InfinispanClusterService |
| jgroups-raft | camel-jgroups-raft | org.apache.camel.component.jgroups.raft.cluster.JGroupsRaftClusterService |
| kubernetes | camel-kubernetes | org.apache.camel.component.kubernetes.cluster.KubernetesClusterService |
| zookeeper | camel-zookeeper | org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService |
Configuration options:
ConsulClusterService
| Name | Description | Default | Type |
|---|---|---|---|
| sessionTtl | The Consul session TTL in seconds | 60 | int |
| sessionLockDelay | The Consul session lock delay in seconds | 5 | int |
| sessionRefreshInterval | The Consul session refresh interval in seconds | 5 | int |
| rootPath | The Consul cluster root directory path | /camel | String |
FileLockClusterService
| Name | Description | Default | Type |
|---|---|---|---|
| acquireLockDelay | The time to wait before starting to try to acquire the cluster lock | 1 | long |
| acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | TimeUnit |
| acquireLockInterval | The time to wait between attempts to acquire the cluster lock | 10 | long |
| acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | TimeUnit |
| heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader acquireLockInterval to determine how long followers should wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers will tolerate up to 2s * 3 = 6s of silence before declaring the leader unavailable | 5 | int |
| rootPath | The file cluster root directory path | | String |

Note: As with most clustering implementations, it is important that the clocks on all machines in the file lock cluster are kept in sync.
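The staleness rule described for heartbeatTimeoutMultiplier comes down to a single multiplication. The following is a minimal sketch of that arithmetic in plain Java. The class HeartbeatStaleness and its methods are hypothetical names introduced for illustration; they are not part of Camel and do not reproduce how FileLockClusterService tracks heartbeats internally.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Camel: illustrates the staleness arithmetic only.
class HeartbeatStaleness {

    // Silence that followers tolerate = leader heartbeat interval * multiplier.
    static Duration heartbeatTimeout(Duration heartbeatInterval, int multiplier) {
        return heartbeatInterval.multipliedBy(multiplier);
    }

    // The leader is treated as stale once its last heartbeat is older than the timeout.
    static boolean isLeaderStale(Instant lastHeartbeat, Instant now,
                                 Duration heartbeatInterval, int multiplier) {
        Duration silence = Duration.between(lastHeartbeat, now);
        return silence.compareTo(heartbeatTimeout(heartbeatInterval, multiplier)) > 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(2);
        Instant last = Instant.parse("2024-01-01T00:00:00Z");

        // 2s * 3 = 6s, matching the example in the table above.
        System.out.println(heartbeatTimeout(interval, 3).getSeconds());            // 6
        System.out.println(isLeaderStale(last, last.plusSeconds(5), interval, 3)); // false
        System.out.println(isLeaderStale(last, last.plusSeconds(7), interval, 3)); // true
    }
}
```

Because the comparison uses timestamps that may be written on one machine and read on another, the sketch also shows why synchronized clocks matter: clock skew directly shortens or lengthens the tolerated silence.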
Storing the cluster root directory on NFS
When storing the FileLockClusterService cluster root directory (see the rootPath configuration option) on NFS, all cluster members must use NFS client mount options that allow failover to work reliably.
The following NFS client mount options can help with this. You may need to adjust these options depending on your needs.
- soft: Disables continuous retransmission attempts by the client when the NFS server does not respond to a request.
- timeo=10: The time, in deciseconds, the NFS client waits for a response from the NFS server before it sends another request. The default is 600 (60 seconds).
- retrans=1: Specifies the number of times the NFS client attempts to retransmit a failed request to the NFS server. The default is 3. The client waits a timeo timeout period between each retrans attempt.
- lookupcache=none: Specifies how the kernel manages its cache of directory entries for the mount point. none forces the client to revalidate all cache entries before they are used. This enables the cluster leader to immediately detect any change made to its lock file, and it prevents the lock checking mechanism from returning incorrect validity information.
- sync: Any system call that writes data to files on the mount point causes the data to be flushed to the NFS server before the system call returns control to user space. This option provides greater data cache coherence.
For more information about NFS mount options, see http://linux.die.net/man/5/nfs.
InfinispanClusterService
| Name | Description | Default | Type |
|---|---|---|---|
| lifespan | The lifespan of the cache entry for the local cluster member registered to the inventory | 30 | long |
| lifespanTimeUnit | The TimeUnit of the lifespan | SECONDS | TimeUnit |
JGroupsRaftClusterService
| Name | Description | Default | Type |
|---|---|---|---|
| jgroupsConfig | The path to the JGroups Raft configuration | raft.xml | String |
| jgroupsClusterName | The name of the cluster | jgroupsraft-master | String |
| raftHandle | The RaftHandle | | org.jgroups.raft.RaftHandle |
| raftId | Unique Raft id | | String |
KubernetesClusterService
| Name | Description | Default | Type |
|---|---|---|---|
| leaseResourceType | Kubernetes resource type used to hold the leases | LeaseResourceType.Lease | LeaseResourceType |
| kubernetesResourcesNamespace | Kubernetes namespace containing the pods and the ConfigMap used for locking | | String |
| kubernetesResourceName | Name of the resource used for locking (or prefix, in case multiple ones are used) | leaders | String |
| groupName | Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap | | String |
| podName | Name of the current pod (defaults to host name) | | String |
| clusterLabels | Labels used to identify the members of the cluster | empty map | Map |
| jitterFactor | A jitter factor applied to prevent all pods from calling the Kubernetes APIs at the same instant | 1.2 | double |
| leaseDurationMillis | The default duration of the lease for the current leader | 15000 | long |
| renewDeadlineMillis | The deadline after which the leader must stop its services because it may have lost the leadership | 10000 | long |
| retryPeriodMillis | The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter factor | 2000 | long |
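The description of retryPeriodMillis says the period is randomized using jitterFactor, but it does not give the formula. The sketch below assumes a multiplicative jitter, where the base period is scaled by a random value between 1 and jitterFactor. JitteredRetry is a hypothetical class written for illustration, and the exact formula used by KubernetesClusterService may differ.

```java
import java.util.Random;

// Hypothetical helper for illustration; not part of Camel.
class JitteredRetry {

    // Assumed formula: base * (1 + r * (factor - 1)), with r uniform in [0, 1),
    // so the result lies between base (inclusive) and base * factor (exclusive).
    static long jitter(long baseMillis, double jitterFactor, Random random) {
        return (long) (baseMillis * (1 + random.nextDouble() * (jitterFactor - 1)));
    }

    public static void main(String[] args) {
        Random random = new Random();
        // With the documented defaults (2000 ms, factor 1.2),
        // each printed value falls in the range [2000, 2400).
        for (int i = 0; i < 3; i++) {
            System.out.println(jitter(2000, 1.2, random));
        }
    }
}
```

Spreading the retries this way means that pods started at the same moment drift apart after a few cycles instead of polling the Kubernetes API in lockstep.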
ZooKeeperClusterService
| Name | Description | Default | Type |
|---|---|---|---|
| nodes | The ZooKeeper server hosts (multiple servers can be separated by comma) | | List |
| namespace | ZooKeeper namespace. If a namespace is set here, all paths are prepended with the namespace | | String |
| reconnectBaseSleepTime | Initial amount of time to wait between retries | | long |
| reconnectBaseSleepTimeUnit | The TimeUnit for reconnectBaseSleepTime | MILLISECONDS | TimeUnit |
| reconnectMaxRetries | Max number of times to retry | 3 | int |
| sessionTimeout | The session timeout in milliseconds | 60000 | long |
| sessionTimeoutUnit | The session timeout TimeUnit | MILLISECONDS | TimeUnit |
| connectionTimeout | The connection timeout in milliseconds | 15000 | long |
| connectionTimeoutUnit | The connection timeout TimeUnit | MILLISECONDS | TimeUnit |
| authInfoList | List of AuthInfo objects with scheme and auth | | List |
| maxCloseWait | Time to wait during close to join background threads | 1000 | long |
| maxCloseWaitUnit | The TimeUnit for maxCloseWait | MILLISECONDS | TimeUnit |
| retryPolicy | The retry policy to use | | RetryPolicy |
| basePath | The base path to store in ZooKeeper | | String |
Configuration examples:
- Spring Boot

      camel.cluster.file.enabled = true
      camel.cluster.file.id = ${random.uuid}
      camel.cluster.file.root = ${java.io.tmpdir}

- Spring XML

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="
               http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
               http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

        <bean id="zx" class="org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService">
          <property name="id" value="node-1"/>
          <property name="basePath" value="/camel/cluster"/>
          <property name="nodes" value="localhost:2181"/>
        </bean>

        <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false">
          ...
        </camelContext>

      </beans>
Cluster SPI Usage
The Cluster SPI is leveraged by the following new implementations:
- ClusteredRoutePolicy
  A RoutePolicy implementation that starts the routes it is associated with when the Cluster View it uses takes leadership.

      context.addRoutes(new RouteBuilder() {
          @Override
          public void configure() throws Exception {
              // Create the route policy
              RoutePolicy policy = ClusteredRoutePolicy.forNamespace("my-ns");

              // Bind the policy to one or more routes
              from("timer:clustered?delay=1000&period=1000")
                  .routePolicy(policy)
                  .log("Route ${routeId} is running ...");
          }
      });

  To apply the same policy to all routes, a dedicated RoutePolicyFactory can be used:

      // Add the clustered route policy factory to the context
      context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));

      context.addRoutes(new RouteBuilder() {
          @Override
          public void configure() throws Exception {
              // No explicit routePolicy(...) call: the factory applies the policy to this route
              from("timer:clustered?delay=1000&period=1000")
                  .log("Route ${routeId} is running ...");
          }
      });

- ClusteredRouteController
  An implementation of the RouteController SPI that lets the Camel context start and then starts or stops the routes when leadership is taken or lost. It is well integrated with Spring Boot applications, so assuming your routes are set up like this:

      @Bean
      public RouteBuilder routeBuilder() {
          return new RouteBuilder() {
              @Override
              public void configure() throws Exception {
                  from("timer:heartbeat?period=10000")
                      .routeId("heartbeat")
                      .log("HeartBeat route (timer) ...");
                  from("timer:clustered?period=5000")
                      .routeId("clustered")
                      .log("Clustered route (timer) ...");
              }
          };
      }

  You can then use Spring Boot configuration to make them clustered:

      # Enable the route controller
      camel.clustered.controller.enabled = true
      # Define the default namespace for routes
      camel.clustered.controller.namespace = my-ns
      # Exclude the route with id 'heartbeat' from the clustered ones
      camel.clustered.controller.routes[heartbeat].clustered = false

- Master Component
  The master component is similar to a ClusteredRoutePolicy, but it works at the consumer level: it ensures that only a single endpoint in a cluster consumes resources at any point in time. Setting it up is straightforward: prefix singleton endpoints according to the master component syntax:

      master:namespace:delegateUri

  A concrete example:

      @Bean
      public RouteBuilder routeBuilder() {
          return new RouteBuilder() {
              @Override
              public void configure() throws Exception {
                  from("timer:heartbeat?period=10000")
                      .routeId("heartbeat")
                      .log("HeartBeat route (timer) ...");
                  // Only the current leader of namespace 'my-ns' consumes from this timer
                  from("master:my-ns:timer:clustered?period=5000")
                      .routeId("clustered")
                      .log("Clustered route (timer) ...");
              }
          };
      }
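
To make the master:namespace:delegateUri syntax concrete, the sketch below splits such a URI into its namespace and delegate parts. MasterUri is a hypothetical class written for illustration only; it is not how the master component parses endpoints internally, and it assumes the namespace itself contains no colon.

```java
// Hypothetical helper for illustration; not part of Camel.
class MasterUri {
    final String namespace;
    final String delegateUri;

    private MasterUri(String namespace, String delegateUri) {
        this.namespace = namespace;
        this.delegateUri = delegateUri;
    }

    // Splits "master:namespace:delegateUri" at the first colon after the prefix.
    // Everything after that colon, including further colons, is the delegate URI.
    static MasterUri parse(String uri) {
        String prefix = "master:";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a master URI: " + uri);
        }
        String rest = uri.substring(prefix.length());
        int separator = rest.indexOf(':');
        if (separator <= 0 || separator == rest.length() - 1) {
            throw new IllegalArgumentException("Expected master:namespace:delegateUri but got: " + uri);
        }
        return new MasterUri(rest.substring(0, separator), rest.substring(separator + 1));
    }

    public static void main(String[] args) {
        MasterUri parsed = MasterUri.parse("master:my-ns:timer:clustered?period=5000");
        System.out.println(parsed.namespace);   // my-ns
        System.out.println(parsed.delegateUri); // timer:clustered?period=5000
    }
}
```

In the example route above, the namespace my-ns identifies the group of nodes competing for leadership, and timer:clustered?period=5000 is the ordinary endpoint that only the leader consumes from.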