Flink Watermarks and Event Time: Handling Out-of-Order Events
Master Flink watermarks and event time processing. Learn how watermarks track progress, handle out-of-order data, and configure watermark strategies for production.
Stream processing frameworks face a fundamental challenge: events do not arrive in the order they were created. A click that happened at 10:00:01 might reach your processing engine at 10:00:05, after a click from 10:00:03 has already been processed. Network delays, distributed producers, and retries all contribute to this disorder. Without a mechanism to reason about completeness in the face of out-of-order data, your aggregations, window computations, and joins will produce incorrect results.
Apache Flink solves this problem with two core concepts: event time and watermarks. Event time lets you process data based on when it was actually created rather than when it arrived. Watermarks provide the mechanism for Flink to know when it has seen “enough” data to produce correct results. Together, they form the foundation of correct, deterministic stream processing.
Event Time vs Processing Time
Flink supports two primary notions of time:
- Event time: The timestamp embedded in the event itself, representing when the event actually occurred at the source.
- Processing time: The wall-clock time of the machine executing the operation, representing when the event is being processed.
Processing time is simpler. You do not need watermarks, and results are produced with minimal latency. However, processing time makes your results non-deterministic. If you replay the same data through the same pipeline, you will get different results because the processing timestamps will differ. Processing time also cannot handle out-of-order events correctly because it has no concept of “when the event really happened.”
Event time is the right choice whenever you need:
- Deterministic results: Reprocessing historical data produces identical output.
- Correctness with out-of-order data: Events are placed into the correct windows regardless of arrival order.
- Reproducibility: The same input always yields the same output, which is critical for debugging and auditing.
The trade-off is that event time requires watermarks to function, and watermarks introduce a latency-completeness tension that you must configure based on your use case.
How Watermarks Work
A watermark is a special timestamped element that flows through the data stream alongside regular events. It carries a simple assertion: no events with a timestamp less than or equal to this watermark value will arrive after this point. When a window operator receives a watermark that exceeds the window’s end time, it knows that all events for that window have arrived and it is safe to close the window and emit results.
Watermark Generation
Watermarks are generated at the source operators. As each event arrives, the watermark generator observes the event’s timestamp and decides whether to advance the watermark. The most common approach is to track the maximum observed event time and subtract a tolerance:
watermark = max_observed_event_time - allowed_out_of_orderness
For example, if the maximum event time you have seen so far is 10:00:30 and your tolerance is 5 seconds, the current watermark is 10:00:25. This tells downstream operators: “You can safely assume all events up to 10:00:25 have arrived.”
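The arithmetic can be checked in a few lines of plain Java (no Flink classes involved; `WatermarkMath` is just an illustrative name):

```java
public class WatermarkMath {
    // watermark = max observed event time minus the out-of-orderness tolerance
    static long watermark(long maxObservedEventTime, long allowedOutOfOrdernessMs) {
        return maxObservedEventTime - allowedOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        // Max event time seen so far: 10:00:30 (as epoch millis), tolerance: 5s
        long maxSeen = java.time.Instant.parse("2024-01-01T10:00:30Z").toEpochMilli();
        long wm = watermark(maxSeen, 5_000);
        System.out.println(java.time.Instant.ofEpochMilli(wm)); // 2024-01-01T10:00:25Z
    }
}
```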
Watermark Propagation
Once generated, watermarks flow downstream through the operator graph just like regular events. At each operator, Flink maintains the following rule: an operator’s output watermark is the minimum of all its input watermarks. This is critical because it ensures that no operator advances its notion of time beyond what its slowest input can guarantee.
In a parallel pipeline with multiple source partitions, the effective watermark at any downstream operator is the minimum across all upstream partitions. This means a single slow or stalled partition can hold back the entire pipeline’s progress.
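The minimum rule is simple enough to state directly. A plain-Java sketch (not Flink's actual implementation) shows how one stalled partition pins the effective watermark:

```java
import java.util.Arrays;

public class WatermarkPropagation {
    // An operator's output watermark is the minimum over all input watermarks
    static long effectiveWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Seven partitions have advanced to 300s; one is stuck at 0s
        long[] partitions = {300_000, 300_000, 300_000, 0,
                             300_000, 300_000, 300_000, 300_000};
        System.out.println(effectiveWatermark(partitions)); // 0 -- the slow partition wins
    }
}
```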
Watermark Strategies
Flink provides several built-in watermark strategies and allows you to implement custom generators.
Bounded-Out-of-Orderness
This is the most widely used strategy in production. It assumes that events can be delayed by at most a fixed duration:
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
The watermark is always max_event_time - 5 seconds. Choosing the right tolerance is a trade-off:
- Too small: You close windows before all events arrive, dropping late data.
- Too large: You wait unnecessarily long before producing results, increasing latency.
For most CDC pipelines, where events are generally well-ordered, a tolerance of 1 to 5 seconds works well. For user-generated events from mobile devices with unreliable connectivity, you might need 30 seconds or more.
Monotonously Increasing Timestamps
A special case of bounded-out-of-orderness with zero tolerance. Use this when events arrive perfectly in order:
WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
This is appropriate for single-partition Kafka topics where the producer writes events in timestamp order, or for CDC sources that emit changes in commit order.
Custom Watermark Generators
For advanced scenarios, you can implement a WatermarkGenerator directly:
public class CustomWatermarkGenerator implements WatermarkGenerator<Event> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 5000;

    // Start offset so that (maxTimestamp - MAX_OUT_OF_ORDERNESS_MS) cannot
    // underflow before the first event arrives
    private long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS + 1;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Called at the auto-watermark interval (200 ms by default)
        output.emitWatermark(new Watermark(maxTimestamp - MAX_OUT_OF_ORDERNESS_MS));
    }
}
Custom generators let you incorporate domain knowledge. For instance, you might advance the watermark more aggressively during high-throughput periods and more conservatively during low-throughput periods.
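One way that idea can look, sketched in plain Java with made-up thresholds (inside Flink this decision would live in onPeriodicEmit, and the rate tracking itself is omitted here):

```java
public class AdaptiveDelay {
    // Illustrative, untuned thresholds -- real values depend on your workload
    static final double HIGH_RATE_EVENTS_PER_SEC = 1_000.0;
    static final long AGGRESSIVE_DELAY_MS = 1_000;    // dense traffic: short wait
    static final long CONSERVATIVE_DELAY_MS = 10_000; // sparse traffic: long wait

    // Pick the out-of-orderness tolerance from the recently observed event rate
    static long chooseDelayMs(double recentEventsPerSec) {
        return recentEventsPerSec >= HIGH_RATE_EVENTS_PER_SEC
                ? AGGRESSIVE_DELAY_MS
                : CONSERVATIVE_DELAY_MS;
    }

    static long watermark(long maxEventTime, double recentEventsPerSec) {
        return maxEventTime - chooseDelayMs(recentEventsPerSec);
    }
}
```

The trade-off is the same as before, just made dynamic: dense traffic gives you confidence that stragglers would already have shown up, so the watermark can sit closer to the maximum observed event time.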
Flink SQL Watermark Configuration
In Flink SQL, watermarks are defined directly in the table DDL using the WATERMARK FOR clause:
CREATE TABLE orders (
    order_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    -- Define watermark: allow 5 seconds of out-of-orderness
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
The WATERMARK FOR column AS expression syntax tells Flink which column carries the event time and how to derive watermarks from it. Common patterns:
-- Strict ascending (no out-of-orderness)
WATERMARK FOR event_time AS event_time
-- Bounded out-of-orderness
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
-- Using a TIMESTAMP_LTZ column (with timezone)
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
Once declared, any windowed operation on this table automatically uses event time:
SELECT
    window_start,
    window_end,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
Idle Sources
The idle source problem is one of the most common issues in production Flink deployments. It occurs when one or more source partitions stop producing events while others continue.
The Problem
Consider a Kafka topic with 8 partitions. Seven partitions produce events continuously, but partition 3 has gone quiet - maybe its producer is down or its data just comes in bursts. Partition 3’s watermark stops advancing at, say, 10:00:00. Since Flink takes the minimum watermark across all partitions, the global watermark stays stuck at 10:00:00 even though the other partitions have progressed to 10:05:00. No windows close. No results are emitted. Your pipeline appears frozen.
The Solution: withIdleness()
Flink provides a built-in mechanism to handle this:
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
    .withIdleness(Duration.ofMinutes(1));
After a source partition has not emitted any events for 1 minute, Flink marks it as idle and excludes it from the minimum watermark calculation. The global watermark advances based on the active partitions only. When the idle partition starts producing events again, it is automatically reactivated.
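Conceptually, the idleness check just filters partitions out of the minimum. A plain-Java sketch of the bookkeeping (Flink's internal implementation is more involved; the names here are illustrative):

```java
public class IdleAwareWatermark {
    // Global watermark = minimum over partitions that are still active; a
    // partition is treated as idle once it has been silent for idleTimeoutMs
    static long globalWatermark(long[] partitionWatermarks,
                                long[] msSinceLastEvent,
                                long idleTimeoutMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (msSinceLastEvent[i] < idleTimeoutMs) { // active partitions only
                min = Math.min(min, partitionWatermarks[i]);
                anyActive = true;
            }
        }
        // With every input idle, event time cannot advance at all
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Partition 3 has been silent for 90s with a 60s idleness timeout:
        // it is excluded, so its stuck watermark no longer holds things back
        long[] wm   = {300_000, 300_000, 300_000, 0};
        long[] idle = {  1_000,   2_000,     500, 90_000};
        System.out.println(globalWatermark(wm, idle, 60_000)); // 300000
    }
}
```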
Monitoring Idle Partitions
Keep an eye on the currentInputWatermark metric for each subtask. If one subtask’s watermark lags far behind the others, you have either an idle source or a data skew problem. Alerts on watermark lag are one of the most valuable monitoring signals for any Flink deployment.
Watermark Propagation in Multi-Stream Pipelines
When your job consumes from multiple streams (for example, joining an orders stream with a shipments stream), watermark propagation becomes more nuanced.
Each input stream has its own watermarks. An operator that receives from multiple inputs (like a join operator) advances its watermark to the minimum of all input watermarks. This means:
- If your orders stream is at watermark 10:05:00 and your shipments stream is at watermark 10:02:00, the join operator’s watermark is 10:02:00.
- A slow or idle stream in a multi-way join will hold back all downstream processing.
This minimum-watermark rule is fundamental to correctness, but it requires you to ensure that all input streams advance at roughly the same rate. If one stream is significantly slower, consider buffering or using different watermark strategies per source.
Watermark Alignment
Watermark alignment, introduced in Flink 1.15 and extended in 1.17 to align splits within a single source, prevents one fast source from getting too far ahead of a slow source, which can cause excessive buffering and memory pressure:
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20));
This pauses consumption from sources that are more than 20 seconds ahead of the slowest source in the alignment group, keeping memory usage bounded.
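The pause condition itself is a one-line comparison; here is a plain-Java sketch (illustrative names, not Flink's internal code):

```java
public class AlignmentCheck {
    // A source is paused while its watermark is more than maxDriftMs ahead of
    // the minimum watermark in its alignment group
    static boolean shouldPause(long sourceWatermark, long groupMinWatermark, long maxDriftMs) {
        return sourceWatermark - groupMinWatermark > maxDriftMs;
    }

    public static void main(String[] args) {
        // Fast source 30s ahead of the group's slowest source, 20s drift budget
        System.out.println(shouldPause(330_000, 300_000, 20_000)); // true
    }
}
```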
Late Data Handling
Even with well-configured watermarks, some events will arrive after the watermark has passed their window. These are late events. Flink provides two mechanisms for handling them.
Allowed Lateness
You can configure a window to accept late events for a period after the watermark has passed the window’s end time. The window fires its normal result when the watermark passes, then fires updated results as late events arrive:
stream
    .keyBy(event -> event.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(5))
    .aggregate(new MyAggregateFunction());
The window state is retained for 5 minutes after the watermark passes the window end. Any event that falls into that window during this grace period triggers a re-computation.
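The retention rule reduces to simple arithmetic: state for a window [start, end) lives until the watermark passes end + allowedLateness, and a late event re-fires the window only while that deadline has not passed. A small sketch with illustrative names:

```java
public class LatenessWindow {
    // State for a window [start, end) is cleaned up once the watermark
    // passes end + allowedLateness
    static long cleanupTime(long windowEndMs, long allowedLatenessMs) {
        return windowEndMs + allowedLatenessMs;
    }

    // A late event re-fires the window only if it belongs to the window and
    // arrives before the cleanup deadline
    static boolean refires(long eventTime, long windowStartMs, long windowEndMs,
                           long currentWatermark, long allowedLatenessMs) {
        boolean inWindow = eventTime >= windowStartMs && eventTime < windowEndMs;
        boolean beforeCleanup = currentWatermark < cleanupTime(windowEndMs, allowedLatenessMs);
        return inWindow && beforeCleanup;
    }
}
```

With a 1-minute window ending at 10:01:00 and 5 minutes of allowed lateness, state survives until the watermark reaches 10:06:00.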
Side Outputs for Late Events
For events that arrive even after the allowed lateness period, you can redirect them to a side output for separate processing:
OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> result = stream
    .keyBy(event -> event.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(5))
    .sideOutputLateData(lateTag)
    .aggregate(new MyAggregateFunction());

DataStream<Event> lateEvents = result.getSideOutput(lateTag);
// Write late events to a dead-letter topic or separate table
This ensures no data is silently dropped. You can audit, reprocess, or alert on late events as needed.
Practical Example: Watermarks for a CDC Pipeline
Consider a change data capture pipeline that streams database changes from PostgreSQL through Kafka into a real-time analytics system. Each CDC event carries a ts_ms field representing the database commit timestamp.
// Define the watermark strategy for CDC events
WatermarkStrategy<CdcEvent> strategy = WatermarkStrategy
.<CdcEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((event, timestamp) -> event.getTsMs())
.withIdleness(Duration.ofMinutes(2));
// Create the source with watermarks
DataStream<CdcEvent> changes = env
.fromSource(kafkaSource, strategy, "cdc-source");
// Compute per-minute aggregations using event time
changes
.keyBy(event -> event.getTableName())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.process(new ChangeAggregator());
For CDC sources, a 2-second out-of-orderness tolerance is typically sufficient because database commit timestamps are generated by a single source (the database) and are generally well-ordered. The withIdleness of 2 minutes handles tables that receive infrequent updates. Platforms like Streamkap configure these watermark strategies automatically based on source characteristics, handling idle partition detection and watermark monitoring out of the box so you can focus on your processing logic.
Debugging Watermarks
When your pipeline is not producing results or windows are not closing, watermarks are almost always the first place to look.
The currentWatermark Metric
Flink exposes a currentInputWatermark metric on every operator. This is the single most important metric for debugging time-related issues:
- If the watermark is not advancing, check for idle sources or stalled partitions.
- If the watermark is advancing but windows are not closing, check that the watermark has actually exceeded the window end time.
- If the watermark jumps forward suddenly after being stalled, you likely had an idle source that resumed.
Common Issues and Fixes
Watermark stuck at Long.MIN_VALUE: No events have arrived yet, or the timestamp assigner is returning incorrect values. Verify that your timestamp extraction logic returns milliseconds since epoch.
Watermark advances but windows never fire: Your out-of-orderness tolerance might be larger than your window size. If you have a 1-minute window with a 2-minute tolerance, the watermark will always be behind the window end time until 2 minutes after the last event in the window.
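To see why, work the numbers: with watermark = maxEventTime - tolerance, a window [start, end) can only fire once maxEventTime reaches end + tolerance. A tiny sketch (illustrative names):

```java
public class FiringTime {
    // watermark = maxEventTime - tolerance, and a window [start, end) fires
    // when watermark >= end, i.e. when maxEventTime >= end + tolerance
    static long maxEventTimeNeededToFire(long windowEndMs, long toleranceMs) {
        return windowEndMs + toleranceMs;
    }

    public static void main(String[] args) {
        // 1-minute window ending at t=60s, with a 2-minute tolerance:
        // no firing until some event carries a timestamp of at least t=180s
        System.out.println(maxEventTimeNeededToFire(60_000, 120_000)); // 180000
    }
}
```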
Inconsistent results on replay: Check that you are using event time, not processing time. Also verify that your watermark strategy is deterministic and does not depend on wall-clock time.
One subtask lagging behind others: Data skew across Kafka partitions. One partition has significantly more data or significantly older data. Consider repartitioning or adjusting your key distribution.
Memory growing unbounded: With allowed lateness, Flink retains window state for longer. If your lateness is large relative to your window size, state can accumulate. Monitor state size and consider whether you truly need a long allowed lateness or whether side outputs for late data would be more appropriate.
Watermark Monitoring in Production
Set up alerts on two key signals:
- Watermark lag: The difference between the current wall-clock time and the current watermark. If this grows, your pipeline is falling behind.
- Watermark advancement rate: If the watermark stops advancing for more than a few minutes, something is wrong.
Streamkap provides built-in watermark monitoring for CDC pipelines, alerting you to stalled watermarks and idle partitions before they impact downstream consumers.
Getting watermarks right is the difference between a stream processing pipeline that produces correct results and one that silently drops data or never emits output. Start with bounded-out-of-orderness, configure idleness handling, monitor watermark advancement, and use side outputs to capture any data that still arrives late. With these patterns in place, your event-time pipeline will be both correct and observable.