Snowflake

Prerequisites

A Snowflake user granted the ACCOUNTADMIN system-defined role, or a custom role with privileges to:

  • CREATE WAREHOUSE, DATABASE, SCHEMA
  • CREATE ROLE, USER
  • CREATE NETWORK POLICY

Snowflake Setup

Copy and paste the script below into a Snowflake worksheet, change the values at the top as required, and then run all queries.

-- We've provided defaults, so change these as required. Use 'UPPERCASE' names for database objects
SET user_name           = UPPER('STREAMKAP_USER');
SET user_password       = '{password}'; -- IMPORTANT: Make sure to change this!
SET warehouse_name      = UPPER('STREAMKAP_WH'); -- Used for optional views, not ingestion
SET database_name       = UPPER('STREAMKAPDB');
SET schema_name         = UPPER('STREAMKAP');
SET role_name           = UPPER('STREAMKAP_ROLE');
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change these values below
SET sysadmin_role       = UPPER('SYSADMIN');
SET securityadmin_role  = UPPER('SECURITYADMIN');

-- Create a warehouse with defaults:
-- Standard, X-Small, No Scaling, Auto-Suspend after 1 Minute
USE ROLE IDENTIFIER($sysadmin_role);
CREATE WAREHOUSE IF NOT EXISTS IDENTIFIER($warehouse_name) AUTO_SUSPEND = 60; -- AUTO_SUSPEND is in seconds

-- Create a database and schema for Streamkap
USE WAREHOUSE IDENTIFIER($warehouse_name);
CREATE DATABASE IF NOT EXISTS IDENTIFIER($database_name);
CREATE SCHEMA IF NOT EXISTS IDENTIFIER($schema_name);

-- Create a Snowflake role with privileges for the Streamkap connector
USE ROLE IDENTIFIER($securityadmin_role);
CREATE ROLE IF NOT EXISTS IDENTIFIER($role_name);

-- Grant privileges on the warehouse
GRANT USAGE ON WAREHOUSE IDENTIFIER($warehouse_name) TO ROLE IDENTIFIER($role_name);

-- Grant privileges on the database
GRANT USAGE ON DATABASE IDENTIFIER($database_name) TO ROLE IDENTIFIER($role_name);

-- Grant privileges on the database schema
USE ROLE IDENTIFIER($sysadmin_role);
USE DATABASE IDENTIFIER($database_name);
GRANT USAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE TABLE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE STAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE PIPE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);

-- Create a user for Streamkap
USE ROLE IDENTIFIER($securityadmin_role);
CREATE USER IDENTIFIER($user_name) PASSWORD = $user_password DEFAULT_ROLE = $role_name;

-- Grant the custom role to the Streamkap user
GRANT ROLE IDENTIFIER($role_name) TO USER IDENTIFIER($user_name);

-- Set the custom role as the default role for the Streamkap user.
-- If you encounter an 'Insufficient privileges' error, verify that the role in $securityadmin_role has the OWNERSHIP privilege on the user in $user_name.
ALTER USER IDENTIFIER($user_name) SET DEFAULT_ROLE = $role_name;

-- Allow the Streamkap user access to the Snowflake account
-- Latest IPs can be found here: https://docs.streamkap.com/docs/streamkap-ip-addresses-whitelisting
CREATE NETWORK POLICY IDENTIFIER($network_policy_name) ALLOWED_IP_LIST = ('IP1', 'IP2'); -- Quote each IP separately
ALTER USER IDENTIFIER($user_name) SET NETWORK_POLICY = $network_policy_name;

📘

We do not use CREATE OR REPLACE in our scripts, to avoid accidentally destroying objects that already exist in your Snowflake account.
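Each address in ALLOWED_IP_LIST must be its own quoted string, for example ('IP1', 'IP2'), rather than one string containing every IP. If you're allowing several Streamkap IPs, the sketch below builds the list for you. build_ip_list is a hypothetical helper, not part of Snowflake or Streamkap tooling, and the example addresses are documentation placeholders; use the IPs from the Streamkap page linked in the script.

```shell
# Hypothetical helper: print a value for Snowflake's ALLOWED_IP_LIST,
# quoting each address separately, e.g. ('IP1', 'IP2').
build_ip_list() {
  list=""
  for ip in "$@"; do
    # Separate entries with a comma and a space
    if [ -n "$list" ]; then
      list="$list, "
    fi
    # Wrap each address in single quotes
    list="$list'$ip'"
  done
  printf "(%s)\n" "$list"
}

# Example with placeholder addresses from the documentation ranges
build_ip_list 192.0.2.10 198.51.100.20
```

Paste the printed value after ALLOWED_IP_LIST = in the CREATE NETWORK POLICY statement.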

Key Pair Authentication

The connector authenticates using an RSA key pair, which you can generate with OpenSSL or OpenSSH. Below are example commands for both. You can modify them to suit your security policies, but please ensure the key pair meets these minimum requirements:

  • RSA 2048-bit
  • PKCS#8 key format

🚧

If you're on Windows, PuTTY tools can't generate the RSA key pair in the required format. We recommend you install and use OpenSSH for Windows instead.

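# Option 1: OpenSSL
# Generate an encrypted RSA private key in PKCS#8 format. You'll be asked for a passphrase, which the connector needs
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out streamkap_key.p8

# Derive the public key from the private key (you'll be asked for the passphrase again)
openssl rsa -in streamkap_key.p8 -pubout -out streamkap_key.pub

# Option 2: OpenSSH (use instead of Option 1, not in addition to it)
# Generate an encrypted RSA key pair. You'll be asked for a passphrase, which the connector needs
ssh-keygen -m PKCS8 -t rsa -b 2048 -f streamkap_key

# ssh-keygen writes the public key in OpenSSH format ('ssh-rsa AAAA...'); convert it to PEM so it starts with 'MII'
ssh-keygen -e -m PKCS8 -f streamkap_key.pub > streamkap_key.pub.pem && mv streamkap_key.pub.pem streamkap_key.pub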

The commands above create two files (the key pair): a private key (which may have an extension such as .p8) and a public key (usually with the extension .pub). Store both files in a secure place.
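If the connector rejects your private key, a quick first check is the PEM header on the file's first line, which shows whether the key is in PKCS#8 format. The sketch below is a hypothetical helper (key_format isn't part of OpenSSL, OpenSSH or Streamkap). It only inspects the header, so it doesn't validate the key itself or its size.

```shell
# Hypothetical helper: report a private key file's format from its PEM header.
# Snowflake key pair authentication expects PKCS#8.
key_format() {
  # Read the first line and drop any Windows line ending
  header=$(head -n 1 "$1" | tr -d '\r')
  case "$header" in
    "-----BEGIN ENCRYPTED PRIVATE KEY-----") echo "PKCS#8 (encrypted)" ;;
    "-----BEGIN PRIVATE KEY-----")           echo "PKCS#8 (unencrypted)" ;;
    "-----BEGIN RSA PRIVATE KEY-----")       echo "PKCS#1 (convert to PKCS#8)" ;;
    "-----BEGIN OPENSSH PRIVATE KEY-----")   echo "OpenSSH (regenerate with -m PKCS8)" ;;
    *)                                       echo "unknown" ;;
  esac
}

# Usage: check the OpenSSL private key if it exists
[ -f streamkap_key.p8 ] && key_format streamkap_key.p8
```

For a key generated with ssh-keygen, pass the private key file without an extension (streamkap_key).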

Once generated, the public key needs to be assigned to the Snowflake database user created for Streamkap earlier.

On macOS, this command removes the header, footer and empty lines from the public key and copies the rest to your clipboard (pbcopy is macOS only):

grep -Ev '^-|^$' ./streamkap_key.pub | pbcopy

Now attach the key to the user:

-- We've provided a default, so change this as required
SET user_name = UPPER('STREAMKAP_USER');

USE ROLE SECURITYADMIN;

-- Open the public key file and replace the '{public key}' placeholder below with its contents, excluding the header and footer
-- The key MUST start with 'MII'. If it doesn't, generate a new key meeting the minimum requirements
ALTER USER IDENTIFIER($user_name) SET RSA_PUBLIC_KEY = '{public key}';
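If you're not on macOS, or prefer not to edit the placeholder by hand, the sketch below applies the same filter as the clipboard command above, joins the key body into a single line (Snowflake accepts it that way), checks the 'MII' prefix and prints the ALTER USER statement to paste into a worksheet. key_body and alter_user_sql are hypothetical helpers, not Streamkap or Snowflake tools.

```shell
# Hypothetical helper: print the base64 body of a PEM public key on one line,
# dropping the '-----BEGIN/END PUBLIC KEY-----' lines and empty lines.
key_body() {
  grep -Ev '^-|^$' "$1" | tr -d '\r\n'
}

# Hypothetical helper: print an ALTER USER statement for a user and a public
# key file, refusing keys whose body doesn't start with 'MII'.
alter_user_sql() {
  body=$(key_body "$2")
  case "$body" in
    MII*) printf "ALTER USER %s SET RSA_PUBLIC_KEY = '%s';\n" "$1" "$body" ;;
    *) echo "Key doesn't start with 'MII'; generate a new key meeting the requirements" >&2
       return 1 ;;
  esac
}

# Usage (after generating the key pair):
# alter_user_sql STREAMKAP_USER ./streamkap_key.pub
```

An OpenSSH-format public key ('ssh-rsa AAAA...') is rejected, because its body doesn't start with 'MII'.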

Streamkap Setup

  1. Go to Destinations and choose Snowflake

  2. Input the following information:

    1. Name - A unique and memorable name for this Connector

    2. Snowflake URL - The value of account_url returned from running SHOW ORGANIZATION ACCOUNTS; in a worksheet with the ORGADMIN role.

    3. Username (Case sensitive) - STREAMKAP_USER or the username you chose

    4. Private Key - Upload the private key file you generated.

    5. Private Key Passphrase - Required only if you set a passphrase when generating the key

    6. Database Name (Case sensitive) - STREAMKAPDB or the database name you chose

    7. Schema Name (Case sensitive) - STREAMKAP or the schema name you chose

  3. Click Save

Dynamic Tables

Snowflake Dynamic Tables are similar to materialized views: you can use one to maintain a table containing only the latest version of each record inserted into Snowflake. Visit the 'Views' tab in the destination to set these up, or learn more at Snowflake Dynamic Tables.

Troubleshooting

Getting the Snowflake URL

You can also run the script below in a Snowflake worksheet to return the Snowflake URL. You need to be logged into Snowflake as a user granted the ORGADMIN system-defined role to run this script.

-- Snowflake URL is the 'account_url', not 'account_locator_url'
SHOW ORGANIZATION ACCOUNTS;

Snowflake Setup scripts failing

There can be many reasons for them to fail, but the scripts below can help you diagnose the issues.

You need to be logged into Snowflake as a user granted the ACCOUNTADMIN system-defined role, or a custom role with equivalent privileges, to run these scripts.

Copy and paste the scripts below into Snowflake worksheets. Change the object names at the top as required and run all queries.

-- Script #1: check that the Streamkap objects exist
-- Replace object names below with the names used by your Snowflake Setup script
SET warehouse_name      = UPPER('STREAMKAP_WH');
SET database_name       = UPPER('STREAMKAPDB');
SET schema_name         = UPPER('STREAMKAP');
SET role_name           = UPPER('STREAMKAP_ROLE');
SET user_name           = UPPER('STREAMKAP_USER');
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change the role name below
SET accountadmin_role   = UPPER('ACCOUNTADMIN');

USE ROLE IDENTIFIER($accountadmin_role);

-- If any of the queries fail or return no results, either '$accountadmin_role' doesn't have the necessary privileges, or the object doesn't exist
-- Warehouses, databases and schemas
DESC WAREHOUSE IDENTIFIER($warehouse_name);
DESC DATABASE IDENTIFIER($database_name); -- Lists the database's schemas, so a separate DESC SCHEMA query isn't needed

-- Users
DESC USER IDENTIFIER($user_name); -- Shows name, defaults and RSA details; no need for separate queries

-- Network policies
DESC NETWORK POLICY IDENTIFIER($network_policy_name);

-- Script #2: list the privileges granted to the Streamkap role
-- Replace role name below with the role name used by your Snowflake Setup script
SET role_name           = UPPER('STREAMKAP_ROLE');

-- If your Snowflake account uses custom roles to grant privileges, change the role name below
SET accountadmin_role   = UPPER('ACCOUNTADMIN');

USE ROLE IDENTIFIER($accountadmin_role);

-- List privileges
SHOW GRANTS TO ROLE IDENTIFIER($role_name);

If any of the queries return an error or no results:

  • Check that the role selected in the top right corner of the Snowflake worksheet (next to the Share and Run buttons) is ACCOUNTADMIN, or a custom role with equivalent privileges
  • Depending on which query failed or returned no results, check that the object names at the top of the script are correct
  • If a query returns an "Object does not exist or is not authorized" error, go to the Admin page in the Snowsight UI and check whether the object appears there. For example, if DESC WAREHOUSE ... failed, go to Admin -> Warehouses and check whether the warehouse is listed
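DESC USER also shows an RSA_PUBLIC_KEY_FP property for the Streamkap user. If authentication fails even though the user exists, you can compare that value with the fingerprint of your local public key file. The OpenSSL pipeline below follows the approach in Snowflake's key pair authentication documentation; wrapping it in a key_fingerprint function is only for convenience, and the function name is hypothetical.

```shell
# Print the SHA-256 fingerprint of a PEM public key file in the form Snowflake
# shows for RSA_PUBLIC_KEY_FP in DESC USER output: 'SHA256:<base64 digest>'.
key_fingerprint() {
  # Fail early if the file isn't a readable PEM public key
  openssl rsa -pubin -in "$1" -noout 2>/dev/null || return 1
  # DER-encode the public key, hash it with SHA-256 and base64-encode the digest
  fp=$(openssl rsa -pubin -in "$1" -outform DER 2>/dev/null \
    | openssl dgst -sha256 -binary \
    | openssl enc -base64)
  printf 'SHA256:%s\n' "$fp"
}

# Usage: key_fingerprint ./streamkap_key.pub
```

If the two values differ, the key attached to the user isn't the one you uploaded to Streamkap; attach the correct public key again.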

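If the warehouse, database, schema, role and user exist, privileges might be the issue. Run Script #2 and ensure the privileges displayed match or include the ones granted by the Snowflake Setup script:

  • USAGE on the warehouse
  • USAGE on the database
  • USAGE, CREATE TABLE, CREATE STAGE and CREATE PIPE on the schema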

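Keep only the latest records with a Dynamic Table

To create a Dynamic Table containing only the latest version of each record, you can use a query like the one below, replacing { ... } placeholders as required. The Dynamic Table ({latest_table}) needs a different name from the source table ({table}). Deleted records are filtered out after picking the latest version; filtering them first would bring back an older version of a record that has since been deleted.

CREATE DYNAMIC TABLE IF NOT EXISTS {schema}.{latest_table}
TARGET_LAG = '1 minute'
WAREHOUSE = STREAMKAP_WH
AS SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER
                (PARTITION BY {primary_key_column}
                ORDER BY {timestamp_column} DESC) AS dedupe_id
        FROM {schema}.{table}
    ) AS subquery
WHERE dedupe_id = 1          -- Latest record
  AND __deleted = 'false';   -- Exclude records whose latest change is a delete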

Clean up old records

If you're not concerned about keeping older change events (that is, the historic state of records), you can delete them.

To delete older records, assuming there's an appropriate timestamp such as the change event timestamp, you can schedule a query like the one below, replacing { ... } placeholders as required:

DELETE FROM {schema}.{table} AS t
WHERE
  NOT EXISTS (
    SELECT 1
    FROM (
      SELECT {primary_key_column, ...}, MAX({timestamp_column}) AS max_timestamp
      FROM {schema}.{table}
      GROUP BY {primary_key_column, ...}
    ) AS latest
    WHERE t.{primary_key_column} = latest.{primary_key_column}  -- Repeat for each primary key column
      AND t.{timestamp_column} = latest.max_timestamp
  )
  AND t.{timestamp_column} < DATEADD(MINUTE, -90, CURRENT_TIMESTAMP());  -- Older than 90 minutes