Oracle Change Data Capture Setup on Amazon RDS with Streamkap
Prerequisites
Standby databases
An Oracle database can be configured with either a physical or a logical standby database for recovery after a production failure. At this time, Streamkap does not support them.
- Oracle 12c or above, Standard or Enterprise Edition
- AWS RDS endpoint and port of the database
- (single-tenant architecture): Database name
- (multi-tenant architecture): Container database name (CDB) and pluggable database name (PDB)
- The Oracle database master user credentials or equivalent
- An AWS console account with administrator access to the database
Oracle Setup
To ingest changes from your database, the Connector depends on Oracle's redo logs and archive logs. The redo logs must be large enough, and the archive logs retained for long enough, to ensure all changes are captured.
1) Enable Archive Logs
When redo logs fill up, Oracle archives groups of them into archive logs. For Oracle on RDS, archiving is enabled when AWS automated backups are enabled.
1.1) Enable AWS automated backups
Reboot required
When automated backups are enabled, your RDS instance and database are taken offline and a backup is created immediately, which can take some time.
- Sign in to the AWS Management Console and open the Amazon RDS console at https://console.aws.amazon.com/rds/
- In the left-side navigation menu, choose Databases. The Databases page should appear
- Select the DB instance that you want to modify
- Click Modify. The Modify DB instance page should appear
- For Backup retention period, choose a value of 1 day or more
- Click Continue
- Select Apply immediately
- On the confirmation page, click Modify DB instance to enable automated backups
To check that archiving is enabled, connect to the database and run this query:
SELECT NAME, LOG_MODE FROM V$DATABASE;
If the LOG_MODE is ARCHIVELOG then it is enabled.
Alternatively, check in the AWS console:
- Sign into your AWS account
- Once signed in, navigate to the RDS dashboard by clicking on Services in the top left corner, Databases and then RDS or by typing RDS into the top left search box
- From the AWS RDS Dashboard, click on DB Instances or Databases in the left side menu
- Click on the DB identifier for the Oracle database you want Streamkap to use
- Click on the Maintenance & backups tab
- Under the Backups section, check that Automated backups shows "Enabled (N Days)"
1.2) Configure log retention
Archive logs should be retained for at least 24 hours. However, we recommend retaining them for longer, if possible. If the retention period is too short, logs may be removed before their changes have been captured and processed.
Retention periods and database storage
Archive logs are retained on your database instance and use up its storage capacity. It is important to make sure it has enough space; otherwise, performance issues and outages can occur.
In general, the more tables (and columns) there are, the more capacity is required. Even more is required if supplemental logging has been enabled for all columns.
Assuming your Oracle database has supplemental logging enabled, you can estimate the required storage capacity by taking the last hour of log storage usage and multiplying it by the archivelog retention hours. Here's an example script for that:
SELECT
  SUM(BLOCKS * BLOCK_SIZE) bytes, -- usage
  SUM(BLOCKS * BLOCK_SIZE) * 24 estimated_bytes -- assuming 24 hours archivelog retention
FROM V$ARCHIVED_LOG
WHERE FIRST_TIME >= SYSDATE-(1/24) -- last hour
  AND DEST_ID=1;
-- Set log retention
EXECUTE rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours', 72);
COMMIT;
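As a worked variant of the estimate above, the sketch below scales the last hour of archive log usage to the 72-hour retention set in this example and reports it in GiB, then prints the current RDS settings so the retention value can be confirmed. It assumes the last hour is representative of your normal load, that your user can run rdsadmin.rdsadmin_util.show_configuration, and that your SQL client displays DBMS_OUTPUT (the SET SERVEROUTPUT ON line is for SQL*Plus-style clients). The estimated_gib alias is illustrative only.

```sql
-- Estimate local storage needed for 72 hours of archive logs, in GiB
-- (assumption: the last hour of usage is typical for this database)
SELECT ROUND(SUM(BLOCKS * BLOCK_SIZE) * 72 / 1024 / 1024 / 1024, 2) AS estimated_gib
FROM V$ARCHIVED_LOG
WHERE FIRST_TIME >= SYSDATE - (1/24) -- last hour
  AND DEST_ID = 1;

-- Print the current RDS configuration, which includes 'archivelog retention hours'
SET SERVEROUTPUT ON
EXEC rdsadmin.rdsadmin_util.show_configuration;
```

If the hour you sample is unusually quiet or busy, the estimate will be off in the same direction, so it may be worth repeating the query at a few different times of day.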
2) Enable LogMiner
The Connector depends on the Oracle LogMiner utility to query the redo and archive logs. For LogMiner to provide the data the Connector needs, supplemental logging must be enabled.
EXECUTE rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD','ALL');
COMMIT;
To confirm if supplemental logging has been enabled, run this query:
SELECT NAME, SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
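If the SUPPLEMENTAL_LOG_DATA_MIN is YES (or IMPLICIT, which Oracle reports when minimal logging is switched on as a side effect of all-column logging) then it is enabled.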
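Because the procedure above was called with 'ALL', it can also be useful to check the all-column setting directly. The following is a minimal sketch using the SUPPLEMENTAL_LOG_DATA_ALL column of V$DATABASE; after the step above it would be expected to show YES, but treat that as something to verify on your own instance rather than a guarantee.

```sql
-- Show both the minimal and the all-column supplemental logging settings
SELECT SUPPLEMENTAL_LOG_DATA_MIN,
       SUPPLEMENTAL_LOG_DATA_ALL
FROM V$DATABASE;
```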
2.1) Resize redo logs
An Amazon RDS Oracle instance starts with four online redo logs of 128MB each. That is too small; resize the logs to at least 500MB, especially for production databases.
Before making any changes, run this query to determine the current log sizes:
SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP# | SIZE_MB | STATUS |
---|---|---|
1 | 128 | INACTIVE |
2 | 128 | CURRENT |
3 | 128 | INACTIVE |
4 | 128 | INACTIVE |
Now let's add 4 new, larger logs that will eventually replace the smaller logs and run the query from earlier to confirm they have been created.
EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);
EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);
EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);
EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 536870912);
SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP# | SIZE_MB | STATUS |
---|---|---|
1 | 128 | INACTIVE |
2 | 128 | CURRENT |
3 | 128 | INACTIVE |
4 | 128 | INACTIVE |
5 | 512 | UNUSED |
6 | 512 | UNUSED |
7 | 512 | UNUSED |
8 | 512 | UNUSED |
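The bytes value passed to add_logfile above is 512MB expressed in bytes. If you want a different size, the same arithmetic can be checked in the database first; the query below is only an illustration of the conversion.

```sql
-- 512 MB in bytes: 512 * 1024 * 1024 = 536870912
SELECT 512 * 1024 * 1024 AS bytes FROM DUAL;
```

Swap 512 for the size in MB you prefer and use the result as the bytes argument.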
For all groups showing as INACTIVE, drop them and run the query from earlier to confirm they have been dropped:
-- Replace {group_number} placeholder and execute the procedure as required
EXEC rdsadmin.rdsadmin_util.drop_logfile(grp => {group_number});
SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP# | SIZE_MB | STATUS |
---|---|---|
2 | 128 | CURRENT |
5 | 512 | UNUSED |
6 | 512 | UNUSED |
7 | 512 | UNUSED |
8 | 512 | UNUSED |
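To avoid filling in group numbers by hand for the drop step, a query along these lines can list the groups that are currently droppable and build the matching statement for each. This is a sketch, not part of the RDS tooling: it only generates text for you to review and run yourself, and the size filter assumes the new logs are 512MB as in this guide.

```sql
-- List small INACTIVE redo log groups and the statement that would drop each one
SELECT GROUP#,
       'EXEC rdsadmin.rdsadmin_util.drop_logfile(grp => ' || GROUP# || ');' AS drop_statement
FROM V$LOG
WHERE STATUS = 'INACTIVE'
  AND BYTES < 536870912 -- only the original, smaller logs
ORDER BY GROUP#;
```

Groups showing CURRENT or ACTIVE are deliberately excluded, since they cannot be dropped until they become INACTIVE.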
Now let's switch the CURRENT log so we can drop it and run the query from earlier to confirm it has switched.
EXEC rdsadmin.rdsadmin_util.switch_logfile;
SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP# | SIZE_MB | STATUS |
---|---|---|
2 | 128 | ACTIVE |
5 | 512 | CURRENT |
6 | 512 | UNUSED |
7 | 512 | UNUSED |
8 | 512 | UNUSED |
If the STATUS of the log we want to drop is still ACTIVE, we need to issue a checkpoint to make it INACTIVE.
EXEC rdsadmin.rdsadmin_util.checkpoint;
Finally, let's drop the remaining log file and run the query from earlier to confirm we now have 4 larger logs.
-- Replace {group_number} placeholder and execute the procedure as required
EXEC rdsadmin.rdsadmin_util.drop_logfile(grp => {group_number});
SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP# | SIZE_MB | STATUS |
---|---|---|
5 | 512 | CURRENT |
6 | 512 | UNUSED |
7 | 512 | UNUSED |
8 | 512 | UNUSED |
3) Create Database User
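Depending on your database architecture, choose the correct script below to create a database user with privileges for the Connector. The first script is for a multi-tenant architecture (it creates the common user C##STREAMKAP_USER); the second is for a single-tenant architecture (it creates STREAMKAP_USER).
-- Multi-tenant architecture: replace {...} placeholders as needed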
ALTER SESSION SET CONTAINER=CDB$ROOT;
CREATE USER C##STREAMKAP_USER IDENTIFIED BY {password};
ALTER SESSION SET CONTAINER={PDB};
-- Amazon RDS uses Oracle Managed Files, so no data file name is specified
CREATE TABLESPACE STREAMKAP_LOGMINER_TBS DATAFILE SIZE 25M AUTOEXTEND ON MAXSIZE UNLIMITED;
ALTER USER C##STREAMKAP_USER DEFAULT TABLESPACE STREAMKAP_LOGMINER_TBS;
ALTER USER C##STREAMKAP_USER QUOTA UNLIMITED ON STREAMKAP_LOGMINER_TBS;
-- Grant permissions
GRANT CREATE SESSION TO C##STREAMKAP_USER CONTAINER=ALL;
GRANT SET CONTAINER TO C##STREAMKAP_USER CONTAINER=ALL;
-- Allows the Connector to use LogMiner
GRANT LOGMINING TO C##STREAMKAP_USER CONTAINER=ALL;
-- Flashback queries used for performing initial snapshots of the data
GRANT FLASHBACK ANY TABLE TO C##STREAMKAP_USER CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO C##STREAMKAP_USER CONTAINER=ALL;
-- Required for schema history when performing initial snapshots
GRANT SELECT_CATALOG_ROLE TO C##STREAMKAP_USER CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO C##STREAMKAP_USER CONTAINER=ALL;
-- Connector creates a table for explicitly managing the flushing of internal log buffers (LGWR)
GRANT CREATE TABLE TO C##STREAMKAP_USER CONTAINER=ALL;
GRANT CREATE SEQUENCE TO C##STREAMKAP_USER CONTAINER=ALL;
-- Read-only privileges on system tables containing redo, archive log and current transaction state
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$DATABASE','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG_HISTORY','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_LOGS','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_CONTENTS','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_PARAMETERS','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGFILE','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVED_LOG','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVE_DEST_STATUS','C##STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$TRANSACTION','C##STREAMKAP_USER','SELECT');
-- Grant the Streamkap user permission to read each schema and table you wish to sync
GRANT SELECT ON {schema}.{table} TO C##STREAMKAP_USER CONTAINER=ALL;
-- Alternatively, you can grant access to all
-- GRANT SELECT ANY TABLE TO C##STREAMKAP_USER CONTAINER=ALL;
-- Single-tenant architecture: replace {...} placeholders as needed
CREATE USER STREAMKAP_USER IDENTIFIED BY {password};
-- Amazon RDS uses Oracle Managed Files, so no data file name is specified
CREATE TABLESPACE STREAMKAP_LOGMINER_TBS DATAFILE SIZE 25M AUTOEXTEND ON MAXSIZE UNLIMITED;
ALTER USER STREAMKAP_USER DEFAULT TABLESPACE STREAMKAP_LOGMINER_TBS;
ALTER USER STREAMKAP_USER QUOTA UNLIMITED ON STREAMKAP_LOGMINER_TBS;
-- Grant permissions
GRANT CREATE SESSION TO STREAMKAP_USER;
GRANT SET CONTAINER TO STREAMKAP_USER;
-- Allows the Connector to use LogMiner
GRANT LOGMINING TO STREAMKAP_USER;
-- Flashback queries used for performing initial snapshots of the data
GRANT FLASHBACK ANY TABLE TO STREAMKAP_USER;
GRANT SELECT ANY TRANSACTION TO STREAMKAP_USER;
-- Required for schema history when performing initial snapshots
GRANT SELECT_CATALOG_ROLE TO STREAMKAP_USER;
GRANT EXECUTE_CATALOG_ROLE TO STREAMKAP_USER;
-- Connector creates a table for explicitly managing the flushing of internal log buffers (LGWR)
GRANT CREATE TABLE TO STREAMKAP_USER;
GRANT CREATE SEQUENCE TO STREAMKAP_USER;
-- Read-only privileges on system tables containing redo, archive log and current transaction state
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$DATABASE','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG_HISTORY','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_LOGS','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_CONTENTS','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_PARAMETERS','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGFILE','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVED_LOG','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVE_DEST_STATUS','STREAMKAP_USER','SELECT');
EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$TRANSACTION','STREAMKAP_USER','SELECT');
-- Grant the Streamkap user permission to read each schema and table you wish to sync
GRANT SELECT ON {schema}.{table} TO STREAMKAP_USER;
-- Alternatively, you can grant access to all
-- GRANT SELECT ANY TABLE TO STREAMKAP_USER;
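Once either script has run, the grants can be reviewed from the data dictionary. The queries below are a sketch that assumes the user you are connected as can read the DBA_ views; use C##STREAMKAP_USER as the grantee for the multi-tenant script.

```sql
-- System privileges granted directly to the Connector user
SELECT PRIVILEGE
FROM DBA_SYS_PRIVS
WHERE GRANTEE = 'STREAMKAP_USER'
ORDER BY PRIVILEGE;

-- Roles granted to the Connector user
SELECT GRANTED_ROLE
FROM DBA_ROLE_PRIVS
WHERE GRANTEE = 'STREAMKAP_USER'
ORDER BY GRANTED_ROLE;

-- Object privileges, including the V_$ views and any tables to sync
SELECT OWNER, TABLE_NAME, PRIVILEGE
FROM DBA_TAB_PRIVS
WHERE GRANTEE = 'STREAMKAP_USER'
ORDER BY OWNER, TABLE_NAME, PRIVILEGE;
```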
Enable Snapshots
You will need to create a signal table in the source database and grant permissions on it to the STREAMKAP_USER (C##STREAMKAP_USER in a multi-tenant architecture). Streamkap will use this table for managing snapshots.
This table can exist in a different schema (on the same database) to the schema Streamkap captures data from.
Please create the signal table with the name streamkap_signal. It will not be recognised if given another name.
CREATE TABLE "streamkap_signal" (
id VARCHAR(255) PRIMARY KEY,
type VARCHAR(32) NOT NULL,
data VARCHAR(2000) NULL
);
GRANT SELECT, UPDATE, INSERT ON "streamkap_signal" TO STREAMKAP_USER;
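Because the table name is created inside double quotes, Oracle stores it in lowercase exactly as written, so dictionary lookups must match that case. The following sketch checks that the table exists and which privileges are visible on it; it assumes the connected user can see the table through the ALL_ views.

```sql
-- The name is case sensitive here because the table was created with a quoted identifier
SELECT OWNER, TABLE_NAME
FROM ALL_TABLES
WHERE TABLE_NAME = 'streamkap_signal';

-- Privileges granted on the signal table
SELECT GRANTEE, PRIVILEGE
FROM ALL_TAB_PRIVS
WHERE TABLE_NAME = 'streamkap_signal'
ORDER BY GRANTEE, PRIVILEGE;
```

The OWNER returned by the first query is the schema to enter as the Signal Table Schema during Streamkap setup.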
Streamkap Setup
- Go to Sources and choose Oracle on Amazon RDS
- Input the following information:
- Name - A unique and memorable name for this Connector
- Hostname - The database endpoint
- Port (optional) - The database port
- Username (case sensitive) - STREAMKAP_USER or the username you chose
- Password - The database user's password
- Signal Table Schema - Streamkap will use a collection in this schema to manage snapshots e.g. public. See Enable Snapshots for more information
- Database - The database name (single-tenant architecture) or container database name (multi-tenant architecture)
- Pluggable Database (optional, multi-tenant architecture only) - The pluggable database name
- Schemas (case sensitive) - Either by clicking the + (plus) button and typing in the schema name, or by uploading a comma separated list of names of the schemas to capture e.g. sales, web
- Tables (case sensitive) - Either by clicking the + (plus) button and typing in the table name, or by uploading a comma separated list of names of the tables to capture, including the schema name e.g. sales.orders, sales.shipping, web.analytics
- Click Save
Frequently Asked Questions
How do I find the database endpoint and port?
If you use any database tools such as Oracle SQL Developer to interact with the database, you'll find them in the connection configuration you have saved in those tools.
Alternatively, you can find them in the AWS account:
- Sign into your AWS account
- Once signed in, navigate to the RDS dashboard by clicking on Services in the top left corner, Databases and then RDS or by typing RDS into the top left search box
- From the AWS RDS Dashboard, click on DB Instances or Databases in the left side menu
- Click on the DB identifier for the Oracle database you want Streamkap to use. The Database Details page should appear
- Under the Connectivity & security section you will find Endpoint & port
Does the AWS automated backups retention period need to be large enough to accommodate the archivelog retention hours?
No, they are mostly independent of each other.
- archivelog retention hours determines how long the logs are retained locally in the database storage
- AWS automated backups retention period determines how long the logs are retained by AWS outside of the database storage
When archive logs have existed for longer than the archivelog retention hours, they are removed from the database storage and then retained by AWS (outside of the database storage) for the AWS automated backups retention period.
The AWS automated backups are there so you can recover your database in the event of a disaster. The retention period doesn't impact the Streamkap Connector, only the archivelog retention hours does.
Why does the Connector need CREATE TABLE privileges?
Oracle has a background process called the "Log Writer" or LGWR for short. Logs are written to an in-memory buffer first, and then the LGWR writes them to disk.
The Connector needs to keep track of the last recorded system change number (SCN) that the LGWR process records for each committed transaction.
It's best to persist that SCN somewhere outside of the Connector so it creates a very small, 1 column table named LOG_MINING_TABLE. If the Connector fails, it can use the last recorded value from that table to recover.
Do I need to resize my redo logs for a database with low data volume and traffic?
We recommend it because the supplemental logging configuration from earlier in this guide increases the level of change tracking to include data that's essential for the Streamkap Connector.
For databases with low traffic, we also recommend enabling the heartbeats feature explained here: Oracle Heartbeats
Multi-tenant Architecture
How do I find the Container and pluggable database names?
Connect to the database and run this script to show the database name and whether it is a container database (CDB).
SELECT NAME, CDB, CON_ID FROM V$DATABASE ORDER BY CON_ID;
For the Container database name, the CDB should be YES.
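V$DATABASE returns a single row for the database itself, so a separate lookup helps for the pluggable database names. One option, sketched here on the assumption that your user can query V$CONTAINERS, is to list the containers visible to the current session. What is returned depends on where you are connected: from within a PDB you will generally see only that PDB.

```sql
-- List the containers visible to the current session
-- (CON_ID 1 is the root, CDB$ROOT; user-created PDBs have higher CON_ID values)
SELECT CON_ID, NAME, OPEN_MODE
FROM V$CONTAINERS
ORDER BY CON_ID;
```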
Can I create a local user instead e.g. CREATE USER ... CONTAINER=<PDB>?
At this time, no. The Connector makes calls to LogMiner APIs from within the root database. Additionally, it consults several V$ tables which are not available from within a PDB.