Streaming MongoDB to Snowflake with Snowpipe Streaming and Dynamic Tables While Lowering Costs

Ricky Thomas
July 26, 2023

Introduction

This hands-on guide is for data teams who wish to ingest data from MongoDB to a Snowflake data warehouse with sub-second latency and cost savings to boot. 

Data warehousing has traditionally relied on slow batch processes. Streamkap instead reads the database source transaction log using change data capture (CDC), replicating data in real time without putting any load on the source database.

The data is then streamed into the data warehouse or data lake using real-time inserts, with an option to transform data before insertion. The entire pipeline latency is usually sub-second, at a lower cost than current ETL methods.

This guide is for data engineers but will benefit anyone interested in:

  • Moving from batch to streaming processing
  • Reducing ETL costs by up to 90% by switching from traditional ETL vendors
  • Building real-time dashboards and customer-facing experiences
  • Enabling data science use cases such as training machine learning models
  • Supporting analytical workloads for time-sensitive decisions
  • Sharing data in real-time with customers

The guide includes code examples, screenshots, and step-by-step instructions so that it’s easy to follow.

To replicate the source data, we will be utilising Streamkap, Snowpipe Streaming and Snowflake Dynamic Tables. This guide includes instructions for both MongoDB Atlas and self-hosted MongoDB.

Setup MongoDB

For this blog, we will be using MongoDB Atlas, but if you require guides for other platforms, visit our docs here - MongoDB Change Data Capture Setup Docs.

Obtain Connection String

MongoDB Atlas
  • Login to MongoDB Atlas and navigate to the cluster that Streamkap should connect to
  • Click Connect
  • Click Connect your Application
  • Copy this connection string
MongoDB Shell
  • Connect to your replica set or primary node using the MongoDB shell as an Admin user.
  • Execute the db.adminCommand( { replSetGetStatus : 1 } ).members command.
  • Copy the host identifier and optionally the alternative host identifiers.

Setup Permissions

MongoDB Atlas
  • Login to MongoDB Atlas
  • In the left-hand navigation menu, go to Security > Database Access.
  • Click New Database User.
  • Choose the password authentication method.
  • Enter the username and password for the new Streamkap user (we suggest the username streamkap_user)
  • In the Database User Privileges drop-down menu, select Grant Specific User Privileges.
  • Under Specific Privileges, add the following roles/privileges:
  • readAnyDatabase
  • read on the local database
  • Click Add User.
Enable Snapshots through MongoDB Atlas

You will need to create the collection and give permissions to the Streamkap user/role.

  • Create the collection streamkap_signal. This can be done via the MongoDB Atlas web app or Mongo Compass:
  • Navigate to the streaming cluster
  • Navigate to Collections and create a new collection in the streaming database by clicking the + button next to the database name. Name the collection streamkap_signal.
  • Give the Streamkap user permissions to read and readWrite on the streamkap_signal collection
  • Return to the Database User Privileges drop-down menu and select Grant Specific User Privileges
  • Under Specific Privileges, add the following roles/privileges:
  • readWrite@<database>.streamkap_signal
MongoDB Shell
  • Using MongoDB Shell, connect to your primary node or replica set
  • Create a user for Streamkap. Replace <password> with your own choice.

use admin
db.createUser({
  user: "streamkap_user",
  pwd: "<password>",
  roles: ["readAnyDatabase", { role: "read", db: "local" }]
})
Enable Snapshots through MongoDB Shell

You will need to create the collection and give permissions to the Streamkap user/role.


db.createCollection("streamkap_signal")

db.grantRolesToUser("streamkap_user", [
  { role: "read", db: "{database}" },
  { role: "readWrite", db: "{database}", collection: "streamkap_signal" }
])

Note: when you later set up the connector, you must include this collection.

Consider Access Restrictions

It is likely that your MongoDB database is behind a firewall. It is possible to connect via IP whitelisting, SSH tunnels, VPN or AWS Private Link. Visit Connection Options to decide which is best for you.

Setup MongoDB Connector in Streamkap

  1. Go to Sources, click New and choose MongoDB
  2. Input
  • Name for your Connector
  • Connection String
  • Connection Mode
  • Database Name
  • Add Schemas/Tables. The entire data set can be replicated or choose a subset of source tables.
  • Click Save

You have now added your source database.

Backfilling/Snapshots

As soon as MongoDB has been added to Streamkap, the default behaviour is to take a snapshot/backfill of the existing data.

The rate can vary, but as a guide you can expect approximately 500 million rows per 24 hours on the lowest setting. You can adjust this in the app to roughly 10x faster, taking you to approximately 5 billion rows per 24 hours, and if you need to go beyond that, Streamkap support can increase it further.
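As a back-of-the-envelope check, here is a quick Python sketch of what those rates imply for backfill time. The 2-billion-row collection size below is a made-up example, and the rates are guide figures from above, not guarantees:

```python
# Guide figures from the text above (assumed, not vendor-guaranteed)
base_rows_per_day = 500_000_000          # lowest setting
max_rows_per_day = base_rows_per_day * 10  # ~10x faster when adjusted in the app

# Hypothetical collection size for illustration
collection_rows = 2_000_000_000

days_at_base = collection_rows / base_rows_per_day  # 4.0 days
days_at_max = collection_rows / max_rows_per_day    # 0.4 days (~10 hours)
```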

Setup Snowflake Snowpipe Streaming

Using Snowsight, let’s start setting the parameters for the scripts. Replace the parameters accordingly such as the database to be used:


SET user_password = '{password}'; 
SET warehouse_name = UPPER('STREAMKAP_WH');
SET database_name = UPPER('STREAMKAPDB');
SET schema_name = UPPER('STREAMKAP'); 
SET role_name = UPPER('STREAMKAP_ROLE'); 
SET user_name = UPPER('STREAMKAP_USER'); 
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change these values below
SET sysadmin_role = UPPER('SYSADMIN'); 
SET securityadmin_role = UPPER('SECURITYADMIN');

Create a warehouse for minimal DDL work, i.e. table creation. The warehouse should be x-small, with no scaling and auto-suspend after 1 minute:


USE ROLE IDENTIFIER($sysadmin_role);
-- WAREHOUSE_SIZE and AUTO_SUSPEND (in seconds) match the sizing described above
CREATE WAREHOUSE IF NOT EXISTS IDENTIFIER($warehouse_name)
  WAREHOUSE_SIZE = XSMALL
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;

Create a database and schema for Streamkap:


USE WAREHOUSE IDENTIFIER($warehouse_name);
CREATE DATABASE IF NOT EXISTS IDENTIFIER($database_name);
CREATE SCHEMA IF NOT EXISTS IDENTIFIER($schema_name); 

Create a Snowflake role with privileges for the Streamkap connector:


USE ROLE IDENTIFIER($securityadmin_role);
CREATE ROLE IF NOT EXISTS IDENTIFIER($role_name);

Grant privileges on the warehouse:


GRANT USAGE ON WAREHOUSE IDENTIFIER($warehouse_name) TO ROLE IDENTIFIER($role_name); 

Grant privileges on the database:


GRANT USAGE ON DATABASE IDENTIFIER($database_name) TO ROLE IDENTIFIER($role_name); 

Grant privileges on the database schema:


USE ROLE IDENTIFIER($sysadmin_role); 
USE DATABASE IDENTIFIER($database_name); 
GRANT USAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE TABLE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name); 
GRANT CREATE STAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name); 
GRANT CREATE PIPE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);

Create a user for Streamkap:


USE ROLE IDENTIFIER($securityadmin_role);
CREATE USER IDENTIFIER($user_name) PASSWORD = $user_password DEFAULT_ROLE = $role_name;

Grant the custom role to the Streamkap user:


GRANT ROLE IDENTIFIER($role_name) TO USER IDENTIFIER($user_name);

Set the custom role as the default role for the Streamkap user. If you encounter an 'Insufficient privileges' error, verify that the $securityadmin_role role has the OWNERSHIP privilege on the $user_name user.


ALTER USER IDENTIFIER($user_name) SET DEFAULT_ROLE = $role_name;

Allow the Streamkap user access to the Snowflake account.

Latest IP addresses can be found at Streamkap IP Addresses.


CREATE NETWORK POLICY IDENTIFIER($network_policy_name) ALLOWED_IP_LIST=('52.33.81.202','52.36.212.154','35.85.76.250');
ALTER USER IDENTIFIER($user_name) SET NETWORK_POLICY = $network_policy_name;

Key Pair Authentication

Snowpipe Streaming uses an RSA key pair for authentication. The instructions below work on both macOS and Windows, but on Windows you will need OpenSSL installed (for example via Git Bash or WSL).

Generate an encrypted RSA private key. You will be prompted for a passphrase; keep a note of it, as the connector needs it later.


openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out streamkap_key.p8

Generate the public key, referencing the private key:


openssl rsa -in streamkap_key.p8 -pubout -out streamkap_key.pub

You will now have two files (the key pair) named streamkap_key.p8 and streamkap_key.pub in the directory you ran the above commands.

Copy the public key contents to the clipboard after removing the header and footer lines (on Windows, pipe to clip.exe instead of pbcopy).


egrep -v '^-|^$' ./streamkap_key.pub | pbcopy
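If you prefer to strip the key programmatically, the same header/footer removal the egrep command performs can be done in a few lines of Python. The key content below is a made-up placeholder, not a real key:

```python
# Strip the PEM header/footer so only the base64 key body remains,
# equivalent to: egrep -v '^-|^$' streamkap_key.pub
pem = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
wIDAQAB
-----END PUBLIC KEY-----"""

key_body = "".join(
    line for line in pem.splitlines()
    if line and not line.startswith("-----")
)
```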

The public key needs to be assigned to the Snowflake database user created for Streamkap earlier. Replace {public key} with the contents of your public key. 


SET user_name = UPPER('STREAMKAP_USER');
USE ROLE SECURITYADMIN;
ALTER USER IDENTIFIER($user_name) SET RSA_PUBLIC_KEY = '{public key}';

Setup Snowflake Connector in Streamkap

  1. Go to Destinations, click New and choose Snowflake
  2. Input:
  • Name
  • URL
  • Key (Paste the contents of your private key - streamkap_key.p8)
  • Passphrase (the passphrase you set when creating the key pair)
  • Database Name
  • Schema Name

Replicate Data with a Pipeline

A pipeline links a source to a destination.

  • Go to Pipelines, click New
  • Choose the MongoDB Source
  • Choose the Snowflake Destination
  • Choose to sync all data or select a subset
  • Save

The pipeline will now begin extracting data from the source and sending it to Snowflake. As you can see in the screenshot below, the pipeline shows a 349 millisecond latency for the entire pipeline.

Verify Data has arrived in Snowflake

We can see our mock new data has arrived in Snowflake.

As you can see, there are a few additional columns of data added to the ingested data.

  • RECORD_METADATA - The metadata contains useful information in the event of diagnosing any issues
  • _STREAMKAP_SOURCE_TS_MS - The timestamp in milliseconds the source record was created/modified/deleted
  • _STREAMKAP_TS_MS - The timestamp in milliseconds when Streamkap processed the message
  • __DELETED - True or False whether the record has been deleted at source.
  • _STREAMKAP_OFFSET - This is a unique ID of the record. 

If you would like to query the Streamkap date/time fields, they can be converted from milliseconds to something more readable by using the following:


select DATEADD(second, _streamkap_source_ts_ms / 1000, '1970-01-01') AS timestamp from Customer;
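If you are post-processing exported data outside Snowflake, the equivalent conversion in Python looks like this (the timestamp value is an arbitrary example):

```python
from datetime import datetime, timezone

# Convert a Streamkap epoch-milliseconds value to a readable UTC timestamp,
# mirroring the DATEADD expression above
ts_ms = 1690329600000  # example value
ts = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
print(ts.isoformat())  # 2023-07-26T00:00:00+00:00
```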

Schema Drift with Snowflake Streaming

Schema drift handling is essential for production pipelines. Streamkap supports schema drift on all data sources and ensures these schema changes are evolved in Snowflake. More details are available in the Streamkap documentation.

Snowflake Dynamic Tables

Now the data is in Snowflake, you can use various tools such as dbt to work with it.

Dynamic Tables are a new way to model your data. Dynamic Tables are tables that are created from the results of a query. This means that you can define a dynamic table without having to create a separate target table or write code to transform and update the data. Instead, you can simply specify the SQL statement that performs the transformation, and Snowflake will automatically update the materialized results through regular (and often incremental) refreshes.

When to use Dynamic Tables?

Dynamic Tables are a good choice for cases where:

  • You want the fastest and most efficient way to model data streamed into Snowflake for most use cases.
  • You need to materialize the results of a query on one or more base tables while being able to use functions such as window functions.
  • You would like data to refresh and to track data dependencies without writing code. 
  • You would like the data to be fresh when it’s available rather than having to implement fine-grained control over the refresh schedule.

Using Dynamic Tables with Snowflake Streaming

Snowpipe Streaming currently supports inserts as append-only, which is pretty normal in data streaming. Many companies prefer this since it keeps a tracked version of all changes for a period of time of your choosing. 

However, you will still need final-state tables that return only the most recent records. With Dynamic Tables, we can use a window function to create materialized tables with a lag of just 10 seconds.

Updates to a source table arrive as newly inserted change records. Here is the mock data for such a set of changes.

Here is the code to create a dynamic table on top of our mock Customer data, which has a target lag of 10 seconds.


CREATE OR REPLACE DYNAMIC TABLE Customers_dynamic_table
TARGET_LAG = '10 seconds'
WAREHOUSE = STREAMKAP_WH
AS
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY ID
      ORDER BY _streamkap_source_ts_ms DESC
    ) AS dedupe_id
  FROM STREAMKAP.customers
)
WHERE dedupe_id = 1
AND __deleted = 'false';
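To make the window-function logic concrete, here is a pure-Python sketch of the same deduplication the dynamic table performs: keep only the latest record per ID (by _streamkap_source_ts_ms) and drop records whose latest version is a delete. The rows are mock data, made up for illustration:

```python
# Mock change records: two versions of ID 1, and a delete for ID 2
rows = [
    {"id": 1, "name": "Ada",    "_streamkap_source_ts_ms": 100, "__deleted": "false"},
    {"id": 1, "name": "Ada L.", "_streamkap_source_ts_ms": 200, "__deleted": "false"},
    {"id": 2, "name": "Grace",  "_streamkap_source_ts_ms": 150, "__deleted": "true"},
]

# Keep the most recent record per ID (equivalent to ROW_NUMBER() ... = 1)
latest = {}
for row in rows:
    current = latest.get(row["id"])
    if current is None or row["_streamkap_source_ts_ms"] > current["_streamkap_source_ts_ms"]:
        latest[row["id"]] = row

# Drop IDs whose latest version is a delete (equivalent to __deleted = 'false')
final_state = [r for r in latest.values() if r["__deleted"] == "false"]
```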

The resulting query on the new dynamic table now looks like this:

Dynamic Tables have their own heading under the schema.

Some queries will prevent the table refreshing incrementally and will force a background full refresh. See Table Refreshes and Monitoring Dynamic Tables for more detail.

Data Ingestion Costs

Traditionally, streaming data was viewed as a special case where businesses had to justify the spend because of higher costs and complexity. We hope we have shown that the complexity is now comparable to batch ETL processes. With the stack described here, we find that streaming is also less expensive than batch processing. See below for an analysis of the components.

Snowpipe Streaming

Snowpipe Streaming is very cost-effective for ingesting data. The cost is made up of two charges: an insertion cost and a background process cost. You can expect to be billed somewhere between 10 and 18 Snowflake credits per 1 TB of data ingested. For context, 1 TB of data could equate to approximately 5 billion rows, and therefore, if we assume a Snowflake credit is $2, we are looking at a cost of up to $36 per month to ingest 5 billion rows of data.
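As a quick sanity check on that arithmetic, assuming the high end of 18 credits per TB and $2 per credit:

```python
# Illustrative ingestion-cost arithmetic from the figures above
# (assumed: 18 credits per TB at the high end, $2 per credit)
credits_per_tb = 18
dollars_per_credit = 2
cost_per_tb = credits_per_tb * dollars_per_credit  # $36 per TB (~5 billion rows)
```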

Dynamic Tables Costs

Dynamic Tables use a warehouse like other operations within Snowflake and are billed the same. Key points:

  • You can utilise existing or new warehouses
  • The cost is dependent on complexity and target lag for each Dynamic Table
  • The ability to reach the target lag may depend on the power of your warehouse

Streamkap Costs

Streamkap charges per GB of volume processed, with a cost of $1.50 per GB for a typical deployment.

Streaming Data with Streamkap and Snowflake

Streaming data has traditionally been expensive and difficult to manage. However, with the combination of Streamkap and Snowflake, these challenges are no longer a problem.

Streamkap is a drop-in replacement for the vendors and methods you currently use to sync data from databases. It is easy to use and can be deployed in minutes. 

I hope this summary blog has convinced you of the benefits of using Streamkap and Snowflake for streaming data. 

If you are interested in a free trial, sign up at Streamkap.
