
Engineering

February 25, 2026

12 min read

Flink SQL Joins: Regular, Temporal, and Lookup Joins Explained

Learn every join type in Flink SQL - regular joins, interval joins, temporal joins, and lookup joins. Understand when to use each with practical streaming examples.

TL;DR:

  • Flink SQL supports four main join types: regular joins (unbounded state), interval joins (time-bounded), temporal joins (point-in-time correct), and lookup joins (external system queries).
  • Regular joins keep all state forever and are rarely used in production - prefer interval or temporal joins for bounded state.
  • Temporal joins are the key pattern for enriching event streams with slowly-changing dimension data from CDC while maintaining historical accuracy.

If you have written SQL joins in a batch context, you already understand the semantics of inner, left, right, and full outer joins. Flink SQL supports all of these. But there is a fundamental difference that trips up nearly every engineer transitioning from batch to streaming: in batch SQL, both sides of the join are finite and known; in streaming SQL, data arrives continuously and the “other side” of the join may not have arrived yet — or may arrive hours later.

This single difference changes everything about how joins are executed, how much state they consume, and which join type you should reach for. Choosing the wrong join in a streaming context does not just produce incorrect results. It can silently accumulate gigabytes of state, degrade checkpoint performance, and eventually crash your application.

This guide walks through every join type available in Flink SQL, explains the trade-offs of each, and provides practical SQL examples you can adapt for your own pipelines.

Regular Joins

Regular joins in Flink SQL look exactly like their batch SQL counterparts:

SELECT o.order_id, o.amount, c.name, c.email
FROM orders o
INNER JOIN customers c
  ON o.customer_id = c.customer_id;

Flink executes this by maintaining the full state of both the orders and customers streams. When a new order arrives, Flink probes the state of the customers stream for a matching customer_id. When a new customer arrives, Flink probes the state of the orders stream. Matches emit results downstream.

Why Unbounded State Is a Problem

The challenge is that Flink has no way of knowing whether a future event on either side might match a past event on the other side. So it keeps everything. Both sides of the join accumulate state indefinitely.

In practice, this means:

  • State grows linearly with the total volume of events received.
  • Checkpoints become slower and more expensive as state grows.
  • Eventually the TaskManager runs out of memory or RocksDB disk space, and the job fails.

Regular joins are appropriate only when at least one side of the join is a small, bounded dataset (such as a static reference table loaded once), or when you configure aggressive state time-to-live (TTL) and accept that late-arriving matches will be silently dropped.

For most production streaming use cases, you should use one of the bounded join types described below.

Interval Joins

Interval joins solve the unbounded state problem by restricting the join to events that fall within a specified time window of each other. Flink can then safely discard state for events that have fallen outside the window.

SELECT c.click_id, p.purchase_id, p.amount
FROM clicks c, purchases p
WHERE c.user_id = p.user_id
  AND p.purchase_time BETWEEN c.click_time AND c.click_time + INTERVAL '30' MINUTE;

This join matches each click with any purchase by the same user that occurred within 30 minutes after the click. Flink only needs to keep 30 minutes of state for each side, which is a bounded and predictable amount.

Requirements for Interval Joins

Interval joins require that:

  1. Both sides have a time attribute (event time or processing time).
  2. The join condition includes a BETWEEN clause that bounds the time difference between the two sides.
  3. Watermarks are configured on both input streams so Flink knows when it is safe to discard old state.

If you forget to configure watermarks, Flink cannot advance its notion of event time, state will never be cleaned up, and the join degrades to a regular join with unbounded state.
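As a sketch, the clicks stream from the example above could be declared with an event-time attribute and watermark like this (the column list and connector options are assumptions, following the conventions used elsewhere in this post):

```sql
-- Hypothetical DDL for the clicks stream. The WATERMARK declaration is
-- what lets Flink advance event time and expire interval-join state.
CREATE TABLE clicks (
  click_id STRING,
  user_id STRING,
  click_time TIMESTAMP(3),
  -- Tolerate up to 10 seconds of out-of-order events
  WATERMARK FOR click_time AS click_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka', ...);
```

The purchases table needs an equivalent watermark on purchase_time; without one on each side, the join cannot clean up state.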

When to Use Interval Joins

Interval joins are the right choice when the business logic naturally defines a time window that connects two events. Common examples include correlating ad impressions with clicks, matching sensor readings from different devices within a tolerance window, or linking web activity events that happen in the same session.

Temporal Joins

Temporal joins are arguably the most important join type for CDC-based streaming pipelines. They let you enrich a stream of events with the version of a dimension record that was valid at the exact time the event occurred.

The Problem Temporal Joins Solve

Consider an orders stream and a products table where product prices change over time. If a product’s price was $10 when order #1001 was placed but has since changed to $12, which price should the join return?

A regular join would return the latest version of the product record, which is $12 — giving you an incorrect order total. A temporal join returns the version that was valid when the order was placed: $10.

FOR SYSTEM_TIME AS OF Syntax

Flink uses the SQL:2011 standard syntax for temporal joins:

SELECT
  o.order_id,
  o.product_id,
  o.quantity,
  p.price,
  o.quantity * p.price AS total
FROM orders o
JOIN products_versioned FOR SYSTEM_TIME AS OF o.order_time AS p
  ON o.product_id = p.product_id;

Here, products_versioned is a versioned table — a table backed by a changelog stream (such as a CDC stream from a database) that Flink automatically maintains as keyed, time-versioned state. When an order event arrives with order_time = 2026-02-25T10:30:00, Flink looks up the version of the product record that was current at that exact timestamp.
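As a hedged sketch, products_versioned could be declared like this; the primary key plus the event-time watermark are what make it a versioned table (column names and connector options are assumptions):

```sql
-- Hypothetical DDL: a changelog-backed table qualifies as a versioned
-- table once it has a primary key and a watermarked event-time attribute.
CREATE TABLE products_versioned (
  product_id STRING,
  price DECIMAL(10, 2),
  updated_at TIMESTAMP(3),
  WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH ('connector' = 'kafka', ...);
```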

Versioned Tables and CDC Streams

For temporal joins to work, the right-hand side of the join must be a versioned table. In Flink, a table qualifies as versioned if it is backed by a changelog stream (INSERT, UPDATE, DELETE events) with a defined primary key and event-time attribute.

CDC streams from databases are the canonical example. A Debezium or CDC connector captures every insert, update, and delete from a source database table and publishes them as a changelog stream. Flink can consume this stream and automatically maintain the versioned state needed for temporal joins.

This is where a platform like Streamkap fits naturally. Streamkap delivers CDC changelog streams from databases such as PostgreSQL, MySQL, and MongoDB in the exact format that Flink temporal joins expect, eliminating the need to build and maintain your own CDC connector infrastructure.

Key Properties of Temporal Joins

  • Point-in-time correctness: Every event is enriched with the dimension value that was valid at the event’s timestamp.
  • Bounded state: Only the latest version (plus versions still needed for unprocessed events) is kept in state.
  • Append-only left side: The probe side (left) must be an append-only stream. The build side (right) must be a versioned table.
  • Event-time required: Temporal joins require event-time attributes and watermarks on both the probe side and the versioned table.

Lookup Joins

Lookup joins are similar to temporal joins in that they enrich a stream with external data, but instead of looking up a versioned in-memory table, they query an external system (a database, cache, or API) at the time each event is processed.

SELECT o.order_id, o.product_id, p.name, p.category
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id;

The key difference from a temporal join is that o.proc_time is a processing-time attribute (not event time), and products is a lookup table backed by a connector that queries an external system on each lookup.
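For this to work, the orders stream needs a processing-time attribute, which is declared as a computed column using the built-in PROCTIME() function (the column list here is an assumption):

```sql
CREATE TABLE orders (
  order_id STRING,
  product_id STRING,
  -- Computed column assigning each row a processing-time attribute,
  -- required on the probe side of a lookup join
  proc_time AS PROCTIME()
) WITH ('connector' = 'kafka', ...);
```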

Async I/O and Caching

Because each lookup involves a network call, lookup joins can become a latency bottleneck. Flink provides two mechanisms to mitigate this:

  • Async I/O: Flink can issue multiple lookup requests concurrently rather than blocking on each one sequentially. This dramatically improves throughput when the external system can handle parallel queries.
  • Caching: Most Flink lookup connectors support a local cache with a configurable TTL. Repeated lookups for the same key within the TTL window are served from memory instead of hitting the external system.

For example, a JDBC lookup table with a partial cache enabled can be declared as follows:
CREATE TABLE products (
  product_id STRING,
  name STRING,
  category STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',
  'table-name' = 'products',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10 min'
);

When to Use Lookup Joins

Lookup joins are ideal when:

  • The dimension data lives in an external system and you do not want to ingest it as a full CDC stream.
  • You need the current value of a dimension (not the historical value at event time).
  • The dimension table is too large to hold entirely in Flink state.

The trade-off is that lookup joins give up point-in-time correctness in exchange for simplicity and lower state overhead. They always return whatever the external system has at query time.

Choosing the Right Join Type

The following table summarizes how the four join types compare across the dimensions that matter most in production:

| Characteristic | Regular Join | Interval Join | Temporal Join | Lookup Join |
|---|---|---|---|---|
| State | Unbounded (both sides) | Bounded by time window | Bounded (versioned table) | None (external query) |
| Correctness | Current state only | Time-windowed matches | Point-in-time exact | Current value at query time |
| Latency | Low | Low | Low | Higher (network call) |
| Left side | Any stream | Append-only with time attribute | Append-only with event time | Any stream |
| Right side | Any stream | Append-only with time attribute | Versioned table (CDC) | External lookup table |
| Time semantics | None required | Event or processing time | Event time required | Processing time |
| Best for | Small bounded datasets | Correlating two event streams | CDC enrichment | External dimension lookups |

Rule of thumb: If you are enriching a stream with CDC data from a database, use a temporal join. If you are correlating two event streams within a time window, use an interval join. If you need to query an external system for the current value, use a lookup join. Avoid regular joins unless you have a specific reason and a plan for state management.

Practical Examples

Enriching Orders with Customer Data (Temporal Join)

A common pattern in e-commerce pipelines is enriching order events with customer details from a CDC stream:

-- customers_cdc is a CDC changelog stream from the source database
CREATE TABLE customers_cdc (
  customer_id STRING,
  name STRING,
  email STRING,
  tier STRING,
  updated_at TIMESTAMP(3),
  WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH ('connector' = 'kafka', ...);

-- orders is an append-only stream of new orders
SELECT
  o.order_id,
  o.amount,
  c.name AS customer_name,
  c.tier AS customer_tier
FROM orders o
JOIN customers_cdc FOR SYSTEM_TIME AS OF o.order_time AS c
  ON o.customer_id = c.customer_id;

This returns the customer name and tier that were valid when each order was placed, even if the customer has since been upgraded or changed their name.

Correlating Clicks with Purchases (Interval Join)

Attributing purchases to marketing clicks requires matching two event streams within a conversion window:

SELECT
  c.click_id,
  c.campaign_id,
  p.purchase_id,
  p.amount AS revenue
FROM clicks c, purchases p
WHERE c.user_id = p.user_id
  AND p.purchase_time BETWEEN c.click_time AND c.click_time + INTERVAL '24' HOUR;

This attributes any purchase within 24 hours of a click to the originating campaign. State is bounded to 24 hours on each side.

Looking Up Product Details from a Database (Lookup Join)

When you need the current product name and category for each order but do not want to maintain a full CDC stream for the products table:

SELECT
  o.order_id,
  o.product_id,
  p.name AS product_name,
  p.category
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id;

This queries the products database for each incoming order. With caching enabled on the lookup connector, repeated lookups for popular products are served from memory.

State Management and Performance

Every join type except lookup joins stores state in Flink’s state backend (either heap or RocksDB). Managing this state is critical for long-running streaming jobs.

State TTL Configuration

For regular joins where you accept eventual state expiration, configure a state TTL at the table level:

SET 'table.exec.state.ttl' = '24 h';

This tells Flink to expire state entries that have not been accessed in 24 hours. Be aware that events arriving after the TTL window will find no matching state and will either be dropped (inner join) or produce null-padded results (outer join).

For interval joins, state cleanup is handled automatically based on the time bounds in the BETWEEN clause. For temporal joins, Flink retains versions only as long as they might be needed based on watermark progress.

Checkpoint and Memory Implications

Large state directly impacts checkpoint duration and size. Each checkpoint must serialize and persist all join state to durable storage. When state grows into the tens of gigabytes, checkpoints can take minutes, increasing the window of data loss during failures.

Recommendations for production deployments:

  • Use RocksDB state backend for any join that accumulates significant state. Heap state backend keeps everything in JVM memory and is limited by heap size.
  • Enable incremental checkpoints with RocksDB to avoid re-serializing the entire state on every checkpoint.
  • Monitor state size via Flink’s metrics (e.g., State Size in the Flink dashboard) and set alerts for unexpected growth.
  • Right-size parallelism so that state is distributed across enough TaskManager slots to avoid hotspots.
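As a rough sketch, the first two recommendations map to session-level configuration along these lines. Treat the option keys as assumptions to verify against your Flink version's documentation, since they have changed across releases:

```sql
-- Hypothetical session settings (key names vary between Flink versions)
SET 'state.backend.type' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
```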

Common Mistakes

Using Regular Joins When Temporal Joins Would Work

This is the most frequent mistake. Engineers write a regular INNER JOIN between an event stream and a CDC stream, not realizing that state will grow without bound. If the right-hand side is a CDC changelog with a primary key, switch to a temporal join with FOR SYSTEM_TIME AS OF.

Forgetting Watermarks

Interval joins and temporal joins rely on watermarks to advance event time and trigger state cleanup. Without watermarks, Flink’s event-time clock never advances, and state accumulates indefinitely. Always define a WATERMARK FOR clause on your time attributes:

WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

The watermark delay (5 seconds in this example) should reflect the maximum expected out-of-orderness of your data. Setting it too low drops late events; setting it too high delays results and increases state.

Ignoring State Backend Configuration

Running interval or temporal joins on the default heap state backend works for development but fails in production when state exceeds available JVM heap. Always configure RocksDB for production workloads and test with realistic data volumes.

Not Setting Lookup Cache TTL

Lookup joins without caching make a network call for every single event. At high throughput, this overwhelms the external database. Always configure a lookup cache with an appropriate TTL - even 60 seconds of caching can reduce external load by orders of magnitude for hot keys.

Mismatched Time Semantics

Mixing event-time and processing-time attributes in the same join condition produces unpredictable results. Temporal joins require event time on the probe side. Lookup joins require processing time. If your pipeline uses event time everywhere, do not accidentally define a lookup join thinking you will get point-in-time correctness - you will get current-value semantics regardless.

Wrapping Up

Flink SQL joins are one of the most powerful features of the platform, but they demand a clear understanding of state, time, and correctness trade-offs. Regular joins are a useful starting point for learning but rarely belong in production streaming pipelines. Interval joins, temporal joins, and lookup joins each solve a specific class of problem with predictable resource consumption.

For teams building CDC-driven streaming pipelines — enriching event streams with database state, correlating changes across systems, or feeding real-time views downstream — temporal joins are the workhorse pattern. Paired with a CDC platform like Streamkap that delivers properly formatted changelog streams, temporal joins let you write concise, correct SQL that handles the full complexity of time-varying data.

Start with the join type that matches your correctness requirements, configure your state and watermarks deliberately, and monitor state size from day one. The effort you invest in choosing the right join upfront saves you from debugging state explosions and silent data quality issues later.