Running an Pipe

The Pipe is a concept that enable the user to cerate a "composable" Event Driven Architecture design. The Pipe can bind source and sink endpoints where an endpoint represents a source/sink external entity (could be any Camel URI or a Kubernetes resource such as Kamelets, Kafka (Strimzi) or Knative resources).

make sure you’re familiar with the concept of Kamelet before continuing.

The operator is in charge to transform a binding between a source and a sink and transform into a running Integration taking care to do all the building involved and the transformation required.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo

The above example is the simplest example we can use to show how to "connect" a Camel URI source to a Camel URI sink. You can run it executing kubectl apply -f timer-to-log.yaml. Once executed, you can check the status of your Pipe:

kubectl get pipe -w

NAME           PHASE      REPLICAS
timer-to-log   Creating
timer-to-log   Ready      0
timer-to-log   Ready      1

The operator has taken the Pipe and has created an Integration from the Pipe configuration. The Integration is the resource that will run your final application and you can look at it accordingly:

NAME             PHASE     READY   RUNTIME PROVIDER   RUNTIME VERSION   CATALOG VERSION   KIT                        REPLICAS
timer-to-log     Running   True    quarkus            3.8.1             3.8.1             kit-crbgrhmn5tgc73cb1tl0   1

Sources, Sinks and Actions

The development of a Pipe should be limiting the binding between a source and a sink. However sometimes you may need to perform slight transformation when consuming the events. In such case you can include a set of actions that will take care of that.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo
  steps:
  - uri: https://gist.githubusercontent.com/squakez/48b4ebf24c2579caf6bcb3e8a59fa509/raw/c7d9db6ee5e8851f5dc6a564172d85f00d87219c/gistfile1.txt

In the example above we’re making sure to call an intermediate resource in order to fill the content with some value. This action is configured in the .spec.steps parameter.

Traits configuration

Although this should not be necessarily required (the operator do all the required configuration for you), you can tune your Pipe with traits configuration adding .metadata.annotations. Let’s have a look at the following example:

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-2-log-annotation
  annotations: (1)
    trait.camel.apache.org/logging.level: DEBUG
    trait.camel.apache.org/logging.color: "false"
spec:
  source:
    uri: timer:foo
  sink:
    uri: log:bar
1 Include .metadata.annotations to specify the list of traits we want to configure

In this example, we’ve set the logging trait to specify certain configuration we want to apply. You can do the same with all the traits available, just by setting trait.camel.apache.org/trait-name.trait-property with the expected value.

if you need to specify an array of values, the syntax will be trait.camel.apache.org/trait.conf: "[\"opt1\", \"opt2\", …​]"

Using Kamel CLI

Camel K works very well with any Kubernetes compatible user interface (such as CLI as kubectl, oc or any other visual tooling). However we do provide a simple CLI that helps you performing most of the Pipe works in an easier fashion: it’s kamel CLI.

Differences with Integrations

The simples examples above may make you wonder which are the differences between a Pipe and an Integration. The Integration is meant for any generic Camel workload where you have complex business logic to perform, whereas the Pipe are more useful when you have events and you want to emit or consume such events in an connector style approach.

Most of the time you will have consumer applications (one Pipe) which are consuming events from a topic (Kafka, Kamelet or Knative) and producer applications (another Pipe) producing to a topic.

Camel K operator will allow you to use directly Kafka (Strimzi) and Knative endpoints custom resources.

More advanced examples

Here some other examples involving Kamelets, Knative and Kafka.

Binding Kamelets

One development that emerges is the Connector development. You can consider a Kamelet as a connector endpoint, therefore binding together source and sink Kamelets to perform some logic. In this one, for instance, we’re moving data from an AWS Kinesis source to a PostgreSQL database.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: from-kinesis-to-pgdb
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-kinesis-source
    properties:
      region: my-region
      stream: my-stream
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: postgresql-sink
    properties:
      databaseName: my-db
      password: my-pwd
      query: INSERT INTO accounts (username,city) VALUES (:#username,:#city)
      serverName: localhost
      username: my-usr

Binding to Kafka topics

Another typical use case is consume/produce events directly from a KafkaTopic custom resource (managed by Strimzi operator):

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: beer-event-source
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: beer-source
    properties:
      period: 5000
  sink:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1
      name: beer-events
the Strimzi operator is required to be installed and a KafkaTopic configured.

Binding to Knative resources

A Pipe allows to move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet. This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way.

all examples require Knative operator installed and the related resources configured as well.

For example, here is a Pipe that connects a Kamelet Telegram source to the Knative broker:

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source: (1)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink: (2)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
1 Reference to the source that provides data
2 Reference to the sink where data should be sent to

This binding takes the telegram-text-source Kamelet, configures it using specific properties ("botToken") and makes sure that messages produced by the Kamelet are forwarded to the Knative Broker named "default". Note that source and sink are specified as standard Kubernetes object references in a declarative way. Knative eventing uses the CloudEvents data format by default. You may want to set some properties that specify the event attributes such as the event type.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.telegram.events (1)
1 Sets the event type attribute of the CloudEvent produced by this Pipe

This way you may specify event attributes before publishing to the Knative broker. Note that Camel uses a default CloudEvents event type org.apache.camel.event for events produced by Camel. You can overwrite CloudEvent event attributes on the sink using the ce.overwrite. prefix when setting a property.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.telegram.events
      ce.overwrite.ce-source: my-source (1)
1 Use "ce.overwrite.ce-source" to explicitly set the CloudEvents source attribute.

The example shows how we can reference the "telegram-text-source" resource in a Pipe. It’s contained in the source section because it’s a Kamelet of type "source". A Kamelet of type "sink", by contrast, can only be used in the sink section of a Pipe.

Under the covers, a Pipe creates an Integration resource that implements the binding, but all details of how to connect with Telegram forwarding the data to the Knative broker is fully transparent to the end user. For instance the Integration uses a SinkBinding concept under the covers in order to retrieve the Knative broker endpoint URL.

In the same way you can also connect a Kamelet source to a Knative channel.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative-channel
spec:
  source: (1)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink: (2)
    ref:
      kind: InMemoryChannel
      apiVersion: messaging.knative.dev/v1
      name: messages
1 Reference to the source that provides data
2 Reference to the Knative channel that acts as the sink where data should be sent to

When reading data from Knative you just need to specify for instance the Knative broker as a source in the Pipe. Events consumed from Knative event stream will be pushed to the given sink of the Pipe.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: knative-to-slack
spec:
  source: (1)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.event.messages
  sink: (2)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: slack-sink
    properties:
      channel: "#my-channel"
      webhookUrl: the-webhook-url
1 Reference to the Knative broker source that provides data
2 Reference to the sink where data should be sent to

Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing. In the example, all events of type org.apache.camel.event.messages get forwarded to the given Slack channel using the Webhook API.

When consuming events from the Knative broker you most likely need to filter and select the events to process. You can do that with the properties set on the Knative broker source reference, for instance filtering by the even type as shown in the example. The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions.

In the background Camel K will automatically create a Knative Trigger resource for the Pipe that uses the filter attributes accordingly.

Sample trigger created by Camel K
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: camel-event-messages
spec:
  broker: default (1)
  filter:
    attributes:
      type: org.apache.camel.event.messages
      myextension: my-extension-value
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1 (2)
      kind: Service
      name: camel-service
    uri: /events/camel.event.messages
1 Reference to the Knative broker source that provides data
2 Reference to the Camel K integration/pipe service

The trigger calls the Camel K integration service endpoint URL and pushes events with the given filter attributes to the Pipe. All properties that you have set on the Knative broker source reference will be set as a filter attribute on the trigger resource (except for reserved properties such as name and cloudEventsType).

Note that Camel K creates the trigger resource only for Knative broker type event sources. In case you reference a Knative channel as a source in a Pipe Camel K assumes that the channel and the trigger are already present. Camel K will only create the subscription for the integration service on the channel.

Binding to an explicit URI

An alternative way to use a Pipe is to configure the source/sink to be an explicit Camel URI. For example, the following binding is allowed:

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-text-source-to-channel
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    uri: https://mycompany.com/the-service (1)
1 Pipe with explicitly URI

This Pipe explicitly defines an URI where data is going to be pushed.

the uri option is also conventionally used in Knative to specify a non-kubernetes destination. To comply with the Knative specifications, in case an "http" or "https" URI is used, Camel will send CloudEvents to the destination.

Binding with data types

When referencing Kamelets in a binding users may choose from one of the supported input/output data types provided by the Kamelet. The supported data types are declared on the Kamelet itself and give additional information about used header names, content type and content schema.

my-sample-source-to-log.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-to-log
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types: (1)
      out:
        format: text-plain (2)
  sink:
    uri: "log:info"
1 Specify the output data type on the referenced Kamelet source.
2 Select text-plain as an output data type of the my-sample-source Kamelet.

The very same Kamelet my-sample-source may also provide a CloudEvents specific data type as an output which fits perfect for binding to a Knative broker.

my-sample-source-to-knative.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types:
      out:
        format: application-cloud-events (1)
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
1 Select application-cloud-events as an output data type of the my-sample-source Kamelet.

Information about the supported data types can be found on the Kamelet itself.

my-sample-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: my-sample-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
# ...
  dataTypes:
    out: (1)
      default: text-plain (2)
      types: (3)
        text-plain:
          description: Output type as plain text.
          mediaType: text/plain
        application-cloud-events:
          description: CloudEvents specific representation of the Kamelet output.
          mediaType: application/cloudevents+json
          schema: (4)
            # ...
          dependencies: (5)
            - "camel:cloudevents"

  template:
    from:
      uri: ...
      steps:
        - to: "kamelet:sink"
1 Declared output data types of this Kamelet source
2 The output data type used by default
3 List of supported output types
4 Optional Json schema describing the application/cloudevents+json data type
5 Optional list of additional dependencies that are required by the data type.

This way users may choose the best Kamelet data type for a specific use case when referencing Kamelets in a binding.

KEDA enabled Pipes

Some Kamelets are enhanced with KEDA metadata to allow users to automatically configure autoscalers on them. Kamelets with KEDA features can be distinguished by the presence of the annotation camel.apache.org/keda.type, which is set to the name of a specific KEDA autoscaler.

this feature is in an experimental phase.

A KEDA enabled Kamelet can be used in the same way as any other Kamelet, in a Pipe or in an Integration. KEDA autoscalers are not enabled by default: they need to be manually enabled by the user via the keda trait.

KEDA operator is required to run on the cluster.

In a Pipe, the KEDA trait can be enabled using annotations:

my-keda-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-keda-binding
  annotations:
    trait.camel.apache.org/keda.enabled: "true"
spec:
  source:
  # ...
  sink:
  # ...

In an integration, it can be enabled using kamel run args, for example:

kamel run my-keda-integration.yaml -t keda.enabled=true
Make sure that the my-keda-integration uses at least one KEDA enabled Kamelet, otherwise enabling KEDA (without other options) will have no effect.

For information on how to create KEDA enabled Kamelets, see the KEDA section in the development guide.