Creating Final State Tables

How to create final state tables from append only tables

Let's start with an example row of data

RECORD_METADATAIDFIRST_NAMELAST_NAME_STREAMKAP_SOURCE_TS_MS_STREAMKAP_TS_MS__DELETED_OFFSET
{
"CreateTime": 1727770177917,
"SnowflakeConnectorPushTime": 1727770177919,
"headers": {
"__op": "c"
},
"key": {
"id": 1302
},
"offset": 309707,
"partition": 0,
"topic": "source_66db3762539d8187df6bd5a7.crm.demo"
}
1302PaulSmith17277701770001727770177618FALSE309707

In the above example we have 3 data fields that match the source

  • ID (Primary Key)
  • FIRST_NAME
  • LAST_NAME

Then we have the additional columns that Streamkap adds to each row inserted

  • RECORD_METADATA
  • _STREAMKAP_SOURCE_TS_MS (Time in milliseconds that the record changed at Source)
  • _STREAMKAP_TS_MS (Time in milliseconds that Streamkap processed the record)
  • __DELETED (Is this a record which has been deleted at source)
  • _OFFSET

Snowflake

Task + Merge Statement

# Replace the placeholders in the query
# DBNAME, SCHEMANAME, WAREHOUSE_NAME
-------------------------------------------------------
# Create a `TABLE_REFRESH_HISTORY` table to store the timestamp of the last inserted timestamp. This will ensure that only the latest data is pulled during subsequent runs. Additionally, set up a task to run after the main `<TABLENAME>_TASK` task, which will update the timestamp in the `<TASK_TABLENAME>_TASK_RUN_HISTORY` table.

CREATE OR REPLACE TASK <DBNAME>.<SCHEMANAME>.<TASK_TABLENAME>_TASK_RUN_HISTORY
  WAREHOUSE = WAREHOUSE_NAME
  AFTER <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
AS
INSERT INTO <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (TABLE_NAME, MAX_TIMESTAMP, REFRESH_TIME)
SELECT 
    '<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>', 
    MAX(_streamkap_ts_ms) AS max_ts,
    CURRENT_TIMESTAMP AS refresh_time
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>;

---------------------------------------------------------
#`MERGE` statement will merge data into the target table. Additionally, a configurable task will be created to run every 15 minutes.

CREATE TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
  WAREHOUSE = <WAREHOUSE_NAME>  
  SCHEDULE = '15 MINUTE'
AS
MERGE INTO <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP> AS target
USING (
    SELECT *
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC) AS dedupe_id
        FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
        WHERE _streamkap_ts_ms >= 
        COALESCE((SELECT MAX(MAX_TIMESTAMP) FROM <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY
        WHERE TABLE_NAME='<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>'),
        (DATE_PART('EPOCH', CURRENT_TIMESTAMP()) * 1000) - (8 * 24 * 60 * 60 * 1000))
    ) AS subquery
    WHERE dedupe_id = 1 AND __deleted = 'false'
) AS source
ON target.ID = source.ID
WHEN MATCHED THEN
    UPDATE SET 
        target.RECORD_METADATA = source.RECORD_METADATA,
        <values>,
        target.__DELETED = source.__DELETED,
        target._STREAMKAP_SOURCE_TS_MS = source._STREAMKAP_SOURCE_TS_MS,
        target._STREAMKAP_TS_MS = source._STREAMKAP_TS_MS,
        target._STREAMKAP_OFFSET = source._STREAMKAP_OFFSET
WHEN NOT MATCHED THEN
    INSERT (
  			<values>,
        __DELETED,
        _STREAMKAP_SOURCE_TS_MS,
        _STREAMKAP_TS_MS,
        _STREAMKAP_OFFSET
    )
    VALUES (
        <values>,
        source.__DELETED,
        source._STREAMKAP_SOURCE_TS_MS,
        source._STREAMKAP_TS_MS,
        source._STREAMKAP_OFFSET
    );

Dynamic Tables

Streamkap supports the auto creation and maintenance of Snowflake Dynamic Tables for this final view but the logic underneath is as follows

# Replace the placeholders in the query
# DYNAMIC_TABLE_NAME, TARGET_LAG, WAREHOUSE_NAME, PRIMARY_KEY, SOURCE_TABLE_NAME

CREATE OR REPLACE DYNAMIC TABLE <DYNAMIC_TABLE_NAME> (
    {{column_1}},
    {{column_2}},
    {{column_3}},
    {{column_4}},
    {{column_5}},
    .
    .
    .
    .
    __DELETED,
    _STREAMKAP_SOURCE_TS_MS,
    _STREAMKAP_TS_MS,
    _STREAMKAP_OFFSET
)
TARGET_LAG = '<TARGET_LAG>'  -- Example: '15 minutes'
REFRESH_MODE = AUTO 		 
INITIALIZE =  ON_CREATE
WAREHOUSE = <WAREHOUSE_NAME>
AS
SELECT * EXCLUDE dedupe_id
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY <PRIMARY_KEY> ORDER BY _streamkap_ts_ms DESC) AS dedupe_id 
    FROM <SOURCE_TABLE_NAME>
)
WHERE dedupe_id = 1 AND __deleted = 'false';

DBT

DBT Example

{{
	config(
		materialized='incremental',
		unique_key=<your_primary_key>,
		post_hook ='DELETE FROM {{ this }} WHERE __deleted = TRUE',
	)
}}


WITH cte AS (
    SELECT * FROM source_table
    {% if is_incremental() %}
    WHERE
			_streamkap_ts_ms >= (SELECT MAX(_streamkap_ts_ms) FROM {{ this }})
	{% endif %}
    QUALIFY RANK() OVER (PARTITION BY <your_primary_key> ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) = 1
)

SELECT
    *
FROM
    cte

{% if is_incremental() %}

	WHERE
		_streamkap_ts_ms >= (SELECT MAX(this._streamkap_ts_ms) FROM {{ this }} AS this)

{% endif %}

This is an example approach whereby we can utilize incremental models (dbt or similar) to only compare data that has recently come into the raw source tables from streamkap. So during incremental refreshes there is no need to scan over the historical CDC data coming in.

The 2nd where clause for incremental logic is in relation to {{ this }} declaration for dbt, since it can use post dbt hook to delete the "_deleted" columns that are true.

Example Python script to generate the DBT incremental models

import os
import yaml


class IncrementalScriptGenerator:
    def __init__(self, relative_input_path, relative_output_dir):
        self.home_directory = os.path.expanduser("~")
        self.relative_input_path = relative_input_path
        self.relative_output_dir = relative_output_dir
        self.prod_tables = []

    def load_yaml_file(self):
        full_path = os.path.join(self.home_directory, self.relative_input_path)
        with open(full_path, "r") as file:
            self.data = yaml.safe_load(file)

    def extract_prod_tables(self):
        for source in self.data.get("sources", []):
            if source.get("name") == "prod":
                tables = source.get("tables", [])
                for table in tables:
                    table_name = table.get("name")
                    if table_name:
                        self.prod_tables.append(table_name)

    def generate_incremental_scripts(self):
        for prod_table in self.prod_tables:
            incremental_script = """
            {{
                config(
                    incremental_strategy='merge',
                    unique_key='id'
                )
            }}
            """

            incremental_script += f"""
            select * 
            from {{{{ source('prod', '{prod_table}') }}}}
            """

            incremental_script += """
            {% if is_incremental() %}
            where
                _streamkap_ts_ms >= (select max(_streamkap_ts_ms) from {{ this }})
            {% endif %}

            qualify rank() over (partition by id order by _streamkap_ts_ms desc, _streamkap_offset desc) = 1
            """

            relative_output_path = (
                f"{self.relative_output_dir}/sk_{prod_table.lower()}.sql"
            )
            final_output_path = os.path.join(self.home_directory, relative_output_path)

            with open(final_output_path, "w") as file:
                file.write(incremental_script)

            print(f"Written to {final_output_path}")


if __name__ == "__main__":
    relative_input_path = "dae-dbt/models/source.yml"
    relative_output_dir = "dae-dbt/models/streamkap/incremental"
    processor = IncrementalScriptGenerator(relative_input_path, relative_output_dir)
    processor.load_yaml_file()
    processor.extract_prod_tables()
    processor.generate_incremental_scripts()