Try it out locally
Run Kafka
Install a locally running kafka instance by following Apache Kafka quickstart guide. This usually boils down to:
export KAFKA_HOME=<your kafka install dir> $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties $KAFKA_HOME/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic mytopic For using the quickstart we’ll use the plugin.path property, so you’ll have to add a path for your connectors.
Open your configuration file located at $KAFKA_HOME/config/connect-standalone.properties
and add a property at the end
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/connectors At this point you’re able to run the connectors quickstart.
Next, run Camel kafka connectors source and/or sink:
You can use these Kafka utilities to listen or produce from a Kafka topic:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic Try some examples
For the following examples you need to fetch the camel-kafka-connector project and build it locally. You can either build the entire project with ./mvnw package from the root directory (which may take some time) or build just the connectors you are interested in with the following command from within a connector’s directory.
> cd connectors/camel-log-kafka-connector/
> mvn package -pl camel-timer-kafka-connector -am Look into the config and docs/examples directories for the configuration files (*.properties) of the examples showcased here. There is also a comprehensive set of examples with instructions on how to run them in this repository.
Simple logger (sink)
Unzip or untar the camel-log-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-log-kafka-connector/target/ a .zip file named camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-log-kafka-connector/target/camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelLogSinkConnector.properties Timer (source)
Unzip or untar the camel-timer-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-timer-kafka-connector/target/ a .zip file named camel-timer-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-timer-kafka-connector/target/camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-timer-kafka-connector-0.6.0-SNAPSHOT-package.zip This is an example of a source that produces a message every second to mytopic.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelTimerSourceConnector.properties AWS Kinesis (source)
Unzip or untar the camel-aws-kinesis-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-kinesis-kafka-connector/target/ a .zip file named camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-aws-kinesis-kafka-connector/target/camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip This example consumes from AWS Kinesis data stream and transfers the payload to mytopic topic in Kafka.
Adjust properties in examples/CamelAWSKinesisSourceConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-kinesis.configuration.access-key=youraccesskey, camel.component.aws-kinesis.configuration.secret-key=yoursecretkey and camel.component.aws-kinesis.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSKinesisSourceConnector.properties AWS SQS (sink)
Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-sqs-kafka-connector/target/ a .zip file named camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-aws-sqs-kafka-connector/target/camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip This example consumes from Kafka topic mytopic and transfers the payload to AWS SQS.
Adjust properties in examples/CamelAWSSQSSinkConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSQSSinkConnector.properties AWS SQS (source)
Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-sqs-kafka-connector/target/ a .zip file named camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-aws-sqs-kafka-connector/target/camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip This example consumes from AWS SQS queue mysqs and transfers the payload to mytopic topic in Kafka.
Adjust properties in examples/CamelAWSSQSSourceConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSQSSourceConnector.properties AWS SNS (sink)
Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-sns-kafka-connector/target/ a .zip file named camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-aws-sns-kafka-connector/target/camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip This example consumes from mytopic Kafka topic and transfers the payload to AWS SNS topic topic.
Adjust properties in examples/CamelAWSSNSSinkConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-sns.configuration.access-key=youraccesskey, camel.component.aws-sns.configuration.secret-key=yoursecretkey and camel.component.aws-sns.configuration.region=yourregion
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSNSSinkConnector.properties AWS S3 (source)
Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-s3-kafka-connector/target/ a .zip file named camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-aws-s3-kafka-connector/target/camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip This example fetches objects from AWS S3 in the camel-kafka-connector bucket and transfers the payload to mytopic Kafka topic. This example shows how to implement a custom converter converting from bytes received from S3 to Kafka’s SchemaAndValue.
Adjust properties in examples/CamelAWSS3SourceConnector.properties for your environment, you need to configure access key, secret key and region by adding camel.component.aws-s3.configuration.access-key=youraccesskey, camel.component.aws-s3.configuration.secret-key=yoursecretkey and camel.component.aws-s3.configuration.region=yourregion
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSS3SourceConnector.properties Apache Cassandra
Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-cql-kafka-connector/target/ a .zip file named camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-cql-kafka-connector/target/camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip This examples require a running Cassandra instance, for simplicity the steps below show how to start Cassandra using Docker. First you’ll need to run a Cassandra instance:
docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra Next, check and make sure Cassandra is running:
docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1 To populate the database using to the cqlsh tool, you’ll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory. We reference the Cassandra installation directory with LOCAL_CASSANDRA_HOME. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node) Next, execute the following script to create keyspace test, the table users and insert one row into it.
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit; In the configuration .properties file we use below the IP address of the Cassandra master node needs to be configured, replace the value 172.17.0.2 in the camel.source.url or localhost in camel.sink.url configuration property with the IP of the master node obtained from Docker. Each example uses a different .properties file shown in the command line to run the example.
docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node Apache Cassandra (source)
This example polls Cassandra via CSQL (select * from users) in the test keyspace and transfers the result to the mytopic Kafka topic.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelCassandraQLSourceConnector.properties Apache Cassandra (sink)
First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-cql-kafka-connector/target/ a .zip file named camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-cql-kafka-connector/target/camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip This example adds data to the users table in Cassandra from the data consumed from the mytopic Kafka topic. Notice how the name column is populated from the Kafka message using CQL command insert into users….
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelCassandraQLSinkConnector.properties Elasticsearch (sink)
First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-elasticsearch-rest-kafka-connector/target/ a .zip file named camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-elasticsearch-rest-kafka-connector/target/camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip This example passes data from mytopic Kafka topic to sampleIndexName index in Elasticsearch. Adjust properties in docs/examples/CamelElasticSearchSinkConnector.properties to reflect your environment, for example change the hostAddresses to a valid Elasticsearch instance hostname and port.
For the index operation, it might be necessary to provide or implement a transformer. A sample configuration would be similar to the one below:
transforms=ElasticSearchTransformer This is the sample Transformer used in the integration test code that transforms Kafka’s ConnectRecord to a Map:
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.elasticsearch.sink.transforms.ConnectRecordValueToMapTransformer This is a configuration for the sample transformer that defines the key used in the map:
transforms.ElasticSearchTransformer.key=MyKey When the configuration is ready run the sink with:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelElasticSearchSinkConnector.properties File (sink)
First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-file-kafka-connector/target/ a .zip file named camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-file-kafka-connector/target/camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip This example appends data from mytopic Kafka topic to a file in /tmp/kafkaconnect.txt.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelFileSinkConnector.properties HTTP (sink)
First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-http-kafka-connector/target/ a .zip file named camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-http-kafka-connector/target/camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip This example sends data from mytopic Kafka topic to a HTTP service. Adjust properties in docs/examples/CamelHttpSinkConnector.properties for your environment, for example configuring the camel.sink.url.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelHttpSinkConnector.properties JMS (source)
First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-sjms2-kafka-connector/target/ a .zip file named camel-sjsm2-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-sjsm2-kafka-connector/target/camel-sjms2-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-sjsm2-kafka-connector-0.6.0-SNAPSHOT-package.zip These are the basic connectors. For camel-sjms2 we have a bunch of provided dependencies we need to add in our path, so run the following commands (note that this is not needed from 0.7.0 onward for ActiveMQ and Artemis JMS clients, as their dependecies are packaged along with the SJMS2 connector):
> cd /home/connectors/camel-sjms2-kafka-connector
> wget https://repo1.maven.org/maven2/org/apache/activemq/activemq-client/5.15.11/activemq-client-5.15.11.jar
> wget https://repo1.maven.org/maven2/org/apache/geronimo/specs/geronimo-jms_2.0_spec/1.0-alpha-2/geronimo-jms_2.0_spec-1.0-alpha-2.jar
> wget https://repo1.maven.org/maven2/org/apache/geronimo/specs/geronimo-annotation_1.0_spec/1.1.1/geronimo-annotation_1.0_spec-1.1.1.jar
> wget https://repo1.maven.org/maven2/javax/management/j2ee/management-api/1.1-rev-1/management-api-1.1-rev-1.jar
> wget https://repo1.maven.org/maven2/org/fusesource/hawtbuf/hawtbuf/1.11/hawtbuf-1.11.jar This example receives messages from a JMS queue named myqueue and transfers them to mytopic Kafka topic. In this example ActiveMQ is used and it’s configured to connect to the broker running on localhost:61616. Adjust properties in examples/CamelJmsSourceConnector.properties for your environment, for example configuring username and password by setting camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword or change the camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelJmsSourceConnector.properties JMS (sink)
This example receives messages from mytopic Kafka topic and transfers them to JMS queue named myqueue. In this example ActiveMQ is used and it’s configured to connect to the broker running on localhost:61616. You can adjust properties in examples/CamelJmsSinkConnector.properties for your environment, for example configure username and password by adding camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword or change the camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelJmsSinkConnector.properties Telegram (source)
First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-telegram-kafka-connector/target/ a .zip file named camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip
> cd /home/connectors/
> cp connectors/camel-telegram-kafka-connector/target/camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip This example transfers messages sent to Telegram bot to the mytopic Kafka topic. Adjust to set telegram bot token in examples/CamelTelegramSourceConnector.properties to reflect your bot’s token.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelTelegramSourceConnector.properties