Snowflake

Prerequisites

A Snowflake account granted the ACCOUNTADMIN system-defined role, or a custom role with privileges to:

  • CREATE WAREHOUSE, DATABASE, SCHEMA
  • CREATE ROLE, USER
  • CREATE NETWORK POLICY

Snowflake Setup

Copy and paste the script below into a Snowflake worksheet, change the values at the top as required, and then run all queries.

-- We've provided defaults, so change these as required, keeping the names of database objects in 'UPPERCASE'
SET user_name           = UPPER('STREAMKAP_USER');
SET user_password       = '{password}'; -- IMPORTANT: Make sure to change this!
SET warehouse_name      = UPPER('STREAMKAP_WH'); -- Used for optional views, not ingestion
SET database_name       = UPPER('STREAMKAPDB');
SET schema_name         = UPPER('STREAMKAP');
SET role_name           = UPPER('STREAMKAP_ROLE');
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change these values below
SET sysadmin_role       = UPPER('SYSADMIN');
SET securityadmin_role  = UPPER('SECURITYADMIN');

-- Create a warehouse with defaults:
-- Standard, X-Small, No Scaling, Auto-Suspend after 1 Minute (AUTO_SUSPEND is specified in seconds)
USE ROLE IDENTIFIER($sysadmin_role);
CREATE WAREHOUSE IF NOT EXISTS IDENTIFIER($warehouse_name) AUTO_SUSPEND = 60;

-- Create a database and schema for Streamkap
USE WAREHOUSE IDENTIFIER($warehouse_name);
CREATE DATABASE IF NOT EXISTS IDENTIFIER($database_name);
USE DATABASE IDENTIFIER($database_name);
CREATE SCHEMA IF NOT EXISTS IDENTIFIER($schema_name);

-- Create a Snowflake role with privileges for the Streamkap connector
USE ROLE IDENTIFIER($securityadmin_role);
CREATE ROLE IF NOT EXISTS IDENTIFIER($role_name);

-- Grant privileges on the warehouse
GRANT USAGE ON WAREHOUSE IDENTIFIER($warehouse_name) TO ROLE IDENTIFIER($role_name);

-- Grant privileges on the database
GRANT USAGE ON DATABASE IDENTIFIER($database_name) TO ROLE IDENTIFIER($role_name);

-- Grant privileges on the database schema
USE ROLE IDENTIFIER($sysadmin_role);
USE DATABASE IDENTIFIER($database_name);
GRANT USAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE TABLE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE STAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE PIPE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);

-- Create a user for Streamkap
USE ROLE IDENTIFIER($securityadmin_role);
CREATE USER IDENTIFIER($user_name) PASSWORD = $user_password DEFAULT_ROLE = $role_name;

-- Grant the custom role to the Streamkap user
GRANT ROLE IDENTIFIER($role_name) TO USER IDENTIFIER($user_name);

-- Set the custom role as the default role for the Streamkap user.
-- If you encounter an 'Insufficient privileges' error, verify the '$securityadmin_role' has OWNERSHIP privilege on the '$user_name'.
ALTER USER IDENTIFIER($user_name) SET DEFAULT_ROLE = $role_name;

-- Allow the Streamkap user access to the Snowflake account
-- Latest IPs can be found here: https://docs.streamkap.com/docs/streamkap-ip-addresses-whitelisting
-- If you need to edit the network policy, you can use:
-- ALTER NETWORK POLICY STREAMKAP_NETWORK_ACCESS SET ALLOWED_IP_LIST=('52.32.238.100');
CREATE NETWORK POLICY IDENTIFIER($network_policy_name) ALLOWED_IP_LIST=('52.32.238.100');
ALTER USER IDENTIFIER($user_name) SET NETWORK_POLICY = $network_policy_name;

We do not use CREATE OR REPLACE in our scripts. This is to avoid destroying something by mistake that already exists in your Snowflake account.

Key Pair Authentication

The connector relies on an RSA key pair for authentication, which you can generate using OpenSSL. Below are example scripts that do that. You can modify them to suit your security policies, but please ensure the key pair meets these minimum requirements:

  • RSA 2048-bit
  • PKCS#8 key format

RSA key generation on Windows

Snowflake does not support keys generated by PuTTY Key Generator.

One of the easiest and quickest ways to generate a valid key with OpenSSL is via Git Bash, which is installed by default with Git for Windows. After installation, open a Git Bash prompt by holding Left Shift and right-clicking on your Desktop, choosing "Open Git Bash here", and then run the OpenSSL commands below.

If you have any issues following these instructions or are unable to install Git for Windows, please contact us.

# generates an encrypted RSA private key
# Make sure to change '{passphrase}' to a password of your choice
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out streamkap_key.p8 -passout pass:{passphrase}

# generates the public key, referencing the private key
# Don't forget to replace '{passphrase}' with the password used in the previous command
openssl rsa -in streamkap_key.p8 -pubout -out streamkap_key.pub -passin pass:{passphrase}

The scripts above create two files (the key pair): the private key, streamkap_key.p8, and the public key, streamkap_key.pub. Store both files in a secure place.
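Before going further, it can help to confirm that the generated files meet the requirements listed above. The following is a minimal sketch, assuming the file names used by the commands above. The function names are illustrative helpers, not part of OpenSSL or Streamkap, and the key size check only matches the text that openssl rsa prints, so treat it as a rough check.

```shell
# Succeeds if the private key file carries the encrypted PKCS#8 PEM header.
is_encrypted_pkcs8() {
  head -n 1 "$1" | grep -q -- '-----BEGIN ENCRYPTED PRIVATE KEY-----'
}

# Succeeds if openssl reports the private key as 2048 bits long.
# Arguments: private key file, passphrase.
is_rsa_2048() {
  openssl rsa -in "$1" -passin "pass:$2" -noout -text 2>/dev/null | grep -q '2048 bit'
}

# Succeeds if the public key file matches the public key derived from the private key.
# Arguments: private key file, passphrase, public key file.
key_pair_matches() {
  derived="$(openssl rsa -in "$1" -pubout -passin "pass:$2" 2>/dev/null)" || return 1
  [ -n "$derived" ] && [ "$derived" = "$(cat "$3")" ]
}

# Example usage (replace '{passphrase}' with your own):
# is_encrypted_pkcs8 streamkap_key.p8 \
#   && is_rsa_2048 streamkap_key.p8 '{passphrase}' \
#   && key_pair_matches streamkap_key.p8 '{passphrase}' streamkap_key.pub \
#   && echo "key pair looks OK"
```

If any of the checks fail, regenerating the key pair with the commands above is usually quicker than repairing the files.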

Once generated, the public key needs to be assigned to the Snowflake database user created for Streamkap earlier.

These commands copy the public key you generated, without its header and footer lines, to your clipboard:

# macOS
egrep -v '^-|^$' ./streamkap_key.pub | pbcopy

# Windows (PowerShell)
Get-Content .\streamkap_key.pub | Where-Object { $_ -notmatch '^-|^$' } | Set-Clipboard
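The clipboard commands above depend on macOS or PowerShell. As a platform-neutral sketch, the hypothetical helper below (public_key_body is not a standard command) prints the key body to the terminal so it can be copied by hand, and fails if the result does not start with 'MII', which is how a base64-encoded RSA public key of this size begins.

```shell
# Prints the key body of a PEM public key file: no header, footer or blank lines.
# Returns non-zero if the file is missing or the body does not start with 'MII'.
public_key_body() (
  body="$(grep -Ev '^-|^$' "$1" 2>/dev/null)" || return 1
  case "$body" in
    MII*) printf '%s\n' "$body" ;;
    *) echo "unexpected key body in $1" >&2; return 1 ;;
  esac
)

# Example usage:
# public_key_body ./streamkap_key.pub
```

The function body runs in a subshell, so the temporary variable does not leak into the calling shell.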

Now attach the public key to the user:

-- We've provided a default, so change this as required
SET user_name = UPPER('STREAMKAP_USER');

USE ROLE SECURITYADMIN;

-- Replace '{public key}' below with the public key file contents
-- If you used the previous command to copy the key to your clipboard, use Ctrl+V (Windows)
-- or Cmd+V (macOS) to replace the '{public key}' placeholder with the key
-- Key part MUST start with 'MII' excluding any headers and footers
ALTER USER IDENTIFIER($user_name) SET RSA_PUBLIC_KEY = '{public key}';
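To check that the key stored on the user is the one you generated, you can compare fingerprints. Running DESC USER in Snowflake returns an RSA_PUBLIC_KEY_FP property of the form SHA256:<base64 digest>. The sketch below computes the same value locally from the public key file; the function name public_key_fingerprint is a hypothetical helper, and the openssl pipeline follows the approach Snowflake describes for verifying a key fingerprint.

```shell
# Prints the SHA-256 fingerprint of a PEM public key file as "SHA256:<base64>".
# Returns non-zero if openssl cannot parse the file as a public key.
public_key_fingerprint() {
  openssl rsa -pubin -in "$1" -noout 2>/dev/null || return 1
  printf 'SHA256:%s\n' "$(openssl rsa -pubin -in "$1" -outform DER 2>/dev/null \
    | openssl dgst -sha256 -binary \
    | openssl enc -base64)"
}

# Example usage:
# public_key_fingerprint ./streamkap_key.pub
```

If the printed value differs from RSA_PUBLIC_KEY_FP, the key attached to the user is not the one in the file, and the ALTER USER statement above should be run again with the correct key.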

Streamkap Setup

  1. Go to Destinations and choose Snowflake

  2. Input the following information:

    1. Name - A unique and memorable name for this Connector

    2. Snowflake URL - The value of account_url returned from running SHOW ORGANIZATION ACCOUNTS; in a worksheet with the ORGADMIN role.

    3. Username (Case sensitive) - STREAMKAP_USER OR the username you chose

    4. Private Key - Upload the private key you generated. To copy its contents to your clipboard without the header and footer lines (macOS), run egrep -v '^-|^$' ./streamkap_key.p8 | pbcopy

    5. Private Key Passphrase - The passphrase you gave to the private key

    6. Database Name (Case sensitive) - STREAMKAPDB OR the database name you chose

    7. Schema Name (Case sensitive) - STREAMKAP OR the schema name you chose

  3. Click Save

Dynamic Tables

Snowflake Dynamic Tables materialize the result of a query and keep it refreshed automatically, so they can be used to maintain a table that contains only the latest version of each record inserted into Snowflake. Below is an example script you can use, substituting the placeholders e.g. {schema}, {table}, {primary_key_column} accordingly.

-- The dynamic table needs a different name from the source table; the '_latest' suffix is only an example
CREATE OR REPLACE DYNAMIC TABLE {schema}.{table}_latest
TARGET_LAG = '1 minute'
WAREHOUSE = STREAMKAP_WH
AS SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER
                (PARTITION BY {primary_key_column}
                ORDER BY __streamkap_source_ts_ms DESC) AS dedupe_id
        FROM {schema}.{table}
    ) AS subquery
WHERE dedupe_id = 1          -- Latest record
  AND __deleted = 'false';   -- Excluding deleted records

Troubleshooting

Getting the Snowflake URL

You can run the script below in a Snowflake worksheet to return the Snowflake URL. You need to be logged into Snowflake with an account granted the ORGADMIN system-defined role to run this script.

-- Snowflake URL is the 'account_url', not 'account_locator_url'
SHOW ORGANIZATION ACCOUNTS;

Snowflake Setup scripts failing

The setup scripts can fail for many reasons, but the scripts below can help you diagnose the issue.

You need to be logged into Snowflake with an account granted the ACCOUNTADMIN system-defined role, or a custom role with equivalent privileges, to run these scripts.

Copy and paste the scripts below into Snowflake worksheets. Change the object names at the top as required and run all queries.

-- Script #1: check that the objects exist
-- Replace object names below with the names used by your Snowflake Setup script
SET warehouse_name      = UPPER('STREAMKAP_WH');
SET database_name       = UPPER('STREAMKAPDB');
SET schema_name         = UPPER('STREAMKAP');
SET role_name           = UPPER('STREAMKAP_ROLE');
SET user_name           = UPPER('STREAMKAP_USER');
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change the role name below
SET accountadmin_role   = UPPER('ACCOUNTADMIN');

USE ROLE IDENTIFIER($accountadmin_role);

-- If any of the queries fail or return no results, '$accountadmin_role' doesn't have necessary privileges, or the object doesn't exist
-- Warehouses, databases and schemas
DESC WAREHOUSE IDENTIFIER($warehouse_name);
DESC DATABASE IDENTIFIER($database_name); -- Displays schemas; no need for DESC SCHEMA ... query also

-- Users
DESC USER IDENTIFIER($user_name); -- Shows name, defaults and RSA details; no need for separate queries

-- Network policies
DESC NETWORK POLICY IDENTIFIER($network_policy_name);

-- Script #2: list the privileges granted to the role
-- Replace role name below with the role name used by your Snowflake Setup script
SET role_name           = UPPER('STREAMKAP_ROLE');

-- If your Snowflake account uses custom roles to grant privileges, change the role name below
SET accountadmin_role   = UPPER('ACCOUNTADMIN');

USE ROLE IDENTIFIER($accountadmin_role);

-- List privileges
SHOW GRANTS TO ROLE IDENTIFIER($role_name);

If any of the queries return an error or no results:

  • Check in the top right corner (next to Share and Run buttons) of the Snowflake Worksheets that the role is set to ACCOUNTADMIN, or a custom role with equivalent privileges
  • Depending on which query failed or returned no results, check the object names at the top of the script are correct
  • If a query returns "Object does not exist or is not authorized" error, go to the Snowsight UI Admin page and see if the object is showing there. For example, if DESC WAREHOUSE ... failed, go to Admin -> Warehouses page and check if the Warehouse is shown on that page

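If the warehouse, database, schema, role and user exist, privileges might be the issue. Run Script #2 and ensure the privileges displayed match or include those granted by the Snowflake Setup script:

  • USAGE on the warehouse
  • USAGE on the database
  • USAGE, CREATE TABLE, CREATE STAGE and CREATE PIPE on the schema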

Clean up old records

If you're not concerned about keeping older change events, that is, the historic state of records, you can delete them.

To delete older records - assuming there's an appropriate timestamp such as the change event timestamp - you can schedule a query to delete them, replacing { ... } placeholders as required:

DELETE FROM {schema}.{table}
WHERE
  (NOT EXISTS (
    SELECT 1
    FROM (
      SELECT {primary_key_column, ...}, MAX({timestamp_column}) AS max_timestamp
      FROM {schema}.{table}
      GROUP BY {primary_key_column, ...}
    ) AS subquery
    WHERE {table}.{primary_key_column} = subquery.{primary_key_column}
      AND {table}.{timestamp_column} = subquery.max_timestamp
  ));