Creating Final State Tables
How to create final state tables from append-only tables
Let's start with an example row of data:
RECORD_METADATA | ID | FIRST_NAME | LAST_NAME | _STREAMKAP_SOURCE_TS_MS | _STREAMKAP_TS_MS | __DELETED | _OFFSET |
---|---|---|---|---|---|---|---|
{ "CreateTime": 1727770177917, "SnowflakeConnectorPushTime": 1727770177919, "headers": { "__op": "c" }, "key": { "id": 1302 }, "offset": 309707, "partition": 0, "topic": "source_66db3762539d8187df6bd5a7.crm.demo" } | 1302 | Paul | Smith | 1727770177000 | 1727770177618 | FALSE | 309707 |
In the above example, we have three data fields that match the source:
- ID (Primary Key)
- FIRST_NAME
- LAST_NAME
Then we have the additional columns that Streamkap adds to each inserted row:
- RECORD_METADATA
- _STREAMKAP_SOURCE_TS_MS (Epoch time in milliseconds at which the record changed at the source)
- _STREAMKAP_TS_MS (Epoch time in milliseconds at which Streamkap processed the record)
- __DELETED (Whether the record has been deleted at the source)
- _OFFSET (The record's offset, also visible in RECORD_METADATA)
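Every approach below applies the same underlying logic: keep only the most recent version of each primary key and drop keys whose latest version is a delete. As a standalone sketch against the raw append-only table (using the same placeholder object names as the queries further down):
SELECT *
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
QUALIFY ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _STREAMKAP_TS_MS DESC) = 1
    AND __DELETED = 'false';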
Snowflake
Task + Merge Statement
-- Replace the placeholders in the query:
-- DBNAME, SCHEMANAME, WAREHOUSE_NAME, TABLENAME, TABLENAME_DEDUP, TASK_TABLENAME
-------------------------------------------------------
-- Create a TABLE_REFRESH_HISTORY table to store the latest inserted timestamp; this ensures that only
-- new data is scanned during subsequent runs. Then set up a <TASK_TABLENAME>_TASK_RUN_HISTORY task to
-- run after the main <TABLENAME>_TASK task and record that timestamp in TABLE_REFRESH_HISTORY.
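-- A minimal sketch of the TABLE_REFRESH_HISTORY DDL (column names match the INSERT statement below;
-- the column types are assumptions, not Streamkap-defined):
CREATE TABLE IF NOT EXISTS <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (
    TABLE_NAME VARCHAR,
    MAX_TIMESTAMP NUMBER,        -- epoch milliseconds, same unit as _STREAMKAP_TS_MS
    REFRESH_TIME TIMESTAMP_LTZ
);
-- Note: the predecessor task <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK (created further below) must exist
-- before the CREATE TASK ... AFTER statement below will succeed.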
CREATE OR REPLACE TASK <DBNAME>.<SCHEMANAME>.<TASK_TABLENAME>_TASK_RUN_HISTORY
WAREHOUSE = <WAREHOUSE_NAME>
AFTER <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
AS
INSERT INTO <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (TABLE_NAME, MAX_TIMESTAMP, REFRESH_TIME)
SELECT
'<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>',
MAX(_streamkap_ts_ms) AS max_ts,
CURRENT_TIMESTAMP AS refresh_time
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>;
---------------------------------------------------------
-- The MERGE statement below merges the deduplicated data into the target table. It runs inside a task
-- that is scheduled every 15 minutes (configurable).
CREATE TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
WAREHOUSE = <WAREHOUSE_NAME>
SCHEDULE = '15 MINUTE'
AS
MERGE INTO <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP> AS target
USING (
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC) AS dedupe_id
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
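            -- Only scan rows newer than the last recorded refresh for this table; on the first run
            -- (no history row yet), fall back to an 8-day lookback window (values are epoch milliseconds)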
WHERE _streamkap_ts_ms >=
COALESCE((SELECT MAX(MAX_TIMESTAMP) FROM <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY
WHERE TABLE_NAME='<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>'),
(DATE_PART('EPOCH', CURRENT_TIMESTAMP()) * 1000) - (8 * 24 * 60 * 60 * 1000))
) AS subquery
WHERE dedupe_id = 1 AND __deleted = 'false'
) AS source
ON target.ID = source.ID
WHEN MATCHED THEN
UPDATE SET
target.RECORD_METADATA = source.RECORD_METADATA,
<values>,
target.__DELETED = source.__DELETED,
target._STREAMKAP_SOURCE_TS_MS = source._STREAMKAP_SOURCE_TS_MS,
target._STREAMKAP_TS_MS = source._STREAMKAP_TS_MS,
target._STREAMKAP_OFFSET = source._STREAMKAP_OFFSET
WHEN NOT MATCHED THEN
INSERT (
<values>,
__DELETED,
_STREAMKAP_SOURCE_TS_MS,
_STREAMKAP_TS_MS,
_STREAMKAP_OFFSET
)
VALUES (
<values>,
source.__DELETED,
source._STREAMKAP_SOURCE_TS_MS,
source._STREAMKAP_TS_MS,
source._STREAMKAP_OFFSET
);
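Both tasks are created in a suspended state, so they must be resumed before the schedule and the MERGE start running. Resume the child task first, then the root task, for example:
ALTER TASK <DBNAME>.<SCHEMANAME>.<TASK_TABLENAME>_TASK_RUN_HISTORY RESUME;
ALTER TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK RESUME;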
Dynamic Tables
Streamkap supports automatic creation and maintenance of Snowflake Dynamic Tables for this final view, but the underlying logic is as follows:
-- Replace the placeholders in the query:
-- DYNAMIC_TABLE_NAME, TARGET_LAG, WAREHOUSE_NAME, PRIMARY_KEY, SOURCE_TABLE_NAME
CREATE OR REPLACE DYNAMIC TABLE <DYNAMIC_TABLE_NAME> (
{{column_1}},
{{column_2}},
{{column_3}},
{{column_4}},
{{column_5}},
    -- ... remaining source columns ...
__DELETED,
_STREAMKAP_SOURCE_TS_MS,
_STREAMKAP_TS_MS,
_STREAMKAP_OFFSET
)
TARGET_LAG = '<TARGET_LAG>' -- Example: '15 minutes'
REFRESH_MODE = AUTO
INITIALIZE = ON_CREATE
WAREHOUSE = <WAREHOUSE_NAME>
AS
SELECT * EXCLUDE dedupe_id
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY <PRIMARY_KEY> ORDER BY _streamkap_ts_ms DESC) AS dedupe_id
FROM <SOURCE_TABLE_NAME>
)
WHERE dedupe_id = 1 AND __deleted = 'false';
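For the example table at the top of this page, a filled-in version of this statement could look like the following (the database, schema, table, and warehouse names here are hypothetical):
CREATE OR REPLACE DYNAMIC TABLE ANALYTICS.CRM.DEMO_FINAL
TARGET_LAG = '15 minutes'
REFRESH_MODE = AUTO
INITIALIZE = ON_CREATE
WAREHOUSE = TRANSFORM_WH
AS
SELECT * EXCLUDE dedupe_id
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _STREAMKAP_TS_MS DESC) AS dedupe_id
    FROM ANALYTICS.CRM.DEMO
)
WHERE dedupe_id = 1 AND __DELETED = 'false';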
DBT
DBT Example
{{
config(
materialized='incremental',
        unique_key='<your_primary_key>',
        post_hook='DELETE FROM {{ this }} WHERE __deleted = TRUE',
)
}}
WITH cte AS (
SELECT * FROM source_table
{% if is_incremental() %}
WHERE
_streamkap_ts_ms >= (SELECT MAX(_streamkap_ts_ms) FROM {{ this }})
{% endif %}
QUALIFY RANK() OVER (PARTITION BY <your_primary_key> ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) = 1
)
SELECT
*
FROM
cte
{% if is_incremental() %}
WHERE
_streamkap_ts_ms >= (SELECT MAX(this._streamkap_ts_ms) FROM {{ this }} AS this)
{% endif %}
This is an example approach that uses incremental models (dbt or similar) to compare only the data that has recently arrived in the raw source tables from Streamkap, so incremental refreshes do not need to scan the full history of CDC data.
The second WHERE clause in the incremental logic refers to the {{ this }} relation in dbt; the post hook then deletes any rows in {{ this }} where __deleted is true.
Example Python script to generate the DBT incremental models
import os
import yaml
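# Generates dbt incremental model files (one per source table) from the tables listed
# under the "prod" source in a dbt sources YAML file.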
class IncrementalScriptGenerator:
def __init__(self, relative_input_path, relative_output_dir):
self.home_directory = os.path.expanduser("~")
self.relative_input_path = relative_input_path
self.relative_output_dir = relative_output_dir
self.prod_tables = []
def load_yaml_file(self):
full_path = os.path.join(self.home_directory, self.relative_input_path)
with open(full_path, "r") as file:
self.data = yaml.safe_load(file)
def extract_prod_tables(self):
for source in self.data.get("sources", []):
if source.get("name") == "prod":
tables = source.get("tables", [])
for table in tables:
table_name = table.get("name")
if table_name:
self.prod_tables.append(table_name)
def generate_incremental_scripts(self):
for prod_table in self.prod_tables:
            incremental_script = """
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='id'
    )
}}
"""
incremental_script += f"""
select *
from {{{{ source('prod', '{prod_table}') }}}}
"""
incremental_script += """
{% if is_incremental() %}
where
_streamkap_ts_ms >= (select max(_streamkap_ts_ms) from {{ this }})
{% endif %}
qualify rank() over (partition by id order by _streamkap_ts_ms desc, _streamkap_offset desc) = 1
"""
relative_output_path = (
f"{self.relative_output_dir}/sk_{prod_table.lower()}.sql"
)
            final_output_path = os.path.join(self.home_directory, relative_output_path)
            # Ensure the output directory exists before writing the model file
            os.makedirs(os.path.dirname(final_output_path), exist_ok=True)
            with open(final_output_path, "w") as file:
                file.write(incremental_script)
            print(f"Written to {final_output_path}")
if __name__ == "__main__":
relative_input_path = "dae-dbt/models/source.yml"
relative_output_dir = "dae-dbt/models/streamkap/incremental"
processor = IncrementalScriptGenerator(relative_input_path, relative_output_dir)
processor.load_yaml_file()
processor.extract_prod_tables()
processor.generate_incremental_scripts()
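For a hypothetical source table named demo under the prod source, the script writes models/streamkap/incremental/sk_demo.sql with roughly the following contents:
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='id'
    )
}}

select *
from {{ source('prod', 'demo') }}

{% if is_incremental() %}
where
    _streamkap_ts_ms >= (select max(_streamkap_ts_ms) from {{ this }})
{% endif %}
qualify rank() over (partition by id order by _streamkap_ts_ms desc, _streamkap_offset desc) = 1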