Creating Final State Tables from Inserts/Append
How to create final state tables from append-only tables
Here is an overview of how you can create final state tables while working with insert/append-only data.
Snowflake
Task + Merge Statement
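The tasks below log high-water marks into a TABLE_REFRESH_HISTORY bookkeeping table, which must exist before the tasks are created. A minimal sketch, with column types inferred from the statements below (exact types are an assumption; MAX_TIMESTAMP holds epoch milliseconds):

```sql
-- Assumed definition; adjust types and names to your environment
CREATE TABLE IF NOT EXISTS <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (
    TABLE_NAME    VARCHAR,        -- fully qualified name of the deduplicated table
    MAX_TIMESTAMP NUMBER,         -- highest _streamkap_ts_ms merged so far (epoch ms)
    REFRESH_TIME  TIMESTAMP_LTZ   -- when the refresh ran
);
```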
-- Replace <DBNAME>, <SCHEMANAME>, <WAREHOUSE_NAME> with your own values
-------------------------------------------------------
CREATE OR REPLACE TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK_RUN_HISTORY
WAREHOUSE = WAREHOUSE_NAME
AFTER <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
AS
INSERT INTO <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (TABLE_NAME, MAX_TIMESTAMP, REFRESH_TIME)
SELECT
'<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>',
MAX(_streamkap_ts_ms) AS max_ts,
CURRENT_TIMESTAMP AS refresh_time
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>;
---------------------------------------------------------
CREATE OR REPLACE TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
WAREHOUSE = WAREHOUSE_NAME
SCHEDULE = '15 MINUTE'
AS
MERGE INTO <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP> AS target
USING (
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC) AS dedupe_id
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
WHERE _streamkap_ts_ms >=
COALESCE((SELECT MAX(MAX_TIMESTAMP) FROM <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY
WHERE TABLE_NAME='<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>'),
(DATE_PART('EPOCH', CURRENT_TIMESTAMP()) * 1000) - (8 * 24 * 60 * 60 * 1000))
) AS subquery
WHERE dedupe_id = 1 AND __deleted = 'false'
) AS source
ON target.ID = source.ID
WHEN MATCHED THEN
UPDATE SET
target.RECORD_METADATA = source.RECORD_METADATA,
<values>,
target.__DELETED = source.__DELETED,
target._STREAMKAP_SOURCE_TS_MS = source._STREAMKAP_SOURCE_TS_MS,
target._STREAMKAP_TS_MS = source._STREAMKAP_TS_MS,
target._STREAMKAP_OFFSET = source._STREAMKAP_OFFSET
WHEN NOT MATCHED THEN
INSERT (
<values>,
__DELETED,
_STREAMKAP_SOURCE_TS_MS,
_STREAMKAP_TS_MS,
_STREAMKAP_OFFSET
)
VALUES (
<values>,
source.__DELETED,
source._STREAMKAP_SOURCE_TS_MS,
source._STREAMKAP_TS_MS,
source._STREAMKAP_OFFSET
);
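Note that Snowflake tasks are created in a suspended state. Assuming the placeholder names above, something like the following is needed before the pipeline starts running (child tasks must be resumed before, or together with, the root task):

```sql
ALTER TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK_RUN_HISTORY RESUME;  -- child task first
ALTER TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK RESUME;              -- then the scheduled root task
```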
Dynamic Tables
Streamkap supports automatic creation and maintenance of Snowflake Dynamic Tables for this final view; the underlying logic mirrors the deduplication shown above.
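As an illustration (not the exact SQL Streamkap generates), a Dynamic Table expressing the same dedup logic might look like the sketch below, with TARGET_LAG chosen here to match the 15-minute task schedule:

```sql
CREATE OR REPLACE DYNAMIC TABLE <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>
  TARGET_LAG = '15 minutes'
  WAREHOUSE = WAREHOUSE_NAME
AS
SELECT * EXCLUDE dedupe_id
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC) AS dedupe_id
    FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
)
WHERE dedupe_id = 1        -- keep only the latest version of each row
  AND __deleted = 'false'; -- drop rows whose latest event is a delete
```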
DBT
{{
config(
materialized='incremental',
unique_key='<your_primary_key>',
post_hook='DELETE FROM {{ this }} WHERE __deleted = TRUE'
)
}}
WITH cte AS (
SELECT * FROM source_table
{% if is_incremental() %}
WHERE
_streamkap_ts_ms >= (SELECT MAX(_streamkap_ts_ms) FROM {{ this }})
{% endif %}
QUALIFY ROW_NUMBER() OVER (PARTITION BY <your_primary_key> ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) = 1
)
SELECT
*
FROM
cte
{% if is_incremental() %}
WHERE
_streamkap_ts_ms >= (SELECT MAX(this._streamkap_ts_ms) FROM {{ this }} AS this)
{% endif %}
This example approach uses incremental models (dbt or similar) to compare only the data that has recently arrived in the raw source tables from Streamkap, so incremental refreshes do not need to scan the historical CDC data.
The second WHERE clause applies the incremental filter against the {{ this }} relation in dbt; the post_hook can then delete the rows whose __deleted flag is true.