Azure CosmosDB
Since Camel 3.10
Both producer and consumer are supported
Azure Cosmos DB is Microsoft’s globally distributed, multi-model database service for operational and analytics workloads. It offers a multi-mastering feature by automatically scaling throughput, compute, and storage. This component interacts with Azure CosmosDB through the Azure SQL API.
Prerequisites
You must have a valid Windows Azure Storage account. More information is available at Azure Documentation Portal.
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-azure-cosmosdb</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
URI Format
azure-cosmosdb://[databaseName][/containerName][?options]
For the consumer, databaseName and containerName are required. For the producer, it depends on the operation being requested: for a database-level operation, e.g. deleteDatabase, only databaseName is required, but for a container-level operation, e.g. readItem, both databaseName and containerName are required.
You can append query options to the URI in the following format: ?option=value&option2=value&…
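As a rough illustration of the URI format above, the following sketch assembles an endpoint URI from its parts. The class and method names are hypothetical and are not part of Camel; option values are appended as-is, so values containing special characters would need extra care.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Camel: assembles
// azure-cosmosdb://[databaseName][/containerName][?options]
final class CosmosDbUriSketch {

    // Either name may be null or empty when the operation does not need it.
    static String build(String databaseName, String containerName, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("azure-cosmosdb://");
        if (databaseName != null && !databaseName.isEmpty()) {
            uri.append(databaseName);
            if (containerName != null && !containerName.isEmpty()) {
                uri.append('/').append(containerName);
            }
        }
        if (options != null && !options.isEmpty()) {
            // Options are joined as key=value pairs separated by '&', in insertion order.
            String query = options.entrySet().stream()
                    .map(e -> e.getKey() + "=" + e.getValue())
                    .collect(Collectors.joining("&"));
            uri.append('?').append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("operation", "readItem");
        options.put("itemId", "test-id-1");
        System.out.println(build("camelDb", "myContainer", options));
    }
}
```

A LinkedHashMap keeps the options in the order they were added, which makes the resulting URI predictable.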
Configuring Options
Camel components are configured on two separate levels:
- component level
- endpoint level
Configuring Component Options
At the component level, you set general and shared configurations that are then inherited by the endpoints. It is the highest configuration level.
For example, a component may have security settings, credentials for authentication, URLs for network connection and so forth.
Some components only have a few options, and others may have many. Because components typically have pre-configured defaults that are commonly used, you often only need to configure a few options on a component, or none at all.
You can configure components using:
- the Component DSL.
- in a configuration file (application.properties, *.yaml files, etc).
- directly in the Java code.
Configuring Endpoint Options
You usually spend more time setting up endpoints because they have many options. These options help you customize what you want the endpoint to do. The options are also categorized into whether the endpoint is used as a consumer (from), as a producer (to), or both.
Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type safe way of configuring endpoints and data formats in Java.
A good practice when configuring options is to use Property Placeholders.
Property placeholders provide a few benefits:
- They help prevent using hardcoded URLs, port numbers, sensitive information, and other settings.
- They allow externalizing the configuration from the code.
- They help the code to become more flexible and reusable.
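To make the idea of externalized settings concrete, here is a small sketch that substitutes {{key}} placeholders in an endpoint URI from a Properties object. It only mimics the substitution step for illustration: Camel resolves placeholders itself, and the class name and the property keys cosmos.database and cosmos.container are assumptions made up for this example.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: a tiny stand-in for placeholder resolution, not Camel's implementation.
final class PlaceholderSketch {

    // Matches {{some.key}} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^}]+)\\}\\}");

    static String resolve(String template, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1).trim();
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
            // quoteReplacement keeps '$' and '\' in values from being treated specially.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("cosmos.database", "camelDb");      // hypothetical key
        props.setProperty("cosmos.container", "myContainer"); // hypothetical key
        System.out.println(resolve(
                "azure-cosmosdb://{{cosmos.database}}/{{cosmos.container}}?operation=readItem", props));
    }
}
```

The same URI template can then be reused across environments by changing only the property values.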
The following two sections list all the options, firstly for the component followed by the endpoint.
Component Options
The Azure CosmosDB component supports 31 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
Sets the flag to enable client telemetry which will periodically collect database operations aggregation statistics, system information like cpu/memory and send it to cosmos monitoring service, which will be helpful during debugging. DEFAULT value is false indicating this is an opt-in feature, by default no telemetry collection. | false | boolean | |
The component configurations. | CosmosDbConfiguration | ||
Enables connections sharing across multiple Cosmos Clients. The default is false. When you have multiple instances of Cosmos Client in the same JVM interacting with multiple Cosmos accounts, enabling this allows connection sharing in Direct mode if possible between instances of Cosmos Client. Please note, when setting this option, the connection configuration (e.g., socket timeout config, idle timeout config) of the first instantiated client will be used for all other client instances. | false | boolean | |
Sets the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service. The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL. Refer to consistency level documentation for additional details: https://docs.microsoft.com/en-us/azure/cosmos-db/consistency-levels. Enum values:
| SESSION | ConsistencyLevel | |
Sets the container partition key path. | String | ||
Sets the boolean to only return the headers and status code in Cosmos DB response in case of Create, Update and Delete operations on CosmosItem. In Consumer, it is enabled by default because of the ChangeFeed in the consumer that needs this flag to be enabled, and thus it shouldn’t be overridden. In Producer, it is advised to disable it since it reduces the network overhead. | true | boolean | |
Autowired Inject an external CosmosAsyncClient into the component which provides a client-side logical representation of the Azure Cosmos DB service. This asynchronous client is used to configure and execute requests against the service. | CosmosAsyncClient | ||
Sets if the component should create the Cosmos container automatically in case it doesn’t exist in the Cosmos database. | false | boolean | |
Sets if the component should create the Cosmos database automatically in case it doesn’t exist in the Cosmos account. | false | boolean | |
Required Sets the Azure Cosmos database endpoint the component will connect to. | String | ||
Sets the flag to enable writes on any regions for geo-replicated database accounts in the Azure Cosmos DB service. When the value of this property is true, the SDK will direct write operations to available writable regions of geo-replicated database account. Writable regions are ordered by PreferredRegions property. Setting the property value to true has no effect until EnableMultipleWriteRegions in DatabaseAccount is also set to true. DEFAULT value is true indicating that writes are directed to available writable regions of geo-replicated database account. | true | boolean | |
Sets the comma separated preferred regions for geo-replicated database accounts. For example, East US as the preferred region. When EnableEndpointDiscovery is true and PreferredRegions is non-empty, the SDK will prefer to use the regions in the container in the order they are specified to perform operations. | String | ||
Sets whether to allow for reads to go to multiple regions configured on an account of Azure Cosmos DB service. DEFAULT value is true. If this property is not set, the default is true for all Consistency Levels other than Bounded Staleness, The default is false for Bounded Staleness. 1. endpointDiscoveryEnabled is true 2. the Azure Cosmos DB account has more than one region. | true | boolean | |
Sets throughput of the resources in the Azure Cosmos DB service. | ThroughputProperties | ||
Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | |
Sets the ChangeFeedProcessorOptions to be used. Unless specifically set the default values that will be used are: maximum items per page or FeedResponse: 100 lease renew interval: 17 seconds lease acquire interval: 13 seconds lease expiration interval: 60 seconds feed poll delay: 5 seconds maximum scale count: unlimited. | ChangeFeedProcessorOptions | ||
Sets if the component should create Cosmos lease container for the consumer automatically in case it doesn’t exist in Cosmos database. | false | boolean | |
Sets if the component should create the Cosmos lease database for the consumer automatically in case it doesn’t exist in the Cosmos account. | false | boolean | |
Sets the hostname. The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name. If not specified, this will be a generated random hostname. | String | ||
Sets the lease container which acts as a state storage and coordinates processing the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto-created if createLeaseContainerIfNotExists is set to true. | camel-lease | String | |
Sets the lease database where the leaseContainerName will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto-created if createLeaseDatabaseIfNotExists is set to true. | String | ||
Sets the itemId in case needed for operation on item like delete, replace. | String | ||
Sets partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored in. | String | ||
Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean | |
The CosmosDB operation that can be used with this component on the producer. Enum values:
| listDatabases | CosmosDbOperationsDefinition | |
An SQL query to execute on the given resources. To learn more about the Cosmos SQL API, see https://docs.microsoft.com/en-us/azure/cosmos-db/sql-query-getting-started. | String | ||
Set additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations. | CosmosQueryRequestOptions | ||
The CosmosDB Indexing Policy that will be set in case of container creation, this option is related to createLeaseContainerIfNotExists and it will be taken into account when the latter is true. | IndexingPolicy | ||
Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean | |
Sets either a master or readonly key used to perform authentication for accessing resource. | String | ||
Determines the credential strategy to adopt. Enum values:
| SHARED_ACCOUNT_KEY | CredentialType |
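The consistency level option in the table above states that the requested level must match or be weaker than the level provisioned for the account. The sketch below expresses that rule with a local enum; it is a stand-in written for this example, not the SDK's ConsistencyLevel type, and it lists only the four levels named in the option description.

```java
// Illustration of the "match or be weaker" rule; not the Azure SDK's own type.
final class ConsistencyCheckSketch {

    // Declared strongest first, in the order given by the option description.
    enum Level { STRONG, BOUNDED_STALENESS, SESSION, EVENTUAL }

    // A request is acceptable when it is the same as, or weaker than, the account level.
    static boolean isAllowed(Level accountLevel, Level requested) {
        return requested.ordinal() >= accountLevel.ordinal();
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(Level.SESSION, Level.EVENTUAL)); // weaker than the account level
        System.out.println(isAllowed(Level.SESSION, Level.STRONG));   // stronger than the account level
    }
}
```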
Endpoint Options
The Azure CosmosDB endpoint is configured using URI syntax:
azure-cosmosdb:databaseName/containerName
With the following path and query parameters:
Path Parameters (2 parameters)
Name | Description | Default | Type |
---|---|---|---|
databaseName | The name of the Cosmos database that the component should connect to. If you are producing data and have createDatabaseIfNotExists=true, the component will automatically create the Cosmos database. | | String |
containerName | The name of the Cosmos container that the component should connect to. If you are producing data and have createContainerIfNotExists=true, the component will automatically create the Cosmos container. | | String |
Query Parameters (31 parameters)
Name | Description | Default | Type |
---|---|---|---|
Sets the flag to enable client telemetry which will periodically collect database operations aggregation statistics, system information like cpu/memory and send it to cosmos monitoring service, which will be helpful during debugging. DEFAULT value is false indicating this is an opt-in feature, by default no telemetry collection. | false | boolean | |
Enables connections sharing across multiple Cosmos Clients. The default is false. When you have multiple instances of Cosmos Client in the same JVM interacting with multiple Cosmos accounts, enabling this allows connection sharing in Direct mode if possible between instances of Cosmos Client. Please note, when setting this option, the connection configuration (e.g., socket timeout config, idle timeout config) of the first instantiated client will be used for all other client instances. | false | boolean | |
Sets the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service. The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL. Refer to consistency level documentation for additional details: https://docs.microsoft.com/en-us/azure/cosmos-db/consistency-levels. Enum values:
| SESSION | ConsistencyLevel | |
Sets the container partition key path. | String | ||
Sets the boolean to only return the headers and status code in Cosmos DB response in case of Create, Update and Delete operations on CosmosItem. In Consumer, it is enabled by default because of the ChangeFeed in the consumer that needs this flag to be enabled, and thus it shouldn’t be overridden. In Producer, it is advised to disable it since it reduces the network overhead. | true | boolean | |
Autowired Inject an external CosmosAsyncClient into the component which provides a client-side logical representation of the Azure Cosmos DB service. This asynchronous client is used to configure and execute requests against the service. | CosmosAsyncClient | ||
Sets if the component should create the Cosmos container automatically in case it doesn’t exist in the Cosmos database. | false | boolean | |
Sets if the component should create the Cosmos database automatically in case it doesn’t exist in the Cosmos account. | false | boolean | |
Required Sets the Azure Cosmos database endpoint the component will connect to. | String | ||
Sets the flag to enable writes on any regions for geo-replicated database accounts in the Azure Cosmos DB service. When the value of this property is true, the SDK will direct write operations to available writable regions of geo-replicated database account. Writable regions are ordered by PreferredRegions property. Setting the property value to true has no effect until EnableMultipleWriteRegions in DatabaseAccount is also set to true. DEFAULT value is true indicating that writes are directed to available writable regions of geo-replicated database account. | true | boolean | |
Sets the comma separated preferred regions for geo-replicated database accounts. For example, East US as the preferred region. When EnableEndpointDiscovery is true and PreferredRegions is non-empty, the SDK will prefer to use the regions in the container in the order they are specified to perform operations. | String | ||
Sets whether to allow for reads to go to multiple regions configured on an account of Azure Cosmos DB service. DEFAULT value is true. If this property is not set, the default is true for all Consistency Levels other than Bounded Staleness, The default is false for Bounded Staleness. 1. endpointDiscoveryEnabled is true 2. the Azure Cosmos DB account has more than one region. | true | boolean | |
Sets throughput of the resources in the Azure Cosmos DB service. | ThroughputProperties | ||
Sets the ChangeFeedProcessorOptions to be used. Unless specifically set the default values that will be used are: maximum items per page or FeedResponse: 100 lease renew interval: 17 seconds lease acquire interval: 13 seconds lease expiration interval: 60 seconds feed poll delay: 5 seconds maximum scale count: unlimited. | ChangeFeedProcessorOptions | ||
Sets if the component should create Cosmos lease container for the consumer automatically in case it doesn’t exist in Cosmos database. | false | boolean | |
Sets if the component should create the Cosmos lease database for the consumer automatically in case it doesn’t exist in the Cosmos account. | false | boolean | |
Sets the hostname. The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name. If not specified, this will be a generated random hostname. | String | ||
Sets the lease container which acts as a state storage and coordinates processing the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto-created if createLeaseContainerIfNotExists is set to true. | camel-lease | String | |
Sets the lease database where the leaseContainerName will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto-created if createLeaseDatabaseIfNotExists is set to true. | String | ||
Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | |
To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | ExceptionHandler | ||
Sets the exchange pattern when the consumer creates an exchange. Enum values:
| ExchangePattern | ||
Sets the itemId in case needed for operation on item like delete, replace. | String | ||
Sets partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored in. | String | ||
The CosmosDB operation that can be used with this component on the producer. Enum values:
| listDatabases | CosmosDbOperationsDefinition | |
An SQL query to execute on the given resources. To learn more about the Cosmos SQL API, see https://docs.microsoft.com/en-us/azure/cosmos-db/sql-query-getting-started. | String | ||
Set additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations. | CosmosQueryRequestOptions | ||
Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean | |
The CosmosDB Indexing Policy that will be set in case of container creation, this option is related to createLeaseContainerIfNotExists and it will be taken into account when the latter is true. | IndexingPolicy | ||
Sets either a master or readonly key used to perform authentication for accessing resource. | String | ||
Determines the credential strategy to adopt. Enum values:
| SHARED_ACCOUNT_KEY | CredentialType |
Usage
Authentication Information
To use this component, you have two options to provide the required Azure authentication information:
- Provide accountKey and databaseEndpoint for your Azure CosmosDB account. The account key can be generated through your CosmosDB Azure portal.
- Provide a CosmosAsyncClient instance, which can be supplied through the cosmosAsyncClient option.
Async Consumer and Producer
This component implements both an async consumer and an async producer.
This allows a Camel route to consume and produce events asynchronously without blocking any threads.
Message headers evaluated by the component producer
Header | Variable Name | Type | Description |
---|---|---|---|
| | CosmosDbConstants.DATABASE_NAME | | Overrides the database name, which is the name of the Cosmos database that the component should connect to. If you are producing data and have createDatabaseIfNotExists=true, the component will automatically create the Cosmos database. |
| | CosmosDbConstants.CONTAINER_NAME | | Overrides the container name, which is the name of the Cosmos container that the component should connect to. If you are producing data and have createContainerIfNotExists=true, the component will automatically create the Cosmos container. |
| | | | Sets the producer operation, which can be used to execute a specific operation on the producer. |
| | CosmosDbConstants.QUERY | | Sets the SQL query to execute for the producer query operations. |
| | | | Sets additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations. |
| | CosmosDbConstants.CREATE_DATABASE_IF_NOT_EXIST | | Sets if the component should create the Cosmos database automatically in case it doesn’t exist in the Cosmos account. |
| | | | Sets if the component should create the Cosmos container automatically in case it doesn’t exist in the Cosmos database. |
| | CosmosDbConstants.THROUGHPUT_PROPERTIES | | Sets throughput of the resources in the Azure Cosmos DB service. |
| | | | Sets additional options to execute on database operations. |
| | CosmosDbConstants.CONTAINER_PARTITION_KEY_PATH | | Sets the container partition key path. |
| | | | Sets additional options to execute on container operations. |
| | CosmosDbConstants.ITEM_PARTITION_KEY | | Sets the partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored. |
| | | | Sets additional options to execute on item operations. |
| | CosmosDbConstants.ITEM_ID | | Sets the itemId in case it is needed for an operation on an item, such as delete or replace. |
Message headers set by the component producer
Header | Variable Name | Type | Description |
---|---|---|---|
| | | | The resource ID of the requested resource. |
| | | | The Etag ID of the requested resource. |
| | | | The timestamp of the requested resource. |
| | | | The response headers of the requested resource. |
| | | | The status code of the requested resource. |
| | | | The TTL of the requested resource. |
| | | | The manual throughput of the requested resource. |
| | | | The AutoscaleMaxThroughput of the requested resource. |
Azure CosmosDB Producer operations
Camel Azure CosmosDB component provides a wide range of operations on the producer side:
Operations on the service level
For these operations, databaseName is required, except for the queryDatabases and listDatabases operations.
Operation | Description |
---|---|
listDatabases | Gets a list of all databases as |
createDatabase | Create a database in the specified Azure CosmosDB account. |
|
|
Operations on the database level
For these operations, databaseName is required for all operations here and containerName only for createContainer and queryContainers.
Operation | Description |
---|---|
deleteDatabase | Delete a database from the Azure CosmosDB account. |
createContainer | Create a container in the specified Azure CosmosDB database. |
replaceDatabaseThroughput | Replace the throughput for the specified Azure CosmosDB database. |
listContainers | Gets a list of all containers in the specified database as |
|
|
Operations on the container level
For these operations, databaseName and containerName are required for all operations here.
Operation | Description |
---|---|
deleteContainer | Delete a container from the specified Azure CosmosDB database. |
replaceContainerThroughput | Replace the throughput for the specified Azure CosmosDB container. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Refer to the Examples section on this page to learn how to use these operations in your Camel application.
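The level rules above can be summarized as a small pre-flight check. This is a sketch under stated assumptions: the class, the enum and the map are hypothetical, the component does its own validation, and only a handful of operations named on this page are covered.

```java
import java.util.Map;

// Hypothetical helper, not part of Camel: checks that the names an operation needs are present.
final class RequiredNamesSketch {

    // Which path parameters an operation needs, following the three levels described above.
    enum Requirement { NONE, DATABASE, DATABASE_AND_CONTAINER }

    // Illustrative subset only, not the full list of producer operations.
    static final Map<String, Requirement> REQUIREMENTS = Map.of(
            "listDatabases", Requirement.NONE,
            "createDatabase", Requirement.DATABASE,
            "deleteDatabase", Requirement.DATABASE,
            "createContainer", Requirement.DATABASE_AND_CONTAINER,
            "deleteContainer", Requirement.DATABASE_AND_CONTAINER,
            "readItem", Requirement.DATABASE_AND_CONTAINER);

    static void validate(String operation, String databaseName, String containerName) {
        Requirement requirement = REQUIREMENTS.get(operation);
        if (requirement == null) {
            throw new IllegalArgumentException("Operation not covered by this sketch: " + operation);
        }
        boolean hasDatabase = databaseName != null && !databaseName.isEmpty();
        boolean hasContainer = containerName != null && !containerName.isEmpty();
        if (requirement != Requirement.NONE && !hasDatabase) {
            throw new IllegalArgumentException(operation + " requires databaseName");
        }
        if (requirement == Requirement.DATABASE_AND_CONTAINER && !hasContainer) {
            throw new IllegalArgumentException(operation + " requires containerName");
        }
    }

    public static void main(String[] args) {
        validate("listDatabases", null, null);        // service level, no names needed
        validate("deleteDatabase", "myDb", null);      // database level
        validate("readItem", "myDb", "myContainer");   // container level
        System.out.println("all checks passed");
    }
}
```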
Examples
Consuming records from a specific container
For example, to consume records from a specific container in a specific database to a file, use the following snippet:
from("azure-cosmosdb://camelDb/myContainer?accountKey=MyaccountKey&databaseEndpoint=https://myazure.com:443&leaseDatabaseName=myLeaseDB&createLeaseDatabaseIfNotExists=true&createLeaseContainerIfNotExists=true")
    .to("file://directory");
Operations
- listDatabases:
from("direct:start")
    .to("azure-cosmosdb://?operation=listDatabases")
    .to("mock:result");
- createDatabase:
from("direct:start")
    .process(exchange -> {
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "myDb");
    })
    .to("azure-cosmosdb://?operation=createDatabase")
    .to("mock:result");
- deleteDatabase:
from("direct:start")
    .process(exchange -> {
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "myDb");
    })
    .to("azure-cosmosdb://?operation=deleteDatabase")
    .to("mock:result");
- createContainer:
from("direct:start")
    .process(exchange -> {
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_PARTITION_KEY_PATH, "path");
        exchange.getIn().setHeader(CosmosDbConstants.CREATE_DATABASE_IF_NOT_EXIST, true);
    })
    .to("azure-cosmosdb://?operation=createContainer")
    .to("mock:result");
- deleteContainer:
from("direct:start")
    .process(exchange -> {
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
    })
    .to("azure-cosmosdb://?operation=deleteContainer")
    .to("mock:result");
- replaceDatabaseThroughput:
from("direct:start")
    .process(exchange -> {
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.THROUGHPUT_PROPERTIES,
                ThroughputProperties.createManualThroughput(700));
    })
    .to("azure-cosmosdb://?operation=replaceDatabaseThroughput")
    .to("mock:result");
- queryContainers:
from("direct:start")
    .process(exchange -> {
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.QUERY, "SELECT * from c where c.id = 'myAwesomeContainer'");
    })
    .to("azure-cosmosdb://?operation=queryContainers")
    .to("mock:result");
- createItem:
from("direct:start")
    .process(exchange -> {
        // create item to send
        final Map<String, Object> item = new HashMap<>();
        item.put("id", "test-id-1");
        item.put("partition", "test-1");
        item.put("field1", "awesome!");
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_PARTITION_KEY_PATH, "partition");
        exchange.getIn().setHeader(CosmosDbConstants.ITEM_PARTITION_KEY, "test-1");
        exchange.getIn().setBody(item);
    })
    .to("azure-cosmosdb://?operation=createItem")
    .to("mock:result");
- replaceItem:
from("direct:start")
    .process(exchange -> {
        // create the replacement item to send
        final Map<String, Object> item = new HashMap<>();
        item.put("id", "test-id-1");
        item.put("partition", "test-1");
        item.put("field1", "awesome!");
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
        exchange.getIn().setHeader(CosmosDbConstants.ITEM_PARTITION_KEY, "test-1");
        exchange.getIn().setHeader(CosmosDbConstants.ITEM_ID, "test-id-1");
        exchange.getIn().setBody(item);
    })
    .to("azure-cosmosdb://?operation=replaceItem")
    .to("mock:result");
- deleteItem:
from("direct:start")
    .process(exchange -> {
        // the item is identified by its partition key and id, so no body is set
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
        exchange.getIn().setHeader(CosmosDbConstants.ITEM_PARTITION_KEY, "test-1");
        exchange.getIn().setHeader(CosmosDbConstants.ITEM_ID, "test-id-1");
    })
    .to("azure-cosmosdb://?operation=deleteItem")
    .to("mock:result");
- queryItems:
from("direct:start")
    .process(exchange -> {
        exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
        exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
        exchange.getIn().setHeader(CosmosDbConstants.QUERY, "SELECT c.id,c.field2,c.field1 from c where c.id = 'test-id-1'");
    })
    .to("azure-cosmosdb://?operation=queryItems")
    .to("mock:result");
Azure CosmosDB Consumer
The Camel Azure CosmosDB consumer uses the ChangeFeed pattern to capture a feed of events and pass them into Camel asynchronously, similar to the Change Data Capture (CDC) design pattern. However, it doesn’t capture deletions, as these are removed from the feed as well.
To use the Camel Azure CosmosDB consumer, containerName and databaseName are required. However, there are more options that need to be set to use this feature:
- leaseDatabaseName: Sets the lease database where the leaseContainerName will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto-created if createLeaseDatabaseIfNotExists is set to true.
- leaseContainerName: Sets the lease container which acts as a state storage and coordinates processing the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto-created if createLeaseContainerIfNotExists is set to true. If not specified, this component will create a container called camel-lease.
- hostName: Sets the hostname. The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name. If not specified, this will be a generated random hostname.
- changeFeedProcessorOptions: Sets additional options for the change feed processor.
The consumer will set a List<Map<String,>> in the exchange message body, which reflects the list of items in a single feed.
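Since each exchange carries a batch of items as a list of maps, downstream code usually iterates over that list. The sketch below collects the id field of every item in one batch. The class and method names are hypothetical, and the Object value type and the presence of an "id" key are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: works on a body shaped like the list of maps the consumer produces.
final class ChangeFeedBatchSketch {

    // Collects the "id" field of every item in one change feed batch, skipping items without one.
    static List<String> itemIds(List<Map<String, Object>> batch) {
        List<String> ids = new ArrayList<>();
        for (Map<String, Object> item : batch) {
            Object id = item.get("id");
            if (id != null) {
                ids.add(String.valueOf(id));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        Map<String, Object> item = new HashMap<>();
        item.put("id", "test-id-1");
        item.put("field1", "awesome!");
        List<Map<String, Object>> batch = new ArrayList<>();
        batch.add(item);
        System.out.println(itemIds(batch));
    }
}
```

In a route, a method like this could be called from a processor after reading the message body.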
Important Development Notes
When developing on this component, you will need to obtain your Azure accessKey in order to run the integration tests. In addition to the mocked unit tests, you will need to run the integration tests with every change you make, and even after a client upgrade, as the Azure client can break things even on minor version upgrades. To run the integration tests, run the following Maven command in this component's directory:
mvn clean install -Dendpoint={{dbaddress}} -DaccessKey={{accessKey}}
Here, endpoint is your Azure CosmosDB endpoint name and accessKey is the access key generated from the Azure CosmosDB portal.
Spring Boot Auto-Configuration
When using azure-cosmosdb with Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-azure-cosmosdb-starter</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
The component supports 32 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
camel.component.azure-cosmosdb.account-key | Sets either a master or readonly key used to perform authentication for accessing resource. | | String |
camel.component.azure-cosmosdb.autowired-enabled | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | Boolean |
camel.component.azure-cosmosdb.bridge-error-handler | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions (if possible) that occur while the Camel consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | Boolean |
camel.component.azure-cosmosdb.change-feed-processor-options | Sets the ChangeFeedProcessorOptions to be used. Unless specifically set, the default values that will be used are: maximum items per page or FeedResponse: 100; lease renew interval: 17 seconds; lease acquire interval: 13 seconds; lease expiration interval: 60 seconds; feed poll delay: 5 seconds; maximum scale count: unlimited. The option is a com.azure.cosmos.models.ChangeFeedProcessorOptions type. | | ChangeFeedProcessorOptions |
camel.component.azure-cosmosdb.client-telemetry-enabled | Sets the flag to enable client telemetry, which will periodically collect database operations aggregation statistics and system information like cpu/memory and send it to the cosmos monitoring service, which will be helpful during debugging. DEFAULT value is false, indicating this is an opt-in feature; by default there is no telemetry collection. | false | Boolean |
camel.component.azure-cosmosdb.configuration | The component configurations. The option is a org.apache.camel.component.azure.cosmosdb.CosmosDbConfiguration type. | | CosmosDbConfiguration |
camel.component.azure-cosmosdb.connection-sharing-across-clients-enabled | Enables connections sharing across multiple Cosmos Clients. The default is false. When you have multiple instances of Cosmos Client in the same JVM interacting with multiple Cosmos accounts, enabling this allows connection sharing in Direct mode if possible between instances of Cosmos Client. Please note, when setting this option, the connection configuration (e.g., socket timeout config, idle timeout config) of the first instantiated client will be used for all other client instances. | false | Boolean |
camel.component.azure-cosmosdb.consistency-level | Sets the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service. The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL. Refer to consistency level documentation for additional details: https://docs.microsoft.com/en-us/azure/cosmos-db/consistency-levels. | session | ConsistencyLevel |
camel.component.azure-cosmosdb.container-partition-key-path | Sets the container partition key path. | | String |
camel.component.azure-cosmosdb.content-response-on-write-enabled | Sets the boolean to only return the headers and status code in Cosmos DB response in case of Create, Update and Delete operations on CosmosItem. In Consumer, it is enabled by default because of the ChangeFeed in the consumer that needs this flag to be enabled, and thus it shouldn’t be overridden. In Producer, it is advised to disable it since it reduces the network overhead. | true | Boolean |
camel.component.azure-cosmosdb.cosmos-async-client | Inject an external CosmosAsyncClient into the component which provides a client-side logical representation of the Azure Cosmos DB service. This asynchronous client is used to configure and execute requests against the service. The option is a com.azure.cosmos.CosmosAsyncClient type. | | CosmosAsyncClient |
camel.component.azure-cosmosdb.create-container-if-not-exists | Sets if the component should create the Cosmos container automatically in case it doesn’t exist in the Cosmos database. | false | Boolean |
camel.component.azure-cosmosdb.create-database-if-not-exists | Sets if the component should create the Cosmos database automatically in case it doesn’t exist in the Cosmos account. | false | Boolean |
camel.component.azure-cosmosdb.create-lease-container-if-not-exists | Sets if the component should create the Cosmos lease container for the consumer automatically in case it doesn’t exist in the Cosmos database. | false | Boolean |
camel.component.azure-cosmosdb.create-lease-database-if-not-exists | Sets if the component should create the Cosmos lease database for the consumer automatically in case it doesn’t exist in the Cosmos account. | false | Boolean |
camel.component.azure-cosmosdb.credential-type | Determines the credential strategy to adopt. | shared-account-key | CredentialType |
camel.component.azure-cosmosdb.database-endpoint | Sets the Azure Cosmos database endpoint the component will connect to. | | String |
camel.component.azure-cosmosdb.enabled | Whether to enable auto configuration of the azure-cosmosdb component. This is enabled by default. | | Boolean |
camel.component.azure-cosmosdb.host-name | Sets the hostname. A host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different hostname. If not specified, a random hostname is generated. | | String |
camel.component.azure-cosmosdb.indexing-policy | The CosmosDB Indexing Policy that will be set in case of container creation. This option is related to createLeaseContainerIfNotExists and it will be taken into account when the latter is true. The option is a com.azure.cosmos.models.IndexingPolicy type. | | IndexingPolicy |
camel.component.azure-cosmosdb.item-id | Sets the itemId in case needed for operation on item like delete, replace. | | String |
camel.component.azure-cosmosdb.item-partition-key | Sets partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored. | | String |
camel.component.azure-cosmosdb.lazy-start-producer | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | Boolean |
camel.component.azure-cosmosdb.lease-container-name | Sets the lease container, which acts as state storage and coordinates processing of the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto-created if createLeaseContainerIfNotExists is set to true. | camel-lease | String |
camel.component.azure-cosmosdb.lease-database-name | Sets the lease database where the leaseContainerName will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto-created if createLeaseDatabaseIfNotExists is set to true. | | String |
camel.component.azure-cosmosdb.multiple-write-regions-enabled | Sets the flag to enable writes on any regions for geo-replicated database accounts in the Azure Cosmos DB service. When the value of this property is true, the SDK will direct write operations to available writable regions of geo-replicated database account. Writable regions are ordered by PreferredRegions property. Setting the property value to true has no effect until EnableMultipleWriteRegions in DatabaseAccount is also set to true. DEFAULT value is true indicating that writes are directed to available writable regions of geo-replicated database account. | true | Boolean |
camel.component.azure-cosmosdb.operation | The CosmosDB operation that can be used with this component on the producer. | listdatabases | CosmosDbOperationsDefinition |
camel.component.azure-cosmosdb.preferred-regions | Sets the comma separated preferred regions for geo-replicated database accounts. For example, East US as the preferred region. When EnableEndpointDiscovery is true and PreferredRegions is non-empty, the SDK will prefer to use the regions in the container in the order they are specified to perform operations. | | String |
camel.component.azure-cosmosdb.query | An SQL query to execute on a given resource. To learn more about Cosmos SQL API, see https://docs.microsoft.com/en-us/azure/cosmos-db/sql-query-getting-started. | | String |
camel.component.azure-cosmosdb.query-request-options | Set additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations. The option is a com.azure.cosmos.models.CosmosQueryRequestOptions type. | | CosmosQueryRequestOptions |
camel.component.azure-cosmosdb.read-requests-fallback-enabled | Sets whether to allow for reads to go to multiple regions configured on an account of Azure Cosmos DB service. DEFAULT value is true. If this property is not set, the default is true for all Consistency Levels other than Bounded Staleness; the default is false for Bounded Staleness. 1. endpointDiscoveryEnabled is true 2. the Azure Cosmos DB account has more than one region. | true | Boolean |
camel.component.azure-cosmosdb.throughput-properties | Sets throughput of the resources in the Azure Cosmos DB service. The option is a com.azure.cosmos.models.ThroughputProperties type. | | ThroughputProperties |
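As a rough illustration, a few of these options could be set in application.properties as sketched below. The endpoint and key values are placeholders. The database-endpoint and account-key property names are assumed to follow the same camel.component.azure-cosmosdb.* naming as the other entries, and the lease setting only matters when the consumer is used.

```properties
# Placeholder values; replace them with your own account settings.
camel.component.azure-cosmosdb.database-endpoint=https://my-account.documents.azure.com:443/
camel.component.azure-cosmosdb.account-key=<your-access-key>
# Let the component create missing resources on first use.
camel.component.azure-cosmosdb.create-database-if-not-exists=true
camel.component.azure-cosmosdb.create-container-if-not-exists=true
camel.component.azure-cosmosdb.create-lease-container-if-not-exists=true
```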