Prerequisites

  • MongoDB version ≥ 5.0
  • A MongoDB user with sufficient privileges to create database users and collections

MongoDB Setup

1. Grant Database Access

2. Create Database User

MongoDB Shell

  • Using MongoDB Shell, connect to your primary node or replica set.
  • Create a user for Streamkap using the script below, replacing {password} with a password of your choice.
use admin
db.createUser({
  user: "streamkap_user",
  pwd: "{password}",
  roles: [
    { role: "readAnyDatabase", db: "admin" },
    { role: "read", db: "local" }
  ]
})
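
You can confirm the user and its roles with a quick check (a minimal sketch; db.getUser is a standard mongosh helper):

use admin
db.getUser("streamkap_user")   // should list readAnyDatabase and read on local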

3. Enable Snapshots

To backfill your data, the Connector needs to be able to perform snapshots. See Snapshots & Backfilling for more information. You will need to create a signal collection and grant the necessary permissions to streamkap_user; the Connector uses this collection to manage snapshots. Below is an example script. The signal collection can live in a different database (on the same MongoDB cluster) from the database Streamkap captures data from.
Please create the signal collection with the name streamkap_signal. It will not be recognised under any other name.

MongoDB Shell

use {database}
db.createCollection("streamkap_signal")

use admin
db.grantRolesToUser("streamkap_user", [
  { role: "readWrite", db: "{database}" }
])
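
MongoDB role documents accept only role and db fields, so the readWrite grant above applies to the whole database. If you would rather scope write access to just the signal collection, a custom role can do that. A sketch, where the role name streamkap_signal_rw is our own choice:

use admin
db.createRole({
  role: "streamkap_signal_rw",
  privileges: [{
    resource: { db: "{database}", collection: "streamkap_signal" },
    actions: [ "find", "insert", "update", "remove" ]
  }],
  roles: []
})

db.grantRolesToUser("streamkap_user", [
  { role: "streamkap_signal_rw", db: "admin" }
])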

4. Heartbeats

MongoDB uses change streams to track changes. While change streams use resume tokens to track position, these tokens can expire or become invalidated—particularly on clusters with high write activity or when using custom aggregation pipelines that filter events. Heartbeats ensure the Connector receives regular change events, keeping resume tokens fresh and providing liveness monitoring. There are two layers of heartbeat protection:

Layer 1: Connector heartbeats (enabled by default)

The Connector periodically emits heartbeat messages to an internal topic, even when no actual data changes are detected. This keeps offsets fresh and prevents staleness. No configuration is necessary for this layer; it is automatically enabled. We recommend keeping this layer enabled for all deployments.
Layer 2: Heartbeat collection updates (requires external setup)

Why we recommend configuring Layer 2

Layer 2 is especially important when:
  • Your database has low or intermittent traffic
  • You use custom aggregation pipelines that filter out many events
  • You need reliable liveness monitoring
We recommend configuring Layer 2 for all deployments to provide additional resilience.
You can configure regular updates to a dedicated heartbeat collection in the source database. This simulates activity, ensuring change events are generated consistently and resume tokens remain valid. Since the MongoDB Connector doesn’t write directly to the database, you must configure an external scheduler (e.g., cron job, Kubernetes CronJob) to generate artificial traffic.

Step 1: Create the heartbeat collection

Connect to your MongoDB instance and create the heartbeat collection:
use streamkap
db.createCollection("streamkap_heartbeat")

// Insert initial document
db.streamkap_heartbeat.insertOne({
  _id: "heartbeat",
  last_update: new Date()
})

Step 2: Grant permissions to the Streamkap user

use admin
db.grantRolesToUser("streamkap_user", [
  { role: "read", db: "streamkap" }
])
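
The heartbeat script in the next step connects as heartbeat_user, a separate account that writes to the heartbeat collection. If that account does not exist yet, a minimal sketch for creating it (the user name and password placeholder simply match the script below):

use admin
db.createUser({
  user: "heartbeat_user",
  pwd: "{password}",
  roles: [ { role: "readWrite", db: "streamkap" } ]
})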

Step 3: Create a heartbeat script

Create a script that updates the heartbeat document:
#!/bin/bash
# heartbeat.sh

MONGO_URI="mongodb://heartbeat_user:password@localhost:27017/streamkap?authSource=admin"

mongosh "$MONGO_URI" --eval '
  db.streamkap_heartbeat.updateOne(
    { _id: "heartbeat" },
    { $set: { last_update: new Date() } },
    { upsert: true }
  )
'
Make the script executable:
chmod +x heartbeat.sh
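Run it once to confirm the update lands (the findOne call just reads back the heartbeat document, using the same connection string as the script):

./heartbeat.sh
mongosh "mongodb://heartbeat_user:password@localhost:27017/streamkap?authSource=admin" --eval 'db.streamkap_heartbeat.findOne()'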

Step 4: Schedule the heartbeat

Using cron (Linux/macOS):
# Edit crontab
crontab -e

# Add this line to run every minute
* * * * * /path/to/heartbeat.sh >> /var/log/mongodb-heartbeat.log 2>&1
Using Kubernetes CronJob:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: mongodb-heartbeat
spec:
  schedule: "* * * * *"  # Every minute
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: heartbeat
            image: mongo:latest
            command:
            - mongosh
            - "mongodb://heartbeat_user:password@mongodb-host:27017/streamkap?authSource=admin"
            - --eval
            - |
              db.streamkap_heartbeat.updateOne(
                { _id: "heartbeat" },
                { $set: { last_update: new Date() } },
                { upsert: true }
              )
          restartPolicy: OnFailure
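
To deploy and verify the CronJob (standard kubectl commands; the manifest filename is our example):

kubectl apply -f mongodb-heartbeat.yaml
kubectl get cronjob mongodb-heartbeat
kubectl get jobs --watch   # each run should complete within a few seconds

In production you would typically mount the connection string from a Kubernetes Secret rather than embedding credentials in the manifest.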

5. Obtain Connection String

You’ll need the connection string for setting up the Connector in Streamkap.

MongoDB Shell

  • Connect to your replica set or primary node using the MongoDB shell as an Admin user.
  • Run the db.getMongo() method to return your connection string
    • We recommend the connection string have the following parameters. They will be added automatically if not included:
      • w=majority
      • readPreference=primaryPreferred
For information on accepted connection string formats, please see MongoDB - Connection String Formats
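
For reference, a typical replica set connection string with the recommended parameters looks like this (host names, port, and replica set name are placeholders):

mongodb://streamkap_user:{password}@host1:27017,host2:27017,host3:27017/?replicaSet=rs0&w=majority&readPreference=primaryPreferred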

Streamkap Setup

Follow these steps to configure your new connector:

1. Create the Source

2. Connection Settings

  • Name: Enter a name for your connector
  • Connection String: Copy the connection string from the earlier steps, replacing the username and password with the credentials you created earlier.
  • Array Encoding: Specify how Streamkap should encode MongoDB array types. Array encodes them as a JSON array but requires all elements of an array to share the same type, e.g. an array of integers. Array_String encodes them as a JSON string and must be used if your MongoDB arrays have mixed types (see the example after this list).
  • Nested Document Encoding: Specify how Streamkap should encode nested documents. Document encodes them as JSON objects but can be problematic for complex documents, e.g. multiple levels of nested sub-documents and arrays, or arrays of nested documents. String encodes them as a JSON string; we recommend it if your MongoDB nested documents are complex.
  • Connect via SSH Tunnel: The Connector will connect to an SSH server in your network which has access to your database. This is necessary if the Connector cannot connect directly to your database.
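
To illustrate the two encoding settings, consider this hypothetical document. The mixed-type tags array requires Array_String, and the deeply nested profile field is a candidate for String encoding:

{
  _id: ObjectId("..."),
  tags: ["urgent", 3, "review"],   // mixed types: requires Array_String
  profile: {                       // several nesting levels: consider String
    address: { city: "Berlin", geo: { lat: 52.5, lng: 13.4 } }
  }
}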

3. Snapshot Settings

  • Signal Table Database: Streamkap will use the streamkap_signal collection in this database to manage snapshots. See Enable Snapshots for more information.

4. Database and Collection Capture

  • Add Database/Collections: Specify the database(s) and collection(s) for capture.
    • You can bulk upload here. The format is a simple list of database and collection names, one entry per row, saved as a .csv file without a header (see the example after this list).
    • If you configured Layer 2 heartbeats, include the heartbeat collection (e.g., streamkap.streamkap_heartbeat). See Heartbeats for setup instructions.
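A hypothetical bulk-upload file, assuming the database.collection format used for the heartbeat entry above (the inventory names are our examples):

inventory.products
inventory.orders
streamkap.streamkap_heartbeat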
CDC only captures base collections, not Views

Change Data Capture reads MongoDB’s oplog via change streams, which only record changes to physical collections. Database Views are query-time aggregations with no physical storage, so they don’t generate oplog entries.

What you cannot capture: Views (aggregation pipeline results) and system collections (system.*, admin.*, config.*).

Time series collections (MongoDB 5.0+): These use specialized columnar-like bucketing storage that compresses documents by time window. Change streams are NOT supported because individual document changes cannot be tracked; they’re absorbed into compressed buckets. Workaround: Use regular collections with compound indexes like {metadata: 1, timestamp: 1} for CDC-compatible time series data.

Capped collections caveat: These can be captured, but there’s a risk of missing events if the connector falls behind and the oplog position it needs gets overwritten. For mission-critical data, use regular collections.

Solution for Views: Specify only the underlying base collections that feed your views. You can recreate the view’s aggregation pipeline in your destination or transformation layer.
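A minimal sketch of the time series workaround above, assuming a regular collection named metrics:

// Regular collection instead of a time series collection
db.createCollection("metrics")

// Compound index on metadata and timestamp keeps time-window queries fast
db.metrics.createIndex({ metadata: 1, timestamp: 1 })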
Click Save.