Temporal Joins in Flink: Point-in-Time Correct Enrichment
Deep dive into Flink temporal joins for point-in-time lookups against versioned tables. Learn the syntax, when to use them, and how they differ from lookup and regular joins.
Every data engineer has hit this problem at some point: you join a stream of events against a dimension table, and the numbers come out wrong. Not because the join logic is broken, but because the dimension changed between when the event happened and when your pipeline processed it. An order placed when a product cost $10 gets enriched with the current price of $15. A currency conversion uses today’s exchange rate instead of the rate at transaction time.
This is the correctness gap that temporal joins were built to close.
The Problem with Regular Joins on Changing Dimensions
In batch processing, joins are straightforward. You have a fact table, you have a dimension table, and you join them. If the dimension table is a slowly changing dimension (SCD), you build SCD Type 2 tables with validity ranges and filter on them. It works, but it requires careful modeling.
In streaming, the challenge is different. Your fact stream is unbounded, and your dimension tables are not static snapshots - they are living, changing data. A regular join in Flink between two streams matches rows based on key equality at the moment both sides have emitted a matching record. This is not the same as “give me the dimension value that was valid when this event occurred.”
Consider a simplified example. You have an orders stream and a products table:
-- Orders stream (event time)
order_id | product_id | quantity | order_time
---------|------------|----------|-------------------
1001 | P42 | 3 | 2026-02-25 09:00
1002 | P42 | 1 | 2026-02-25 14:30
-- Products table (changes over time)
product_id | price | updated_at
-----------|-------|-------------------
P42 | 10.00 | 2026-02-25 08:00 -- morning price
P42 | 15.00 | 2026-02-25 12:00 -- price changed at noon
Order 1001 was placed at 09:00, when the price was $10. Order 1002 was placed at 14:30, when the price was $15. A correct enrichment should reflect those respective prices. But a regular inner join in Flink does not reason about “which version of the product row was valid at a given time.” It simply matches on key equality across two append streams, producing potentially duplicated or incorrect results.
A lookup join - Flink’s mechanism for querying an external system - fetches the current value at processing time. If both orders are processed at 15:00, both get the $15 price. Order 1001 is now wrong.
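The difference between the two behaviors is easy to see in a few lines of Python. This is a minimal sketch of the two lookup strategies, not Flink code; the version list and prices mirror the tables above:

```python
from bisect import bisect_right

# Price history for P42: (effective_from, price), sorted by time.
versions = [("2026-02-25 09:00"[:10] + " 08:00", 10.00),
            ("2026-02-25 12:00", 15.00)]
versions = [("2026-02-25 08:00", 10.00), ("2026-02-25 12:00", 15.00)]

def price_as_of(ts):
    """Point-in-time lookup: the version valid at ts (temporal join semantics)."""
    i = bisect_right([t for t, _ in versions], ts) - 1
    return versions[i][1]

def price_now():
    """Current-value lookup at processing time (lookup join semantics)."""
    return versions[-1][1]

orders = [("1001", "2026-02-25 09:00"), ("1002", "2026-02-25 14:30")]
for order_id, order_time in orders:
    print(order_id, price_as_of(order_time), price_now())
# As-of lookup enriches 1001 with 10.0 and 1002 with 15.0;
# a processing-time lookup at 15:00 gives both orders 15.0.
```

ISO-formatted timestamps sort lexicographically, so plain string comparison is enough for the binary search here.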
What Temporal Joins Actually Do
A temporal join correlates each row from a stream (the probe side) with the version of a dimension table (the build side) that was valid at the probe row’s timestamp. Flink maintains a versioned view of the dimension table internally, so when it processes order 1001 with order_time = 09:00, it looks up the product row that was valid at 09:00 - finding the $10 price. When it processes order 1002 at 14:30, it finds the $15 price.
This is called point-in-time correctness, and it is the defining property of temporal joins.
The concept is not unique to Flink. kdb+ has aj (as-of join). DuckDB and some feature stores support ASOF joins. But Flink’s implementation operates on unbounded streams in real time, which makes it especially useful for production pipelines.
Flink SQL Syntax: FOR SYSTEM_TIME AS OF
Flink SQL uses the FOR SYSTEM_TIME AS OF clause to express a temporal join. Here is the general form:
SELECT
o.order_id,
o.product_id,
o.quantity,
p.price,
o.quantity * p.price AS total
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id;
Breaking this apart:
- `orders` is the probe side: an append-only stream with an event-time attribute (`order_time`).
- `products FOR SYSTEM_TIME AS OF o.order_time` tells Flink to look up the version of `products` that was valid at `o.order_time`.
- `products` must be a versioned table: a table that Flink can reconstruct at any point in time.
The result is a stream where each order is enriched with the price that was in effect when the order was placed, regardless of when the pipeline actually processes the row.
Defining the Versioned Table
For the temporal join to work, Flink needs the dimension table to be a versioned table. There are two main ways to set this up.
Option 1: CDC source (most common)
If your dimension table comes from a CDC connector that emits insert, update, and delete events, Flink automatically treats it as a versioned table. The CDC changelog gives Flink the full history of changes, letting it reconstruct any past state.
CREATE TABLE products (
product_id STRING,
price DECIMAL(10, 2),
updated_at TIMESTAMP(3),
WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'db.public.products',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
The combination of a primary key and a changelog format (debezium-json, canal-json, maxwell-json) tells Flink this is a versioned table. Each incoming CDC event updates the version history for that key.
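For reference, a Debezium-style update event for the noon price change carries both the old and new row plus an operation code. The envelope below is abbreviated (real events also include source metadata and timestamps, and the exact fields vary by connector):

```json
{
  "before": { "product_id": "P42", "price": 10.00, "updated_at": "2026-02-25T08:00:00Z" },
  "after":  { "product_id": "P42", "price": 15.00, "updated_at": "2026-02-25T12:00:00Z" },
  "op": "u"
}
```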
Option 2: Append-only source with a deduplicated temporal table
If you do not have a CDC source, you can create a versioned view from an append-only stream using Flink’s deduplication pattern:
CREATE VIEW products_versioned AS
SELECT product_id, price, updated_at
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY product_id
ORDER BY updated_at DESC
) AS row_num
FROM products_changelog
)
WHERE row_num = 1;
This is less common and more involved; CDC sources are the natural fit for temporal joins, a point we will come back to.
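As a mental model, the deduplication pattern keeps only the latest row per key. A few lines of Python illustrate the result (illustrative only; Flink maintains this incrementally and emits it as a changelog):

```python
# Append-only changelog: every row ever written, possibly out of order.
rows = [
    {"product_id": "P42", "price": 10.00, "updated_at": "2026-02-25 08:00"},
    {"product_id": "P42", "price": 15.00, "updated_at": "2026-02-25 12:00"},
    {"product_id": "P99", "price": 7.50,  "updated_at": "2026-02-25 09:30"},
]

# Equivalent of ROW_NUMBER() ... PARTITION BY product_id
# ORDER BY updated_at DESC ... WHERE row_num = 1
latest = {}
for row in rows:
    key = row["product_id"]
    if key not in latest or row["updated_at"] > latest[key]["updated_at"]:
        latest[key] = row

print(sorted(latest.values(), key=lambda r: r["product_id"]))
```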
Event-Time vs Processing-Time Temporal Joins
Flink supports temporal joins in two time domains, and the distinction matters a great deal.
Event-time temporal joins
This is the version described above. The probe row’s event-time watermark determines which version of the dimension is looked up. This gives true point-in-time correctness: the join result reflects the state of the world as it was when the event actually happened.
Requirements:
- The probe side must have a defined event-time attribute with watermarks.
- The build side must be a versioned table (CDC or deduplicated view) with its own event-time attribute.
- Both sides participate in watermark advancement.
Processing-time temporal joins
With processing time, the join looks up whatever version of the dimension is current at the moment of processing:
SELECT
o.order_id,
o.product_id,
p.price
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.proc_time AS p
ON o.product_id = p.product_id;
Here proc_time is a processing-time attribute (declared with PROCTIME() in the table definition). This gives you the latest version of the dimension at the time the row is processed - functionally similar to a lookup join, which always sees the current value.
Processing-time temporal joins are simpler to set up because you do not need watermarks or event-time tracking on the dimension side. But they sacrifice correctness in exchange for simplicity. If your pipeline has any lag, or if you reprocess historical data, the results will not be point-in-time correct.
When to use which:
| Scenario | Time domain |
|---|---|
| Real-time enrichment where correctness matters | Event time |
| Reprocessing historical data | Event time |
| Best-effort enrichment with low latency requirements | Processing time |
| Dimension rarely changes, pipeline has minimal lag | Processing time is acceptable |
For most production use cases involving financial data, pricing, user attributes, or anything where auditors or analysts will scrutinize the numbers, event-time temporal joins are the right choice.
CDC as the Natural Source for Temporal Tables
Change data capture is the ideal mechanism for feeding the build side of a temporal join. Here is why.
A CDC stream from a database captures every insert, update, and delete as it happens. This gives Flink a complete, ordered changelog of the dimension table. Flink can reconstruct the state of any row at any point in time by replaying the changelog up to the desired timestamp.
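Replaying a changelog to a point in time can be sketched like this. It is a simplified model of what Flink's versioned-table state provides; real Flink handles watermarks, parallelism, and state cleanup on top:

```python
def state_as_of(changelog, ts):
    """Rebuild the table by replaying insert/update/delete events up to ts."""
    state = {}
    for op, key, value, event_ts in changelog:
        if event_ts > ts:
            break  # changelog is ordered by event time
        if op in ("insert", "update"):
            state[key] = value
        elif op == "delete":
            state.pop(key, None)
    return state

changelog = [
    ("insert", "P42", 10.00, "2026-02-25 08:00"),
    ("update", "P42", 15.00, "2026-02-25 12:00"),
    ("delete", "P42", None,  "2026-02-25 18:00"),
]
print(state_as_of(changelog, "2026-02-25 09:00"))  # {'P42': 10.0}
print(state_as_of(changelog, "2026-02-25 14:30"))  # {'P42': 15.0}
print(state_as_of(changelog, "2026-02-25 19:00"))  # {} (row was deleted)
```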
Compare this to alternatives:
- Periodic snapshots: You could dump the dimension table every N minutes and load it into Flink. But this means you only have point-in-time accuracy at snapshot boundaries. A change between snapshots is invisible.
- Polling queries: A lookup join polls the external database at processing time. It always sees the latest state, never historical.
- Manual versioning: You could build SCD Type 2 tables with `valid_from` and `valid_to` columns and join with range predicates. This works in batch but is awkward in streaming and puts the versioning burden on you.
CDC gives you granular, row-level change history without any extra modeling. Platforms like Streamkap make this especially straightforward - you point CDC at your source database, and the change stream flows into Kafka in a Debezium-compatible format that Flink treats as a versioned table out of the box. No manual SCD modeling, no periodic dumps, no compromises on freshness.
A Concrete Example: Currency Conversion
Currency exchange rates change constantly. Any system that processes international transactions needs to convert amounts at the rate that was valid when the transaction occurred.
-- Transactions stream
CREATE TABLE transactions (
tx_id STRING,
amount DECIMAL(10, 2),
currency STRING,
tx_time TIMESTAMP(3),
WATERMARK FOR tx_time AS tx_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- Exchange rates from CDC (versioned table)
CREATE TABLE exchange_rates (
currency STRING,
rate_to_usd DECIMAL(10, 6),
updated_at TIMESTAMP(3),
WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'db.public.exchange_rates',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
-- Temporal join: convert each transaction at the rate valid at tx_time
SELECT
t.tx_id,
t.amount,
t.currency,
e.rate_to_usd,
t.amount * e.rate_to_usd AS amount_usd,
t.tx_time
FROM transactions t
JOIN exchange_rates FOR SYSTEM_TIME AS OF t.tx_time AS e
ON t.currency = e.currency;
If EUR/USD was 1.0850 at 10:00 and changed to 1.0920 at 11:00, a transaction at 10:30 gets converted at 1.0850, and a transaction at 11:30 gets converted at 1.0920. This is exactly how financial systems expect conversions to work.
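A quick sanity check of that arithmetic in Python, using the same as-of lookup over the example rates (a sketch of the join semantics, not Flink code):

```python
from bisect import bisect_right
from decimal import Decimal

# EUR/USD rate history: (effective_from, rate)
rates = [("10:00", Decimal("1.0850")), ("11:00", Decimal("1.0920"))]

def rate_as_of(ts):
    """Return the rate that was valid at ts."""
    i = bisect_right([t for t, _ in rates], ts) - 1
    return rates[i][1]

print(Decimal("100.00") * rate_as_of("10:30"))  # 108.500000 (morning rate)
print(Decimal("100.00") * rate_as_of("11:30"))  # 109.200000 (updated rate)
```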
Temporal Joins vs Lookup Joins: When to Use Each
Flink offers both temporal joins and lookup joins for stream-to-dimension enrichment. They solve related but different problems.
Lookup joins query an external system (a database, a cache, an API) at processing time for the current value:
SELECT o.*, p.price
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.proc_time AS p
ON o.product_id = p.product_id;
(With a lookup connector like jdbc, this issues a query to the database for each row.)
Temporal joins look up the version of an internally maintained versioned table at event time:
SELECT o.*, p.price
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id;
The syntax looks similar, but the semantics are very different.
| Aspect | Temporal join | Lookup join |
|---|---|---|
| Time reference | Event time of the probe row | Processing time (wall clock) |
| Data source | Versioned table in Flink (CDC/changelog) | External database or cache |
| Point-in-time correct | Yes | No |
| Historical reprocessing | Correct results | Wrong results (uses current values) |
| State overhead | Flink maintains version history in state | No Flink state for the dimension |
| Latency | Low (local state lookup) | Higher (external I/O per row) |
| External dependency at runtime | No (data pre-loaded via CDC) | Yes (database must be available) |
Use temporal joins when correctness matters and the dimension data is available as a changelog stream. Use lookup joins when you need live data from an external system that is not captured via CDC, or when the dimension is too large to hold in Flink state.
In many architectures, CDC feeds temporal joins for the hot path (where point-in-time correctness is needed), while lookup joins serve as a fallback for reference data that rarely changes or that lives only in an external system.
Practical Considerations
State size
Temporal joins with event time require Flink to keep version history in state. For dimensions with many keys and frequent updates, this state can grow large. Tune your state TTL (time-to-live) to control how far back Flink keeps versions. If your use case only needs the last 24 hours of history, configure the state retention accordingly.
-- In Flink SQL, set idle state retention
SET 'table.exec.state.ttl' = '86400000'; -- 24 hours in milliseconds
Watermark alignment
For event-time temporal joins, Flink uses watermarks to determine “how far along” each side is. If the probe side’s watermarks advance much faster than the build side, Flink may not have received the latest dimension updates yet. Make sure both streams are flowing at comparable rates, or configure watermark alignment to prevent one side from racing ahead.
Late data
Events that arrive after the watermark has passed their timestamp will still be processed, but the dimension version available may not reflect the latest update for that timestamp. Design your watermark strategy with enough slack to handle typical late arrivals in your system.
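The bounded-out-of-orderness strategy that the WATERMARK ... - INTERVAL clauses above express can be sketched as: the watermark trails the maximum observed timestamp by a fixed delay, and an event is late when its timestamp is at or below the current watermark. A toy model in Python (timestamps are seconds here just for readability):

```python
def watermarks(events, delay):
    """Yield (event_ts, is_late, watermark) for a stream of timestamps."""
    max_ts = float("-inf")
    for ts in events:
        # An event is late if the watermark has already passed its timestamp.
        late = ts <= max_ts - delay
        max_ts = max(max_ts, ts)
        yield ts, late, max_ts - delay

# 10-second slack, as in the transactions table above.
stream = [100, 105, 103, 112, 99]
for ts, late, wm in watermarks(stream, delay=10):
    print(ts, late, wm)
# 103 arrives out of order but inside the slack, so it is not late;
# 99 arrives after the watermark reached 102, so it is late.
```

Widening the delay tolerates more disorder at the cost of higher end-to-end latency, which is exactly the trade-off to tune here.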
Tying It Together
Temporal joins are one of Flink’s most important features for building correct streaming pipelines. They solve a problem that is easy to overlook - dimension values change over time, and using the wrong version produces silently incorrect results. The FOR SYSTEM_TIME AS OF syntax makes point-in-time lookups a first-class operation, and CDC streams provide the versioned history that makes it all work.
If you are building real-time pipelines that enrich events with dimension data - prices, exchange rates, user tiers, configuration values - temporal joins should be your default choice over regular or lookup joins whenever correctness matters. Combined with a CDC platform like Streamkap feeding versioned tables into Flink, you get point-in-time correct enrichment without the burden of manual SCD modeling or periodic snapshot management.