Snowflake

Prerequisites

A Snowflake account granted the ACCOUNTADMIN system-defined role, or a custom role with privileges to:

  • CREATE WAREHOUSE, DATABASE, SCHEMA
  • CREATE ROLE, USER
  • CREATE NETWORK POLICY

Snowflake Setup

It's recommended to create a separate user and role for Streamkap to access your Snowflake database. Below is an example script that does that.

-- We've provided defaults, so change these as required, with names for database objects in 'UPPERCASE'
SET user_name           = UPPER('STREAMKAP_USER');
SET user_password       = '{password}'; -- IMPORTANT: Make sure to change this!
SET warehouse_name      = UPPER('STREAMKAP_WH'); -- Used for optional views, not ingestion
SET database_name       = UPPER('STREAMKAPDB');
SET schema_name         = UPPER('STREAMKAP');
SET role_name           = UPPER('STREAMKAP_ROLE');
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change these values below
SET sysadmin_role       = UPPER('SYSADMIN');
SET securityadmin_role  = UPPER('SECURITYADMIN');
SET accountadmin_role   = UPPER('ACCOUNTADMIN');

-- Create a warehouse with defaults:
-- Standard, X-Small, No Scaling, Auto-Suspend after 1 Minute
USE ROLE IDENTIFIER($sysadmin_role);
CREATE WAREHOUSE IF NOT EXISTS IDENTIFIER($warehouse_name) AUTO_SUSPEND = 1;

-- Create a database and schema for Streamkap
USE WAREHOUSE IDENTIFIER($warehouse_name);
CREATE DATABASE IF NOT EXISTS IDENTIFIER($database_name);
USE DATABASE IDENTIFIER($database_name);
CREATE SCHEMA IF NOT EXISTS IDENTIFIER($schema_name);

-- Create a Snowflake role with privileges for the Streamkap connector
USE ROLE IDENTIFIER($securityadmin_role);
CREATE ROLE IF NOT EXISTS IDENTIFIER($role_name);

-- Grant privileges on the warehouse
GRANT USAGE ON WAREHOUSE IDENTIFIER($warehouse_name) TO ROLE IDENTIFIER($role_name);

-- Grant privileges on the database
GRANT USAGE ON DATABASE IDENTIFIER($database_name) TO ROLE IDENTIFIER($role_name);

-- Grant privileges on the database schema
USE ROLE IDENTIFIER($sysadmin_role);
USE DATABASE IDENTIFIER($database_name);
GRANT USAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE TABLE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE STAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE PIPE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);

-- Grant privileges for dynamic table and task creation (Only if auto-creation is enabled)
GRANT CREATE DYNAMIC TABLE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE TASK ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
USE ROLE IDENTIFIER($accountadmin_role);
GRANT EXECUTE TASK ON ACCOUNT TO ROLE IDENTIFIER($role_name);

-- Create a user for Streamkap
USE ROLE IDENTIFIER($securityadmin_role);
CREATE USER IDENTIFIER($user_name) PASSWORD = $user_password DEFAULT_ROLE = $role_name;

-- Grant the custom role to the Streamkap user
GRANT ROLE IDENTIFIER($role_name) TO USER IDENTIFIER($user_name);

-- Set the custom role as the default role for the Streamkap user.
-- If you encounter an 'Insufficient privileges' error, verify the '$securityadmin_role' has OWNERSHIP privilege on the '$user_name'.
ALTER USER IDENTIFIER($user_name) SET DEFAULT_ROLE = $role_name;

-- Allow the Streamkap user access to the Snowflake account
-- Latest IPs can be found here: https://docs.streamkap.com/docs/streamkap-ip-addresses-whitelisting
-- If you need to edit the network policy, you can use:
-- ALTER NETWORK POLICY STREAMKAP_NETWORK_ACCESS SET ALLOWED_IP_LIST=('52.32.238.100');
CREATE NETWORK POLICY IDENTIFIER($network_policy_name) ALLOWED_IP_LIST=('52.32.238.100');
ALTER USER IDENTIFIER($user_name) SET NETWORK_POLICY = $network_policy_name;

📘

We do not use CREATE OR REPLACE in our scripts, to avoid accidentally destroying objects that already exist in your Snowflake account.

Key Pair Authentication

The connector relies on an RSA key pair for authentication, which you can generate using OpenSSL. Below are example scripts that do that. You can modify them to suit your security policies, but please ensure the key pair meets these minimum requirements:

  • RSA 2048-bit
  • PKCS#8 key format

📘

SSH key generation on Windows

Snowflake does not support keys generated by PuTTY Key Generator.

One of the easiest and quickest ways to generate a valid key is via Git Bash, which is installed by default with Git for Windows. After installation, you can open a Git Bash prompt by Shift + right-clicking on your Desktop, choosing "Open Git Bash here", and then executing the OpenSSL commands below.

If you have any issues following these instructions or are unable to install Git for Windows, please contact us.

# Option 1: generates an encrypted RSA private key
# Make sure to change '{passphrase}' to a password of your choice
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out streamkap_key.p8 -passout pass:{passphrase}

# generates the public key, referencing the private key
# Don't forget to replace '{passphrase}' with the password used in the previous command
openssl rsa -in streamkap_key.p8 -pubout -out streamkap_key.pub -passin pass:{passphrase}

# Option 2: generates an unencrypted RSA private key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out streamkap_key.p8 -nocrypt

# generates the public key, referencing the private key
openssl rsa -in streamkap_key.p8 -pubout -out streamkap_key.pub

The scripts above create two files (the key pair): one private (typically with a .p8 extension) and one public (usually with a .pub extension). Store both files in a secure place.
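If you want to sanity-check a key pair before using it, one approach (assuming OpenSSL is available) is to generate a throwaway pair and compute the public key's SHA-256 fingerprint, which is the same format Snowflake later reports as RSA_PUBLIC_KEY_FP in DESC USER output. This is a sketch; the /tmp file names are examples, and for your real key you would substitute the streamkap_key files:

```shell
# Sketch: generate a throwaway unencrypted 2048-bit PKCS#8 key pair in /tmp
# and print the SHA-256 fingerprint of the public key (the format Snowflake
# shows as RSA_PUBLIC_KEY_FP). Substitute your real streamkap_key files.
openssl genrsa 2048 2>/dev/null | openssl pkcs8 -topk8 -inform PEM -nocrypt -out /tmp/streamkap_demo_key.p8
openssl rsa -in /tmp/streamkap_demo_key.p8 -pubout -out /tmp/streamkap_demo_key.pub 2>/dev/null
fp=$(openssl rsa -pubin -in /tmp/streamkap_demo_key.pub -outform DER 2>/dev/null \
      | openssl dgst -sha256 -binary | openssl enc -base64)
echo "SHA256:$fp"
```

Comparing this value against the fingerprint Snowflake stores is a quick way to confirm the right key was attached to the right user.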

Once generated, the public key needs to be assigned to the Snowflake database user created for Streamkap earlier.

The following commands copy the public key contents (excluding the header and footer lines) to your clipboard:

# macOS
egrep -v '^-|^$' ./streamkap_key.pub | pbcopy

# Windows (PowerShell)
Get-Content .\streamkap_key.pub | Where-Object { $_ -notmatch '^-|^$' } | Set-Clipboard

Now attach the public key to the user:

-- We've provided a default, so change this as required
SET user_name = UPPER('STREAMKAP_USER');

USE ROLE SECURITYADMIN;

-- Replace '{public key}' below with the public key file contents
-- If you used the previous command to copy the key to your clipboard, use Ctrl+V (Windows)
-- or Cmd+V (MacOS) to replace the '{public key}' placeholder with the key
-- Key part MUST start with 'MII' excluding any headers and footers
ALTER USER IDENTIFIER($user_name) SET RSA_PUBLIC_KEY = '{public key}';
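Optionally, you can confirm the key was attached by describing the user and checking its RSA_PUBLIC_KEY_FP property, which should match the fingerprint computed locally from your public key file:

```sql
-- Optional check (run as SECURITYADMIN): DESC USER reports an
-- RSA_PUBLIC_KEY_FP property you can compare against the local value from:
--   openssl rsa -pubin -in streamkap_key.pub -outform DER | openssl dgst -sha256 -binary | openssl enc -base64
DESC USER IDENTIFIER($user_name);
```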

Streamkap Setup

  1. Go to Destinations and choose Snowflake

  2. Input the following information:

    1. Name - A unique and memorable name for this Connector

    2. Snowflake URL - The URL for accessing your Snowflake account. This URL must include your account identifier. Note that the protocol (https://) and port number are optional

    3. Username (Case sensitive) - User login name for the Snowflake account

    4. Private Key - The private key you generated. To copy its contents to your clipboard, you can run e.g. egrep -v '^-|^$' ./streamkap_key.p8 | pbcopy

    5. Private Key Passphrase - For encrypted keys. Leave blank for unencrypted keys

    6. Database Name (Case sensitive) - The name of the database to use

    7. Schema Name (Case sensitive) - The name of the schema where tables will be created

    8. Snowflake Role (Case sensitive) - The name of an existing role with necessary privileges (for Streamkap) assigned to the user specified by Username

  3. Click Save

Dynamic Tables

Snowflake Dynamic Tables behave like materialized views and can maintain a table consisting of only the latest records inserted into Snowflake. Below is an example script you can use, substituting the placeholders e.g. {schema}, {table}, {primary_key_column} accordingly.

CREATE OR REPLACE DYNAMIC TABLE {schema}.{table}_dt -- Must differ from the source table name
TARGET_LAG = '1 minute'
WAREHOUSE = STREAMKAP_WH
AS SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER 
                (PARTITION BY {primary_key_column} 
                ORDER BY __streamkap_source_ts_ms DESC) AS dedupe_id
        FROM {schema}.{table}
    ) AS subquery
WHERE dedupe_id = 1        -- Latest record
  AND __deleted = 'false'; -- Excluding deleted records

Upsert mode

The Snowflake destination connector can run in upsert mode. This mode switches off Snowpipe Streaming; instead, the connector uses periodic MERGE INTO statements to upsert data into the target Snowflake tables. Dynamic tables or other de-duplication mechanisms are not necessary when using upsert mode.
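For intuition, the statements issued in upsert mode have roughly the following shape. This is an illustrative sketch only: the staged source, column names and delete handling here are placeholders, not the connector's actual internals.

```sql
-- Sketch of an upsert via MERGE INTO; {schema}, {table}, {primary_key_column}
-- and {staged_changes} are placeholders, and col1 stands for the data columns
MERGE INTO {schema}.{table} AS target
USING {staged_changes} AS source
  ON target.{primary_key_column} = source.{primary_key_column}
WHEN MATCHED AND source.__deleted = 'true' THEN DELETE
WHEN MATCHED THEN UPDATE SET target.col1 = source.col1
WHEN NOT MATCHED THEN
  INSERT ({primary_key_column}, col1)
  VALUES (source.{primary_key_column}, source.col1);
```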

โ—๏ธ

Snowflake costs

Currently, upsert mode requires a warehouse to be running, so overall costs will be higher compared to append mode, which uses Snowpipe Streaming.

Troubleshooting

Getting the Snowflake URL

You can also run the script below in a Snowflake worksheet to return the Snowflake URL. You need to be logged into Snowflake with an account granted the ORGADMIN system-defined role to run this script.

-- Snowflake URL is the 'account_url', not 'account_locator_url'
SHOW ORGANIZATION ACCOUNTS;
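If the ORGADMIN role isn't available to you, the account URL can usually be assembled from the organization and account names instead. This sketch assumes the organization-name URL format for your account:

```sql
-- Alternative that doesn't require ORGADMIN: build the account URL from the
-- current organization and account names (assumes the org-name URL format)
SELECT 'https://' || LOWER(CURRENT_ORGANIZATION_NAME()) || '-' ||
       LOWER(CURRENT_ACCOUNT_NAME()) || '.snowflakecomputing.com' AS account_url;
```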

Snowflake Setup scripts failing

There can be many reasons for the scripts to fail, but the queries below can help you diagnose the issues.

You need to be logged into Snowflake with an account granted the ACCOUNTADMIN system-defined role, or a custom role with equivalent privileges, to run these scripts.

Copy and paste the scripts below into Snowflake worksheets. Change the object names at the top as required and run all queries.

-- Script #1
-- Replace object names below with the names used by your Snowflake Setup script
SET warehouse_name      = UPPER('STREAMKAP_WH');
SET database_name       = UPPER('STREAMKAPDB');
SET schema_name         = UPPER('STREAMKAP');
SET role_name           = UPPER('STREAMKAP_ROLE');
SET user_name           = UPPER('STREAMKAP_USER');
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change the role name below
SET accountadmin_role   = UPPER('ACCOUNTADMIN');

USE ROLE IDENTIFIER($accountadmin_role);

-- If any of the queries fail or return no results, '$accountadmin_role' doesn't have necessary privileges, or the object doesn't exist
-- Warehouses, databases and schemas
DESC WAREHOUSE IDENTIFIER($warehouse_name);
DESC DATABASE IDENTIFIER($database_name); -- Also lists schemas, so a separate DESC SCHEMA query isn't needed

-- Users
DESC USER IDENTIFIER($user_name); -- Shows the name, defaults and RSA details, so separate queries aren't needed

-- Network policies
DESC NETWORK POLICY IDENTIFIER($network_policy_name);

-- Script #2
-- Replace the role name below with the role name used by your Snowflake Setup script
SET role_name           = UPPER('STREAMKAP_ROLE');

-- If your Snowflake account uses custom roles to grant privileges, change the role name below
SET accountadmin_role   = UPPER('ACCOUNTADMIN');

USE ROLE IDENTIFIER($accountadmin_role);

-- List privileges
SHOW GRANTS TO ROLE IDENTIFIER($role_name);

If any of the queries return an error or no results:

  • Check in the top right corner (next to the Share and Run buttons) of the Snowflake worksheet that the role is set to ACCOUNTADMIN, or a custom role with equivalent privileges
  • Depending on which query failed or returned no results, check that the object names at the top of the script are correct
  • If a query returns an "Object does not exist or is not authorized" error, go to the Snowsight Admin page and see if the object is shown there. For example, if DESC WAREHOUSE ... failed, go to Admin -> Warehouses and check whether the warehouse is listed on that page

If the warehouse, database, schema, role and user all exist, privileges might be the issue. Run Script #2 and ensure the privileges displayed match or include the grants made by the Snowflake Setup script.

Clean up old records

If you're not concerned about keeping older change events (that is, the historic state of records), you can delete them.

To delete older records - assuming there's an appropriate timestamp such as the change event timestamp - you can schedule a query to delete them, replacing { ... } placeholders as required:

DELETE FROM {schema}.{table}
WHERE
  NOT EXISTS (
    SELECT 1
    FROM (
      SELECT {primary_key_column, ...}, MAX({timestamp_column}) AS max_timestamp
      FROM {schema}.{table}
      GROUP BY {primary_key_column, ...}
    ) AS subquery
    WHERE {table}.{primary_key_column} = subquery.{primary_key_column}
      AND {table}.{timestamp_column} = subquery.max_timestamp
  );
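One way to run such a clean-up periodically inside Snowflake is a task. The sketch below is an assumption-laden example: the task name, schedule and warehouse are illustrative, and the DELETE body stands in for the query above.

```sql
-- Sketch: run the clean-up once a day via a Snowflake task
-- (task name, schedule and warehouse are examples; adjust to your environment)
CREATE TASK IF NOT EXISTS {schema}.cleanup_old_records
  WAREHOUSE = STREAMKAP_WH
  SCHEDULE = 'USING CRON 0 3 * * * UTC'
AS
  DELETE FROM {schema}.{table} WHERE ...; -- the DELETE query shown above

-- Tasks are created suspended; resume the task to start the schedule
ALTER TASK {schema}.cleanup_old_records RESUME;
```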