Prerequisites
A Snowflake account granted the `ACCOUNTADMIN` system-defined role, or a custom role with privileges to:
- CREATE WAREHOUSE, DATABASE, SCHEMA
- CREATE ROLE, USER
- CREATE NETWORK POLICY
Snowflake Setup
It’s recommended to create a separate user and role for Streamkap to access your Snowflake database. Below is an example script that does that.

We do not use `CREATE OR REPLACE` in our scripts. This is to avoid destroying something by mistake that already exists in your Snowflake account.
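A minimal sketch of such a script is shown below. The object names (STREAMKAP_WH, STREAMKAP_DB, STREAMKAP_ROLE, STREAMKAP_USER and so on) are placeholders, and the exact grants your environment needs may differ, so treat this as a starting point rather than the definitive script:

```sql
-- Placeholder object names; adjust to suit your naming and security policies
USE ROLE ACCOUNTADMIN;

CREATE WAREHOUSE IF NOT EXISTS STREAMKAP_WH
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;

CREATE DATABASE IF NOT EXISTS STREAMKAP_DB;
CREATE SCHEMA IF NOT EXISTS STREAMKAP_DB.STREAMKAP_SCHEMA;

-- Dedicated role and user for the connector
CREATE ROLE IF NOT EXISTS STREAMKAP_ROLE;
CREATE USER IF NOT EXISTS STREAMKAP_USER
  DEFAULT_WAREHOUSE = STREAMKAP_WH
  DEFAULT_ROLE = STREAMKAP_ROLE;
GRANT ROLE STREAMKAP_ROLE TO USER STREAMKAP_USER;

-- Privileges the connector needs to create and load tables
GRANT USAGE ON WAREHOUSE STREAMKAP_WH TO ROLE STREAMKAP_ROLE;
GRANT USAGE ON DATABASE STREAMKAP_DB TO ROLE STREAMKAP_ROLE;
GRANT USAGE, CREATE TABLE, CREATE DYNAMIC TABLE, CREATE TASK
  ON SCHEMA STREAMKAP_DB.STREAMKAP_SCHEMA TO ROLE STREAMKAP_ROLE;
```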
Key Pair Authentication
The connector relies on an RSA key pair for authentication, which you can generate using OpenSSL. Below are example scripts that do that. You can modify them to suit your security policies, but please ensure the key pair meets these minimum requirements:
- RSA 2048-bit
- PKCS#8 key format
SSH key generation on Windows
Snowflake does not support keys generated by PuTTY Key Generator. One of the easiest and quickest ways to generate a valid key with OpenSSL is via Git Bash, which is installed by default with Git for Windows. After installation, you can open a Git Bash prompt by holding Shift and right-clicking on your Desktop, choosing “Open Git Bash here”, and then executing the OpenSSL commands below. If you have any issues following these instructions or are unable to install Git for Windows, please contact us.
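The commands below are a typical way to generate such a key pair with OpenSSL; the file name streamkap_key is only an example:

```bash
# Generate an encrypted 2048-bit RSA private key in PKCS#8 format (you will be prompted for a passphrase)
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out streamkap_key.p8

# Derive the matching public key from the private key
openssl rsa -in streamkap_key.p8 -pubout -out streamkap_key.pub
```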
These commands generate two key files: one private (usually with the extension `.p8`) and the other public (usually with the extension `.pub`). Store both files in a secure place.
Once generated, the public key needs to be assigned to the Snowflake database user created for Streamkap earlier.
The commands below will copy the public key you generated to your clipboard.
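For example, assuming the file name used above (use the variant that matches your operating system):

```bash
# macOS
pbcopy < streamkap_key.pub

# Windows (Git Bash or Command Prompt)
clip < streamkap_key.pub
```

You can then paste the key into an `ALTER USER` statement run as `ACCOUNTADMIN`. The user name and key value below are placeholders; exclude the `-----BEGIN/END PUBLIC KEY-----` delimiter lines when pasting:

```sql
ALTER USER STREAMKAP_USER SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqhki...';
```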
Streamkap Setup
Follow these steps to configure your new connector:

1. Create the Destination
- Navigate to Add Connectors.
- Choose Snowflake.
2. Connection Settings
- Name: Enter a name for your connector.
- Snowflake URL: The URL for accessing your Snowflake account. This URL must include your account identifier. Note that the protocol (`https://`) and port number are optional.
- Username: User login name for the Snowflake account (Case sensitive).
- Private Key: Provide the private key you generated; you can copy its contents using the command shown after these connection settings.
- Key secured with passphrase?: If checked (default), provide your SSH key’s passphrase; otherwise, uncheck it for SSH keys without a passphrase.
- Private Key Passphrase: The passphrase is used to decrypt the private key.
- Database Name: The name of the database to use (Case sensitive).
- Schema Name: The name of the schema where tables will be created (Case sensitive).
- Snowflake Role: The name of an existing role with necessary privileges (for Streamkap) assigned to the user specified by Username (Case sensitive).
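For the Private Key field, a command like the one below (file name from the earlier example) copies the full contents of the private key file to your clipboard:

```bash
# macOS
pbcopy < streamkap_key.p8

# Windows (Git Bash or Command Prompt)
clip < streamkap_key.p8
```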
3. Ingestion Settings
- Ingestion Mode: How the Connector loads data into the Snowflake tables. See Upsert mode for further details.
Changing ingestion mode
`append` and `upsert` modes use different, incompatible methods for loading data into the Snowflake tables. If - for whatever reason - you want to change modes for an existing Snowflake Connector, please create a new Snowflake Destination instead, i.e. a separate destination for `append` and another for `upsert`.
- `append` mode:
  - Use Dynamic Tables: Specifies whether the connector should create Dynamic Tables & Cleanup Tasks. See Dynamic Tables.
  - Custom SQL Template - Dynamic Table Creation: These template queries run for each table the first time a record is streamed for them.
  - Custom SQL Template - Dynamic Table Name: Can be used as `{{dynamicTableName}}` in dynamic table creation SQL. It can use input JSON data for more complex mappings and logic.
  - Custom SQL Template - Input JSON data: Use `{"TABLE_DATA": {"{table_name}": {"{key}": "{value}"}, ...}, ...}` to set table-specific data. This data will be available in the custom SQL templates, e.g. `SELECT {{key}}`.
  - Auto QA Deduplication Table Mapping: Mapping between the tables that store append-only data and the deduplicated tables. The dedupeTable in the mapping will be used for QA scripts. If dedupeSchema is not specified, the deduplicated table will be created in the same schema as the raw table.
- `upsert` mode:
  - Delete Mode: Specifies whether the connector processes deletions (or tombstone events) and removes the corresponding row from the database.
  - Use Hybrid Tables: Specifies whether the connector should create Hybrid Tables.
Troubleshooting
Dynamic Tables
Snowflake Dynamic Tables are materialized views that contain the latest records inserted into Snowflake. Streamkap’s Snowflake Connector creates them (if enabled) for each table the first time a record is streamed for it. A Snowflake Task is also created for each dynamic table to clean up older entries periodically. Below is the default template, shown in the Streamkap UI. You can modify it there to suit your requirements.
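The authoritative default template is the one shown in the Streamkap UI; the sketch below only illustrates the general shape of such a dynamic table, with placeholder warehouse, source table, key column (ID) and timestamp column (_STREAMKAP_TS_MS) names:

```sql
-- Illustrative sketch only; see the Streamkap UI for the actual default template
CREATE DYNAMIC TABLE IF NOT EXISTS {{dynamicTableName}}
  TARGET_LAG = '15 minutes'      -- how far the materialized rows may lag the raw table
  WAREHOUSE  = STREAMKAP_WH      -- placeholder warehouse name
AS
SELECT * EXCLUDE rn
FROM (
  SELECT t.*,
         ROW_NUMBER() OVER (PARTITION BY ID ORDER BY _STREAMKAP_TS_MS DESC) AS rn
  FROM STREAMKAP_DB.STREAMKAP_SCHEMA.SOURCE_TABLE t
)
WHERE rn = 1;                    -- keep only the latest record per key
```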
Offset Management (Append Mode)
UI Coming Soon: The Streamkap app will soon expose consumer group and offset management directly in the UI. Currently, offset troubleshooting and resets require assistance from Streamkap support, who can coordinate both Kafka and Snowflake offset resets safely.
Understanding dual offset tracking
Two offset systems work together:
- Kafka Consumer Group Offsets (`connect-<connector-id>`)
  - Tracks which Kafka messages the connector has consumed from each topic partition
  - Managed by the Kafka Connect framework
  - Consumer lag = difference between the latest message in the topic and the last consumed offset
- Snowpipe Streaming Channel Offsets
  - Tracks which messages have been successfully ingested into Snowflake tables
  - Each topic partition creates a Snowflake channel (e.g., `TOPIC_0` for partition 0)
  - Managed within Snowflake using `SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN`
  - View channels with `SHOW CHANNELS` in your Snowflake schema
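For example (the schema name is a placeholder):

```sql
-- List Snowpipe Streaming channels for the Streamkap schema
SHOW CHANNELS IN SCHEMA STREAMKAP_DB.STREAMKAP_SCHEMA;
```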
Common offset issues
Negative consumer lag scenarios:
- Kafka topic is deleted and recreated, but consumer group retains old offsets
- Consumer group “remembers” offsets from the old topic that no longer exist
- Connector appears “ahead” of the current topic messages
- Snowflake channels may retain offsets that reference deleted/recreated topics
- Channels can become “stuck” with offsets pointing to non-existent messages
- Data ingestion stops even though Kafka consumer group appears healthy
- Connector shows as running but no new data appears in Snowflake
- Negative or unexpectedly high consumer lag
- Channels showing stale offset positions in `SHOW CHANNELS`
Offset reset strategies
When offset resets are needed:
- After recreating Kafka topics
- Negative consumer lag that doesn’t resolve automatically
- Connector stuck and not processing new messages
- Need to reprocess historical data
Reset options:
- To earliest: Reprocess all available messages in the topic
- To latest: Skip to newest messages (ignore historical data)
- To timestamp: Start from a specific point in time
- To offset: Jump to exact message position
Coordination Required: Both Kafka consumer group AND Snowflake channel offsets typically need to be reset together to avoid data gaps or duplicates.
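For reference, the Snowflake side of such a reset uses the `SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN` function mentioned above. The sketch below uses placeholder table, channel and offset values; any real reset should be coordinated with Streamkap support:

```sql
-- All values are placeholders; do not run a reset while the channel is actively ingesting
SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN(
  'STREAMKAP_DB.STREAMKAP_SCHEMA.MY_TABLE',  -- fully qualified target table
  'TOPIC_0',                                 -- channel name (one per topic partition)
  '12345'                                    -- new offset token
);
```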
Impact and best practices
Before resetting offsets, consider:
- Data duplication: Resetting to earlier positions may cause duplicate records in Snowflake
- Processing time: Reprocessing large volumes of historical data takes time
- Snowflake costs: Additional data processing increases Snowpipe Streaming and storage costs
- Coordination complexity: Both Kafka and Snowflake offsets need proper alignment
Best practices:
- Use Dynamic Tables: Automatically handle deduplication from any offset resets
- Reset to latest: For new deployments where historical data isn’t needed
- Coordinate resets: Ensure both Kafka consumer group and Snowflake channel offsets are reset together
- Monitor channels: Regularly check `SHOW CHANNELS` output for channel health
- Test thoroughly: After offset resets, verify data flow and check for duplicates
Upsert mode
The Snowflake destination connector can run in upsert mode. This mode switches off Snowpipe Streaming, and the connector instead uses periodic `MERGE INTO` statements to upsert data into the target Snowflake tables. Dynamic tables or other de-duplication mechanisms are not necessary when using upsert mode.
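As a rough illustration of what an upsert looks like in Snowflake (table, column and staging names below are placeholders, not the SQL the connector actually generates):

```sql
MERGE INTO STREAMKAP_DB.STREAMKAP_SCHEMA.CUSTOMERS AS tgt
USING STREAMKAP_DB.STREAMKAP_SCHEMA.CUSTOMERS_STAGING AS src
  ON tgt.ID = src.ID
WHEN MATCHED AND src.__DELETED = 'true' THEN DELETE       -- honour delete/tombstone events
WHEN MATCHED THEN UPDATE SET tgt.NAME = src.NAME, tgt.EMAIL = src.EMAIL
WHEN NOT MATCHED THEN INSERT (ID, NAME, EMAIL) VALUES (src.ID, src.NAME, src.EMAIL);
```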
Snowflake costs
Currently, upsert mode requires a warehouse to be running, so overall costs will be higher compared to append mode, which uses Snowpipe Streaming.
Getting the Snowflake URL
You can also run the script below in a Snowflake worksheet to return the Snowflake URL. You need to be logged into Snowflake with an account granted the `ORGADMIN` system-defined role to run this script.
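A sketch of such a script (the exact query may differ from the one Streamkap provides):

```sql
USE ROLE ORGADMIN;
-- Returns one row per account in the organization; the account_url column is the Snowflake URL
SHOW ORGANIZATION ACCOUNTS;
```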
Snowflake Setup scripts failing
There can be many reasons for them to fail, but the scripts below can help you diagnose the issues. You need to be logged into Snowflake with an account granted the `ACCOUNTADMIN` system-defined role, or a custom role with equivalent privileges, to run these scripts.
Copy and paste the scripts below into Snowflake worksheets. Change the object names at the top as required and run all the queries.
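A sketch of the kind of checks those scripts run (object names are placeholders; each statement should return details for the corresponding object):

```sql
USE ROLE ACCOUNTADMIN;

-- Confirm each object exists and is visible to your role
DESC WAREHOUSE STREAMKAP_WH;
DESC DATABASE STREAMKAP_DB;
DESC SCHEMA STREAMKAP_DB.STREAMKAP_SCHEMA;
DESC USER STREAMKAP_USER;

-- Confirm the role exists, is granted to the user, and carries the expected privileges
SHOW GRANTS TO USER STREAMKAP_USER;
SHOW GRANTS TO ROLE STREAMKAP_ROLE;
```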
- Check in the top right corner of the Snowflake Worksheet (next to the Share and Run buttons) that the role is set to `ACCOUNTADMIN`, or a custom role with equivalent privileges
- Depending on which query failed or returned no results, check that the object names at the top of the script are correct
- If a query returns an "Object does not exist or is not authorized" error, go to the Snowsight UI Admin page and see if the object is showing there. For example, if `DESC WAREHOUSE ...` failed, go to the Admin -> Warehouses page and check if the Warehouse is shown on that page