# MongoDB (Generic)

## Prerequisites

* MongoDB version ≥ 5.0
* A MongoDB user with sufficient privileges to create database users and collections

## MongoDB Setup

### 1. Grant Database Access

* Configure one of the [Connection Options](/connection-options) to ensure Streamkap can reach your database.

### 2. Create Database User

#### MongoDB Shell

* Using MongoDB Shell, connect to your primary node or replica set.
* Create a user for Streamkap using the script below. Replace `{password}` with a password of your choice.

<CodeGroup>
  ```bash Shell theme={null}
  use admin
  db.createUser({
    user: "streamkap_user",
    pwd: "{password}",
    roles: [ "readAnyDatabase", { role: "read", db: "local" } ]
  })
  ```
</CodeGroup>
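
The password chosen here is later embedded in the connection string, where characters such as `@`, `:`, `/`, and `?` must be percent-encoded. As a minimal sketch, assuming a Node.js or mongosh environment, the helper below builds the `user:password@` portion of the string. The name `encodeUserInfo` is hypothetical and not part of any Streamkap or MongoDB API.

```javascript
// Hypothetical helper: builds the "user:password@" part of a MongoDB
// connection string, percent-encoding reserved characters in both values.
function encodeUserInfo(username, password) {
  return `${encodeURIComponent(username)}:${encodeURIComponent(password)}@`;
}

// Example with a password that contains reserved characters.
const userInfo = encodeUserInfo("streamkap_user", "p@ss:w/rd");
console.log(`mongodb://${userInfo}localhost:27017/`);
// prints "mongodb://streamkap_user:p%40ss%3Aw%2Frd@localhost:27017/"
```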

### 3. Enable Snapshots

To backfill your data, the Connector needs to be able to perform snapshots. See [Snapshots & Backfilling](/snapshots) for more information.

You will need to create a signal collection and grant the `streamkap_user` the necessary permissions on it. The Connector uses this collection to manage snapshots. The example script below does both; replace `{database}` with the name of the database that will hold the signal collection.

This collection can live in a different database from the one Streamkap captures data from, as long as it is on the same MongoDB cluster.

<Info>
  The examples below use `streamkap_signal` as the signal collection name, but you can choose any name. During [Streamkap Setup](#3-snapshot-settings), provide the full path to your signal collection in `database.collection` format (e.g., `streamkap.streamkap_signal`).
</Info>

#### MongoDB Shell

<CodeGroup>
  ```bash Shell theme={null}
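  use {database}
  db.createCollection("streamkap_signal")

  // Built-in roles apply to a whole database, so a custom role scopes write access to the signal collection
  db.createRole({
    role: "streamkap_signal_rw",
    privileges: [{ resource: { db: "{database}", collection: "streamkap_signal" }, actions: ["find", "insert", "update", "remove"] }],
    roles: [{ role: "read", db: "{database}" }]
  })

  // Grant the role from the admin database, where streamkap_user was created
  db.getSiblingDB("admin").grantRolesToUser("streamkap_user", [{ role: "streamkap_signal_rw", db: "{database}" }])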
  ```
</CodeGroup>

### 4. Heartbeats

MongoDB uses change streams to track changes. Change streams use resume tokens to track position, but a token becomes unusable once the oplog entry it refers to has been overwritten. This is more likely on clusters with high write activity, or when custom aggregation pipelines filter out so many events that the Connector rarely receives a newer token.

Heartbeats ensure the Connector receives regular change events, keeping resume tokens fresh and providing liveness monitoring.

There are two layers of heartbeat protection:

#### Layer 1: Connector heartbeats (enabled by default)

The Connector periodically emits heartbeat messages to an internal topic, even when no actual data changes are detected. This keeps offsets fresh and prevents staleness.

No configuration is necessary for this layer; it is automatically enabled. We recommend keeping this layer enabled for all deployments.

#### Layer 2: Source database heartbeats (recommended)

<Info>
  **Why we recommend configuring Layer 2**

  Layer 2 is especially important when:

  * Your database has low or intermittent traffic
  * You use custom aggregation pipelines that filter out many events
  * You need reliable liveness monitoring

  We recommend configuring Layer 2 for all deployments to provide additional resilience.
</Info>

You can configure regular updates to a dedicated heartbeat collection in the source database. This simulates activity, ensuring change events are generated consistently and resume tokens remain valid.

Since the MongoDB Connector doesn't write directly to the database, you must configure an external scheduler (e.g., cron job, Kubernetes CronJob) to generate artificial traffic.

<Steps>
  <Step title="Create the heartbeat collection">
    Connect to your MongoDB instance and create the heartbeat collection:

    ```javascript theme={null}
    use streamkap
    db.createCollection("streamkap_heartbeat")

    // Insert initial document
    db.streamkap_heartbeat.insertOne({
      _id: "heartbeat",
      last_update: new Date()
    })
    ```
  </Step>

  <Step title="Grant permissions to the Streamkap user">
    ```javascript theme={null}
    db.grantRolesToUser("streamkap_user", [
      { role: "read", db: "streamkap" }
    ])
    ```
  </Step>

  <Step title="Create a heartbeat script">
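    Create a script that updates the heartbeat document. The script needs to connect as a user with write access to the heartbeat collection; the example assumes a separate `heartbeat_user`, so replace the user name and password in `MONGO_URI` with your own: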

    ```bash theme={null}
    #!/bin/bash
    # heartbeat.sh

    MONGO_URI="mongodb://heartbeat_user:password@localhost:27017/streamkap?authSource=admin"

    mongosh "$MONGO_URI" --eval '
      db.streamkap_heartbeat.updateOne(
        { _id: "heartbeat" },
        { $set: { last_update: new Date() } },
        { upsert: true }
      )
    '
    ```

    Make the script executable:

    ```bash theme={null}
    chmod +x heartbeat.sh
    ```
  </Step>

  <Step title="Schedule the heartbeat">
    **Using cron (Linux/macOS):**

    ```bash theme={null}
    # Edit crontab
    crontab -e

    # Add this line to run every minute
    * * * * * /path/to/heartbeat.sh >> /var/log/mongodb-heartbeat.log 2>&1
    ```

    **Using Kubernetes CronJob:**

    ```yaml theme={null}
    apiVersion: batch/v1
    kind: CronJob
    metadata:
      name: mongodb-heartbeat
    spec:
      schedule: "* * * * *"  # Every minute
      jobTemplate:
        spec:
          template:
            spec:
              containers:
              - name: heartbeat
                image: mongo:latest
                command:
                - mongosh
                - "mongodb://heartbeat_user:password@mongodb-host:27017/streamkap?authSource=admin"
                - --eval
                - |
                  db.streamkap_heartbeat.updateOne(
                    { _id: "heartbeat" },
                    { $set: { last_update: new Date() } },
                    { upsert: true }
                  )
              restartPolicy: OnFailure
    ```
  </Step>
</Steps>
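
To use the heartbeat document for liveness monitoring, a monitoring job can compare `last_update` with the current time. The following is a minimal sketch assuming the once-per-minute schedule above. The function name `isHeartbeatStale` and the three-minute default threshold are illustrative choices, not Streamkap settings.

```javascript
// Hypothetical check: returns true when the heartbeat document has not been
// updated within maxAgeMs. The 3-minute default tolerates two missed runs of
// a once-per-minute scheduler; adjust it to match your own schedule.
function isHeartbeatStale(heartbeatDoc, now = new Date(), maxAgeMs = 3 * 60 * 1000) {
  if (!heartbeatDoc || !(heartbeatDoc.last_update instanceof Date)) {
    return true; // a missing document or timestamp is treated as stale
  }
  return now.getTime() - heartbeatDoc.last_update.getTime() > maxAgeMs;
}

// Example using the document shape written by heartbeat.sh
const sampleDoc = { _id: "heartbeat", last_update: new Date("2024-01-01T00:00:00Z") };
console.log(isHeartbeatStale(sampleDoc, new Date("2024-01-01T00:02:00Z"))); // false
console.log(isHeartbeatStale(sampleDoc, new Date("2024-01-01T00:05:00Z"))); // true
```

In mongosh, the document could be fetched with `db.streamkap_heartbeat.findOne({ _id: "heartbeat" })` and passed to the function.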

### 5. Obtain Connection String

You'll need the connection string for setting up the Connector in Streamkap.

#### MongoDB Shell

* Connect to your replica set or primary node using the MongoDB shell as an Admin user.

* Run the `db.getMongo()` method to return your connection string.

  * We recommend the connection string have the following parameters. They will be added automatically if not included:

    * `w=majority`
    * `readPreference=primaryPreferred`

<Info>
  For information on accepted connection string formats, please see [MongoDB - Connection String Formats](https://www.mongodb.com/docs/manual/reference/connection-string/#connection-string-formats)
</Info>
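
To preview what a connection string looks like with the recommended parameters applied, the sketch below appends `w=majority` and `readPreference=primaryPreferred` only when they are absent. It is an illustration under assumptions, not Streamkap's actual logic: `withRecommendedOptions` is a hypothetical helper, and it relies on plain string handling because multi-host MongoDB strings are not valid input for JavaScript's `URL` class.

```javascript
// Recommended options from the list above; each is added only when absent.
const RECOMMENDED_OPTIONS = [
  ["w", "majority"],
  ["readPreference", "primaryPreferred"],
];

// Hypothetical helper: returns the connection string with any missing
// recommended options appended. Existing options are left untouched.
function withRecommendedOptions(connectionString) {
  const queryStart = connectionString.indexOf("?");
  const base = queryStart === -1 ? connectionString : connectionString.slice(0, queryStart);
  const query = queryStart === -1 ? "" : connectionString.slice(queryStart + 1);

  // Option names in MongoDB connection strings are case-insensitive.
  const present = new Set(
    query.split("&").filter(Boolean).map((pair) => pair.split("=")[0].toLowerCase())
  );
  const missing = RECOMMENDED_OPTIONS
    .filter(([name]) => !present.has(name.toLowerCase()))
    .map(([name, value]) => `${name}=${value}`);
  if (missing.length === 0) return connectionString;

  // Make sure a "/" separates the host list from the options.
  const afterScheme = base.slice(base.indexOf("://") + 3);
  const separator = afterScheme.includes("/") ? "" : "/";
  const merged = [query, ...missing].filter(Boolean).join("&");
  return `${base}${separator}?${merged}`;
}

console.log(withRecommendedOptions("mongodb://h1:27017,h2:27017/?replicaSet=rs0"));
// prints "mongodb://h1:27017,h2:27017/?replicaSet=rs0&w=majority&readPreference=primaryPreferred"
```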

## Streamkap Setup

Follow these steps to configure your new connector:

### 1. Create the Source

* Navigate to [Add Connectors](https://app.streamkap.com/connectors/add?tab=Sources).
* Choose **MongoDB**.

### 2. Connection Settings

* **Name:** Enter a name for your connector
* **Connection String:** Copy the connection string from the earlier steps, but replace the username and password in the string with those of the user you created earlier.
* **Array Encoding:** Specify how Streamkap should encode MongoDB array types. `Array` encodes them as a JSON array but requires all elements of an array to be of the same type (e.g., an array of integers). `Array_String` encodes them as a JSON string and must be used if the MongoDB arrays have mixed types.
* **Nested Document Encoding:** Specify how Streamkap should encode nested documents. `Document` encodes them as JSON objects but may be problematic for complex documents (e.g., multiple levels of nested subdocuments and arrays, or arrays of nested documents). `String` encodes them as a JSON string; we recommend it if the MongoDB nested documents are complex.
* **Connect via SSH Tunnel:** The Connector will connect to an SSH server in your network which has access to your database. This is necessary if the Connector cannot connect directly to your database.
  * See [SSH Tunnel](/ssh-tunnel) for setup instructions.
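
One way to choose between the encoding options above is to inspect a sample of documents before configuring the Connector. The sketch below is a rough heuristic under stated assumptions, not Streamkap's logic: `suggestEncodings` and the nesting threshold of 2 are hypothetical, and JavaScript's `typeof` cannot distinguish BSON numeric types such as int and double, so treat the result as a hint only.

```javascript
// Coarse type name for a value; BSON-specific types are not distinguished.
function typeName(value) {
  if (value === null) return "null";
  if (Array.isArray(value)) return "array";
  if (value instanceof Date) return "date";
  return typeof value;
}

// Nesting depth: a scalar is 0, { a: 1 } is 1, { a: { b: 1 } } is 2.
function depth(value) {
  const kind = typeName(value);
  if (kind !== "array" && kind !== "object") return 0;
  const children = kind === "array" ? value : Object.values(value);
  return 1 + Math.max(0, ...children.map(depth));
}

// True if any array, at any level, holds elements of more than one type.
function hasMixedArray(value) {
  const kind = typeName(value);
  if (kind === "array") {
    return new Set(value.map(typeName)).size > 1 || value.some(hasMixedArray);
  }
  if (kind === "object") return Object.values(value).some(hasMixedArray);
  return false;
}

// Hypothetical helper: suggests settings from sample documents.
function suggestEncodings(sampleDocs, maxNestedDepth = 2) {
  const mixed = sampleDocs.some(hasMixedArray);
  // Measure each top-level field; the document itself is not counted.
  const deepest = Math.max(
    0,
    ...sampleDocs.flatMap((doc) => Object.values(doc).map(depth))
  );
  return {
    arrayEncoding: mixed ? "Array_String" : "Array",
    nestedDocumentEncoding: deepest > maxNestedDepth ? "String" : "Document",
  };
}

console.log(suggestEncodings([{ tags: ["a", "b"], address: { city: "Oslo" } }]));
console.log(suggestEncodings([{ values: [1, "two"], a: { b: { c: { d: 1 } } } }]));
```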

### 3. Snapshot Settings

* **Signal Collection:** Full path to the signal collection including database and collection name (e.g., `streamkap.streamkap_signal`). This collection is used for incremental snapshotting. See [Enable Snapshots](#3-enable-snapshots) for setup instructions.
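
The `database.collection` format can be checked before it is entered. The sketch below splits the path at the first dot, since MongoDB database names cannot contain `.` while collection names can. `parseSignalCollectionPath` is a hypothetical helper for illustration, not a Streamkap API.

```javascript
// Hypothetical helper: splits "database.collection" at the first dot and
// rejects values that lack either part.
function parseSignalCollectionPath(path) {
  const dot = path.indexOf(".");
  if (dot <= 0 || dot === path.length - 1) {
    throw new Error(`Expected "database.collection", got "${path}"`);
  }
  return { database: path.slice(0, dot), collection: path.slice(dot + 1) };
}

console.log(parseSignalCollectionPath("streamkap.streamkap_signal"));
// prints { database: 'streamkap', collection: 'streamkap_signal' }
```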

### 4. Database and Collection Capture

* **Add Database/Collections:** Specify the database(s) and collection(s) for capture.
  * You can bulk upload here. The format is a simple list of databases and collections, with each entry on a new row. Save as a .csv file without a header.
  * If you configured Layer 2 heartbeats, include the heartbeat collection (e.g., `streamkap.streamkap_heartbeat`). See [Heartbeats](#4-heartbeats) for setup instructions.

<Warning>
  **CDC only captures base collections, not Views**

  Change Data Capture reads MongoDB's oplog via change streams, which only record changes to physical collections. Database Views are query-time aggregations with no physical storage, so they don't generate oplog entries.

  **What you cannot capture:** Views (aggregation pipeline results), system collections (system.\*, admin.\*, config.\*).

  **Solution:** Specify only the underlying base collections that feed your views. You can recreate the view aggregation pipeline in your destination or transformation layer.

  **Time series collections** (MongoDB 5.0+): These use specialized columnar-like bucketing storage that compresses documents by time windows. **Change streams are NOT supported** because individual document changes cannot be tracked; they are absorbed into compressed buckets. **Workaround:** Use regular collections with compound indexes like `{metadata: 1, timestamp: 1}` for CDC-compatible time series data.

  **Capped collections caveat:** These can be captured, but there is a risk of missing events if the connector falls behind and the oplog position it needs gets overwritten. For mission-critical data, use regular collections.
</Warning>
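
The rules in the warning above can be applied to a list of collections before adding them for capture. The sketch below classifies entries shaped like the output of `db.getCollectionInfos()` in mongosh, which reports a `type` of `collection`, `view`, or `timeseries` and an `options.capped` flag. `classifyForCapture` is a hypothetical helper, and it only covers the cases named in the warning.

```javascript
// Hypothetical helper: sorts collection infos into three groups based on
// the capture rules described above.
function classifyForCapture(collectionInfos) {
  const result = { capturable: [], capturableWithCaveat: [], notCapturable: [] };
  for (const info of collectionInfos) {
    const capped = Boolean(info.options && info.options.capped);
    if (info.type !== "collection" || info.name.startsWith("system.")) {
      // Views, time series collections, and system collections
      result.notCapturable.push(info.name);
    } else if (capped) {
      // Capped collections can be captured but may miss events
      result.capturableWithCaveat.push(info.name);
    } else {
      result.capturable.push(info.name);
    }
  }
  return result;
}

// Example input with made-up collection names
const infos = [
  { name: "orders", type: "collection", options: {} },
  { name: "orders_summary", type: "view", options: {} },
  { name: "metrics", type: "timeseries", options: {} },
  { name: "system.buckets.metrics", type: "collection", options: {} },
  { name: "audit_log", type: "collection", options: { capped: true } },
];
console.log(classifyForCapture(infos));
```

In mongosh, the input could come directly from `db.getCollectionInfos()` run against each database you plan to capture.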

Click **Save**.
