Creating Final State Tables from Inserts/Append

How to create final state tables from append-only source tables

Here is an overview of how you can create final state (deduplicated) tables while working with insert/append-only data.

Snowflake

Task + Merge Statement

-- Replace the <DBNAME>, <SCHEMANAME>, <WAREHOUSE_NAME>, and <TABLENAME> placeholders below
-------------------------------------------------------
CREATE OR REPLACE TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK_RUN_HISTORY
  WAREHOUSE = <WAREHOUSE_NAME>
  AFTER <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
AS
INSERT INTO <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (TABLE_NAME, MAX_TIMESTAMP, REFRESH_TIME)
SELECT 
    '<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>', 
    MAX(_streamkap_ts_ms) AS max_ts,
    CURRENT_TIMESTAMP AS refresh_time
FROM <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>;
---------------------------------------------------------
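Both tasks depend on a TABLE_REFRESH_HISTORY bookkeeping table, which must exist before the tasks run. A minimal sketch of its DDL (the column names come from the INSERT above; the types are assumptions to adjust to your conventions):

```sql
-- Bookkeeping table used by the tasks (create once, before the tasks).
-- Column types are assumptions; adjust as needed.
CREATE TABLE IF NOT EXISTS <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY (
    TABLE_NAME     VARCHAR,        -- fully qualified name of the deduplicated table
    MAX_TIMESTAMP  NUMBER,         -- max _streamkap_ts_ms (epoch millis) seen at refresh time
    REFRESH_TIME   TIMESTAMP_NTZ   -- when the refresh ran
);
```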
CREATE OR REPLACE TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK
  WAREHOUSE = <WAREHOUSE_NAME>
  SCHEDULE = '15 MINUTE'
AS
MERGE INTO <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP> AS target
USING (
    SELECT *
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC) AS dedupe_id
        FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
        WHERE _streamkap_ts_ms >= COALESCE(
            (SELECT MAX(MAX_TIMESTAMP)
             FROM <DBNAME>.<SCHEMANAME>.TABLE_REFRESH_HISTORY
             WHERE TABLE_NAME = '<DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>'),
            (DATE_PART('EPOCH', CURRENT_TIMESTAMP()) * 1000) - (8 * 24 * 60 * 60 * 1000)  -- fall back to the last 8 days
        )
    ) AS subquery
    WHERE dedupe_id = 1 AND __deleted = 'false'
) AS source
ON target.ID = source.ID
WHEN MATCHED THEN
    UPDATE SET 
        target.RECORD_METADATA = source.RECORD_METADATA,
        <values>,
        target.__DELETED = source.__DELETED,
        target._STREAMKAP_SOURCE_TS_MS = source._STREAMKAP_SOURCE_TS_MS,
        target._STREAMKAP_TS_MS = source._STREAMKAP_TS_MS,
        target._STREAMKAP_OFFSET = source._STREAMKAP_OFFSET
WHEN NOT MATCHED THEN
    INSERT (
        <values>,
        __DELETED,
        _STREAMKAP_SOURCE_TS_MS,
        _STREAMKAP_TS_MS,
        _STREAMKAP_OFFSET
    )
    VALUES (
        <values>,
        source.__DELETED,
        source._STREAMKAP_SOURCE_TS_MS,
        source._STREAMKAP_TS_MS,
        source._STREAMKAP_OFFSET
    );
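One operational note: Snowflake tasks are created in a suspended state, so neither task runs until it is resumed, and a dependent (child) task must be resumed before its root. Assuming the task names match the placeholders above:

```sql
-- Tasks are created SUSPENDED by default; nothing runs until they are resumed.
-- Resume the dependent history task first, then the root merge task.
ALTER TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK_RUN_HISTORY RESUME;
ALTER TASK <DBNAME>.<SCHEMANAME>.<TABLENAME>_TASK RESUME;
```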

Dynamic Tables

Streamkap supports the automatic creation and maintenance of Snowflake Dynamic Tables for this final view; the table's definition applies the same dedupe logic as the merge task above.
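As a sketch of that underlying logic, assuming the same placeholder names as above and an illustrative 15-minute target lag, a hand-written Dynamic Table might look like this:

```sql
-- Sketch only: placeholder names and TARGET_LAG are assumptions; adjust to your environment.
CREATE OR REPLACE DYNAMIC TABLE <DBNAME>.<SCHEMANAME>.<TABLENAME_DEDUP>
  TARGET_LAG = '15 minutes'
  WAREHOUSE = <WAREHOUSE_NAME>
AS
SELECT * EXCLUDE (dedupe_id)
FROM (
    -- Keep only the latest record per key, same as the merge task's source query.
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _streamkap_ts_ms DESC) AS dedupe_id
    FROM <DBNAME>.<SCHEMANAME>.<TABLENAME>
)
WHERE dedupe_id = 1 AND __deleted = 'false';
```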


dbt


{{
    config(
        materialized='incremental',
        unique_key='<your_primary_key>',
        post_hook='DELETE FROM {{ this }} WHERE __deleted = TRUE'
    )
}}


WITH cte AS (
    SELECT *
    FROM source_table
    {% if is_incremental() %}
    WHERE _streamkap_ts_ms >= (SELECT MAX(_streamkap_ts_ms) FROM {{ this }})
    {% endif %}
    QUALIFY RANK() OVER (PARTITION BY <your_primary_key> ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) = 1
)

SELECT *
FROM cte

{% if is_incremental() %}
WHERE _streamkap_ts_ms >= (SELECT MAX(this._streamkap_ts_ms) FROM {{ this }} AS this)
{% endif %}

This is an example approach that uses incremental models (dbt or similar) to compare only the data that has recently arrived in the raw source tables from Streamkap. During incremental refreshes there is then no need to scan the historical CDC data.

The second WHERE clause in the incremental logic references the {{ this }} declaration in dbt; the post_hook can then delete the rows in {{ this }} whose __deleted column is true.