Slowly Changing Dimensions in Streaming: Handling SCD Type 1 and Type 2
How to implement slowly changing dimensions in real-time streaming pipelines. Covers SCD Type 1, Type 2, and hybrid approaches using CDC and Flink.
If you have spent any time building analytics on top of transactional databases, you have run into this problem: a customer moves to a new city, a product changes price tiers, or a sales rep gets reassigned to a different region. The row in your dimension table gets updated, and suddenly every historical fact that joins against it reflects the current state instead of the state that was true at the time the event happened.
This is the slowly changing dimension problem. It has been well understood in batch data warehousing for decades - Ralph Kimball formalized the SCD types in the 1990s. But most of the literature assumes you are running nightly batch loads. When you move to streaming pipelines with sub-second latency, the mechanics change. You need to handle dimension changes as they arrive, in real time, without stopping the pipeline to reprocess anything.
This article covers how to implement SCD Type 1, Type 2, and hybrid approaches in streaming architectures, with a focus on CDC as the change capture mechanism and Apache Flink as the stream processor.
A Quick Refresher on SCD Types
Before getting into the streaming implementation, let’s establish what each SCD type actually does.
Type 1: Overwrite
Type 1 is the simplest approach. When a dimension attribute changes, you overwrite the old value with the new one. No history is preserved.
| customer_id | name | city | segment |
|---|---|---|---|
| 101 | Alice Chen | San Francisco | Enterprise |
When Alice moves to New York, you update the row in place:
| customer_id | name | city | segment |
|---|---|---|---|
| 101 | Alice Chen | New York | Enterprise |
Every query that joins against this dimension - past, present, and future - now sees “New York.” If someone asks “how much revenue came from San Francisco customers last quarter,” Alice’s revenue will be attributed to New York even though she was in San Francisco at the time.
Type 1 is appropriate when you genuinely do not care about historical values. Fixing a typo in a customer name, for example, is a textbook Type 1 use case.
Type 2: Add New Row with Versioning
Type 2 preserves full history by creating a new row every time a tracked attribute changes. Each row gets valid_from and valid_to timestamps, plus a flag indicating which row is current.
| surrogate_key | customer_id | name | city | valid_from | valid_to | is_current |
|---|---|---|---|---|---|---|
| 1001 | 101 | Alice Chen | San Francisco | 2024-01-15 | 2026-02-20 | false |
| 1002 | 101 | Alice Chen | New York | 2026-02-20 | 9999-12-31 | true |
Now when you query last quarter’s revenue, you can join on customer_id AND check that the fact’s event timestamp falls within the dimension row’s valid_from/valid_to range. Alice’s Q4 2025 revenue is correctly attributed to San Francisco.
The tradeoff is storage and query complexity. Your dimension table grows with every change, and every join needs a date range predicate.
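As a sketch, that range predicate looks like the following (fact_orders and its columns are illustrative names, not from a real schema):

```sql
-- Point-in-time join against the Type 2 dimension: each fact row
-- matches the dimension version that was valid at its event time
SELECT
    f.order_id,
    f.order_amount,
    d.city AS customer_city_at_order_time
FROM fact_orders AS f
JOIN dim_customers_v2 AS d
    ON  f.customer_id = d.customer_id
    AND f.order_time >= d.valid_from
    AND f.order_time <  d.valid_to;
```

The half-open interval (inclusive valid_from, exclusive valid_to) ensures each fact matches exactly one version even when a change lands at a boundary timestamp.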
Type 3: Add New Column
Type 3 tracks limited history by adding columns for the previous value:
| customer_id | name | current_city | previous_city | city_changed_at |
|---|---|---|---|---|
| 101 | Alice Chen | New York | San Francisco | 2026-02-20 |
This is useful when you only need one level of history, but it does not scale well when attributes change multiple times or when you need to track many attributes.
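Applied as a warehouse update, the Type 3 shift might look like this sketch (names match the table above):

```sql
-- Type 3: shift the old value into previous_city before
-- overwriting current_city
UPDATE dim_customers
SET previous_city   = current_city,
    current_city    = 'New York',
    city_changed_at = DATE '2026-02-20'
WHERE customer_id = 101;
```

Note that a second move would overwrite previous_city, which is exactly the one-level-of-history limitation described above.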
Type 6: Hybrid (1 + 2 + 3)
Type 6 combines multiple approaches. You maintain versioned rows (Type 2) but also keep a current_value column that gets overwritten across all rows (Type 1 behavior) and a previous_value column (Type 3). This gives you maximum flexibility at the cost of maximum complexity.
In practice, most streaming implementations use Type 1 or Type 2. Type 3 and Type 6 are better suited to batch workflows where you have more control over the update mechanics.
Why CDC Is the Natural Fit
Change data capture is purpose-built for this problem. When a CDC connector reads a database’s transaction log, every update to a dimension table produces an event that contains both the before and after states:
{
"op": "u",
"before": {
"customer_id": 101,
"name": "Alice Chen",
"city": "San Francisco",
"segment": "Enterprise"
},
"after": {
"customer_id": 101,
"name": "Alice Chen",
"city": "New York",
"segment": "Enterprise"
},
"ts_ms": 1740076800000,
"source": {
"table": "customers",
"db": "app_prod"
}
}
That single event contains everything you need for any SCD type:
- Type 1: Take the after values and upsert into the dimension table.
- Type 2: Use before to close out the existing row (set valid_to), use after to insert a new current row (set valid_from).
- Type 3: Copy before.city to previous_city, set current_city from after.city.
Without CDC, you would need to diff snapshots of the dimension table on every load to detect what changed. That approach has latency (you only see changes at snapshot intervals), misses intermediate states (if a row changes twice between snapshots, you lose the middle state), and puts unnecessary load on the source database.
Streamkap captures these change events from sources like PostgreSQL, MySQL, MongoDB, and SQL Server with sub-second latency. The CDC events flow through Kafka, giving you a durable, replayable stream of every dimension change.
Implementing SCD Type 1 in a Streaming Pipeline
Type 1 is straightforward in streaming because it maps directly to an upsert operation. Every CDC event for a dimension table gets applied as an upsert to the destination.
Direct CDC to Warehouse (No Flink Required)
For many use cases, you do not need a stream processor at all. If your destination supports upserts - and most modern warehouses do (Snowflake MERGE, BigQuery MERGE, ClickHouse ReplacingMergeTree) - you can stream CDC events directly to the destination with upsert semantics.
The pipeline looks like this:
Source DB → CDC → Kafka → Sink Connector → Warehouse (UPSERT)
Each change event arrives at the sink connector, which applies it as a MERGE/UPSERT keyed on the business key (customer_id). The destination always reflects the current state.
This is what Streamkap does out of the box for dimension tables. Every CDC event is applied using the destination’s native upsert mechanism, so your dimension tables stay current without any custom transformation logic.
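As an illustration, a Snowflake-style MERGE keyed on the business key might look like this (staged_cdc_events is a hypothetical staging table holding the latest after images):

```sql
-- Type 1 upsert via MERGE, keyed on the business key
MERGE INTO dim_customers AS tgt
USING staged_cdc_events AS src
    ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN UPDATE SET
    name    = src.name,
    city    = src.city,
    segment = src.segment
WHEN NOT MATCHED THEN INSERT (customer_id, name, city, segment)
    VALUES (src.customer_id, src.name, src.city, src.segment);
```

BigQuery's MERGE follows the same shape; ClickHouse achieves the equivalent with ReplacingMergeTree, where the newest row version wins at merge time.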
Flink for Type 1 with Transformations
If you need to transform the dimension data before it lands - say, mapping raw status codes to human-readable labels, or enriching a customer record with data from a second stream - Flink enters the picture:
-- Flink SQL: Type 1 upsert into a dimension table
INSERT INTO dim_customers
SELECT
customer_id,
name,
city,
CASE segment_code
WHEN 'ENT' THEN 'Enterprise'
WHEN 'SMB' THEN 'Small Business'
WHEN 'MID' THEN 'Mid-Market'
ELSE 'Unknown'
END AS segment,
event_time AS last_updated
FROM cdc_customers;
With Flink’s upsert mode on the sink, this continuously maintains a Type 1 dimension table with transformations applied.
Implementing SCD Type 2 in a Streaming Pipeline
Type 2 is where things get interesting. You cannot just upsert - you need to close out the previous row and insert a new one, atomically, for every change event.
The Core Pattern in Flink
The general approach is:
- Consume CDC events for the dimension table.
- For each update event, emit two output records: one to close the previous version (set valid_to) and one to open a new version (set valid_from).
- Write both records to the destination.
Here is the Flink SQL scaffolding: a CDC source table that exposes the before image, and a versioned dimension sink:
-- Create the CDC source
CREATE TABLE cdc_customers (
customer_id INT,
name STRING,
city STRING,
segment STRING,
op_type STRING, -- 'c', 'u', 'd'
before_city STRING, -- from CDC before image
before_segment STRING,
change_ts TIMESTAMP(3),
WATERMARK FOR change_ts AS change_ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'app_prod.public.customers',
'format' = 'debezium-json'
);
-- Create the versioned dimension sink
CREATE TABLE dim_customers_v2 (
surrogate_key BIGINT,
customer_id INT,
name STRING,
city STRING,
segment STRING,
valid_from TIMESTAMP(3),
valid_to TIMESTAMP(3),
is_current BOOLEAN,
PRIMARY KEY (surrogate_key) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
...
);
The tricky part is generating the surrogate keys and managing the row closure. In practice, this often requires a Flink DataStream application rather than pure SQL, because you need to maintain state about the current version of each dimension row:
// Flink DataStream: SCD Type 2 processor
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class ScdType2Function
        extends KeyedProcessFunction<Integer, CdcEvent, DimensionRow> {

    // One piece of state per key (customer): the current version row
    private ValueState<DimensionRow> currentVersion;

    @Override
    public void open(Configuration params) {
        ValueStateDescriptor<DimensionRow> desc =
            new ValueStateDescriptor<>("current-version",
                DimensionRow.class);
        currentVersion = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(CdcEvent event,
                               Context ctx,
                               Collector<DimensionRow> out) throws Exception {
        DimensionRow existing = currentVersion.value();

        if (existing != null && event.isUpdate()) {
            // Close the previous version
            existing.setValidTo(event.getTimestamp());
            existing.setIsCurrent(false);
            out.collect(existing);
        }

        // Open a new version (delete events would need a similar
        // close-out branch, omitted here for brevity)
        DimensionRow newVersion = DimensionRow.builder()
            .surrogateKey(generateSurrogateKey()) // see "Surrogate Key Generation" below
            .customerId(event.getCustomerId())
            .name(event.getName())
            .city(event.getCity())
            .segment(event.getSegment())
            .validFrom(event.getTimestamp())
            .validTo(Timestamp.valueOf("9999-12-31 00:00:00"))
            .isCurrent(true)
            .build();
        out.collect(newVersion);

        currentVersion.update(newVersion);
    }
}
This keyed process function maintains one piece of state per customer: the current dimension row. When an update arrives, it closes the old version (emits it with valid_to set and is_current = false) and opens a new version.
Surrogate Key Generation
Surrogate keys in streaming need care. Auto-increment from a database does not work because you are generating rows in a distributed stream processor. Common approaches:
- Snowflake-style IDs (timestamp + worker ID + sequence): globally unique, roughly time-ordered.
- Hash-based keys: SHA-256(customer_id + valid_from) gives you a deterministic key that is idempotent on replay.
- UUID v7: time-ordered UUIDs that sort well and are globally unique.
The hash-based approach is often the best fit for streaming SCD because it makes the pipeline idempotent - if you replay events, you get the same surrogate keys, which means your upserts behave correctly.
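In Flink SQL, a deterministic key can be derived with the built-in SHA2 and CONCAT_WS functions. A sketch, assuming a hypothetical versioned_customers intermediate table with the columns used earlier:

```sql
-- Deterministic surrogate key: the same (customer_id, valid_from)
-- pair always hashes to the same value, so replayed events
-- regenerate identical rows instead of duplicates
SELECT
    SHA2(
        CONCAT_WS('|',
            CAST(customer_id AS STRING),
            CAST(valid_from  AS STRING)),
        256) AS surrogate_key,
    customer_id,
    name,
    city,
    segment,
    valid_from
FROM versioned_customers;
```

The '|' separator guards against ambiguous concatenations (e.g. distinguishing ids 1 + "23" from 12 + "3" in adjacent fields).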
Temporal Joins: Using Versioned Dimensions in Real Time
Once you have a Type 2 dimension table, you need a way to join fact events against the correct version of the dimension. This is where Flink’s temporal joins come in.
A temporal join matches each fact event to the dimension version that was active at the fact’s event time:
-- Flink SQL temporal join
SELECT
o.order_id,
o.order_amount,
o.order_time,
c.city AS customer_city_at_order_time,
c.segment AS customer_segment_at_order_time
FROM orders AS o
JOIN dim_customers FOR SYSTEM_TIME AS OF o.order_time AS c
ON o.customer_id = c.customer_id;
The FOR SYSTEM_TIME AS OF clause is the key. Instead of joining against the current row, Flink looks up the version of dim_customers that was valid at o.order_time. An order placed on January 15 joins against the dimension row where valid_from <= Jan 15 < valid_to.
This is fundamentally different from a regular streaming join. A regular join would give you the latest dimension value, which is Type 1 behavior. The temporal join gives you point-in-time accuracy, which is Type 2 behavior.
Temporal Joins Against CDC Streams Directly
You do not always need a materialized Type 2 table. Flink can build a versioned table in memory from a CDC stream and use it directly in temporal joins:
-- Define the dimension as a CDC source with versioning
CREATE TABLE customers (
customer_id INT,
name STRING,
city STRING,
segment STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'app_prod.public.customers',
'format' = 'debezium-json'
);
-- Temporal join directly against the CDC stream
SELECT
o.order_id,
o.order_amount,
c.city,
c.segment
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.order_time AS c
ON o.customer_id = c.customer_id;
Flink internally maintains the version history in its state backend. Each CDC event updates the internal version table, and each fact event gets joined against the appropriate version. This avoids the overhead of maintaining an external Type 2 table entirely - the versioning lives in the stream processor’s state.
The tradeoff is that Flink’s state has to hold enough version history to cover the time range of your fact events. If your orders arrive with low latency (seconds or minutes behind real time), the state stays small. If you have late-arriving events from days ago, the state can grow significantly.
Storage and Performance Considerations
Type 2 Table Growth
Type 2 dimensions grow without bound if you do not manage them. A customer table with 1 million rows where 5% of customers change address per month will accumulate 600,000 historical rows per year. After five years, your 1 million-row dimension table has 4 million rows.
Strategies to manage this:
- Partition by valid_from date range: queries that only care about recent history scan fewer partitions.
- Archive old versions: move rows with valid_to older than N years to cold storage.
- Cluster/sort on (customer_id, valid_from): this keeps all versions of a customer physically co-located, making the range predicate in joins efficient.
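In BigQuery terms, for example, partitioning and clustering can be combined in the table definition (a sketch; the dataset name and types are illustrative):

```sql
-- Partition on validity start date, cluster on the business key
-- so all versions of one customer sit together on disk
CREATE TABLE analytics.dim_customers_v2 (
    surrogate_key STRING,
    customer_id   INT64,
    name          STRING,
    city          STRING,
    segment       STRING,
    valid_from    TIMESTAMP,
    valid_to      TIMESTAMP,
    is_current    BOOL
)
PARTITION BY DATE(valid_from)
CLUSTER BY customer_id, valid_from;
```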
State Size in Flink
For temporal joins, Flink must maintain version history in its state backend (RocksDB for production workloads). Key tuning parameters:
- State TTL: Set a time-to-live on version state so old versions are garbage collected. For example, if your fact events never arrive more than 24 hours late, set TTL to 48 hours.
- RocksDB block cache: Dimension lookups are random reads - give the block cache enough memory to keep hot keys in memory.
- Incremental checkpointing: Reduces checkpoint overhead when state is large.
// Configure state TTL for SCD state
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(48))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(
StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<DimensionRow> desc =
new ValueStateDescriptor<>("current-version",
DimensionRow.class);
desc.enableTimeToLive(ttlConfig);
Choosing Between Type 1 and Type 2
The decision often comes down to the specific dimension attribute:
| Attribute | Recommended SCD Type | Reasoning |
|---|---|---|
| Customer name (typo fix) | Type 1 | No analytical value in the old typo |
| Customer address | Type 2 | Geography affects regional reporting |
| Product price | Type 2 | Historical pricing affects revenue analysis |
| Product description | Type 1 | Cosmetic, no impact on analytics |
| Employee department | Type 2 | Org structure affects cost allocation |
| Status codes/enums | Type 1 | Usually reflects current reality |
Many real-world implementations use a hybrid: Type 1 for some columns on the same table, Type 2 for others. The CDC event contains changes to all columns, and your Flink job decides which changes trigger a new version and which are applied in place.
// Hybrid SCD: only version on specific column changes
private boolean requiresNewVersion(CdcEvent event) {
return !Objects.equals(
event.getBefore().getCity(),
event.getAfter().getCity())
|| !Objects.equals(
event.getBefore().getSegment(),
event.getAfter().getSegment());
// Name changes are Type 1 - applied to current row
}
Putting It Together: A Reference Architecture
Here is what the full pipeline looks like for a system that needs both Type 1 and Type 2 handling:
Source Databases
├── CDC (Streamkap) ──→ Kafka Topics
│ │
│ ┌──────┴──────┐
│ │ │
│ Fact Topics Dimension Topics
│ │ │
│ └──────┬──────┘
│ │
│ Flink Cluster
│ ├── SCD Type 2 processor (versioned dims)
│ ├── Temporal joins (fact enrichment)
│ └── Type 1 passthrough (simple dims)
│ │
│ ┌──────┴──────┐
│ │ │
│ Warehouse Real-time Layer
│ (Snowflake, (ClickHouse,
│ BigQuery) Redis)
The CDC layer captures every change from every source table. Fact events and dimension events flow into separate Kafka topics. Flink consumes both, maintains versioned dimension state, enriches fact events with point-in-time dimension lookups via temporal joins, and writes enriched facts to the warehouse. Dimension tables in the warehouse get either Type 1 or Type 2 treatment depending on the business requirements.
Streamkap handles the CDC-to-Kafka portion of this architecture, with built-in support for schema evolution so that dimension table changes (new columns, type changes) flow through without manual intervention. For simpler pipelines where Type 1 is sufficient, Streamkap can sink directly to the warehouse with upsert semantics, skipping the Flink layer entirely.
When to Skip the Complexity
Not every dimension needs Type 2 handling. Before building a versioned dimension pipeline, ask two questions:
- Does anyone actually query historical values of this attribute? If your analysts always filter to “current state,” Type 2 is overhead with no benefit.
- How frequently does this attribute change? A dimension that changes once per year does not justify a real-time SCD pipeline. A nightly batch job that diffs snapshots might be entirely sufficient.
Start with Type 1. It is simple, it keeps your dimension tables small, and it covers the majority of use cases. Add Type 2 for specific attributes where you have a concrete business need for point-in-time accuracy. That way, you get the benefits of historical tracking where it matters without paying the storage and complexity costs everywhere.