Amazon RDS PostgreSQL Serverless

PostgreSQL Change Data Capture Setup on Amazon RDS PostgreSQL Serverless with Streamkap

Prerequisites

  • PostgreSQL version ≥ 10.x
  • PostgreSQL logical replication enabled on the primary database
  • Connection details
  • Streamkap user and role

Granting Privileges

It's recommended to create a separate user and role for Streamkap to access your PostgreSQL database. Below is an example script that does that.

-- Replace { ... } placeholders as required
CREATE USER streamkap_user PASSWORD '{password}';

-- Create a role for Streamkap
CREATE ROLE streamkap_role;
GRANT streamkap_role TO streamkap_user;
GRANT rds_replication TO streamkap_role;

-- Grant Streamkap permissions on the database, schema and all tables to capture
GRANT CREATE ON DATABASE "{database}" TO streamkap_role;
GRANT CREATE, USAGE ON SCHEMA "{schema}" TO streamkap_role;
GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO streamkap_role;
ALTER DEFAULT PRIVILEGES IN SCHEMA "{schema}" GRANT SELECT ON TABLES TO streamkap_role;

Enable Snapshots

You will need to create the table in the source database and give permissions to the streamkap_role. Streamkap will use this collection for managing snapshots.

This table can exist in a different schema (on the same database) to the schema Streamkap captures data from.

❗️

Please create the signal table with the name streamkap_signal. It will not be recognised if given another name.

CREATE TABLE streamkap_signal (
  id VARCHAR(255) PRIMARY KEY, 
  type VARCHAR(32) NOT NULL, 
  data VARCHAR(2000) NULL
);

GRANT SELECT, UPDATE, INSERT ON streamkap_signal TO streamkap_role;

Configuring & Monitor your WAL log

🚧

WAL logs and database storage

Ensure that your database server has ample free space for logical replication to maintain the logs or you may face an interruption to your database. Enable sufficient space with auto growth on the database, monitor the WAL log and configure the publication slot to contain only tables you need

Enabling Logical Replication & Disable Timeout

To enable logical replication with the pgoutput plugin, follow these steps

  1. Open the Amazon RDS console at https://console.aws.amazon.com/rds/
  2. In the navigation pane, choose Parameter groups
  3. Choose the parameter group used by the DB instance you want to modify
  4. You can't modify a default parameter group. If the DB instance is using a default parameter group, create a new parameter group and associate it with the DB instance
  5. From Parameter group actions, choose Edit
  6. Set the logical_replication parameter value to 1
  7. Set the wal_sender_timeout parameter value to 0
  8. Choose Save changes to save the updates to the DB parameter group

📘

Reboot required

Create Publication(s) & Slot

Publications contain a set of change events for the tables you include.

Log in to a PostgreSQL console (such as a SQL workbench or psql) as a superuser. Superusers have the rds_superuser role.

Create a publication for your tables. You can create a publication for all tables or be selective.

-- Note: You must have table ownership privileges on the table(s)

-- Create a publication for all tables or specific table(s) to replicate

-- Option 1: All Tables
CREATE PUBLICATION streamkap_pub FOR ALL TABLES;

-- Option 2: Specific Tables
CREATE PUBLICATION streamkap_pub FOR TABLE table1, table2, table3, ...;

-- If you didn't add ALL TABLES, add our signal table to the publication
ALTER PUBLICATION streamkap_pub ADD TABLE streamkap_signal;

-- If you need to drop a table
-- ALTER PUBLICATION streamkap_pub DROP TABLE table5;

-- PostgreSQL 13 or later, enable the adding of partitioned tables
-- CREATE PUBLICATION streamkap_pub FOR ALL TABLES WITH (publish_via_partition_root=true);

-- Create a logical replication slot
SELECT pg_create_logical_replication_slot('streamkap_pgoutput_slot', 'pgoutput');

-- Verify the table(s) to replicate were added to the publication
SELECT * FROM pg_publication_tables;

Log in as the Streamkap user and verify it can read the replication slot by running the following command:

SELECT count(*) FROM pg_logical_slot_peek_binary_changes('streamkap_pgoutput_slot', null, null, 'proto_version', '1', 'publication_names', 'streamkap_pub');

Consider Access Restrictions

Setup PostgreSQL Connector in Streamkap

  • Go to Sources and click Create New
  • Input
    • Name for your Connector
    • Hostname
    • Port (Default 5432)
    • Username (Username you chose earlier, our scripts use streamkap_user)
    • Password
    • Signal Table Schema - Streamkap will use a collection in this schema to manage snapshots e.g. public. See Enable Snapshots for more information
    • Database Name
    • Replication Slot Name (Default streamkap_pgoutput_slot)
    • Publication Name (Default streamkap_pub)
    • Advanced Parameters
      • Snapshot Mode (Default When Needed) See PostgreSQL Snapshot Modes for more information
      • Represent Binary Data As (Default bytes)
      • Snapshot Chunk Size (Default 1024) - This is the number of rows read at a time when snapshotting. This is a low safe value. As a guide, if you have 100m + rows of data you may want to move this to 5120. If you have 1bn then a higher number still will allow you to backfill faster.
      • Max Batch Size (Default 2048) - A value that specifies the maximum size of each batch of events that the connector processes. Only increase if experiencing lag
    • Add Schemas/Tables. Can also bulk upload here. The format is a simple list of each schema or table per row saved in csv format without a header.
    • Click Save
      The connector will take approximately 1 minute to start processing data.