Flink Checkpointing Explained: How Fault Tolerance Actually Works
Understand how Flink checkpointing provides fault tolerance and exactly-once semantics. Learn checkpoint internals, configuration, troubleshooting, and production tuning.
Every streaming system eventually faces the same hard question: what happens when something crashes? A server fails, a network partition isolates a task manager, or an out-of-memory error kills a running operator. Without a recovery mechanism, the job restarts from scratch, reprocessing hours of data and producing duplicate results downstream. Flink’s answer is checkpointing - periodic, consistent snapshots of the entire job state that allow recovery to a known-good point instead of starting over.
Checkpointing underpins exactly-once processing semantics, enables smooth failover, and allows jobs to run continuously for months without human intervention. Understanding how it works is essential for anyone building or operating Flink pipelines.
The Chandy-Lamport Algorithm: How Flink Takes Consistent Snapshots
Flink’s checkpointing is built on a variant of the Chandy-Lamport distributed snapshot algorithm. The core challenge it solves is deceptively hard: how do you take a consistent snapshot of a distributed system where dozens of parallel operators are processing data simultaneously, without stopping the entire pipeline?
The answer is checkpoint barriers. When the JobManager triggers a checkpoint, it injects special barrier markers into the data stream at the source operators. These barriers flow through the dataflow graph alongside regular records, carried by the same channels. They act as dividers - everything before the barrier belongs to checkpoint N, and everything after belongs to checkpoint N+1.
Barrier Alignment
When a multi-input operator (like a join or a union) receives a barrier on one of its input channels, it pauses consumption on that channel and continues processing records from other channels that have not yet delivered their barrier. Once barriers have arrived on all input channels - this is called barrier alignment - the operator snapshots its state and forwards the barrier downstream.
This alignment process creates a consistent cut across the distributed system. At the moment every operator has aligned its barriers and snapshotted its state, the checkpoint represents a single, coherent point in the data stream. No record is counted twice, and no record is missed.
Source-1 ──── [record] [record] | barrier | [record] ────> Operator
Source-2 ──── [record] | barrier | [record] [record] ────> Operator
                                                               │
                             1. Align barriers from all inputs ┘
                             2. Snapshot state
                             3. Forward barrier downstream
Processing never fully stops. Only the channels that have already delivered their barrier are paused, and only until the remaining channels catch up. In a well-tuned pipeline, alignment takes milliseconds.
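The alignment logic itself fits in a few lines of plain Java. The following is an illustrative, single-threaded model, not Flink's actual network stack: each input channel is a queue of events, the string "B" stands in for a barrier, and the operator drains only channels whose barrier has not yet arrived.

```java
import java.util.List;
import java.util.Queue;

// Single-threaded sketch of barrier alignment (illustration only).
// Channels that have delivered their barrier are blocked; records from the
// other channels keep flowing until every barrier has arrived.
class AlignmentSketch {
    static int alignAndCount(List<Queue<String>> channels) {
        boolean[] barrierSeen = new boolean[channels.size()];
        int processedBeforeSnapshot = 0;
        int barriers = 0;
        while (barriers < channels.size()) {
            for (int i = 0; i < channels.size(); i++) {
                if (barrierSeen[i]) continue;        // channel blocked until alignment completes
                String event = channels.get(i).poll();
                if (event == null) continue;
                if (event.equals("B")) {             // barrier arrived on this channel
                    barrierSeen[i] = true;
                    barriers++;
                } else {
                    processedBeforeSnapshot++;       // pre-barrier record: part of checkpoint N
                }
            }
        }
        // All barriers aligned: snapshot state here, then forward the barrier.
        return processedBeforeSnapshot;
    }
}
```

Running this over the two channels from the diagram, the three records ahead of the barriers are processed before the snapshot, and the post-barrier records stay queued for checkpoint N+1.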
What Gets Checkpointed
A Flink checkpoint captures four categories of state, and all four must be consistent for recovery to work correctly.
Operator State
Operator state is local to a single parallel instance of an operator. Examples include the list of Kafka partition assignments in a source, or a buffer of records waiting to be flushed in a sink. This state is not partitioned by key - it belongs to the operator instance itself.
Keyed State
Keyed state is partitioned by the record key and is by far the most common type. Window contents, aggregation accumulators, join buffers, and any value you store in a ValueState, ListState, or MapState handle falls into this category. When Flink rescales a job (changing parallelism), keyed state is redistributed across operators based on key groups.
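The key-group mechanism can be sketched in two functions. This is a simplified model: Flink's real KeyGroupRangeAssignment applies a murmur hash to key.hashCode(), and the plain hashCode below is an assumption for illustration, but the operator-index formula matches the one Flink uses.

```java
// Simplified model of keyed-state routing. A key always maps to the same key
// group (out of maxParallelism groups), and each parallel operator instance
// owns a contiguous range of key groups. Rescaling moves whole key groups
// between instances instead of rehashing individual keys.
class KeyGroupSketch {
    // Which key group a key belongs to (real Flink murmur-hashes hashCode()).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Which operator instance owns a key group.
    static int operatorFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

This is why maxParallelism (the number of key groups) is fixed at job creation: it is the unit of state redistribution, and parallelism can only vary underneath it.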
Source Offsets
For exactly-once semantics, the checkpoint must record exactly where each source was reading. For a Kafka source, this means the committed offset for every assigned partition. On recovery, the source rewinds to these saved offsets and replays from that point forward.
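The shape of that state is simple enough to sketch. The class below is an assumed, simplified stand-in for what a Kafka-style source tracks per partition (Flink's actual source keeps equivalent per-split state internally):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of source offset state (illustrative, not Flink's source API):
// the next offset to read for each assigned partition. On restore, the
// source resumes from exactly these offsets, so records consumed after the
// checkpoint are replayed rather than lost.
class OffsetSnapshotSketch {
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    void recordConsumed(int partition, long offset) {
        nextOffsets.put(partition, offset + 1);   // next offset to read
    }

    Map<Integer, Long> snapshot() {               // called at checkpoint time
        return Map.copyOf(nextOffsets);
    }

    void restore(Map<Integer, Long> snapshot) {   // called on recovery
        nextOffsets.clear();
        nextOffsets.putAll(snapshot);
    }

    long resumeFrom(int partition) {
        return nextOffsets.getOrDefault(partition, 0L);
    }
}
```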
Sink Pre-Commits
Two-phase commit sinks (like the Kafka sink with exactly-once delivery) write data to an external system during normal processing but only commit that data when a checkpoint completes. The checkpoint stores the pre-commit transaction identifiers so that on recovery, uncommitted transactions can be rolled back and replayed cleanly.
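The lifecycle is easiest to see as a small state machine. This is an illustrative sketch of the protocol, not Flink's sink API (the real hook is the two-phase-commit support in Flink's sink interfaces): writes land in an open transaction, a checkpoint pre-commits it, and only checkpoint completion makes the data visible.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of a two-phase-commit sink. Nothing reaches `committed`
// (the downstream-visible data) until the checkpoint that pre-committed it
// is acknowledged as complete.
class TwoPhaseCommitSketch {
    private final List<String> open = new ArrayList<>();          // current transaction
    private final List<String> preCommitted = new ArrayList<>();  // awaiting checkpoint completion
    final List<String> committed = new ArrayList<>();             // visible downstream

    void write(String record) { open.add(record); }               // normal processing

    void preCommit() {                 // checkpoint triggered: flush but do not commit
        preCommitted.addAll(open);
        open.clear();
    }

    void notifyCheckpointComplete() {  // JobManager confirms the checkpoint
        committed.addAll(preCommitted);
        preCommitted.clear();
    }

    void abort() {                     // recovery path: roll back, replay from offsets
        preCommitted.clear();
        open.clear();
    }
}
```

The coupling to checkpointing is the point: if the job fails between preCommit and notifyCheckpointComplete, the transaction is aborted and the same records are regenerated from the checkpointed source offsets.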
Checkpoint Storage: Where State Lives
Flink supports multiple state backends, each with different trade-offs for performance and scalability.
HashMapStateBackend stores state on the JVM heap of each TaskManager. It is fast for small state sizes but limited by available memory. Checkpoints are serialized and written to a configured checkpoint storage location.
EmbeddedRocksDBStateBackend stores state in RocksDB, an embedded key-value store that spills to local disk. This backend can handle state far larger than available memory, making it the default choice for production workloads with significant state.
Regardless of which backend manages live state, the checkpoint snapshots are written to durable storage. In production, this is almost always a distributed filesystem or object store:
- S3 (or S3-compatible storage like MinIO)
- GCS (Google Cloud Storage)
- HDFS (Hadoop Distributed File System)
- Azure Blob Storage
// Configuring RocksDB state backend with S3 checkpoint storage
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
    "s3://my-bucket/flink-checkpoints/"
);
The state backend handles live read/write performance; the checkpoint storage handles durability and recovery. Choosing the right combination is one of the most consequential decisions in a Flink deployment.
Incremental Checkpoints: Reducing I/O at Scale
For jobs with large state - tens or hundreds of gigabytes - full checkpoints become expensive. Every checkpoint cycle would serialize and upload the entire state, consuming massive I/O bandwidth and slowing down processing.
RocksDB’s LSM-tree architecture enables incremental checkpoints. Instead of uploading all state on every checkpoint, Flink tracks which RocksDB SST (Sorted String Table) files have changed since the last checkpoint and uploads only the deltas. Unchanged files are referenced by pointer, not re-uploaded.
// Enable incremental checkpoints with RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental
The impact is dramatic. A job with 100 GB of state where only 2 GB changes between checkpoints goes from uploading 100 GB to uploading 2 GB per cycle. For any production job with non-trivial state, incremental checkpoints should be the default.
The trade-off is slightly more complex recovery. Flink must reconstruct full state from a chain of incremental snapshots. Periodic savepoints help keep this chain manageable.
Configuration Deep Dive
Five parameters control checkpoint behavior. Getting these right is the difference between a pipeline that runs for months and one that fails every few hours.
execution.checkpointing.interval
How often Flink triggers a new checkpoint. This is the single most important setting. A 60-second interval means that on failure, you replay at most 60 seconds of data. A 5-minute interval means up to 5 minutes of replay.
execution.checkpointing.timeout
The maximum time a checkpoint is allowed to take before Flink aborts it. If your checkpoints regularly approach this limit, your state is too large, your storage is too slow, or backpressure is preventing barrier alignment. Default is 10 minutes.
execution.checkpointing.min-pause
The minimum time between the end of one checkpoint and the start of the next. This prevents checkpoint storms where a slow checkpoint triggers the next one immediately. If your checkpoints take 40 seconds and your interval is 60 seconds, a min-pause of 30 seconds ensures at least 30 seconds of uninterrupted processing between checkpoints.
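That interaction reduces to one line of arithmetic. A simplified model (an illustration, not Flink's scheduler): the next checkpoint fires at the configured interval, but never sooner than min-pause after the previous one finished.

```java
// Simplified checkpoint cadence model: the delay from one checkpoint's start
// to the next is the configured interval, unless the previous checkpoint's
// duration plus the min-pause pushes it out further.
class CheckpointCadence {
    static long nextTriggerDelayMs(long intervalMs, long lastDurationMs, long minPauseMs) {
        return Math.max(intervalMs, lastDurationMs + minPauseMs);
    }
}
```

Under this model, a 40-second checkpoint with a 30-second min-pause stretches a 60-second interval to an effective 70-second cycle, which is exactly the guarantee you want: the pause wins over the interval when checkpoints run long.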
execution.checkpointing.max-concurrent-checkpoints
How many checkpoints can be in progress simultaneously. The default of 1 is almost always correct. Concurrent checkpoints add complexity and can cause resource contention. Only increase this if you have a specific, well-understood reason.
execution.checkpointing.tolerable-failed-checkpoints
How many consecutive checkpoint failures Flink tolerates before failing the job. Setting this to 0 (the default) means any checkpoint failure kills the job. In production, a value of 3-10 gives the system room to recover from transient issues like a brief network hiccup or a temporary spike in state size.
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointInterval(60_000); // 60 seconds
config.setCheckpointTimeout(300_000); // 5 minutes
config.setMinPauseBetweenCheckpoints(30_000); // 30 seconds
config.setMaxConcurrentCheckpoints(1);
config.setTolerableCheckpointFailureNumber(5);
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Start with a 60-second interval, 5-minute timeout, 30-second minimum pause, single concurrent checkpoint, and 3-5 tolerable failures. Tune from there based on observed metrics.
Unaligned Checkpoints: Handling Backpressure
Standard barrier alignment has an Achilles’ heel: backpressure. When a downstream operator is slow, records pile up in network buffers. Barriers get stuck behind these queued records, and alignment stalls. A checkpoint that should take seconds can take minutes - or time out entirely.
Flink 1.11 introduced unaligned checkpoints to solve this. Instead of waiting for barriers to align across all input channels, an operator immediately snapshots its state when the first barrier arrives. The in-flight records sitting in network buffers - the ones between barriers on different channels - are captured as part of the checkpoint.
config.enableUnalignedCheckpoints();
Checkpoint duration becomes largely independent of backpressure because barriers no longer wait for slow channels.
The trade-off is larger checkpoint sizes. In-flight buffer records become part of the snapshot, increasing data written to storage. Unaligned checkpoints also do not support concurrent checkpoints.
Use unaligned checkpoints when your pipeline experiences frequent or severe backpressure and checkpoint timeouts are a recurring problem. If your pipeline runs smoothly without backpressure, aligned checkpoints remain the better choice due to their smaller snapshot sizes.
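There is also a middle ground. Assuming Flink 1.13 or later, you can let each checkpoint start aligned and fall back to unaligned mode only when alignment exceeds a timeout, paying the larger snapshot size only under actual backpressure:

```java
// Start aligned; switch to unaligned only if alignment stalls
// (setAlignedCheckpointTimeout requires Flink 1.13+)
config.enableUnalignedCheckpoints();
config.setAlignedCheckpointTimeout(Duration.ofSeconds(30));
```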
Troubleshooting Checkpoints
When checkpoints misbehave, the symptoms are usually one of three things: slow checkpoints, failed checkpoints, or unexpectedly growing checkpoint sizes.
Slow Checkpoints
The first place to look is barrier alignment duration. If alignment takes a significant fraction of the total checkpoint time, backpressure is the culprit. Check the Flink web UI’s backpressure tab to identify which operators are slow. Consider enabling unaligned checkpoints or scaling up the bottleneck operator’s parallelism.
If alignment is fast but the snapshot itself is slow, look at the sync and async phase durations. The sync phase captures state locally and blocks processing; the async phase uploads the snapshot to checkpoint storage in the background. A long async phase means your state backend or checkpoint storage is the bottleneck. For RocksDB, check whether incremental checkpoints are enabled. For S3 storage, check network throughput and whether you are hitting request rate limits.
Failed Checkpoints
Checkpoint timeouts are the most common failure mode. Increase the timeout if the checkpoint is genuinely progressing but just needs more time. If checkpoints fail due to task failures, look at the TaskManager logs for out-of-memory errors or other exceptions. Asymmetric state distribution - where one subtask holds far more state than others - can cause a single slow subtask to drag the entire checkpoint past the timeout.
Growing Checkpoint Size
State growth over time usually indicates a leak in your application logic. Common causes include windows that never close, join state that accumulates without TTL, or keyed state that grows unboundedly because keys are never cleaned up. Configure state TTL to automatically expire stale entries:
// Expire state 24 hours after it is created or last written
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("myState", String.class);
descriptor.enableTimeToLive(ttlConfig);
Practical Example: Configuring Checkpointing for a CDC Pipeline
Change data capture (CDC) pipelines are one of the most common Flink workloads. A typical CDC pipeline reads database changes from a source like Debezium via Kafka, applies transformations or joins, and writes to a destination like a data warehouse or search index. Here is how to configure checkpointing for this use case.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// State backend: RocksDB with incremental checkpoints for large state
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// Checkpoint storage: S3
env.getCheckpointConfig().setCheckpointStorage("s3://pipeline-state/cdc-checkpoints/");
// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointInterval(60_000); // 1 minute
config.setCheckpointTimeout(600_000); // 10 minutes (CDC state can be large)
config.setMinPauseBetweenCheckpoints(30_000); // 30 seconds breathing room
config.setMaxConcurrentCheckpoints(1);
config.setTolerableCheckpointFailureNumber(3);
// Retain checkpoints on cancellation for manual recovery
config.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// Enable unaligned checkpoints if backpressure is expected
// config.enableUnalignedCheckpoints();
CDC pipelines often carry significant keyed state - especially when performing joins between multiple database tables. The 10-minute timeout accounts for large state uploads during initial backfill or after schema changes. Retaining checkpoints on cancellation allows you to restart the job from a known offset without reprocessing the entire changelog.
Platforms like Streamkap handle this configuration automatically for managed CDC pipelines, tuning checkpoint intervals and state backend settings based on observed pipeline characteristics and alerting when checkpoint health degrades.
Monitoring Checkpoint Health
Flink exposes detailed checkpoint metrics through its REST API and web UI. These should be integrated into your observability stack.
Checkpoint duration is the total time from trigger to completion. Track the p50 and p99 values. A steady increase signals growing state or emerging backpressure.
Checkpoint size (in bytes) tells you how much data is being uploaded per cycle. For incremental checkpoints, monitor both the incremental size and the full checkpoint size to understand the delta ratio.
Barrier alignment duration reveals how long operators spend waiting for barriers to arrive on all input channels. High alignment duration is the clearest signal of backpressure affecting checkpoints.
Failed checkpoint count and failure rate should trigger alerts. A single failed checkpoint is usually harmless, but a pattern of failures indicates a systemic problem that will eventually cause job failure.
# Key metrics to monitor (available via Flink REST API)
jobmanager.checkpoint.lastCheckpointDuration
jobmanager.checkpoint.lastCheckpointSize
jobmanager.checkpoint.numberOfFailedCheckpoints
jobmanager.checkpoint.numberOfCompletedCheckpoints
task.checkpointAlignmentTime
Set up alerts for checkpoint duration exceeding 50% of your interval, consecutive checkpoint failures, and checkpoint size growth beyond your expected rate. These three alerts catch the vast majority of issues before they become job-killing problems.
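Those rules are simple enough to encode directly. A minimal sketch of the first two (the 50% threshold and the failure count are this article's suggestions, not Flink defaults):

```java
// Alerting rule sketch: flag when checkpoint duration exceeds half the
// configured interval, or when checkpoint failures repeat back-to-back.
class CheckpointAlerts {
    static boolean shouldAlert(long durationMs, long intervalMs, int consecutiveFailures) {
        return durationMs * 2 > intervalMs || consecutiveFailures >= 2;
    }
}
```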
Checkpointing transforms Flink from a fast stream processor into a reliable one. It is not something you configure once and forget. As state grows, throughput increases, and new operators are added, your checkpoint configuration must evolve alongside them. The teams that invest in understanding and monitoring their checkpoints are the ones whose pipelines run for months without intervention.