Overview
The Salesforce CDC source captures inserts, updates, deletes, and undeletes from your Salesforce org and writes them to Kafka topics — one topic per Salesforce object.
Two ingestion modes are supported and can be used together:
- Native CDC (default) — Streamkap subscribes directly to the Salesforce Change Data Capture stream using OAuth credentials. No Apex code required.
- Apex-trigger webhook — Apex triggers in your org POST change events to a Streamkap-generated webhook URL. Use this for standard objects without CDC support, custom objects, or when you need fine-grained control.
Both paths feed the same payload router, so downstream topics, keys, and flatten/include behavior are identical regardless of how the event arrived.
This connector is in Beta. Behaviors and defaults may change before general availability.
Prerequisites
- A Salesforce org (Developer, Professional, Enterprise, or Unlimited edition) with System Administrator access.
- For native CDC: the Change Data Capture feature enabled in your org and selected for the objects you want to capture (Setup → Change Data Capture).
- OAuth Consumer Key and Consumer Secret from a Salesforce Connected App or External Client App (see Salesforce Setup below).
- A Streamkap workspace with permission to create source connectors.
How It Works
- OAuth authentication: Streamkap exchanges your Consumer Key and Secret for a Salesforce access token.
- CDC subscription: When Native CDC is enabled, Streamkap subscribes to one channel per selected object (/data/AccountChangeEvent, /data/ContactChangeEvent, …) and streams events as they happen.
- Webhook ingestion (optional): If you also publish from Apex triggers, those POSTs hit the same connector at a generated webhook URL and flow through the same router.
- Routing: Each event carries an entity name (from the CDC ChangeEventHeader or your Apex payload). The router maps it to a topic of the form {Object}_events, for example Account_events or Contact_events.
- Key extraction: The Salesforce record Id becomes the Kafka message key, enabling upsert-style consumption.
- Delete handling: DELETE and GAP_DELETE change types set __deleted: true on the record so downstream sinks can issue tombstones.
Streamkap Setup
1. Create the Source
- Navigate to Sources and choose Salesforce CDC.
- Give the source a memorable Name (for example, salesforce-prod).
2. Connection Settings (Auth tab)
| Field | Required | Description |
|---|---|---|
| Webhook URL | Auto | Read-only. Endpoint that Apex triggers post to. Generated on save. Only used in Apex-trigger mode. |
| API Key | Auto | Read-only and encrypted. Sent by Apex triggers in the x-api-key header. Generated on save. |
| Salesforce Instance URL | Yes | Your org URL — for example https://myorg.my.salesforce.com or https://myorg.lightning.force.com. |
| Client ID (Consumer Key) | Yes | OAuth2 Consumer Key from your Salesforce Connected App / External Client App. |
| Client Secret (Consumer Secret) | Yes | OAuth2 Consumer Secret. Encrypted at rest. |
| Username (Optional) | No | Salesforce username. Only required if your Connected App is not configured for the client_credentials flow. |
| Password + Token (Optional) | No | Salesforce password concatenated with the user’s security token. Only required alongside Username. Encrypted at rest. |
3. Schema (Salesforce Objects)
In the Schema tab, list the Salesforce objects you want to capture, comma-separated. The default is Account,Contact,Lead,Opportunity.
The following standard objects are supported out of the box:
Account, Contact, Lead, Opportunity, Case, Task, Event, User, Campaign,
CampaignMember, Contract, Order, OrderItem, Product2, Pricebook2,
PricebookEntry, Asset, Note, Attachment, ContentDocument, ContentVersion,
FeedItem, Solution, OpportunityLineItem, OpportunityContactRole,
AccountContactRole, Partner, Quote, QuoteLineItem
Each object you list produces a Kafka topic named {Object}_events (for example Account_events, OpportunityLineItem_events).
For native CDC to deliver events for an object, that object must also be enabled for Change Data Capture in your Salesforce org (Setup → Change Data Capture → add the object to the Selected Entities list). Custom objects and objects without CDC support must use the Apex-trigger mode.
4. Settings
| Field | Default | Description |
|---|---|---|
| Enable Native CDC | true | Subscribe directly to Salesforce CDC channels using OAuth. Disable if you only want to ingest via Apex triggers. |
| Unselected Object Behavior | DEFAULT_TOPIC | What to do when an event arrives for an object not in your Schema list. DEFAULT_TOPIC, SKIP, or FAIL. |
| Default Topic for Unselected Objects | unknown | Topic used when behavior is DEFAULT_TOPIC. Only shown when the option above is set to DEFAULT_TOPIC. |
| Flatten Record Fields | true | Promote nested record fields to top level. Recommended for most warehouse destinations. |
| Flatten Prefix | (empty) | Prefix for flattened fields. Empty means Salesforce field names are used directly (Name, BillingCity, Phone). Only shown when flattening is enabled. |
| Include Change Metadata | false | Keep the ChangeEventHeader (changeType, commitTimestamp, recordIds, …) in the output. Disable for upsert/state-table mode; enable for audit-log mode. |
| Enable Dead Letter Queue (advanced) | true | Failed records are written to a DLQ topic instead of crashing the connector. |
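The Unselected Object Behavior setting can be pictured as a small decision function. The sketch below is illustrative only: `resolve_topic` and its parameters are hypothetical names, not part of the connector, and what FAIL does internally (for example whether the record goes to the DLQ) is not specified here, so it is modelled as a plain exception.

```python
from typing import Optional


def resolve_topic(entity: str, selected: set[str],
                  behavior: str = "DEFAULT_TOPIC",
                  default_topic: str = "unknown") -> Optional[str]:
    """Return the target topic for an entity, or None when the event is skipped."""
    if entity in selected:
        # Objects listed in the Schema tab get their own {Object}_events topic.
        return f"{entity}_events"
    if behavior == "DEFAULT_TOPIC":
        return default_topic
    if behavior == "SKIP":
        return None
    if behavior == "FAIL":
        # Assumption: failure is modelled as an exception in this sketch.
        raise ValueError(f"event for unselected object: {entity}")
    raise ValueError(f"unknown behavior: {behavior}")
```

With the defaults, an event for an object outside the Schema list lands in the unknown topic, which matches the troubleshooting entry for events arriving there.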
5. Save
Save the source. If Native CDC is enabled and credentials are valid, events begin flowing as soon as Salesforce records change. To verify, edit a record in Salesforce and watch Events Written on the source dashboard.
Salesforce Setup
The required setup depends on the mode you choose.
OAuth Connected App (required for both modes)
- In Salesforce, go to Setup → App Manager → New External Client App (or New Connected App on older orgs).
- Configure:
  - App name: Streamkap CDC.
  - Contact Email: your email.
  - Distribution State: Local.
  - API (Enable OAuth Settings): checked.
  - Callback URL: https://login.salesforce.com/services/oauth2/callback (any valid URL works; the connector does not use it).
  - OAuth scopes, add:
    - Manage user data via APIs (api)
    - Full access (full)
    - Perform requests at any time (refresh_token, offline_access)
- Uncheck Require Proof Key for Code Exchange (PKCE).
- If you want password-less auth, enable Client Credentials Flow and select a Run-As user with the necessary object permissions.
- Save and wait 2–10 minutes for Salesforce to provision the app.
- Open the app and reveal the Consumer Key and Consumer Secret (you may need to click Manage Consumer Details to reveal the secret). Paste these into the Streamkap source as Client ID and Client Secret.
- Confirm the user (either the Run-As user for client-credentials flow, or the Username/Password user) has API Enabled, View All Data (or the relevant object permissions), and is allowed to log in from Streamkap’s IP range if you have IP restrictions.
Username / password fallback
If you cannot use the client-credentials flow, fill the optional Username and Password + Token fields:
- Username — the Salesforce user’s login email. We recommend a dedicated integration user with API Enabled, the relevant read permissions, and no MFA requirement.
- Password + Token — concatenate the user’s password and security token directly with no separator. To get a security token, log in as that user and go to Settings → My Personal Information → Reset My Security Token — Salesforce emails a new token.
If Streamkap’s IP range is added to your org’s Setup → Network Access trusted IPs, the security token is not required and the password alone is sufficient.
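To make the two authentication options concrete, the sketch below builds the form body for Salesforce's standard OAuth token endpoint (`/services/oauth2/token`) in both flows. It only constructs the request and does not send it. The function name `token_request` is hypothetical, and whether Streamkap issues exactly this request is an assumption; the grant types and parameter names are the ones Salesforce documents for these flows.

```python
from urllib.parse import urlencode


def token_request(instance_url, client_id, client_secret,
                  username=None, password=None, security_token=""):
    """Return (url, form_body) for a Salesforce OAuth token request."""
    url = instance_url.rstrip("/") + "/services/oauth2/token"
    form = {"client_id": client_id, "client_secret": client_secret}
    if username is None:
        # Client-credentials flow: no user password involved.
        form["grant_type"] = "client_credentials"
    else:
        # Username/password fallback: password and security token are
        # concatenated directly, with no separator.
        form["grant_type"] = "password"
        form["username"] = username
        form["password"] = password + security_token
    return url, urlencode(form)
```

If the caller's IP is trusted under Network Access, `security_token` can be left empty and the password is sent on its own.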
Native CDC Setup
- In Salesforce, go to Setup → Change Data Capture.
- Move the objects you want to stream from Available Entities to Selected Entities.
- Save. Salesforce begins publishing change events to the per-object channels (/data/AccountChangeEvent, etc.).
- In Streamkap, ensure Enable Native CDC is on and the same objects are listed under Salesforce Objects.
That is everything required for native CDC. No Apex code is needed.
Apex-trigger Webhook (optional)
Use this mode for objects that native CDC does not support (custom objects and certain standard objects), for custom field-level filtering, or when you want explicit control over what is sent. The connector accepts the same envelope shape from triggers as it receives from native CDC, so the resulting topics, keys, and downstream handling are identical.
1. Create the External Credential
- Setup → Named Credentials → External Credentials → New.
- Fill in:
  - Label: Streamkap Webhook.
  - Name: Streamkap_Webhook.
  - Authentication Protocol: Custom.
- Save.
- Under Principals, click New:
  - Parameter Name: Default.
  - Identity Type: Named Principal (read-only).
  - Sequence Number: 1.
- Save.
- Open the Default principal and under Authentication Parameters click Add:
  - Parameter 1 Name: x-api-key.
  - Parameter 1 Value: paste the API Key from the Streamkap source.
- Save.
2. Create the Named Credential
- Click the Named Credentials tab → New.
- Fill in:
  - Label: Streamkap.
  - Name: Streamkap.
  - URL: paste the Streamkap Webhook URL, with no path appended.
  - External Credential: select Streamkap Webhook.
- Save.
3. Grant Access via Permission Set
The user that runs the Apex callouts must be allowed to use the External Credential.
- Setup → Permission Sets → New:
  - Label: Streamkap Webhook Access.
- Save.
- Inside the permission set, open External Credential Principal Access.
- Click Edit, move Streamkap_Webhook - Default into Enabled, save.
- Click Manage Assignments → Add Assignment, select the user(s) that will execute the triggers, and assign.
- Have those users log out and back in to refresh their session.
4. Create the Apex Sender Class
This class dynamically discovers every accessible field on the object (including custom fields added later) and posts a CDC-formatted payload.
- Setup → Apex Classes → New, paste the code below, and save:
public class WebhookSender {
    // Re-queries the record with every accessible field, then posts it.
    @future(callout=true)
    public static void sendAsync(String recordId, String changeType, String objectName) {
        Map<String, Schema.SObjectField> fieldMap = Schema.getGlobalDescribe()
            .get(objectName).getDescribe().fields.getMap();
        List<String> fieldNames = new List<String>();
        for (String fieldName : fieldMap.keySet()) {
            Schema.DescribeFieldResult fieldDesc = fieldMap.get(fieldName).getDescribe();
            if (fieldDesc.isAccessible()) {
                fieldNames.add(fieldDesc.getName());
            }
        }
        String soql = 'SELECT ' + String.join(fieldNames, ', ')
            + ' FROM ' + objectName
            + ' WHERE Id = \'' + String.escapeSingleQuotes(recordId) + '\'';
        List<SObject> records = Database.query(soql);
        if (records.isEmpty()) {
            // Record no longer exists: send the Id only.
            sendPayload(recordId, objectName, changeType, new Map<String, Object>{'Id' => recordId});
            return;
        }
        Map<String, Object> recordMap = new Map<String, Object>();
        SObject record = records[0];
        for (String fieldName : fieldNames) {
            recordMap.put(fieldName, record.get(fieldName));
        }
        sendPayload(recordId, objectName, changeType, recordMap);
    }

    // Deleted records cannot be re-queried, so the trigger passes them in as JSON.
    @future(callout=true)
    public static void sendDeleteAsync(String recordId, String objectName, String recordJson) {
        Map<String, Object> recordMap = (Map<String, Object>) JSON.deserializeUntyped(recordJson);
        sendPayload(recordId, objectName, 'DELETE', recordMap);
    }

    // Wraps the record in a CDC-shaped envelope and posts it to the Named Credential.
    private static void sendPayload(String recordId, String objectName,
            String changeType, Map<String, Object> recordFields) {
        Map<String, Object> header = new Map<String, Object>{
            'entityName' => objectName,
            'recordIds' => new List<String>{recordId},
            'changeType' => changeType,
            'commitTimestamp' => System.currentTimeMillis()
        };
        Map<String, Object> payload = new Map<String, Object>();
        payload.put('ChangeEventHeader', header);
        payload.putAll(recordFields);
        Map<String, Object> body = new Map<String, Object>{
            'data' => new Map<String, Object>{
                'payload' => payload,
                'event' => new Map<String, Object>{'replayId' => 0}
            },
            'channel' => '/data/' + objectName + 'ChangeEvent'
        };
        HttpRequest req = new HttpRequest();
        req.setEndpoint('callout:Streamkap'); // no path appended
        req.setMethod('POST');
        req.setHeader('Content-Type', 'application/json');
        req.setBody(JSON.serialize(body));
        req.setTimeout(30000);
        Http http = new Http();
        try {
            HttpResponse res = http.send(req);
            if (res.getStatusCode() != 200) {
                System.debug(LoggingLevel.ERROR, 'Webhook failed: ' + res.getStatusCode() + ' ' + res.getBody());
            }
        } catch (Exception e) {
            System.debug(LoggingLevel.ERROR, 'Webhook error: ' + e.getMessage());
        }
    }
}
The Named Credential URL must have no path appended. Use callout:Streamkap, not callout:Streamkap/webhook — adding a path overrides the payload router and sends every event to a single topic.
The connector relies on these exact field names to detect and route events — do not rename them:
| Field path | Required value | Purpose |
|---|---|---|
| data.payload | Object with record fields | Container for record data + header. |
| data.payload.ChangeEventHeader | Object | Identifies the event as Salesforce CDC. |
| data.payload.ChangeEventHeader.entityName | Object name (e.g. "Account") | Determines the Kafka topic. |
| data.payload.ChangeEventHeader.changeType | CREATE, UPDATE, DELETE, UNDELETE | Sets __deleted. |
| data.payload.Id | Record ID | Used as the message key. |
| channel | /data/{Object}ChangeEvent | Used for event-format detection. |
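When testing a trigger, it can help to check a captured request body against the field table before sending it. The following sketch does that check in Python; `envelope_problems` is a hypothetical helper written for illustration, and the connector's own validation may be stricter or looser than this.

```python
CHANGE_TYPES = {"CREATE", "UPDATE", "DELETE", "UNDELETE"}


def envelope_problems(body: dict) -> list[str]:
    """Return a list of human-readable problems; empty means the shape looks right."""
    data = body.get("data")
    payload = data.get("payload") if isinstance(data, dict) else None
    if not isinstance(payload, dict):
        return ["data.payload is missing or not an object"]

    problems = []
    header = payload.get("ChangeEventHeader")
    if not isinstance(header, dict):
        problems.append("data.payload.ChangeEventHeader is missing or not an object")
    else:
        if not header.get("entityName"):
            problems.append("ChangeEventHeader.entityName is missing")
        if header.get("changeType") not in CHANGE_TYPES:
            problems.append("ChangeEventHeader.changeType is not a supported value")
    if not payload.get("Id"):
        problems.append("data.payload.Id is missing")

    channel = body.get("channel")
    # Loose check on the /data/{Object}ChangeEvent shape.
    if not (isinstance(channel, str) and channel.startswith("/data/")
            and channel.endswith("ChangeEvent")):
        problems.append("channel does not look like /data/{Object}ChangeEvent")
    return problems
```

An empty list means every field in the table is present; each string otherwise names one field to fix in the Apex payload.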
5. Create One Trigger per Object
Create one Apex trigger per object you want to capture — they all share the WebhookSender class. Example for Account:
trigger AccountToKafka on Account (after insert, after update, after delete, after undelete) {
    List<SObject> records;
    String changeType;
    if (Trigger.isInsert) {
        changeType = 'CREATE';
        records = Trigger.new;
    } else if (Trigger.isUpdate) {
        changeType = 'UPDATE';
        records = Trigger.new;
    } else if (Trigger.isDelete) {
        changeType = 'DELETE';
        records = Trigger.old;
    } else if (Trigger.isUndelete) {
        changeType = 'UNDELETE';
        records = Trigger.new;
    }
    for (SObject rec : records) {
        if (changeType == 'DELETE') {
            WebhookSender.sendDeleteAsync(rec.Id, 'Account', JSON.serialize(rec));
        } else {
            WebhookSender.sendAsync(rec.Id, changeType, 'Account');
        }
    }
}
To capture additional objects (Contact, Lead, Opportunity, custom objects), copy this trigger and change the trigger name, the object in trigger ... on {Object}, and the object string passed to WebhookSender.
Salesforce limits @future methods to 50 per transaction and callouts to 120 seconds. Bulk operations that touch more than 50 records in a single transaction (Data Loader imports, batch updates) will hit the @future ceiling — for those workflows, prefer native CDC.
Event Routing Reference
The payload router supports three Salesforce event formats and detects them by inspecting the payload shape.
Change Data Capture (CDC)
Detected by the presence of data.payload.ChangeEventHeader. Used by both native CDC and the Apex WebhookSender class above.
| changeType | Topic | __deleted |
|---|---|---|
| CREATE | {Entity}_events | false |
| UPDATE | {Entity}_events | false |
| DELETE | {Entity}_events | true |
| UNDELETE | {Entity}_events | false |
| GAP_CREATE | gap_events | false |
| GAP_DELETE | gap_events | true |
| GAP_OVERFLOW | gap_events | false |
Gap events indicate that Salesforce could not deliver every change individually (typically due to bulk operations) and are routed to a single gap_events topic regardless of the entity.
PushTopic events
Detected by the presence of data.sobject. Routed by the PushTopic name.
| event.type | Topic | __deleted |
|---|---|---|
| created | {TopicName}_events | false |
| updated | {TopicName}_events | false |
| deleted | {TopicName}_events | true |
| undeleted | {TopicName}_events | false |
Platform events
Detected by the presence of data.payload without a ChangeEventHeader. Routed by the platform event name. Platform events always have __deleted = false.
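The detection rules for the three formats, plus the CDC routing table, can be summarised in a few lines. This is a sketch of the documented behavior, not the router's code: `detect_format` and `cdc_route` are hypothetical names, the order in which the checks run is an assumption, and treating every changeType that starts with GAP_ as a gap event is an assumption that extends the table to gap types it does not list.

```python
def detect_format(event: dict) -> str:
    """Classify an event as 'cdc', 'pushtopic', 'platform_event', or 'unknown'."""
    data = event.get("data", {})
    payload = data.get("payload")
    if isinstance(payload, dict) and "ChangeEventHeader" in payload:
        return "cdc"
    if "sobject" in data:
        return "pushtopic"
    if isinstance(payload, dict):
        # data.payload without a ChangeEventHeader
        return "platform_event"
    return "unknown"


def cdc_route(header: dict) -> tuple[str, bool]:
    """Return (topic, __deleted) for a ChangeEventHeader."""
    change_type = header["changeType"]
    if change_type.startswith("GAP_"):
        topic = "gap_events"  # single topic regardless of entity
    else:
        topic = f"{header['entityName']}_events"
    deleted = change_type in ("DELETE", "GAP_DELETE")
    return topic, deleted
```

For example, a header with entityName Account and changeType GAP_DELETE maps to the gap_events topic with __deleted set to true, as in the table.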
Message keys
All Salesforce events use { Id: <record_id> } as the Kafka message key. With Flatten Record Fields enabled and a non-empty Flatten Prefix, the key field name is prefixed accordingly (for example { sf_Id: ... }).
Example Payload
A typical change event arriving from Salesforce CDC:
{
"data": {
"payload": {
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": ["001Hp00002abcDE"],
"changeType": "UPDATE",
"commitTimestamp": 1764000000000
},
"Id": "001Hp00002abcDE",
"Name": "Acme Corp",
"Phone": "555-0101",
"BillingCity": "San Francisco"
},
"event": { "replayId": 12345 }
},
"channel": "/data/AccountChangeEvent"
}
With the default settings (Flatten Record Fields = true, Include Change Metadata = false), the record written to Account_events looks like:
{
"Id": "001Hp00002abcDE",
"Name": "Acme Corp",
"Phone": "555-0101",
"BillingCity": "San Francisco",
"__deleted": false
}
The Kafka message key is { "Id": "001Hp00002abcDE" }, which a sink can use to upsert.
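The mapping from the incoming event to the written record and key can be sketched as below. It models only the flattened case described on this page; `to_output` is a hypothetical name, and two details are assumptions made for illustration: that __deleted is never prefixed, and that the retained ChangeEventHeader sits at the top level of the record when Include Change Metadata is on.

```python
def to_output(event, flatten_prefix="", include_metadata=False):
    """Return (key, record) for a CDC event, modelling the flattened output."""
    payload = dict(event["data"]["payload"])  # shallow copy; input stays untouched
    header = payload.pop("ChangeEventHeader")
    record = {flatten_prefix + name: value for name, value in payload.items()}
    if include_metadata:
        record["ChangeEventHeader"] = header  # assumption: kept at top level
    record["__deleted"] = header["changeType"] in ("DELETE", "GAP_DELETE")
    key = {flatten_prefix + "Id": payload["Id"]}
    return key, record


event = {
    "data": {
        "payload": {
            "ChangeEventHeader": {"entityName": "Account", "changeType": "UPDATE"},
            "Id": "001Hp00002abcDE",
            "Name": "Acme Corp",
            "Phone": "555-0101",
            "BillingCity": "San Francisco",
        }
    },
    "channel": "/data/AccountChangeEvent",
}
key, record = to_output(event)
```

Calling `to_output(event, flatten_prefix="sf_")` instead yields sf_-prefixed field names and a key of the form { sf_Id: ... }.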
Common Patterns
State table (upsert) ingestion
Use this when you want one row per Salesforce record in your warehouse, mirroring the live state.
- Flatten Record Fields: true
- Flatten Prefix: (empty)
- Include Change Metadata: false
DELETE events set __deleted to true, which most sinks treat as a tombstone.
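As a rough picture of what a state-table sink does with these records, the sketch below applies them to an in-memory dictionary keyed by record Id. It is not how any particular sink is implemented: `apply_change` is a hypothetical helper, and merging fields into the existing row (rather than replacing the row) is a choice made here in case an update carries only some of the record's fields.

```python
def apply_change(table: dict, key: dict, record: dict) -> None:
    """Upsert or delete one row in `table`, keyed by the Salesforce record Id."""
    record_id = key["Id"]
    if record.get("__deleted"):
        table.pop(record_id, None)  # tombstone: drop the row if present
        return
    fields = {name: value for name, value in record.items() if name != "__deleted"}
    table.setdefault(record_id, {}).update(fields)


table = {}
apply_change(table, {"Id": "001"}, {"Id": "001", "Name": "Acme", "__deleted": False})
apply_change(table, {"Id": "001"}, {"Id": "001", "Phone": "555-0101", "__deleted": False})
snapshot = dict(table["001"])  # row state after the create and the update
apply_change(table, {"Id": "001"}, {"Id": "001", "__deleted": True})
```

After the two upserts the row holds Id, Name, and Phone; the final record with __deleted set removes it, leaving the table empty.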
Audit log ingestion
Use this when you want every change preserved as a separate row.
- Flatten Record Fields: false (or true, depending on your destination)
- Include Change Metadata: true
The output retains ChangeEventHeader so you can see changeType, commitTimestamp, and the full sequence of changes per record.
Troubleshooting
| Issue | Check |
|---|---|
| 401 from Salesforce when the source starts | Verify the Consumer Key and Secret are correct and that the Connected App has finished provisioning (Salesforce can take 2–10 minutes after creation). If using the username/password fallback, confirm the password concatenates the user’s security token with no separator. |
| No events for an object even though the source is healthy | In native CDC mode, the object must be enabled for Change Data Capture (Setup → Change Data Capture → Selected Entities). For objects without CDC support, use the Apex-trigger mode instead. |
| Events arrive in the unknown topic | The entity name does not match any object in your Salesforce Objects schema list. Add the object to the schema, or set Unselected Object Behavior to SKIP. |
| No events from Apex triggers | Setup → Apex Jobs: check for failed @future jobs. Trigger errors typically show up here rather than at insert/update time. |
| Unauthorized endpoint error from Apex | The user does not have External Credential Principal Access. Re-check the Permission Set assignment and have the user log out and back in. |
| Insufficient access on a trigger | Verify the user’s profile allows modifying the object and that the trigger is Active. |
| All Apex events land in one topic | The Named Credential URL includes a path, or the setEndpoint call adds one. Use callout:Streamkap with no trailing path. |
| Missing fields on records | Field-level security is hiding the field from the user running the trigger. Grant read access via Profile or Permission Set. |
| Destination cannot handle the nested record | Toggle Flatten Record Fields on. To avoid collisions with reserved field names downstream, set Flatten Prefix to something like sf_. |
| gap_events topic has data | Salesforce dropped granular events (typically during bulk operations) and emitted a gap notification. Affected records need to be backfilled from the source. |
Limitations
- The Salesforce CDC source is currently Beta.
- Native CDC requires that each object be enabled for Change Data Capture in your Salesforce org. Objects without CDC support must use Apex-trigger mode.
- Maximum payload size is 50 MB per request and maximum header size is 64 KB.
- Each request is treated as a single Kafka record; the connector does not batch multiple events into one Kafka message.
- The connector runs as a single task; horizontal scaling requires multiple source instances.
- Initial backfill of historical records is not performed automatically — only changes from the time the source starts are streamed.