Stream Data Transformation: Patterns for Shaping Data in Real Time
A complete guide to stream data transformation patterns - filtering, enrichment, masking, and more. Learn when to use Kafka SMTs vs Flink SQL vs no-code tools.
Raw data rarely arrives in the shape that downstream systems need. A CDC event from PostgreSQL carries internal metadata columns. A click event has a user identifier but not the user’s account tier. A financial transaction stores amounts as strings. A nested JSON document needs to be flattened before it can land in a columnar table.
Stream data transformation is the practice of reshaping this data while it is in motion - before it reaches its destination. Getting transformation right reduces storage waste, improves query performance, enforces data contracts, and eliminates cleanup work that would otherwise compound downstream.
This guide covers the full picture: the case for in-flight transformation, the most common patterns with concrete examples, and how to choose the right tool for each job.
Why Transform in Flight, Not at Rest?
The traditional pattern is ELT (Extract, Load, Transform): land raw data in a data warehouse, then run SQL to clean and reshape it. ELT works well for batch workloads with capable warehouses, but it has costs that compound at scale:
Storage amplification: Raw data often contains fields you will never use - internal metadata, debug fields, legacy columns. Loading everything means storing everything. At millions of events per day, this adds up.
Repeated work: If five teams all need the same normalized representation of a field, each team runs the same transformation independently. In-pipeline transformation runs it once.
Latency: In ELT, data is not useful until the transformation job has run. For real-time use cases - fraud detection, personalization, alerting - that delay is unacceptable.
PII exposure: Loading raw data that contains personally identifiable information into a data warehouse means that warehouse now has a PII compliance scope. Masking or pseudonymizing in-flight limits PII exposure to only the systems that genuinely need it.
Broken contracts: Downstream consumers often depend on a specific schema. When the source schema changes, in-flight transformation provides a place to absorb that change before it propagates.
The Seven Core Transformation Patterns
1. Filtering
The simplest transformation: drop records that do not meet a criterion. This is often the highest-impact operation because it reduces the volume of data that every downstream step must handle.
Use cases: Excluding internal test accounts, filtering to a specific event type, dropping low-confidence signals.
Kafka SMT approach:
{
  "transforms": "filterInternal",
  "transforms.filterInternal.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterInternal.predicate": "isInternalUser",
  "predicates": "isInternalUser",
  "predicates.isInternalUser.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.isInternalUser.name": "internal-user"
}
Note that Connect's built-in predicates (TopicNameMatches, HasHeaderKey, RecordIsTombstone) match on record metadata, not field values - the example assumes upstream tags internal traffic with an internal-user header. Filtering on a record field requires a custom predicate or a stream processor.
Flink SQL approach:
INSERT INTO filtered_events
SELECT * FROM raw_events
WHERE user_type <> 'internal'
AND event_type IN ('purchase', 'view_product', 'add_to_cart');
Use Flink SQL when the filter condition is complex, references multiple fields, or needs to change frequently without restarting connectors.
2. Field Mapping and Renaming
Renaming fields to match destination naming conventions, dropping unused columns, or reordering fields to match a target schema.
-- Rename snake_case source fields to camelCase destination schema
-- and drop internal metadata columns
INSERT INTO analytics_events
SELECT
user_id AS userId,
event_type AS eventType,
page AS pageUrl,
event_time AS occurredAt
-- omit: _kafka_offset, _source_table, _cdc_op (internal metadata)
FROM raw_cdc_events
WHERE _cdc_op IN ('c', 'u'); -- inserts and updates only
SMT approach for simple renames:
{
  "transforms": "renameFields",
  "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameFields.renames": "user_id:userId,event_type:eventType"
}
3. Type Conversion
Source systems often store values in types that differ from what downstream systems expect. String-encoded numbers, epoch milliseconds as BIGINT, date strings in non-ISO formats - all require conversion.
INSERT INTO clean_orders
SELECT
order_id,
CAST(amount_str AS DECIMAL(10,2)) AS amount,
TO_TIMESTAMP_LTZ(event_epoch_ms, 3) AS event_time, -- preserves millisecond precision
UPPER(TRIM(status)) AS status,
COALESCE(CAST(discount_pct AS DOUBLE), 0.0) AS discount_pct
FROM raw_orders;
Pay close attention to precision and overflow. Silently casting a DECIMAL(12,4) to FLOAT loses precision in the least significant digits - which is acceptable for analytics but disastrous for financial reporting.
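The float precision loss is easy to demonstrate outside SQL. A minimal Python sketch (the value is illustrative; `struct` is used to simulate a 32-bit FLOAT, which is what FLOAT means in most SQL dialects):

```python
import struct
from decimal import Decimal

# A DECIMAL(12,4)-style value: 8 integer digits, 4 fractional digits.
exact = Decimal("12345678.9999")

# Round-trip through a 32-bit float, which has only ~7 significant
# decimal digits of precision - the fractional part is silently lost.
as_float32 = struct.unpack("f", struct.pack("f", float(exact)))[0]

print(exact)       # 12345678.9999
print(as_float32)  # the fractional digits are gone
assert Decimal(str(as_float32)) != exact  # precision was silently lost
```

The cast succeeds without any warning, which is exactly why this class of bug tends to surface only in downstream reconciliation.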
4. Enrichment
Enrichment adds context to a record by joining it with reference data. This is arguably the most valuable transformation because it converts raw identifiers into semantically meaningful attributes.
Lookup join enrichment (Flink SQL):
-- Enrich transaction events with customer account tier
SELECT
t.transaction_id,
t.amount,
t.merchant_id,
c.account_tier,
c.country_code,
c.is_high_value_customer
FROM transactions t
JOIN customer_profiles FOR SYSTEM_TIME AS OF t.proc_time AS c
ON t.customer_id = c.customer_id;
The FOR SYSTEM_TIME AS OF syntax performs a point-in-time lookup - it retrieves the customer profile as it existed at the time of the transaction, not the current profile. This is critical correctness: if a customer’s tier changes, historical transactions should reflect the tier that applied at the time.
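The point-in-time semantics can be modeled outside Flink. A hypothetical Python sketch, where each profile version carries a valid_from timestamp (data and timestamps are illustrative):

```python
import bisect

# Versioned customer profiles: (valid_from, tier), sorted by valid_from.
# Hypothetical reference data for illustration.
profile_history = {
    "C-456": [(0, "bronze"), (100, "silver"), (250, "gold")],
}

def tier_as_of(customer_id: str, event_time: int) -> str:
    """Return the tier in effect at event_time (a point-in-time lookup)."""
    versions = profile_history[customer_id]
    # Find the last version whose valid_from <= event_time.
    idx = bisect.bisect_right([v[0] for v in versions], event_time) - 1
    return versions[idx][1]

# A transaction at t=120 sees the tier that applied then, not today's.
print(tier_as_of("C-456", 120))  # silver
print(tier_as_of("C-456", 300))  # gold
```

A naive join against the current profile would stamp every historical transaction with "gold", which is the correctness bug FOR SYSTEM_TIME AS OF avoids.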
Enrichment from a slow-changing dimension via Kafka topic:
CREATE TABLE product_catalog (
product_id STRING, -- key column (the kafka connector with debezium-json does not allow a PRIMARY KEY declaration)
category STRING,
brand STRING,
unit_cost DECIMAL(10,2)
) WITH (
'connector' = 'kafka',
'topic' = 'product-catalog-updates',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset'
);
SELECT
e.event_id,
e.product_id,
e.quantity,
p.category,
p.brand,
e.quantity * p.unit_cost AS total_cost
FROM purchase_events e
LEFT JOIN product_catalog p ON e.product_id = p.product_id;
5. PII Masking and Pseudonymization
Any pipeline that carries personal data needs a strategy for limiting its exposure. There are three main approaches:
| Technique | Description | Reversible? | Use When |
|---|---|---|---|
| Redaction | Replace with null or *** | No | Data truly not needed downstream |
| Pseudonymization | Replace with a consistent token | Yes (with key) | Need to rejoin on identifier later |
| Hashing | One-way hash (SHA-256 + salt) | No | Consistency needed, reversibility not |
| Tokenization | Replace with random token, store mapping | Yes | Payment card data, high-security PII |
Flink SQL with hashing:
INSERT INTO masked_events
SELECT
SHA256(CONCAT(email, 'your-salt-here')) AS email_hash,
-- keep non-PII fields
event_type,
event_time,
country_code
FROM raw_events;
For reversible pseudonymization at scale, the hashing approach is insufficient - you need a tokenization service. Flink can call external services via User Defined Functions (UDFs), but that introduces network latency and external dependencies into your pipeline.
A practical pattern: mask in-flight for most destinations, and maintain a separate secured pipeline (with strict access controls) that carries the original identifiers to a designated PII vault.
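The behavioral difference between hashing and tokenization can be sketched in a few lines of Python (the salt and token format are illustrative, not a production scheme):

```python
import hashlib
import secrets

SALT = "your-salt-here"  # illustrative; use a managed secret in practice

def pseudonymize_hash(email: str) -> str:
    """One-way: the same input always yields the same token; not reversible."""
    return hashlib.sha256((email + SALT).encode()).hexdigest()

class TokenVault:
    """Reversible: stores the token -> original mapping (a toy tokenization service)."""
    def __init__(self):
        self._forward = {}  # original -> token
        self._reverse = {}  # token -> original

    def tokenize(self, value: str) -> str:
        if value not in self._forward:
            token = secrets.token_hex(16)  # random, carries no information
            self._forward[value] = token
            self._reverse[token] = value
        return self._forward[value]

    def detokenize(self, token: str) -> str:
        return self._reverse[token]

# Hashing is consistent, so downstream systems can still join on the token.
assert pseudonymize_hash("a@x.com") == pseudonymize_hash("a@x.com")

vault = TokenVault()
t = vault.tokenize("a@x.com")
assert vault.detokenize(t) == "a@x.com"  # reversible, unlike the hash
```

The vault is the part that does not scale for free: it is stateful, must be secured, and becomes a dependency of every pipeline that needs to detokenize.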
6. Struct Flattening
Nested JSON is common in event streams. Columnar stores and SQL analytics engines prefer flat tables. Flattening converts nested objects into top-level fields.
Source record (nested JSON):
{
"order_id": "ORD-123",
"customer": {
"id": "C-456",
"name": "Alice Chen",
"address": {
"city": "London",
"country": "GB"
}
},
"items": [...]
}
Flink SQL flattening:
INSERT INTO flat_orders
SELECT
order_id,
customer.id AS customer_id,
customer.name AS customer_name,
customer.address.city AS customer_city,
customer.address.country AS customer_country
FROM raw_orders;
Dot notation assumes customer is declared as a ROW type in the source table definition, which is the usual mapping for nested JSON; if the column were declared as a MAP, bracket access (customer['id']) would apply instead.
Handling arrays requires UNNEST or CROSS JOIN UNNEST:
INSERT INTO order_line_items
SELECT
o.order_id,
item.product_id,
item.quantity,
item.unit_price
FROM raw_orders o
CROSS JOIN UNNEST(o.items) AS item(product_id, quantity, unit_price);
Note that unnesting multiplies rows - one order with five items becomes five rows. Make sure downstream consumers understand this cardinality change.
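The same flatten-and-unnest logic is easy to model outside SQL. A minimal Python sketch that turns a nested order like the one above into flat line-item rows (field selection is illustrative):

```python
def flatten_order(order: dict) -> list[dict]:
    """Flatten nested structs to top-level fields and unnest the items array.
    One order with N items becomes N flat rows (the cardinality change noted above)."""
    base = {
        "order_id": order["order_id"],
        "customer_id": order["customer"]["id"],
        "customer_city": order["customer"]["address"]["city"],
    }
    return [{**base, **item} for item in order["items"]]

order = {
    "order_id": "ORD-123",
    "customer": {"id": "C-456", "name": "Alice Chen",
                 "address": {"city": "London", "country": "GB"}},
    "items": [{"product_id": "P-1", "quantity": 2},
              {"product_id": "P-2", "quantity": 1}],
}

rows = flatten_order(order)
print(len(rows))  # 2 -- one row per line item, not one per order
```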
7. Deduplication
Exactly-once delivery is hard to guarantee in distributed systems. Upstream systems can retry, consumers can restart, and identical events can arrive multiple times. Deduplication is the safety net.
-- Keep only the first occurrence of each event_id
SELECT
event_id,
user_id,
event_type,
event_time
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY event_id
ORDER BY event_time ASC
) AS rn
FROM raw_events
)
WHERE rn = 1;
This stateful deduplication requires Flink to maintain a state store keyed on event_id. For high-cardinality keys, consider using a bounded time window (only deduplicate within the last N hours) to cap state growth.
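The bounded-state idea can be sketched as a small Python deduplicator that evicts keys older than a TTL (the TTL value and event IDs are illustrative):

```python
from collections import OrderedDict

class BoundedDeduplicator:
    """Keeps the first occurrence of each key and forgets keys older than
    ttl_seconds, so state cannot grow without bound (mirrors a state TTL)."""
    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self.seen = OrderedDict()  # event_id -> first-seen timestamp

    def is_first(self, event_id: str, now: float) -> bool:
        # Evict expired entries from the front; insertion order is time
        # order as long as `now` is monotonically non-decreasing.
        while self.seen and now - next(iter(self.seen.values())) > self.ttl:
            self.seen.popitem(last=False)
        if event_id in self.seen:
            return False  # duplicate within the window: drop
        self.seen[event_id] = now
        return True       # first occurrence: keep

dedup = BoundedDeduplicator(ttl_seconds=3600)
assert dedup.is_first("evt-1", now=0)       # first occurrence: keep
assert not dedup.is_first("evt-1", now=10)  # duplicate within window: drop
assert dedup.is_first("evt-1", now=4000)    # window expired: treated as new
```

The last assertion makes the tradeoff explicit: bounding state means a duplicate that arrives after the window is no longer caught, which is usually an acceptable price for predictable memory use.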
Choosing the Right Tool
| Transform Complexity | Statefulness | Recommended Tool |
|---|---|---|
| Simple rename, filter, cast | Stateless | Kafka Connect SMT |
| Moderate logic, multi-field expressions | Stateless | Kafka Connect SMT or no-code transform layer |
| Enrichment from external lookup | Stateless per-record | Flink SQL lookup join |
| Windowed aggregation, deduplication | Stateful | Flink SQL |
| Complex event detection, pattern matching | Stateful | Flink SQL MATCH_RECOGNIZE |
| ML scoring, custom business logic | Stateful + custom code | Flink Java/Python API |
Kafka Connect SMTs: When They Shine
SMTs are the right tool when:
- The transformation is simple and expressible in a few configuration lines
- You do not want to operate a separate Flink cluster
- You need to apply the transform to every message without exceptions
- The transform is stateless (each record is processed independently)
SMTs run inside the connector process, which means they add no network hops and have very low overhead. The tradeoff is limited expressiveness - joins, aggregations, and any logic requiring context from other records are not possible.
Flink SQL: When It Is Necessary
Flink SQL becomes necessary when:
- The transformation requires joining two streams or a stream and a table
- You need windowed aggregations
- Deduplication across a time window is required
- Business logic is complex enough that JSON configuration is unmanageable
- You need to version, test, and review transformation logic like application code
No-Code Transform Layers
An emerging pattern in modern streaming platforms is a visual, no-code interface for defining field-level transforms. Rather than writing SMT JSON or Flink SQL, an engineer configures transforms through a UI: add a field, rename a column, apply a mask, set a default value.
This approach is particularly valuable for teams where data engineers and analysts collaborate - analysts can define transform requirements without writing pipeline code. Streamkap’s built-in transform layer, for example, lets you apply column mappings, filters, and masking rules directly in the pipeline configuration, with changes taking effect without a redeployment.
A Complete Transformation Pipeline
The following illustrates how multiple transformation patterns compose in a realistic pipeline:
Scenario: CDC stream from a PostgreSQL orders table needs to feed a real-time analytics dashboard.
Transformations required:
- Filter out internal test orders (customer_id LIKE 'TEST-%')
- Rename created_at to order_time to match the destination schema
- Enrich with customer account tier from a lookup table
- Mask billing_email via hashing
- Cast total_amount from VARCHAR to DECIMAL
- Flatten the nested shipping_address struct
INSERT INTO analytics_orders
SELECT
o.order_id,
o.created_at AS order_time,
CAST(o.total_amount AS DECIMAL(12,2)) AS total_amount,
o.status,
-- Enrichment
c.account_tier,
c.acquisition_channel,
-- PII masking
SHA256(CONCAT(o.billing_email, 'sk-salt-2026')) AS billing_email_hash,
-- Struct flattening
o.shipping_address.city AS shipping_city,
o.shipping_address.country AS shipping_country,
o.shipping_address.postcode AS shipping_postcode
FROM cdc_orders o
-- Filter: exclude internal test orders
WHERE o.customer_id NOT LIKE 'TEST-%'
-- Enrichment join
LEFT JOIN customer_profiles FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;
This single query applies six distinct transformation patterns in one pass - showing why Flink SQL is the right tool when complexity outgrows what SMTs can handle.
Operational Considerations
Schema evolution: Source schemas change. A field is added, renamed, or removed. Your transformation layer needs a policy for each case:
- New fields: pass through, drop, or fail?
- Renamed fields: use a compatibility alias or fail immediately?
- Removed fields: substitute a default or fail?
Define your policy before you need it. Schema registries (Confluent Schema Registry, AWS Glue Schema Registry) enforce compatibility rules and prevent incompatible schema changes from breaking downstream pipelines.
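Such a policy can be encoded explicitly rather than left implicit in connector behavior. A hypothetical Python sketch where the expected schema is a fixed field set and unknown fields follow a configurable rule (field names are illustrative):

```python
def apply_schema_policy(record: dict, expected: set, defaults: dict,
                        on_new_field: str = "drop") -> dict:
    """Apply an explicit schema policy: unknown fields are dropped, passed
    through, or rejected; fields removed upstream are backfilled with defaults."""
    out = {}
    for key, value in record.items():
        if key in expected:
            out[key] = value
        elif on_new_field == "pass":
            out[key] = value
        elif on_new_field == "fail":
            raise ValueError(f"unexpected field: {key}")
        # on_new_field == "drop": silently ignore the unknown field
    for key in expected - record.keys():
        out[key] = defaults[key]  # removed upstream: substitute a default
    return out

expected = {"user_id", "event_type", "country"}
defaults = {"user_id": None, "event_type": "unknown", "country": "??"}

rec = apply_schema_policy({"user_id": "u1", "event_type": "view", "debug": 1},
                          expected, defaults)
print(rec)  # debug dropped, country backfilled with its default
```

Whatever the chosen rule, the value is that it is written down and enforced in one place instead of being rediscovered during an incident.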
Testing transformations: Treat transformation logic as code. Unit test transformation functions, run integration tests with representative data samples, and include edge cases - nulls, empty strings, maximum values, Unicode characters.
Monitoring transform output: After a transformation runs, monitor its output distribution. Sudden changes in null rates, unexpected value distributions, or cardinality shifts often indicate a bug in the transformation logic or a change in the upstream data. For guidance on building a full monitoring framework, see Data Quality in Streaming Pipelines.
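A minimal output-distribution check can be sketched in Python (the batch, field name, and drift threshold are illustrative):

```python
def null_rate(records: list[dict], field: str) -> float:
    """Fraction of records where the field is missing or None."""
    if not records:
        return 0.0
    nulls = sum(1 for r in records if r.get(field) is None)
    return nulls / len(records)

def check_drift(current: float, baseline: float, tolerance: float = 0.05) -> bool:
    """Flag when the observed rate moves more than tolerance from its baseline."""
    return abs(current - baseline) > tolerance

batch = [{"email_hash": "abc"}, {"email_hash": None},
         {"email_hash": "def"}, {"email_hash": "ghi"}]

rate = null_rate(batch, "email_hash")
print(rate)                              # 0.25
print(check_drift(rate, baseline=0.02))  # True -- investigate the transform
```

The same pattern generalizes to value cardinality or min/max ranges: compute a cheap statistic per batch, compare it to a rolling baseline, and alert on deviation.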
Summary
Stream data transformation is not a single feature - it is a set of composable patterns that together shape raw data into trusted, useful information. The key decisions are:
- Where to transform: source, pipeline, or destination
- What pattern applies: filter, map, enrich, mask, flatten, deduplicate
- Which tool to use: SMT for simple stateless logic, Flink SQL for complex stateful operations, a no-code layer for rapid iteration
Getting transformation right at the pipeline layer pays compound dividends: cleaner data in every downstream system, less repeated logic, faster query performance, and tighter compliance posture - all without modifying source systems or burdening destination teams.