Engineering

February 25, 2026

Data Completeness in Streaming: Detecting Missing Events

Learn how to detect missing events, gaps, and data loss in streaming pipelines. Build completeness checks that ensure every record from the source reaches the destination.

TL;DR:
• Data completeness verifies that every record from the source arrives at the destination - no dropped events, no gaps, no silent data loss.
• Key techniques include row count reconciliation (source vs destination), sequence gap detection (missing IDs or timestamps), and heartbeat monitoring (synthetic events that prove the pipeline is flowing).
• Completeness is harder to verify than freshness because you are looking for the absence of something - missing data does not trigger alerts by default.

Data completeness is arguably the hardest data quality dimension to monitor in a streaming pipeline. Freshness is straightforward: you measure when the last record arrived and alert if it exceeds a threshold. Schema correctness is binary: the record either parses or it does not. But completeness asks a fundamentally different question: did every record that should have arrived actually arrive? You are looking for the absence of something, and absent things do not raise their hand.

In batch ETL, completeness checks are relatively simple. You run a query against the source, run a query against the destination, and compare the counts. The pipeline has a clear start and end. Streaming pipelines never stop. Data flows continuously, and the boundary between “this batch” and “that batch” does not exist. That continuous nature makes completeness verification both more important and significantly harder to implement.

This guide walks through the techniques, risks, and practical implementations you need to build solid completeness monitoring for streaming CDC pipelines.

Types of Incompleteness

Not all data loss looks the same. Understanding the categories of incompleteness helps you design detection strategies that cover each failure mode.

Dropped events are the most obvious form: a record exists at the source but never appears at the destination. This happens when a Kafka consumer skips messages, a CDC connector crashes between reading the WAL and writing to Kafka, or a destination write fails silently.

Gaps are subtler. The total row count might look close enough to pass a loose threshold, but a contiguous range of records is missing. This often occurs when a replication slot is dropped and recreated, or when a Kafka topic partition loses data due to retention policies.

Partial records arrive at the destination but are incomplete. A row might land with null values where the source had data, typically caused by schema mismatches or deserialization errors that populate only some fields.

Duplicate-masked loss is the trickiest. If your pipeline produces duplicates (common with at-least-once delivery) and you deduplicate at the destination, the duplicates can mask genuine data loss. Suppose the source produces records A, B, C, and D. The pipeline delivers A, A, B, D. The raw destination count is four, which matches the source, so a naive COUNT(*) comparison passes. After deduplication you have A, B, D: three unique records against four at the source. Comparing unique counts catches the loss. But if you compare raw counts, your deduplication is too aggressive, or your count queries are not precise, record C vanishes without a trace.
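
To make the masking concrete, here is a minimal Python sketch of the A, B, C, D example above. The function name `compare_keys` and the in-memory key lists are illustrative assumptions; in practice the keys would come from queries against the source and destination.

```python
def compare_keys(source_keys, dest_keys):
    """Compare raw counts and distinct keys between source and destination."""
    source_set = set(source_keys)
    dest_set = set(dest_keys)
    return {
        # Raw counts can match even when a record is missing.
        "raw_counts_match": len(source_keys) == len(dest_keys),
        # Distinct counts expose the shortfall hidden by the duplicate.
        "distinct_counts_match": len(source_set) == len(dest_set),
        # The set difference names the records that never arrived.
        "missing": sorted(source_set - dest_set),
    }


# Hypothetical data mirroring the example: C is lost, A is delivered twice.
result = compare_keys(["A", "B", "C", "D"], ["A", "A", "B", "D"])
print(result)
```

The raw counts agree (four and four) while the distinct counts do not, which is why a completeness check should compare keys or distinct counts rather than raw row totals whenever duplicates are possible.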

Row Count Reconciliation

The simplest completeness check is counting rows at the source and comparing with the destination. Despite its simplicity, this technique catches a wide range of issues when implemented correctly.

-- Source count (PostgreSQL)
SELECT COUNT(*) AS source_count
FROM orders
WHERE updated_at >= NOW() - INTERVAL '1 hour';

-- Destination count (Snowflake)
SELECT COUNT(*) AS dest_count
FROM orders
WHERE _streamkap_updated_at >= DATEADD('hour', -1, CURRENT_TIMESTAMP());

The challenge with streaming pipelines is timing. When you run the source count, new records are already flowing. By the time the destination count executes, those records may or may not have arrived. To handle this, introduce a reconciliation window with a buffer:

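-- Compare counts with a 10-minute buffer to account for pipeline latency
-- In production, compute the window bounds once and pass the same values to both
-- queries; evaluating NOW() separately on each system shifts the window slightly
-- Source: records older than 10 minutes (should already have been emitted)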
SELECT COUNT(*) FROM orders
WHERE updated_at BETWEEN NOW() - INTERVAL '1 hour'
                     AND NOW() - INTERVAL '10 minutes';

-- Destination: same window
SELECT COUNT(*) FROM warehouse.orders
WHERE _source_updated_at BETWEEN NOW() - INTERVAL '1 hour'
                              AND NOW() - INTERVAL '10 minutes';

The buffer must exceed your pipeline’s maximum end-to-end latency. If your P99 latency is 45 seconds, a 10-minute buffer gives you a comfortable margin. Run these reconciliation queries on a schedule (every 15 or 30 minutes), and alert when the counts diverge beyond a configurable threshold. A small, consistent discrepancy (less than 0.01%) may be acceptable depending on your use case, but a growing gap is always a signal worth investigating.
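
The scheduling and threshold logic around those queries might look like the following Python sketch. It assumes the two counts have already been fetched; the function names and the default tolerance of 0.0001 (0.01%) are illustrative choices, not part of any particular tool.

```python
from datetime import datetime, timedelta, timezone


def reconciliation_window(now, lookback=timedelta(hours=1), buffer=timedelta(minutes=10)):
    """Return fixed (start, end) bounds so both queries use the same window."""
    return now - lookback, now - buffer


def divergence_ratio(source_count, dest_count):
    """Fraction of source rows unaccounted for at the destination."""
    if source_count == 0:
        return 0.0 if dest_count == 0 else 1.0
    return abs(source_count - dest_count) / source_count


def should_alert(source_count, dest_count, tolerance=0.0001):
    """Alert when divergence exceeds the tolerance (0.0001 == 0.01%)."""
    return divergence_ratio(source_count, dest_count) > tolerance


# Example: bounds for a run at noon UTC, computed once and reused for both queries.
now = datetime(2026, 2, 25, 12, 0, tzinfo=timezone.utc)
start, end = reconciliation_window(now)
```

Computing `start` and `end` once and passing them to both systems as parameters avoids the small drift that appears when each database evaluates its own clock.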

Sequence Gap Detection

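Row counts tell you whether the totals match. Sequence gap detection tells you whether the specific records match. If your source table has an auto-incrementing primary key or a monotonically increasing timestamp, you can detect holes in the sequence. Keep in mind that auto-increment sequences can have legitimate gaps (rolled-back transactions and deleted rows still consume IDs), so check a suspected gap against the source before treating it as loss.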

-- Find gaps in an auto-increment ID sequence at the destination
WITH id_series AS (
  SELECT id, LEAD(id) OVER (ORDER BY id) AS next_id
  FROM warehouse.orders
  WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
)
SELECT id AS gap_start, next_id AS gap_end, (next_id - id - 1) AS missing_count
FROM id_series
WHERE next_id - id > 1
ORDER BY missing_count DESC;
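
The same logic can be sketched in Python for cases where the IDs are already in application memory, for example in a test or a reconciliation job. The function name `find_id_gaps` is an illustrative assumption; its output columns mirror the SQL query.

```python
def find_id_gaps(ids):
    """Return (gap_start, gap_end, missing_count) for each hole in the IDs.

    gap_start and gap_end are the IDs present on either side of the hole,
    matching the columns produced by the SQL query.
    """
    ordered = sorted(set(ids))  # de-duplicate and sort first
    gaps = []
    for current, following in zip(ordered, ordered[1:]):
        if following - current > 1:
            gaps.append((current, following, following - current - 1))
    # Largest gaps first, like ORDER BY missing_count DESC
    return sorted(gaps, key=lambda gap: gap[2], reverse=True)


# Hypothetical IDs: 4-6 and 9 are missing, 10 was delivered twice.
gaps = find_id_gaps([1, 2, 3, 7, 8, 10, 10, 11])
print(gaps)
```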

For timestamp-based gap detection, bucket your events into intervals and look for empty or suspiciously sparse buckets:

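-- Detect timestamp gaps (one-minute buckets with zero events)
-- A plain GROUP BY never emits empty buckets, so generate every minute
-- in the window and LEFT JOIN the events onto it (PostgreSQL syntax)
WITH buckets AS (
  SELECT generate_series(
           DATE_TRUNC('minute', NOW() - INTERVAL '6 hours'),
           DATE_TRUNC('minute', NOW()),
           INTERVAL '1 minute') AS time_bucket
)
SELECT b.time_bucket, COUNT(e.event_time) AS event_count
FROM buckets b
LEFT JOIN warehouse.events e
  ON e.event_time >= b.time_bucket
 AND e.event_time <  b.time_bucket + INTERVAL '1 minute'
GROUP BY b.time_bucket
HAVING COUNT(e.event_time) = 0
ORDER BY b.time_bucket;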

For a table that normally receives steady traffic, a zero-count bucket during business hours almost certainly indicates data loss rather than genuine inactivity. Combine this with baseline metrics for each time-of-day window to distinguish between real silence and missing data.
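
One way to apply a baseline is sketched below in Python. The thresholds (`sparse_ratio`, `min_baseline`) and the labels are illustrative assumptions to be tuned per table; the baseline itself is assumed to come from historical counts for the same time-of-day window.

```python
def classify_bucket(observed, baseline, sparse_ratio=0.2, min_baseline=5):
    """Classify one time bucket against its expected event count.

    observed: events seen in the bucket at the destination
    baseline: typical count for the same time-of-day window (assumed input)
    """
    if baseline < min_baseline:
        # Too little normal traffic to tell silence from loss.
        return "inconclusive"
    if observed == 0:
        return "suspected_loss"
    if observed < baseline * sparse_ratio:
        return "sparse"
    return "ok"
```

A bucket with zero events is flagged only when the baseline says traffic was expected, which keeps overnight or weekend quiet periods from paging anyone.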

Heartbeat Monitoring

Heartbeats are synthetic events injected at the source specifically to verify pipeline health. The idea is simple: if you regularly insert a known record at the source and it always arrives at the destination, the pipeline is flowing. If it stops arriving, something is broken.

Setting Up a Heartbeat Table

Create a dedicated heartbeat table in your source database:

-- Source database (PostgreSQL)
CREATE TABLE _pipeline_heartbeat (
  id SERIAL PRIMARY KEY,
  pipeline_name VARCHAR(64) NOT NULL,
  heartbeat_ts TIMESTAMP NOT NULL DEFAULT NOW()
);

Insert heartbeats on a regular schedule using a cron job or database scheduler:

-- Insert heartbeat every 60 seconds
INSERT INTO _pipeline_heartbeat (pipeline_name, heartbeat_ts)
VALUES ('orders_pipeline', NOW());

End-to-End Verification

At the destination, monitor the most recent heartbeat:

-- Alert if no heartbeat received in last 5 minutes
SELECT pipeline_name,
       MAX(heartbeat_ts) AS last_heartbeat,
       DATEDIFF('second', MAX(heartbeat_ts), CURRENT_TIMESTAMP()) AS seconds_since_heartbeat
FROM warehouse._pipeline_heartbeat
GROUP BY pipeline_name
HAVING seconds_since_heartbeat > 300;

Heartbeats serve double duty: they verify both freshness (is data flowing?) and completeness (is the pipeline capturing all changes from this source?). If CDC stops capturing changes from the heartbeat table, it has almost certainly stopped capturing changes from your application tables too.
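
The alerting side can be sketched in Python as follows. It assumes the latest heartbeat per pipeline has already been read from the destination; the function name and the `expected` parameter are illustrative. The `expected` list covers a case the destination query cannot see on its own: a pipeline that has never delivered a heartbeat has no row to report.

```python
from datetime import datetime, timedelta, timezone


def stale_pipelines(last_heartbeats, now, max_age=timedelta(minutes=5), expected=()):
    """Return names of pipelines whose heartbeat is stale or has never arrived.

    last_heartbeats: dict of pipeline_name -> latest heartbeat_ts at the destination
    expected: pipeline names that should be present
    """
    stale = {name for name, ts in last_heartbeats.items() if now - ts > max_age}
    # A pipeline that never delivered a heartbeat has no row to age out.
    stale |= {name for name in expected if name not in last_heartbeats}
    return sorted(stale)
```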

CDC-Specific Completeness Risks

Change data capture pipelines have unique failure modes that directly cause data loss.

Replication Slot Dropped

In PostgreSQL, CDC relies on logical replication slots to track which WAL segments have been consumed. If a slot is dropped (manually, or by a failover event), the connector loses its position in the WAL. When it reconnects, it may miss every change that occurred between the drop and the reconnection.

Detection: Monitor the slot’s confirmed_flush_lsn and alert if the slot disappears or if the LSN jumps forward unexpectedly.

-- Monitor replication slot health (PostgreSQL)
SELECT slot_name, active, restart_lsn, confirmed_flush_lsn,
       pg_current_wal_lsn() - confirmed_flush_lsn AS lag_bytes
FROM pg_replication_slots
WHERE slot_type = 'logical';
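
If the monitoring job reads LSNs as text, the lag arithmetic can also be done client-side. The Python sketch below assumes the slot rows have already been fetched into a dict; `check_slot` and its threshold are hypothetical names and values. A PostgreSQL LSN is written as two hexadecimal numbers separated by a slash, representing the high and low 32 bits of a 64-bit byte position.

```python
def lsn_to_int(lsn):
    """Convert a PostgreSQL LSN such as '16/B374D848' to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def check_slot(slot_name, slots, current_wal_lsn, max_lag_bytes=1_000_000_000):
    """Return a list of problems for one slot.

    slots: dict of slot_name -> confirmed_flush_lsn text from pg_replication_slots
    current_wal_lsn: text result of pg_current_wal_lsn()
    """
    if slot_name not in slots:
        # The slot was dropped: changes since the drop may be unrecoverable.
        return ["slot missing"]
    if slots[slot_name] is None:
        return ["slot has no confirmed_flush_lsn"]
    lag = lsn_to_int(current_wal_lsn) - lsn_to_int(slots[slot_name])
    return ["lag exceeds threshold"] if lag > max_lag_bytes else []
```

This sketch covers the missing-slot and lag cases only; detecting an unexpected forward jump would additionally require storing the previous sample of confirmed_flush_lsn and comparing against it.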

WAL Retention Exceeded

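A replication slot normally forces PostgreSQL to retain the WAL it still needs. But if the CDC consumer falls too far behind and retention is capped (for example by max_slot_wal_keep_size in PostgreSQL 13 and later), the server can remove WAL segments that the slot has not yet consumed. This manifests as an error like requested WAL segment has already been removed. At that point, the only recovery is a resnapshot.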

Snapshot Gaps

During the initial snapshot of a large table, the source database continues to receive writes. If the connector does not correctly merge snapshot data with streaming changes, records written during the snapshot window can be lost. Reliable CDC platforms handle this with a snapshot-then-stream merge strategy, but misconfigurations or crashes during this phase can create gaps.

Kafka-Specific Risks

Kafka sits between your CDC connector and your destination. Its own data retention and consumer management introduce additional completeness risks.

Retention expiry is the most common. If a Kafka topic is configured with 7-day retention and your consumer is offline for 8 days, messages from that first day are gone. Depending on its auto.offset.reset setting, the consumer either fails with an out-of-range error or resumes from the earliest (or latest) available offset, silently skipping everything that was purged.
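
A retention skip can be detected by comparing each partition's committed offset with the earliest offset the broker still holds. The Python sketch below assumes both sets of offsets have already been collected (for example from your Kafka tooling); the function name and the dict shapes are illustrative.

```python
def retention_loss_by_partition(committed, earliest):
    """Return {partition: purged_message_count} for partitions with loss.

    committed: dict of partition -> next offset the consumer group would read
    earliest: dict of partition -> earliest offset still held by the broker
    """
    lost = {}
    for partition, committed_offset in committed.items():
        gap = earliest.get(partition, 0) - committed_offset
        if gap > 0:
            # Offsets in [committed_offset, earliest) were purged unread.
            lost[partition] = gap
    return lost
```

Running this check before a long-stopped consumer is restarted tells you whether a replay is still possible or a backfill from the source is needed.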

Consumer group rebalancing can cause offset loss when not handled correctly. If a consumer crashes during a rebalance and offsets were not committed, the new consumer may re-process some messages (duplicates) while skipping others (loss) depending on the committed offset position.

Offset management mistakes compound these risks. If your application commits offsets before processing (at-most-once), any crash between commit and processing means lost messages. If it commits after processing (at-least-once), crashes cause duplicates but not loss, which is generally the safer default.
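
The difference between the two commit orderings can be seen in a small simulation. This Python sketch uses no Kafka client; `run_consumer` is a hypothetical model of a single consumer that crashes once, between its two steps, and then restarts from the last committed offset.

```python
def run_consumer(messages, commit_first, crash_at):
    """Simulate one crash and restart; return the list of processed messages.

    commit_first=True  -> commit the offset, then process (at-most-once)
    commit_first=False -> process, then commit the offset (at-least-once)
    crash_at: index of the message during which the crash occurs,
              between the first and second step
    """
    processed = []
    committed = 0  # next offset to read after a restart
    crashed = False
    offset = 0
    while offset < len(messages):
        if commit_first:
            committed = offset + 1
            if offset == crash_at and not crashed:
                crashed = True
                offset = committed  # restart from the committed offset
                continue
            processed.append(messages[offset])
        else:
            processed.append(messages[offset])
            if offset == crash_at and not crashed:
                crashed = True
                offset = committed  # restart from the committed offset
                continue
            committed = offset + 1
        offset += 1
    return processed


at_most_once = run_consumer(["A", "B", "C", "D"], commit_first=True, crash_at=2)
at_least_once = run_consumer(["A", "B", "C", "D"], commit_first=False, crash_at=2)
```

With the crash on the third message, committing first drops C entirely, while committing after processing delivers C twice. The second outcome can be repaired by deduplication; the first cannot.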

Practical Example: PostgreSQL CDC Completeness Monitoring

Here is a concrete setup for monitoring completeness of a PostgreSQL CDC pipeline streaming orders to a data warehouse.

-- 1. Create heartbeat table on source
CREATE TABLE _cdc_heartbeat (
  id SERIAL PRIMARY KEY,
  source_table VARCHAR(128),
  heartbeat_ts TIMESTAMP DEFAULT NOW(),
  seq BIGINT
);

-- 2. Create a sequence for gap detection
CREATE SEQUENCE heartbeat_seq START 1;

-- 3. Cron job: insert heartbeat every 30 seconds
INSERT INTO _cdc_heartbeat (source_table, heartbeat_ts, seq)
VALUES ('orders', NOW(), nextval('heartbeat_seq'));

At the destination, run a scheduled reconciliation query:

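-- Completeness check: run every 15 minutes
-- count_estimate() is not a built-in function; it stands in for a helper
-- that returns the source row count (for example, a user-defined function
-- reading a foreign table, or a value loaded by a scheduled job)
WITH source_counts AS (
  SELECT 'orders' AS table_name, count_estimate('orders') AS src_count
),
dest_counts AS (
  SELECT 'orders' AS table_name, COUNT(*) AS dst_count
  FROM warehouse.orders
),
heartbeat_check AS (
  SELECT source_table,
         MAX(seq) AS latest_seq,
         MAX(heartbeat_ts) AS latest_ts,
         -- Sequence continuity: expected heartbeats minus distinct heartbeats received
         -- (assumes heartbeat_seq feeds only this source_table)
         (MAX(seq) - MIN(seq) + 1) - COUNT(DISTINCT seq) AS missing_heartbeats
  FROM warehouse._cdc_heartbeat
  GROUP BY source_table
)
SELECT s.table_name,
       s.src_count,
       d.dst_count,
       ABS(s.src_count - d.dst_count) AS count_diff,
       h.latest_ts AS last_heartbeat,
       AGE(NOW(), h.latest_ts) AS heartbeat_age,
       h.missing_heartbeats
FROM source_counts s
JOIN dest_counts d ON s.table_name = d.table_name
LEFT JOIN heartbeat_check h ON h.source_table = s.table_name;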

This single query gives you three signals: row count divergence, heartbeat recency, and heartbeat sequence continuity. If any of these indicators degrade, you know exactly where to look.

Automated Completeness Checks

Manual queries are useful for debugging, but production completeness monitoring must be automated. Build a reconciliation pipeline that runs on a schedule and feeds results into your alerting system.

A solid automated setup includes the following. First, scheduled reconciliation jobs that run every 15 to 30 minutes, comparing source and destination counts for every monitored table. Second, heartbeat monitors that alert when no heartbeat is received within a configurable window (typically 2 to 5 minutes). Third, gap detection scans that run hourly, checking for sequence gaps in the destination. Fourth, trend tracking that stores reconciliation results over time so you can distinguish between a one-time blip and a growing divergence.
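
The trend-tracking piece can be sketched in Python as a classifier over stored reconciliation results. The labels, the window of four runs, and the function name are illustrative assumptions rather than a standard.

```python
def classify_divergence(diffs, window=4):
    """Classify recent count differences from successive reconciliation runs.

    diffs: absolute source/destination count differences, oldest first
    """
    recent = diffs[-window:]
    if not recent or recent[-1] == 0:
        return "ok"
    if len(recent) < window:
        return "insufficient_history"
    # Strictly increasing differences suggest ongoing loss.
    if all(later > earlier for earlier, later in zip(recent, recent[1:])):
        return "growing"
    # Non-zero for the whole window: persistent but not worsening.
    if all(value > 0 for value in recent):
        return "sustained"
    return "blip"
```

A "blip" might only be logged, while "sustained" and "growing" would typically open an incident, with "growing" at the higher severity.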

Platforms like Streamkap provide built-in monitoring that tracks record counts per source and destination, reducing the need to build this infrastructure from scratch. The platform manages Kafka consumer offsets and CDC replication slots automatically, eliminating the most common causes of silent data loss.

Recovery from Data Loss

When completeness checks reveal missing data, you need a recovery plan. The approach depends on the scope and cause of the loss.

Resnapshot is the nuclear option: re-read the entire source table and reload it at the destination. This guarantees completeness but is expensive for large tables and can take hours. Use this when the gap is large or the cause is unclear.

Offset reset works when Kafka still has the data but your consumer skipped past it. Reset the consumer group offset to an earlier position and replay from there. Be prepared to handle duplicates at the destination.

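# Reset consumer group to a specific timestamp
# The group must have no active members when you run this;
# replace --execute with --dry-run to preview the new offsets first
kafka-consumer-groups --bootstrap-server broker:9092 \
  --group cdc-orders-group \
  --topic cdc.public.orders \
  --reset-offsets \
  --to-datetime 2026-02-24T00:00:00.000 \
  --execute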

Targeted backfill is the most surgical approach. Query the source for the specific records identified as missing by your gap detection queries and insert them directly into the destination. This avoids the overhead of a full resnapshot but requires accurate gap identification.

-- Extract missing records from source for targeted backfill
SELECT * FROM orders
WHERE id BETWEEN 150001 AND 150500
ORDER BY id;
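
When gap detection returns many gaps, or very large ones, it helps to turn them into bounded ID ranges before querying the source. The Python sketch below is one way to do that; `backfill_ranges` and the batch size of 500 are illustrative assumptions. Its input uses the same convention as the gap detection query: both endpoints exist at the destination and everything strictly between them is missing.

```python
def backfill_ranges(gaps, batch_size=500):
    """Split gaps into inclusive (first_id, last_id) ranges for BETWEEN queries.

    gaps: iterable of (gap_start, gap_end) where both IDs exist at the
          destination and everything strictly between them is missing
    """
    ranges = []
    for gap_start, gap_end in gaps:
        first = gap_start + 1
        last_missing = gap_end - 1
        while first <= last_missing:
            last = min(first + batch_size - 1, last_missing)
            ranges.append((first, last))
            first = last + 1
    return ranges
```

Each returned pair can be bound to the two parameters of a `WHERE id BETWEEN ... AND ...` query, which keeps individual backfill reads small and easy to retry.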

Whichever recovery method you choose, run your completeness checks again after the backfill to verify that the gap has been closed. Document the incident: what caused the loss, how it was detected, how long data was missing before detection, and what was done to prevent recurrence. These postmortems are how you turn a completeness failure into a stronger pipeline.


Data completeness in streaming pipelines requires deliberate, layered monitoring. No single technique is sufficient on its own. Row count reconciliation catches large-scale loss. Sequence gap detection finds specific missing records. Heartbeat monitoring proves the pipeline is alive and capturing changes. Together, these techniques form a completeness safety net that catches failures whether they originate in the source database, Kafka, or the destination.

The key insight is that completeness monitoring must be proactive. Unlike schema errors or latency spikes, missing data does not announce itself. If you wait for a downstream consumer to notice something is wrong, the data may already be unrecoverable. Build the checks early, automate them aggressively, and treat any sustained divergence as an incident worth investigating.