Snowflake Dynamic Tables

Snowflake Dynamic Tables with Snowpipe Streaming

Snowflake Dynamic Tables can be used to build a materialized, deduplicated view on top of append-only inserted data. The result is a final-state view that contains only the most recent version of each record.

Overview

This guide explains how to set up Dynamic Tables and Clean-up Tasks for customers using Snowflake's Snowpipe Streaming. Snowpipe Streaming writes data in append-only mode, so several records can share the same primary key. A Dynamic Table solves this by returning only the most recent version of each record.

Additionally, Clean-up Tasks help manage the base table's storage size by automatically deleting older, redundant records, keeping only the latest version.


Important Note: Dynamic Tables and Clean-up Tasks are created together. If you later disable these features, you must remove them from Snowflake manually. Unchecking the Use Dynamic Tables option in the connector does not drop the existing dynamic table or clean-up task.
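To disable the feature, drop the generated objects with Snowflake's `DROP TASK` and `DROP DYNAMIC TABLE` statements. The sketch below uses a hypothetical helper, `teardown_statements`, which is not part of the connector. It assumes the `_DT` and `_CT` naming convention described on this page and builds the statements for a given base table. You would then run those statements in Snowflake. The base table itself is left in place.

```python
def teardown_statements(table: str) -> list[str]:
    """Return SQL that removes the objects generated for `table`.

    Hypothetical helper: assumes the connector names the clean-up task
    {table}_CT and the dynamic table {table}_DT. The base table is not dropped.
    """
    return [
        f"DROP TASK IF EXISTS {table}_CT;",
        f"DROP DYNAMIC TABLE IF EXISTS {table}_DT;",
    ]


for statement in teardown_statements("orders"):
    print(statement)
# DROP TASK IF EXISTS orders_CT;
# DROP DYNAMIC TABLE IF EXISTS orders_DT;
```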

Objectives

  1. Dynamic Tables: Ensure only the final version of each record is visible by deduplicating data.
  2. Clean-up Tasks: Periodically delete older records from the base table to maintain manageable storage sizes.

Dynamic Table Setup

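Dynamic Tables are generated automatically when the Use Dynamic Tables option is selected in the connector settings. The Dynamic Table Lag setting sets the target lag, which is the maximum time the dynamic table's contents may fall behind the base table. Snowflake schedules refreshes to meet that target, so a shorter lag means more frequent refreshes.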

Dynamic Table Script

The following script is used to create a dynamic table:

CREATE OR REPLACE DYNAMIC TABLE {{table}}_DT 
TARGET_LAG='{{targetLag}} minutes' 
WAREHOUSE={{warehouse}} 
AS 
SELECT * FROM (
    SELECT *, 
           ROW_NUMBER() OVER (PARTITION BY {{primaryKeyColumns}} ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) AS dedupe_id
    FROM {{table}}
) 
WHERE dedupe_id = 1 
AND __deleted = 'false';

Components:

  • {{table}}: The base table's name. The dynamic table takes the same name with a _DT suffix added.
  • {{targetLag}}: The target lag (in minutes) for the dynamic table. The default is 15 minutes, and you can change it in the connector settings.
  • {{warehouse}}: The Snowflake warehouse used to run the dynamic table query.
  • {{primaryKeyColumns}}: Primary key columns used to group records and identify the most recent version, automatically selected by the connector.
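You can try the deduplication logic locally before running it in Snowflake. The sketch below is an illustration only. It runs the same `ROW_NUMBER()` query against an in-memory SQLite database (not Snowflake, and requires SQLite 3.25 or later). The data is a set of made-up rows for a hypothetical `orders` table with a `status` column. An outer `ORDER BY` is added so the output is deterministic.

```python
import sqlite3

# Local SQLite stand-in for the Snowflake base table (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER, status TEXT, "
    "_streamkap_ts_ms INTEGER, _streamkap_offset INTEGER, __deleted TEXT)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
    [
        (1, "created", 1000, 10, "false"),
        (1, "shipped", 2000, 11, "false"),  # newer version of order 1
        (2, "created", 1500, 12, "false"),
        (2, "created", 3000, 13, "true"),   # order 2 was later deleted at the source
        (3, "created", 2500, 14, "false"),
        (3, "paid", 2500, 15, "false"),     # same timestamp: higher offset wins
    ],
)

# Same shape as the dynamic table query, with order_id as the primary key.
DEDUPE_SQL = """
SELECT order_id, status FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id
               ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC
           ) AS dedupe_id
    FROM orders
)
WHERE dedupe_id = 1
AND __deleted = 'false'
ORDER BY order_id
"""

latest = conn.execute(DEDUPE_SQL).fetchall()
print(latest)  # [(1, 'shipped'), (3, 'paid')]
```

Order 2 disappears entirely. Its most recent version is a delete, so that version is ranked first and then filtered out by `__deleted = 'false'`. Order 3 shows that `_streamkap_offset` breaks ties between versions with the same timestamp.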

Clean-up Task Setup

The Clean-up Task is also generated automatically when the Use Dynamic Tables option is selected in the connector settings. It periodically deletes outdated records from the base table. This keeps the base table lean by retaining only the latest version of each record.

Clean-up Task Script

The following script is used to create a clean-up task:

CREATE OR REPLACE TASK {{table}}_CT 
WAREHOUSE={{warehouse}} 
SCHEDULE='{{schedule}} minutes' 
TASK_AUTO_RETRY_ATTEMPTS=3 
ALLOW_OVERLAPPING_EXECUTION=FALSE 
AS 
DELETE FROM {{table}} 
WHERE NOT EXISTS (
    SELECT 1 
    FROM (
        SELECT {{primaryKeyColumns}}, MAX(_streamkap_ts_ms) AS max_timestamp 
        FROM {{table}} 
        GROUP BY {{primaryKeyColumns}}
    ) AS subquery 
    WHERE {{keyColumnsAndCondition}} 
    AND {{table}}._streamkap_ts_ms = subquery.max_timestamp
);

Components:

  • {{table}}: The base table's name. The clean-up task takes the same name with a _CT suffix added.
  • {{warehouse}}: The Snowflake warehouse used to run the clean-up task.
  • {{schedule}}: The interval (in minutes) at which the clean-up task runs.
  • {{primaryKeyColumns}}: Primary key columns used to group records and identify the most recent version, automatically selected by the connector.
  • {{keyColumnsAndCondition}}: Conditions that match each base-table record to the latest version in the subquery (for example, orders.order_id = subquery.order_id). These are generated automatically.
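The effect of the clean-up `DELETE` can also be checked locally. This sketch is an illustration only. It runs the task body from this page against an in-memory SQLite database (not Snowflake). The data is made-up rows for a hypothetical `orders` table with a `status` column.

```python
import sqlite3

# Local SQLite stand-in for the base table (illustration only, not Snowflake).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER, status TEXT, "
    "_streamkap_ts_ms INTEGER, _streamkap_offset INTEGER, __deleted TEXT)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
    [
        (1, "created", 1000, 10, "false"),
        (1, "shipped", 2000, 11, "false"),
        (2, "created", 1500, 12, "false"),
        (2, "created", 3000, 13, "true"),
        (3, "created", 2500, 14, "false"),
        (3, "paid", 2500, 15, "false"),
    ],
)

# Same DELETE as the clean-up task body, with order_id as the primary key.
conn.execute(
    """
    DELETE FROM orders
    WHERE NOT EXISTS (
        SELECT 1
        FROM (
            SELECT order_id, MAX(_streamkap_ts_ms) AS max_timestamp
            FROM orders
            GROUP BY order_id
        ) AS subquery
        WHERE orders.order_id = subquery.order_id
        AND orders._streamkap_ts_ms = subquery.max_timestamp
    )
    """
)

remaining = conn.execute(
    "SELECT order_id, status, __deleted FROM orders "
    "ORDER BY order_id, _streamkap_offset"
).fetchall()
print(remaining)
# [(1, 'shipped', 'false'), (2, 'created', 'true'), (3, 'created', 'false'), (3, 'paid', 'false')]
```

Two behaviours follow from the query:

1. The latest version of a deleted record stays in the base table (order 2, with `__deleted = 'true'`).
2. When two versions of a key share the same maximum `_streamkap_ts_ms`, both are kept (order 3). The task compares timestamps only, while the dynamic table also breaks ties on `_streamkap_offset`.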

Task Management

Snowflake creates new tasks in a suspended state. If the clean-up task is not running, resume it using the following script:

ALTER TASK {{table}}_CT RESUME;

This command ensures the clean-up task resumes running at the scheduled intervals.


Example: Orders Table

Below is an example of setting up a dynamic table and clean-up task for a table called orders.

Inputs:

  • Table: orders
  • Warehouse: STREAMKAP_WH
  • Primary Key: order_id
  • Target Lag: 5 minutes
  • Clean-up Schedule: Every 60 minutes

Dynamic Table Script:

CREATE OR REPLACE DYNAMIC TABLE orders_DT 
TARGET_LAG='5 minutes' 
WAREHOUSE=STREAMKAP_WH 
AS 
SELECT * FROM (
    SELECT *, 
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY _streamkap_ts_ms DESC, _streamkap_offset DESC) AS dedupe_id
    FROM orders
) 
WHERE dedupe_id = 1 
AND __deleted = 'false';

Clean-up Task Script:

CREATE OR REPLACE TASK orders_CT 
WAREHOUSE=STREAMKAP_WH 
SCHEDULE='60 minutes' 
TASK_AUTO_RETRY_ATTEMPTS=3 
ALLOW_OVERLAPPING_EXECUTION=FALSE 
AS 
DELETE FROM orders 
WHERE NOT EXISTS (
    SELECT 1 
    FROM (
        SELECT order_id, MAX(_streamkap_ts_ms) AS max_timestamp 
        FROM orders 
        GROUP BY order_id
    ) AS subquery 
    WHERE orders.order_id = subquery.order_id 
    AND orders._streamkap_ts_ms = subquery.max_timestamp
);

Resuming the Clean-up Task:

ALTER TASK orders_CT RESUME;
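The scripts in this example are the templates with each placeholder replaced by the matching input above. The sketch below uses two hypothetical helpers, `render_cleanup_task` and `key_conditions`; these are illustrative names, not connector APIs. Together they reproduce the clean-up task script from those inputs. For composite primary keys, the sketch assumes `{{keyColumnsAndCondition}}` joins one equality per key column with `AND`. This page only shows the single-key case.

```python
def key_conditions(table: str, key_columns: list[str]) -> str:
    # Assumption: one equality per key column, joined with AND.
    return " AND ".join(f"{table}.{col} = subquery.{col}" for col in key_columns)


def render_cleanup_task(table: str, warehouse: str, schedule_minutes: int,
                        key_columns: list[str]) -> str:
    # Hypothetical renderer that fills the clean-up task template shown above.
    keys = ", ".join(key_columns)
    return f"""CREATE OR REPLACE TASK {table}_CT
WAREHOUSE={warehouse}
SCHEDULE='{schedule_minutes} minutes'
TASK_AUTO_RETRY_ATTEMPTS=3
ALLOW_OVERLAPPING_EXECUTION=FALSE
AS
DELETE FROM {table}
WHERE NOT EXISTS (
    SELECT 1
    FROM (
        SELECT {keys}, MAX(_streamkap_ts_ms) AS max_timestamp
        FROM {table}
        GROUP BY {keys}
    ) AS subquery
    WHERE {key_conditions(table, key_columns)}
    AND {table}._streamkap_ts_ms = subquery.max_timestamp
);"""


print(render_cleanup_task("orders", "STREAMKAP_WH", 60, ["order_id"]))
print(key_conditions("order_items", ["order_id", "line_no"]))
# order_items.order_id = subquery.order_id AND order_items.line_no = subquery.line_no
```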

Learn more in the Snowflake Dynamic Tables documentation.