Oracle - Generic

Oracle Change Data Capture Setup with Streamkap

Prerequisites

❗️

Standby databases

An Oracle database can be configured with either a physical or a logical standby database for recovery after a production failure. At this time, Streamkap does not support them.

  • Oracle 12c or above, Standard or Enterprise Edition
  • Hostname and Port
  • (single-tenant architecture): Database name
  • (multi-tenant architecture): Container database name (CDB) and pluggable database name (PDB)
  • The Oracle database master user credentials or equivalent

Oracle Setup

For the Connector to ingest changes from your database it is dependent on Oracle's redo logs and archive logs. It is important that these redo logs are large enough and, the archive logs are retained for long enough to ensure all changes are captured.

1) Enable Archive Logs

❗️

Reboot required

When ARCHIVELOG mode is enabled, your Oracle database will be taken offline.

When redo logs fill up, Oracle archives groups of them into archive logs. Archive logs should be retained for at least 24 hours. However, we recommend retaining them for longer, if possible. Too short a retention period and changes may not be captured and processed.

🚧

Retention periods and database storage

Archive logs are retained on your database instance using up it's storage capacity. It is important to make sure it has enough space, otherwise, performance issues and outages can occur.

In general, the more tables (and columns) there are, the more capacity is required. Even more is required if supplemental logging has been enabled at table level.

To estimate what storage capacity you might need and assuming your Oracle database has supplemental logging enabled, you can look at the last 1 hour of log storage usage and multiply that by archivelog retention hours. Here's an example script for that:

SELECT
 SUM(BLOCKS * BLOCK_SIZE) bytes, -- usage
 SUM(BLOCKS * BLOCK_SIZE) * 24 estimated_bytes -- assuming 24 hours archivelog retention
  FROM V$ARCHIVED_LOG
 WHERE 
  FIRST_TIME >= SYSDATE-(1/24) -- last hour
  AND 
  DEST_ID=1;

You should set the DB_RECOVERY_FILE_DEST_SIZE parameter to a value that is appropriate for your available disk space.

-- Replace the {...} placeholders as required
alter system set db_recovery_file_dest_size = {recovery_file_size};
alter system set db_recovery_file_dest = '{recovery_file_location}' scope=spfile;

Then, configure Oracle RMAN to retain backups and archive logs for at least 24 hours. We recommend retaining data for longer, if possible.

RMAN> CONFIGURE RETENTION POLICY TO RECOVERY WINDOW OF 3 DAYS;

To enable ARCHIVELOG mode, run this script:

SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

To confirm if ARCHIVELOG mode has been enabled, run this query:

archive log list;

If the Database log mode is Archive Mode then it is enabled.

2) Enable LogMiner

For the Connector to query the redo and archive logs, it is dependent on the Oracle LogMiner utility. To enable that, supplemental logging must be enabled.

-- Enable database supplement logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

After supplemental logging has been enabled at the database level, you then need to enable table level supplemental logging.

-- To enable all supplemental logging, run the following SQL statement for each table:
ALTER TABLE {schema}.{table} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

To confirm if supplemental logging has been enabled, run this query:

SELECT NAME, SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;

If the SUPPLEMENTAL_LOG_DATA_MIN is YES it is enabled.

2.1) Resize redo logs

An Oracle instance typically starts with several, online redo logs, each less than 500MB in size. This default size is too small and should be resized to at least 500MB or more, especially for production databases.

Before making any changes, run this query to determine the current log sizes:

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#SIZE_MBSTATUS
1128INACTIVE
2128INACTIVE
3128CURRENT

Also, we need to determine how many logs per group there are and their location.

SELECT GROUP#, LOCATION FROM V$LOGFILE ORDER BY 1, 2;
GROUP#LOCATION
1/opt/oracle/oradata/ORCLCDB/redo01.log
2/opt/oracle/oradata/ORCLCDB/redo02.log
3/opt/oracle/oradata/ORCLCDB/redo03.log

Now replace the old logs with new, larger logs. Only INACTIVE and UNUSED groups can be dropped and recreated.

🚧

Log multiplexing

Oracle does support the notion of multiple logs per group known as 'log multiplexing'. If your database uses this, use a comma-delimited list of filenames to register each log file.

-- Replace {group_number} placeholder and execute these statements as many times as required
-- Replace {log_location}, using a comma separated list of filenames if database uses log multiplexing
ALTER DATABASE CLEAR LOGFILE GROUP {group_number};
ALTER DATABASE DROP LOGFILE GROUP {group_number};
ALTER DATABASE ADD LOGFILE GROUP {group_number} ('{log_location}') size 500M REUSE;

Now switch the active log so we can drop it and run the query from earlier to confirm it has switched.

ALTER SYSTEM SWITCH LOGFILE;

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#SIZE_MBSTATUS
1512CURRENT
2512UNUSED
3128ACTIVE

We now need to wait for the database to eventually switch the status of the ACTIVE group to INACTIVE. The switch could take several minutes, so be patient and recheck the size periodically. Once the status reaches INACTIVE, run this script and confirm all logs are resized:

-- Replace {group_number} placeholder and execute these statements as many times as required
-- Replace {log_location}, using a comma separated list of filenames if database uses log multiplexing
ALTER DATABASE CLEAR LOGFILE GROUP {group_number};
ALTER DATABASE DROP LOGFILE GROUP {group_number};
ALTER DATABASE ADD LOGFILE GROUP {group_number} ('{log_location}') size 500M REUSE;

SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
GROUP#SIZE_MBSTATUS
1512CURRENT
2512UNUSED
3512UNUSED

3) Create Database User

Depending on your database architecture, choose the correct script below to create a database user with privileges for the Connector.

-- Replace {...} placeholders as needed
ALTER SESSION SET CONTAINER=CDB$ROOT;
CREATE USER C##STREAMKAP_USER IDENTIFIED BY {password};

ALTER SESSION SET CONTAINER={PDB};
CREATE TABLESPACE STREAMKAP_LOGMINER_TBS DATAFILE {filename} SIZE 25M AUTOEXTEND ON MAXSIZE UNLIMITED;
ALTER USER C##STREAMKAP_USER DEFAULT STREAMKAP_LOGMINER_TBS;
ALTER USER C##STREAMKAP_USER QUOTA UNLIMITED ON STREAMKAP_LOGMINER_TBS;

-- Grant permissions
GRANT CREATE SESSION TO C##STREAMKAP_USER CONTAINER=ALL; 
GRANT SET CONTAINER TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Allows the Connector to use LogMiner
GRANT LOGMINING TO C##STREAMKAP_USER CONTAINER=ALL;

-- Flashback queries used for performing initial snapshots of the data
GRANT FLASHBACK ANY TABLE TO C##STREAMKAP_USER CONTAINER=ALL; 
GRANT SELECT ANY TRANSACTION TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Required for schema history when performing initial snapshots
GRANT SELECT_CATALOG_ROLE TO C##STREAMKAP_USER CONTAINER=ALL; 
GRANT EXECUTE_CATALOG_ROLE TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Connector creates a table for explicitly managing the flushing of internal log buffers (LGWR)
GRANT CREATE TABLE TO C##STREAMKAP_USER CONTAINER=ALL;

GRANT CREATE SEQUENCE TO C##STREAMKAP_USER CONTAINER=ALL; 

-- Grant the Streamkap user permission to read each schema and table you wish to sync
ALTER SESSION SET CONTAINER={PDB};
GRANT SELECT ON {schema}.{table} TO C##STREAMKAP_USER CONTAINER=ALL;
-- Alternatively, you can grant access to all
-- GRANT SELECT ANY TABLE TO C##STREAMKAP_USER CONTAINER=ALL;

-- Grant the Streamkap user access to the DBA_EXTENTS, DBA_TABLESPACES, DBA_SEGMENTS, and TRANSACTION system views. 
GRANT SELECT ON DBA_EXTENTS TO C##STREAMKAP_USER;
GRANT SELECT ON DBA_TABLESPACES TO C##STREAMKAP_USER;
GRANT SELECT ON DBA_SEGMENTS TO C##STREAMKAP_USER;
GRANT SELECT ANY TRANSACTION TO C##STREAMKAP_USER;

-- Grant the Streamkap user permission to run LogMiner
ALTER SESSION SET CONTAINER=CDB$ROOT;
GRANT SELECT ON SYS.V_$DATABASE TO C##STREAMKAP_USER;
GRANT SELECT ON SYS.V_$PARAMETER TO C##STREAMKAP_USER;
GRANT SELECT ON SYS.V_$ARCHIVED_LOG TO C##STREAMKAP_USER;
GRANT SELECT ON SYS.V_$ARCHIVE_DEST TO C##STREAMKAP_USER;
GRANT SELECT ON SYS.V_$LOGMNR_CONTENTS TO C##STREAMKAP_USER;
GRANT EXECUTE ON DBMS_LOGMNR TO C##STREAMKAP_USER;
GRANT EXECUTE ON DBMS_LOGMNR_D TO C##STREAMKAP_USER;
GRANT SELECT ANY TRANSACTION TO C##STREAMKAP_USER;
GRANT EXECUTE_CATALOG_ROLE TO C##STREAMKAP_USER;

-- Replace {...} placeholders as needed
CREATE USER STREAMKAP_USER IDENTIFIED BY {password};

CREATE TABLESPACE STREAMKAP_LOGMINER_TBS DATAFILE {filename} SIZE 25M AUTOEXTEND ON MAXSIZE UNLIMITED;
ALTER USER STREAMKAP_USER DEFAULT STREAMKAP_LOGMINER_TBS;
ALTER USER STREAMKAP_USER QUOTA UNLIMITED ON STREAMKAP_LOGMINER_TBS;

-- Grant permissions
GRANT CREATE SESSION TO STREAMKAP_USER; 
GRANT SET CONTAINER TO STREAMKAP_USER; 

-- Allows the Connector to use LogMiner
GRANT LOGMINING TO STREAMKAP_USER; 

-- Flashback queries used for performing initial snapshots of the data
GRANT FLASHBACK ANY TABLE TO STREAMKAP_USER; 
GRANT SELECT ANY TRANSACTION TO STREAMKAP_USER; 

-- Required for schema history when performing initial snapshots
GRANT SELECT_CATALOG_ROLE TO STREAMKAP_USER; 
GRANT EXECUTE_CATALOG_ROLE TO STREAMKAP_USER; 

-- Connector creates a table for explicitly managing the flushing of internal log buffers (LGWR)
GRANT CREATE TABLE TO STREAMKAP_USER; 

GRANT CREATE SEQUENCE TO STREAMKAP_USER;

-- Grant the Streamkap user permission to read each schema and table you wish to sync
GRANT SELECT ON {schema}.{table} TO STREAMKAP_USER;
-- Alternatively, you can grant access to all
-- GRANT SELECT ANY TABLE TO STREAMKAP_USER;

-- Grant the Streamkap user permission to run LogMiner
GRANT SELECT ON SYS.V_$DATABASE TO STREAMKAP_USER;
GRANT SELECT ON SYS.V_$PARAMETER TO STREAMKAP_USER;
GRANT SELECT ON SYS.V_$ARCHIVED_LOG TO STREAMKAP_USER;
GRANT SELECT ON SYS.V_$LOGMNR_CONTENTS TO STREAMKAP_USER;
GRANT EXECUTE ON DBMS_LOGMNR TO STREAMKAP_USER;
GRANT EXECUTE ON DBMS_LOGMNR_D TO STREAMKAP_USER;
GRANT SELECT ANY TRANSACTION TO STREAMKAP_USER;
GRANT EXECUTE_CATALOG_ROLE TO STREAMKAP_USER;

Enable Snapshots

You can perform ad-hoc snapshots of all or some of your tables in the Streamkap app. See Snapshots & Backfilling for more information.

❗️

Please create the signal table with the name streamkap_signal in a new schema called streamkap. It will not be recognised if given another name.

-- Switch to the newly created user/schema
ALTER SESSION SET CURRENT_SCHEMA = streamkap;

-- Create the table
CREATE TABLE streamkap_signal (
  id VARCHAR2(255) PRIMARY KEY, 
  type VARCHAR2(32) NOT NULL, 
  data VARCHAR2(2000) NULL
);

-- Grant necessary privileges on the table to the user
GRANT SELECT, UPDATE, INSERT ON streamkap_signal TO streamkap;

Streamkap Setup

  • Go to Sources and choose Oracle
  • Input the following information:
    • Name - A unique and memorable name for this Connector
    • Hostname - The database endpoint
    • Port (optional) - The database port
    • Username (case sensitive) - STREAMKAP_USER or the username you chose
    • Password - The database user's password
    • Heartbeat - Required for low volume connectors. See Oracle Heartbeats (CDB Multi-Tenant)
    • Signal Table Schema - Streamkap will use a collection in this schema to manage snapshots e.g. public. See Enable Snapshots for more information
    • Database - The database name (single-tenant architecture) or container database name (multi-tenant architecture)
    • Pluggable Database (optional, multi-tenant architecture only) - The pluggable database name
    • Connect via SSH Tunnel. See SSH Tunnel
    • Advanced Parameters
      • Snapshot Mode (Default When Needed) See PostgreSQL Snapshot Modes for more information
      • Represent Binary Data As (Default bytes)
      • Snapshot Chunk Size (Default 1024) - This is the number of rows read at a time when snapshotting. This is a low safe value. As a guide, if you have 100m + rows of data you may want to move this to 5120. If you have 1bn then a higher number still will allow you to backfill faster.
      • Max Batch Size (Default 2048) - A value that specifies the maximum size of each batch of events that the connector processes. Only increase if experiencing lag
      • Add Schemas/Tables. Can also bulk upload here. The format is a simple list of each schema or table per row saved in csv format without a header.
  • Click Save

The connector will take approximately 1 minute to start processing data.

Why does the Connector need CREATE TABLE privileges?

Oracle has a background process called the "Log Writer" or LGWR for short. Logs are written to an in-memory buffer first, and then the LGWR writes them to disk.

The Connector needs to keep track of the last recorded system change number (SCN) that the LGWR process records for each committed transaction.

It's best to persist that SCN somewhere outside of the Connector so it creates a very small, 1 column table named LOG_MINING_TABLE. If the Connector fails, it can use the last recorded value from that table to recover.

Do I need to resize my redo logs for a database with low data volume and traffic?

We recommend it because the supplemental logging configuration from earlier in this guide will increase the amount of data in the redo logs, data that's required for tracking changes to your data, including it's schema.

We would also recommend for databases with low traffic to also enable the heartbeats feature explained here: Oracle Heartbeats

Multi-tenant Architecture

How do I find the Container and pluggable database names?

Connect to the database and run this script to list the available container (CDB) and pluggable database (PDB) names.

SELECT NAME, CDB, CON_ID FROM V$DATABASE ORDER BY CON_ID;

For the Container database names, the CDB should be YES.

Can I create a local user instead e.g. CREATE USER ... CONTAINER=<PDB>?

At this time, no. The Connector makes calls to LogMiner APIs from within the root database. Additionally, it consults several V$ tables which are not available from within a PDB.