Overview

The Salesforce CDC source captures inserts, updates, deletes, and undeletes from your Salesforce org and writes them to Kafka topics — one topic per Salesforce object. Two ingestion modes are supported and can be used together:
  • Native CDC (default) — Streamkap subscribes directly to the Salesforce Change Data Capture stream using OAuth credentials. No Apex code required.
  • Apex-trigger webhook — Apex triggers in your org POST change events to a Streamkap-generated webhook URL. Use this for standard objects without CDC support, custom objects, or when you need fine-grained control.
Both paths feed the same payload router, so downstream topics, keys, and flatten/include behavior are identical regardless of how the event arrived.
This connector is in Beta. Behaviors and defaults may change before general availability.

Prerequisites

  • A Salesforce org (Developer, Professional, Enterprise, or Unlimited edition) with System Administrator access.
  • For native CDC: the Change Data Capture feature enabled in your org and selected for the objects you want to capture (Setup → Change Data Capture).
  • OAuth Consumer Key and Consumer Secret from a Salesforce Connected App or External Client App (see Salesforce Setup below).
  • A Streamkap workspace with permission to create source connectors.

How It Works

  1. OAuth authentication — Streamkap exchanges your Consumer Key and Secret for a Salesforce access token.
  2. CDC subscription — When Native CDC is enabled, Streamkap subscribes to one channel per selected object (/data/AccountChangeEvent, /data/ContactChangeEvent, …) and streams events as they happen.
  3. Webhook ingestion (optional) — If you also publish from Apex triggers, those POSTs hit the same connector at a generated webhook URL and flow through the same router.
  4. Routing — Each event carries an entity name (from the CDC ChangeEventHeader or your Apex payload). The router maps it to a topic of the form {Object}_events: Account_events, Contact_events, etc.
  5. Key extraction — The Salesforce record Id becomes the Kafka message key, enabling upsert-style consumption.
  6. Delete handling — DELETE and GAP_DELETE change types set __deleted: true on the record so downstream sinks can issue tombstones.

Streamkap Setup

1. Create the Source

  1. Navigate to Sources and choose Salesforce CDC.
  2. Give the source a memorable Name (for example, salesforce-prod).

2. Connection Settings (Auth tab)

| Field | Required | Description |
| --- | --- | --- |
| Webhook URL | Auto | Read-only. Endpoint that Apex triggers post to. Generated on save. Only used in Apex-trigger mode. |
| API Key | Auto | Read-only and encrypted. Sent by Apex triggers in the x-api-key header. Generated on save. |
| Salesforce Instance URL | Yes | Your org URL — for example https://myorg.my.salesforce.com or https://myorg.lightning.force.com. |
| Client ID (Consumer Key) | Yes | OAuth2 Consumer Key from your Salesforce Connected App / External Client App. |
| Client Secret (Consumer Secret) | Yes | OAuth2 Consumer Secret. Encrypted at rest. |
| Username (Optional) | No | Salesforce username. Only required if your Connected App is not configured for the client_credentials flow. |
| Password + Token (Optional) | No | Salesforce password concatenated with the user’s security token. Only required alongside Username. Encrypted at rest. |

3. Schema (Salesforce Objects)

In the Schema tab, list the Salesforce objects you want to capture, comma-separated. The default is Account,Contact,Lead,Opportunity. The following standard objects are supported out of the box:
Account, Contact, Lead, Opportunity, Case, Task, Event, User, Campaign,
CampaignMember, Contract, Order, OrderItem, Product2, Pricebook2,
PricebookEntry, Asset, Note, Attachment, ContentDocument, ContentVersion,
FeedItem, Solution, OpportunityLineItem, OpportunityContactRole,
AccountContactRole, Partner, Quote, QuoteLineItem
Each object you list produces a Kafka topic named {Object}_events (for example Account_events, OpportunityLineItem_events).
For native CDC to deliver events for an object, that object must also be enabled for Change Data Capture in your Salesforce org (Setup → Change Data Capture → add the object to the Selected Entities list). Custom objects and objects without CDC support must use the Apex-trigger mode.

4. Settings

| Field | Default | Description |
| --- | --- | --- |
| Enable Native CDC | true | Subscribe directly to Salesforce CDC channels using OAuth. Disable if you only want to ingest via Apex triggers. |
| Unselected Object Behavior | DEFAULT_TOPIC | What to do when an event arrives for an object not in your Schema list: DEFAULT_TOPIC, SKIP, or FAIL. |
| Default Topic for Unselected Objects | unknown | Topic used when behavior is DEFAULT_TOPIC. Only shown when the option above is set to DEFAULT_TOPIC. |
| Flatten Record Fields | true | Promote nested record fields to top level. Recommended for most warehouse destinations. |
| Flatten Prefix | (empty) | Prefix for flattened fields. Empty means Salesforce field names are used directly (Name, BillingCity, Phone). Only shown when flattening is enabled (example below). |
| Include Change Metadata | false | Keep the ChangeEventHeader (changeType, commitTimestamp, recordIds, …) in the output. Disable for upsert/state-table mode; enable for audit-log mode. |
| Enable Dead Letter Queue (advanced) | true | Failed records are written to a DLQ topic instead of crashing the connector. |
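For example, with Flatten Record Fields = true and a Flatten Prefix of sf_, record fields are written as sf_Name, sf_BillingCity, and so on, and the key field becomes sf_Id (see Message keys below). A sketch of the resulting record, with illustrative values and the __deleted system field shown unprefixed:
{
  "sf_Id": "001Hp00002abcDE",
  "sf_Name": "Acme Corp",
  "sf_BillingCity": "San Francisco",
  "__deleted": false
}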

5. Save

Save the source. If Native CDC is enabled and credentials are valid, events begin flowing as soon as Salesforce records change. To verify, edit a record in Salesforce and watch Events Written on the source dashboard.

Salesforce Setup

The required setup depends on the mode you choose.

OAuth Connected App (required for both modes)

  1. In Salesforce, go to Setup → App Manager → New External Client App (or New Connected App on older orgs).
  2. Configure:
    • App name — Streamkap CDC.
    • Contact Email — your email.
    • Distribution State — Local.
    • API (Enable OAuth Settings) — checked.
    • Callback URL — https://login.salesforce.com/services/oauth2/callback (any valid URL works; the connector does not use it).
    • OAuth scopes — add:
      • Manage user data via APIs (api)
      • Full access (full)
      • Perform requests at any time (refresh_token, offline_access)
    • Uncheck Require Proof Key for Code Exchange (PKCE).
    • If you want password-less auth, enable Client Credentials Flow and select a Run-As user with the necessary object permissions.
  3. Save and wait 2–10 minutes for Salesforce to provision the app.
  4. Open the app and reveal the Consumer Key and Consumer Secret (you may need to click Manage Consumer Details to reveal the secret). Paste these into the Streamkap source as Client ID and Client Secret.
  5. Confirm the user (either the Run-As user for client-credentials flow, or the Username/Password user) has API Enabled, View All Data (or the relevant object permissions), and is allowed to log in from Streamkap’s IP range if you have IP restrictions.

Username / password fallback

If you cannot use the client-credentials flow, fill the optional Username and Password + Token fields:
  • Username — the Salesforce user’s login email. We recommend a dedicated integration user with API Enabled, the relevant read permissions, and no MFA requirement.
  • Password + Token — concatenate the user’s password and security token directly with no separator (for example, a password of hunter2 and a token of XyZ123 become hunter2XyZ123). To get a security token, log in as that user and go to Settings → My Personal Information → Reset My Security Token — Salesforce emails a new token.
If Streamkap’s IP range is added to your org’s Setup → Network Access trusted IPs, the security token is not required and the password alone is sufficient.

Native CDC Setup

  1. In Salesforce, go to Setup → Change Data Capture.
  2. Move the objects you want to stream from Available Entities to Selected Entities.
  3. Save. Salesforce begins publishing change events to the per-object channels (/data/AccountChangeEvent, etc.).
  4. In Streamkap, ensure Enable Native CDC is on and the same objects are listed under Salesforce Objects.
That is everything required for native CDC. No Apex code is needed.

Apex-trigger Webhook (optional)

Use this mode for objects that CDC does not support (custom objects, certain standard objects, custom field-level filtering) or when you want explicit control over what is sent. The connector accepts the same envelope shape from triggers as it receives from native CDC, so the resulting topics, keys, and downstream handling are identical.

1. Create the External Credential

  1. Setup → Named Credentials → External Credentials → New.
  2. Fill in:
    • Label — Streamkap Webhook.
    • Name — Streamkap_Webhook.
    • Authentication Protocol — Custom.
  3. Save.
  4. Under Principals, click New:
    • Parameter Name — Default.
    • Identity Type — Named Principal (read-only).
    • Sequence Number — 1.
  5. Save.
  6. Open the Default principal and under Authentication Parameters click Add:
    • Parameter 1 Name — x-api-key.
    • Parameter 1 Value — paste the API Key from the Streamkap source.
  7. Save.

2. Create the Named Credential

  1. Click the Named Credentials tab → New.
  2. Fill in:
    • Label — Streamkap.
    • Name — Streamkap.
    • URL — paste the Streamkap Webhook URL, with no path appended.
    • External Credential — select Streamkap Webhook.
  3. Save.

3. Grant Access via Permission Set

The user that runs the Apex callouts must be allowed to use the External Credential.
  1. Setup → Permission Sets → New:
    • Label — Streamkap Webhook Access.
    • Save.
  2. Inside the permission set, open External Credential Principal Access.
  3. Click Edit, move Streamkap_Webhook - Default into Enabled, save.
  4. Click Manage Assignments → Add Assignment, select the user(s) that will execute the triggers, and assign.
  5. Have those users log out and back in to refresh their session.

4. Create the Apex Sender Class

This class dynamically discovers every accessible field on the object (including custom fields added later) and posts a CDC-formatted payload.
  1. Setup → Apex Classes → New, paste the code below, and save:
public class WebhookSender {

    @future(callout=true)
    public static void sendAsync(String recordId, String changeType, String objectName) {
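        // Describe the object to find every field the running user can read,
        // then re-query the record so the full current state is posted.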
        Map<String, Schema.SObjectField> fieldMap = Schema.getGlobalDescribe()
            .get(objectName).getDescribe().fields.getMap();

        List<String> fieldNames = new List<String>();
        for (String fieldName : fieldMap.keySet()) {
            Schema.DescribeFieldResult fieldDesc = fieldMap.get(fieldName).getDescribe();
            if (fieldDesc.isAccessible()) {
                fieldNames.add(fieldDesc.getName());
            }
        }

        String soql = 'SELECT ' + String.join(fieldNames, ', ')
                     + ' FROM ' + objectName
                     + ' WHERE Id = \'' + String.escapeSingleQuotes(recordId) + '\'';

        List<SObject> records = Database.query(soql);
        if (records.isEmpty()) {
            sendPayload(recordId, objectName, changeType, new Map<String, Object>{'Id' => recordId});
            return;
        }

        Map<String, Object> recordMap = new Map<String, Object>();
        SObject record = records[0];
        for (String fieldName : fieldNames) {
            recordMap.put(fieldName, record.get(fieldName));
        }

        sendPayload(recordId, objectName, changeType, recordMap);
    }

    @future(callout=true)
    public static void sendDeleteAsync(String recordId, String objectName, String recordJson) {
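        // The record no longer exists in the database, so reuse the field
        // values the trigger serialized from Trigger.old.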
        Map<String, Object> recordMap = (Map<String, Object>) JSON.deserializeUntyped(recordJson);
        sendPayload(recordId, objectName, 'DELETE', recordMap);
    }

    private static void sendPayload(String recordId, String objectName,
                                     String changeType, Map<String, Object> recordFields) {
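        // Assemble the CDC-style envelope the payload router expects,
        // matching the shape of native CDC events.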
        Map<String, Object> header = new Map<String, Object>{
            'entityName' => objectName,
            'recordIds' => new List<String>{recordId},
            'changeType' => changeType,
            'commitTimestamp' => System.currentTimeMillis()
        };

        Map<String, Object> payload = new Map<String, Object>();
        payload.put('ChangeEventHeader', header);
        payload.putAll(recordFields);

        Map<String, Object> body = new Map<String, Object>{
            'data' => new Map<String, Object>{
                'payload' => payload,
                'event' => new Map<String, Object>{'replayId' => 0}
            },
            'channel' => '/data/' + objectName + 'ChangeEvent'
        };

        HttpRequest req = new HttpRequest();
        req.setEndpoint('callout:Streamkap');
        req.setMethod('POST');
        req.setHeader('Content-Type', 'application/json');
        req.setBody(JSON.serialize(body));
        req.setTimeout(30000);

        Http http = new Http();
        try {
            HttpResponse res = http.send(req);
            if (res.getStatusCode() != 200) {
                System.debug(LoggingLevel.ERROR, 'Webhook failed: ' + res.getStatusCode() + ' ' + res.getBody());
            }
        } catch (Exception e) {
            System.debug(LoggingLevel.ERROR, 'Webhook error: ' + e.getMessage());
        }
    }
}
The Named Credential URL must have no path appended. Use callout:Streamkap, not callout:Streamkap/webhook — adding a path overrides the payload router and sends every event to a single topic.
The connector relies on these exact field names to detect and route events — do not rename them:
| Field path | Required value | Purpose |
| --- | --- | --- |
| data.payload | Object with record fields | Container for record data + header. |
| data.payload.ChangeEventHeader | Object | Identifies the event as Salesforce CDC. |
| data.payload.ChangeEventHeader.entityName | Object name (e.g. "Account") | Determines the Kafka topic. |
| data.payload.ChangeEventHeader.changeType | CREATE, UPDATE, DELETE, UNDELETE | Sets __deleted. |
| data.payload.Id | Record ID | Used as the message key. |
| channel | /data/{Object}ChangeEvent | Used for event-format detection. |
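With the WebhookSender class saved, you can smoke-test the webhook path from Execute Anonymous in the Developer Console before creating any triggers. A minimal sketch; the record Id is a placeholder, so substitute a real Id from your org:
// Sends a single test UPDATE event for an Account through the webhook path.
WebhookSender.sendAsync('001Hp00002abcDE', 'UPDATE', 'Account');
If nothing appears on Account_events, check Setup → Apex Jobs for a failed @future job.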

5. Create One Trigger per Object

Create one Apex trigger per object you want to capture — they all share the WebhookSender class. Example for Account:
trigger AccountToKafka on Account (after insert, after update, after delete, after undelete) {
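    // Deletes must read Trigger.old (the record's final field values);
    // all other contexts use Trigger.new.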
    List<SObject> records;
    String changeType;

    if (Trigger.isInsert) {
        changeType = 'CREATE';
        records = Trigger.new;
    } else if (Trigger.isUpdate) {
        changeType = 'UPDATE';
        records = Trigger.new;
    } else if (Trigger.isDelete) {
        changeType = 'DELETE';
        records = Trigger.old;
    } else if (Trigger.isUndelete) {
        changeType = 'UNDELETE';
        records = Trigger.new;
    }

    for (SObject rec : records) {
        if (changeType == 'DELETE') {
            WebhookSender.sendDeleteAsync(rec.Id, 'Account', JSON.serialize(rec));
        } else {
            WebhookSender.sendAsync(rec.Id, changeType, 'Account');
        }
    }
}
To capture additional objects (Contact, Lead, Opportunity, custom objects), copy this trigger and change the trigger name, the object in trigger ... on {Object}, and the object string passed to WebhookSender.
Salesforce limits @future methods to 50 per transaction and callouts to 120 seconds. Bulk operations that touch more than 50 records in a single transaction (Data Loader imports, batch updates) will hit the @future ceiling — for those workflows, prefer native CDC.

Event Routing Reference

The payload router supports three Salesforce event formats and detects them by inspecting the payload shape.

Change Data Capture (CDC)

Detected by the presence of data.payload.ChangeEventHeader. Used by both native CDC and the Apex WebhookSender class above.
| changeType | Topic | __deleted |
| --- | --- | --- |
| CREATE | {Entity}_events | false |
| UPDATE | {Entity}_events | false |
| DELETE | {Entity}_events | true |
| UNDELETE | {Entity}_events | false |
| GAP_CREATE | gap_events | false |
| GAP_DELETE | gap_events | true |
| GAP_OVERFLOW | gap_events | false |
Gap events indicate that Salesforce could not deliver every change individually (typically due to bulk operations) and are routed to a single gap_events topic regardless of the entity.
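For illustration, a gap event uses the same envelope with a GAP_* change type; gap events typically carry only header information (no record fields), so affected records must be re-read from Salesforce. Values below are illustrative:
{
  "data": {
    "payload": {
      "ChangeEventHeader": {
        "entityName": "Account",
        "recordIds": ["001Hp00002abcDE"],
        "changeType": "GAP_CREATE",
        "commitTimestamp": 1764000000000
      }
    },
    "event": { "replayId": 12346 }
  },
  "channel": "/data/AccountChangeEvent"
}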

PushTopic events

Detected by the presence of data.sobject. Routed by the PushTopic name.
| event.type | Topic | __deleted |
| --- | --- | --- |
| created | {TopicName}_events | false |
| updated | {TopicName}_events | false |
| deleted | {TopicName}_events | true |
| undeleted | {TopicName}_events | false |
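For reference, a minimal sketch of a PushTopic event, assuming a hypothetical PushTopic named AccountUpdates; detection keys on the data.sobject container:
{
  "data": {
    "sobject": {
      "Id": "001Hp00002abcDE",
      "Name": "Acme Corp"
    },
    "event": { "type": "updated", "createdDate": "2025-01-01T12:00:00.000Z" }
  },
  "channel": "/topic/AccountUpdates"
}
This event would be routed to AccountUpdates_events with __deleted = false.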

Platform events

Detected by the presence of data.payload without a ChangeEventHeader. Routed by the platform event name. Platform events always have __deleted = false.
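A minimal sketch of a platform event, assuming a hypothetical event named Order_Event__e with one custom field; note the absence of ChangeEventHeader:
{
  "data": {
    "payload": {
      "CreatedDate": "2025-01-01T12:00:00.000Z",
      "Order_Number__c": "A-1001"
    },
    "event": { "replayId": 7 }
  },
  "channel": "/event/Order_Event__e"
}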

Message keys

All Salesforce events use { Id: <record_id> } as the Kafka message key. With Flatten Record Fields enabled and a non-empty Flatten Prefix, the key field name is prefixed accordingly (for example { sf_Id: ... }).

Example Payload

A typical change event arriving from Salesforce CDC:
{
  "data": {
    "payload": {
      "ChangeEventHeader": {
        "entityName": "Account",
        "recordIds": ["001Hp00002abcDE"],
        "changeType": "UPDATE",
        "commitTimestamp": 1764000000000
      },
      "Id": "001Hp00002abcDE",
      "Name": "Acme Corp",
      "Phone": "555-0101",
      "BillingCity": "San Francisco"
    },
    "event": { "replayId": 12345 }
  },
  "channel": "/data/AccountChangeEvent"
}
With the default settings (Flatten Record Fields = true, Include Change Metadata = false), the record written to Account_events looks like:
{
  "Id": "001Hp00002abcDE",
  "Name": "Acme Corp",
  "Phone": "555-0101",
  "BillingCity": "San Francisco",
  "__deleted": false
}
The Kafka message key is { "Id": "001Hp00002abcDE" }, which a sink can use to upsert.

Common Patterns

State table (upsert) ingestion

Use this when you want one row per Salesforce record in your warehouse, mirroring the live state.
  • Flatten Record Fields — true
  • Flatten Prefix — (empty)
  • Include Change Metadata — false
DELETE events flip __deleted: true, which most sinks treat as a tombstone.

Audit log ingestion

Use this when you want every change preserved as a separate row.
  • Flatten Record Fields — false (or true, depending on your destination)
  • Include Change Metadata — true
The output retains ChangeEventHeader so you can see changeType, commitTimestamp, and the full sequence of changes per record.
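As a sketch of audit-log output, assuming Flatten Record Fields = true (the exact placement of the header fields is illustrative), each change lands as its own record:
{
  "Id": "001Hp00002abcDE",
  "Name": "Acme Corp",
  "Phone": "555-0101",
  "ChangeEventHeader": {
    "entityName": "Account",
    "changeType": "UPDATE",
    "commitTimestamp": 1764000000000,
    "recordIds": ["001Hp00002abcDE"]
  },
  "__deleted": false
}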

Troubleshooting

| Issue | Check |
| --- | --- |
| 401 from Salesforce when the source starts | Verify the Consumer Key and Secret are correct and that the Connected App has finished provisioning (Salesforce can take 2–10 minutes after creation). If using the username/password fallback, confirm the password concatenates the user’s security token with no separator. |
| No events for an object even though the source is healthy | In native CDC mode, the object must be enabled for Change Data Capture (Setup → Change Data Capture → Selected Entities). For objects without CDC support, use the Apex-trigger mode instead. |
| Events arrive in the unknown topic | The entity name does not match any object in your Salesforce Objects schema list. Add the object to the schema, or set Unselected Object Behavior to SKIP. |
| No events from Apex triggers | Setup → Apex Jobs — check for failed @future jobs. Trigger errors typically show up here rather than at insert/update time. |
| Unauthorized endpoint error from Apex | The user does not have External Credential Principal Access. Re-check the Permission Set assignment and have the user log out and back in. |
| Insufficient access on a trigger | Verify the user’s profile allows modifying the object and that the trigger is Active. |
| All Apex events land in one topic | The Named Credential URL includes a path, or the setEndpoint call adds one. Use callout:Streamkap with no trailing path. |
| Missing fields on records | Field-level security is hiding the field from the user running the trigger. Grant read access via Profile or Permission Set. |
| Destination cannot handle the nested record | Toggle Flatten Record Fields on. To avoid collisions with reserved field names downstream, set Flatten Prefix to something like sf_. |
| gap_events topic has data | Salesforce dropped granular events (typically during bulk operations) and emitted a gap notification. Affected records need to be backfilled from the source. |

Limitations

  • The Salesforce CDC source is currently Beta.
  • Native CDC requires that each object be enabled for Change Data Capture in your Salesforce org. Objects without CDC support must use Apex-trigger mode.
  • Maximum payload size is 50 MB per request and maximum header size is 64 KB.
  • Each request is treated as a single Kafka record; the connector does not batch multiple events into one Kafka message.
  • The connector runs as a single task; horizontal scaling requires multiple source instances.
  • Initial backfill of historical records is not performed automatically — only changes from the time the source starts are streamed.
