Real-Time Data Validation: Catching Bad Data Before It Lands
Learn how to validate streaming data in real time - schema checks, business rule validation, and anomaly detection that catches bad data before it reaches your warehouse.
Every data team has a version of the same story. A dashboard number looks wrong on Monday morning. Someone traces the issue back to a single malformed record that arrived over the weekend - a null customer ID, a negative order total, a timestamp set three years in the future. That one record propagated through joins, aggregations, and model training pipelines before anyone noticed. By the time the root cause is identified, the damage has spread across dozens of downstream tables, three executive reports, and a machine learning model that has been making predictions on garbage inputs for forty-eight hours.
The fix is always the same: validate data before it reaches the destination. In batch pipelines, validation typically happens as a post-load step - run a dbt test, check a Great Expectations suite, flag anomalies after the fact. In streaming pipelines, you have a fundamentally better option. Because data flows through the pipeline record by record, you can inspect, validate, and route every single event before it ever touches your warehouse, lake, or downstream application.
This guide walks through how to build real-time data validation into streaming pipelines - what to check, how to check it, and what to do when something fails.
Why Validate in the Stream
The economics of data quality follow a simple rule: the earlier you catch a problem, the cheaper it is to fix. A malformed record caught at the source costs almost nothing to handle. The same record discovered after it has been loaded into a warehouse, joined with other tables, and used to train a model can cost hours or days of engineering time to remediate.
Batch validation catches problems after the fact. Streaming validation catches them in transit. The difference matters for three reasons.
First, latency. A streaming validation check fires within milliseconds of the record being produced. There is no window during which bad data sits undetected in your warehouse.
Second, isolation. When you catch a bad record in the stream, you can route it to a dead letter queue without affecting the rest of the pipeline. In batch, a single bad record can cause an entire load to fail or - worse - succeed silently with corrupted data.
Third, completeness. In a streaming pipeline, every record passes through the validation layer. There is no sampling, no “we only test the first 1,000 rows.” You get 100% coverage by default.
Validation Layers
Effective data validation is not a single check. It is a series of layers, each catching a different category of problem. Think of it as defense in depth: if one layer misses something, the next layer catches it.
Schema Validation
The most basic layer. Does the record have the expected structure? Are the required fields present? Are the data types correct?
Schema validation catches problems like a string arriving where an integer is expected, a required field being absent, or an entirely new field appearing that the pipeline does not know how to handle. In streaming systems backed by schema registries (like Apache Kafka with Avro or Protobuf schemas), much of this validation happens automatically at the serialization layer. For JSON-based pipelines, you need explicit checks.
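For JSON pipelines without a registry, an explicit schema check can be as small as a map of required fields to expected types. A minimal Python sketch; the field names and types here are illustrative, not a fixed contract:

```python
# Required fields and their expected types for an order record.
# These names (order_id, customer_id, amount) are example assumptions.
REQUIRED_FIELDS = {
    "order_id": str,
    "customer_id": str,
    "amount": (int, float),
}

def schema_errors(record: dict) -> list[str]:
    """Return a list of schema violations; an empty list means the record passes."""
    errors = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in record or record[field] is None:
            errors.append(f"missing required field: {field}")
        elif not isinstance(record[field], expected_type):
            errors.append(f"wrong type for {field}: {type(record[field]).__name__}")
    return errors

# A well-formed record produces no errors; a malformed one lists each violation.
print(schema_errors({"order_id": "o1", "customer_id": "c1", "amount": 42.5}))  # []
print(schema_errors({"order_id": "o1", "amount": "N/A"}))
```

Returning a list of errors rather than a single boolean makes the result easy to attach to a dead letter record as a rejection reason.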
Format Validation
A field can have the correct type but the wrong format. An email field might contain a string, but "not-an-email" is not a valid email address. A phone number field might hold digits, but "000" is not a real phone number. A date field might be a valid string, but "2026-13-45" is not a real date.
Format validation applies pattern matching - typically regular expressions - to ensure that values conform to expected formats.
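Outside SQL, each format check is typically one regular expression. A hedged Python sketch; note that a date pattern alone still accepts impossible values like "2026-13-45", so the date check also parses the value with the standard library:

```python
import re
from datetime import date

# Illustrative patterns; real email validation may be looser or stricter.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")

def is_valid_email(value: str) -> bool:
    return bool(EMAIL_RE.match(value))

def is_valid_iso_date(value: str) -> bool:
    # The regex checks the shape; fromisoformat rejects impossible dates.
    if not DATE_RE.match(value):
        return False
    try:
        date.fromisoformat(value)
        return True
    except ValueError:
        return False
```

Keeping the shape check (regex) and the semantic check (parsing) separate makes rejection reasons more precise when a record is quarantined.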
Range Validation
Numeric and date fields often have expected bounds. An order amount should be positive. A customer age should be between 0 and 150. A transaction timestamp should not be in the future. A discount percentage should be between 0 and 100.
Range validation is simple to implement and catches a surprisingly large percentage of data quality issues.
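As a minimal sketch in Python, with field names and bounds taken from the examples above (adapt both to your schema):

```python
def range_errors(order: dict, now_ts: float) -> list[str]:
    """Check numeric and timestamp bounds; empty list means all ranges pass."""
    errors = []
    if order["amount"] <= 0:
        errors.append("amount must be positive")
    if not (0 <= order["discount_pct"] <= 100):
        errors.append("discount_pct must be between 0 and 100")
    if order["event_ts"] > now_ts:
        errors.append("event_ts is in the future")
    return errors

# A record inside all bounds yields no errors.
print(range_errors({"amount": 25.0, "discount_pct": 10, "event_ts": 1000.0}, now_ts=2000.0))  # []
```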
Business Rule Validation
This is where domain knowledge comes in. Business rules encode constraints that are specific to your application logic. An order cannot transition from “shipped” directly to “pending.” A refund amount cannot exceed the original charge. A user’s subscription end date cannot precede their start date.
These rules cannot be inferred from the schema alone - they require understanding of the business domain.
Statistical Validation
The most sophisticated layer. Statistical validation looks at the distribution of values over time and flags records or batches that deviate significantly from expected patterns. If your average order value is $45 and a batch of records suddenly shows an average of $4,500, something has gone wrong - even though each individual record might pass every other validation layer.
Implementation Patterns
There are three primary patterns for implementing validation in a streaming pipeline. They are not mutually exclusive - most production systems use all three.
Inline Filtering
The simplest approach. Records that fail validation are dropped from the stream entirely. This is appropriate for records that are clearly garbage - null primary keys, unparseable payloads, records with no salvageable data.
-- Flink SQL: filter out records with null primary keys or invalid amounts
SELECT *
FROM orders
WHERE order_id IS NOT NULL
AND customer_id IS NOT NULL
AND amount > 0;
The risk with inline filtering is data loss. You should only drop records when you are confident they have zero value and you have a separate mechanism (like logging or a dead letter topic) to track what was dropped.
Side-Output Routing
A more sophisticated approach. Records that fail validation are not dropped - they are routed to a separate stream (a “dead letter queue” or “quarantine topic”) for later inspection and remediation. The main pipeline continues processing only valid records.
-- Flink SQL: route invalid records to a dead letter table
INSERT INTO dead_letter_orders
SELECT *, 'INVALID_AMOUNT' AS rejection_reason, CURRENT_TIMESTAMP AS rejected_at
FROM orders
WHERE amount <= 0 OR amount IS NULL;
-- Main pipeline processes only valid records
INSERT INTO validated_orders
SELECT *
FROM orders
WHERE amount > 0 AND amount IS NOT NULL;
Validation Flags
Sometimes you do not want to reject a record - you want to let it through but flag it for review. This is useful for soft violations where the record is probably fine but deserves a second look.
-- Flink SQL: add validation flags to each record
SELECT *,
CASE
WHEN email IS NULL THEN 'MISSING'
WHEN email NOT LIKE '%_@_%.__%' THEN 'INVALID_FORMAT'
ELSE 'VALID'
END AS email_validation,
CASE
WHEN amount > 10000 THEN 'HIGH_VALUE_FLAG'
WHEN amount <= 0 THEN 'INVALID'
ELSE 'VALID'
END AS amount_validation
FROM orders;
Downstream consumers can then filter on these flags - dashboards might exclude INVALID records while alerting systems might specifically watch for them.
Common Validations
Here is a reference catalog of the validations that cover the vast majority of data quality issues in streaming pipelines.
Null Checks
The most fundamental validation. Every table has fields that must not be null - primary keys, foreign keys, required business fields.
SELECT *
FROM orders
WHERE order_id IS NOT NULL
AND customer_id IS NOT NULL
AND order_date IS NOT NULL;
Type Verification
In loosely-typed pipelines (JSON, CSV), values may arrive as the wrong type. A price field might contain the string "N/A" instead of a number. Type verification catches these before they cause casting errors downstream.
SELECT *
FROM raw_events
WHERE TRY_CAST(event_timestamp AS TIMESTAMP) IS NOT NULL
AND TRY_CAST(quantity AS INT) IS NOT NULL;
Regex for Emails, Phone Numbers, and Identifiers
Pattern-based validation for structured string fields.
SELECT *,
CASE
WHEN email LIKE '%_@_%.__%' THEN TRUE
ELSE FALSE
END AS is_valid_email,
CASE
WHEN phone_number LIKE '+%' AND CHAR_LENGTH(phone_number) >= 10 THEN TRUE
ELSE FALSE
END AS is_valid_phone
FROM customers;
Date Range Checks
Timestamps should fall within reasonable bounds. Orders should not have future dates. Birthdates should not precede 1900.
SELECT *
FROM orders
WHERE order_date <= CURRENT_TIMESTAMP
AND order_date >= TIMESTAMP '2020-01-01 00:00:00';
Enum Value Validation
Fields with a fixed set of allowed values should be checked against that set.
SELECT *
FROM orders
WHERE status IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled')
AND payment_method IN ('credit_card', 'debit_card', 'paypal', 'bank_transfer');
Business Rule Validation
Business rules are the validations that require domain expertise. They cannot be derived from the schema alone.
Status Transitions
Most entities have a lifecycle with valid state transitions. An order that is already delivered should not suddenly become pending. Catching invalid transitions prevents corrupted state in downstream systems.
-- Join incoming updates with current order state to keep only valid transitions
SELECT o.*
FROM order_updates o
JOIN current_order_states c ON o.order_id = c.order_id
WHERE (c.status = 'pending' AND o.status IN ('confirmed', 'cancelled'))
OR (c.status = 'confirmed' AND o.status IN ('shipped', 'cancelled'))
OR (c.status = 'shipped' AND o.status = 'delivered')
OR (c.status = 'delivered' AND o.status = 'delivered'); -- tolerate duplicate delivery events
Referential Integrity
Foreign key relationships should be valid. An order should reference an existing customer. A line item should reference an existing product. In streaming, you can validate referential integrity by joining against a lookup table or a cached reference dataset.
-- Flag orders with unknown customer IDs
SELECT o.*,
CASE
WHEN c.customer_id IS NULL THEN 'UNKNOWN_CUSTOMER'
ELSE 'VALID'
END AS customer_validation
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id;
Cross-Field Constraints
Some validations involve relationships between fields within the same record. A discount_amount should not exceed subtotal. A ship_date should not precede order_date. An end_date should not precede start_date.
SELECT *
FROM orders
WHERE discount_amount <= subtotal
AND (ship_date IS NULL OR ship_date >= order_date)
AND total_amount = subtotal - discount_amount + tax_amount;
Statistical Validation
Statistical validation catches problems that per-record checks miss. A record with an order amount of $5,000 might be perfectly valid on its own, but if your typical order is $50 and you suddenly see a hundred $5,000 orders in a minute, something is wrong.
Z-Score Anomaly Detection
The z-score measures how many standard deviations a value is from the mean. Values beyond a threshold (typically 3 standard deviations) are flagged as anomalies.
-- Calculate rolling statistics over the preceding 1,000 orders and compute a z-score
SELECT *,
  (amount - avg_amount) / NULLIF(stddev_amount, 0) AS z_score
FROM (
  SELECT *,
    AVG(amount) OVER (
      ORDER BY order_date
      ROWS BETWEEN 1000 PRECEDING AND 1 PRECEDING
    ) AS avg_amount,
    STDDEV(amount) OVER (
      ORDER BY order_date
      ROWS BETWEEN 1000 PRECEDING AND 1 PRECEDING
    ) AS stddev_amount
  FROM orders
) AS order_stats;
Records with a z-score above 3 or below -3 can be flagged for review without being rejected outright.
Rate Change Detection
Beyond individual values, you can monitor the rate of events. A sudden spike in order volume, a drop in event frequency, or an unusual ratio of errors to successes can all indicate upstream problems.
-- Detect sudden spikes in order volume per minute
-- (the window function must be computed in a subquery; it cannot appear in HAVING)
SELECT window_start, order_count, avg_order_count
FROM (
  SELECT
    window_start,
    COUNT(*) AS order_count,
    AVG(COUNT(*)) OVER (
      ORDER BY window_start
      ROWS BETWEEN 60 PRECEDING AND 1 PRECEDING
    ) AS avg_order_count
  FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_date), INTERVAL '1' MINUTE)
  )
  GROUP BY window_start, window_end
) AS per_minute
WHERE order_count > 3 * avg_order_count;
Handling Invalid Records
What you do with invalid records matters as much as detecting them. There are three tiers of response.
Reject and quarantine. For critical violations - null primary keys, unparseable records, completely wrong types - route the record to a dead letter queue. Include the original record, the reason for rejection, and a timestamp. Engineers can inspect and replay quarantined records after fixing the root cause.
Flag and pass through. For soft violations - suspicious but plausible values, minor format issues, unusual but not impossible amounts - add validation columns to the record and let it flow to the destination. Downstream consumers decide whether to include or exclude flagged records.
Alert and monitor. For statistical anomalies and rate changes, trigger an alert but do not block the pipeline. These patterns often indicate upstream issues (a broken producer, a bulk import, a configuration change) rather than individual bad records.
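The three tiers can be sketched as a single routing function. The helper predicates below are hypothetical stand-ins for the checks described earlier:

```python
# Hypothetical check helpers; real implementations would mirror the SQL checks above.
def has_critical_violation(r: dict) -> bool:
    return r.get("order_id") is None or r.get("amount", 0) <= 0

def has_soft_violation(r: dict) -> bool:
    return r.get("amount", 0) > 10_000  # suspicious but plausible

def is_statistical_outlier(r: dict) -> bool:
    return False  # placeholder for a z-score or rate-change check

def route(record: dict) -> tuple[str, bool]:
    """Return (destination, should_alert) for one record."""
    if has_critical_violation(record):
        return ("dead_letter", False)   # reject and quarantine
    if has_soft_violation(record):
        record["validation_flag"] = "WARN"
        return ("main", False)          # flag and pass through
    if is_statistical_outlier(record):
        return ("main", True)           # record flows on, but an alert fires
    return ("main", False)
```

The key property is that only the first tier removes a record from the main stream; the other two preserve it and attach metadata or side effects.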
Practical Example: Order Stream Validation
Here is a complete validation pipeline for an e-commerce order stream that combines multiple validation layers.
-- Full validation pipeline for an order stream
-- Soft checks become flag columns; hard failures are excluded here
-- and routed to the dead letter queue instead
INSERT INTO validated_orders
SELECT
  order_id,
  customer_id,
  amount,
  currency,
  status,
  order_date,
  email,
  -- Validation flags
  CASE WHEN amount > 50000 THEN 'WARN' ELSE 'PASS' END AS amount_check,
  CASE WHEN order_date < CURRENT_TIMESTAMP - INTERVAL '1' YEAR THEN 'WARN'
       ELSE 'PASS' END AS date_check,
  CASE WHEN email IS NULL OR email NOT LIKE '%_@_%.__%' THEN 'FAIL'
       ELSE 'PASS' END AS email_check,
  CASE WHEN status NOT IN ('pending','confirmed','shipped','delivered','cancelled')
       THEN 'FAIL' ELSE 'PASS' END AS status_check
FROM orders
WHERE order_id IS NOT NULL
  AND customer_id IS NOT NULL
  AND amount > 0
  AND order_date <= CURRENT_TIMESTAMP;
-- Route hard failures to the dead letter queue
INSERT INTO dead_letter_orders
SELECT *, 'CRITICAL_VALIDATION_FAILURE' AS rejection_reason
FROM orders
WHERE order_id IS NULL
   OR customer_id IS NULL
   OR amount IS NULL
   OR amount <= 0
   OR order_date > CURRENT_TIMESTAMP;
This pipeline does three things simultaneously: it passes valid records through with validation flags, it hard-rejects records that fail critical checks, and it soft-flags records that are suspicious but salvageable. Downstream dashboards can filter on the _check columns to include only fully validated data, while data engineers can monitor the dead letter queue for systemic issues.
Monitoring Validation Results
Validation is only useful if someone is watching the results. Build monitoring around three key metrics.
Validation pass rate. Track the percentage of records that pass all validation checks over time. A healthy pipeline typically shows a pass rate above 99%. A sudden drop signals an upstream problem.
Failure distribution. Break down failures by validation type. If 80% of your failures are null customer_id values, you have a specific upstream bug to fix. If failures are spread evenly across all checks, you might have a broader data quality issue.
Dead letter queue depth. Monitor the size of your quarantine queue. A growing queue means problems are accumulating faster than they are being resolved. Set alerts for queue depth thresholds - ten records might be normal, a thousand in an hour is a problem.
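Given records carrying _check flag columns like those in the example pipeline, the first two metrics fall out of a few lines of aggregation. A minimal Python sketch:

```python
from collections import Counter

def validation_metrics(records: list[dict]) -> dict:
    """Compute pass rate and per-check failure distribution from _check flag columns."""
    check_cols = [k for k in records[0] if k.endswith("_check")]
    failures = Counter()
    passed = 0
    for r in records:
        failed_checks = [c for c in check_cols if r[c] == "FAIL"]
        if failed_checks:
            failures.update(failed_checks)
        else:
            passed += 1
    return {
        "pass_rate": passed / len(records),
        "failure_distribution": dict(failures),
    }
```

In practice this would run over a tumbling window of recent records, with the pass rate exported to a dashboard and the distribution used to pinpoint which upstream check is degrading.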
Streamkap provides built-in monitoring for these patterns. Its data quality features automatically track schema changes, null rate spikes, and data freshness across every pipeline, surfacing issues before they propagate to your warehouse. Combined with dead letter queue support for records that fail transformation rules, you get a complete validation and observability layer without building it from scratch.
The goal is not zero validation failures - some bad data is inevitable. The goal is catching bad data fast enough that it never corrupts a dashboard, breaks a model, or wastes an engineer’s weekend. Real-time validation, applied consistently across your streaming pipeline, is how you get there.