
Engineering

February 25, 2026

12 min read

Building a Real-Time Recommendation Engine with Apache Flink

Learn how to build a real-time recommendation engine using Apache Flink. Covers collaborative filtering on streams, feature computation, session-based recommendations, and writing to serving stores.

TL;DR:

• Apache Flink enables real-time collaborative filtering and feature computation on live event streams, replacing batch recommendation pipelines that update once a day.
• Session-based recommendations use Flink windowing to capture short-term user intent without requiring historical profiles.
• Flink SQL simplifies feature aggregation, item scoring, and writing enriched signals to low-latency serving stores like Redis or DynamoDB.
• Streamkap provides managed Flink infrastructure that lets your team focus on recommendation logic instead of cluster operations.

Recommendations that update once a day are recommendations that lie. A user who just added running shoes to their cart does not want to see the yoga mat they browsed last Tuesday. By the time a nightly batch job recalculates scores, the moment is gone. The user has either bought the shoes or left the site entirely.

This is why recommendation engines are moving to stream processing. Apache Flink gives you the ability to process user interactions the instant they happen, compute features on live data, and push updated recommendations to a serving store within seconds. In this guide, we will walk through how to build a real-time recommendation engine with Flink, covering collaborative filtering on streams, session-based recommendations, feature computation pipelines, and writing results to low-latency stores.

Why Batch Recommendations Fall Short

Traditional recommendation systems follow a predictable pattern. A scheduled job runs overnight, pulls all user-item interactions from the past N days, trains or updates a model, and writes a lookup table of user-to-recommendation mappings. The application reads from that table until the next batch run overwrites it.

This approach has three problems that compound as your product grows:

Stale signals. A user’s intent changes by the minute. Batch pipelines capture what users did yesterday, not what they are doing right now. For e-commerce, media, and content platforms, this delay directly costs conversions.

Cold start blindness. New users and new items get no recommendations until the next batch run processes their interactions. If your catalog changes frequently or you have high new-user traffic, a significant portion of your audience sees generic fallback recommendations.

Resource spikes. Nightly batch jobs that process millions of interactions create predictable but painful compute spikes. You provision for peak load, pay for it 24 hours a day, and still cross your fingers that the job finishes before the morning traffic ramp.

Continuous stream processing addresses all three: signals stay fresh, new users and items get recommendations within seconds of their first interactions, and compute load is spread evenly instead of spiking overnight.

Architecture Overview

A real-time recommendation engine built on Flink has four main components:

  1. Event ingestion — User interactions (clicks, views, purchases, ratings) stream into Kafka topics.
  2. Feature computation — Flink jobs consume these events, compute aggregation features, and maintain stateful user and item profiles.
  3. Scoring and ranking — Flink applies scoring logic (co-occurrence, weighted interactions, or model inference) to rank candidate items for each user.
  4. Serving store writes — Flink writes the top-N recommendations for each user to a low-latency store like Redis, DynamoDB, or Aerospike, where the application reads them at request time.

Streamkap fits naturally at the ingestion layer. If your product catalog lives in PostgreSQL and your user profiles sit in MongoDB, Streamkap streams CDC changes from both databases into Kafka topics. Flink then joins these streams with the clickstream events to produce enriched features without you ever running a batch export.

Setting Up the Event Schema

Before writing any Flink SQL, you need to define the events your recommendation engine will consume. A typical clickstream event looks like this:

CREATE TABLE user_events (
  user_id       STRING,
  item_id       STRING,
  event_type    STRING,    -- 'view', 'click', 'add_to_cart', 'purchase'
  category      STRING,
  event_time    TIMESTAMP(3),
  session_id    STRING,
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

The WATERMARK declaration tells Flink how to handle event-time processing. The 5-second tolerance accommodates events that arrive slightly out of order, which is common in distributed systems where mobile clients and web browsers emit events with unpredictable network delays.

You also need a product catalog table, which Streamkap can keep in sync from your source database:

CREATE TABLE product_catalog (
  item_id       STRING,
  item_name     STRING,
  category      STRING,
  price         DECIMAL(10, 2),
  tags          ARRAY<STRING>,
  updated_at    TIMESTAMP(3),
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'product-catalog-cdc',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

Collaborative Filtering on Streams

Collaborative filtering works on a simple principle: users who interacted with the same items in the past will likely interact with the same items in the future. In a batch system, you build a full co-occurrence matrix from historical data. In a streaming system, you maintain and update that matrix incrementally.

The first step is computing item co-occurrence pairs within user sessions. When a user views item A and then views item B in the same session, those items have a co-occurrence signal.

CREATE VIEW item_pairs AS
SELECT
  a.item_id AS item_a,
  b.item_id AS item_b,
  COUNT(*)  AS co_occurrence_count
FROM user_events a
JOIN user_events b
  ON a.user_id = b.user_id
  AND a.session_id = b.session_id
  AND a.item_id < b.item_id
  AND b.event_time BETWEEN a.event_time AND a.event_time + INTERVAL '30' MINUTE
GROUP BY a.item_id, b.item_id;

This query continuously updates co-occurrence counts as new events arrive. The a.item_id < b.item_id condition avoids counting the same pair twice. The 30-minute bound limits co-occurrence to interactions that are temporally related, and it also makes this an interval join, which lets Flink expire state for old events instead of keeping the full history.

To turn co-occurrence counts into recommendations, you score candidate items for a given user based on what they have recently interacted with:

CREATE VIEW user_recommendations AS
SELECT
  e.user_id,
  ip.item_b AS recommended_item,
  SUM(ip.co_occurrence_count) AS score
FROM user_events e
JOIN item_pairs ip
  ON e.item_id = ip.item_a
WHERE e.event_type IN ('click', 'add_to_cart', 'purchase')
GROUP BY e.user_id, ip.item_b;
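Note that the view does not sort: a continuous query cannot globally order an unbounded stream by score. The idiomatic way to cap each user's list at a fixed N is Flink's Top-N pattern, a ROW_NUMBER() filter that the planner recognizes and maintains incrementally. A sketch that keeps the 10 best candidates per user (the cutoff of 10 is an arbitrary choice):

```sql
-- Top-N per user: Flink detects the ROW_NUMBER + rownum filter
-- and keeps only the current top 10 rows per partition in state,
-- emitting updates as scores change.
CREATE VIEW top_recommendations AS
SELECT user_id, recommended_item, score
FROM (
  SELECT
    user_id,
    recommended_item,
    score,
    ROW_NUMBER() OVER (
      PARTITION BY user_id
      ORDER BY score DESC
    ) AS rownum
  FROM user_recommendations
)
WHERE rownum <= 10;
```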

Session-Based Recommendations

Not every user has a deep interaction history. New visitors, logged-out users, and infrequent visitors pose a cold-start problem for traditional collaborative filtering. Session-based recommendations address this by focusing entirely on what the user is doing right now.

Flink’s session windows are designed exactly for this pattern. A session window groups events that are separated by less than a configurable gap:

CREATE VIEW session_interactions AS
SELECT
  session_id,
  user_id,
  COLLECT(item_id)   AS viewed_items,
  COLLECT(category)  AS viewed_categories,
  COUNT(*)           AS interaction_count,
  window_start,
  window_end
FROM TABLE(
  SESSION(TABLE user_events PARTITION BY session_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE)
)
GROUP BY session_id, user_id, window_start, window_end;

Within each session, you can compute short-term preference signals. If a user has viewed three items in the “running shoes” category and one in “casual sneakers,” the session signal strongly points toward running shoes. You can use this to boost items from the dominant category:

CREATE VIEW session_category_affinity AS
SELECT
  c.session_id,
  c.user_id,
  c.category,
  c.category_views,
  c.category_views * 1.0 / t.total_views AS affinity_score
FROM (
  -- events per category within the session
  SELECT session_id, user_id, category, COUNT(*) AS category_views
  FROM user_events
  GROUP BY session_id, user_id, category
) c
JOIN (
  -- all events in the session, for normalization
  SELECT session_id, COUNT(*) AS total_views
  FROM user_events
  GROUP BY session_id
) t ON c.session_id = t.session_id;

This affinity_score gives you a normalized preference weight per category within the current session. Items from higher-affinity categories get ranked above items from lower-affinity categories.
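One way to apply the boost is a simple multiplicative re-weighting of the collaborative-filtering scores. The sketch below joins each candidate's catalog category against the session affinity; it assumes one active session per user and leaves session expiry aside, and the view name and 1.0 + affinity multiplier are illustrative choices:

```sql
-- Boost candidates whose category the current session favors.
-- COALESCE keeps the base score for items outside any viewed
-- category (the LEFT JOIN produces NULL affinity for those).
CREATE VIEW boosted_recommendations AS
SELECT
  r.user_id,
  r.recommended_item,
  r.score * COALESCE(1.0 + a.affinity_score, 1.0) AS boosted_score
FROM user_recommendations r
JOIN product_catalog p
  ON r.recommended_item = p.item_id
LEFT JOIN session_category_affinity a
  ON a.user_id = r.user_id
  AND a.category = p.category;
```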

Feature Computation Pipeline

Raw events are noisy. A single page view means less than a purchase. A click on a product after searching for it means more than a click from a homepage carousel. Your recommendation engine needs weighted features that capture signal strength.

Here is a feature computation pipeline that assigns weights to different interaction types and aggregates them over a sliding window:

CREATE VIEW weighted_interactions AS
SELECT
  user_id,
  item_id,
  SUM(
    CASE event_type
      WHEN 'purchase'     THEN 10.0
      WHEN 'add_to_cart'  THEN 5.0
      WHEN 'click'        THEN 2.0
      WHEN 'view'         THEN 1.0
      ELSE 0.5
    END
  ) AS interaction_score,
  COUNT(*) AS event_count,
  MAX(event_time) AS last_interaction
FROM TABLE(
  HOP(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY user_id, item_id, window_start, window_end;

This hop window (also known as a sliding window) emits updated results every 5 minutes, each covering the trailing hour. Each user-item pair gets a composite score that reflects both the recency and the strength of its interactions.
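The weights above treat a purchase from 55 minutes ago the same as one from 5 minutes ago. If recency inside the window should matter too, an exponential decay on each event's age is a small extension. The 1800-second constant below (roughly a 21-minute half-life) is an illustrative choice, not a recommendation:

```sql
-- Decay each event's weight by its distance from the window end.
-- EXP(-age/1800) halves roughly every 21 minutes, so the most
-- recent events dominate the score.
CREATE VIEW decayed_interactions AS
SELECT
  user_id,
  item_id,
  SUM(
    (CASE event_type
       WHEN 'purchase'     THEN 10.0
       WHEN 'add_to_cart'  THEN 5.0
       WHEN 'click'        THEN 2.0
       WHEN 'view'         THEN 1.0
       ELSE 0.5
     END)
    * EXP(-CAST(TIMESTAMPDIFF(SECOND, event_time, window_end) AS DOUBLE) / 1800.0)
  ) AS decayed_score
FROM TABLE(
  HOP(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY user_id, item_id, window_start, window_end;
```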

You can further enrich these features by joining against the product catalog:

CREATE VIEW enriched_interactions AS
SELECT
  w.user_id,
  w.item_id,
  w.interaction_score,
  p.category,
  p.price,
  p.tags
FROM weighted_interactions w
JOIN product_catalog p
  ON w.item_id = p.item_id;

Because the product_catalog table is backed by a Kafka topic that Streamkap keeps in sync with your source database, this join always reflects the current state of your catalog. If a product’s category changes or its price updates, the enriched features reflect that immediately.
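One nuance: a regular join like the one above keeps both sides in Flink state and retroactively updates old results when a catalog row changes. If you instead want each event enriched with the catalog version that was valid at event time, Flink's event-time temporal join does that. The sketch assumes product_catalog also declares a watermark on updated_at (not shown in the DDL above), which versioned-table joins require; the view name is hypothetical:

```sql
-- Temporal join: each event picks up the catalog row that was
-- current at the event's timestamp. Requires a primary key and a
-- watermark on the versioned (catalog) side.
CREATE VIEW events_with_catalog AS
SELECT
  e.user_id,
  e.item_id,
  e.event_type,
  p.category,
  p.price
FROM user_events e
JOIN product_catalog FOR SYSTEM_TIME AS OF e.event_time AS p
  ON e.item_id = p.item_id;
```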

Writing to a Serving Store

Recommendations are useless if your application cannot read them at low latency. The final step in the pipeline writes the top-N items per user to a serving store. Redis is a common choice because of its sub-millisecond read latency and built-in sorted sets.

CREATE TABLE user_recommendations_sink (
  user_id          STRING,
  recommended_item STRING,
  score            DOUBLE,
  updated_at       TIMESTAMP(3),
  PRIMARY KEY (user_id, recommended_item) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://serving-db:5432/recommendations',
  'table-name' = 'user_recommendations',
  'driver' = 'org.postgresql.Driver'
);

INSERT INTO user_recommendations_sink
SELECT
  user_id,
  recommended_item,
  score,
  CURRENT_TIMESTAMP AS updated_at
FROM user_recommendations;

For Redis specifically, you would use a community-maintained Redis connector or a custom sink function, since Flink does not ship an official one. The pattern is the same: Flink continuously updates the serving store so that the application always reads fresh recommendations.

At serving time, your application simply queries the store by user ID and retrieves a pre-ranked list of items. There is no model inference at request time, no batch lookup, and no stale cache. The recommendations are as fresh as the last event Flink processed, which is typically seconds old.

Handling Late Data and Retractions

Real-time systems have to deal with late-arriving data. A mobile app might buffer events during a subway ride and flush them all at once when connectivity resumes. Flink’s watermark mechanism handles this gracefully, but you need to configure it appropriately for your use case.

The watermark delay you set on your source table determines how long Flink waits for late events before finalizing a window. A 5-second delay works for web clickstream data where events arrive quickly. For mobile-heavy traffic, you might increase this to 30 seconds or even a few minutes.
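Raising the tolerance is a one-line change to the source DDL; for example, a 30-second bound for mobile-heavy traffic:

```sql
-- Wait up to 30 seconds for out-of-order events before finalizing
-- windows; larger bounds give more complete windows, later results.
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
```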

Flink also supports retractions, which means that if a previously emitted result changes because a late event arrived, Flink can issue an update. This is particularly important for recommendation scores that aggregate over windows. When a new event arrives and changes the co-occurrence count, Flink retracts the old count and emits the updated one. Downstream sinks that support upserts (like JDBC or upsert-Kafka) handle this automatically.

Scaling Considerations

Recommendation engines process a lot of state. Every user-item interaction pair, every co-occurrence count, and every session window is maintained in Flink’s state backend. For a platform with millions of users and tens of thousands of items, this state can grow to terabytes.

Key scaling strategies include:

Partition by user ID. This ensures that all events for a given user are processed by the same Flink task manager, which keeps state local and avoids expensive cross-partition lookups.

Use RocksDB state backend. For large state sizes, RocksDB spills to disk while keeping hot data in memory. This lets you handle state that exceeds available RAM without crashing.

Tune checkpointing intervals. More frequent checkpoints mean faster recovery but higher overhead. For recommendation engines, a 60-second checkpoint interval is a reasonable starting point.

Limit candidate sets. Instead of scoring every item in your catalog for every user, pre-filter candidates based on category affinity or popularity. This reduces the computational cost of the scoring step by orders of magnitude.
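Several of these knobs are plain configuration. A starting point matching the guidance above, sketched as SQL-client SET statements (key names vary slightly across Flink versions):

```sql
-- Checkpoint every 60 seconds: frequent enough for fast recovery
-- without constant snapshot overhead.
SET 'execution.checkpointing.interval' = '60 s';
-- Keep large state in RocksDB, spilling cold entries to disk.
SET 'state.backend.type' = 'rocksdb';
-- Incremental checkpoints upload only changed files, not full state.
SET 'state.backend.incremental' = 'true';
```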

Streamkap’s managed Flink offering handles the operational side of these scaling concerns. You configure your parallelism and state backend preferences, and Streamkap manages the underlying cluster, checkpoint storage, and resource allocation.

Monitoring and Observability

A recommendation engine that silently degrades is worse than one that breaks loudly. You should monitor several key metrics:

Event throughput. Track events per second at the Kafka consumer and at each Flink operator. A sudden drop in throughput usually indicates a backpressure issue or an upstream producer failure.

Recommendation freshness. Measure the time between a user event and the corresponding update in the serving store. If this latency creeps above a few seconds, your recommendations are starting to go stale.

State size growth. Monitor the size of Flink’s managed state. Unbounded growth suggests a missing TTL configuration or a window that never closes.

Serving store hit rate. Track how often the application finds pre-computed recommendations in the serving store versus falling back to a default list. A low hit rate means your pipeline is not covering enough users.
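The missing-TTL failure mode called out above has a one-line guard in Flink SQL: a retention time after which idle per-key state (for example, co-occurrence counts for item pairs nobody touches anymore) is dropped. The 24-hour value is an illustrative starting point:

```sql
-- Expire state that has not been read or updated for 24 hours.
-- Trades exact counts for long-dormant keys against bounded state.
SET 'table.exec.state.ttl' = '24 h';
```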

Putting It All Together

Building a real-time recommendation engine with Flink replaces a fragile batch pipeline with a system that responds to user behavior as it happens. The architecture is straightforward: events flow from your application into Kafka, Flink computes features and scores continuously, and results land in a low-latency serving store that your application queries at request time.

The complexity is in the details: tuning watermarks, managing state, scaling co-occurrence computations, and keeping the serving store consistent. But these are engineering problems with well-understood solutions, not unsolved research challenges.

If you are currently running a nightly batch recommendation job and feeling the pain of stale results, start small. Pick one recommendation surface in your product, build a Flink pipeline that computes scores for that surface, and compare conversion rates against the batch baseline. The difference usually speaks for itself.

Streamkap can help you get there without building a Flink cluster from scratch. With managed Flink and CDC streaming from your databases, you get the infrastructure layer handled so you can focus on the recommendation logic that actually moves your business metrics.