Idempotency in Streaming Pipelines: Exactly-Once Without the Headaches
Learn how to build idempotent streaming pipelines that produce correct results even with retries, reprocessing, and at-least-once delivery. Practical patterns for every destination.
Every streaming pipeline will, at some point, process a record more than once. Network blips, consumer group rebalances, checkpoint recovery after a crash - these are not edge cases. They are the normal operating conditions of a distributed system. The question is not whether duplicate processing will happen, but whether your pipeline will produce correct results when it does.
Exactly-once semantics get all the marketing attention, but the engineers who build reliable pipelines know the real workhorse is idempotency: designing your operations so that applying them once or ten times yields the same result. Idempotency turns the messy reality of at-least-once delivery into something that behaves like exactly-once from the perspective of the destination. It is simpler to reason about, easier to implement, and works across a much wider range of sinks than true exactly-once protocols.
Why Retries Happen
Before diving into patterns, it helps to understand the specific failure modes that cause duplicate processing.
Pipeline restarts. A deployment, a config change, or an out-of-memory kill restarts your consumer. It resumes from its last committed offset, which is almost always behind the records it had already processed but not yet committed.
Consumer rebalancing. When a consumer joins or leaves a Kafka consumer group, partitions are redistributed. The new owner of a partition starts from the last committed offset, replaying any records processed but uncommitted by the previous owner.
Network timeouts. A write to the destination succeeds, but the acknowledgment is lost. The pipeline retries the write, sending the same record again.
Checkpoint recovery. Stream processors like Flink periodically snapshot their state. On failure, they roll back to the last checkpoint and replay all records since that snapshot. Every record between the checkpoint and the failure is processed a second time.
In all of these scenarios, the pipeline does exactly what it is supposed to do: retry to guarantee no data loss. The burden falls on your write logic to handle the duplicates.
Idempotent Write Patterns
There are four primary patterns for making writes idempotent, each suited to different situations.
Upsert on Primary Key
The most widely applicable pattern. Instead of INSERT, use an upsert operation (INSERT … ON CONFLICT UPDATE, MERGE, etc.) keyed on the record’s natural primary key. If the record already exists, the row is updated to the same values. If it does not exist, it is inserted. Either way, the result is a single correct row.
-- PostgreSQL upsert
INSERT INTO orders (order_id, customer_id, total, status, updated_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (order_id)
DO UPDATE SET
    customer_id = EXCLUDED.customer_id,
    total = EXCLUDED.total,
    status = EXCLUDED.status,
    updated_at = EXCLUDED.updated_at;
Deduplication at Read Time
For append-only sinks where upsert is not available, write all records (including duplicates) and deduplicate when reading. This is common in data lake architectures.
-- Deduplicate at query time using ROW_NUMBER
SELECT * FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY _ingested_at DESC) AS rn
    FROM raw_orders
) AS deduped
WHERE rn = 1;
Conditional Writes
Only write a record if a version number or timestamp is newer than the existing row. This prevents stale replays from overwriting fresher data.
-- Conditional write: apply only if the incoming version is newer
UPDATE orders SET status = 'shipped', version = 5
WHERE order_id = 'ORD-123' AND version < 5;
Deduplication with a Seen-Set
Track which record IDs have been processed using a deduplication store (Redis, a database table, or in-memory state). Skip any record whose ID is already present. This works but adds operational complexity and requires managing the seen-set’s retention.
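A minimal in-memory sketch of the seen-set pattern is below; the `SeenSet` class and its method names are illustrative, not a library API. A production pipeline would typically back this with Redis (e.g. `SET key NX EX ttl`) so the seen-set survives restarts and is shared across consumer instances.

```python
import time

class SeenSet:
    """In-memory deduplication store with TTL-based retention.

    Illustrative only: in production this state usually lives in Redis
    or a database table so it survives restarts.
    """

    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self.seen = {}  # record_id -> expiry timestamp

    def check_and_mark(self, record_id):
        """Return True if the record is new; mark it as seen either way."""
        now = time.time()
        # Drop expired entries so the seen-set does not grow without bound
        self.seen = {k: v for k, v in self.seen.items() if v > now}
        if record_id in self.seen:
            return False
        self.seen[record_id] = now + self.ttl
        return True

def process(record, dedup, sink):
    # Skip any record whose ID has already been processed
    if dedup.check_and_mark(record["order_id"]):
        sink.append(record)

sink = []
dedup = SeenSet()
process({"order_id": "ORD-1", "total": 50}, dedup, sink)
process({"order_id": "ORD-1", "total": 50}, dedup, sink)  # replay: skipped
```

Note the retention trade-off baked into the TTL: a replay that arrives after the entry has expired will be processed again, so the TTL must exceed the longest plausible replay window.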
Destination-Specific Idempotency
Each destination has its own idiomatic approach to idempotent writes. Choosing the right one avoids fighting the system.
Snowflake: MERGE
Snowflake’s MERGE statement is the standard pattern for idempotent loading. Stage the incoming batch, then merge into the target table on the primary key.
MERGE INTO orders AS target
USING orders_staging AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
    status = source.status,
    total = source.total,
    updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, status, total, updated_at)
VALUES (source.order_id, source.customer_id, source.status, source.total, source.updated_at);
For high-throughput scenarios, Snowpipe Streaming provides idempotent ingestion at the platform level: each channel tracks an offset token, and rows already committed for a given offset are not ingested again on retry.
PostgreSQL: ON CONFLICT
PostgreSQL’s ON CONFLICT clause is the most ergonomic upsert in any relational database. It handles both insert and update in a single atomic statement, as shown in the upsert example above.
ClickHouse: ReplacingMergeTree
ClickHouse takes a different approach. The ReplacingMergeTree engine keeps only the latest version of each row (by a version column) and deduplicates during background merges. Duplicates may be visible briefly between merges, so queries should use FINAL for strong consistency.
CREATE TABLE orders (
    order_id String,
    customer_id String,
    total Decimal(10, 2),
    status String,
    version UInt64
) ENGINE = ReplacingMergeTree(version)
ORDER BY order_id;
-- Query with deduplication
SELECT * FROM orders FINAL WHERE order_id = 'ORD-123';
Elasticsearch: Document ID
Elasticsearch indexes are naturally idempotent when you set the document _id to your record’s primary key. Writing the same document ID twice simply overwrites the previous version.
PUT /orders/_doc/ORD-123
{
  "customer_id": "CUST-456",
  "total": 99.99,
  "status": "shipped"
}
Flink Exactly-Once: Two-Phase Commit
Apache Flink offers true end-to-end exactly-once guarantees through its two-phase commit protocol, which coordinates checkpoints with destination writes.
Here is how it works:
- Checkpoint barriers flow through the dataflow graph, aligning state snapshots across all operators.
- During normal processing, the sink operator writes to a pre-commit transaction at the destination.
- When a checkpoint completes, Flink tells the sink to commit the transaction. Only then do the writes become visible.
- If a failure occurs before the checkpoint completes, Flink rolls back to the previous checkpoint. The uncommitted transaction at the destination is aborted, and no duplicate data appears.
This protocol requires the destination to support transactions - it works with Kafka, JDBC-compatible databases, and some cloud warehouses. For destinations that do not support transactions, idempotent upserts remain the fallback.
The trade-off is latency: data is not visible at the destination until a checkpoint completes, so end-to-end freshness is bounded by the checkpoint interval (commonly tens of seconds to a few minutes). For use cases that need fresher data, combining at-least-once delivery with idempotent writes often provides lower end-to-end latency.
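The steps above can be sketched with an in-memory destination. The class and method names here are illustrative, not Flink's actual sink API; the point is the state machine: writes land in a pending transaction and only become visible when a checkpoint completes.

```python
class TwoPhaseCommitSink:
    """Illustrative two-phase-commit sink: writes go into an open
    (pre-commit) transaction and only become visible at the
    destination once a checkpoint completes."""

    def __init__(self):
        self.committed = []  # visible at the destination
        self.pending = []    # pre-committed, not yet visible

    def write(self, record):
        # Phase 1: write into the open pre-commit transaction
        self.pending.append(record)

    def on_checkpoint_complete(self):
        # Phase 2: the checkpoint succeeded, so commit the transaction
        # and make the writes visible
        self.committed.extend(self.pending)
        self.pending = []

    def on_failure(self):
        # Abort the uncommitted transaction; after rollback to the last
        # checkpoint, these records will be replayed and rewritten
        self.pending = []

sink = TwoPhaseCommitSink()
sink.write({"order_id": "ORD-1"})
sink.on_checkpoint_complete()  # ORD-1 becomes visible
sink.write({"order_id": "ORD-2"})
sink.on_failure()              # ORD-2 is aborted, never visible
```

Because the aborted write never became visible, the replay after recovery produces no duplicate at the destination, which is exactly the guarantee the protocol provides.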
Kafka Idempotent Producers
Kafka itself can produce duplicates at the source level. If a producer sends a message and the broker acknowledgment is lost, the producer retries, potentially writing the same message twice to the topic.
Kafka’s idempotent producer solves this:
enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
With enable.idempotence=true, the broker assigns a producer ID and sequence number to each message. Duplicate messages (same producer ID + sequence) are silently discarded by the broker. This guarantees exactly-once writes to a single partition at the Kafka level, for the lifetime of a producer session.
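The broker-side bookkeeping can be simulated in a few lines. This is a toy model: a real broker tracks a window of recent sequence numbers per producer per partition and also rejects out-of-order gaps, which this sketch omits.

```python
class Broker:
    """Simulates Kafka's idempotent-producer dedup for one partition:
    each message carries (producer_id, sequence), and the broker
    discards any sequence it has already accepted from that producer."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer_id -> highest sequence accepted

    def append(self, producer_id, seq, message):
        if seq <= self.last_seq.get(producer_id, -1):
            return False  # duplicate retry: silently discarded
        self.last_seq[producer_id] = seq
        self.log.append(message)
        return True

broker = Broker()
broker.append("pid-1", 0, "order created")
broker.append("pid-1", 1, "order shipped")
broker.append("pid-1", 1, "order shipped")  # retry after lost ack: dropped
```

The retry with a repeated sequence number is dropped, so the partition log contains each message exactly once even though the producer sent it twice.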
For cross-partition or cross-topic atomicity, Kafka’s transactional API extends this with initTransactions(), beginTransaction(), commitTransaction(), and abortTransaction(). This is how Flink’s Kafka sink achieves exactly-once writes back to Kafka topics.
Practical Example: Order Processing Pipeline
Consider a pipeline that reads order events from a source database via CDC, processes them through Kafka, and writes to Snowflake for analytics.
Source DB (orders) → CDC → Kafka (orders topic) → Consumer → Snowflake (orders table)
Without idempotency, a consumer rebalance during a batch write to Snowflake causes the replayed records to appear as duplicate rows. Order totals in reports are inflated.
The idempotent version uses MERGE at the Snowflake sink:
def write_batch(records, snowflake_conn):
    # Stage records in a temporary table
    stage_records(records, snowflake_conn, "orders_staging")
    # MERGE into target - idempotent by order_id
    snowflake_conn.execute("""
        MERGE INTO orders AS t USING orders_staging AS s
        ON t.order_id = s.order_id
        WHEN MATCHED THEN UPDATE SET
            status = s.status, total = s.total, updated_at = s.updated_at
        WHEN NOT MATCHED THEN INSERT (order_id, customer_id, status, total, updated_at)
        VALUES (s.order_id, s.customer_id, s.status, s.total, s.updated_at)
    """)
    # Commit Kafka offsets only after the MERGE succeeds
    commit_offsets()
The key detail: Kafka offsets are committed after the MERGE succeeds. If the consumer crashes between the MERGE and the offset commit, the records are replayed and merged again - producing the same result. This is idempotency in action.
Streamkap uses this exact pattern - coordinating checkpoint commits with idempotent destination writes - to provide exactly-once semantics across its managed pipelines.
Idempotent Aggregations
Not all operations are naturally idempotent. Aggregations are the classic trap.
SUM is not idempotent. If you maintain a running total and a record is replayed, the total increases again. Processing an order worth $50 twice yields $100 instead of $50.
-- NOT idempotent: replaying a record adds to the total again
UPDATE customer_totals
SET lifetime_value = lifetime_value + 50.00
WHERE customer_id = 'CUST-456';
Replace-based aggregation is idempotent. Instead of incrementally updating an aggregate, recompute it from the full set of source records. Or store individual records and compute the aggregate at query time.
-- Idempotent: recompute from source of truth
UPDATE customer_totals
SET lifetime_value = (
SELECT SUM(total) FROM orders WHERE customer_id = 'CUST-456'
)
WHERE customer_id = 'CUST-456';
The general rule: store facts, compute metrics. Keep raw events with primary key-based upserts, and derive aggregations from the raw data. This approach is inherently idempotent because the source records are deduplicated before aggregation.
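The difference is easy to demonstrate with a replayed event. The helper names below are hypothetical; a dict keyed by primary key stands in for the upserted facts table.

```python
def apply_increment(totals, event):
    """Non-idempotent: every (re)delivery adds to the running total."""
    cid = event["customer_id"]
    totals[cid] = totals.get(cid, 0.0) + event["total"]

def recompute(orders):
    """Idempotent: derive the metric from the deduplicated facts."""
    totals = {}
    for order in orders.values():
        cid = order["customer_id"]
        totals[cid] = totals.get(cid, 0.0) + order["total"]
    return totals

event = {"order_id": "ORD-123", "customer_id": "CUST-456", "total": 50.0}

# Incremental path: processing the same event twice doubles the total
running = {}
apply_increment(running, event)
apply_increment(running, event)  # replay corrupts the aggregate

# Facts-first path: the upsert by primary key absorbs the replay
orders = {}
orders[event["order_id"]] = event
orders[event["order_id"]] = event  # replay overwrites, not appends
derived = recompute(orders)
```

The incremental total ends up at 100.0 after the replay, while the recomputed total stays at the correct 50.0: the upsert deduplicates the fact before the metric is derived.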
Testing Idempotency
Idempotency is not something you can verify by looking at code. You have to test it empirically.
Replay tests. Run your pipeline against a known dataset. Record the output. Replay the same dataset (simulating a reprocessing event). Compare the outputs - they should be identical, row for row.
Duplicate injection. Intentionally send duplicate records into your pipeline’s input. Verify that the destination contains no duplicates. This tests both your deduplication logic and your primary key constraints.
Kill-and-restart tests. Kill the pipeline process at random points during processing and let it recover. Check the destination for correctness after recovery. This is the most realistic test because it exercises checkpoint recovery, offset replay, and partial batch handling simultaneously.
Verification queries. Build monitoring queries that detect duplicates in your destination tables. Run them continuously in production.
-- Find duplicate order_ids in Snowflake
SELECT order_id, COUNT(*) AS cnt
FROM orders
GROUP BY order_id
HAVING COUNT(*) > 1;
If this query ever returns rows, your idempotency contract is broken.
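A replay test can be automated against any idempotent sink. In this sketch (illustrative names throughout), a dict keyed by primary key stands in for an upsert-based destination; running the same batch twice must produce identical output.

```python
def run_pipeline(records, sink):
    """Upsert each record into the sink by primary key."""
    for record in records:
        sink[record["order_id"]] = record

batch = [
    {"order_id": "ORD-1", "total": 50.0},
    {"order_id": "ORD-2", "total": 99.99},
]

# First run against a known dataset
first = {}
run_pipeline(batch, first)

# Simulate a reprocessing event: replay the exact same batch
second = dict(first)
run_pipeline(batch, second)

# Idempotency contract: the outputs are identical, row for row
assert first == second
```

The same harness extends to duplicate-injection tests (append copies of records to `batch`) and kill-and-restart tests (interrupt `run_pipeline` partway and rerun it from the start).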
Common Mistakes
Auto-increment IDs as primary keys. If your destination generates a new ID for each INSERT, there is no natural key to deduplicate on. Every retry creates a new row. Always propagate the source system’s primary key to the destination.
Append-only sinks without deduplication. Writing to S3, GCS, or a data lake in append mode means every retry adds another copy of the file or record. You need a dedup layer - either a MERGE-on-read pattern or a compaction job that runs periodically.
Non-deterministic transforms. If your pipeline adds a processed_at timestamp using NOW(), replayed records get different timestamps than the originals. This breaks idempotency for any logic that depends on that field. Use the event time from the source record, not the processing time.
Assuming the broker prevents duplicates. Even with Kafka’s idempotent producer, duplicates can still appear downstream. The producer-level guarantee is per-partition at the Kafka layer only. Consumer-side reprocessing (rebalance, checkpoint recovery) is a completely separate source of duplicates that the broker knows nothing about.
Ignoring partial failures. A batch of 1,000 records where 999 succeed and 1 fails requires the entire batch to be retried (or a mechanism to track which individual records succeeded). Without this, you either lose the failed record or duplicate the 999 that already succeeded.
Building idempotent pipelines is not glamorous work, but it is the single most impactful thing you can do for data correctness in a streaming architecture. Get the write semantics right, propagate primary keys, avoid mutable aggregations, and test by replaying data. The result is a pipeline that handles the chaos of distributed systems and still produces numbers your team can trust.