Amazon RDS Oracle

Oracle Change Data Capture Setup on Amazon RDS with Streamkap

Prerequisites

❗️

Standby databases

An Oracle database can be configured with either a physical or a logical standby database for recovery after a production failure. At this time, Streamkap does not support them.
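
If you are unsure whether the instance you plan to connect to is a primary or a standby, one way to check (an illustrative query only) is to look at the database role:

-- Returns PRIMARY for a primary database, PHYSICAL STANDBY or LOGICAL STANDBY for a standby
SELECT DATABASE_ROLE FROM V$DATABASE;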

  • Oracle 12c or above, Standard or Enterprise Edition
  • AWS RDS endpoint and port of the database
  • (single-tenant architecture): Database name
  • (multi-tenant architecture): Container database name (CDB) and pluggable database name (PDB)
  • The Oracle database master user credentials or equivalent
  • An AWS console account with administrator access to the database

Oracle Setup

The Connector depends on Oracle's redo logs and archive logs to ingest changes from your database. It is important that the redo logs are large enough and the archive logs are retained long enough to ensure all changes are captured.

1) Enable Archive Logs

When redo logs fill up, Oracle archives groups of them into archive logs. For Oracle on RDS, archiving is enabled when AWS automated backups are enabled.

1.1) Enable AWS automated backups

❗️

Reboot required

When automated backups are enabled, your RDS instance and database are taken offline and a backup is created immediately, which can take some time

  1. Sign in to and open the Amazon RDS console at https://console.aws.amazon.com/rds/
  2. In the left-side navigation menu, choose Databases. The Databases page should appear
  3. Select the DB instance that you want to modify
  4. Click Modify. The Modify DB instance page should appear
  5. For Backup retention period, choose a value of at least 1 day
  6. Click Continue
  7. Select Apply immediately
  8. On the confirmation page, click Modify DB instance to enable automated backups

To check, you can connect to the database and run this query:

SELECT NAME, LOG_MODE FROM V$DATABASE;

If LOG_MODE is ARCHIVELOG, archiving is enabled.

Alternatively, you can check in the AWS console:

  1. Sign into your AWS account
  2. Once signed in, navigate to the RDS dashboard by clicking on Services in the top left corner, Databases and then RDS or by typing RDS into the top left search box
  3. From the AWS RDS Dashboard, click on DB Instances or Databases in the left side menu
  4. Click on the DB identifier for the Oracle database you want Streamkap to use
  5. Click on the Maintenance & backups tab
  6. Under the Backups section, check that Automated backups shows "Enabled (N Days)"

1.2) Configure log retention

Archive logs should be retained for at least 24 hours. However, we recommend retaining them for longer if possible. If the retention period is too short, changes may not be captured and processed.

🚧

Retention periods and database storage

Archive logs are retained on your database instance, using up its storage capacity. It is important to make sure the instance has enough space; otherwise, performance issues and outages can occur.

In general, the more tables (and columns) there are, the more capacity is required. Even more is required if supplemental logging has been enabled for all columns.

Assuming your Oracle database has supplemental logging enabled, you can estimate the required storage capacity by looking at the last hour of archive log usage and multiplying it by the archivelog retention hours. Here's an example script for that:

SELECT
  SUM(BLOCKS * BLOCK_SIZE) bytes,               -- usage
  SUM(BLOCKS * BLOCK_SIZE) * 24 estimated_bytes -- assuming 24 hours archivelog retention
FROM V$ARCHIVED_LOG
WHERE FIRST_TIME >= SYSDATE - (1/24)            -- last hour
  AND DEST_ID = 1;

-- Set log retention
EXECUTE rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours', 72);
COMMIT;
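
To confirm the new retention setting, you can print the current RDS configuration. This is a minimal sketch using the rdsadmin show_configuration procedure; it assumes a SQL*Plus-style client that can display DBMS_OUTPUT:

-- Show current rdsadmin settings, including archivelog retention hours
SET SERVEROUTPUT ON;
EXEC rdsadmin.rdsadmin_util.show_configuration;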

2) Enable LogMiner

The Connector depends on the Oracle LogMiner utility to query the redo and archive logs. For LogMiner to work, supplemental logging must be enabled.

EXECUTE rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD','ALL');
COMMIT;

To confirm that supplemental logging is enabled, run this query:

SELECT NAME, SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;

If SUPPLEMENTAL_LOG_DATA_MIN is YES, supplemental logging is enabled.
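
Because the command above adds supplemental logging for all columns, you can optionally verify that as well with a similar query:

-- YES indicates supplemental logging of all columns is enabled
SELECT SUPPLEMENTAL_LOG_DATA_ALL FROM V$DATABASE;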

2.1) Resize redo logs

An Amazon RDS Oracle instance starts with four online redo logs of 128 MB each. That is too small; the logs should be resized to at least 500 MB, especially for production databases.

Before making any changes, run this query to determine the current log sizes:

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#  SIZE_MB  STATUS
1       128      INACTIVE
2       128      CURRENT
3       128      INACTIVE
4       128      INACTIVE
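
If you want a rough sense of whether 500 MB will be enough for your workload, one optional check is how often the current logs are switching. This is an illustrative query against V$LOG_HISTORY, not a required step:

-- Log switches per hour over the last day; frequent switches suggest larger redo logs are needed
SELECT TO_CHAR(FIRST_TIME, 'YYYY-MM-DD HH24') log_hour, COUNT(*) log_switches
  FROM V$LOG_HISTORY
 WHERE FIRST_TIME >= SYSDATE - 1
 GROUP BY TO_CHAR(FIRST_TIME, 'YYYY-MM-DD HH24')
 ORDER BY 1;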

Now let's add four new, larger logs that will eventually replace the smaller ones, then run the query from earlier to confirm they have been created.

EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);
EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);
EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);
EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#  SIZE_MB  STATUS
1       128      INACTIVE
2       128      CURRENT
3       128      INACTIVE
4       128      INACTIVE
5       512      UNUSED
6       512      UNUSED
7       512      UNUSED
8       512      UNUSED

Drop all groups showing as INACTIVE, then run the query from earlier to confirm they have been dropped:

-- Replace {group_number} placeholder and execute the procedure as required
EXEC rdsadmin.rdsadmin_util.drop_logfile(grp => {group_number});

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#  SIZE_MB  STATUS
2       128      CURRENT
5       512      UNUSED
6       512      UNUSED
7       512      UNUSED
8       512      UNUSED

Now let's switch the CURRENT log so we can drop it, then run the query from earlier to confirm it has switched.

EXEC rdsadmin.rdsadmin_util.switch_logfile;

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#  SIZE_MB  STATUS
2       128      ACTIVE
5       512      CURRENT
6       512      UNUSED
7       512      UNUSED
8       512      UNUSED

If the STATUS of the log we want to drop is still ACTIVE, we need to issue a checkpoint to make it INACTIVE.

EXEC rdsadmin.rdsadmin_util.checkpoint;

Finally, let's drop the remaining log file and run the query from earlier to confirm we now have four larger logs.

-- Replace {group_number} placeholder and execute the procedure as required
EXEC rdsadmin.rdsadmin_util.drop_logfile(grp => {group_number});

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#  SIZE_MB  STATUS
5       512      CURRENT
6       512      UNUSED
7       512      UNUSED
8       512      UNUSED

3) Create Database User

Depending on your database architecture, use the appropriate script below to create a database user with the privileges the Connector requires. The first script is for a multi-tenant architecture (CDB and PDB); the second is for a single-tenant architecture.

Multi-tenant architecture (CDB and PDB)

-- Replace {...} placeholders as needed
ALTER SESSION SET CONTAINER=CDB$ROOT;
CREATE USER C##STREAMKAP_USER IDENTIFIED BY {password};

ALTER SESSION SET CONTAINER={PDB};
CREATE TABLESPACE STREAMKAP_LOGMINER_TBS DATAFILE {filename} SIZE 25M AUTOEXTEND ON MAXSIZE UNLIMITED;
ALTER USER C##STREAMKAP_USER DEFAULT TABLESPACE STREAMKAP_LOGMINER_TBS;
ALTER USER C##STREAMKAP_USER QUOTA UNLIMITED ON STREAMKAP_LOGMINER_TBS;

-- Grant permissions
GRANT CREATE SESSION TO C##STREAMKAP_USER CONTAINER=ALL; 
GRANT SET CONTAINER TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Allows the Connector to use LogMiner
GRANT LOGMINING TO C##STREAMKAP_USER CONTAINER=ALL;

-- Flashback queries used for performing initial snapshots of the data
GRANT FLASHBACK ANY TABLE TO C##STREAMKAP_USER CONTAINER=ALL; 
GRANT SELECT ANY TRANSACTION TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Required for schema history when performing initial snapshots
GRANT SELECT_CATALOG_ROLE TO C##STREAMKAP_USER CONTAINER=ALL; 
GRANT EXECUTE_CATALOG_ROLE TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Connector creates a table for explicitly managing the flushing of internal log buffers (LGWR)
GRANT CREATE TABLE TO C##STREAMKAP_USER CONTAINER=ALL;

GRANT CREATE SEQUENCE TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Read-only privileges on system tables containing redo, archive log and current transaction state
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$DATABASE','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG_HISTORY','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_LOGS','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_CONTENTS','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_PARAMETERS','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGFILE','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVED_LOG','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVE_DEST_STATUS','C##STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$TRANSACTION','C##STREAMKAP_USER','SELECT'); 

-- Grant the Streamkap user permission to read each schema and table you wish to sync
GRANT SELECT ON {schema}.{table} TO C##STREAMKAP_USER CONTAINER=ALL;
-- Alternatively, you can grant access to all
-- GRANT SELECT ANY TABLE TO C##STREAMKAP_USER CONTAINER=ALL;

Single-tenant architecture

-- Replace {...} placeholders as needed
CREATE USER STREAMKAP_USER IDENTIFIED BY {password};

CREATE TABLESPACE STREAMKAP_LOGMINER_TBS DATAFILE {filename} SIZE 25M AUTOEXTEND ON MAXSIZE UNLIMITED;
ALTER USER STREAMKAP_USER DEFAULT TABLESPACE STREAMKAP_LOGMINER_TBS;
ALTER USER STREAMKAP_USER QUOTA UNLIMITED ON STREAMKAP_LOGMINER_TBS;

-- Grant permissions
GRANT CREATE SESSION TO STREAMKAP_USER; 
GRANT SET CONTAINER TO STREAMKAP_USER; 

-- Allows the Connector to use LogMiner
GRANT LOGMINING TO STREAMKAP_USER; 

-- Flashback queries used for performing initial snapshots of the data
GRANT FLASHBACK ANY TABLE TO STREAMKAP_USER; 
GRANT SELECT ANY TRANSACTION TO STREAMKAP_USER; 

-- Required for schema history when performing initial snapshots
GRANT SELECT_CATALOG_ROLE TO STREAMKAP_USER; 
GRANT EXECUTE_CATALOG_ROLE TO STREAMKAP_USER; 

-- Connector creates a table for explicitly managing the flushing of internal log buffers (LGWR)
GRANT CREATE TABLE TO STREAMKAP_USER; 

GRANT CREATE SEQUENCE TO STREAMKAP_USER; 

-- Read-only privileges on system tables containing redo, archive log and current transaction state
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$DATABASE','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG_HISTORY','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_LOGS','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_CONTENTS','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_PARAMETERS','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGFILE','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVED_LOG','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVE_DEST_STATUS','STREAMKAP_USER','SELECT'); 
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$TRANSACTION','STREAMKAP_USER','SELECT');

-- Grant the Streamkap user permission to read each schema and table you wish to sync
GRANT SELECT ON {schema}.{table} TO STREAMKAP_USER;
-- Alternatively, you can grant access to all
-- GRANT SELECT ANY TABLE TO STREAMKAP_USER;
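
As an optional sanity check, you can connect as the new Streamkap user and list the privileges and roles active in that session. This is a sketch only; it assumes the user was created with one of the scripts above:

-- Run while connected as STREAMKAP_USER (or C##STREAMKAP_USER for multi-tenant)
SELECT * FROM SESSION_PRIVS ORDER BY PRIVILEGE;
SELECT * FROM SESSION_ROLES ORDER BY ROLE;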

Enable Snapshots

You can perform ad-hoc snapshots of all or some of your tables in the Streamkap app. See Snapshots & Backfilling for more information.

❗️

Please create the signal table with the name streamkap_signal in a new schema called streamkap. It will not be recognised if given another name.

-- Switch to the newly created user/schema
ALTER SESSION SET CURRENT_SCHEMA = streamkap;

-- Create the table
CREATE TABLE streamkap_signal (
  id VARCHAR2(255) PRIMARY KEY, 
  type VARCHAR2(32) NOT NULL, 
  data VARCHAR2(2000) NULL
);

-- Grant necessary privileges on the table to the Streamkap user created earlier
-- (use C##STREAMKAP_USER instead for a multi-tenant architecture)
GRANT SELECT, UPDATE, INSERT ON streamkap_signal TO STREAMKAP_USER;
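
For context, the signal table is how snapshot requests reach the Connector. A signal row looks roughly like the one below; this is an illustration based on the Debezium-style signal format (the schema, table, and values shown are examples only, and Streamkap writes these rows for you when you trigger a snapshot in the app):

-- Illustration only: the kind of row a snapshot request produces
INSERT INTO streamkap.streamkap_signal (id, type, data)
VALUES ('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["MYSCHEMA.MYTABLE"]}');
COMMIT;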

Streamkap Setup

  • Go to Sources and choose Oracle on Amazon RDS
  • Input the following information:
    • Name - A unique and memorable name for this Connector
    • Hostname - The database endpoint
    • Port (optional) - The database port
    • Username (case sensitive) - STREAMKAP_USER or the username you chose
    • Password - The database user's password
    • Heartbeat - Required for low volume connectors. See Oracle Heartbeats (CDB Multi-Tenant)
    • Signal Table Schema - Streamkap will use a table in this schema to manage snapshots, e.g. streamkap. See Enable Snapshots for more information
    • Database - The database name (single-tenant architecture) or container database name (multi-tenant architecture)
    • Pluggable Database (optional, multi-tenant architecture only) - The pluggable database name
    • Connect via SSH Tunnel. See SSH Tunnel
    • Advanced Parameters
      • Snapshot Mode (Default When Needed) - See Snapshot Modes for more information
      • Represent Binary Data As (Default bytes)
      • Snapshot Chunk Size (Default 1024) - The number of rows read at a time when snapshotting. The default is a low, safe value. As a guide, if you have 100 million or more rows you may want to increase this to 5120; with 1 billion or more rows, an even higher value will allow you to backfill faster.
      • Max Batch Size (Default 2048) - The maximum size of each batch of events that the connector processes. Only increase this if you are experiencing lag
      • Add Schemas/Tables - You can also bulk upload here. The format is a simple list with one schema or table per row, saved in CSV format without a header.
  • Click Save

The connector will take approximately 1 minute to start processing data.

Frequently Asked Questions

How do I find the database endpoint and port?

If you use any database tools such as Oracle SQL Developer to interact with the database, you'll find them in the connection configuration you have saved in those tools.

Alternatively, you can find them in the AWS account:

  1. Sign into your AWS account
  2. Once signed in, navigate to the RDS dashboard by clicking on Services in the top left corner, Databases and then RDS or by typing RDS into the top left search box
  3. From the AWS RDS Dashboard, click on DB Instances or Databases in the left side menu
  4. Click on the DB identifier for the Oracle database you want Streamkap to use. The Database Details page should appear
  5. Under the Connectivity & security section you will find Endpoint & port

Does the AWS automated backups retention period need to be large enough to accommodate the archivelog retention hours?

No, they are mostly independent of each other.

  • archivelog retention hours determines how long the logs are retained locally in the database storage
  • AWS automated backups retention period determines how long the logs are retained by AWS outside of the database storage

When archive logs have existed for longer than the archivelog retention hours, they are removed from the database storage and then retained by AWS - outside of the database storage - for the AWS automated backups retention period.

The AWS automated backups are there so you can recover your database in the event of a disaster. The retention period doesn't impact the Streamkap Connector, only the archivelog retention hours does.
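
If you want to see how far back the locally retained archive logs currently go, an illustrative query is:

-- Oldest archive log still present on the instance's local storage
SELECT MIN(FIRST_TIME) oldest_local_archive_log
  FROM V$ARCHIVED_LOG
 WHERE DELETED = 'NO';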

Why does the Connector need CREATE TABLE privileges?

Oracle has a background process called the "Log Writer" or LGWR for short. Logs are written to an in-memory buffer first, and then the LGWR writes them to disk.

The Connector needs to keep track of the last recorded system change number (SCN) that the LGWR process records for each committed transaction.

It's best to persist that SCN somewhere outside of the Connector, so it creates a very small, one-column table named LOG_MINING_TABLE. If the Connector fails, it can use the last recorded value from that table to recover.
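
For reference, you can see the SCN the database is currently at with a simple query; this is only to illustrate what an SCN value looks like:

-- The database's current system change number
SELECT CURRENT_SCN FROM V$DATABASE;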

Do I need to resize my redo logs for a database with low data volume and traffic?

We recommend it because the supplemental logging configured earlier in this guide increases the amount of change data written to the redo logs, and that additional data is essential for the Streamkap Connector.

For databases with low traffic, we also recommend enabling the heartbeats feature explained here: Oracle Heartbeats

Multi-tenant Architecture

How do I find the Container and pluggable database names?

Connect to the database and run this query to find the container database (CDB) name; a query for the pluggable database names follows below.

SELECT NAME, CDB, CON_ID FROM V$DATABASE ORDER BY CON_ID;

For the container database name, CDB should be YES.
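
To list the pluggable database (PDB) names, one option is to query the container views; this sketch assumes you can read V$CONTAINERS:

-- CON_ID 1 is the root (CDB$ROOT) and 2 is the seed (PDB$SEED); higher CON_IDs are your PDBs
SELECT NAME, CON_ID FROM V$CONTAINERS ORDER BY CON_ID;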

Can I create a local user instead e.g. CREATE USER ... CONTAINER=<PDB>?

At this time, no. The Connector makes calls to LogMiner APIs from within the root database. Additionally, it consults several V$ tables which are not available from within a PDB.