Handling Late-Arriving Data in Stream Processing
Learn how to handle late-arriving and out-of-order data in streaming pipelines. Configure watermarks, allowed lateness, and side outputs in Flink for correct results.
Introduction
In a perfect world, every event would arrive at your stream processor the instant it occurs, in perfect chronological order. In reality, distributed systems make no such guarantees. A sensor reading stamped at 10:03:47 might not reach your Flink job until 10:08:12. A mobile app event generated during a subway ride might not sync until the user reconnects thirty minutes later. A partner’s nightly file drop contains transactions from twelve hours ago.
These late-arriving events pose a fundamental challenge: your streaming pipeline has already computed and emitted results for the time window those events belong to. Do you silently drop them and accept incorrect results? Do you hold windows open indefinitely and never emit anything? The answer lies somewhere in between, and getting it right is one of the defining skills of production stream processing.
This guide walks through the causes of late data, the mechanisms Flink provides to handle it, and the practical tradeoffs you need to make.
Causes of Late Data
Late-arriving data is not a single problem with a single cause. Understanding the sources of lateness helps you choose appropriate tolerance settings.
Network delays and partitioning. Events travel from producers through message brokers to your stream processor. Any hop can introduce latency. Kafka partition rebalances, broker restarts, or cross-region replication lag can delay delivery by seconds or minutes.
Offline sync and mobile clients. Mobile apps routinely buffer events locally when connectivity is unavailable. When the device comes back online, it flushes a batch of events whose timestamps span the entire offline period. This can produce minutes or hours of lateness.
Batch uploads and file ingestion. Many data pipelines still receive data via periodic file drops, SFTP uploads, or API polling. Events in these files carry timestamps from when they originally occurred, not when the file was processed.
Cross-timezone event collection. Systems that aggregate events from globally distributed sources may see clock skew or timezone misconfiguration that creates apparent lateness.
Reprocessing and backfill. When you replay a Kafka topic from an earlier offset to fix a bug or backfill a new pipeline, the entire replayed history arrives as “late” relative to the current watermark.
Watermarks Explained
Flink’s core mechanism for reasoning about event time is the watermark. A watermark is a special marker that flows through the data stream alongside regular events, carrying a timestamp and a declaration: “no events with a timestamp at or before this value are expected to arrive.”
When a watermark reaches a window operator, the operator checks whether any windows have an end time at or before the watermark. If so, those windows fire and emit their results.
Watermark Strategies
The most common strategy is bounded out-of-orderness, where the watermark is set to the maximum observed event time minus a fixed tolerance:
watermark = max_event_time - tolerance
For example, with a 5-second tolerance, if the latest event seen so far has timestamp 10:05:30, the current watermark is 10:05:25. Any event that subsequently arrives with a timestamp at or before 10:05:25 is considered late.
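The bounded out-of-orderness rule is simple enough to sketch outside Flink. The following Python snippet is a simulation of the rule, not Flink API code: it tracks the maximum observed event time, derives the watermark, and flags events that fall behind it.

```python
from datetime import datetime, timedelta

TOLERANCE = timedelta(seconds=5)  # bounded out-of-orderness tolerance

def ts(s: str) -> datetime:
    return datetime.strptime(s, "%H:%M:%S")

max_event_time = datetime.min
watermark = datetime.min

def observe(event_time: datetime) -> bool:
    """Apply the bounded out-of-orderness rule; return True if the event is late."""
    global max_event_time, watermark
    max_event_time = max(max_event_time, event_time)
    watermark = max_event_time - TOLERANCE  # watermark = max_event_time - tolerance
    return event_time < watermark

observe(ts("10:05:30"))             # watermark advances to 10:05:25
assert watermark == ts("10:05:25")
assert observe(ts("10:05:20"))      # behind the watermark: late
assert not observe(ts("10:05:27"))  # within tolerance: on time
```

Note that the watermark only moves forward as the maximum observed event time advances; a late event never pulls it back.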
In Flink SQL, you declare watermarks directly in the table DDL:
CREATE TABLE sensor_readings (
  sensor_id STRING,
  temperature DOUBLE,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'sensor-readings',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);
The WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND declaration tells Flink to use bounded out-of-orderness with a 5-second tolerance.
Monitoring Watermark Progress
You can inspect the current watermark inside queries using CURRENT_WATERMARK():
SELECT
  sensor_id,
  event_time,
  CURRENT_WATERMARK(event_time) AS current_wm
FROM sensor_readings;
This is invaluable for debugging. If the watermark is significantly behind the latest event time, it usually means one partition or source is lagging and holding back the global watermark.
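The “one lagging source holds everyone back” behavior follows from how Flink combines watermarks: an operator’s watermark is the minimum of the watermarks of its input partitions. A small illustrative sketch of that min rule (values are hypothetical, in epoch seconds):

```python
# Per-partition watermarks, in epoch seconds (illustrative values):
# partitions 0-2 are current, partition 3 is lagging far behind.
partition_watermarks = {0: 1_000_060, 1: 1_000_058, 2: 1_000_061, 3: 1_000_005}

# The operator-level watermark is the minimum across its inputs,
# so the single lagging partition stalls the whole pipeline.
global_watermark = min(partition_watermarks.values())
assert global_watermark == 1_000_005

lag = max(partition_watermarks.values()) - global_watermark
print(f"watermark held back by {lag} seconds")  # 56 seconds
```

This is why per-partition watermark metrics are worth monitoring: the aggregate watermark alone will not tell you which source is responsible.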
Allowed Lateness
Watermarks define when windows fire, but they do not define when late events are permanently dropped. Flink provides an allowed lateness parameter that keeps windows open for additional time after they fire.
When an event arrives late but within the allowed lateness period, Flink re-fires the window and emits an updated result. Downstream consumers see a second (or third, or fourth) emission for the same window, each incorporating more data.
In the DataStream API, you configure this on the window:
stream
    .keyBy(event -> event.getSensorId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(10))
    .aggregate(new TemperatureAggregator());
This configuration says: fire windows when the watermark passes, but keep accepting late events for an additional 10 minutes. Any event arriving more than 10 minutes after the window closed is dropped.
State Implications
Allowed lateness is not free. Flink must retain each window’s state for the entire allowed lateness duration after the window fires. If you have a 5-minute tumbling window with 1 hour of allowed lateness, Flink holds state for roughly 12 closed-but-not-yet-expired windows per key at any given time. For high-cardinality keys, this can consume significant memory and checkpoint size.
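The arithmetic behind that example can be made concrete with a back-of-the-envelope sketch (the per-window byte footprint and key count below are assumptions for illustration, not Flink measurements):

```python
import math

window_size_s = 5 * 60        # 5-minute tumbling windows
allowed_lateness_s = 60 * 60  # 1 hour of allowed lateness

# A window's state lives from when it fires until window_end + allowed lateness,
# so at any instant roughly this many window panes per key are retained:
retained_windows = math.ceil(allowed_lateness_s / window_size_s)
assert retained_windows == 12

# State grows linearly with key cardinality (assumed illustrative numbers):
keys = 1_000_000
bytes_per_window = 200  # assumed footprint of one per-key window aggregate
total_gb = retained_windows * keys * bytes_per_window / 1e9
print(f"~{total_gb:.1f} GB of window state")  # ~2.4 GB under these assumptions
```

Doubling the allowed lateness doubles the retained window count, and therefore (roughly) the state size and checkpoint footprint.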
Side Outputs for Late Data
Events that arrive after both the watermark and the allowed lateness period are dropped by default. Flink provides side outputs to capture these extremely late events instead of silently discarding them.
OutputTag<SensorReading> lateTag = new OutputTag<>("late-data") {};

SingleOutputStreamOperator<WindowResult> result = stream
    .keyBy(event -> event.getSensorId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(10))
    .sideOutputLateData(lateTag)
    .aggregate(new TemperatureAggregator());

DataStream<SensorReading> lateStream = result.getSideOutput(lateTag);

// Route late data to a dead letter topic or separate sink
lateStream.sinkTo(deadLetterSink);
The side output stream gives you a second chance. You can route these events to a dead letter queue for manual review, trigger a batch reconciliation job, or write them to a separate table for later merging.
Flink SQL Configuration
In Flink SQL, watermarks are configured in the table DDL as shown above. There is no per-window allowed-lateness clause in SQL; instead, late firing for windowed aggregations is controlled through (experimental) configuration options:

-- Enable late firing and set the late-fire delay via configuration
SET 'table.exec.emit.late-fire.enabled' = 'true';
SET 'table.exec.emit.late-fire.delay' = '10 min';
For windowed aggregations in Flink SQL, you define the window in the query itself:
SELECT
  sensor_id,
  TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
  TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
  AVG(temperature) AS avg_temp,
  COUNT(*) AS reading_count
FROM sensor_readings
GROUP BY
  sensor_id,
  TUMBLE(event_time, INTERVAL '5' MINUTE);
Flink SQL’s newer windowing table-valued function (TVF) syntax provides more flexibility:
SELECT
  sensor_id,
  window_start,
  window_end,
  AVG(temperature) AS avg_temp,
  COUNT(*) AS reading_count
FROM TABLE(
  TUMBLE(TABLE sensor_readings, DESCRIPTOR(event_time), INTERVAL '5' MINUTES)
)
GROUP BY sensor_id, window_start, window_end;
Practical Examples
IoT Sensor Data with Network Delays
Consider a fleet of industrial sensors reporting temperature readings every second. The sensors connect over cellular networks with variable latency. Most readings arrive within 2 seconds, but network congestion can delay them by up to 30 seconds. Rare outages can cause buffering of several minutes.
A reasonable configuration:
- Watermark tolerance: 10 seconds (covers 99.9% of normal network delay)
- Allowed lateness: 2 minutes (catches cellular reconnection bursts)
- Side output: Route to a corrections topic for anything beyond 2 minutes
This means your dashboards see initial results within ~10 seconds of real time, updated results for late arrivals within 2 minutes, and a corrections pipeline for extreme outliers.
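The three-tier routing above amounts to a simple classification by how far behind the watermark an event arrives. A hypothetical sketch of that rule (in Flink, the watermark tolerance, allowedLateness, and side output play these roles; the function below is purely illustrative):

```python
from datetime import timedelta

WATERMARK_TOLERANCE = timedelta(seconds=10)
ALLOWED_LATENESS = timedelta(minutes=2)

def classify(lateness: timedelta) -> str:
    """Route an event by how far behind the watermark it arrived."""
    if lateness <= WATERMARK_TOLERANCE:
        return "on-time"            # included in the window's first firing
    if lateness <= WATERMARK_TOLERANCE + ALLOWED_LATENESS:
        return "late-refire"        # triggers an updated window emission
    return "corrections-topic"      # beyond allowed lateness: side output

assert classify(timedelta(seconds=3)) == "on-time"
assert classify(timedelta(seconds=45)) == "late-refire"
assert classify(timedelta(minutes=5)) == "corrections-topic"
```

Each tier trades latency for completeness: the first firing is fast but provisional, the re-fire is slower but more complete, and the corrections topic catches everything else.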
Mobile App Events with Offline Sync
A ride-sharing app generates events (ride requests, GPS pings, fare calculations) on the driver’s phone. Drivers frequently pass through tunnels or rural areas with no connectivity. Events buffer locally and sync when connectivity returns.
A configuration for this use case:
- Watermark tolerance: 30 seconds (covers normal network jitter)
- Allowed lateness: 1 hour (covers most offline periods)
- Side output: Route to a reconciliation job for multi-hour offline periods
The 1-hour allowed lateness means Flink holds substantially more state, but the business requirement for accurate fare calculations demands it.
The Correctness-Latency Tradeoff
Every late-data configuration represents a point on a three-axis tradeoff:
Correctness: How accurately do your results reflect the complete set of events? Longer allowed lateness means more events are incorporated into window results.
Latency: How quickly do you emit final results? If downstream systems need to know when a result is “done,” longer allowed lateness delays that determination.
Resource usage: How much state does Flink maintain? Longer allowed lateness means more windows held in memory and larger checkpoints.
There is no universal answer. A real-time fraud detection system might use a 5-second watermark tolerance and zero allowed lateness because speed matters more than counting every transaction. A financial reconciliation pipeline might use 1-hour allowed lateness because every cent must be accounted for.
The key is to measure and iterate. Monitor how many events arrive late, how late they are, and how many get dropped. Use this data to tune your watermark tolerance and allowed lateness settings.
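For example, if you log pairs of (event time, arrival time), the lateness distribution falls out directly. A sketch of that offline analysis, using an assumed illustrative sample and a simple nearest-rank percentile:

```python
import math

# (event_time, arrival_time) pairs in epoch seconds -- illustrative sample data
observations = [(100, 101), (101, 101), (102, 110), (103, 104),
                (104, 160), (105, 106), (106, 107), (107, 109)]

latenesses = sorted(arrival - event for event, arrival in observations)

def pctile(sorted_vals, p):
    """Nearest-rank percentile of an ascending list (p in (0, 100])."""
    return sorted_vals[math.ceil(p / 100 * len(sorted_vals)) - 1]

p50, p99 = pctile(latenesses, 50), pctile(latenesses, 99)
print(f"p50 lateness: {p50}s, p99 lateness: {p99}s")

# A watermark tolerance near the p99 value (56s in this sample) would admit
# almost all events on the first firing; anything beyond it relies on
# allowed lateness or the side output.
```

Re-running this analysis periodically also catches drift: a growing tail in the lateness distribution is an early warning that your tolerance settings no longer match reality.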
Late Data in CDC Pipelines
Change Data Capture (CDC) pipelines have a distinct advantage when it comes to late-arriving data: database transaction logs are inherently ordered. The WAL (Write-Ahead Log) in PostgreSQL or the binlog in MySQL records events in strict commit order. A CDC connector reads this log sequentially, so events arrive at Kafka in the same order they were committed.
This means CDC-sourced streams rarely suffer from the out-of-order delivery that plagues IoT or mobile data. However, there are edge cases:
Cross-source joins. When you join a CDC stream from PostgreSQL with a CDC stream from MongoDB, the two sources have independent progress rates. One source might lag behind the other due to snapshot phase, network differences, or varying transaction volumes. Events from the faster source appear “early” relative to the slower one, which is functionally equivalent to the slower source’s events arriving late.
Network and broker delays. Even though the source database logs are ordered, network partitions or Kafka broker issues between the CDC connector and Flink can introduce delivery delays.
Reprocessing from earlier offsets. If you reset a CDC connector to an earlier position in the transaction log (for backfill or bug fix), the replayed events carry historical timestamps and arrive as massively late data relative to the current watermark.
Streamkap’s managed Flink platform accounts for these CDC-specific patterns, configuring watermark strategies that respect the naturally ordered nature of transaction logs while providing appropriate tolerance for cross-source join scenarios.
Testing Late Data Handling
Verifying that your pipeline correctly handles late data requires deliberate simulation. You cannot rely on production behavior alone because late events are, by definition, edge cases that may not surface during initial testing.
Generating Controlled Late Events
Write a test producer that emits events with specific timestamps:
// Emit events in order, then inject a late event
producer.send("sensor-1", timestamp("10:00:01"), 22.5);
producer.send("sensor-1", timestamp("10:00:02"), 22.6);
producer.send("sensor-1", timestamp("10:00:03"), 22.7);
// Advance time past the window boundary
producer.send("sensor-1", timestamp("10:06:00"), 23.0);
// Now send a late event for the first window
producer.send("sensor-1", timestamp("10:00:04"), 22.8);
Verifying Window Updates
For windows with allowed lateness, verify that the downstream sink receives multiple emissions for the same window:
- First emission at watermark crossing: result includes events at 10:00:01, 10:00:02, 10:00:03
- Second emission after late arrival: result now includes the 10:00:04 event
Your test should assert that both emissions occur and that the second emission has the correct updated aggregate.
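The two-emission behavior can be exercised against a toy event-time window simulator. This is pure Python, not the Flink runtime, but it mirrors the semantics described above, with the 5-minute window, 5-second watermark tolerance, and 10-minute allowed lateness used in this guide:

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
TOLERANCE = timedelta(seconds=5)          # watermark = max event time - tolerance
ALLOWED_LATENESS = timedelta(minutes=10)

def ts(s):
    return datetime.strptime(s, "%H:%M:%S")

windows = {}        # window start -> list of values
fired = set()       # windows that have emitted at least once
emissions = []      # (window_start, element_count), in emission order
side_output = []    # events beyond allowed lateness
watermark = datetime.min

def window_start(t):
    """Align a timestamp to its tumbling window's start."""
    return datetime.min + ((t - datetime.min) // WINDOW) * WINDOW

def process(event_time, value):
    global watermark
    start = window_start(event_time)
    if watermark >= start + WINDOW + ALLOWED_LATENESS:
        side_output.append((event_time, value))  # too late: side output
        return
    windows.setdefault(start, []).append(value)
    watermark = max(watermark, event_time - TOLERANCE)
    if start in fired:
        emissions.append((start, len(windows[start])))  # late re-fire
    for w, vals in windows.items():
        if w not in fired and watermark >= w + WINDOW:
            fired.add(w)
            emissions.append((w, len(vals)))            # first firing

# Reproduce the test scenario from this section
for t, v in [("10:00:01", 22.5), ("10:00:02", 22.6), ("10:00:03", 22.7),
             ("10:06:00", 23.0), ("10:00:04", 22.8)]:
    process(ts(t), v)

# Two emissions for the [10:00, 10:05) window: first 3 events, then 4
assert emissions == [(ts("10:00:00"), 3), (ts("10:00:00"), 4)]
```

Running the same harness with an event far beyond the allowed lateness period should leave the emissions unchanged and land the event in side_output instead, which is exactly the side-output assertion described next.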
Verifying Side Output Capture
Send an event that arrives after the allowed lateness period and verify it appears in the side output sink rather than being silently dropped:
// With 10-minute allowed lateness on a [10:00-10:05) window,
// send an event at 10:16 for timestamp 10:03 - beyond the grace period
producer.send("sensor-1", timestamp("10:16:00"), 24.0); // advance watermark
producer.send("sensor-1", timestamp("10:03:00"), 22.9); // extremely late
// Assert this event appears in the dead letter topic, not in the window result
Checkpoint and Recovery Testing
Late data handling interacts with Flink’s checkpointing. After a failure and recovery, Flink restores window state from the latest checkpoint. Verify that:
- Windows within the allowed lateness period are correctly restored and still accept late events
- Side output routing continues to function after recovery
- Watermark progress resumes correctly from the restored state
Summary
Late-arriving data is not a bug to be fixed but a reality to be managed. The tools Flink provides - watermarks, allowed lateness, and side outputs - give you precise control over the tradeoff between correctness, latency, and resource consumption. The right configuration depends on your data sources, business requirements, and tolerance for approximate results. Measure your actual lateness distribution, configure your pipeline accordingly, and always capture dropped events for reconciliation. With platforms like Streamkap that manage these configurations automatically for common source types, you can focus on getting the business logic right rather than wrestling with infrastructure tuning.