Amazon RDS PostgreSQL Serverless
PostgreSQL Change Data Capture Setup on Amazon RDS PostgreSQL Serverless with Streamkap
Prerequisites
- PostgreSQL version ≥ 10
- A database user with sufficient privileges to configure the database, including enabling logical replication and creating users
PostgreSQL Setup
1. Grant Database Access
- Configure one of the Connection Options to ensure Streamkap can reach your database.
2. Enable Logical Replication
Aurora Serverless Read Replicas
Aurora Serverless Read Replicas support only physical replication, not logical replication. Please configure and use the primary server with Streamkap.
Logical replication is a method of replicating data objects and their changes, based upon their replication identity (usually a primary key). The Connector relies on PostgreSQL's implementation of this.
- Open the Amazon RDS console at https://console.aws.amazon.com/rds/.
- In the navigation pane, choose Parameter groups.
Default parameter groups can't be modified
If the DB instance is using a default parameter group, create a new one:
- Choose Create parameter group.
- Enter a Parameter group name and Description.
- For Engine type, choose your database engine.
- For Parameter group family, choose a DB parameter group family.
- For Type, choose DB Cluster Parameter Group.
- Choose Create.
- Select the parameter group to edit.
- Choose Edit from Actions.
- Set rds.logical_replication to 1.
- Set wal_sender_timeout to 0. A nonzero value may cause disconnects in low/intermittent traffic databases. Enable PostgreSQL Heartbeats or set an appropriate value if needed.
- Choose Save changes.
If you created a new parameter group, associate it with your Aurora DB cluster:
- In the navigation pane, choose Databases and select the target DB cluster.
- Choose Modify.
- Change the DB cluster parameter group setting.
- Choose Continue and review modifications.
- The change is applied immediately, regardless of the Scheduling of modifications setting.
- On the confirmation page, choose Modify cluster.
A reboot is required to apply the changes.
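After the reboot, it can help to confirm from a SQL session that the parameter changes took effect. The query below is a minimal sketch, assuming you are connected to the primary (writer) instance with a user that can read pg_settings. With logical replication enabled, wal_level is expected to report logical.

```sql
-- Check the replication-related settings after the reboot.
-- wal_level should be 'logical' once rds.logical_replication is enabled.
SELECT name, setting
FROM pg_settings
WHERE name IN ('rds.logical_replication', 'wal_level', 'wal_sender_timeout');
```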
WAL and database storage
The write-ahead logs take up disk space on your database. If you run out of disk space, you may face an interruption to your database. To mitigate this:
- Ensure that your database server has ample free space.
- Enable sufficient space with auto growth on the database.
- Monitor the WAL growth and configure alerts.
- Configure the publication (see Create Publication & Slot) to contain only the tables you need.
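One way to monitor WAL growth is to check how much WAL each replication slot is holding back. The query below is a sketch using standard PostgreSQL 10+ catalog functions. Run it on the primary. Any alert threshold is your own choice and is not prescribed by this guide.

```sql
-- Approximate WAL retained by each replication slot, largest first.
-- A slot that is inactive and whose retained WAL keeps growing is a
-- common cause of disk usage problems.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) DESC;
```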
3. Create Database User
It's recommended to create a separate user and role for the Connector to access your PostgreSQL database. Below is an example script that does that.
-- Replace { ... } placeholders as required
CREATE USER streamkap_user PASSWORD '{password}';
-- Create a role for Streamkap
CREATE ROLE streamkap_role;
GRANT streamkap_role TO streamkap_user;
GRANT rds_replication TO streamkap_role;
-- Grant Streamkap permissions on the database, schema and all tables to capture
GRANT CONNECT ON DATABASE "{database}" TO streamkap_role;
GRANT CREATE, USAGE ON SCHEMA "{schema}" TO streamkap_role;
GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO streamkap_role;
ALTER DEFAULT PRIVILEGES IN SCHEMA "{schema}" GRANT SELECT ON TABLES TO streamkap_role;
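To double-check the grants, a query along the following lines can be run by the administrative user. It is a sketch only and uses the same { ... } placeholders as the script above. Each column should return true if the corresponding grant succeeded.

```sql
-- Replace { ... } placeholders as required
SELECT pg_has_role('streamkap_user', 'streamkap_role', 'MEMBER')         AS user_in_role,
       pg_has_role('streamkap_role', 'rds_replication', 'MEMBER')        AS role_can_replicate,
       has_database_privilege('streamkap_role', '{database}', 'CONNECT') AS can_connect,
       has_schema_privilege('streamkap_role', '{schema}', 'USAGE')       AS can_use_schema;
```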
4. Enable Snapshots
To backfill your data, the Connector needs to be able to perform snapshots. See Snapshots & Backfilling for more information.
To enable this feature, there are 2 methods available:
Method 1: Enable read only connection
This method is recommended if you cannot create a table in the source database and grant the Connector read/write privileges on it.
- Set Read only to Yes during Streamkap Setup. No other configuration should be necessary.
Method 2: Create a table in the source database
You will need to create the table and give the necessary permissions to the streamkap_user. The Connector will use this table for managing snapshots. Below is an example script that does that.
Please create the signal table with the name streamkap_signal. It will not be recognised if given another name.
-- Create the schema
CREATE SCHEMA streamkap;
-- Switch to the newly created schema
SET search_path TO streamkap;
-- Create the table
CREATE TABLE streamkap_signal (
id VARCHAR(255) PRIMARY KEY,
type VARCHAR(32) NOT NULL,
data VARCHAR(2000) NULL
);
-- Grant necessary privileges on the table to the role
GRANT CREATE, USAGE ON SCHEMA streamkap TO streamkap_role;
GRANT SELECT ON ALL TABLES IN SCHEMA streamkap TO streamkap_role;
GRANT SELECT, UPDATE, INSERT ON TABLE streamkap_signal TO streamkap_role;
Publications and signal tables
When you create the PostgreSQL publication in the next step, if you choose to specify tables for capture instead of all tables, you must include the streamkap_signal table.
5. Create Publication & Slot
REPLICA IDENTITY and deleted records (PostgreSQL 13 and newer)
The REPLICA IDENTITY table setting controls what data is logged for row updates and deletes. By default, only the primary key and Streamkap metadata column values are retained for deleted records. All other columns will be empty. This leaves you with an incomplete record.
If you require all column values for deleted records, for example for auditing and historical tracking, or if your deletion strategy for your destination is 'soft deletes' (retain the deleted record with a deletion flag), you must set REPLICA IDENTITY to FULL for all capture tables.
ALTER TABLE {table} REPLICA IDENTITY FULL;
This ensures complete data retention.
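To see which tables still use the default setting, the catalog can be queried as in the sketch below. The {schema} placeholder is an assumption and stands for the schema that holds your capture tables.

```sql
-- List the current REPLICA IDENTITY of each ordinary table in a schema.
-- Replace {schema} as required.
SELECT n.nspname AS schema_name,
       c.relname AS table_name,
       CASE c.relreplident
            WHEN 'd' THEN 'default'
            WHEN 'f' THEN 'full'
            WHEN 'i' THEN 'index'
            WHEN 'n' THEN 'nothing'
       END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = '{schema}'
ORDER BY c.relname;
```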
Publications contain a set of change events for the tables you want the Connector to capture.
- Create a publication for your tables. You can create a publication for all tables or selected tables.
-- Option A: create a publication for all tables to capture
CREATE PUBLICATION streamkap_pub FOR ALL TABLES;
-- Option B (instead of Option A): create a publication for specific tables to capture
CREATE PUBLICATION streamkap_pub FOR TABLE table1, table2, table3, ...;
-- Verify the tables to capture were added to the publication
SELECT * FROM pg_publication_tables WHERE pubname = 'streamkap_pub';
Altering publications
You cannot alter FOR ALL TABLES publications to include/exclude tables. If you set up a FOR ALL TABLES publication and later decide to change that, you have to drop the publication and create another to include specific tables, e.g. CREATE PUBLICATION ... TABLE table1, table2, table3, ...
However, any change events that occur before the new publication is created will not be included in it, so a snapshot is required to ensure they are not missed by your Streamkap pipelines.
You should also stop the Source before changing the publication.
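As an illustration, switching from a FOR ALL TABLES publication to a table-specific one might look like the sketch below. The table names public.orders and public.customers are hypothetical. The streamkap.streamkap_signal entry applies only if you created the signal table in Enable Snapshots. Stop the Source first and plan a snapshot afterwards, as described above.

```sql
-- Drop the FOR ALL TABLES publication and recreate it for specific tables.
-- public.orders and public.customers are hypothetical example tables.
DROP PUBLICATION IF EXISTS streamkap_pub;
CREATE PUBLICATION streamkap_pub
    FOR TABLE public.orders, public.customers, streamkap.streamkap_signal;

-- Confirm which tables the new publication contains
SELECT * FROM pg_publication_tables WHERE pubname = 'streamkap_pub';
```

A publication created for specific tables can later be changed in place with ALTER PUBLICATION ... ADD TABLE or DROP TABLE, so this drop-and-recreate step is only needed when moving away from FOR ALL TABLES.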
A replication slot represents a stream of change events the Connector reads from.
- Create a replication slot.
-- Create a logical replication slot
SELECT pg_create_logical_replication_slot('streamkap_pgoutput_slot', 'pgoutput');
-- Verify the replication slot is working (this may take a few moments to return the count)
SELECT count(*) FROM pg_logical_slot_peek_binary_changes('streamkap_pgoutput_slot', null, null, 'proto_version', '1', 'publication_names', 'streamkap_pub');
Streamkap Setup
Follow these steps to configure your new connector:
1. Create the Source
- Navigate to Add Connectors.
- Choose PostgreSQL.
2. Connection Settings
- Name: Enter a name for your connector.
- Hostname: Specify the hostname.
- Port: Default is 5432.
- Connect via SSH Tunnel: The Connector will connect to an SSH server in your network which has access to your database. This is necessary if the Connector cannot connect directly to your database.
  - See SSH Tunnel for setup instructions.
- Username: Username to access the database. By default, Streamkap scripts use streamkap_user.
- Password: Password to access the database.
- Database: Specify the database to stream data from.
- Read only: Whether or not to use a read-only connection.
- Heartbeats: Crucial for low and intermittent traffic databases. Enabled by default.
  - Heartbeat Table Schema (for read-write connections only; Read only is No): If configured, this can mitigate unnecessary PostgreSQL log growth and retention.
  - See PostgreSQL Heartbeats for setup instructions.
3. Snapshot Settings
If you set Read only to No, you will need to create a snapshot signal table and give permissions to the streamkap_user. See Enable Snapshots for setup instructions.
- Signal Table Schema: The Connector will use a table in this schema to manage snapshots.
4. Replication Settings
- Replication Slot Name: The name of the replication slot for the connector to use. Default is streamkap_pgoutput_slot.
- Publication Name: The name of the publication for the connector to use. Default is streamkap_pub.
5. Advanced Parameters
- SSL mode: Whether to use an encrypted connection to the PostgreSQL server. By default, it's required.
- Prefix with Database Name?: Changes the format of topics to DatabaseName_TopicName.
- Represent binary data as: Specifies how the data for binary columns, e.g. blob, binary, varbinary, should be interpreted. Your destination for this data can impact which option you choose. Default is bytes.
Click Next.
6. Schema and Table Capture
- Add Schemas/Tables: Specify the schema(s) and table(s) for capture.
  - You can bulk upload here. The format is a simple list of schemas and tables, with each entry on a new row. Save as a .csv file without a header.
Click Save.
Troubleshooting
Managing PostgreSQL upgrades
When upgrading the PostgreSQL database used by Streamkap, there are specific steps to prevent data loss and ensure continued operation.
Streamkap handles network failures and outages well. If a monitored database stops, the connector resumes from the last recorded log sequence number (LSN) once communication is restored. It retrieves this offset and queries PostgreSQL for a matching LSN in the replication slot.
A replication slot is required for change capture, but PostgreSQL removes slots during upgrades and doesn’t restore them. When the connector restarts, it requests the last known offset, but PostgreSQL cannot return it.
Creating a new replication slot isn't enough to prevent data loss. New slots only track changes from their creation point and lack earlier offsets. The connector fetches its last known offset from Kafka but can't retrieve corresponding data from the new slot. It skips older change events and resumes from the latest log position, causing silent data loss with no warnings.
Procedure
Follow these steps to minimize data loss. Note that a few steps may require support from Streamkap. We recommend notifying us about your database upgrade ahead of time to ensure you have the necessary support.
- Using your database's upgrade procedure, ensure writes to it have stopped.
- Allow the connector to capture all change events before starting the upgrade procedure. Ask Streamkap to confirm this for you.
Interrupting change data capture
If not all events were captured before stopping the Source and upgrading the database, you can perform a snapshot in Streamkap for that Source (after the procedure is completed) to ensure no change events were missed.
- Assuming all events are captured, stop the Source in the Streamkap app. This flushes the last records and saves the last offset.
- Stop the database and upgrade it using your upgrade procedure.
Once the database is upgraded, and before allowing writes again:
- Recreate the logical replication slot; otherwise, Streamkap will miss changes. See Create Publication & Slot.
- Verify the publication for Streamkap exists; recreate it if necessary: Create Publication & Slot.
Update the Source in the Streamkap app
If you have chosen different names for the logical replication slot or publication, make sure to update them in the Streamkap setup page for the relevant PostgreSQL Source. In this case, contact Streamkap, as your Connector's offsets may need to be reset.
- Restore write access to the database.
- Resume or restart the Source in the Streamkap app.
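Before restoring write access, a quick check such as the sketch below can confirm that the replication slot and publication exist on the upgraded database. It assumes the default names used in this guide (streamkap_pgoutput_slot and streamkap_pub). Substitute your own names if you changed them.

```sql
-- The slot should be listed with plugin 'pgoutput'.
-- No row means it still has to be recreated.
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'streamkap_pgoutput_slot';

-- The publication should be listed. No row means it has to be recreated.
SELECT pubname, puballtables
FROM pg_publication
WHERE pubname = 'streamkap_pub';
```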