Creating Final State Tables
How to create final state tables from append-only (or insert-only) destinations.
Let's start with an example table and row of data from a Snowflake destination.
| ID | FIRST_NAME | LAST_NAME | RECORD_METADATA | _STREAMKAP_SOURCE_TS_MS | _STREAMKAP_TS_MS | __DELETED | _STREAMKAP_OFFSET |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1302 | Paul | Smith | … | 1727770177000 | 1727770177618 | FALSE | 309707 |
In the above example we have 3 data fields that match the source:
ID
: Also the source table's primary key.
FIRST_NAME
LAST_NAME
Then we have the additional columns that Streamkap adds:
RECORD_METADATA
: Metadata such as the origin topic, partition, offset, event type and more. It can be useful for debugging purposes.
_STREAMKAP_SOURCE_TS_MS
: Timestamp in milliseconds (UTC) for when the event occurred in the source database.
_STREAMKAP_TS_MS
: Timestamp in milliseconds (UTC) for when Streamkap received the event.
__DELETED
: Indicates whether this event/row (by its primary key) has been deleted in the source database.
_STREAMKAP_OFFSET
: This is an offset value in relation to the events we process. It can be useful for debugging purposes.
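Both timestamp columns are epoch values in milliseconds, so they can be converted for inspection with Snowflake's TO_TIMESTAMP_LTZ using scale 3. A minimal sketch, reusing the placeholder naming from the statements below (the aliases are illustrative):
-- Convert the epoch-millisecond metadata columns to readable timestamps
SELECT
    ID,
    TO_TIMESTAMP_LTZ(_STREAMKAP_SOURCE_TS_MS, 3) AS SOURCE_EVENT_AT,
    TO_TIMESTAMP_LTZ(_STREAMKAP_TS_MS, 3) AS RECEIVED_AT
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>;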
Snowflake
Task & Merge Statement
-- Replace the placeholders in the query
-- DBNAME, SCHEMANAME, WAREHOUSE_NAME, TABLENAME, TABLENAME_DEDUP, TASK_TABLENAME
-------------------------------------------------------
-- Create a `TABLE_REFRESH_HISTORY` table to store the latest `_streamkap_ts_ms` that has been merged,
-- so that only newer data is pulled during subsequent runs. Additionally, set up a
-- `<TASK_TABLENAME>_TASK_RUN_HISTORY` task that runs after the main `<TABLENAME>_TASK` task and
-- records that timestamp in `TABLE_REFRESH_HISTORY`.
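-- A minimal sketch of the history table itself; the column names follow the INSERT/SELECT
-- below, and the data types are assumptions to adjust for your environment.
CREATE TABLE IF NOT EXISTS <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (
    TABLE_NAME VARCHAR,
    MAX_TIMESTAMP NUMBER,        -- epoch milliseconds, matching _streamkap_ts_ms
    REFRESH_TIME TIMESTAMP_LTZ
);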
CREATE OR REPLACE TASK <DBNAME>.<SCHEMANAME>.<TASK_TABLENAME>_TASK_RUN_HISTORY
WAREHOUSE = <WAREHOUSE_NAME>
AFTER <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
AS
INSERT INTO <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (TABLE_NAME, MAX_TIMESTAMP, REFRESH_TIME)
SELECT
'<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>',
MAX(_streamkap_ts_ms) AS max_ts,
CURRENT_TIMESTAMP AS refresh_time
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>;
---------------------------------------------------------
-- The `MERGE` statement below merges the deduplicated data into the target table. It is wrapped in a task with a configurable schedule (every 15 minutes here).
CREATE TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
WAREHOUSE = <WAREHOUSE_NAME>
SCHEDULE = '15 MINUTE'
AS
MERGE INTO <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP> AS target
USING (
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) AS dedupe_id
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
WHERE _streamkap_ts_ms >=
COALESCE((SELECT MAX(MAX_TIMESTAMP) FROM <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY
WHERE TABLE_NAME='<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>'),
(DATE_PART('EPOCH', CURRENT_TIMESTAMP()) * 1000) - (8 * 24 * 60 * 60 * 1000))
) AS subquery
WHERE dedupe_id = 1 AND __deleted = 'false'
) AS source
ON target.ID = source.ID
WHEN MATCHED THEN
UPDATE SET
target.RECORD_METADATA = source.RECORD_METADATA,
<values>,
target.__DELETED = source.__DELETED,
target._STREAMKAP_SOURCE_TS_MS = source._STREAMKAP_SOURCE_TS_MS,
target._STREAMKAP_TS_MS = source._STREAMKAP_TS_MS,
target._STREAMKAP_OFFSET = source._STREAMKAP_OFFSET
WHEN NOT MATCHED THEN
INSERT (
<values>,
__DELETED,
_STREAMKAP_SOURCE_TS_MS,
_STREAMKAP_TS_MS,
_STREAMKAP_OFFSET
)
VALUES (
<values>,
source.__DELETED,
source._STREAMKAP_SOURCE_TS_MS,
source._STREAMKAP_TS_MS,
source._STREAMKAP_OFFSET
);
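Note that Snowflake tasks are created in a suspended state, so once both tasks exist you would typically resume them, child task first and then the root. A minimal sketch:
ALTER TASK <DBNAME>.<SCHEMANAME>.<TASK_TABLENAME>_TASK_RUN_HISTORY RESUME;
ALTER TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK RESUME;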
Dynamic Tables
Streamkap supports automatic creation and maintenance of Snowflake Dynamic Tables for this final view; the underlying logic is as follows:
-- Replace the placeholders in the query
-- DYNAMIC_TABLE_NAME, TARGET_LAG, WAREHOUSE_NAME, PRIMARY_KEY, SOURCE_TABLE_NAME
CREATE OR REPLACE DYNAMIC TABLE <DYNAMIC_TABLE_NAME> (
{{column_1}},
{{column_2}},
{{column_3}},
{{column_4}},
{{column_5}},
...
__DELETED,
_STREAMKAP_SOURCE_TS_MS,
_STREAMKAP_TS_MS,
_STREAMKAP_OFFSET
)
TARGET_LAG = '<TARGET_LAG>' -- Example: '15 minutes'
REFRESH_MODE = AUTO
INITIALIZE = ON_CREATE
WAREHOUSE = <WAREHOUSE_NAME>
AS
SELECT * EXCLUDE dedupe_id
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY <PRIMARY_KEY> ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) AS dedupe_id
FROM <SOURCE_TABLE_NAME>
)
WHERE dedupe_id = 1 AND __deleted = 'false';
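As an illustration, the template above filled in for the example table at the top of this page might look as follows. CUSTOMERS and CUSTOMERS_DEDUP are hypothetical names, and the optional column list is omitted here since the query defines the output columns:
CREATE OR REPLACE DYNAMIC TABLE <DBNAME>.<SCHEMANAME>.CUSTOMERS_DEDUP
TARGET_LAG = '15 minutes'
REFRESH_MODE = AUTO
INITIALIZE = ON_CREATE
WAREHOUSE = <WAREHOUSE_NAME>
AS
SELECT * EXCLUDE dedupe_id
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) AS dedupe_id
    FROM <DBNAME>.<SCHEMANAME>.CUSTOMERS
)
WHERE dedupe_id = 1 AND __deleted = 'false';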
DBT
SQL Example
{{
    config(
        materialized='incremental',
        unique_key='<your_primary_key>',
        post_hook='DELETE FROM {{ this }} WHERE __deleted = TRUE'
    )
}}

WITH cte AS (
    SELECT *
    FROM source_table
    {% if is_incremental() %}
    WHERE
        _streamkap_ts_ms >= (SELECT MAX(_streamkap_ts_ms) FROM {{ this }})
    {% endif %}
    QUALIFY RANK() OVER (PARTITION BY <your_primary_key> ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) = 1
)

SELECT
    *
FROM
    cte
{% if is_incremental() %}
WHERE
    _streamkap_ts_ms >= (SELECT MAX(this._streamkap_ts_ms) FROM {{ this }} AS this)
{% endif %}
This is an example approach that uses incremental models (DBT or similar) to compare only the data that has recently arrived in the raw source tables from Streamkap, so incremental refreshes do not need to scan the full history of CDC data.
The second WHERE clause applies the incremental filter against {{ this }} (the existing model table in DBT); the post-hook can then delete from {{ this }} any rows where __deleted is true.
Python Example
This is an example approach for generating the DBT incremental models from a DBT sources YAML file.
import os

import yaml


class IncrementalScriptGenerator:
    def __init__(self, relative_input_path, relative_output_dir):
        self.home_directory = os.path.expanduser("~")
        self.relative_input_path = relative_input_path
        self.relative_output_dir = relative_output_dir
        self.prod_tables = []

    def load_yaml_file(self):
        full_path = os.path.join(self.home_directory, self.relative_input_path)
        with open(full_path, "r") as file:
            self.data = yaml.safe_load(file)

    def extract_prod_tables(self):
        for source in self.data.get("sources", []):
            if source.get("name") == "prod":
                tables = source.get("tables", [])
                for table in tables:
                    table_name = table.get("name")
                    if table_name:
                        self.prod_tables.append(table_name)

    def generate_incremental_scripts(self):
        for prod_table in self.prod_tables:
            incremental_script = """
{{
    config(
        incremental_strategy='merge',
        unique_key='id'
    )
}}
"""
            incremental_script += f"""
select *
from {{{{ source('prod', '{prod_table}') }}}}
"""
            incremental_script += """
{% if is_incremental() %}
where
    _streamkap_ts_ms >= (select max(_streamkap_ts_ms) from {{ this }})
{% endif %}
qualify rank() over (partition by id order by _streamkap_ts_ms desc, _streamkap_offset desc) = 1
"""
            relative_output_path = (
                f"{self.relative_output_dir}/sk_{prod_table.lower()}.sql"
            )
            final_output_path = os.path.join(self.home_directory, relative_output_path)
            with open(final_output_path, "w") as file:
                file.write(incremental_script)
            print(f"Written to {final_output_path}")


if __name__ == "__main__":
    relative_input_path = "dae-dbt/models/source.yml"
    relative_output_dir = "dae-dbt/models/streamkap/incremental"

    processor = IncrementalScriptGenerator(relative_input_path, relative_output_dir)
    processor.load_yaml_file()
    processor.extract_prod_tables()
    processor.generate_incremental_scripts()
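For reference, running the script against a hypothetical prod table named customers would write a model file such as sk_customers.sql containing roughly the following:
{{
    config(
        incremental_strategy='merge',
        unique_key='id'
    )
}}

select *
from {{ source('prod', 'customers') }}

{% if is_incremental() %}
where
    _streamkap_ts_ms >= (select max(_streamkap_ts_ms) from {{ this }})
{% endif %}
qualify rank() over (partition by id order by _streamkap_ts_ms desc, _streamkap_offset desc) = 1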