Running a Pipe
The Pipe is a concept that enables the user to create a "composable" Event Driven Architecture design. The Pipe binds a source endpoint to a sink endpoint, where an endpoint represents an external entity: any Camel URI or a Kubernetes resource such as a Kamelet, a Kafka (Strimzi) resource or a Knative resource.
Make sure you’re familiar with the concept of Kamelet before continuing. |
The operator is in charge of transforming the binding between a source and a sink into a running Integration, taking care of all the building and transformation required.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo
The above example is the simplest one we can use to show how to "connect" a Camel URI source to a Camel URI sink. You can run it by executing kubectl apply -f timer-to-log.yaml. Once applied, you can check the status of your Pipe:
kubectl get pipe -w
NAME           PHASE      REPLICAS
timer-to-log   Creating
timer-to-log   Ready      0
timer-to-log   Ready      1
The operator has taken the Pipe and created an Integration from the Pipe configuration. The Integration is the resource that will run your final application, and you can check it in a similar fashion:
kubectl get integrations
NAME           PHASE     READY   RUNTIME PROVIDER   RUNTIME VERSION   CATALOG VERSION   KIT                        REPLICAS
timer-to-log   Running   True    quarkus            3.8.1             3.8.1             kit-crbgrhmn5tgc73cb1tl0   1
Sources, Sinks and Actions
The development of a Pipe should be limited to the binding between a source and a sink. However, sometimes you may need to perform slight transformations when consuming the events. In such cases you can include a set of actions that will take care of that.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo
  steps:
    - uri: https://gist.githubusercontent.com/squakez/48b4ebf24c2579caf6bcb3e8a59fa509/raw/c7d9db6ee5e8851f5dc6a564172d85f00d87219c/gistfile1.txt
In the example above we’re making sure to call an intermediate resource in order to fill the content with some value. This action is configured in the .spec.steps parameter.
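Steps may also reference Kamelets of type "action". As a sketch, assuming the json-deserialize-action Kamelet from the catalog is available on your cluster, an intermediate deserialization step would look like:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log-json
spec:
  source:
    uri: timer:foo
  steps:
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1
        name: json-deserialize-action
  sink:
    uri: log:bar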
Traits configuration
Although this should not necessarily be required (the operator does all the required configuration for you), you can tune your Pipe with traits configuration by adding .metadata.annotations. Let’s have a look at the following example:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-2-log-annotation
  annotations: (1)
    trait.camel.apache.org/logging.level: DEBUG
    trait.camel.apache.org/logging.color: "false"
spec:
  source:
    uri: timer:foo
  sink:
    uri: log:bar
1 | Include .metadata.annotations to specify the list of traits we want to configure |
In this example, we’ve set the logging trait to specify certain configuration we want to apply. You can do the same with all the traits available, just by setting trait.camel.apache.org/trait-name.trait-property with the expected value.
If you need to specify an array of values, the syntax will be trait.camel.apache.org/trait.conf: "[\"opt1\", \"opt2\", …]" |
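For example, to pass a list of properties via the camel trait's properties option (a sketch; the property names here are illustrative):
trait.camel.apache.org/camel.properties: "[\"my.key1=val1\", \"my.key2=val2\"]"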
Using Kamel CLI
Camel K works very well with any Kubernetes-compatible user interface (CLIs such as kubectl or oc, or any other visual tooling). However, we also provide a simple CLI that helps you perform most of the Pipe operations in an easier fashion: the kamel CLI.
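For instance, the timer-to-log Pipe above can be created directly from the command line with the bind command (a sketch; check kamel bind --help for the options supported by your version):
kamel bind timer:foo log:bar
This generates and applies the corresponding Pipe custom resource for you.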
Differences with Integrations
The simple examples above may make you wonder what the differences are between a Pipe and an Integration. The Integration is meant for any generic Camel workload where you have complex business logic to perform, whereas the Pipe is more useful when you have events and you want to emit or consume them in a connector-style approach.
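For comparison, here is a sketch of roughly how the same timer-to-log flow would look written directly as an Integration using the YAML flow DSL:
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: timer-to-log
spec:
  flows:
    - from:
        uri: timer:foo
        steps:
          - to: log:bar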
Most of the time you will have consumer applications (one Pipe) consuming events from a topic (Kafka, Kamelet or Knative) and producer applications (another Pipe) producing to a topic.
The Camel K operator allows you to use Kafka (Strimzi) and Knative endpoint custom resources directly. |
More advanced examples
Here are some other examples involving Kamelets, Knative and Kafka.
Binding Kamelets
One development style that emerges naturally is connector development. You can consider a Kamelet as a connector endpoint, binding together source and sink Kamelets to perform some logic. In the following example, for instance, we’re moving data from an AWS Kinesis source to a PostgreSQL database.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: from-kinesis-to-pgdb
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-kinesis-source
    properties:
      region: my-region
      stream: my-stream
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: postgresql-sink
    properties:
      databaseName: my-db
      password: my-pwd
      query: INSERT INTO accounts (username,city) VALUES (:#username,:#city)
      serverName: localhost
      username: my-usr
Binding to Kafka topics
Another typical use case is to consume/produce events directly from/to a KafkaTopic custom resource (managed by the Strimzi operator):
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: beer-event-source
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: beer-source
    properties:
      period: 5000
  sink:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1
      name: beer-events
The Strimzi operator must be installed and a KafkaTopic configured. |
Binding to Knative resources
A Pipe allows you to move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet. This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way.
All the examples require the Knative operator to be installed and the related resources configured as well. |
For example, here is a Pipe that connects a Kamelet Telegram source to the Knative broker:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source: (1)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink: (2)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
1 | Reference to the source that provides data |
2 | Reference to the sink where data should be sent to |
This binding takes the telegram-text-source Kamelet, configures it using specific properties ("botToken") and makes sure that messages produced by the Kamelet are forwarded to the Knative Broker named "default". Note that source and sink are specified as standard Kubernetes object references in a declarative way. Knative eventing uses the CloudEvents data format by default, so you may want to set some properties that specify the event attributes, such as the event type.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.telegram.events (1)
1 | Sets the event type attribute of the CloudEvent produced by this Pipe |
This way you may specify event attributes before publishing to the Knative broker. Note that Camel uses the default CloudEvents event type org.apache.camel.event for events produced by Camel. You can overwrite CloudEvent event attributes on the sink using the ce.overwrite. prefix when setting a property.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.telegram.events
      ce.overwrite.ce-source: my-source (1)
1 | Use "ce.overwrite.ce-source" to explicitly set the CloudEvents source attribute. |
The example shows how we can reference the "telegram-text-source" resource in a Pipe. It’s contained in the source section because it’s a Kamelet of type "source". A Kamelet of type "sink", by contrast, can only be used in the sink section of a Pipe.
Under the covers, a Pipe creates an Integration resource that implements the binding, but all the details of how to connect with Telegram and forward the data to the Knative broker are fully transparent to the end user. For instance, the Integration uses the SinkBinding concept under the covers in order to retrieve the Knative broker endpoint URL.
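If you are curious, you can inspect the generated Integration, which gets the same name as the Pipe:
kubectl get integration telegram-to-knative -o yaml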
In the same way you can also connect a Kamelet source to a Knative channel.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative-channel
spec:
  source: (1)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink: (2)
    ref:
      kind: InMemoryChannel
      apiVersion: messaging.knative.dev/v1
      name: messages
1 | Reference to the source that provides data |
2 | Reference to the Knative channel that acts as the sink where data should be sent to |
When reading data from Knative, you just need to specify, for instance, the Knative broker as a source in the Pipe. Events consumed from the Knative event stream will be pushed to the given sink of the Pipe.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: knative-to-slack
spec:
  source: (1)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.event.messages
  sink: (2)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: slack-sink
    properties:
      channel: "#my-channel"
      webhookUrl: the-webhook-url
1 | Reference to the Knative broker source that provides data |
2 | Reference to the sink where data should be sent to |
Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing. In the example, all events of type org.apache.camel.event.messages get forwarded to the given Slack channel using the Webhook API.
When consuming events from the Knative broker, you most likely need to filter and select the events to process. You can do that with the properties set on the Knative broker source reference, for instance filtering by the event type as shown in the example. The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions.
In the background Camel K will automatically create a Knative Trigger resource for the Pipe that uses the filter attributes accordingly.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: camel-event-messages
spec:
  broker: default (1)
  filter:
    attributes:
      type: org.apache.camel.event.messages
      myextension: my-extension-value
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1 (2)
      kind: Service
      name: camel-service
    uri: /events/camel.event.messages
1 | Reference to the Knative broker source that provides data |
2 | Reference to the Camel K integration/pipe service |
The trigger calls the Camel K integration service endpoint URL and pushes events with the given filter attributes to the Pipe. All properties that you have set on the Knative broker source reference will be set as filter attributes on the trigger resource (except for reserved properties such as name and cloudEventsType).
Note that Camel K creates the trigger resource only for Knative broker type event sources. In case you reference a Knative channel as a source in a Pipe, Camel K assumes that the channel and the trigger are already present. Camel K will only create the subscription for the integration service on the channel.
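You can verify the automatically created trigger with a standard listing command, for instance:
kubectl get triggers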
Binding to an explicit URI
An alternative way to use a Pipe is to configure the source/sink to be an explicit Camel URI. For example, the following binding is allowed:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-text-source-to-channel
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    uri: https://mycompany.com/the-service (1)
1 | Pipe with an explicit URI |
This Pipe explicitly defines a URI where data is going to be pushed.
The uri option is also conventionally used in Knative to specify a non-Kubernetes destination. To comply with the Knative specifications, when an "http" or "https" URI is used, Camel will send CloudEvents to the destination. |
Binding with data types
When referencing Kamelets in a binding, users may choose from one of the supported input/output data types provided by the Kamelet. The supported data types are declared on the Kamelet itself and give additional information about the header names, content type and content schema used.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-to-log
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types: (1)
      out:
        format: text-plain (2)
  sink:
    uri: "log:info"
1 | Specify the output data type on the referenced Kamelet source. |
2 | Select text-plain as an output data type of the my-sample-source Kamelet. |
The very same Kamelet my-sample-source may also provide a CloudEvents specific data type as an output, which fits perfectly for binding to a Knative broker.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types:
      out:
        format: application-cloud-events (1)
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
1 | Select application-cloud-events as an output data type of the my-sample-source Kamelet. |
Information about the supported data types can be found on the Kamelet itself.
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: my-sample-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    # ...
  dataTypes:
    out: (1)
      default: text-plain (2)
      types: (3)
        text-plain:
          description: Output type as plain text.
          mediaType: text/plain
        application-cloud-events:
          description: CloudEvents specific representation of the Kamelet output.
          mediaType: application/cloudevents+json
          schema: (4)
            # ...
          dependencies: (5)
            - "camel:cloudevents"
  template:
    from:
      uri: ...
      steps:
        - to: "kamelet:sink"
1 | Declared output data types of this Kamelet source |
2 | The output data type used by default |
3 | List of supported output types |
4 | Optional JSON schema describing the application/cloudevents+json data type |
5 | Optional list of additional dependencies that are required by the data type. |
This way users may choose the best Kamelet data type for a specific use case when referencing Kamelets in a binding.
KEDA enabled Pipes
Some Kamelets are enhanced with KEDA metadata to allow users to automatically configure autoscalers on them. Kamelets with KEDA features can be distinguished by the presence of the annotation camel.apache.org/keda.type, which is set to the name of a specific KEDA autoscaler.
This feature is in an experimental phase. |
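To check whether a Kamelet installed on your cluster is KEDA enabled, you can inspect its annotations (my-kamelet is a placeholder name):
kubectl get kamelet my-kamelet -o yaml | grep camel.apache.org/keda.type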
A KEDA enabled Kamelet can be used in the same way as any other Kamelet, in a Pipe or in an Integration. KEDA autoscalers are not enabled by default: they need to be manually enabled by the user via the keda trait.
The KEDA operator is required to run on the cluster. |
In a Pipe, the KEDA trait can be enabled using annotations:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-keda-binding
  annotations:
    trait.camel.apache.org/keda.enabled: "true"
spec:
  source:
    # ...
  sink:
    # ...
In an Integration, it can be enabled using kamel run args, for example:
kamel run my-keda-integration.yaml -t keda.enabled=true
Make sure that my-keda-integration uses at least one KEDA enabled Kamelet, otherwise enabling KEDA (without other options) will have no effect. |
For information on how to create KEDA enabled Kamelets, see the KEDA section in the development guide.