Flink Exactly-Once Semantics: How It Works End-to-End
Understand how Flink achieves exactly-once processing end-to-end - from source to sink. Learn the two-phase commit protocol, checkpoint coordination, and sink requirements.
When you hear “exactly-once semantics,” it sounds almost too good to be true. In distributed systems, where machines crash, networks partition, and processes restart without warning, guaranteeing that every record is processed exactly once seems like an impossibility theorem. And in strict terms, it is - no system can ensure a record is physically touched only once. But Apache Flink achieves something just as valuable: it guarantees that the observable effect of each record on the output is as if it were processed exactly once. No duplicates. No data loss. This is the foundation of data correctness in streaming pipelines, and understanding how it works is essential for anyone building real-time data infrastructure.
The Three Components of End-to-End Exactly-Once
Flink’s exactly-once guarantee is not a single mechanism. It is a coordinated contract between three components, and all three must participate for the guarantee to hold end-to-end.
1. A replayable source. The source must be able to re-read records from a specific position. Kafka is the canonical example - its consumer offset mechanism allows Flink to rewind and replay data from any committed offset after a failure.
2. Checkpoint-based state recovery. Flink periodically takes consistent snapshots of all operator state and source positions. On failure, the entire job restores from the last successful checkpoint and replays from that exact point.
3. A transactional or idempotent sink. The sink must either support transactions (so partial writes can be rolled back) or idempotent writes (so replayed records produce the same result). Without this, recovery replay will produce duplicates in the external system.
Remove any one of these three components, and the guarantee degrades. A non-replayable source means data loss on failure. Missing checkpoints means inconsistent recovery. A non-transactional, non-idempotent sink means duplicates.
Checkpointing for Exactly-Once
At the heart of Flink’s exactly-once semantics is its checkpointing mechanism, which is based on the Chandy-Lamport algorithm for consistent distributed snapshots.
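Checkpointing is configured per job. A minimal configuration sketch using Flink's DataStream API - the interval, pause, and timeout values here are illustrative, not recommendations:

```java
// Configuration sketch: enable checkpointing in exactly-once mode.
// Values are illustrative; tune them for your workload.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Trigger a checkpoint every 30 seconds with exactly-once barrier alignment.
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);

// Leave at least 5 seconds between checkpoints so processing can make progress.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);

// Fail checkpoints that take longer than 2 minutes.
env.getCheckpointConfig().setCheckpointTimeout(120_000);
```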
How Checkpoint Barriers Work
The JobManager periodically injects checkpoint barriers into the data streams at the source operators. These barriers flow through the dataflow graph alongside regular records. When an operator receives a barrier from all its input channels, it snapshots its current state and forwards the barrier downstream.
Source → [barrier n] → Map → [barrier n] → KeyBy → [barrier n] → Sink
The critical property is barrier alignment: when an operator has multiple input channels, it must wait for the barrier to arrive on all channels before taking its snapshot. Records arriving on a channel after the barrier (belonging to checkpoint n+1) are buffered until all barriers arrive. This ensures the snapshot represents a consistent cut across the entire dataflow.
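The alignment behavior can be sketched as a toy model - this is not Flink's actual network stack, just an in-memory simulation of one operator with multiple input channels. Records arriving on a channel after that channel's barrier are buffered; once every channel has delivered its barrier, the operator snapshots and releases the buffered records:

```java
import java.util.*;

// Toy model of aligned barrier handling for one operator.
// A payload of "BARRIER" marks the checkpoint barrier on a channel.
public class BarrierAlignment {
    record Event(int channel, String payload) {}

    // Returns the processing order: pre-barrier records, then "SNAPSHOT"
    // (the state snapshot), then the buffered post-barrier records.
    static List<String> process(List<Event> stream, int numChannels) {
        Set<Integer> barrierSeen = new HashSet<>();
        List<String> out = new ArrayList<>();
        List<String> buffered = new ArrayList<>();
        for (Event e : stream) {
            if (e.payload().equals("BARRIER")) {
                barrierSeen.add(e.channel());
                if (barrierSeen.size() == numChannels) {
                    out.add("SNAPSHOT");   // all barriers arrived: take the snapshot
                    out.addAll(buffered);  // release records belonging to checkpoint n+1
                    buffered.clear();
                    barrierSeen.clear();
                }
            } else if (barrierSeen.contains(e.channel())) {
                buffered.add(e.payload()); // channel already delivered its barrier: buffer
            } else {
                out.add(e.payload());      // pre-barrier record: process normally
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
            new Event(0, "a"), new Event(0, "BARRIER"),
            new Event(0, "c"),                  // arrives after channel 0's barrier
            new Event(1, "b"), new Event(1, "BARRIER"));
        System.out.println(process(stream, 2)); // [a, b, SNAPSHOT, c]
    }
}
```

Note that record c, which logically belongs to checkpoint n+1, is processed only after the snapshot - that ordering is exactly what makes the snapshot a consistent cut.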
Recovery From a Checkpoint
When a failure occurs, Flink restores the state of every operator from the last completed checkpoint. Source operators reset their read positions (for example, Kafka consumer offsets) to the positions stored in that checkpoint. Processing resumes from that exact point, replaying all records that arrived after the checkpoint.
Checkpoint n completed → records a, b, c processed → FAILURE
Recovery: restore state from checkpoint n → replay a, b, c
Because the state is restored to the exact moment the checkpoint was taken, and the source replays from the same position, the replayed processing produces the same results. The key insight is that while records are processed more than once during recovery, the combination of state restoration and source replay means the output is identical to what a failure-free execution would have produced.
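This property can be demonstrated with a minimal sketch - a running sum stands in for operator state, and a pair of variables stands in for the checkpointed (state, source offset) snapshot:

```java
import java.util.*;

// Toy model of checkpoint-and-replay: a running sum is the operator state,
// the "checkpoint" captures (state, source offset), and recovery restores
// both before replaying from the stored offset.
public class CheckpointReplay {
    static long runWithFailure(int[] records, int checkpointAfter, int failAfter) {
        long state = 0;
        long snapState = 0;
        int snapOffset = 0;
        for (int i = 0; i < failAfter; i++) {
            state += records[i];
            if (i + 1 == checkpointAfter) {  // checkpoint n completes here
                snapState = state;
                snapOffset = i + 1;
            }
        }
        // FAILURE: restore state and source position from checkpoint n, then replay.
        state = snapState;
        for (int i = snapOffset; i < records.length; i++) {
            state += records[i];             // records after the checkpoint run twice
        }
        return state;
    }

    public static void main(String[] args) {
        int[] records = {1, 2, 3, 4, 5};
        long failureFree = Arrays.stream(records).sum();   // 15
        long recovered = runWithFailure(records, 2, 4);    // checkpoint after 2, crash after 4
        System.out.println(recovered == failureFree);      // true
    }
}
```

Records 3 and 4 are processed twice in this run, yet the final state equals the failure-free result - state restoration cancels out the first pass over the replayed records.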
Two-Phase Commit Protocol
Checkpointing alone guarantees exactly-once within Flink’s internal state. But data pipelines do not end inside Flink - they write to external systems. This is where the two-phase commit (2PC) protocol bridges the gap.
Phase 1: Pre-Commit
When a sink operator receives a checkpoint barrier, it performs a pre-commit. For a transactional sink, this means flushing all buffered records to the external system within an open transaction, but not committing that transaction. The transaction ID is stored as part of the checkpoint state.
Phase 2: Commit on Success
Once the JobManager confirms that all operators have successfully completed their checkpoint snapshots, it notifies the sink operators. Each sink then commits its pending transaction. Only at this point do the records become visible to downstream consumers.
Abort on Failure
If the checkpoint fails (any operator fails during snapshotting), the pending transactions are aborted. No partial data is committed. On recovery, the sink starts a new transaction from the restored checkpoint state.
// Simplified 2PC sink lifecycle
public class ExactlyOnceSink extends TwoPhaseCommitSinkFunction<Record, Transaction, Void> {

    @Override
    protected Transaction beginTransaction() {
        return database.beginTransaction();
    }

    @Override
    protected void invoke(Transaction txn, Record value, Context context) {
        txn.write(value);
    }

    @Override
    protected void preCommit(Transaction txn) {
        txn.flush(); // Ensure all data is written, but don't commit
    }

    @Override
    protected void commit(Transaction txn) {
        txn.commit(); // Checkpoint succeeded - make data visible
    }

    @Override
    protected void abort(Transaction txn) {
        txn.rollback(); // Checkpoint failed - discard pending writes
    }
}
The two-phase commit creates a tight coupling between Flink’s checkpoint lifecycle and the external system’s transaction lifecycle. This is what makes end-to-end exactly-once possible.
Kafka End-to-End Exactly-Once
Kafka is Flink’s most battle-tested exactly-once integration. The combination of Kafka’s consumer offsets (replayable source) and transactional producer API (transactional sink) creates a complete exactly-once pipeline.
Transactional Producers
The KafkaSink with DeliveryGuarantee.EXACTLY_ONCE (or the legacy FlinkKafkaProducer with Semantic.EXACTLY_ONCE) uses Kafka’s transactional producer. Each checkpoint cycle maps to a Kafka transaction: records are written within the transaction during processing, pre-committed at the barrier, and committed when the checkpoint completes.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-job-")
        .setRecordSerializer(/* ... */)
        .build();
Read-Committed Consumers
For the guarantee to reach downstream consumers, those consumers must use isolation.level=read_committed. This setting ensures they only see records from committed transactions. Without it, consumers would read uncommitted records from in-progress transactions, breaking the exactly-once chain.
# Downstream consumer configuration
isolation.level=read_committed
One important consideration: with transactional Kafka sinks, output records are only visible after the checkpoint commits. This adds output latency equal to the checkpoint interval. A 30-second checkpoint interval means downstream consumers see data with up to 30 seconds of additional delay.
JDBC and Database Sinks
Database sinks achieve exactly-once through two distinct approaches, depending on the database’s capabilities and the use case requirements.
Idempotent Writes via UPSERT
The most common and often simplest approach is idempotent upserts. If every record carries a natural key (or a deterministic key derived from the source data), the sink can use INSERT ... ON CONFLICT ... DO UPDATE (PostgreSQL) or MERGE (SQL Server) to write records. Replayed records during recovery simply overwrite themselves:
INSERT INTO target_table (id, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
value = EXCLUDED.value,
updated_at = EXCLUDED.updated_at;
This approach avoids the complexity of two-phase commit entirely. The trade-off is that it requires a meaningful primary key and the write operation must be deterministic.
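The idempotency argument can be shown with a toy model - an in-memory map stands in for the target table, and a put stands in for the upsert. Replaying the same keyed batch after a simulated recovery leaves the table unchanged:

```java
import java.util.*;

// Toy model of an idempotent upsert sink: a map keyed by primary key stands
// in for the target table; put() stands in for INSERT ... ON CONFLICT DO UPDATE.
public class UpsertSink {
    // records are (key, value) pairs; replayed pairs overwrite themselves.
    static Map<String, String> apply(Map<String, String> table, List<String[]> records) {
        for (String[] r : records) {
            table.put(r[0], r[1]); // same key + deterministic value => same row
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> batch = List.of(
            new String[]{"k1", "v1"},
            new String[]{"k2", "v2"});
        Map<String, String> table = new HashMap<>();
        apply(table, batch);
        apply(table, batch);              // replay after recovery: identical writes
        System.out.println(table.size()); // 2 - no duplicate rows
    }
}
```

The same reasoning is why the write must be deterministic: if a replayed record produced a different value for the same key (say, a wall-clock timestamp computed in the sink), the second pass would silently diverge from the first.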
Two-Phase Commit with Database Transactions
For append-only workloads or when idempotent keys are not available, the JDBC sink can use database transactions in a two-phase commit pattern. Records are written within a database transaction, which is held open until the checkpoint completes. This mirrors the general 2PC protocol described above, using the database’s native transaction mechanism.
The risk here is long-running transactions. If the checkpoint interval is 60 seconds, the database transaction stays open for up to 60 seconds, which can cause lock contention and connection pool exhaustion. This is why idempotent upserts are generally preferred for database sinks.
File Sinks
File-based sinks (writing to S3, HDFS, or local filesystems) use a different commit protocol because files do not support transactions in the traditional sense.
The Staging Protocol
The FileSink writes records to in-progress files during normal processing. When a checkpoint barrier arrives, the current in-progress file is closed and moved to a pending state. When the checkpoint completes, pending files are committed - typically by renaming them from a staging directory to their final location.
Processing: /output/.in-progress/part-0-1
Pre-commit: /output/.pending/checkpoint-42/part-0-1
Committed: /output/part-0-1
On failure recovery, any in-progress or pending files from incomplete checkpoints are discarded. Only committed files survive, ensuring no partial or duplicate data appears in the output directory.
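The rename-based commit can be sketched with plain filesystem operations - this is a simplified model, not FileSink's actual directory layout or file naming, and the paths here are illustrative:

```java
import java.io.IOException;
import java.nio.file.*;

// Toy model of the file-sink staging protocol: write to an in-progress file,
// stage it as pending at the barrier, and atomically rename it to its final
// name only when the checkpoint completes. File names are illustrative.
public class StagedFileCommit {
    static Path commitFile(Path dir, String contents) throws IOException {
        Path inProgress = dir.resolve(".in-progress-part-0-1"); // normal processing
        Path pending = dir.resolve(".pending-part-0-1");        // pre-commit at barrier
        Path committed = dir.resolve("part-0-1");               // visible after commit

        Files.writeString(inProgress, contents);                // records flow in
        Files.move(inProgress, pending);                        // barrier: close and stage
        Files.move(pending, committed,
                StandardCopyOption.ATOMIC_MOVE);                // checkpoint complete
        return committed;
    }

    public static void main(String[] args) throws IOException {
        Path out = commitFile(Files.createTempDirectory("output"), "record-a\nrecord-b\n");
        System.out.println(Files.exists(out)); // true
    }
}
```

The atomic rename is the crucial step: a reader of the output directory either sees the complete committed file or no file at all, never a half-written one.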
The “Effectively Once” Nuance
It is important to understand what exactly-once does not mean. It does not mean each record is physically processed exactly once. During recovery, records between the last checkpoint and the failure point are replayed and processed again. Operator side effects that occur during this replay happen again - log messages are written again, counters increment again, external API calls fire again.
What Flink guarantees is that the observable state - the contents of Flink’s managed state and the data written to exactly-once sinks - reflects a world where each record was processed once. This is why some practitioners prefer the term “effectively once.”
This distinction matters for operators with external side effects. If your Flink job sends an HTTP request for each record, recovery will send duplicate requests. Exactly-once only covers what Flink can control: its own state and sinks that participate in the checkpoint protocol.
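The gap between managed state and external side effects can be made concrete with a small sketch - a counter of external calls stands in for an HTTP client, which Flink cannot roll back:

```java
// Toy illustration of the "effectively once" nuance: recovery restores
// managed state, but replay re-executes external side effects.
public class EffectivelyOnce {
    static int externalCalls = 0; // side effect (e.g. an HTTP request) - not rolled back
    static long state = 0;        // managed state - restored from the checkpoint

    static void process(int record) {
        externalCalls++;          // fires again on replay
        state += record;
    }

    public static void main(String[] args) {
        int[] records = {10, 20, 30};
        process(records[0]);
        long snapshot = state;              // checkpoint after the first record
        process(records[1]);                // ... then FAILURE before the next checkpoint
        state = snapshot;                   // recovery: restore managed state only
        for (int i = 1; i < records.length; i++) {
            process(records[i]);            // replay from the checkpointed position
        }
        System.out.println(state);          // 60 - correct, as if processed once
        System.out.println(externalCalls);  // 4 - record 20 triggered two external calls
    }
}
```

Three records, four external calls: the duplicate is invisible in Flink's state but fully visible to the external service.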
Performance Implications
Exactly-once semantics are not free. Understanding the costs helps you make informed decisions about when to use them.
Checkpoint Interval and Latency
The checkpoint interval directly affects output latency for transactional sinks. With Kafka exactly-once, data is invisible to read_committed consumers until the next checkpoint commits. A 30-second interval means 30 seconds of additional output latency on top of processing time. Shortening the interval reduces latency but increases checkpoint coordination overhead.
Barrier Alignment Overhead
Aligned checkpoints require buffering records on faster channels while waiting for barriers on slower channels. This can increase memory pressure and introduce processing stalls. Flink 1.11 introduced unaligned checkpoints to mitigate this - barriers can overtake records, and in-flight records are stored as part of the checkpoint state. Unaligned checkpoints reduce checkpoint duration at the cost of larger checkpoint sizes.
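Unaligned checkpoints are opt-in on the checkpoint configuration. A sketch of the relevant settings - the timeout value is illustrative:

```java
// Configuration sketch: opt in to unaligned checkpoints.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableUnalignedCheckpoints();

// Optionally start aligned and fall back to unaligned only if
// alignment takes longer than this timeout.
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(10));
```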
Throughput Impact
For most workloads, the throughput overhead of exactly-once compared to at-least-once is in the range of 5-15%. The cost comes from barrier alignment buffering, state snapshotting, and transaction coordination. For high-throughput pipelines, this overhead can be significant in absolute terms, but it is typically acceptable given the data correctness guarantees.
Platforms like Streamkap tune these parameters automatically - adjusting checkpoint intervals, state backend configurations, and sink commit strategies to balance latency and throughput for each pipeline’s specific requirements.
When At-Least-Once Is Sufficient
Exactly-once is not always the right choice. In many real-world scenarios, at-least-once delivery combined with an idempotent sink provides the same end-to-end correctness guarantee with lower overhead.
Consider a pipeline that reads CDC events from a database and writes them to a data warehouse using keyed upserts. The sink is inherently idempotent - writing the same record twice produces the same result. In this case, Flink’s at-least-once mode (which skips barrier alignment and does not require transactional sinks) delivers the same observable outcome with lower checkpoint overhead and zero transaction commit latency.
At-least-once is sufficient when:
- The sink is idempotent - upserts, key-value stores with put semantics, or deduplicating consumers.
- Duplicates are acceptable - analytics pipelines where approximate counts are fine.
- Latency is critical - the transaction commit delay of exactly-once is unacceptable.
At-least-once is not sufficient when:
- The sink is append-only and non-idempotent - inserting into a table without a unique key, where duplicates accumulate.
- Financial accuracy is required - billing, ledger entries, or any system where duplicates have real-world consequences.
- Downstream consumers cannot deduplicate - when the consumer has no way to identify or discard duplicate records.
Streamkap pipelines use this principle by defaulting to idempotent writes at the destination layer. By coordinating Kafka consumer offsets with deterministic upsert keys, the platform achieves end-to-end exactly-once delivery semantics without the overhead of transactional sink commits - giving you data correctness without sacrificing latency.
Choosing the Right Guarantee
The decision between at-least-once and exactly-once is ultimately about understanding your sink’s capabilities and your application’s tolerance for duplicates. Flink gives you the tools for both. The checkpoint mechanism is always running - the difference lies in whether you enable barrier alignment and whether your sink participates in the two-phase commit protocol.
For teams building production streaming pipelines, the recommendation is straightforward: start with idempotent sinks and at-least-once, which gives you effectively-once results with minimal overhead. Upgrade to full transactional exactly-once only when your sink demands it - append-only Kafka topics consumed by read_committed readers, or scenarios where the sink has no natural idempotency key. Understanding the mechanics behind both options puts you in a position to make that choice with confidence.