
Engineering

February 25, 2026


Stream-to-Stream Joins: Correlating Events Across Data Sources

How to join two unbounded event streams in real time using Flink SQL. Covers interval joins, windowed joins, state management, and practical patterns.

TL;DR:

  • Stream-to-stream joins correlate events from two unbounded streams based on key and time conditions.
  • Interval joins are the most common pattern - they bound state by requiring events to match within a time window.
  • Without time bounds, join state grows indefinitely and will eventually cause failures.
  • Flink SQL makes stream-to-stream joins accessible with standard SQL syntax plus time predicates.

In batch processing, joining two tables is straightforward. Both sides are finite. You scan them, match on a key, and you are done. Stream-to-stream joins are a different problem entirely. Both sides are unbounded - events keep arriving with no end in sight. You need to decide how long to wait for a matching event, how much state to keep, and what to do when a match never shows up.

If you have ever tried correlating order events with payment events, or matching ad clicks with conversions across two Kafka topics, you have run into this problem. This article walks through the mechanics of stream-to-stream joins in Apache Flink, the state management trade-offs, and the patterns that actually work in production.

Why Stream-to-Stream Joins Are Hard

A regular batch join looks something like this: take table A, take table B, match rows where A.key = B.key, emit results. Simple. Both tables fit in memory (or at least on disk), and you know when you have seen every row.

With streams, neither side ever finishes. An event arrives on stream A, and the matching event on stream B might show up one second later, one hour later, or never. This creates three fundamental problems:

State accumulation. The join operator must buffer events from both streams while waiting for matches. Without bounds on how long to wait, this buffer grows forever.

Late arrivals. Events can arrive out of order. An event with a timestamp of 10:00:00 might arrive at 10:00:05. Your join logic has to account for this.

Output semantics. When should you emit a result? As soon as a match is found? After a window closes? What about events that never match?

The answer to all three comes down to time bounds. You need to tell Flink how long to keep events around, and that decision shapes everything about your join.

The State Management Problem

Here is the core issue. Consider a join between two streams - orders and payments - on order_id:

SELECT o.order_id, o.amount, p.payment_method, p.paid_at
FROM orders o
JOIN payments p
ON o.order_id = p.order_id;

This query is valid Flink SQL, but it is dangerous. There is no time condition, which means Flink must keep every order and every payment in state indefinitely. If your order stream produces 10,000 events per hour and your payment stream produces 8,000, after 24 hours you have 432,000 events in state. After a week, 3 million. After a month, your job runs out of memory and crashes.

Flink stores join state in its state backend - either the JVM heap (HashMapStateBackend) or RocksDB (EmbeddedRocksDBStateBackend). RocksDB can spill to disk, which buys you time, but even disk has limits. Eventually, checkpoint sizes balloon, recovery takes forever, and the job becomes unstable.

The fix is always the same: add a time condition that tells Flink when it can safely evict old state.

Interval Joins

Interval joins are the workhorse of stream-to-stream joining. They let you specify a time range relative to one stream’s event time, within which the other stream’s events must fall. This is the pattern you will reach for most often.

Basic Syntax

SELECT o.order_id, o.amount, o.customer_id,
       p.payment_method, p.paid_at
FROM orders o, payments p
WHERE o.order_id = p.order_id
  AND p.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '15' MINUTE;

This says: for each order, look for payments whose event time falls between the order’s event time and 15 minutes after it. Flink keeps each order in state until the watermark passes that 15-minute upper bound, then discards it. The state is bounded.

You can also express asymmetric intervals:

  AND p.event_time BETWEEN o.event_time - INTERVAL '5' MINUTE
                       AND o.event_time + INTERVAL '15' MINUTE

This handles cases where the payment event might actually arrive slightly before the order event (due to clock skew or pipeline ordering differences). The 5-minute lookback adds some state cost, but it catches those edge cases.

How Interval Joins Work Internally

When an event arrives on the left stream (orders), Flink stores it in state keyed by order_id, along with its event timestamp. When an event arrives on the right stream (payments), Flink checks if there is a matching order whose timestamp falls within the specified interval. If yes, it emits a joined row. If no match exists yet, the payment event is also stored in state to wait for a potential late-arriving order.

Flink uses event-time watermarks to determine when it is safe to purge old state. Once the watermark advances past the upper bound of the interval for a given event, that event can never match anything new, so Flink drops it.

A Practical Example: Click-to-Conversion Matching

A common use case in adtech is correlating ad clicks with conversions (purchases, signups). Clicks and conversions arrive on separate topics:

CREATE TABLE clicks (
    click_id STRING,
    user_id STRING,
    campaign_id STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'ad_clicks',
    'format' = 'json'
);

CREATE TABLE conversions (
    conversion_id STRING,
    user_id STRING,
    revenue DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'conversions',
    'format' = 'json'
);

SELECT c.click_id, c.campaign_id, v.conversion_id, v.revenue
FROM clicks c, conversions v
WHERE c.user_id = v.user_id
  AND v.event_time BETWEEN c.event_time AND c.event_time + INTERVAL '30' MINUTE;

This join attributes conversions to clicks that happened within the previous 30 minutes for the same user. The 10-second watermark delay allows for minor out-of-order arrivals without dropping events.

Windowed Joins

Windowed joins take a different approach. Instead of relative time ranges per event, they align both streams into fixed windows and join events that land in the same window.

Tumbling Window Join

SELECT o.order_id, o.amount, p.payment_method
FROM orders o
JOIN payments p
ON o.order_id = p.order_id
  AND o.window_start = p.window_start
  AND o.window_end = p.window_end;

To use this, both streams need to be windowed first using Flink’s TVF (Table-Valued Function) syntax:

-- Window the orders stream into 10-minute tumbling windows
SELECT * FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
);

Tumbling windows are non-overlapping and fixed-size. Every event belongs to exactly one window. The join only matches events that fall in the same 10-minute window.
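Putting the two pieces together: each stream is wrapped in the TUMBLE TVF as a subquery, and the join matches on key plus window bounds. Here is a sketch reusing the orders and payments tables from earlier:

```sql
-- Window both streams into the same 10-minute tumbling windows, then join.
SELECT o.order_id, o.amount, p.payment_method, o.window_start
FROM (
    SELECT * FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE))
) o
JOIN (
    SELECT * FROM TABLE(
        TUMBLE(TABLE payments, DESCRIPTOR(event_time), INTERVAL '10' MINUTE))
) p
ON o.order_id = p.order_id
   AND o.window_start = p.window_start
   AND o.window_end = p.window_end;
```

Flink recognizes this shape as a window join, so state for a window can be dropped once the watermark passes the window's end.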

Sliding Window Join

Sliding windows overlap, which means an event can belong to multiple windows. This is useful when you need to catch matches that straddle window boundaries:

SELECT * FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
);

This creates 10-minute windows that slide every 5 minutes. The overlap ensures that events near a boundary still get matched. The trade-off is more state and more output rows, since each event participates in two windows.

When to Use Windowed Joins vs. Interval Joins

Windowed joins are simpler to reason about when both streams have similar event rates and you want results aligned to fixed time boundaries - for example, aggregating matched events per 10-minute window for a dashboard.

Interval joins are better when the time relationship between events is relative to each other rather than to the clock. “Payment within 15 minutes of order” is an interval join. “Orders and payments in the same 10-minute window” is a windowed join. Most real-world use cases fit interval joins better.

Inner vs. Outer Stream Joins

So far, all examples have been inner joins - they only emit results when both sides match. But what about events that never find a match?

LEFT OUTER Interval Join

SELECT o.order_id, o.amount, p.payment_method
FROM orders o
LEFT JOIN payments p
ON o.order_id = p.order_id
  AND p.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '15' MINUTE;

This emits all orders, with NULL values for the payment columns when no payment arrives within 15 minutes. This is useful for detecting unpaid orders - downstream logic can filter for rows where p.payment_method IS NULL and trigger a reminder.
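Packaged as a view, that unpaid-order filter might look like this (a sketch; the view name is hypothetical):

```sql
-- Orders with no payment within 15 minutes of the order's event time.
CREATE VIEW unpaid_orders AS
SELECT o.order_id, o.amount, o.event_time AS ordered_at
FROM orders o
LEFT JOIN payments p
    ON o.order_id = p.order_id
    AND p.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '15' MINUTE
WHERE p.payment_method IS NULL;
```

Note that the outer join can retract a NULL row if a matching payment arrives late but still within the interval, so consumers of this view should be retraction-aware too.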

FULL OUTER Interval Join

SELECT
    COALESCE(o.order_id, p.order_id) AS order_id,
    o.amount,
    p.payment_method
FROM orders o
FULL OUTER JOIN payments p
ON o.order_id = p.order_id
  AND p.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '15' MINUTE;

Full outer joins emit unmatched events from both sides. This is less common but useful for audit or reconciliation pipelines where you need visibility into orphaned events on either stream.

One thing to note: outer joins in Flink produce a retraction stream. When an order first arrives with no match, Flink emits (order_id, amount, NULL). If a payment later arrives within the interval, Flink retracts the previous row and emits a new (order_id, amount, payment_method). Downstream sinks need to handle retractions correctly - this works well with upsert-capable sinks like databases or compacted Kafka topics, but can be tricky with append-only sinks like S3.
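For example, an upsert-capable Kafka sink for the left-join output could be declared like this (a sketch; the topic name is hypothetical, and connection properties are abbreviated as in the earlier DDLs):

```sql
CREATE TABLE order_payment_status (
    order_id STRING,
    amount DECIMAL(10, 2),
    payment_method STRING,
    PRIMARY KEY (order_id) NOT ENFORCED  -- upsert key for compaction
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'order_payment_status',
    'key.format' = 'json',
    'value.format' = 'json'
);
```

With a primary key declared, retractions become updates on the key, and a compacted topic keeps only the latest status per order.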

Estimating and Tuning State Size

State size for interval joins follows a simple formula:

state_per_side = event_rate * interval_duration * avg_event_size

For example:

  • Order stream: 500 events/second, average 200 bytes each
  • Payment stream: 400 events/second, average 150 bytes each
  • Interval: 15 minutes (900 seconds)
Orders state:   500 * 900 * 200 = 90,000,000 bytes ≈ 86 MB
Payments state: 400 * 900 * 150 = 54,000,000 bytes ≈ 51 MB
Total: ~137 MB

Because the formula uses each stream's total event rate, this is the join's total state, spread across its parallel subtasks. At ~137 MB, that is manageable. But if your interval is 24 hours instead of 15 minutes, multiply by 96 and you are looking at ~13 GB of join state. That changes the conversation significantly.

Tuning Tips

Use RocksDB for large state. The HashMapStateBackend keeps everything on the JVM heap. RocksDB spills to local SSD and can handle state sizes well beyond available memory. For any join with more than a few hundred MB of state, RocksDB is the right choice.
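When running Flink SQL directly, the backend choice and incremental checkpointing can be set per session (a sketch; configuration key names as of recent Flink releases):

```sql
SET 'state.backend' = 'rocksdb';
-- Incremental checkpoints upload only changed SST files, not full state.
SET 'state.backend.incremental' = 'true';
```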

Set state TTL as a safety net. Even with interval joins, bugs or unexpected data patterns can cause state to grow. Configure table.exec.state.ttl as an upper bound:

SET 'table.exec.state.ttl' = '3600000';  -- 1 hour in milliseconds

This ensures state is cleaned up even if watermarks stall or events arrive far out of order.

Keep intervals as tight as possible. The narrower the interval, the less state you carry. If business logic says “payments within 15 minutes,” do not set the interval to 1 hour just to be safe. You are paying for that extra state in memory, checkpoint size, and recovery time.

Monitor checkpoint duration. Growing checkpoint times are the canary in the coal mine for state problems. If checkpoints that used to take 5 seconds start taking 30, your state is growing and you need to investigate.

Multi-Service Event Correlation

A more advanced pattern is correlating events across three or more services. For example, matching a user signup event, an email verification event, and a first-purchase event:

SELECT s.user_id, s.signup_time, e.verified_at, p.first_purchase_at
FROM signups s
LEFT JOIN email_verifications e
    ON s.user_id = e.user_id
    AND e.event_time BETWEEN s.event_time AND s.event_time + INTERVAL '24' HOUR
LEFT JOIN purchases p
    ON s.user_id = p.user_id
    AND p.event_time BETWEEN s.event_time AND s.event_time + INTERVAL '7' DAY;

This correlates three streams with different time horizons: verification is expected within 24 hours, first purchase within 7 days. The state cost of the 7-day window on the purchases side is substantial, so you would want to filter the purchases stream to only first purchases before the join to reduce volume.
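One way to pre-filter to first purchases is Flink SQL's deduplication pattern with ROW_NUMBER (a sketch; assumes the purchases table from the query above):

```sql
-- Keep only each user's earliest purchase event.
SELECT user_id, event_time AS first_purchase_at
FROM (
    SELECT user_id, event_time,
           ROW_NUMBER() OVER (
               PARTITION BY user_id ORDER BY event_time ASC) AS rn
    FROM purchases
)
WHERE rn = 1;
```

Because the ORDER BY is on a time attribute, Flink treats this as deduplication and keeps only one row of state per user rather than a full ranking.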

This kind of multi-stream correlation is where Flink really stands out. Doing this with batch queries means waiting for all the data to land in a warehouse, then running expensive multi-table joins. With Flink, you get results as events arrive.

Running Stream-to-Stream Joins on Streamkap

Getting stream-to-stream joins right in production means more than writing the SQL. You need to configure the state backend, size your cluster for the expected state, tune checkpointing intervals, and monitor for state growth over time.

Streamkap handles the operational side of this. As a managed CDC and Flink platform, Streamkap configures the RocksDB state backend, incremental checkpointing, and storage for you. Your CDC streams from databases like PostgreSQL, MySQL, or MongoDB are already flowing through Kafka - adding a Flink SQL job to join those streams is a matter of writing the query and deploying it.

The state backend configuration, checkpoint tuning, and cluster sizing are managed by the platform. If your join state grows beyond what the current allocation supports, Streamkap scales the resources. You focus on the join logic and the business problem, not on debugging RocksDB compaction or tuning JVM garbage collection.

Wrapping Up

Stream-to-stream joins are one of the most powerful - and most misused - operations in stream processing. The key insight is simple: always bound your state with time conditions. Interval joins give you the most flexibility and are the right starting point for most use cases. Windowed joins work well when you need results aligned to fixed time boundaries.

Start with the narrowest time interval your business logic allows, use RocksDB for anything beyond trivial state sizes, and monitor checkpoint durations as your early warning system. Get those fundamentals right, and stream-to-stream joins become a reliable building block rather than a source of production incidents.