Migrating Data from Batch Ingestion to Streamkap: A Technical Deep Dive

July 18, 2025
At SpotOn, we had an existing pipeline that synchronized data from our production MongoDB to Snowflake. During peak demand, our customers require data to be as close to real-time as possible. To improve our latency (and reduce the cost of our maintenance and infrastructure), we started to look for a different solution.
After researching our options, we decided to migrate our highest volume connectors from a batch ingestion process to Streamkap. Our dbt project has over 2,000 models and the entire process took about a month and a half, which involved transferring the data, refactoring all previous dbt data models to accommodate the new data source, ensuring data validation, and efficiently handling Change Data Capture (CDC). One of the most complex aspects of this migration was refactoring the dbt snapshot models to maintain the integrity of historical data while integrating the new data source.
The benefits of moving from batch ingestion to Streamkap were substantial. Streamkap provided us ultra-low latency via Snowflake streaming, which means data is available for analysis almost in real-time. Additionally, due to Streamkap's efficient data handling and integration capabilities, we experienced lower ingestion costs and reduced Snowflake compute costs.
Refactoring DBT Data Models
After migrating the data to Streamkap, the first task was refactoring all existing DBT data models to point to the new source data. This involved several key steps:
- Updating Source References: All DBT models previously pointed to data ingested from batch ingestion as the data source. We had to update these references to point to Streamkap instead. This was a straightforward but necessary step to ensure that our models pulled data from the correct source, and did not disrupt our business user’s workflows.
- Validating Data Types: Given the differences in how batch ingestion and Streamkap handle certain data types, we meticulously validated all data types to ensure consistency. This involved checking the schema and data types in Snowflake and updating our DBT models accordingly.
- Handling CDC Data: With CDC, every time a record is modified in the database, said record is inserted to Snowflake. While this is efficient to load as we do not have to spin up a Snowflake warehouse to upsert, we had to take great care to ensure we always worked with the most recent state of a record. We utilized subqueries to efficiently scan tables using Snowflake column metadata compared to using qualify with window functions which became costly with high velocity datasets.
{{ config(materialized=view) }}
select
_id
, _streamkap_source_ts_ms
-- all other columns
from {{ source('source_schema', 'source_table') }}
where
-- get most recently inserted record
and (_id, _streamkap_offset) in (
select
_id
, max_by(_streamkap_offset, _streamkap_source_ts_ms) as max_streamkap_offset_by_source_ts_ms
from
{{ source('source_schema', 'source_table') }}
group by 1
)
Engineering DBT Snapshot Models
With CDC streams, snapshots are no longer needed since all changes to a record are sent directly to the data warehouse, giving us the ability to view a full changelog. However, we still needed to ensure users could view historical data as it was at any point in time. One of the biggest challenges during our migration was integrating the historical data from our existing DBT snapshot models with the new CDC data. The main goal was to maintain the integrity of the historical snapshot data while adding new data from Streamkap. Here’s how we did it, step by step:
Steps to Refactor Snapshot Models
- Common Table Expressions (CTE):
The CTE was used to preprocess Streamkap data, adding necessary fields and ensuring data was in the correct format for DBT snapshots. This step was crucial for standardizing the data and making it compatible with our existing snapshot models.
- We started by creating a CTE to handle data from the new Streamkap source.
- This CTE included all relevant fields from the Streamkap data, along with additional derived fields required for unioning with DBT snapshots, such as a unique identifier (dbt_scd_id) and timestamps (dbt_updated_at and dbt_valid_from).
- Generating Unique Identifiers and Timestamps:
Creating a unique identifier involves concatenating key fields and hashing them. This ensured that each record could be uniquely tracked across different sources. Timestamps were standardized using conversion functions, allowing us to handle dates and times consistently.
- We concatenated relevant fields and applied an MD5 hash to derive a dbt_scd_id for each record, ensuring uniqueness.
- We calculated the dbt_updated_at ( _streamkap_ts_ms) timestamp by converting the timestamp from Streamkap to a standard format.
- The dbt_valid_from (_streamkap_source_ts_ms) timestamp was determined using the earliest occurrence of the record, ensuring accurate tracking of when each record was first seen.
- Integrating Existing Snapshot Data:
The UNION ALL operation combined historical data from batch ingestion with new data from Streamkap. This approach preserved the continuity of our snapshots and ensured that no data was lost during the transition.
- The next step was to combine the existing snapshot data (originally from batch ingestion) with the new data from Streamkap.
- We used a UNION ALL operation to merge these datasets. This allowed us to preserve historical data from the existing snapshots while integrating new records from Streamkap.
- Handling Historical Data:
Using a window function, we derived the activity windows based on timestamps from each record. This function looked ahead to the next record’s activity timestamp, to derive dbt_valid_from and dbt_valid_to windows.
- We ensured that historical records retained their original dbt_valid_to timestamps for the existing snapshot data.
- For the new Streamkap data, we calculated the dbt_valid_to timestamp using a window function. This function looked at the next available dbt_valid_from timestamp for each record, ensuring that the timeline for each record was continuous and accurate.
Sample of Snapshot Union:
{{ config(materialized='view') }}
--migration took place on 2024-05-28, which is represented in hardcoded values below
with streamkap_cte as (
select
_id
, 'streamkap' as source
, md5(concat(_id, _streamkap_source_ts_ms)) as dbt_scd_id
, dateadd(millisecond, _streamkap_ts_ms, '1970-01-01'::timestamp) as dbt_updated_at
--required to set dbt_valid_from/to in cases when the streamkap data is the first
--time we see a record for the entity, otherwise the dbt_valid_from/to will always
--be set to '2024-05-28' when it might not have been seen for the first time till
--much later
, iff(
row_number() over (partition by _id order by _streamkap_source_ts_ms) = 1
, greatest(
'2024-05-28'::timestamp
, dateadd(millisecond, _streamkap_source_ts_ms, '1970-01-01'::timestamp)
)
, dateadd(millisecond, _streamkap_source_ts_ms, '1970-01-01'::timestamp)
) as dbt_valid_from
from {{ source('source_schema', 'source_table') }}
where
not coalesce(__deleted, false)
and dateadd(millisecond, _streamkap_source_ts_ms, '1970-01-01'::timestamp) >= '2024-05-29'
)
select
_id
, 'snapshot' as source
, dbt_scd_id
, dbt_updated_at
, dbt_valid_from
, coalesce(dbt_valid_to, '2024-05-28') as dbt_valid_to
from {{ ref('existing_batch_snapshot') }}
where
not coalesce(_deleted, false)
and dbt_valid_from <= '2024-05-28'
union all
select
*
, lead(dbt_valid_from) over (partition by _id order by dbt_valid_from) as dbt_valid_to
from
streamkap_cte
Overall Results
The consequences of our migration were substantial. After the transition to Streamkap, we had:
- Simplified Data Pipelines: This project focused on a core dataset that needed to be ingested into our data warehouse and other databases used by customer-facing applications. Streamkap allows one source to send data to multiple destinations easily.
- Ease of Streaming: As the organization saw how easy it was to set up and maintain streaming pipelines, more use cases emerged. This opened up new opportunities for both internal analytics and customer-facing reporting.
- Lower Cost: Switching to Streamkap cut our data infrastructure costs by 3x.
- Improved Data Capabilities: Previously, the dbt snapshot schedule limited how well we could track changes in records. With streaming data, we can now see all changes to the state of a record at any point in time.
