Real-Time Data Enrichment: Joining Streams with Reference Data
Learn how to enrich streaming data with reference data using lookup joins, temporal joins, and stream-to-stream joins - with practical architecture patterns.
Every event stream carries a partial picture. A payment event contains an account ID but not the customer’s risk tier. A clickstream event contains a product SKU but not the product’s category or price. A logistics event contains a shipment ID but not the origin warehouse or carrier SLA. To make streaming data useful - for real-time analytics, fraud detection, personalization, or operational intelligence - you need to join it with context.
That context lives in reference data: the dimension tables, lookup stores, and supporting event streams that transform raw events into actionable facts. This guide covers the four principal techniques for enriching streaming data in real time: lookup joins, temporal joins, stream-to-stream joins, and slowly changing dimension management. For each technique, we cover the mechanics, the tradeoffs, and practical implementation patterns.
Why Enrichment Is Hard in Motion
Batch enrichment is straightforward: read events, read reference table, join on key, write output. Everything is at rest. Streaming enrichment is harder because the two sides of the join exist in fundamentally different states:
- The event stream is unbounded, ordered (usually), and arrives at high velocity
- The reference data is bounded, changes slowly, and is often stored in an OLTP database not designed for high-frequency point lookups from a stream processor
The core tension in streaming enrichment is freshness vs. throughput. Querying a remote database for every event is perfectly fresh but completely impractical at scale. Caching reference data locally is fast but introduces staleness. Understanding this tradeoff shapes every architectural decision.
A second tension is correctness vs. simplicity. Should enrichment reflect the current state of reference data, or the state at the moment the event occurred? For many operational use cases current state is fine. For audit trails, financial calculations, and historical analytics, point-in-time correctness is mandatory.
Technique 1: Lookup Joins (Async Remote Lookups)
A lookup join queries an external data store - a database, a key-value store, or a REST API - for each incoming event. The stream processor uses a key from the event to fetch the corresponding reference record on demand.
How It Works
Event stream: [order_id=123, customer_id=456, amount=99.00]
|
v
Lookup: customers WHERE id=456
|
v
Enriched event: [order_id=123, customer_id=456, amount=99.00,
customer_tier="gold", customer_country="US"]
The lookup can be synchronous (the stream processor blocks on the result) or asynchronous (the processor issues many concurrent lookups and reassembles results). Asynchronous lookup is far more efficient at scale - modern stream processors like Apache Flink’s AsyncFunction can keep thousands of lookups in flight simultaneously.
Implementation: Flink AsyncFunction Pattern
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class CustomerEnrichmentFunction
        extends RichAsyncFunction<OrderEvent, EnrichedOrderEvent> {

    private transient CustomerLookupClient client;

    @Override
    public void open(Configuration parameters) {
        // Construct the client on the task manager, not on the job client,
        // so it is never serialized with the function
        client = new CustomerLookupClient(connectionPool);
    }

    @Override
    public void asyncInvoke(OrderEvent event,
                            ResultFuture<EnrichedOrderEvent> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> client.getCustomer(event.getCustomerId()))
            .thenAccept(customer -> resultFuture.complete(
                Collections.singleton(new EnrichedOrderEvent(event, customer))))
            .exceptionally(t -> {
                // Surface lookup failures to Flink instead of silently dropping the event
                resultFuture.completeExceptionally(t);
                return null;
            });
    }
}
Caching for Lookup Joins
Querying a database for every event is unsustainable. A lookup cache - typically in-heap or off-heap in the stream processor’s local memory - reduces external calls dramatically.
Event stream (10,000 events/sec)
        |
        v
[Local LRU Cache] ← TTL: 5 minutes, capacity: 100,000 keys
    |         |
   hit       miss
    |         v
    |   [Remote DB / Redis]
    |         |
    v         v
   Enriched events
Cache design considerations:
| Parameter | Guidance |
|---|---|
| TTL | Match reference data change frequency (see FAQ) |
| Capacity | Size to hold the hot key space for your workload |
| Eviction | LRU works for most cases; LFU for highly skewed access |
| Warm-up | Pre-populate cache on job startup to avoid cold-start misses |
| Invalidation | Subscribe to a CDC feed from the reference table for push-based invalidation |
The most reliable cache invalidation strategy is not TTL-based expiration but CDC-driven invalidation: run a CDC pipeline from the reference table and update the cache the moment a row changes. This shrinks the staleness window from the full TTL down to the CDC propagation delay - typically well under a second.
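The cache parameters above (TTL, capacity, LRU eviction, push-based invalidation) can be sketched in a few dozen lines on top of Java's LinkedHashMap. This is an illustrative, single-threaded sketch - the class and method names (LookupCache, invalidate) are hypothetical, not part of any framework:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal TTL + capacity-bounded LRU lookup cache (single-threaded sketch). */
public class LookupCache<K, V> {

    private static final class CacheEntry<T> {
        final T value;
        final long expiresAtMillis;
        CacheEntry(T value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, CacheEntry<V>> map;

    public LookupCache(int capacity, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // An access-ordered LinkedHashMap gives LRU eviction once capacity is exceeded.
        this.map = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CacheEntry<V>> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns the cached value, or null on a miss (the caller then queries the remote store). */
    public V get(K key, long nowMillis) {
        CacheEntry<V> e = map.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis >= e.expiresAtMillis) {   // TTL expired: treat as a miss
            map.remove(key);
            return null;
        }
        return e.value;
    }

    public void put(K key, V value, long nowMillis) {
        map.put(key, new CacheEntry<>(value, nowMillis + ttlMillis));
    }

    /** Push-based invalidation: call from the CDC consumer when the reference row changes. */
    public void invalidate(K key) {
        map.remove(key);
    }
}
```

Time is passed in explicitly (nowMillis) rather than read from the clock, which keeps the sketch deterministic and easy to test; a production cache would also need thread safety and metrics on hit rate.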
Lookup Join Tradeoffs
| Property | Value |
|---|---|
| Freshness | Current state (as of cache TTL) |
| Point-in-time correctness | No - reflects latest value |
| Implementation complexity | Low to medium |
| Throughput impact | Low with caching; high without |
| External dependency | Yes - requires reachable reference store |
Technique 2: Temporal Joins (Point-in-Time Correctness)
A temporal join queries the state of reference data as it was at a specific moment - typically the timestamp embedded in the event. This guarantees historical correctness: enriching an order placed six months ago uses the customer tier that was in effect six months ago, not the current tier.
The Problem Temporal Joins Solve
Suppose you’re enriching orders with product prices. A product’s price changes on January 15. An order placed on January 10 should carry the pre-change price. An order placed on January 20 should carry the post-change price. A lookup join uses the current price for both. A temporal join uses the correct price for each.
Implementing Temporal Joins
Option 1: Versioned State Table
Maintain a versioned reference table with effective_from and effective_to timestamps:
CREATE TABLE product_prices_versioned (
product_id VARCHAR,
price DECIMAL(10,2),
effective_from TIMESTAMP,
effective_to TIMESTAMP, -- NULL means currently active
PRIMARY KEY (product_id, effective_from)
);
The join predicate becomes:
SELECT
o.order_id,
o.product_id,
o.quantity,
p.price,
o.quantity * p.price AS line_total
FROM orders o
JOIN product_prices_versioned p
ON o.product_id = p.product_id
AND o.event_time >= p.effective_from
AND (o.event_time < p.effective_to OR p.effective_to IS NULL)
Option 2: Flink Temporal Table Function
Apache Flink has native support for temporal joins via the FOR SYSTEM_TIME AS OF SQL syntax:
SELECT
o.order_id,
o.product_id,
p.price,
o.quantity * p.price AS line_total
FROM orders AS o
JOIN product_catalog FOR SYSTEM_TIME AS OF o.event_time AS p
ON o.product_id = p.product_id
Flink maintains versioned state for the reference table automatically when the source is a CDC-enabled changelog stream.
Sourcing Versioned Reference Data
Temporal joins require a changelog - an ordered record of every change to the reference table. Change data capture (CDC) provides exactly this. By reading CDC events from your reference database into Kafka, you get an ordered, replayable changelog that a stream processor can use to reconstruct any historical version of the table.
Understanding change data capture is foundational to implementing temporal joins correctly.
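To make the changelog-to-versioned-state step concrete, here is a framework-free sketch (all names hypothetical) of folding CDC upserts into SCD Type 2 history and querying it at event time - conceptually what a versioned state table does under the hood:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: fold a CDC changelog into SCD Type 2 versioned state, queryable at event time. */
public class VersionedTable<K, V> {

    private static final class Version<T> {
        final T value;
        final long from;
        Long to;   // null = currently active version
        Version(T value, long from) {
            this.value = value;
            this.from = from;
        }
    }

    private final Map<K, List<Version<V>>> versions = new HashMap<>();

    /** Apply one changelog record (an insert or update captured by CDC). */
    public void applyChange(K key, V newValue, long changeTs) {
        List<Version<V>> history = versions.computeIfAbsent(key, k -> new ArrayList<>());
        if (!history.isEmpty()) {
            history.get(history.size() - 1).to = changeTs;   // close the open version
        }
        history.add(new Version<>(newValue, changeTs));      // open the new version
    }

    /** Point-in-time lookup: the value in effect at eventTs, or null if none existed yet. */
    public V valueAt(K key, long eventTs) {
        List<Version<V>> history = versions.getOrDefault(key, List.of());
        for (int i = history.size() - 1; i >= 0; i--) {
            Version<V> v = history.get(i);
            if (eventTs >= v.from && (v.to == null || eventTs < v.to)) {
                return v.value;
            }
        }
        return null;
    }
}
```

The valueAt predicate mirrors the SQL join condition shown earlier: effective_from <= event_time < effective_to, with a null effective_to meaning "still active".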
Temporal Join Tradeoffs
| Property | Value |
|---|---|
| Freshness | As of event timestamp |
| Point-in-time correctness | Yes - by design |
| Implementation complexity | Medium to high |
| Throughput impact | Medium (state management overhead) |
| External dependency | Changelog stream (Kafka + CDC) |
Technique 3: Stream-to-Stream Joins
When both sides of the join are high-velocity event streams - not a stream and a reference table - you need a stream-to-stream join. The classic examples are correlating impressions with clicks, matching requests with responses, and pairing order events with fulfillment events.
The Core Challenge: Unbounded State
In a batch join, both tables fit in memory or on disk. In a stream-to-stream join, events from side A need to wait for matching events from side B, which may arrive seconds, minutes, or hours later. This waiting requires buffering state in the stream processor - and without bounds, that state grows forever.
The solution is event time windows: only join events that fall within a defined time window of each other.
Window-Based Stream-to-Stream Join
Ad Impressions: [user=U1, ad=A99, ts=10:00:01]
[user=U2, ad=A77, ts=10:00:03]
Ad Clicks: [user=U1, ad=A99, ts=10:00:14] ← arrived 13 seconds later
Join window: 30 minutes
Result: [user=U1, ad=A99, impression_ts=10:00:01, click_ts=10:00:14,
time_to_click_sec=13]
In Flink SQL:
SELECT
i.user_id,
i.ad_id,
i.impression_ts,
c.click_ts,
TIMESTAMPDIFF(SECOND, i.impression_ts, c.click_ts) AS seconds_to_click
FROM ad_impressions AS i
JOIN ad_clicks AS c
ON i.user_id = c.user_id
AND i.ad_id = c.ad_id
AND c.click_ts BETWEEN i.impression_ts AND i.impression_ts + INTERVAL '30' MINUTE
Interval Join Configuration
The join interval is the most critical tuning parameter:
| Join Interval | State Retention | Late Event Handling |
|---|---|---|
| 1 minute | Very small | Events arriving >1 min late will miss their match |
| 30 minutes | Medium | Covers most user interaction latency |
| 24 hours | Large | Captures slow conversion funnels; significant state cost |
For long-window joins (hours to days), consider an external state store (RocksDB, Redis) rather than in-memory state.
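The mechanics of an interval join - buffer one side, match the other within the window, evict state once the watermark passes - can be sketched by hand. IntervalJoinBuffer and its methods are illustrative names, not a Flink API, and a real implementation would buffer both sides:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of interval-join state: buffer impressions, match clicks within a window. */
public class IntervalJoinBuffer {

    private final long windowMillis;
    private final Map<String, List<Long>> impressionsByKey = new HashMap<>();

    public IntervalJoinBuffer(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Buffer an impression until a click arrives or the watermark expires it. */
    public void onImpression(String key, long ts) {
        impressionsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(ts);
    }

    /** Returns seconds from impression to click, or -1 if no impression is within the window. */
    public long onClick(String key, long clickTs) {
        for (long impTs : impressionsByKey.getOrDefault(key, List.of())) {
            if (clickTs >= impTs && clickTs <= impTs + windowMillis) {
                return (clickTs - impTs) / 1000;
            }
        }
        return -1;
    }

    /** Evict buffered impressions that can no longer be matched - this bounds the state. */
    public void onWatermark(long watermark) {
        impressionsByKey.values().forEach(l -> l.removeIf(ts -> ts + windowMillis < watermark));
        impressionsByKey.values().removeIf(List::isEmpty);
    }
}
```

The onWatermark step is the crucial one: without it the buffer grows forever, which is exactly the unbounded-state problem described above.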
Handling Late and Out-of-Order Events
Stream-to-stream joins must account for events that arrive late or out of order due to network delays, producer retries, or clock skew. Watermarks - logical timestamps that signal “I’ve seen all events up to time T” - define when a window can be safely closed and state discarded.
Watermark = max(observed event_time) - allowed_lateness

// Allow up to 5 minutes of late arrivals:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(5))
Events arriving after the watermark has advanced past their window are either dropped, routed to a side output for separate handling, or trigger a window update (depending on trigger configuration).
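A simplified sketch of the bounded-out-of-orderness idea: track the maximum event time seen, subtract the allowed lateness, and treat anything at or below the result as late. The class name is hypothetical, and Flink's actual generator differs in small details:

```java
/** Simplified bounded-out-of-orderness watermarking: max event time minus allowed lateness. */
public class BoundedOutOfOrdernessWatermark {

    private final long maxOutOfOrdernessMillis;   // e.g. Duration.ofMinutes(5).toMillis()
    private long maxSeenEventTime = Long.MIN_VALUE;

    public BoundedOutOfOrdernessWatermark(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Observe an event; the watermark only ever moves forward. */
    public void onEvent(long eventTimeMillis) {
        maxSeenEventTime = Math.max(maxSeenEventTime, eventTimeMillis);
    }

    /** Current watermark: "all events up to this time have (probably) been seen." */
    public long currentWatermark() {
        return maxSeenEventTime - maxOutOfOrdernessMillis;
    }

    /** An event at or below the watermark is late: its window may already be closed. */
    public boolean isLate(long eventTimeMillis) {
        return eventTimeMillis <= currentWatermark();
    }
}
```

Note that an out-of-order event never moves the watermark backward - it is either still inside the lateness budget and joins normally, or it is late and gets the fallback treatment described above.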
Technique 4: Slowly Changing Dimensions in Streaming
Reference data is rarely static. Customer segments change as purchase behavior evolves. Product categories get reorganized. Pricing tiers shift with contract renewals. In data warehousing, these are called slowly changing dimensions (SCDs). In streaming, managing them correctly requires deliberate strategy.
SCD Type 1: Overwrite
The simplest approach: when reference data changes, overwrite the cached value. Past events enriched with old values are not corrected. This is appropriate when historical accuracy is not required - for example, enriching clickstream events with the current product name.
Implementation: TTL-based cache with write-through updates from CDC.
SCD Type 2: Versioned History
When historical accuracy matters, maintain a full version history with effective dates. This is what temporal joins require. Each change creates a new row; old rows are preserved with a closed effective_to date.
CDC pipeline for SCD Type 2:
PostgreSQL (products table)
|
| CDC (via Streamkap or Debezium)
v
Kafka topic: products.changelog
|
v
Flink: Convert changelog to versioned state table
|
v
Temporal join: enrich events at their event_time
SCD Type 3: Limited History
Store only the current value and one previous value. Useful when you need to detect recent transitions (e.g., customer just upgraded from free to paid) but don’t need full history.
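A minimal sketch of Type 3 state in code - one current and one previous value per key, just enough to answer "did this key just move from X to Y?". Names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of SCD Type 3 state: current value plus one previous value per key. */
public class Scd3Tracker<K, V> {

    private static final class Slot<T> {
        final T current;
        final T previous;   // null if this key has only ever had one value
        Slot(T current, T previous) {
            this.current = current;
            this.previous = previous;
        }
    }

    private final Map<K, Slot<V>> state = new HashMap<>();

    /** Record a new value; the old current value becomes the previous value. */
    public void update(K key, V newValue) {
        Slot<V> old = state.get(key);
        V prev = (old == null) ? null : old.current;
        state.put(key, new Slot<>(newValue, prev));
    }

    /** True if the key's most recent change was exactly from -> to (e.g. free -> paid). */
    public boolean transitioned(K key, V from, V to) {
        Slot<V> s = state.get(key);
        return s != null && to.equals(s.current) && from.equals(s.previous);
    }
}
```

Compared with the Type 2 version history, this keeps constant state per key, which is why it suits high-cardinality dimensions where only the latest transition matters.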
SCD Type 6: Combined
A hybrid of Types 1, 2, and 3 that maintains current values in columns alongside a full version history in rows. Useful for analytical workloads but complex to maintain in a streaming context.
Practical Example: Order Enrichment Pipeline
Here is a complete enrichment architecture for an e-commerce order processing pipeline:
MySQL (orders, customers, products)
|
| CDC (change data capture)
v
Kafka Topics:
- orders (raw order events)
- customers.changelog (CDC changelog)
- products.changelog (CDC changelog)
|
v
Flink Job: Order Enrichment
- orders stream → primary input
- customers.changelog → temporal table (versioned, join at order.created_at)
- products.changelog → lookup cache (TTL 10 min, CDC invalidation)
|
v
Enriched Orders:
- customer_id, customer_tier, customer_country (at time of order)
- product_id, product_name, product_category (current name)
- order_amount, enriched_margin
|
v
Destinations:
- Snowflake (analytics)
- Elasticsearch (real-time search)
- Redis (fraud scoring context)
Customer data uses a temporal join because pricing and risk models depend on the customer tier that was in effect when the order was placed. Product data uses a lookup join with CDC-based cache invalidation because the product name and category are display attributes where current state is acceptable.
Choosing the Right Enrichment Pattern
| Scenario | Recommended Pattern |
|---|---|
| Enrich events with current reference data, some staleness OK | Lookup join with TTL cache |
| Enrich events with reference data valid at event timestamp | Temporal join |
| Correlate two high-velocity event streams | Stream-to-stream join with interval |
| Reference data changes rarely, must be consistent | Broadcast join (replicate to all partitions) |
| Reference data changes frequently, zero tolerance for staleness | CDC-driven cache invalidation |
The underlying principle: match the enrichment pattern to the correctness requirement and the velocity of both sides of the join. When in doubt, favor temporal joins - the performance overhead is manageable with a properly sized state backend, and the correctness guarantees prevent subtle bugs that are expensive to debug in production.
Streamkap’s CDC pipelines provide the changelog streams that power temporal joins and CDC-driven cache invalidation - keeping reference data in Flink state synchronized with the source database without manual extract jobs.
Summary
Real-time data enrichment transforms incomplete events into actionable facts by joining streams with reference context. The four core techniques - lookup joins, temporal joins, stream-to-stream joins, and SCD management - address different combinations of velocity, correctness, and freshness requirements. Choosing correctly requires understanding whether you need current or point-in-time reference data, how fast both sides of the join change, and how much state your stream processor can maintain. With the right pattern in place, enrichment adds context at the moment it is most valuable: in motion, before the event reaches its destination.