Clustering

Camel offers the following cluster related SPI:

  • Cluster Service

    A regular Camel service that manages cluster resources such as views (see below)

  • Cluster View

    Represent a view of the cluster with its own set of isolated resources. As today views provide supports for:

    • Leader Election

    • Topology events like members joining/leaving the cluster

  • Cluster Member

    Represent a member of the cluster.

Cluster SPI Setup

A Cluster Service is just like any other camel service so set it up you only need to register your implementations to the CamelContext:

MyClusterServiceImpl service = new MyClusterServiceImpl();
context.addService(service);

The configuration of the Cluster Service depends on the implementation you have chose. Out of the box camel provides the following implementations:

Type Module Class

consul

camel-consul

org.apache.camel.component.consul.cluster.ConsulClusterService

file

camel-file

org.apache.camel.component.file.cluster.FileLockClusterService

infinispan

camel-infinispan

org.apache.camel.component.infinispan.cluster.InfinispanClusterService

jgroups-raft

camel-jgroups-raft

org.apache.camel.component.jgroups.raft.cluster.JGroupsRaftClusterService

kubernetes

camel-kubernetes

org.apache.camel.component.kubernetes.cluster.KubernetesClusterService

zookeeper

camel-zookeeper

org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService

Configuration options:

ConsulClusterService

Name Description Default Type

sessionTtl

The Consul session TTL in seconds

60

int

sessionLockDelay

The Consul session lock delay in seconds

5

int

sessionRefreshInterval

The Consul session refresh interval in seconds

5

int

rootPath

The Consul cluster root directory path

/camel

String

FileLockClusterService

Name Description Default Type

acquireLockDelay

The time to wait before starting to try to acquire the cluster lock

1

long

acquireLockDelayUnit

The time unit for acquireLockDelay

SECONDS

TimeUnit

acquireLockInterval

The time to wait between attempts to try to acquire the cluster lock

10

long

acquireLockIntervalUnit

The time unit for acquireLockInterval

SECONDS

TimeUnit

heartbeatTimeoutMultiplier

Multiplier applied to the cluster leader acquireLockInterval to determine how long followers should wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers will tolerate up to {@code 2s * 3 = 6s} of silence before declaring the leader unavailable

5

int

rootPath

The file cluster root directory path

String

InfinispanClusterService

Name Description Default Type

lifespan

The lifespan of the cache entry for the local cluster member registered to the inventory

30

long

lifespanTimeUnit

The TimeUnit of the lifespan

SECONDS

TimeUnit

JGroupsRaftClusterService

Name Description Default Type

jgroupsConfig

The path to the JGroups Raft configuration

raft.xml

String

jgroupsClusterName

The name of the cluster

jgroupsraft-master

String

raftHandle

The RaftHandle

org.jgroups.raft.RaftHandle

raftId

Unique Raft id

String

KubernetesClusterService

Name Description Default Type

leaseResourceType

Kubernetes resource type used to hold the leases

LeaseResourceType.Lease

LeaseResourceType

kubernetesResourcesNamespace

Kubernetes namespace containing the pods and the ConfigMap used for locking

String

kubernetesResourceName

Name of the resource used for locking (or prefix, in case multiple ones are used)

leaders

String

groupName

Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap

String

podName

Name of the current pod (defaults to host name)

String

clusterLabels

Labels used to identify the members of the cluster

empty map

Map

jitterFactor

A jitter factor to apply in order to prevent all pods to call Kubernetes APIs in the same instant

1.2

double

leaseDurationMillis

The default duration of the lease for the current leader

15000

long

renewDeadlineMillis

The deadline after which the leader must stop its services because it may have lost the leadership

10000

long

retryPeriodMillis

The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter factor

2000

long

ZooKeeperClusterService

Name Description Default Type

nodes

The Zookeeper server hosts (multiple servers can be separated by comma)

List

namespace

ZooKeeper namespace. If a namespace is set here, all paths will get pre-pended with the namespace

String

reconnectBaseSleepTime

Initial amount of time to wait between retries

long

reconnectBaseSleepTimeUnit

ReconnectBaseSleepTime TimeUnit. Default is

MILLISECONDS

TimeUnit

reconnectMaxRetries

Max number of times to retry

3

int

sessionTimeout

The session timeout in milliseconds

60000

long

sessionTimeoutUnit

The session timeout TimeUnit

MILLISECONDS

TimeUnit

connectionTimeout

The connection timeout in milliseconds

15000

long

connectionTimeoutUnit

The connection timeout TimeUnit

TimeUnit.MILLISECONDS

TimeUnit

authInfoList

List of AuthInfo objects with scheme and auth

List

maxCloseWait

Time to wait during close to join background threads

1000

long

maxCloseWaitUnit

MaxCloseWait TimeUnit

MILLISECONDS

TimeUnit

retryPolicy

The retry policy to use.

RetryPolicy

basePath

The base path to store in ZooKeeper

String

Configuration examples:

  • Spring Boot

    camel.cluster.file.enabled = true
    camel.cluster.file.id = ${random.uuid}
    camel.cluster.file.root = ${java.io.tmpdir}
  • Spring XML

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd
             http://camel.apache.org/schema/spring
             http://camel.apache.org/schema/spring/camel-spring.xsd">
    
      <bean id="zx" class="org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService">
        <property name="id" value="node-1"/>
        <property name="basePath" value="/camel/cluster"/>
        <property name="nodes" value="localhost:2181"/>
      </bean>
    
      <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false">
        ...
      </camelContext>
    
    </beans>

Cluster SPI Usage

The Cluster SPI is leveraged by the following new implementations:

  • ClusteredRoutePolicy

    This is an implementation of a RoutePolicy that starts the routes it is associated to when the Cluster View it uses takes the leadership

    context.addRoutes(new RouteBuilder {
        @Override
        public void configure() throws Exception {
            // Create the route policy
            RoutePolicy policy = ClusteredRoutePolicy.forNamespace("my-ns");
    
            // bind the policy to one or more routes
            from("timer:clustered?delay=1000&period=1000")
                .routePolicy(policy)
                .log("Route ${routeId} is running ...");
        }
    });

    To apply the same policy to all the routes a dedicated RoutePolicyFactory can be used:

    // add the clustered route policy factory to context
    context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
    
    context.addRoutes(new RouteBuilder {
        @Override
        public void configure() throws Exception {
            // bind the policy to one or more routes
            from("timer:clustered?delay=1000&period=1000")
                .log("Route ${routeId} is running ...");
        }
    });
  • ClusteredRouteController

    This is an implementation of the RouteController SPI that lets the camel context start then starts/stops the routes when the leadership is taken/lost. This is well integrated with spring-boot apps so assuming you have your routes set-up like:

    @Bean
    public RouteBuilder routeBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("timer:heartbeat?period=10000")
                    .routeId("heartbeat")
                    .log("HeartBeat route (timer) ...");
                from("timer:clustered?period=5000")
                    .routeId("clustered")
                    .log("Clustered route (timer) ...");
            }
        };
    }

    You can then leverage Spring Boot configuration to make them clustered:

    # enable the route controller
    camel.clustered.controller.enabled = true
    
    # define the default namespace for routes
    camel.clustered.controller.namespace = my-ns
    
    # exclude the route with id 'heartbeat' from the clustered ones
    camel.clustered.controller.routes[heartbeat].clustered = false
  • Master Component

    The master component is similar to a ClusteredRoutePolicy but it works on consumer level so it ensures the only a single endpoint in a cluster is consuming resources at any point in time. Set it up is very easy and all you need is to prefix singleton endpoints according to the master component syntax:

    master:namespace:delegateUri

    A concrete example:

    @Bean
    public RouteBuilder routeBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("timer:heartbeat?period=10000")
                    .routeId("heartbeat")
                    .log("HeartBeat route (timer) ...");
    
                from("master:my-ns:timer:clustered?period=5000")
                    .routeId("clustered")
                    .log("Clustered route (timer) ...");
            }
        };
    }