
# Neon PostgreSQL

> PostgreSQL Change Data Capture Setup on Neon with Streamkap

## Prerequisites

* PostgreSQL version ≥ 10
* A database user with sufficient privileges to configure the database, including enabling logical replication and creating users
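
To confirm the logical replication prerequisite from a SQL client, here is a minimal check. It assumes you are connected to the Neon database you plan to capture. Once logical replication is enabled for the project, `wal_level` should report `logical`, and both replication limits should be greater than zero.

```sql SQL theme={null}
-- Must return 'logical'; any other value means logical replication is not enabled yet
SHOW wal_level;

-- Maximum number of replication slots and WAL sender processes; both must be greater than 0
SHOW max_replication_slots;
SHOW max_wal_senders;
```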

## PostgreSQL Setup

### 1. Grant Database Access

* Configure one of the [Connection Options](/connection-options) to ensure Streamkap can reach your database.

### 2. Create Database User

We recommend creating a dedicated user and role for Streamkap to access your PostgreSQL database. The example script below does this.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Replace { ... } placeholders as required
  CREATE USER streamkap_user PASSWORD '{password}';

  -- Create a role for Streamkap (NOLOGIN roles cannot sign in, so no password is needed)
  CREATE ROLE streamkap_role NOLOGIN;
  GRANT streamkap_role TO streamkap_user;

  -- Grant Streamkap permissions on the database, schema and all tables to capture
  GRANT CONNECT ON DATABASE "{database}" TO streamkap_role; 
  GRANT CREATE, USAGE ON SCHEMA "{schema}" TO streamkap_role;
  GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO streamkap_role;
  ALTER DEFAULT PRIVILEGES IN SCHEMA "{schema}" GRANT SELECT ON TABLES TO streamkap_role;

  -- Grant replication role to the user
  ALTER USER streamkap_user WITH REPLICATION;
  ```
</CodeGroup>
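
After running the script, you can sanity-check the result. This sketch uses only PostgreSQL system catalogs and functions. `{schema}` and `{table}` are placeholders for one of the tables you plan to capture, and the last query assumes `streamkap_user` keeps the default `INHERIT` attribute. Expect `rolcanlogin` and `rolreplication` to be `true` for `streamkap_user`, and the last two queries to return `true`.

```sql SQL theme={null}
-- Login and REPLICATION attributes for the user and role created above
SELECT rolname, rolcanlogin, rolreplication
FROM pg_roles
WHERE rolname IN ('streamkap_user', 'streamkap_role');

-- streamkap_user should be a member of streamkap_role
SELECT pg_has_role('streamkap_user', 'streamkap_role', 'MEMBER') AS is_member;

-- SELECT granted to streamkap_role should apply to streamkap_user through membership
SELECT has_table_privilege('streamkap_user', '{schema}.{table}', 'SELECT') AS can_select;
```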

### 3. Enable Snapshots

To backfill your data, the Connector needs to be able to perform snapshots. See [Snapshots & Backfilling](/snapshots) for more information.

There are two ways to enable this feature:

#### Method 1: Enable read only connection

<Danger>
  Requires PostgreSQL version 13 or higher.
</Danger>

Use this method if you cannot create a table in the source database and grant the Connector read/write privileges on it.

* Set **Read only** to **Yes** during Streamkap Setup. No other configuration should be necessary.
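
If you are unsure which method fits your setup, a quick check on the database the Connector will connect to can help. In this minimal sketch, `pg_is_in_recovery()` returns `true` on a read replica (where Method 2 is not supported), and the version comparison must return `true` for Method 1.

```sql SQL theme={null}
-- true when connected to a read replica (standby), false on the primary
SELECT pg_is_in_recovery() AS is_read_replica;

-- true when the server runs PostgreSQL 13 or newer, as Method 1 requires
SELECT current_setting('server_version_num')::int >= 130000 AS supports_read_only;
```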

#### Method 2: Create a table in the source database

<Danger>
  Not supported on read replicas. Use Method 1 instead.
</Danger>

You will need to create the table and grant the necessary permissions to `streamkap_user`. The Connector uses this table to manage snapshots. The example script below does this.

<Info>
  The examples below use `streamkap_signal` as the signal table name, but you can choose any name. During [Streamkap Setup](#3-snapshot-settings), provide the full path to your signal table in `schema.table` format (e.g., `streamkap.streamkap_signal`).
</Info>

<CodeGroup>
  ```SQL SQL theme={null}
  -- Create the schema
  CREATE SCHEMA streamkap;

  -- Switch to the newly created schema
  SET search_path TO streamkap;

  -- Create the table
  CREATE TABLE streamkap_signal (
    id VARCHAR(255) PRIMARY KEY, 
    type VARCHAR(32) NOT NULL, 
    data VARCHAR(2000) NULL
  );

  -- Grant necessary privileges on the table to the role
  GRANT CREATE, USAGE ON SCHEMA streamkap TO streamkap_role;
  GRANT SELECT ON ALL TABLES IN SCHEMA streamkap TO streamkap_role;
  GRANT SELECT, UPDATE, INSERT, DELETE ON TABLE streamkap_signal TO streamkap_role;
  ```
</CodeGroup>
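
The privileges above are granted to `streamkap_role`, so `streamkap_user` receives them through role membership (assuming it keeps the default `INHERIT` attribute). This sketch confirms that, using the example schema and table names. Every column should return `true`:

```sql SQL theme={null}
-- Each column should be true for the Streamkap user
SELECT
  has_table_privilege('streamkap_user', 'streamkap.streamkap_signal', 'SELECT') AS can_select,
  has_table_privilege('streamkap_user', 'streamkap.streamkap_signal', 'INSERT') AS can_insert,
  has_table_privilege('streamkap_user', 'streamkap.streamkap_signal', 'UPDATE') AS can_update,
  has_table_privilege('streamkap_user', 'streamkap.streamkap_signal', 'DELETE') AS can_delete;
```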

<Warning>
  **Publications and signal tables**

  When you create the PostgreSQL publication in [Create Publication & Slot](#5-create-publication--slot), if you specify tables for capture instead of all tables, you **must** include your signal table.
</Warning>
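
If the publication already exists and lists specific tables, you can add the signal table to it afterwards. This minimal sketch assumes the default publication name `streamkap_pub` and the example table `streamkap.streamkap_signal`. `ALTER PUBLICATION ... ADD TABLE` is not needed for a `FOR ALL TABLES` publication, and it fails if you run it against one.

```sql SQL theme={null}
-- Add the signal table to a publication that lists specific tables
ALTER PUBLICATION streamkap_pub ADD TABLE streamkap.streamkap_signal;

-- Confirm the signal table is now part of the publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'streamkap_pub'
  AND schemaname = 'streamkap'
  AND tablename = 'streamkap_signal';
```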

### 4. Heartbeats

Connectors use "offsets"—like bookmarks—to track their position in the database's log or change stream. When no changes occur for long periods, these offsets may become outdated, and the Connector might lose its place or stop capturing changes.

Heartbeats ensure the Connector stays active and continues capturing changes.

There are two layers of heartbeat protection:

#### Layer 1: Connector heartbeats (enabled by default)

The Connector periodically emits heartbeat messages to an internal topic, even when no actual data changes are detected. This keeps offsets fresh and prevents staleness.

No configuration is necessary for this layer; it is automatically enabled. We recommend keeping this layer enabled for all deployments.

#### Layer 2: Source database heartbeats (recommended)

<Info>
  **Why we recommend configuring Layer 2**

  While Layer 2 is **crucial** for low-traffic or intermittent databases, we recommend configuring it for all deployments. It provides additional resilience and helps prevent issues during periods of inactivity.
</Info>

You can configure regular updates to a dedicated heartbeat table in the source database. This simulates activity, ensuring change events are generated consistently, maintaining log progress and providing additional resilience.

How this layer is configured depends on the connection type (if supported by the Source):

* **Read-write connections** (when **Read only** is **No** during Streamkap Setup): The Connector updates the heartbeat table directly.
* **Read-only connections** (when **Read only** is **Yes** during Streamkap Setup): A scheduled job on the **primary** database updates the heartbeat table, and these changes replicate to the read replica for the Connector to consume.

This layer requires a heartbeat table in your source database and, for read-only connections, a scheduled job on the primary database (e.g., using `pg_cron`).

<Tabs>
  <Tab title="Read-write connections">
    For read-write connections (when **Read only** is **No** during Streamkap Setup), the Connector writes to the heartbeat table directly.

    <CodeGroup>
      ```SQL SQL theme={null}
      -- Create the streamkap schema
      CREATE SCHEMA IF NOT EXISTS streamkap;

      -- Switch to the streamkap schema
      SET search_path TO streamkap;

      -- Create the heartbeat table with id, text, and last_update fields
      CREATE TABLE streamkap_heartbeat (
          id SERIAL PRIMARY KEY,
          text TEXT,
          last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

      -- Grant permission to the Streamkap user
      GRANT USAGE ON SCHEMA streamkap TO streamkap_user;
      GRANT SELECT, UPDATE, INSERT, DELETE ON TABLE streamkap_heartbeat TO streamkap_user;

      -- Insert the first row into the heartbeat table
      INSERT INTO streamkap_heartbeat (text) VALUES ('test_heartbeat');
      ```
    </CodeGroup>

    <Info>
      **Heartbeat tables and PostgreSQL publications**

      If the `streamkap_pub` publication created during PostgreSQL Setup was for specific tables e.g. `CREATE PUBLICATION streamkap_pub FOR TABLE table1, table2, table3, ...;` instead of `FOR ALL TABLES;`, you **must** add the heartbeat table to the publication: `ALTER PUBLICATION streamkap_pub ADD TABLE streamkap.streamkap_heartbeat;`.
    </Info>
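
    To find out whether this applies to you, use the sketch below. It assumes the default publication name. `puballtables` is `true` for a `FOR ALL TABLES` publication, in which case no change is needed. The second query returns a row only if the publication already covers the heartbeat table.

    ```sql SQL theme={null}
    -- true: FOR ALL TABLES publication; false: the publication lists specific tables
    SELECT pubname, puballtables
    FROM pg_publication
    WHERE pubname = 'streamkap_pub';

    -- Returns a row only if the heartbeat table is covered by the publication
    SELECT schemaname, tablename
    FROM pg_publication_tables
    WHERE pubname = 'streamkap_pub'
      AND schemaname = 'streamkap'
      AND tablename = 'streamkap_heartbeat';
    ```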
  </Tab>

  <Tab title="Read-only connections">
    For read-only connections (when **Read only** is **Yes** during Streamkap Setup), the Connector cannot write to the heartbeat table directly. Instead, you must configure a scheduled job on the **primary** database to generate artificial traffic. These changes will replicate to the read replica, which the Connector then consumes.

    <Warning>
      **Run these commands on the primary database, not the read replica.**

      The heartbeat table and scheduled job must be created on the primary database. The changes will automatically replicate to the read replica.
    </Warning>

    <Steps>
      <Step title="Enable the pg_cron extension">
        The `pg_cron` extension must be allowed in your database's configuration. See your provider's documentation for enabling extensions:

        <CardGroup cols={2}>
          <Card title="Amazon RDS" icon="aws" href="https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL_pg_cron.html">
            Scheduling maintenance with pg\_cron
          </Card>

          <Card title="Amazon Aurora" icon="aws" href="https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/PostgreSQL_pg_cron.html">
            Scheduling maintenance with pg\_cron
          </Card>

          <Card title="Google Cloud SQL" icon="google" href="https://cloud.google.com/sql/docs/postgres/extensions#pg_cron">
            PostgreSQL extensions
          </Card>

          <Card title="Azure Database" icon="microsoft" href="https://learn.microsoft.com/en-us/azure/postgresql/flexible-server/concepts-extensions">
            PostgreSQL extensions
          </Card>

          <Card title="Supabase" icon="database" href="https://supabase.com/docs/guides/database/extensions/pg_cron">
            pg\_cron extension
          </Card>

          <Card title="Neon" icon="database" href="https://neon.tech/docs/extensions/pg_cron">
            pg\_cron extension
          </Card>

          <Card title="AlloyDB" icon="google" href="https://cloud.google.com/alloydb/docs/reference/extensions">
            Supported extensions
          </Card>
        </CardGroup>

        Once allowed, create the extension:

        ```sql SQL theme={null}
        CREATE EXTENSION IF NOT EXISTS pg_cron;
        ```
      </Step>

      <Step title="Create the heartbeat table">
        ```sql SQL theme={null}
        -- Create the streamkap schema
        CREATE SCHEMA IF NOT EXISTS streamkap;

        -- Create the heartbeat table
        CREATE TABLE streamkap.streamkap_heartbeat (
            id SERIAL PRIMARY KEY,
            text TEXT,
            last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        -- Insert the initial row
        INSERT INTO streamkap.streamkap_heartbeat (text) VALUES ('test_heartbeat');
        ```
      </Step>

      <Step title="Schedule the heartbeat update job">
        ```sql SQL theme={null}
        -- Schedule a job to update the heartbeat table every minute
        SELECT cron.schedule(
            'streamkap_heartbeat_job',
            '*/1 * * * *',
            $$UPDATE streamkap.streamkap_heartbeat SET text = 'updated_heartbeat', last_update = CURRENT_TIMESTAMP WHERE id = 1;$$
        );
        ```
      </Step>

      <Step title="Grant permissions">
        Whichever database user creates and runs the cron jobs (often the `postgres` user or a dedicated cron user) needs appropriate permissions on the heartbeat table. The Streamkap user also needs permissions on the heartbeat table for monitoring.

        ```sql SQL theme={null}
        GRANT USAGE ON SCHEMA streamkap TO {cron user};
        GRANT SELECT, UPDATE, INSERT, DELETE ON streamkap.streamkap_heartbeat TO {cron user};

        -- Grant permissions to the Streamkap user for monitoring and diagnostics
        GRANT USAGE ON SCHEMA streamkap TO streamkap_user;
        GRANT SELECT, UPDATE, INSERT, DELETE ON streamkap.streamkap_heartbeat TO streamkap_user;
        ```
      </Step>

      <Step title="(Recommended) Schedule cleanup of cron job history">
        The `pg_cron` extension stores job execution history in the table `cron.job_run_details`. To prevent this table from growing indefinitely:

        ```sql SQL theme={null}
        SELECT cron.schedule(
            'streamkap_cron_cleanup',
            '0 0 * * *',
            $$DELETE FROM cron.job_run_details WHERE end_time < now() - interval '7 days';$$
        );
        ```
      </Step>
    </Steps>
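
    To confirm the heartbeat works end to end, use the sketch below. It assumes the job and table names used above. Run the first query on the primary, in the database where `pg_cron` is installed (the one containing the `cron` schema). Run the second query on the read replica, where `last_update` should advance roughly once a minute.

    ```sql SQL theme={null}
    -- On the primary: most recent runs of the heartbeat job and their outcome
    SELECT d.status, d.start_time, d.return_message
    FROM cron.job_run_details AS d
    JOIN cron.job AS j ON j.jobid = d.jobid
    WHERE j.jobname = 'streamkap_heartbeat_job'
    ORDER BY d.start_time DESC
    LIMIT 5;

    -- On the read replica: last_update should change about once a minute
    SELECT id, text, last_update
    FROM streamkap.streamkap_heartbeat;
    ```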

    <Accordion title="Useful pg_cron commands">
      ```sql SQL theme={null}
      -- View all scheduled jobs
      SELECT * FROM cron.job;

      -- View recent job execution history
      SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 10;

      -- Unschedule a job (replace {jobid} with the actual job ID)
      SELECT cron.unschedule({jobid});
      ```
    </Accordion>

    <Info>
      **Heartbeat tables and PostgreSQL publications**

      If the `streamkap_pub` publication was created for specific tables (e.g., `CREATE PUBLICATION streamkap_pub FOR TABLE table1, table2, ...;`) instead of `FOR ALL TABLES`, you **must** add the heartbeat table to the publication:

      ```sql SQL theme={null}
      ALTER PUBLICATION streamkap_pub ADD TABLE streamkap.streamkap_heartbeat;
      ```
    </Info>
  </Tab>
</Tabs>

### 5. Create Publication & Slot

A publication defines the set of tables whose change events are published for the Connector to capture.

* Create a publication for your tables. You can create a publication for all tables or selected tables.

<CodeGroup>
  ```SQL SQL - All Tables theme={null}
  -- Create a publication for all tables to capture
  CREATE PUBLICATION streamkap_pub FOR ALL TABLES WITH (publish_via_partition_root = true);
  ```

  ```SQL SQL - Selected Tables theme={null}
  -- Create a publication for specific tables to capture
  CREATE PUBLICATION streamkap_pub FOR TABLE table1, table2, table3, ... WITH (publish_via_partition_root = true);

  -- Verify the tables to capture were added to the publication
  SELECT * FROM pg_publication_tables where pubname = 'streamkap_pub';
  ```
</CodeGroup>

<Warning>
  **Altering publications**

  You cannot alter `FOR ALL TABLES` publications to include/exclude tables.

  If you set up a `FOR ALL TABLES` publication and later decide to change that, you have to drop the publication and create another that lists specific tables, e.g. `CREATE PUBLICATION ... FOR TABLE table1, table2, table3, ...`.

  However, change events that occur before the new publication is created will not be included in it, so a snapshot is required to ensure your Streamkap pipelines do not miss them.

  You should also stop the [Source](https://app.streamkap.com/connectors/sources) *before* changing the publication.
</Warning>
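
The switch described above can be sketched as follows, assuming the default publication name `streamkap_pub`. The table names `public.orders`, `public.customers` and `public.invoices` are hypothetical placeholders. If you use a signal table or heartbeat table, include them too, and stop the Source before running this.

```sql SQL theme={null}
-- true means the publication is FOR ALL TABLES and must be dropped to change its tables
SELECT puballtables FROM pg_publication WHERE pubname = 'streamkap_pub';

-- Recreate the publication for specific tables (hypothetical table names)
DROP PUBLICATION streamkap_pub;
CREATE PUBLICATION streamkap_pub
  FOR TABLE public.orders, public.customers
  WITH (publish_via_partition_root = true);

-- A publication that lists specific tables can later be changed in place
ALTER PUBLICATION streamkap_pub ADD TABLE public.invoices;
ALTER PUBLICATION streamkap_pub DROP TABLE public.customers;
```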

A replication slot represents a stream of change events the Connector reads from.

* Create a replication slot.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Create a logical replication slot
  SELECT pg_create_logical_replication_slot('streamkap_pgoutput_slot', 'pgoutput');

  -- Verify the replication slot is working (this may take a few moments to return the count)
  SELECT count(*) FROM pg_logical_slot_peek_binary_changes('streamkap_pgoutput_slot', null, null, 'proto_version', '1', 'publication_names', 'streamkap_pub');
  ```
</CodeGroup>
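
A replication slot retains WAL on the server until the Connector confirms it has consumed the changes, so an idle or abandoned slot keeps consuming storage. The sketch below checks the slot's state and how much WAL it is retaining. It assumes the default slot name and a connection to the primary, because `pg_current_wal_lsn()` cannot be called on a replica.

```sql SQL theme={null}
-- Slot state and the amount of WAL it is currently holding back
SELECT slot_name,
       plugin,
       active,
       confirmed_flush_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'streamkap_pgoutput_slot';

-- Only if the slot is no longer needed (e.g. the Source was deleted): drop it to release WAL
-- SELECT pg_drop_replication_slot('streamkap_pgoutput_slot');
```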

<Info>
  **Replication slot auto-creation**

  If the Connector cannot find the replication slot named in [Replication Settings](#4-replication-settings) during Streamkap Setup, it will attempt to create one automatically, because it cannot read the database transaction logs without one. This requires the database user to have the `REPLICATION` attribute, as granted in [Create Database User](/neon-postgresql-cdc#2-create-database-user).
</Info>

## Streamkap Setup

Follow these steps to configure your new connector:

### 1. Create the Source

* Navigate to [Add Connectors](https://app.streamkap.com/connectors/add?tab=Sources).
* Choose **PostgreSQL**.

### 2. Connection Settings

* **Name**: Enter a name for your connector.
* **Hostname**: Specify the hostname.

<Warning>
  **PgBouncer and pooled connections**

  Neon supports connection pooling through PgBouncer, using pooler hostnames such as `ep-cool-darkness-123456-pooler.us-east-2.aws.neon.tech` (note the `-pooler` suffix). However, PgBouncer supports only a limited set of PostgreSQL startup parameters, and it does not support the `replication` parameter that Streamkap depends on. In this scenario, the Connector fails with a PgBouncer [unsupported startup parameter](https://neon.com/docs/connect/connection-errors#unsupported-startup-parameter) error.

  Because of this, you cannot use Neon's connection pooling. Remove the `-pooler` suffix from the hostname if it is present (e.g., `ep-cool-darkness-123456.us-east-2.aws.neon.tech`). Connections from Streamkap will then be unpooled, so be aware of the limits Neon places on unpooled connections.
</Warning>

* **Port**: Default is `5432`.

* **Connect via SSH Tunnel**: The Connector will connect to an SSH server in your network which has access to your database. This is necessary if the Connector cannot connect directly to your database.

  * See [SSH Tunnel](/ssh-tunnel) for setup instructions.

* **Username**: Username to access the database. By default, Streamkap scripts use `streamkap_user`.

* **Password**: Password to access the database.

* **Database**: Specify the database to stream data from.

* **Read only**: Whether or not to use a read-only connection. Requires PostgreSQL version 13 or higher.

  * When connecting to a read replica, set this to **Yes** to support Streamkap snapshots.

* **Heartbeats**: Enabled by default.

  * For **read-write** connections, configure a heartbeat table in the source database and set **Heartbeat Table Schema**. See [Heartbeats](#4-heartbeats) for setup instructions.

  * For **read-only** connections, configure a scheduled heartbeat job on the primary database using `pg_cron`, and include the heartbeat table in [Schema and Table Capture](#6-schema-and-table-capture). See [Heartbeats](#4-heartbeats) for setup instructions.

### 3. Snapshot Settings

<Info>
  If you set **Read only** to **No**, you will need to create a snapshot signal table and grant the necessary permissions to `streamkap_user`. See [Enable Snapshots](/neon-postgresql-cdc#3-enable-snapshots) for setup instructions.
</Info>

* **Signal Table**: Full path to the signal table including schema and table name (e.g., `streamkap.streamkap_signal`). This table is used for incremental snapshotting. See [Enable Snapshots](#3-enable-snapshots) for setup instructions.

### 4. Replication Settings

* **Replication Slot Name**: The name of the replication slot for the connector to use. Default is `streamkap_pgoutput_slot`.
* **Publication Name**: The name of the publication for the connector to use. Default is `streamkap_pub`.

### 5. Advanced Parameters

* **SSL mode**: Whether to use an encrypted connection to the PostgreSQL server. By default, it's required.
* **Prefix with Database Name?**: Changes the format of topics to `DatabaseName_TopicName`
* **Represent binary data as**: Specifies how data in binary columns (e.g. `bytea`) should be interpreted. Your destination for this data can affect which option you choose. Default is `bytes`.

Click **Next**.

### 6. Schema and Table Capture

* **Add Schemas/Tables**: Specify the schema(s) and table(s) for capture
  * You can bulk upload here. The format is a simple list of schemas and tables, with each entry on a new row. Save as a `.csv` file without a header.

<Warning>
  **CDC only captures base tables, not Views**

  Change Data Capture reads the PostgreSQL write-ahead log (WAL), which only records changes to physical tables. Database Views are query-time computations with no physical storage—they don't generate WAL entries.

  **What you cannot capture:** Views, temporary tables, unlogged tables, foreign tables (FDW), or system tables (information\_schema, pg\_catalog).

  **Solution:** Specify only the underlying base tables that feed your views. You can recreate the view logic in your destination or transformation layer.
</Warning>
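
To decide what to list here, you can query the PostgreSQL system catalogs, as in the sketch below. The first query lists permanent ordinary and partitioned tables outside the system schemas. It omits individual partitions, because the publication publishes them via the root. The second query lists the relations a given view reads from, where `{schema}` and `{view}` are placeholders. `information_schema` only shows objects owned by roles you are a member of, so run it as a suitably privileged role.

```sql SQL theme={null}
-- Tables CDC can capture: ordinary or partitioned, permanent, not an individual partition
SELECT n.nspname AS schema_name, c.relname AS table_name
FROM pg_class AS c
JOIN pg_namespace AS n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r', 'p')        -- excludes views, materialized views, foreign tables
  AND c.relpersistence = 'p'         -- excludes unlogged and temporary tables
  AND NOT c.relispartition           -- list partition roots only
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2;

-- Relations a given view reads from; capture these base tables instead of the view
SELECT table_schema, table_name
FROM information_schema.view_table_usage
WHERE view_schema = '{schema}' AND view_name = '{view}';
```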

Click **Save**.

## Troubleshooting

<AccordionGroup>
  <Accordion title="Managing PostgreSQL upgrades">
    When upgrading the PostgreSQL database used by Streamkap, there are specific steps to prevent data loss and ensure continued operation.

    Streamkap handles network failures and outages well. If a monitored database stops, the connector resumes from the last recorded log sequence number (LSN) once communication is restored. It retrieves this offset and queries PostgreSQL for a matching LSN in the replication slot.

    A replication slot is required for change capture, but PostgreSQL removes slots during upgrades and doesn’t restore them. When the connector restarts, it requests the last known offset, but PostgreSQL cannot return it.

    Creating a new replication slot isn't enough to prevent data loss. New slots only track changes from their creation point and lack earlier offsets. The connector fetches its last known offset from Kafka but can't retrieve corresponding data from the new slot. It skips older change events and resumes from the latest log position, causing silent data loss with no warnings.

    ### Procedure

    Follow these steps to minimize data loss. Note that a few steps may require support from Streamkap. We recommend notifying us about your database upgrade ahead of time to ensure you have the necessary support.

    * Using your database's upgrade procedure, ensure writes to it have stopped.
    * Allow the connector to capture all change events before starting the upgrade procedure. Ask Streamkap to confirm this for you.

    <Info>
      **Interrupting change data capture**

      If not all events were captured before you stopped the Source and upgraded the database, you can perform a snapshot in Streamkap for that Source once the procedure is completed to ensure no change events were missed.
    </Info>

    * Assuming all events are captured, stop the Source in the Streamkap app. This flushes the last records and saves the last offset.
    * Stop the database and upgrade it using your upgrade procedure.

    Once the database is upgraded, and **before** allowing writes again:

    * Recreate the logical replication slot; otherwise, Streamkap will miss changes. See [Create Publication & Slot](/neon-postgresql-cdc#5-create-publication--slot).
    * Verify the publication for Streamkap exists, and recreate it if necessary. See [Create Publication & Slot](/neon-postgresql-cdc#5-create-publication--slot).
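
    Run this quick check after the upgrade and before restoring writes. It assumes the default slot and publication names:

    ```sql SQL theme={null}
    -- Both columns should return true before writes are re-enabled
    SELECT
      EXISTS (SELECT 1 FROM pg_replication_slots
              WHERE slot_name = 'streamkap_pgoutput_slot') AS slot_exists,
      EXISTS (SELECT 1 FROM pg_publication
              WHERE pubname = 'streamkap_pub') AS publication_exists;
    ```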

    <Info>
      **Update the Source in the Streamkap app**

      If you have chosen different names for the logical replication slot or publication, update them on the Streamkap setup page for the relevant PostgreSQL Source. In this case, contact Streamkap, as they may need to reset your Connector's offsets.
    </Info>

    * Restore write access to the database.
    * Resume or restart the Source in the Streamkap app.
  </Accordion>

  <Accordion title="Replica identity and deleted records (PostgreSQL 13 and newer)">
    The `REPLICA IDENTITY` table setting controls what data is written to the WAL for row updates and deletes.

    By default, only the primary key and Streamkap metadata column values are retained for deleted records. All other columns will be empty. This leaves you with an incomplete record.

    If you need **all** column values for deleted records (for example, for auditing and historical tracking), or if your destination uses soft deletes (retaining the deleted record with a deletion flag), set `REPLICA IDENTITY` to `FULL` for all captured tables.

    <CodeGroup>
      ```SQL SQL theme={null}
      ALTER TABLE {table} REPLICA IDENTITY FULL;
      ```
    </CodeGroup>

    This ensures complete data retention.
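
    Note that `FULL` logs the old values of every column for each update and delete, which increases WAL volume. The sketch below reviews and applies the setting across a schema, where `{schema}` is a placeholder. The first query shows each table's current setting. The second generates `ALTER TABLE` statements for you to review and run.

    ```sql SQL theme={null}
    -- relreplident: d = default (primary key), f = full, i = specific index, n = nothing
    SELECT c.relname AS table_name, c.relreplident
    FROM pg_class AS c
    JOIN pg_namespace AS n ON n.oid = c.relnamespace
    WHERE c.relkind IN ('r', 'p')
      AND n.nspname = '{schema}'
    ORDER BY c.relname;

    -- Generate one ALTER TABLE statement per table in the schema
    SELECT format('ALTER TABLE %I.%I REPLICA IDENTITY FULL;', schemaname, tablename)
    FROM pg_tables
    WHERE schemaname = '{schema}';
    ```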
  </Accordion>

  <Accordion title="Capturing partitioned tables (PostgreSQL 13 and newer)">
    For capturing partitioned tables, it's essential to enable `publish_via_partition_root` on the publication.

    By default, changes to partitions are published from the partition itself. The connector expects changes to come from the root table.

    To ensure compatibility and consistent replication, enable this setting:

    `ALTER PUBLICATION streamkap_pub SET (publish_via_partition_root = true);`
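
    To confirm the setting took effect, run the query below. It assumes the default publication name. `pubviaroot` (available in PostgreSQL 13 and newer) should be `true`:

    ```sql SQL theme={null}
    -- pubviaroot is true when publish_via_partition_root is enabled
    SELECT pubname, pubviaroot
    FROM pg_publication
    WHERE pubname = 'streamkap_pub';
    ```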
  </Accordion>
</AccordionGroup>
