Change Log to Snapshot: Materializing CDC Streams into Current State
How to convert CDC change log streams into point-in-time snapshots representing the current state of your data. Covers compaction, upsert patterns, last-value-per-key semantics, and Flink deduplication.
Every CDC pipeline starts with a change log: a stream of inserts, updates, and deletes that represents the history of changes to a source database table. But most downstream systems do not want the history. They want the current state. A dashboard needs today’s inventory counts, not every adjustment that led to them. An API cache needs the latest customer record, not a stack of 47 updates.
Converting a change log into a snapshot, the current state of each row, is one of the most fundamental operations in streaming data engineering. It sounds simple, and the basic idea is simple: for each key, keep only the latest value. But the details matter enormously, especially when handling deletes, late-arriving data, and multi-table joins on snapshot state.
Change Logs vs. Snapshots: The Mental Model
A change log for a products table might look like this over the course of a day:
10:00 INSERT (id=1, name="Widget", price=9.99, stock=100)
10:15 INSERT (id=2, name="Gadget", price=19.99, stock=50)
11:00 UPDATE (id=1, name="Widget", price=10.99, stock=100) -- price change
12:30 UPDATE (id=1, name="Widget", price=10.99, stock=85) -- stock decrease
14:00 DELETE (id=2) -- discontinued
15:00 UPDATE (id=1, name="Widget Pro", price=12.99, stock=85) -- rebrand
The snapshot at 15:01 is:
(id=1, name="Widget Pro", price=12.99, stock=85)
Product 2 is gone (deleted). Product 1 reflects its latest state. All the intermediate history is collapsed into a single row per surviving key.
The challenge is performing this collapse continuously and correctly, especially when:
- Events arrive out of order
- Deletes need to propagate
- Multiple downstream systems need the snapshot at the same time
- You also need to keep the full change log for audit or time-travel
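At its core, the collapse is a keyed replay of the change log. A minimal Python sketch (field names follow the products example above; this is the conceptual model, not a production implementation):

```python
def materialize(events):
    """Collapse a CDC change log into a snapshot: latest value per key,
    with deletes removing the key entirely."""
    snapshot = {}
    for event in events:
        if event["op"] == "d":           # delete: drop the key
            snapshot.pop(event["id"], None)
        else:                            # insert/update: latest value wins
            snapshot[event["id"]] = {k: v for k, v in event.items()
                                     if k != "op"}
    return snapshot

# Replaying the day's change log from the example above:
events = [
    {"op": "c", "id": 1, "name": "Widget", "price": 9.99, "stock": 100},
    {"op": "c", "id": 2, "name": "Gadget", "price": 19.99, "stock": 50},
    {"op": "u", "id": 1, "name": "Widget", "price": 10.99, "stock": 100},
    {"op": "u", "id": 1, "name": "Widget", "price": 10.99, "stock": 85},
    {"op": "d", "id": 2},
    {"op": "u", "id": 1, "name": "Widget Pro", "price": 12.99, "stock": 85},
]
print(materialize(events))
# {1: {'id': 1, 'name': 'Widget Pro', 'price': 12.99, 'stock': 85}}
```

The hard parts of streaming systems are everything around this loop: doing it continuously, at scale, with the ordering and delete-propagation guarantees discussed below.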
Kafka Log Compaction: The Built-In Approach
Kafka has a built-in mechanism for maintaining latest-value-per-key: log compaction. When compaction is enabled on a topic, Kafka’s background cleaner periodically removes older records for each key, keeping only the most recent one.
How It Works
Before compaction:
Offset 0: key=1, value={name: "Widget", price: 9.99}
Offset 1: key=2, value={name: "Gadget", price: 19.99}
Offset 2: key=1, value={name: "Widget", price: 10.99}
Offset 3: key=1, value={name: "Widget Pro", price: 12.99}
After compaction:
Offset 1: key=2, value={name: "Gadget", price: 19.99}
Offset 3: key=1, value={name: "Widget Pro", price: 12.99}
A consumer reading from the beginning of a compacted topic sees the latest value for each key, effectively reading a snapshot.
Handling Deletes with Tombstones
To represent a delete in a compacted topic, the producer writes a record with the key and a null value. This is called a tombstone. During compaction, Kafka keeps the tombstone for a configurable period (delete.retention.ms, default 24 hours), then removes it entirely. After the tombstone is removed, the key simply does not exist in the topic.
Offset 4: key=2, value=null -- tombstone, product 2 deleted
CDC connectors like Debezium (and Streamkap) produce tombstones automatically for DELETE operations, so compacted topics correctly reflect deletions.
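Conceptually, the cleaner's job fits in a few lines: remember the highest-offset record per key and drop the rest, with a null value acting as a tombstone that eventually expires. A simplified Python sketch (the real cleaner works segment by segment, but the per-key outcome is the same):

```python
def compact(log, drop_tombstones=False):
    """Keep only the latest record per key, preserving offsets.
    log: list of (offset, key, value); value None is a tombstone.
    drop_tombstones simulates tombstone expiry after delete.retention.ms."""
    latest = {}                                   # key -> (offset, value)
    for offset, key, value in log:
        latest[key] = (offset, value)
    compacted = sorted(latest.items(), key=lambda kv: kv[1][0])
    return [(off, key, val) for key, (off, val) in compacted
            if not (drop_tombstones and val is None)]

log = [
    (0, 1, {"name": "Widget", "price": 9.99}),
    (1, 2, {"name": "Gadget", "price": 19.99}),
    (2, 1, {"name": "Widget", "price": 10.99}),
    (3, 1, {"name": "Widget Pro", "price": 12.99}),
    (4, 2, None),   # tombstone: product 2 deleted
]
print(compact(log))                        # tombstone still visible
print(compact(log, drop_tombstones=True))  # after retention: key 2 is gone
```

Note that after tombstone expiry a new consumer cannot distinguish "key 2 was deleted" from "key 2 never existed", which is exactly the intended end state.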
Limitations of Log Compaction
Compaction is useful but has significant limitations:
1. It is eventually consistent. Compaction runs in the background. There is no guarantee of when it will run or how long it will take. Between compaction runs, the topic may contain multiple values for the same key. A consumer that reads during this window sees the full history, not just the latest state.
2. No point-in-time snapshots. You get “roughly latest” state, not “state as of 14:00.” Once older records are compacted away, they are gone forever.
3. No business logic. Compaction keeps the physically latest record per key. You cannot apply conditions like “keep the latest record where status != ‘draft’” during compaction.
4. Interacts poorly with consumer groups. If a consumer falls behind and compaction removes records it has not yet processed, it might miss intermediate state changes. For CDC, this can mean missing an update that changed a field the consumer cares about.
For simple use cases where you need “roughly the latest state” and can tolerate eventual consistency, compaction works well. For anything more precise, you need a stream processor.
Flink Deduplication: The Precise Approach
Flink’s deduplication pattern materializes exact last-value-per-key from a changelog stream in real time. The standard technique uses the ROW_NUMBER() window function.
Basic Deduplication Pattern
-- Source: CDC stream from Streamkap
-- Assumes events were flattened upstream (e.g. with Debezium's
-- ExtractNewRecordState transform) so op and ts_ms appear as plain
-- JSON fields; the debezium-json format would not expose them as columns.
CREATE TABLE products_cdc (
  id BIGINT,
  name STRING,
  price DECIMAL(10, 2),
  stock INT,
  op STRING,    -- 'c' (create), 'u' (update), 'd' (delete)
  ts_ms BIGINT, -- source database timestamp
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'postgres.public.products',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);
-- Deduplication: keep latest event per key
CREATE VIEW products_latest AS
SELECT id, name, price, stock, op
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time DESC) AS rn
FROM products_cdc
) WHERE rn = 1;
This query partitions the stream by id, orders by processing time descending, and keeps only the first row (the latest event) per key. Flink optimizes this pattern internally: it does not actually maintain a full window of events per key. It keeps only the current “winner” in state and updates it when a newer event arrives.
Handling Deletes
The basic deduplication pattern above keeps the latest event regardless of operation type. If the latest event is a DELETE, you still see it in the output with op = 'd'. From there, you have three options:
Include deletes as tombstone markers (useful for propagating deletes to downstream systems):
SELECT id, name, price, stock, op
FROM products_latest;
-- Row with op='d' is present, downstream handles it
Exclude deleted rows (useful for materializing a “live rows only” view):
SELECT id, name, price, stock
FROM products_latest
WHERE op <> 'd';
Propagate deletes as Kafka tombstones (useful for upsert-kafka sinks):
-- The upsert-kafka connector emits a null-value record (tombstone)
-- when it receives a DELETE row, so compacted topics stay correct
When using the Debezium format in Flink, the delete semantics are handled more elegantly. Flink’s debezium-json format automatically interprets CDC operations and produces proper retractions:
CREATE TABLE products_cdc (
id BIGINT,
name STRING,
price DECIMAL(10, 2),
stock INT
) WITH (
'connector' = 'kafka',
'topic' = 'postgres.public.products',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
-- This is already a changelog table. Flink understands
-- inserts, updates, and deletes natively.
SELECT * FROM products_cdc;
No ROW_NUMBER() needed. Flink maintains the latest state per key automatically because it understands the Debezium changelog format. This is the preferred approach when your source is a proper CDC stream.
The Upsert Pattern for Destination Tables
Once you have the latest state per key, you need to write it somewhere. The upsert pattern is the standard approach: INSERT if the key is new, UPDATE if it exists, DELETE if the event is a tombstone.
Upsert to a Database
CREATE TABLE products_snapshot (
id BIGINT,
name STRING,
price DECIMAL(10, 2),
stock INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/analytics',
'table-name' = 'products_snapshot',
'driver' = 'org.postgresql.Driver'
);
INSERT INTO products_snapshot
SELECT id, name, price, stock
FROM products_cdc;
Flink’s JDBC sink automatically translates the changelog operations into dialect-specific upserts (INSERT ... ON CONFLICT ... DO UPDATE for PostgreSQL, INSERT ... ON DUPLICATE KEY UPDATE for MySQL) and issues DELETE statements for retractions.
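The translation itself is mechanical. A sketch of the idea in Python, generating PostgreSQL-flavored statements (illustrative only; a real sink uses prepared statements and batching, never string-built SQL):

```python
def to_upsert_sql(table, key_col, event):
    """Translate one changelog event into a PostgreSQL statement,
    mirroring what an upsert-capable JDBC sink does internally."""
    if event["op"] == "d":
        # Delete: remove the row for this key
        return f"DELETE FROM {table} WHERE {key_col} = %({key_col})s"
    cols = [c for c in event if c != "op"]
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols if c != key_col)
    # Insert or update in one statement: the upsert pattern
    return (f"INSERT INTO {table} ({', '.join(cols)}) "
            f"VALUES ({', '.join(f'%({c})s' for c in cols)}) "
            f"ON CONFLICT ({key_col}) DO UPDATE SET {updates}")

print(to_upsert_sql("products_snapshot", "id",
                    {"op": "u", "id": 1, "name": "Widget Pro",
                     "price": 12.99, "stock": 85}))
```

The same event stream thus drives INSERT, UPDATE, and DELETE behavior from a single code path, which is what makes the upsert pattern robust against replays: applying the same event twice is idempotent.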
Upsert to a Compacted Kafka Topic
If the snapshot needs to be consumed by multiple downstream services, write to a compacted Kafka topic:
CREATE TABLE products_snapshot_topic (
id BIGINT,
name STRING,
price DECIMAL(10, 2),
stock INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'products-snapshot',
'key.format' = 'json',
'value.format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092'
);
INSERT INTO products_snapshot_topic
SELECT id, name, price, stock
FROM products_cdc;
Downstream consumers read from products-snapshot and get the latest state of each product. New consumers that start from the beginning read the full snapshot, then receive updates in real time.
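A consumer-side sketch of that bootstrap-then-follow behavior, with a plain iterable standing in for a Kafka consumer (an assumption for brevity):

```python
def follow_snapshot(records):
    """Build a local materialized view from a compacted topic.
    records: iterable of (key, value) in offset order; None value = tombstone.
    Yields the view after each record, so callers can watch it converge."""
    view = {}
    for key, value in records:
        if value is None:
            view.pop(key, None)   # tombstone: delete propagated
        else:
            view[key] = value     # upsert: latest value wins
        yield dict(view)

stream = [(1, {"name": "Widget Pro"}), (2, {"name": "Gadget"}), (2, None)]
final = None
for final in follow_snapshot(stream):
    pass
print(final)   # {1: {'name': 'Widget Pro'}}
```

Because the topic is compacted, this replay cost is proportional to the number of live keys, not the total number of changes ever produced.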
Streamkap’s Approach to Changelog-to-Snapshot
If you do not need to apply custom business logic during the materialization, Streamkap handles the entire changelog-to-snapshot conversion for you. When you configure a Streamkap pipeline from PostgreSQL to Snowflake, the platform:
- Reads the CDC change log from the source database’s transaction log
- Serializes events into Kafka with proper ordering (partitioned by primary key)
- Applies the change log to the destination table using Snowflake's MERGE statement
- Handles inserts, updates, and deletes automatically
- Manages deduplication and ordering to ensure the destination table reflects the current source state
You do not write Flink jobs, manage compaction settings, or build merge SQL. The destination table is always a snapshot of the source table’s current state, updated in real time.
For cases where you do need custom logic (filtering, transforming, or joining before materializing), Streamkap’s managed Flink integration lets you write SQL that operates on the changelog stream, and the result is written to your destination with the same automatic merge handling.
Keeping Both: Change Log and Snapshot Side by Side
Many architectures need both the change log (for audit, replay, and time-travel) and the snapshot (for queries and dashboards). The standard pattern is dual-write from the same CDC stream:
PostgreSQL → Streamkap → Kafka CDC Topic
├── Snowflake (snapshot table via MERGE)
└── Iceberg (append-only change log for time-travel)
The Snowflake table always shows current state. The Iceberg table stores every change event with timestamps, enabling queries like “what was the product catalog at 14:00 yesterday?”
-- Iceberg: point-in-time query
SELECT * FROM products_changelog
WHERE event_timestamp <= TIMESTAMP '2026-02-24 14:00:00'
QUALIFY ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY event_timestamp DESC
) = 1
AND op <> 'd';
This gives you a snapshot as of any historical point, reconstructed from the change log.
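The same reconstruction logic, expressed in Python against an in-memory change log (a hypothetical miniature of the QUALIFY query, with `ts` standing in for the event timestamp):

```python
def snapshot_as_of(changelog, as_of):
    """Reconstruct a point-in-time snapshot from an append-only change log:
    latest event per key with timestamp <= as_of, deleted keys excluded."""
    latest = {}
    for e in changelog:
        if e["ts"] > as_of:
            continue                       # event is after the cutoff
        prev = latest.get(e["id"])
        if prev is None or e["ts"] >= prev["ts"]:
            latest[e["id"]] = e            # newer event for this key wins
    return {k: v for k, v in latest.items() if v["op"] != "d"}

changelog = [
    {"id": 2, "ts": 615, "op": "c", "name": "Gadget"},
    {"id": 1, "ts": 600, "op": "c", "name": "Widget"},
    {"id": 2, "ts": 840, "op": "d", "name": None},
    {"id": 1, "ts": 900, "op": "u", "name": "Widget Pro"},
]
print(snapshot_as_of(changelog, as_of=850))  # product 2 already deleted
print(snapshot_as_of(changelog, as_of=700))  # both products still live
```

Note that the change log does not need to be stored in timestamp order; the reconstruction sorts implicitly by comparing timestamps per key.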
Handling Late-Arriving Data
In a perfectly ordered world, events arrive in the same order as the source database committed them. In practice, network delays, Kafka rebalances, and parallel processing can reorder events.
If an UPDATE for product_1 at ts=100 arrives after an UPDATE for the same product at ts=105, the naive last-value-per-key approach (using processing time) will incorrectly apply the older update on top of the newer one.
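The failure mode is easy to reproduce in a few lines. `apply_naive` and `apply_event_time` below are illustrative helpers (assumed names), not Flink APIs:

```python
def apply_naive(events):
    """Last-writer-wins by arrival order: a late event clobbers newer state."""
    state = {}
    for e in events:
        state[e["id"]] = e
    return state

def apply_event_time(events):
    """Compare source timestamps: an event only wins if it is newer."""
    state = {}
    for e in events:
        cur = state.get(e["id"])
        if cur is None or e["ts"] >= cur["ts"]:
            state[e["id"]] = e
    return state

# The UPDATE at ts=100 arrives AFTER the UPDATE at ts=105:
arrivals = [{"id": 1, "ts": 105, "price": 12.99},
            {"id": 1, "ts": 100, "price": 10.99}]
print(apply_naive(arrivals)[1]["price"])       # 10.99 -- wrong, stale value
print(apply_event_time(arrivals)[1]["price"])  # 12.99 -- correct
```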
Solutions:
Use event time, not processing time. Order by the source database’s commit timestamp instead of Flink’s processing time:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts_ms DESC) AS rn
FROM products_cdc
) WHERE rn = 1;
Use the Debezium format directly. When Flink reads a Debezium-formatted topic, it interprets the embedded operation type of each event and applies it as a proper changelog operation. Combined with Kafka’s per-partition ordering (guaranteed when the topic is partitioned by primary key, so all events for a key land in one partition), this ensures correct snapshot materialization without any explicit timestamp comparison.
Set appropriate watermarks. If you use event time windowing, configure watermarks to tolerate your expected maximum out-of-orderness:
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
This tells Flink to wait 10 seconds for late events before finalizing results.
Performance Considerations
State size. The deduplication pattern stores one state entry per distinct key. For a table with 100 million rows, that is 100 million state entries. Use RocksDB state backend and size your TaskManagers accordingly.
Changelog rate. A table with high update frequency (thousands of updates per second per key) generates a high volume of changelog events. The snapshot table can fall behind if the sink cannot apply upserts fast enough. Batch upserts (grouping multiple changes per key into a single merge) helps here.
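One way to sketch that batching idea: a coalescing buffer that keeps only the newest pending change per key and flushes them as a single merge batch. `CoalescingBuffer` is a hypothetical helper, assuming a user-supplied `flush_fn`; real sinks also flush on timers, not just on size:

```python
class CoalescingBuffer:
    """Batch upserts by coalescing changes per key: if a key changes five
    times between flushes, only the final value is written downstream."""
    def __init__(self, flush_fn, max_pending=1000):
        self.pending = {}             # key -> latest event (earlier ones dropped)
        self.flush_fn = flush_fn      # called with a list of (key, event) pairs
        self.max_pending = max_pending

    def add(self, key, event):
        self.pending[key] = event     # overwrite: coalesce per key
        if len(self.pending) >= self.max_pending:
            self.flush()

    def flush(self):
        if self.pending:
            self.flush_fn(list(self.pending.items()))  # one merge batch
            self.pending.clear()

written = []
buf = CoalescingBuffer(written.extend, max_pending=10)
for price in (10.99, 11.49, 12.99):   # three updates to the same key
    buf.add(1, {"id": 1, "price": price})
buf.flush()
print(written)   # [(1, {'id': 1, 'price': 12.99})] -- one upsert, not three
```

The trade-off is freshness: intermediate values never reach the destination, which is acceptable for snapshot tables but not for audit logs.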
Delete propagation. Some warehouse connectors handle deletes by running a separate DELETE statement per key, which can be slow. Streamkap batches deletes and uses efficient merge patterns to handle high-volume delete scenarios.
The change-log-to-snapshot conversion is the bridge between the event-sourced world of CDC and the query-oriented world of analytics. Get it right, and your warehouse tables are always fresh, always correct reflections of your source databases. The key is choosing the right mechanism for your correctness and latency requirements: Kafka compaction for simple eventual consistency, Flink deduplication for precise real-time materialization, or Streamkap for fully-managed conversion that handles the complexity for you.