# Amazon RDS Oracle

> Oracle Change Data Capture Setup on Amazon RDS with Streamkap

## Prerequisites

<Danger>
  **Multi-tenant databases**

  Because of an AWS RDS [limitation](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Oracle.Concepts.CDBs.html#Oracle.Concepts.single-tenant-limitations) that prevents connecting to the CDB, and because only [local users](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Oracle.Concepts.CDBs.html#Oracle.Concepts.single-tenant.users) can be created, Streamkap does not support Oracle multi-tenant databases on AWS RDS.
</Danger>

<Danger>
  **Standby databases**

  An Oracle database can be configured with either a physical or a logical standby database for recovery after a production failure. At this time, Streamkap does not support them.
</Danger>

* Oracle 12c or above, Standard or Enterprise Edition
* AWS RDS endpoint and port of the database
* Database name (single-tenant architecture)
* The Oracle database master user credentials or equivalent
* An AWS console account with administrator access to the database

## Oracle Setup

<Info>
  Streamkap's Oracle Source supports **LogMiner**, **XStream**, and **OpenLogReplicator** as log reading methods.
</Info>

The Connector depends on Oracle's **redo** logs and **archive** logs to ingest changes from your database. The redo logs must be large enough, and the archive logs retained for long enough, to ensure all changes are captured.

### 1. Grant Database Access

* Configure one of the [Connection Options](/connection-options) to ensure Streamkap can reach your database.

### 2. Enable Archive Logs

When redo logs fill up, Oracle archives *groups* of them into archive logs. For Oracle on RDS, archiving is enabled when **AWS automated backups** are enabled.

#### Enable AWS Automated Backups

<Danger>
  **Reboot required**

  When automated backups are enabled, your RDS instance and database are taken **offline** and a backup is created immediately, which can take some time.
</Danger>

* Sign in to and open the Amazon RDS console at [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/).
* In the left-side navigation menu, choose **Databases**. The **Databases** page should appear.
* Select the DB instance that you want to modify.
* Click **Modify**. The **Modify DB instance** page should appear.
* For **Backup retention period**, choose a value of **1** day or higher.
* Click **Continue**.
* Select **Apply immediately**.
* On the confirmation page, click **Modify DB instance** to enable automated backups.

To check, you can connect to the database and run this query:

<CodeGroup>
  ```SQL SQL theme={null}
  SELECT NAME, LOG_MODE FROM V$DATABASE;
  ```
</CodeGroup>

If the `LOG_MODE` is `ARCHIVELOG` then it is enabled.

Or:

* Sign in to your AWS account.
* Once signed in, navigate to the **RDS** dashboard by clicking on **Services** in the top left corner, **Databases** and then **RDS** or by typing **RDS** into the top left search box.
* From the AWS RDS Dashboard, click on **DB Instances** or **Databases** in the left side menu.
* Click on the **DB identifier** for the Oracle database you want Streamkap to use.
* Click on the **Maintenance & backups** tab.
* Under the **Backups** section, check **Automated backups** says "Enabled (*N* Days)".

#### Configure Log Retention

Archive logs should be retained for at least 24 hours, and we recommend retaining them for longer if possible. If the retention period is too short, archive logs may be purged before the Connector has captured and processed the changes they contain.

<Warning>
  **Retention periods and database storage**

  Archive logs are retained on your **database instance** and use up its storage capacity. Make sure the instance has enough space; otherwise, performance issues and outages can occur.

  In general, the more tables (and columns) there are, the more capacity is required. For the Connector, additional capacity is essential because it depends on Oracle's supplemental logging.

  Assuming your Oracle database has supplemental logging enabled already, to estimate storage capacity you can look at the last 1 hour of log storage usage and multiply that by `archivelog retention hours`. Here's an example script for that:

  ```sql SQL theme={null}
  SELECT
   SUM(BLOCKS * BLOCK_SIZE) bytes, -- usage
   SUM(BLOCKS * BLOCK_SIZE) * 24 estimated_bytes -- assuming 24 hours archivelog retention
    FROM V$ARCHIVED_LOG
   WHERE
    FIRST_TIME >= SYSDATE-(1/24) -- last hour
    AND
    DEST_ID=1;
  ```
</Warning>
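
A single hour can be unrepresentative if your workload is bursty. As a rough complement to the estimate above, the sketch below totals archived log volume per day. The 7-day window is an arbitrary choice and the `DEST_ID = 1` filter is carried over from the example above. Note that `V$ARCHIVED_LOG` only covers logs still recorded in the control file.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Archived log volume per day over the last 7 days (window is an assumption)
  SELECT
    TRUNC(FIRST_TIME) AS log_day,
    ROUND(SUM(BLOCKS * BLOCK_SIZE) / 1024 / 1024 / 1024, 2) AS archived_gb
  FROM V$ARCHIVED_LOG
  WHERE FIRST_TIME >= SYSDATE - 7
    AND DEST_ID = 1
  GROUP BY TRUNC(FIRST_TIME)
  ORDER BY log_day;
  ```
</CodeGroup>

Multiplying the busiest day's figure by the number of retention days gives a more conservative capacity estimate than the one-hour sample.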

<CodeGroup>
  ```SQL SQL theme={null}
  -- Set log retention
  EXECUTE rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours', 72);
  COMMIT; -- The commit is required for the change to take effect.
  ```
</CodeGroup>
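
To confirm the retention setting took effect, you can print the current RDS configuration. This is a minimal sketch that assumes a client which understands `SET SERVEROUTPUT ON` (for example SQL\*Plus or SQL Developer); the procedure writes its results to server output rather than returning rows.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Show RDS-managed settings, including 'archivelog retention hours'
  SET SERVEROUTPUT ON
  EXEC rdsadmin.rdsadmin_util.show_configuration;
  ```
</CodeGroup>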

### 3. Enable LogMiner

The Connector uses the [Oracle LogMiner](https://docs.oracle.com/en/database/oracle/oracle-database/19/sutil/oracle-logminer-utility.html) utility to query the redo and archive logs. LogMiner requires supplemental logging to be enabled.

<CodeGroup>
  ```SQL SQL theme={null}
  EXECUTE rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD','ALL');
  ```
</CodeGroup>

To confirm if supplemental logging has been enabled, run this query:

<CodeGroup>
  ```SQL SQL theme={null}
  SELECT NAME, SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
  ```
</CodeGroup>

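If `SUPPLEMENTAL_LOG_DATA_MIN` is `YES` or `IMPLICIT`, then it is enabled. Oracle reports `IMPLICIT` when minimal supplemental logging is active only because a broader level, such as all-column logging, has been enabled.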
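
Because the procedure above adds `ALL` column logging, it can also be useful to look at the column-level flags in the same view. This is a sketch rather than a required step; after the `'ADD','ALL'` call you would expect `SUPPLEMENTAL_LOG_DATA_ALL` to report `YES`.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Inspect minimal, primary-key and all-column supplemental logging flags
  SELECT
    SUPPLEMENTAL_LOG_DATA_MIN,
    SUPPLEMENTAL_LOG_DATA_PK,
    SUPPLEMENTAL_LOG_DATA_ALL
  FROM V$DATABASE;
  ```
</CodeGroup>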

#### Resize Redo Logs

An Amazon RDS Oracle instance starts with four online redo log files of 128MB each. That is too small: more log files are needed, and each should be at least 1024MB, especially for production databases.

Before making any changes, run this query to check the current log sizes:

<CodeGroup>
  ```SQL SQL theme={null}
  SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
  ```
</CodeGroup>

| GROUP# | SIZE\_MB | STATUS   |
| ------ | -------- | -------- |
| 1      | 128      | INACTIVE |
| 2      | 128      | CURRENT  |
| 3      | 128      | INACTIVE |
| 4      | 128      | INACTIVE |

Now let's add 8 new, larger logs, which will eventually replace the smaller ones, and then run the query from earlier to confirm they have been created.

<CodeGroup>
  ```SQL SQL theme={null}
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);
  EXEC rdsadmin.rdsadmin_util.add_logfile(bytes => 1073741824);

  SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
  ```
</CodeGroup>

| GROUP# | SIZE\_MB | STATUS   |
| ------ | -------- | -------- |
| 1      | 128      | INACTIVE |
| 2      | 128      | CURRENT  |
| 3      | 128      | INACTIVE |
| 4      | 128      | INACTIVE |
| 5      | 1024     | UNUSED   |
| 6      | 1024     | UNUSED   |
| ...    | ...      | ...      |

Drop every *group* showing as `INACTIVE`, then run the query from earlier to confirm they have been dropped:

<CodeGroup>
  ```SQL SQL theme={null}
  -- Replace {group_number} placeholder and execute the procedure as required
  EXEC rdsadmin.rdsadmin_util.drop_logfile(grp => {group_number});

  SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
  ```
</CodeGroup>

| GROUP# | SIZE\_MB | STATUS  |
| ------ | -------- | ------- |
| 2      | 128      | CURRENT |
| 5      | 1024     | UNUSED  |
| 6      | 1024     | UNUSED  |
| ...    | ...      | ...     |

Now let's switch away from the `CURRENT` log so that it can be dropped, then run the query from earlier to confirm the switch.

<CodeGroup>
  ```SQL SQL theme={null}
  EXEC rdsadmin.rdsadmin_util.switch_logfile;

  SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
  ```
</CodeGroup>

| GROUP# | SIZE\_MB | STATUS  |
| ------ | -------- | ------- |
| 2      | 128      | ACTIVE  |
| 5      | 1024     | CURRENT |
| 6      | 1024     | UNUSED  |
| ...    | ...      | ...     |

If the `STATUS` of the log we want to drop is still `ACTIVE`, we need to issue a checkpoint to make it `INACTIVE`.

<CodeGroup>
  ```SQL SQL theme={null}
  EXEC rdsadmin.rdsadmin_util.checkpoint;
  ```
</CodeGroup>

Finally, let's drop the remaining log file and run the query from earlier to confirm we now have 8 larger logs.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Replace {group_number} placeholder and execute the procedure as required
  EXEC rdsadmin.rdsadmin_util.drop_logfile(grp => {group_number});

  SELECT GROUP#, BYTES/1024/1024 SIZE_MB, STATUS FROM V$LOG ORDER BY 1;
  ```
</CodeGroup>

| GROUP# | SIZE\_MB | STATUS  |
| ------ | -------- | ------- |
| 5      | 1024     | CURRENT |
| 6      | 1024     | UNUSED  |
| ...    | ...      | ...     |
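
Whether 1024MB is enough depends on how much redo your workload generates. One way to judge this is to look at how often Oracle switches logs. The query below is a sketch using `V$LOG_HISTORY`; the 24-hour window is an arbitrary assumption, and what counts as "too frequent" depends on your workload.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Count redo log switches per hour over the last 24 hours
  SELECT
    TRUNC(FIRST_TIME, 'HH24') AS switch_hour,
    COUNT(*) AS log_switches
  FROM V$LOG_HISTORY
  WHERE FIRST_TIME >= SYSDATE - 1
  GROUP BY TRUNC(FIRST_TIME, 'HH24')
  ORDER BY switch_hour;
  ```
</CodeGroup>

If many switches per hour persist after resizing, larger log files may be worth considering.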

### 4. Create Database User

The script below creates a database user with privileges for the Connector.

<CodeGroup>
  ```SQL Non-CDB (Single-tenant) theme={null}
  -- Replace {...} placeholders as needed
  CREATE USER STREAMKAP_USER IDENTIFIED BY {password};

  CREATE TABLESPACE STREAMKAP_LOGMINER_TBS DATAFILE SIZE 25M AUTOEXTEND ON MAXSIZE UNLIMITED;
  ALTER USER STREAMKAP_USER DEFAULT TABLESPACE STREAMKAP_LOGMINER_TBS;
  ALTER USER STREAMKAP_USER QUOTA UNLIMITED ON STREAMKAP_LOGMINER_TBS;

  -- Grant permissions
  GRANT CREATE SESSION TO STREAMKAP_USER;

  -- Allows the Connector to use LogMiner
  GRANT LOGMINING TO STREAMKAP_USER;

  -- Flashback queries used for performing initial snapshots of the data
  GRANT FLASHBACK ANY TABLE TO STREAMKAP_USER;
  GRANT SELECT ANY TRANSACTION TO STREAMKAP_USER;

  -- Required for schema history when performing initial snapshots
  GRANT SELECT_CATALOG_ROLE TO STREAMKAP_USER;
  GRANT EXECUTE_CATALOG_ROLE TO STREAMKAP_USER;

  -- Connector creates a table for explicitly managing the flushing of internal log buffers (LGWR)
  GRANT CREATE TABLE TO STREAMKAP_USER;

  GRANT CREATE SEQUENCE TO STREAMKAP_USER;

  -- Read-only privileges on system tables containing redo, archive log and current transaction state
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$DATABASE','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOG_HISTORY','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_LOGS','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_CONTENTS','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_PARAMETERS','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGFILE','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVED_LOG','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVE_DEST_STATUS','STREAMKAP_USER','SELECT');
  EXECUTE rdsadmin.rdsadmin_util.grant_sys_object('V_$TRANSACTION','STREAMKAP_USER','SELECT');

  -- Grant the Streamkap user permission to read each schema and table you wish to sync
  GRANT SELECT ON {schema}.{table} TO STREAMKAP_USER;
  -- Alternatively, you can grant access to all
  -- GRANT SELECT ANY TABLE TO STREAMKAP_USER;
  ```
</CodeGroup>
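
After running the script, you may want to check that the grants were applied. The queries below are a minimal sketch that assumes you are connected as a user able to read the `DBA_*` dictionary views (typically the RDS master user) and that you kept the default `STREAMKAP_USER` name.

<CodeGroup>
  ```SQL SQL theme={null}
  -- System privileges granted directly (e.g. CREATE SESSION, LOGMINING)
  SELECT PRIVILEGE
  FROM DBA_SYS_PRIVS
  WHERE GRANTEE = 'STREAMKAP_USER'
  ORDER BY PRIVILEGE;

  -- Roles (e.g. SELECT_CATALOG_ROLE, EXECUTE_CATALOG_ROLE)
  SELECT GRANTED_ROLE
  FROM DBA_ROLE_PRIVS
  WHERE GRANTEE = 'STREAMKAP_USER'
  ORDER BY GRANTED_ROLE;

  -- Object privileges (system views and the tables you chose to sync)
  SELECT OWNER, TABLE_NAME, PRIVILEGE
  FROM DBA_TAB_PRIVS
  WHERE GRANTEE = 'STREAMKAP_USER'
  ORDER BY OWNER, TABLE_NAME;
  ```
</CodeGroup>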

### 5. Enable Snapshots

To backfill your data, the Connector needs to be able to perform Snapshots (see [Snapshots & Backfilling](/snapshots) for more information). To enable this, you must create a signal table for the Connector to use.

<Info>
  The examples below use `STREAMKAP_SIGNAL` as the signal table name, but you can choose any name. During [Streamkap Setup](#3-snapshot-settings), provide the full path to your signal table in `schema.table` format (e.g., `STREAMKAP_USER.STREAMKAP_SIGNAL`).
</Info>

<CodeGroup>
  ```SQL Non-CDB (Single-tenant) theme={null}
  -- Create the table
  CREATE TABLE STREAMKAP_USER.STREAMKAP_SIGNAL (
    id VARCHAR2(255) PRIMARY KEY,
    type VARCHAR2(32) NOT NULL,
    data VARCHAR2(2000) NULL
  );

  -- Grant necessary privileges on the table to the user
  GRANT SELECT, UPDATE, INSERT, DELETE ON STREAMKAP_USER.STREAMKAP_SIGNAL TO STREAMKAP_USER;
  ```
</CodeGroup>
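
To double-check that the signal table exists with the expected columns, you can query the data dictionary. This sketch assumes the `STREAMKAP_USER.STREAMKAP_SIGNAL` name from the example above; adjust the owner and table name if you chose different ones.

<CodeGroup>
  ```SQL SQL theme={null}
  -- List the signal table's columns in definition order
  SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, NULLABLE
  FROM ALL_TAB_COLUMNS
  WHERE OWNER = 'STREAMKAP_USER'
    AND TABLE_NAME = 'STREAMKAP_SIGNAL'
  ORDER BY COLUMN_ID;
  ```
</CodeGroup>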

### 6. Heartbeats

Connectors use "offsets"—like bookmarks—to track their position in the database's log or change stream. When no changes occur for long periods, these offsets may become outdated, and the Connector might lose its place or stop capturing changes.

Heartbeats ensure the Connector stays active and continues capturing changes.

There are two layers of heartbeat protection:

#### Layer 1: Connector heartbeats (enabled by default)

The Connector periodically emits heartbeat messages to an internal topic, even when no actual data changes are detected. This keeps offsets fresh and prevents staleness.

No configuration is necessary for this layer; it is automatically enabled. We recommend keeping this layer enabled for all deployments.

#### Layer 2: Source database heartbeats (recommended)

<Info>
  **Why we recommend configuring Layer 2**

  While Layer 2 is **crucial** for low-traffic or intermittent databases, we recommend configuring it for all deployments. It provides additional resilience and helps prevent issues during periods of inactivity.
</Info>

You can configure regular updates to a dedicated heartbeat table in the source database. This simulates activity, ensuring change events are generated consistently, maintaining log progress and providing additional resilience.

How this layer is configured depends on the connection type (if supported by the Source):

* **Read-write connections** (when **Read only** is **No** during Streamkap Setup): The Connector updates the heartbeat table directly.
* **Read-only connections** (when **Read only** is **Yes** during Streamkap Setup): A scheduled job on the **primary** database updates the heartbeat table, and these changes replicate to the read replica for the Connector to consume.

This layer requires you to set up a heartbeat table—and for read-only connections, a scheduled job (e.g., `pg_cron` for PostgreSQL, `event_scheduler` for MySQL)—on your source database.

<Tabs>
  <Tab title="Read-write connections">
    For read-write connections (when **Read only** is **No** during Streamkap Setup), the Connector writes to the heartbeat table directly.

    <CodeGroup>
      ```SQL SQL theme={null}
      -- Create the heartbeat table with id, text, and last_update fields
      CREATE TABLE STREAMKAP_USER.STREAMKAP_HEARTBEAT (
          id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
          text VARCHAR2(4000),
          last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

      -- Grant permission to the Streamkap user
      GRANT SELECT, UPDATE, INSERT, DELETE ON STREAMKAP_USER.STREAMKAP_HEARTBEAT TO STREAMKAP_USER;

      -- Insert the first row into the heartbeat table
      INSERT INTO STREAMKAP_USER.STREAMKAP_HEARTBEAT (text) VALUES ('test_heartbeat');
      COMMIT; -- Persist the initial row
      ```
    </CodeGroup>
  </Tab>
</Tabs>
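
Once the Source is running, one way to check that heartbeats are being written is to look at the table between two points in time. This is only a sketch: exactly which columns the Connector changes is not specified here, so treat any difference in the rows between runs as the signal.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Run this twice, a few minutes apart, and compare the results
  SELECT id, text, last_update
  FROM STREAMKAP_USER.STREAMKAP_HEARTBEAT
  ORDER BY last_update DESC;
  ```
</CodeGroup>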

***

## Streamkap Setup

Follow these steps to configure your new connector:

### 1. Create the Source

* Navigate to [Add Connectors](https://app.streamkap.com/connectors/add?tab=Sources).
* Choose **Oracle**.
  * Select **Amazon RDS**.

### 2. Connection Settings

* **Name**: Enter a name for your connector.

* **Hostname**: Specify the database endpoint.

* **Port**: Default is `1521`.

* **Connect via SSH Tunnel**: The Connector will connect to an SSH server in your network which has access to your database. This is necessary if the Connector cannot connect directly to your database.

  * See [SSH Tunnel](/ssh-tunnel) for setup instructions.

* **Username** (case sensitive): Username to access the database. By default, Streamkap scripts use `STREAMKAP_USER`.

* **Password**: Password to access the database.

* **Database**: The database name (single-tenant architecture).

* **Heartbeats**:

  * **Heartbeat Table Schema**: Streamkap will use a table in this schema to manage heartbeats. Usually this is the same schema that contains the Signal Table. See [Heartbeats](#6-heartbeats) for setup instructions.

### 3. Snapshot Settings

* **Signal Table**: Full path to the signal table including schema and table name (e.g., `STREAMKAP_USER.STREAMKAP_SIGNAL`). This table is used for incremental snapshotting. See [Enable Snapshots](#5-enable-snapshots) for setup instructions.

### 4. Advanced Parameters

* **Represent binary data as**: Specifies how the data for binary columns should be interpreted. Your destination for this data can impact which option you choose. Default is `bytes`.
* **Capture Only Captured Databases DDL**: Used to control whether the connector records schema structures from all databases defined in the server (the default) or only those databases for which you've explicitly configured the connector. Specify `true` to capture schema history only for the specific databases you've configured. This is particularly valuable when databases are large, to reduce the volume of DDL stored in the schema history topic. It also improves startup times when the connector restarts or recovers from failures. Default is `false`. See [Schema History Optimization](/schema-history-optimization) for details.
* **Capture Only Captured Tables DDL**: Used to control whether the connector records the schema structure for all tables in the configured databases (the default) or only the tables whose changes the connector captures. Specify `true` to capture schema history only for the specific tables you've configured. This is particularly valuable when tables are large, to reduce the volume of DDL statements stored in the schema history topic. It also improves startup times when the connector restarts or recovers from failures. Default is `false`. See [Schema History Optimization](/schema-history-optimization) for details.

Click **Next**.

### 5. Schema and Table Capture

* **Add Schemas/Tables**: Specify the schema(s) and table(s) for capture.
  * You can bulk upload here. The format is a simple list of schemas and tables, with each entry on a new row. Save as a `.csv` file without a header.

<Warning>
  **CDC only captures base tables, not Views**

  Change Data Capture reads Oracle redo logs via LogMiner, which only record changes to physical tables. Database Views are query-time computations with no physical storage—they don't generate redo log entries.

  **What you cannot capture:** Views, materialized views (capture base tables instead), global temporary tables, external tables, or system tables (DBA\_\*, ALL\_\*, USER\_\*).

  **Solution:** Specify only the underlying base tables that feed your views. You can recreate the view logic in your destination or transformation layer.
</Warning>
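
Before adding a schema, it can help to list the objects in it that fall into the unsupported categories. The queries below are a sketch covering views, materialized views, and global temporary tables only; `{schema}` is a placeholder, and the results are limited to objects visible to the user you are connected as.

<CodeGroup>
  ```SQL SQL theme={null}
  -- Replace {schema} placeholder (dictionary names are usually upper case)
  -- Views and materialized views
  SELECT OBJECT_NAME, OBJECT_TYPE
  FROM ALL_OBJECTS
  WHERE OWNER = '{schema}'
    AND OBJECT_TYPE IN ('VIEW', 'MATERIALIZED VIEW')
  ORDER BY OBJECT_TYPE, OBJECT_NAME;

  -- Global temporary tables
  SELECT TABLE_NAME
  FROM ALL_TABLES
  WHERE OWNER = '{schema}'
    AND TEMPORARY = 'Y'
  ORDER BY TABLE_NAME;
  ```
</CodeGroup>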

Click **Save**.

<Info>
  **Have questions?** See the [Oracle Source FAQ](/oracle-source-faq) for answers to common questions about Oracle sources, AWS RDS specifics, troubleshooting, and best practices.
</Info>
