Running a Pipe
The Pipe is a concept that lets you build a "composable" event-driven architecture. A Pipe binds a source endpoint to a sink endpoint, where each endpoint represents an external entity: any Camel URI or a Kubernetes resource such as a Kamelet, a Kafka (Strimzi) resource or a Knative resource.
| Make sure you’re familiar with the concept of Kamelet before continuing. | 
The operator turns a binding between a source and a sink into a running Integration, taking care of all the building and transformation required.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo
The example above is the simplest way to show how to "connect" a Camel URI source to a Camel URI sink. You can run it by executing kubectl apply -f timer-to-log.yaml. Once applied, you can check the status of your Pipe:
kubectl get pipe -w
NAME           PHASE      REPLICAS
timer-to-log   Creating
timer-to-log   Ready      0
timer-to-log   Ready      1
The operator has taken the Pipe and created an Integration from its configuration. The Integration is the resource that runs your final application, and you can inspect it accordingly:
NAME             PHASE     READY   RUNTIME PROVIDER   RUNTIME VERSION   CATALOG VERSION   KIT                        REPLICAS
timer-to-log     Running   True    quarkus            3.8.1             3.8.1             kit-crbgrhmn5tgc73cb1tl0   1
Sources, Sinks and Actions
A Pipe should ideally be limited to binding a source to a sink. However, sometimes you may need to perform a slight transformation while consuming the events. In that case you can include a set of actions that take care of it.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo
  steps:
  - uri: https://gist.githubusercontent.com/squakez/48b4ebf24c2579caf6bcb3e8a59fa509/raw/c7d9db6ee5e8851f5dc6a564172d85f00d87219c/gistfile1.txt
In the example above we’re calling an intermediate resource to fill the content with some value. This action is configured in the .spec.steps parameter.
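A step can also be expressed as a Kamelet reference rather than a URI. The following is only a sketch under stated assumptions: it assumes a Kamelet of type action named insert-header-action, taking name and value properties, is installed in the namespace (for instance from the Apache Kamelet catalog). The Pipe name and the header values are hypothetical, so check the Kamelets actually available on your cluster before relying on it.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log-with-action   # hypothetical name
spec:
  source:
    uri: timer:foo
  steps:
  # Each entry in .spec.steps is an endpoint, like source and sink.
  - ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: insert-header-action   # assumption: this action Kamelet is installed
    properties:
      name: origin                 # illustrative header name
      value: timer-pipe            # illustrative header value
  sink:
    # showHeaders makes the log component print the header set by the step
    uri: log:bar?showHeaders=true
```

Steps are applied in the order in which they are listed, between the source and the sink.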
Traits configuration
Although this should not be necessary (the operator does all the required configuration for you), you can tune your Pipe with trait configuration by adding .metadata.annotations. Let’s have a look at the following example:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-2-log-annotation
  annotations: (1)
    trait.camel.apache.org/logging.level: DEBUG
    trait.camel.apache.org/logging.color: "false"
spec:
  source:
    uri: timer:foo
  sink:
    uri: log:bar
| 1 | Include .metadata.annotations to specify the list of traits we want to configure | 
In this example, we’ve set the logging trait to specify certain configuration we want to apply. You can do the same with all the traits available, just by setting trait.camel.apache.org/trait-name.trait-property with the expected value.
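As a further illustration of the trait-name.trait-property pattern, the sketch below sets two properties of the same logging trait. It assumes the logging trait exposes a json property in the Camel K version you run. The Pipe name is hypothetical, and the values are quoted where YAML would otherwise parse them as booleans, since annotation values must be strings.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-2-log-json           # hypothetical name
  annotations:
    # pattern: trait.camel.apache.org/<trait-name>.<trait-property>
    trait.camel.apache.org/logging.level: INFO
    trait.camel.apache.org/logging.json: "true"   # assumption: property supported by your version
spec:
  source:
    uri: timer:foo
  sink:
    uri: log:bar
```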
| If you need to specify an array of values, the syntax will be  | 
Using Kamel CLI
Camel K works well with any Kubernetes-compatible user interface (CLIs such as kubectl or oc, or any visual tooling). However, we also provide a simple CLI that makes most Pipe-related tasks easier: the kamel CLI.
Differences with Integrations
The simple examples above may make you wonder what the differences between a Pipe and an Integration are. The Integration is meant for any generic Camel workload where you have complex business logic to perform, whereas the Pipe is more useful when you have events and you want to emit or consume them in a connector-style approach.
Most of the time you will have consumer applications (one Pipe) which are consuming events from a topic (Kafka, Kamelet or Knative) and producer applications (another Pipe) producing to a topic.
| The Camel K operator lets you use Kafka (Strimzi) and Knative endpoint custom resources directly. | 
More advanced examples
Here are some other examples involving Kamelets, Knative and Kafka.
Binding Kamelets
One development style that emerges is connector development. You can consider a Kamelet as a connector endpoint, and therefore bind source and sink Kamelets together to perform some logic. In this example, for instance, we’re moving data from an AWS Kinesis source to a PostgreSQL database.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: from-kinesis-to-pgdb
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-kinesis-source
    properties:
      region: my-region
      stream: my-stream
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: postgresql-sink
    properties:
      databaseName: my-db
      password: my-pwd
      query: INSERT INTO accounts (username,city) VALUES (:#username,:#city)
      serverName: localhost
      username: my-usr
Binding to Kafka topics
Another typical use case is to consume or produce events directly from or to a KafkaTopic custom resource (managed by the Strimzi operator):
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: beer-event-source
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: beer-source
    properties:
      period: 5000
  sink:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1
      name: beer-events
| KafkaTopics require the Strimzi operator and a configured KafkaTopic. | 
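The consuming side can be sketched in the same way by placing the KafkaTopic reference in the source section. This is a minimal illustration rather than a tested configuration: the Pipe name is hypothetical, the topic is the beer-events topic from the example above, and the KafkaTopic apiVersion simply mirrors that example, so adjust it to the version served by your Strimzi installation.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: beer-event-logger          # hypothetical name
spec:
  source:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1   # mirrors the producer example; adjust if needed
      name: beer-events
  sink:
    # log each consumed event; any other sink endpoint could be used here
    uri: log:info
```

Together with the producer Pipe, this gives the usual pair of applications: one Pipe producing to the topic and another one consuming from it.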
Binding to Knative resources
A Pipe lets you move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet. This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way.
| All examples require the Knative operator to be installed and the related resources to be configured. | 
For example, here is a Pipe that connects a Kamelet Telegram source to the Knative broker:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source: (1)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink: (2)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
| 1 | Reference to the source that provides data | 
| 2 | Reference to the sink where data should be sent to | 
This binding takes the telegram-text-source Kamelet, configures it using specific properties (botToken) and makes sure that messages produced by the Kamelet are forwarded to the Knative Broker named default.
| Source and sink are specified as standard Kubernetes object references in a declarative way. Knative eventing uses the  | 
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.telegram.events (1)
| 1 | Sets the event type attribute of the CloudEvent produced by this Pipe | 
This way you may specify event attributes before publishing to the Knative broker.
| Camel uses a default CloudEvents event type  | 
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.telegram.events
      ce.overwrite.ce-source: my-source (1)
| 1 | Use "ce.overwrite.ce-source" to explicitly set the CloudEvents source attribute. | 
The example shows how we can reference the "telegram-text-source" resource in a Pipe. It’s contained in the source section because it’s a Kamelet of type source. A Kamelet of type sink, by contrast, can only be used in the sink section of a Pipe.
Under the covers, a Pipe creates an Integration resource that implements the binding, but all the details of how to connect to Telegram and forward the data to the Knative broker are fully transparent to the end user. For instance, the Integration uses a SinkBinding concept in order to retrieve the Knative broker endpoint URL.
In the same way you can also connect a Kamelet source to a Knative channel.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-to-knative-channel
spec:
  source: (1)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink: (2)
    ref:
      kind: InMemoryChannel
      apiVersion: messaging.knative.dev/v1
      name: messages
| 1 | Reference to the source that provides data | 
| 2 | Reference to the Knative channel that acts as the sink where data should be sent to | 
When reading data from Knative, you just need to specify, for instance, the Knative broker as a source in the Pipe. Events consumed from the Knative event stream are pushed to the given sink of the Pipe.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: knative-to-slack
spec:
  source: (1)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      type: org.apache.camel.event.messages
  sink: (2)
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: slack-sink
    properties:
      channel: "#my-channel"
      webhookUrl: the-webhook-url
| 1 | Reference to the Knative broker source that provides data | 
| 2 | Reference to the sink where data should be sent to | 
Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing. In the example, all events of type org.apache.camel.event.messages get forwarded to the given Slack channel using the Webhook API.
When consuming events from the Knative broker, you most likely need to filter and select the events to process. You can do that with the properties set on the Knative broker source reference, for instance filtering by the event type as shown in the example. The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions.
In the background Camel K will automatically create a Knative Trigger resource for the Pipe that uses the filter attributes accordingly.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: camel-event-messages
spec:
  broker: default (1)
  filter:
    attributes:
      type: org.apache.camel.event.messages
      myextension: my-extension-value
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1 (2)
      kind: Service
      name: camel-service
    uri: /events/camel.event.messages
| 1 | Reference to the Knative broker source that provides data | 
| 2 | Reference to the Camel K integration/pipe service | 
The trigger calls the Camel K integration service endpoint URL and pushes events with the given filter attributes to the Pipe. All properties that you have set on the Knative broker source reference will be set as a filter attribute on the trigger resource (except for reserved properties such as name and cloudEventsType).
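To make the mapping concrete, here is a sketch of a Pipe whose broker source properties would correspond to the two filter attributes in the trigger above (type and myextension). The Pipe name and the log sink are illustrative assumptions, and the extension name and value are simply the placeholders used in the trigger example.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: knative-filtered-to-log    # hypothetical name
spec:
  source:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
    properties:
      # each property is expected to become a filter attribute on the Trigger
      type: org.apache.camel.event.messages
      myextension: my-extension-value
  sink:
    uri: log:info
```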
| Camel K creates the trigger resource only for Knative broker type event sources. If you reference a Knative channel as a source in a Pipe, Camel K assumes that the channel and the trigger are already present. Camel K will only create the subscription for the integration service on the channel. | 
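For comparison, a channel used as a source could look like the following sketch. It assumes the InMemoryChannel named messages from the earlier example already exists in the namespace. The Pipe name and the log sink are hypothetical choices for illustration.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: knative-channel-to-log     # hypothetical name
spec:
  source:
    ref:
      kind: InMemoryChannel
      apiVersion: messaging.knative.dev/v1
      name: messages               # must already exist; it is not created by the Pipe
  sink:
    uri: log:info
```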
Binding to an explicit URI
An alternative way to use a Pipe is to configure the source/sink to be an explicit Camel URI. For example, the following binding is allowed:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-text-source-to-channel
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-text-source
    properties:
      botToken: the-token-here
  sink:
    uri: https://mycompany.com/the-service (1)
| 1 | Pipe with an explicit URI | 
This Pipe explicitly defines a URI where data is going to be pushed.
| The  | 
Binding to a Service, Integration or Pipe
In general, you can connect any Kubernetes Service or any Camel Integration or Pipe that has a Service associated with it.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: source-to-service
spec:
  source:
    ...
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: my-svc
      namespace: my-svc-ns
    properties:
      path: /path/to/my/service (optional)
The operator translates the reference into the related URL. The same mechanism works using apiVersion: camel.apache.org/v1 and kind: Integration or kind: Pipe, assuming these resources expose a ClusterIP Service.
The operator will discover the port to use and you can optionally provide a path property if you need to specify a given endpoint to use.
| This binding is only available for the ClusterIP Service type. | 
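As an illustration of the Integration variant, the sketch below points the sink at an Integration instead of a Service. It assumes an Integration named my-rest-integration (a hypothetical name) runs in the same namespace and exposes a ClusterIP Service. The Pipe name, the timer source and the path value are likewise only examples.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: source-to-integration      # hypothetical name
spec:
  source:
    uri: timer:foo
  sink:
    ref:
      apiVersion: camel.apache.org/v1
      kind: Integration
      name: my-rest-integration    # hypothetical; must expose a ClusterIP Service
    properties:
      path: /hello                 # optional, illustrative endpoint path
```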
Binding with data types
When referencing Kamelets in a binding, users may choose one of the supported input/output data types provided by the Kamelet. The supported data types are declared on the Kamelet itself and give additional information about the header names, content type and content schema in use.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-to-log
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types: (1)
      out:
        format: text-plain (2)
  sink:
    uri: "log:info"
| 1 | Specify the output data type on the referenced Kamelet source. | 
| 2 | Select text-plain as an output data type of the my-sample-source Kamelet. | 
The very same Kamelet my-sample-source may also provide a CloudEvents-specific data type as an output, which fits perfectly for binding to a Knative broker.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-to-knative
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types:
      out:
        format: application-cloud-events (1)
  sink:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
| 1 | Select application-cloud-events as an output data type of the my-sample-source Kamelet. | 
Information about the supported data types can be found on the Kamelet itself.
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: my-sample-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
# ...
  dataTypes:
    out: (1)
      default: text-plain (2)
      types: (3)
        text-plain:
          description: Output type as plain text.
          mediaType: text/plain
        application-cloud-events:
          description: CloudEvents specific representation of the Kamelet output.
          mediaType: application/cloudevents+json
          schema: (4)
            # ...
          dependencies: (5)
            - "camel:cloudevents"
  template:
    from:
      uri: ...
      steps:
        - to: "kamelet:sink"
| 1 | Declared output data types of this Kamelet source | 
| 2 | The output data type used by default | 
| 3 | List of supported output types | 
| 4 | Optional JSON schema describing the application/cloudevents+json data type | 
| 5 | Optional list of additional dependencies that are required by the data type. | 
This way users may choose the best Kamelet data type for a specific use case when referencing Kamelets in a binding.
KEDA enabled Pipes
Some Kamelets are enhanced with KEDA metadata to allow users to automatically configure autoscalers on them. Kamelets with KEDA features can be distinguished by the presence of the annotation camel.apache.org/keda.type, which is set to the name of a specific KEDA autoscaler.
| KEDA enabled Pipes are currently an experimental feature. | 
A KEDA enabled Kamelet can be used in the same way as any other Kamelet, in a Pipe or in an Integration. KEDA autoscalers are not enabled by default: they need to be manually enabled by the user via the keda trait.
| The KEDA operator is required to run on the cluster. | 
In a Pipe, the KEDA trait can be enabled using annotations:
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-keda-integration
  annotations:
    trait.camel.apache.org/keda.enabled: "true"
spec:
  source:
  # ...
  sink:
  # ...
In an Integration, it can be enabled using kamel run args, for example:
kamel run my-keda-integration.yaml -t keda.enabled=true
| Ensure that  | 
For information on how to create KEDA enabled Kamelets, see the KEDA section in the development guide.