Salesforce - Streaming and Pub/Sub
Pub/Sub API
The Pub/Sub API allows you to publish and subscribe to platform events, including real-time event monitoring events, and change data capture events. This API is based on gRPC and HTTP/2, and event payloads are delivered in Apache Avro format.
Publishing Events
The URI format for publishing events is:
salesforce:pubSubPublish:<topic_name>
For example:
.to("salesforce:pubSubPublish:/event/MyCustomPlatformEvent__e")
Publish an Event
pubSubPublish
| Parameter | Type | Description | Default | Required |
|---|---|---|---|---|
| Body | List | Event payloads to be published | | |
Because the Pub/Sub API requires that event payloads be serialized in Apache Avro format, Camel will attempt to serialize event payloads from the following input types:
- Avro GenericRecord. Camel fetches the Avro schema in order to serialize GenericRecord instances. This option doesn’t require ahead-of-time generation of Event classes.
- Avro SpecificRecord. Subclasses of SpecificRecord contain properties that are specific to an event type. The Maven plugin can generate the subclasses automatically.
- POJO. Camel fetches the Avro schema in order to serialize POJO instances. The POJO’s field names must match event field names exactly, including case.
- String. Camel will treat the String value as JSON and serialize to Avro. Note that the JSON value does not have to be Avro-encoded JSON. It can be arbitrary JSON, but it must be serializable to Avro based on the Schema associated with the topic you’re publishing to. The JSON object’s field names must match event field names exactly, including case.
- byte[]. Camel will not perform any serialization. Value must be the Avro-encoded event payload.
- com.salesforce.eventbus.protobuf.ProducerEvent. Providing a ProducerEvent allows full control, e.g., setting the id property, which can be tied back to the PublishResult.CorrelationKey.
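The exact-match rule for field names (POJO and JSON String payloads) can be checked before publishing. The following is a minimal sketch using only the JDK. `EventFieldCheck`, `unmatchedFields`, and the sample field names are hypothetical and are not part of Camel or Salesforce; the set of event field names is assumed to be known to the caller.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: reports payload keys that do not match any
// event field name exactly (the comparison is case-sensitive).
final class EventFieldCheck {
    private EventFieldCheck() {}

    static Set<String> unmatchedFields(Set<String> eventFields, Map<String, ?> payload) {
        // Start from all payload keys, then drop those that match exactly.
        Set<String> unmatched = new TreeSet<>(payload.keySet());
        unmatched.removeAll(eventFields);
        return unmatched;
    }
}
```

A key that differs only in case, such as `order_number__c` against an event field `Order_Number__c`, is reported as unmatched.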
Output
Type: List<org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult>
The order of the items in the returned List correlates to the order of the items in the input List.
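Because results come back in input order, each payload can be matched to its result by index. The following is a minimal sketch of that pairing using only the JDK. `PublishResultPairing` is a hypothetical helper, and the generic type `R` stands in for `PublishResult` so the example has no Camel dependency.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: pairs each published payload with the result
// found at the same position in the returned list.
final class PublishResultPairing {
    private PublishResultPairing() {}

    static <I, R> List<Map.Entry<I, R>> pair(List<I> inputs, List<R> results) {
        if (inputs.size() != results.size()) {
            throw new IllegalArgumentException(
                "expected one result per input: " + inputs.size() + " inputs, " + results.size() + " results");
        }
        List<Map.Entry<I, R>> pairs = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            // Index i of the output corresponds to index i of the input.
            pairs.add(new SimpleImmutableEntry<>(inputs.get(i), results.get(i)));
        }
        return pairs;
    }
}
```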
Subscribing
The URI format for subscribing to a Pub/Sub topic is:
salesforce:pubSubSubscribe:<topic_name>
For example:
from("salesforce:pubSubSubscribe:/event/BatchApexErrorEvent")

| Parameter | Type | Description | Default | Required |
|---|---|---|---|---|
| | | Values: | | |
| | | When | | |
| | int | Max number of events to receive at a time. Values >100 will be normalized to 100 by Salesforce. | 100 | X |
| | | Values: | | X |
| | | Fully qualified class name to deserialize Pub/Sub API event to. | | If |
Output
Type: Determined by the pubSubDeserializeType option.
Headers: CamelSalesforcePubSubReplayId
Streaming API
The Streaming API enables streaming of events using push technology and provides a subscription mechanism for receiving events in near real time. The Streaming API subscription mechanism supports multiple types of events, including PushTopic events, generic events, platform events, and Change Data Capture events.
Push Topics
The URI format for consuming Push Topics is:
salesforce:subscribe:<topic_name>[?options]
To create and subscribe to a topic
Java:
from("salesforce:subscribe:CamelTestTopic?notifyForFields=ALL&notifyForOperations=ALL&sObjectName=Merchandise__c&updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c")
    .to("...");
XML:
<route>
  <from uri="salesforce:subscribe:CamelTestTopic?notifyForFields=ALL&amp;notifyForOperations=ALL&amp;sObjectName=Merchandise__c&amp;updateTopic=true&amp;sObjectQuery=SELECT Id, Name FROM Merchandise__c"/>
  <to uri="..."/>
</route>
YAML:
- route:
    from:
      uri: salesforce:subscribe:CamelTestTopic
      parameters:
        notifyForFields: ALL
        notifyForOperations: ALL
        sObjectName: Merchandise__c
        updateTopic: true
        sObjectQuery: "SELECT Id, Name FROM Merchandise__c"
      steps:
        - to:
            uri: "..."
To subscribe to an existing topic
Java:
from("salesforce:subscribe:CamelTestTopic?sObjectName=Merchandise__c")
    .to("...");
XML:
<route>
  <from uri="salesforce:subscribe:CamelTestTopic?sObjectName=Merchandise__c"/>
  <to uri="..."/>
</route>
YAML:
- route:
    from:
      uri: salesforce:subscribe:CamelTestTopic
      parameters:
        sObjectName: Merchandise__c
      steps:
        - to:
            uri: "..."

| Parameter | Type | Description | Default | Required |
|---|---|---|---|---|
| sObjectName | | SObject to monitor | | x |
| sObjectQuery | | SOQL query used to create Push Topic | | Required for creating new topics |
| updateTopic | | Whether to update an existing Push Topic if it exists | false | |
| notifyForFields | | Specifies how the record is evaluated against the PushTopic query. | Referenced | |
| | | Whether a create operation should generate a notification. | false | |
| | | Whether a delete operation should generate a notification. | false | |
| | | Whether an undelete operation should generate a notification. | false | |
| | | Whether an update operation should generate a notification. | false | |
| notifyForOperations | | Whether an update operation should generate a notification. Only for use in API version < 29.0 | All | |
| replayId | | The replayId value to use when subscribing. | | |
| | | Default replayId setting if no value is found in initialReplayIdMap. | -1 | |
| | | ReplayId to fall back to after an Invalid Replay Id response. | -1 | |
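The replay options above suggest a lookup order when a subscription starts: a per-topic entry in initialReplayIdMap is used if present, and the default applies otherwise. The following is a minimal sketch of that lookup using only the JDK. `ReplayIdResolver` is a hypothetical helper and not the component's implementation. Letting an explicit replayId take precedence over the map is an assumption made for illustration.

```java
import java.util.Map;

// Hypothetical helper illustrating one possible replay id lookup order.
final class ReplayIdResolver {
    private ReplayIdResolver() {}

    /**
     * @param topic              topic or channel name being subscribed to
     * @param explicitReplayId   value of the replayId option, or null if unset
     * @param initialReplayIdMap per-topic replay ids
     * @param defaultReplayId    used when neither of the above applies
     */
    static long resolve(String topic, Long explicitReplayId,
                        Map<String, Long> initialReplayIdMap, long defaultReplayId) {
        if (explicitReplayId != null) {
            return explicitReplayId; // assumption: explicit option wins
        }
        Long mapped = initialReplayIdMap.get(topic);
        // Fall back to the default when the map has no entry for this topic.
        return mapped != null ? mapped : defaultReplayId;
    }
}
```

With a default of -1, a topic that has no explicit value and no map entry resolves to -1.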
Parallel processing received events
Set consumerWorkerPoolEnabled=true on the salesforce endpoint to let Camel use a thread pool to process received events, so that events can be processed in parallel.
Note: It has been reported that receiving PUSH_TOPIC events and then sending a message to Salesforce (via a producer) on the same thread can cause Salesforce to block. Enabling the worker pool can help with this. See CAMEL-22332 for details.
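The general idea behind a worker pool is that the receiving thread hands each event to a separate thread instead of processing it itself. The following is a minimal sketch of that hand-off using only the JDK. `WorkerPoolHandOff` is a hypothetical class that illustrates the concept; it does not show how Camel implements consumerWorkerPoolEnabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration: events are processed on pool threads,
// not on the thread that received them.
final class WorkerPoolHandOff {
    private WorkerPoolHandOff() {}

    static <E, R> List<R> processAll(List<E> events, Function<E, R> handler, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<R>> pending = new ArrayList<>();
            for (E event : events) {
                Callable<R> task = () -> handler.apply(event);
                pending.add(pool.submit(task)); // hand off, do not process inline
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : pending) {
                results.add(future.get()); // collected in submission order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("event handler failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The sketch waits for all results only so that it can return them. In a real consumer the receiving thread would normally continue without waiting.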
Output
Type: Class passed via sObjectName parameter
Platform Events
To emit a platform event use the createSObject operation, passing an instance of a platform event, e.g. Order_Event__e.
The URI format for consuming platform events is:
salesforce:subscribe:event/<event_name>
For example, to receive platform events of type Order_Event__e:
Java:
from("salesforce:subscribe:event/Order_Event__e")
    .to("...");
XML:
<route>
  <from uri="salesforce:subscribe:event/Order_Event__e"/>
  <to uri="..."/>
</route>
YAML:
- route:
    from:
      uri: salesforce:subscribe:event/Order_Event__e
      steps:
        - to:
            uri: "..."

| Parameter | Type | Description | Default | Required |
|---|---|---|---|---|
| | | If false, operation returns a | false | |
| replayId | | The replayId value to use when subscribing. | | |
| | | Default replayId setting if no value is found in initialReplayIdMap. | -1 | |
| | | ReplayId to fall back to after an Invalid Replay Id response. | -1 | |
Output
Type: PlatformEvent or org.cometd.bayeux.Message
Change Data Capture Events
Change Data Capture (CDC) allows you to receive near-real-time changes of Salesforce records, and synchronize corresponding records in an external data store. Change Data Capture publishes change events, which represent changes to Salesforce records. Changes include the creation of a new record, updates to an existing record, deletion of a record, and undeletion of a record.
The URI format to consume CDC events is as follows:
All Selected Entities
salesforce:subscribe:data/ChangeEvents
Standard Objects
salesforce:subscribe:data/<Standard_Object_Name>ChangeEvent
Custom Objects
salesforce:subscribe:data/<Custom_Object_Name>__ChangeEvent
Here are a few examples
Java:
from("salesforce:subscribe:data/ChangeEvents?replayId=-1")
    .log("being notified of all change events");
from("salesforce:subscribe:data/AccountChangeEvent?replayId=-1")
    .log("being notified of change events for Account records");
from("salesforce:subscribe:data/Employee__ChangeEvent?replayId=-1")
    .log("being notified of change events for Employee__c custom object");
XML:
<route>
  <from uri="salesforce:subscribe:data/ChangeEvents?replayId=-1"/>
  <log message="being notified of all change events"/>
</route>
<route>
  <from uri="salesforce:subscribe:data/AccountChangeEvent?replayId=-1"/>
  <log message="being notified of change events for Account records"/>
</route>
<route>
  <from uri="salesforce:subscribe:data/Employee__ChangeEvent?replayId=-1"/>
  <log message="being notified of change events for Employee__c custom object"/>
</route>
YAML:
- route:
    from:
      uri: salesforce:subscribe:data/ChangeEvents
      parameters:
        replayId: -1
      steps:
        - log: "being notified of all change events"
- route:
    from:
      uri: salesforce:subscribe:data/AccountChangeEvent
      parameters:
        replayId: -1
      steps:
        - log: "being notified of change events for Account records"
- route:
    from:
      uri: salesforce:subscribe:data/Employee__ChangeEvent
      parameters:
        replayId: -1
      steps:
        - log: "being notified of change events for Employee__c custom object"
More details on how to use the Camel Salesforce component's change data capture capabilities can be found in the ChangeEventsConsumerIntegrationTest.
The Salesforce developer guide covers the subtleties of implementing a change data capture integration. Topics of particular interest include the dynamic nature of change event body fields, the high-level replication steps, and security considerations.
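The channel naming rules listed earlier in this section can be expressed as a small helper. The following is a minimal sketch using only the JDK. `ChangeEventChannels` is a hypothetical class, and it assumes that custom object API names end in `__c`, as in the Employee__c example.

```java
// Hypothetical helper that derives CDC channel names and endpoint URIs
// from Salesforce object API names.
final class ChangeEventChannels {
    /** Channel that carries change events for all selected entities. */
    static final String ALL = "data/ChangeEvents";

    private ChangeEventChannels() {}

    static String forObject(String objectApiName) {
        if (objectApiName == null || objectApiName.isEmpty()) {
            throw new IllegalArgumentException("object API name is required");
        }
        if (objectApiName.endsWith("__c")) {
            // Custom object: Employee__c -> data/Employee__ChangeEvent
            String base = objectApiName.substring(0, objectApiName.length() - "__c".length());
            return "data/" + base + "__ChangeEvent";
        }
        // Standard object: Account -> data/AccountChangeEvent
        return "data/" + objectApiName + "ChangeEvent";
    }

    static String subscribeUri(String channel) {
        return "salesforce:subscribe:" + channel;
    }
}
```

For example, `ChangeEventChannels.subscribeUri(ChangeEventChannels.forObject("Account"))` yields `salesforce:subscribe:data/AccountChangeEvent`.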
| Parameter | Type | Description | Default | Required |
|---|---|---|---|---|
| | | If false, operation returns a | false | |
| replayId | | The replayId value to use when subscribing. | | |
| | | Default replayId setting if no value is found in initialReplayIdMap. | -1 | |
| | | ReplayId to fall back to after an Invalid Replay Id response. | -1 | |
Output
Type: Map<String, Object> or org.cometd.bayeux.Message
Headers
| Name | Description |
|---|---|
|
|