Streaming PostgreSQL to Snowflake with Snowpipe Streaming and Dynamic Tables for Lower Costs

Ricky Thomas
July 26, 2023

Introduction

This hands-on guide is for data teams who want to ingest data from PostgreSQL into a Snowflake data warehouse with sub-second latency and at a lower cost than traditional batch ETL.

Data warehousing has traditionally relied on slow batch processes. Streamkap instead reads the database's transaction log using change data capture (CDC), replicating data in real time without putting any load on the source database.

The data is then streamed into data warehouses or data lakes using real-time inserts, with an option to transform the data before insertion. End-to-end pipeline latency is usually sub-second, at a lower cost than current ETL methods.

This guide is for data engineers but will benefit anyone interested in:

  • Moving from batch to streaming processing
  • Reducing ETL costs by up to 90% compared with traditional ETL vendors
  • Building real-time dashboards and customer-facing experiences
  • Enabling data science use cases such as training machine learning models
  • Supporting analytical workloads for time-sensitive decisions
  • Sharing data in real time with customers

The guide includes code examples, screenshots, and step-by-step instructions so that it’s easy to follow.

To replicate the source data, we will be utilising Streamkap, Snowpipe Streaming and Snowflake Dynamic Tables.

Setup PostgreSQL

For this blog, we will be using PostgreSQL RDS on AWS. If you require guides for other platforms, visit our docs here - PostgreSQL Change Data Capture Setup Docs.

Enabling Logical Replication

To stream data in real-time from PostgreSQL we will be taking advantage of the PostgreSQL transaction log using Change Data Capture. Learn more at Change Data Capture for Streaming ETL.

Your database may have Change Data Capture enabled already.

  1. Open the Amazon RDS console at https://console.aws.amazon.com/rds/
  2. In the navigation pane, choose Parameter groups
  3. Choose the parameter group used by the DB instance you want to modify
  4. You can't modify a default parameter group. If the DB instance is using a default parameter group, create a new parameter group and associate it with the DB instance
  5. From Parameter group actions, choose Edit
  6. Set the rds.logical_replication parameter value to 1
  7. Set the wal_sender_timeout parameter value to 0
  8. Save changes, then verify the settings using the query below
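
Once the parameter group change is in effect (a reboot may be required if the parameters are static), you can confirm the settings from a SQL session. A quick check:


-- wal_level should report 'logical' once rds.logical_replication is applied
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'wal_sender_timeout');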

Connect to PostgreSQL 

Log in to a PostgreSQL console (such as a SQL workbench or psql) as a user with the rds_superuser role, such as the RDS master user.
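
To confirm the account you are connected with has the rds_superuser role, you can list your role memberships, for example:


-- Lists roles granted (directly or indirectly) to the current user;
-- rds_superuser should appear in the results on RDS
SELECT r.rolname
FROM pg_roles r
WHERE pg_has_role(current_user, r.oid, 'member');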

Setup Permissions


-- Replace { ... } placeholders as required
CREATE USER streamkap_user PASSWORD '{password}'; 

-- Create a role for Streamkap
CREATE ROLE streamkap_role; 
GRANT streamkap_role TO streamkap_user; 
GRANT rds_replication TO streamkap_user; 

-- Grant Streamkap permissions on the database, schema and all tables to capture 
GRANT CREATE ON DATABASE "{database}" TO streamkap_role; 
GRANT CREATE, USAGE ON SCHEMA "{schema}" TO streamkap_role; 
GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO streamkap_role; 
ALTER DEFAULT PRIVILEGES IN SCHEMA "{schema}" GRANT SELECT ON TABLES TO streamkap_role;

-- A snapshot table is used to track progress of snapshot/backfills

CREATE TABLE streamkap_signal (
  id VARCHAR(255) PRIMARY KEY,
  type VARCHAR(32) NOT NULL,
  data VARCHAR(8000) NULL
);
GRANT SELECT, UPDATE, INSERT ON streamkap_signal TO streamkap_role;

Create Publication(s) & Slot

Publications contain a set of change events for the tables you include.

Create a publication for the tables you want to replicate. You can include all tables or be selective.

Option 1: All Tables


CREATE PUBLICATION streamkap_pub FOR ALL TABLES;

Option 2: Add Specific Tables


CREATE PUBLICATION streamkap_pub FOR TABLE table1, table2, table3, ...; 

On PostgreSQL 13 or later, you can publish changes for partitioned tables via their root table


CREATE PUBLICATION streamkap_pub FOR ALL TABLES WITH (publish_via_partition_root=true);

Add the signal table to the publication if you did not choose the ‘ALL TABLES’ option above.


ALTER PUBLICATION streamkap_pub ADD TABLE streamkap_signal; 

Create a logical replication slot 


SELECT pg_create_logical_replication_slot('streamkap_pgoutput_slot', 'pgoutput'); 

Verify the table(s) to replicate were added to the publication. If you have no tables created yet, this will return an empty result at this stage.


SELECT * FROM pg_publication_tables;

Log in as the Streamkap user and verify it can read the replication slot by running the following command:


SELECT count(*) FROM pg_logical_slot_peek_binary_changes('streamkap_pgoutput_slot', null, null, 'proto_version', '1', 'publication_names', 'streamkap_pub');

This statement should return a single-cell result containing a number.

Monitoring

It is important to monitor your PostgreSQL WAL (write-ahead log) and the disk space it uses. When a pipeline is active, the retained WAL remains small, but if the pipeline is disconnected it can grow very quickly until the pipeline is resumed. Provide ample disk space with auto-growth enabled and monitor usage with a tool such as Datadog.
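
A quick way to keep an eye on this from within PostgreSQL is to check how much WAL each replication slot is retaining, for example:


-- Shows the WAL retained by each replication slot (PostgreSQL 10+)
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;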

Consider Access Restrictions

It is likely that your PostgreSQL database is behind a firewall. It is possible to connect via IP whitelisting, SSH tunnels, VPN or AWS Private Link. Visit Connection Options to decide which is best for you.

Setup PostgreSQL Connector in Streamkap

  1. Go to Sources, click New and Choose PostgreSQL
  2. Input
  • Name for your Connector
  • Hostname
  • Port (Default 5432)
  • Username (Username you chose earlier, our scripts use streamkap_user)
  • Password
  • Database Name
  • Add Schemas/Tables. Replicate the entire data set or choose a subset of source tables.
  • Click Save

You have now added your source database:

Backfilling/Snapshots

As soon as PostgreSQL has been added to Streamkap, the default behaviour is to take a snapshot/backfill of the existing data. 

The rate can vary, but as a guide you can expect approximately 500 million rows per 24 hours on the lowest setting. You can adjust this in the app to roughly 10x faster, taking you to around 5 billion rows per 24 hours, and if you need to go beyond that, Streamkap support can increase it further.
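
Ad-hoc snapshots of specific tables can also be requested through the streamkap_signal table created earlier. As a rough sketch, following the common Debezium-style signalling convention (the exact payload format Streamkap expects should be confirmed in the Streamkap docs), this might look like:


-- Hypothetical example only; the id value is arbitrary and the JSON payload
-- should be checked against the Streamkap documentation
INSERT INTO streamkap_signal (id, type, data)
VALUES ('adhoc-snapshot-1', 'execute-snapshot', '{"data-collections": ["public.customers"]}');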

Setup Snowflake Snowpipe Streaming

Using Snowsight, start by setting the parameters for the scripts below. Replace the values as required, such as the database name to be used.


SET user_password = '{password}'; 
SET warehouse_name = UPPER('STREAMKAP_WH');
SET database_name = UPPER('STREAMKAPDB');
SET schema_name = UPPER('STREAMKAP'); 
SET role_name = UPPER('STREAMKAP_ROLE'); 
SET user_name = UPPER('STREAMKAP_USER'); 
SET network_policy_name = UPPER('STREAMKAP_NETWORK_ACCESS');

-- If your Snowflake account uses custom roles to grant privileges, change these values below
SET sysadmin_role = UPPER('SYSADMIN'); 
SET securityadmin_role = UPPER('SECURITYADMIN');

Create a warehouse for minimal DDL work, such as table creation. The warehouse should be X-Small, with no scaling and an auto-suspend of 1 minute.


USE ROLE IDENTIFIER($sysadmin_role);
CREATE WAREHOUSE IF NOT EXISTS IDENTIFIER($warehouse_name)
  WAREHOUSE_SIZE = XSMALL
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;

Create a database and schema for Streamkap


USE WAREHOUSE IDENTIFIER($warehouse_name);
CREATE DATABASE IF NOT EXISTS IDENTIFIER($database_name);
CREATE SCHEMA IF NOT EXISTS IDENTIFIER($schema_name); 

Create a Snowflake role with privileges for the Streamkap connector 


USE ROLE IDENTIFIER($securityadmin_role);
CREATE ROLE IF NOT EXISTS IDENTIFIER($role_name);

Grant privileges on the warehouse


GRANT USAGE ON WAREHOUSE IDENTIFIER($warehouse_name) TO ROLE IDENTIFIER($role_name); 

Grant privileges on the database


GRANT USAGE ON DATABASE IDENTIFIER($database_name) TO ROLE IDENTIFIER($role_name); 

Grant privileges on the database schema 


USE ROLE IDENTIFIER($sysadmin_role); 
USE DATABASE IDENTIFIER($database_name); 
GRANT USAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);
GRANT CREATE TABLE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name); 
GRANT CREATE STAGE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name); 
GRANT CREATE PIPE ON SCHEMA IDENTIFIER($schema_name) TO ROLE IDENTIFIER($role_name);

Create a user for Streamkap


USE ROLE IDENTIFIER($securityadmin_role);
CREATE USER IDENTIFIER($user_name) PASSWORD = $user_password DEFAULT_ROLE = $role_name;

Grant the custom role to the Streamkap user


GRANT ROLE IDENTIFIER($role_name) TO USER IDENTIFIER($user_name);

Set the custom role as the default role for the Streamkap user. If you encounter an 'Insufficient privileges' error, verify the '$securityadmin_role' has OWNERSHIP privilege on the '$user_name'. 


ALTER USER IDENTIFIER($user_name) SET DEFAULT_ROLE = $role_name;

Allow the Streamkap user access to the Snowflake account.

Latest IP addresses can be found at Streamkap IP Addresses 


CREATE NETWORK POLICY IDENTIFIER($network_policy_name) ALLOWED_IP_LIST=('52.33.81.202','52.36.212.154','35.85.76.250');
ALTER USER IDENTIFIER($user_name) SET NETWORK_POLICY = $network_policy_name;

Key Pair Authentication

Snowpipe Streaming uses an RSA key pair for authentication. The instructions below work on macOS and Linux; on Windows you will need OpenSSL available, for example via Git for Windows.

Generate an encrypted RSA private key. You'll be asked to enter a passphrase which the connector needs.


openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out streamkap_key.p8

Generate the public key from the private key


openssl rsa -in streamkap_key.p8 -pubout -out streamkap_key.pub

You will now have two files (the key pair) named streamkap_key.p8 and streamkap_key.pub in the directory you ran the above commands.

Copy the public key contents to the clipboard, excluding the header and footer lines (pbcopy is macOS-specific; on other platforms, open the file and copy the contents manually).


egrep -v '^-|^$' ./streamkap_key.pub | pbcopy

The public key needs to be assigned to the Snowflake database user created for Streamkap earlier. Replace {public key} with the contents of your public key. 


SET user_name = UPPER('STREAMKAP_USER');
USE ROLE SECURITYADMIN;
ALTER USER STREAMKAP_USER SET RSA_PUBLIC_KEY = '{public key}';

Setup Snowflake Connector in Streamkap

  1. Go to Destinations, click New and Choose Snowflake
  2. Input
  • Name
  • URL
  • Key (Paste the contents of your private key - streamkap_key.p8)
  • Passphrase (the passphrase you set when creating the key pair)
  • Database Name
  • Schema Name

Replicate Data with a Pipeline

A pipeline links a source to a destination.

  • Go to Pipelines, click New
  • Choose the PostgreSQL Source
  • Choose the Snowflake Destination
  • Choose to sync all data or select specific tables
  • Save

The pipeline will now begin extracting data from the source and sending it to Snowflake. As you can see in the screenshot below, end-to-end latency for this pipeline is 349 milliseconds.

Verify Data has arrived in Snowflake

We can see our new mock data has arrived in Snowflake.

As you can see, there are a few additional columns of data added to the ingested data.

  • RECORD_METADATA - The metadata contains useful information in the event of diagnosing any issues
  • _STREAMKAP_SOURCE_TS_MS - The timestamp in milliseconds the source record was created/modified/deleted
  • _STREAMKAP_TS_MS - The timestamp in milliseconds when Streamkap processed the message
  • __DELETED - True or False whether the record has been deleted at source.
  • _STREAMKAP_OFFSET - This is a unique ID of the record. 

If you would like to query the Streamkap date/time fields, they can be converted from milliseconds to something more readable using the following:


SELECT DATEADD(second, _streamkap_source_ts_ms / 1000, '1970-01-01') AS source_timestamp FROM Customer;

Schema Drift with Snowflake Streaming

Handling schema drift is essential for production pipelines. Streamkap supports schema drift handling on all data sources and ensures these schema changes are evolved in Snowflake. More details are available in the Streamkap documentation.
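
For example, if a column is added at the source, the change flows through to Snowflake without manual intervention. A hypothetical source-side change (the column name here is purely illustrative):


-- Hypothetical schema change in PostgreSQL; once new rows containing the column
-- are streamed through, the corresponding Snowflake table is evolved to include it
ALTER TABLE customers ADD COLUMN loyalty_tier VARCHAR(32);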

Snowflake Dynamic Tables

Now that the data is in Snowflake, you can work with it using various tools such as dbt.

Dynamic Tables are a new way to model your data. A dynamic table is defined by the results of a query, which means you can create one without a separate target table or code to transform and update the data. Instead, you simply specify the SQL statement that performs the transformation, and Snowflake automatically keeps the materialized results up to date through regular (and often incremental) refreshes.

When to use Dynamic Tables?

Dynamic Tables are a good choice for cases where:

  • You want the fastest and most efficient way to model data streamed into Snowflake (which Dynamic Tables are in most cases).
  • You need to materialize the results of a query on one or more base tables while being able to use functions such as window functions.
  • You would like data to refresh and to track data dependencies without writing code. 
  • You would like the data to be fresh when it’s available rather than having to implement fine-grained control over the refresh schedule.

Using Dynamic Tables with Snowflake Streaming

Snowpipe Streaming currently supports append-only inserts, which is standard in data streaming. Many companies prefer this since it keeps a tracked history of all changes for a period of your choosing.

However, you will still need final-state tables that return only the most recent version of each record. With Dynamic Tables, we can use a window function to create materialized tables with a lag of just 10 seconds.

Updates to a source table arrive as newly inserted change records. Here is the mock data for such a set of changes.

Here is the code to create a dynamic table on top of our mock Customer data, which has a target lag of 10 seconds.


CREATE OR REPLACE DYNAMIC TABLE Customers_dynamic_table
TARGET_LAG = '10 seconds'
WAREHOUSE = STREAMKAP_WH
AS
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY ID
      ORDER BY _streamkap_source_ts_ms DESC
    ) AS dedupe_id
  FROM STREAMKAP.customers
)
WHERE dedupe_id = 1
AND __deleted = 'false';

Querying the new dynamic table now returns only the latest version of each record.
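
For example, a quick spot check against the table created above:


-- Each ID should now appear exactly once, with its most recent values
SELECT *
FROM Customers_dynamic_table
ORDER BY ID;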

In Snowsight, Dynamic Tables appear under their own heading within the schema.

Some queries will prevent the table from refreshing incrementally and will force a full background refresh. See Table Refreshes and Monitoring Dynamic Tables for more detail.
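
One way to check how your refreshes are behaving is Snowflake's refresh history table function; a quick example (verify the function and its arguments against the current Snowflake documentation):


-- Recent refreshes for the dynamic table, including whether each was incremental or full
SELECT *
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(NAME => 'CUSTOMERS_DYNAMIC_TABLE'))
LIMIT 10;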

Data Ingestion Costs

Traditionally, streaming data was viewed as a special case where businesses had to truly justify the spend because of higher costs and complexity. We hope we have shown that the complexity is now comparable to batch ETL processes. With the stack described here, we find that streaming is also less expensive than batch processing. See below for an analysis of the cost components.

Snowpipe Streaming

Snowpipe Streaming is very cost effective for ingesting data. The cost is made up of two charges: an insertion cost and a background process cost. You can expect to be billed somewhere between 10 and 18 Snowflake credits per 1 TB of data ingested. For context, 1 TB could equate to approximately 5 billion rows of data, so if we assume a Snowflake credit costs $2, we are looking at a cost of up to around $36 per month to ingest 5 billion rows of data.
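
As a rough illustration of that arithmetic (the per-TB credit range and the $2 credit price are assumptions; substitute your own contract rates):


-- Illustrative estimate only: ~1 TB (~5 billion rows) at the upper end of 18 credits/TB
SELECT 18      AS credits_per_tb,   -- upper end of the 10-18 credits/TB range
       2       AS usd_per_credit,   -- assumed credit price
       18 * 2  AS approx_usd_cost;  -- ≈ $36 for ~5 billion rows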

Dynamic Tables Costs

Dynamic Tables use a warehouse like other operations within Snowflake and are billed the same. Key points:

  • You can utilise existing or new warehouses
  • The cost is dependent on complexity and target lag for each Dynamic Table
  • The ability to reach the target lag may depend on the power of your warehouse

Streamkap Costs

Streamkap charges per GB of volume processed, at around $1.50 per GB for a typical deployment.

Streaming Data with Streamkap and Snowflake

Streaming data has traditionally been expensive and difficult to manage. However, with the combination of Streamkap and Snowflake, these challenges are no longer a problem.

Streamkap is a drop-in replacement for the vendors and methods you currently use to sync data from databases. It is easy to use and can be deployed in minutes. 

I hope this summary blog has convinced you of the benefits of using Streamkap and Snowflake for streaming data. 

If you are interested in a free trial, sign up at Streamkap.
