Streaming MongoDB to Snowflake with Snowpipe Streaming and Dynamic Tables and Lowering Costs
May 3, 2024
Introduction
This hands-on guide is for data teams who wish to ingest data from MongoDB to a Snowflake data warehouse with sub-second latency and cost savings to boot.
Data warehousing has traditionally used slow batch processes but Streamkap utilises the database source transaction log file using change data capture methods for data replication to read the data in real-time without any load being put upon the database.
The data is then streamed into the data warehouses or data lakes using real-time inserts with an option to transform data before the insertion. The entire pipeline latency is usually sub-second at a lower cost than current ETL methods.
This guide is for data engineers but will benefit anyone interested in:
- Moving from batch to streaming processing
- Reduce their ETL costs by up to 90% by switching from traditional ETL vendors.
- Build real-time dashboards and customer-facing experiences.
- Enable data science use cases such as training machine learning models.
- Support analytical workloads for time-sensitive decisions.
- Share data in real-time with customers.
The guide includes code examples, screenshots, and step-by-step instructions so that it’s easy to follow.
To replicate the source data, we will be utilising Streamkap, Snowpipe Streaming and Snowflake Dynamic Tables. In this guide, we include instructions for Both MongoDB Atlas and self hosted MongoDB.
Setup MongoDB
For this blog, we will be MongoDB Atlas but if you require guides for other platforms, visit our docs here - MongoDB Change Data Capture Setup Docs.
Obtain Connection String
MongoDB Atlas
- Login to MongoDB Atlas and navigate to the cluster to which that Streamkap should connect
- Click Connect
- Click Connect your Application
- Click Connect your Application
- Copy this connection string
MongoDB Shell
- Connect to your replica set or primary node using the MongoDB shell as an Admin user.
- Execute the db.adminCommand( { replSetGetStatus : 1 } ).members command.
- Copy the host identifier and optionally the alternative host identifiers.
Setup Permissions
MongoDB Atlas
- Login to MongoDB Atlas
- In the left-hand navigation menu, go to Security > Database Access.
- Click New Database User.
- Choose the password authentication method.
- Enter the username and password for the new Streamkap user (Suggest the username streamkap_user)
- In the Database User Privileges drop-down menu, select Grant Specific User Privileges.
- Under Specific Privileges, add the following roles/privileges:
- readAnyDatabase
- read on the local database
- Click Add User.
Enable Snapshots through MongoDB Atlas
You will need to create the collection and give permissions to the Streamkap user/role.
- Create the collection streamkap_signal. This can be done via the MongoDB Atlas web app or Mongo Compass:
- Navigate to the streaming cluster
- Navigate to Collections and create a new collection in the streaming database by clicking the + button next to the database name. Name the collection streamkap_signal.
- Give the Streamkap user permissions to read and readWrite on the streamkap_signal collection
- Return to Database User Privileges drop-down menu, select Grant Specific User Privileges
- Under Specific Privileges, add the following roles/privileges:
- readWrite@<database>.streamkap_signal
MongoDB Shell
- Using MongoDB Shell, connect to your primary node or replica set
- Create a user for Streamkap. Replace password with your choice.
Enable Snapshots through MongoDB Shell
You will need to create the collection and give permissions to the Streamkap user/role.
Consider Access Restrictions
It is likely that your MongoDB database is behind a firewall. It is possible to connect via IP whitelisting, SSH tunnels, VPN or AWS Private Link. Visit Connection Options to decide which is best for you.
Setup MongoDB Connector in Streamkap
- Go to Sources, click New and Choose MongoDB
- Input
- Name for your Connector
- Connection String
- Connection Mode
- Database Name
- Add Schemas/Tables. The entire data set can be replicated or choose a subset of source tables.
- Click Save
You have now added your source database
Backfilling/Snapshots
As soon as MongoDB has been added to Streamkap, the default behaviour is to take a snapshot/backfill of the existing data.
The rate can vary but as a guide you can expect approximately 500m rows of data per 24 hours on the lowest setting. You can adjust this up in the app to approx 10x faster taking you to approx 5bn rows per 24 hours and if you need to go beyond that, Streamkap support can increase it further.
Setup Snowflake Snowpipe Streaming
Using Snowsight, let’s start setting the parameters for the scripts. Replace the parameters accordingly such as the database to be used:
Create a warehouse for minimal DDL work, i.e. table creation. The warehouse should be x-small, no scaling and auto-suspend on 1 minute.
Create a database and schema for Streamkap:
Create a Snowflake role with privileges for the Streamkap connector:
Grant privileges on the warehouse:
Grant privileges on the database:
Grant privileges on the database schema:
Create a user for Streamkap:
Grant the custom role to the Streamkap user:
Set the custom role as the default role for the Streamkap user. If you encounter an 'Insufficient privileges' error, verify the '$securityadmin_role' has OWNERSHIP privilege on the '$user_name'.
Allow the Streamkap user access to the Snowflake account.
Latest IP addresses can be found at Streamkap IP Addresses.
Key Pair Authentication
Snowpipe Streaming uses an RSA key pair for authentication. The instructions below will work on both mac or windows but for windows, you will need something like OpenSSH for Windows installed.
Generate an encrypted RSA private key. You'll be asked to enter a passphrase which the connector needs.
Generates the public key, referencing the private key:
You will now have two files (the key pair) named streamkap_key.p8 and streamkap_key.pub in the directory you ran the above commands.
Copy the public key contents to the clipboard after removing the comments.
The public key needs to be assigned to the Snowflake database user created for Streamkap earlier. Replace {public key} with the contents of your public key.
Setup Snowflake Connector in Streamkap
- Go to Destinations, click New and Choose Snowflake
- Input:
- Name
- URL
- Key (Paste the contents of your private key - streamkap_key.p8)
- Passphrase (the passphrase you set when creating the key pair)
- Database Name
- Schema Name
Replicate Data with a Pipeline
A pipeline links a source to a destination
- Go to Pipelines, click New
- Choose the MongoDB Source
- Choose the Snowflake Destination
- Choose to sync all data or selectively choose
- Save
The pipeline will now begin extracting data from the source and sending it to Snowflake. As you can see in the screenshot below, the pipeline shows a 349 millisecond latency for the entire pipeline.
Verify Data has arrived in Snowflake
We can see our mock new data has arrived in Snowflake.
As you can see, there are a few additional columns of data added to the ingested data.
- RECORD_METADATA - The metadata contains useful information in the event of diagnosing any issues
- _STREAMKAP_SOURCE_TS_MS - The timestamp in milliseconds the source record was created/modified/deleted
- _STREAMKAP_TS_MS - The timestamp in milliseconds when Streamkap processed the message
- __DELETED - True or False whether the record has been deleted at source.
- _STREAMKAP_OFFSET - This is a unique ID of the record.
If you would like to query the Streamkap date/time fields, they can be converted from milliseconds to something more readable by using the following:
Schema Drift with Snowflake Streaming
Schema drift is essential for production pipelines. Streamkap supports the handling of schema drift on all data sources and ensures these schema changes are evolved in Snowflake. More details are available in the Streamkap documentation.
Snowflake Dynamic Tables
Now the data is in Snowflake, you can use various solutions such as DBT to work with the data.
Dynamic Tables are a new way to model your data. Dynamic Tables are tables that are created from the results of a query. This means that you can define a dynamic table without having to create a separate target table or write code to transform and update the data. Instead, you can simply specify the SQL statement that performs the transformation, and Snowflake will automatically update the materialized results through regular (and often incremental) refreshes.
When to use Dynamic Tables?
Dynamic Tables are a good choice for cases where:
- You want the fastest and most efficient way to model data streamed into Snowflake, for most cases
- You need to materialize the results of a query on one or more base tables while being able to use functions such as window functions.
- You would like data to refresh and to track data dependencies without writing code.
- You would like the data to be fresh when it’s available rather than having to implement fine-grained control over the refresh schedule.
Using Dynamic Tables with Snowflake Streaming
Snowpipe Streaming currently supports inserts as append-only, which is pretty normal in data streaming. Many companies prefer this since it keeps a tracked version of all changes for a period of time of your choosing.
However, you will still need some final state tables which return only the most recent records. With Dynamic Tables, we can utilise the window function to create materialized tables with a lag of just 10 seconds.
As a result of updates to a source table, you will insert modified data. Here is the mock data for such a set of changes.
Here is the code to create a dynamic table on top of our mock Customer data, which has a target lag of 10 seconds.
The resulting query on the new dynamic table now looks like this
Dynamic Tables have their own heading under the schema.
Some queries will prevent the table refreshing incrementally and will force a background full refresh. See Table Refreshes and Monitoring Dynamic Tables for more detail.
Data Ingestion Costs
Traditionally, streaming data was viewed as a special case where businesses had to truly justify spending because of higher costs and complexity. We hope we have shown you that the complexity is now comprable to using batch ETL processes. With the stack described here we now find that streaming is also less expensive than batch processing. See below for any analysis of the components.
Snowpipe Streaming
Snowpipe streaming is very cost effective for ingesting data. The cost is made up from two charges which are an insertion cost as well as a background process cost. You can expect to be billed somewhere between 10 to 18 Snowflake credits per 1 TB of data ingested. To help give context, 1 TB of data could equate to approximately 5 billion rows of data and thereforem if we assume a Snowflake credit is $2, then we are looking at a cost of up to $36 per month to ingest 5 billion rows of data.
Dynamic Tables Costs
Dynamic Tables use a warehouse like other operations within Snowflake and are billed the same. Key points:
- You can utilise existing or new warehouses
- The cost is dependent on complexity and target lag for each Dynamic Table
- The ability to reach the target lag may depend on the power of your warehouse
Streamkap Costs
Streamkap charges per GB volume processed, with costs of $1.5GB per GB for a typical deployment.
Streaming Data with Streamkap and Snowflake
Streaming data has traditionally been expensive and difficult to manage. However, with the combination of Streamkap and Snowflake, these challenges are no longer a problem.
Streamkap is a drop-in replacement for the vendors and methods you currently use to sync data from databases. It is easy to use and can be deployed in minutes.
I hope this summary blog has convinced you of the benefits of using Streamkap and Snowflake for streaming data.
If you are interested in a free trial, sign up at Streamkap.