Flink SQL Cookbook: 20 Ready-to-Use Query Patterns
A practical cookbook of 20 Flink SQL query patterns for common stream processing tasks - filtering, aggregations, joins, deduplication, Top-N, and more. Copy, adapt, and deploy.
This cookbook is a reference collection of 20 Flink SQL patterns that solve common stream processing problems. Each pattern includes a ready-to-use SQL query, a brief explanation of how it works, and guidance on when to reach for it. The patterns are organized by category so you can jump directly to the problem you are solving.
All queries assume standard Flink SQL syntax (1.17+; the session window TVFs in patterns 14 and 19 require Flink 1.19 or later). Table names and column names are illustrative - replace them with your own schema. If you are running these on Streamkap’s managed Flink environment, you can deploy them directly against CDC sources without worrying about infrastructure.
Filtering & Routing
1. Simple Stream Filtering
SELECT
order_id,
customer_id,
amount,
order_status,
event_time
FROM orders
WHERE order_status = 'completed'
AND amount > 100.00;
The most fundamental streaming pattern. Flink evaluates the WHERE clause against every incoming record and forwards only matching rows downstream. Use this to reduce data volume early in a pipeline before heavier operations like joins or aggregations. Because filtering is stateless, it adds negligible overhead and scales linearly.
2. Content-Based Routing
-- Route high-value orders to a priority sink and everything else to a
-- standard sink, sharing a single scan of the source
EXECUTE STATEMENT SET
BEGIN
INSERT INTO priority_orders
SELECT order_id, customer_id, amount, event_time
FROM orders
WHERE amount >= 1000.00;
INSERT INTO standard_orders
SELECT order_id, customer_id, amount, event_time
FROM orders
WHERE amount < 1000.00;
END;
Content-based routing splits a single stream into multiple output sinks based on record attributes. In Flink SQL you express this as multiple INSERT INTO statements, each with its own WHERE predicate. Submitted separately, the two INSERTs would run as two independent jobs that each scan the source; wrapping them in an EXECUTE STATEMENT SET block compiles them into a single execution graph with a shared source read, so the source is not scanned twice. Use this pattern when different downstream systems need different subsets of the same stream.
3. Deduplication
SELECT order_id, customer_id, amount, event_time
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY event_time ASC
) AS row_num
FROM orders
)
WHERE row_num = 1;
This pattern keeps only the first occurrence of each key using ROW_NUMBER() partitioned by the deduplication key. The ORDER BY event_time ASC ensures the earliest event wins; when the ordering column is a time attribute, Flink recognizes this query shape and optimizes it into a dedicated deduplication operator. Flink still maintains state for each partition key, so configure an appropriate state TTL to prevent unbounded state growth. Use this when upstream sources may produce duplicate records - common with at-least-once delivery guarantees or CDC replay scenarios.
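Without a TTL, the deduplication state grows by one entry per distinct order_id and is never released. A minimal way to bound it, assuming your deployment honors SET statements (the 24-hour value is illustrative - pick a TTL longer than the window in which duplicates can realistically arrive):
-- Expire per-key state that has been idle for 24 hours
SET 'table.exec.state.ttl' = '24 h';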
Aggregations
4. Per-Minute Event Count (Tumble Window)
SELECT
window_start,
window_end,
event_type,
COUNT(*) AS event_count
FROM TABLE(
TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, event_type;
A tumbling window divides the stream into fixed, non-overlapping intervals. Here each one-minute window counts events grouped by type. The window fires once the watermark passes window_end, emitting a single result per group. Use this for regular time-series metrics, dashboards, or feeding downstream alerting systems that expect periodic counts.
5. Moving Average (Hop Window)
SELECT
window_start,
window_end,
sensor_id,
AVG(temperature) AS avg_temperature,
MAX(temperature) AS max_temperature
FROM TABLE(
HOP(TABLE sensor_readings, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '5' MINUTES)
)
GROUP BY window_start, window_end, sensor_id;
A hopping (sliding) window computes aggregates over overlapping intervals. This query creates five-minute windows that advance every one minute, so each record participates in up to five windows. The result is a smoothed moving average that updates every minute. Use this when you need trend detection or anomaly smoothing without the latency of waiting for a full window to close.
6. Running Total (Non-Windowed Group By)
SELECT
customer_id,
COUNT(*) AS total_orders,
SUM(amount) AS total_spent
FROM orders
GROUP BY customer_id;
A GROUP BY without a window produces an updating (retract) stream. Each time a new order arrives for a customer, Flink retracts the previous aggregate and emits an updated row. This gives you a continuously maintained running total per key. Use this for real-time customer profiles, account balances, or any metric that must reflect the latest state. The downstream sink must support upserts (e.g., a database or key-value store).
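Because the query emits retractions, it cannot be written to an append-only sink such as a plain Kafka topic. A sketch of a compatible sink, assuming a JDBC-accessible Postgres database (URL and table name are placeholders): declaring a PRIMARY KEY switches the JDBC connector into upsert mode, so each updated aggregate overwrites the previous row for that customer.
CREATE TABLE customer_totals (
customer_id STRING,
total_orders BIGINT,
total_spent DECIMAL(10, 2),
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://db:5432/analytics',
'table-name' = 'customer_totals'
);
INSERT INTO customer_totals
SELECT customer_id, COUNT(*), SUM(amount)
FROM orders
GROUP BY customer_id;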
7. Count Distinct Per Window
SELECT
window_start,
window_end,
page_url,
COUNT(DISTINCT user_id) AS unique_visitors
FROM TABLE(
TUMBLE(TABLE page_views, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, page_url;
COUNT(DISTINCT ...) inside a window calculates the number of unique values per window. Flink tracks distinct values in state for the duration of the window. This is more memory-intensive than a simple count because the engine must remember every distinct value it has seen within the window. Use this for unique visitor counts, unique device counts, or any cardinality metric. For extremely high cardinality, consider approximate counting with user-defined functions.
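When one group (say, a viral page_url) dominates the traffic, the distinct-value state for that key can become a hot spot on a single task. Flink can rewrite the distinct aggregation into a two-phase plan that spreads the load across partial aggregators; a sketch, assuming your deployment honors SET statements:
-- Split COUNT(DISTINCT ...) into a two-level aggregation to mitigate key skew
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';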
8. Top-N Per Window
SELECT product_id, window_start, total_sold, row_num
FROM (
SELECT
product_id,
window_start,
total_sold,
ROW_NUMBER() OVER (
PARTITION BY window_start
ORDER BY total_sold DESC
) AS row_num
FROM (
SELECT
window_start,
window_end,
product_id,
SUM(quantity) AS total_sold
FROM TABLE(
TUMBLE(TABLE sales, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, product_id
)
)
WHERE row_num <= 10;
This two-stage pattern first aggregates within a tumbling window, then ranks the results with ROW_NUMBER() to extract the top N. The outer WHERE row_num <= 10 filters to only the top 10 products by hourly sales. Use this for leaderboards, trending item lists, or any reporting scenario where you need a ranked subset of windowed results.
Joins
9. Stream Enrichment with Temporal Join
SELECT
o.order_id,
o.amount,
o.event_time,
c.customer_name,
c.customer_tier
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.event_time AS c
ON o.customer_id = c.customer_id;
A temporal join enriches a stream with the version of a dimension table that was current at the time of each event. The FOR SYSTEM_TIME AS OF clause tells Flink to look up the customer record that was valid at o.event_time, not the latest version. For this to work, customers must be a versioned table: declared with a primary key and an event-time watermark (pattern 17 shows a complete example). This is essential for correctness when dimension data changes over time. Use this to enrich transactional streams with slowly changing reference data.
10. Time-Bounded Event Correlation (Interval Join)
SELECT
o.order_id,
o.event_time AS order_time,
s.shipment_id,
s.event_time AS ship_time,
TIMESTAMPDIFF(MINUTE, o.event_time, s.event_time) AS fulfillment_minutes
FROM orders o, shipments s
WHERE o.order_id = s.order_id
AND s.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '24' HOUR;
An interval join correlates events from two streams that share a key and occur within a bounded time range. Flink buffers records from both streams and matches them when the time constraint is satisfied. The BETWEEN clause defines the window - here, a shipment must occur within 24 hours of the order. Use this for matching request-response pairs, order-fulfillment tracking, or any scenario where two events are expected within a known time bound.
11. Lookup Join to External Database
SELECT
o.order_id,
o.product_id,
o.amount,
p.product_name,
p.category
FROM orders AS o
JOIN product_catalog FOR SYSTEM_TIME AS OF o.proc_time AS p
ON o.product_id = p.product_id;
A lookup join enriches each incoming record by querying an external system (database, cache, or API) at processing time. The FOR SYSTEM_TIME AS OF o.proc_time syntax with a processing-time attribute tells Flink to fetch the current value from the external table for each record. Unlike a temporal join, this always returns the latest version, not a historical snapshot. Use this when you need to enrich a stream with reference data from an external database and the dimension table does not require versioning.
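For this to work, product_catalog must use a lookup-capable connector, and the orders table needs a processing-time attribute (proc_time AS PROCTIME() in its DDL). A sketch using the JDBC connector, assuming Postgres and illustrative cache settings; the cache options shown follow the newer unified lookup-cache naming (older connector versions use lookup.cache.max-rows and lookup.cache.ttl instead):
-- Each incoming order triggers a point query against Postgres;
-- results are cached for 10 minutes to limit database load
CREATE TABLE product_catalog (
product_id STRING,
product_name STRING,
category STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://db:5432/shop',
'table-name' = 'products',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '10000',
'lookup.partial-cache.expire-after-write' = '10 min'
);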
12. Self-Join for Session Stitching
SELECT
a.user_id,
a.session_id AS anonymous_session,
b.session_id AS authenticated_session,
a.event_time AS first_seen,
b.event_time AS login_time
FROM clickstream a, clickstream b
WHERE a.device_fingerprint = b.device_fingerprint
AND a.user_id IS NULL
AND b.user_id IS NOT NULL
AND b.event_time BETWEEN a.event_time AND a.event_time + INTERVAL '30' MINUTE;
A self-join matches events from the same stream using different filter conditions. Here anonymous clickstream events (no user_id) are correlated with authenticated events (has user_id) using device_fingerprint as the join key, bounded to a 30-minute window. This lets you stitch together anonymous browsing sessions with known user identities after they log in. Use this for identity resolution, session merging, or linking pre-login behavior to user accounts.
Windowing
13. Tumbling Window with Late Data Handling
SELECT
window_start,
window_end,
sensor_id,
AVG(reading) AS avg_reading,
COUNT(*) AS sample_count
FROM TABLE(
TUMBLE(TABLE sensor_data, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, sensor_id;
-- Configure via table options or SET statements:
-- SET 'table.exec.source.idle-timeout' = '5s';
-- Watermark strategy in table DDL:
-- WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
Watermarks control when windows fire. Defining WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND tells Flink to tolerate up to 10 seconds of late data before closing a window. Records arriving after the watermark has passed are dropped by default. Tune the watermark delay based on your observed data lateness. Use this when you need aggregated sensor data, metrics, or event summaries and your source has variable network or processing delays.
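Put together, the source DDL for this pattern might look like the following sketch (connector details are placeholders):
CREATE TABLE sensor_data (
sensor_id STRING,
reading DOUBLE,
event_time TIMESTAMP(3),
-- Tolerate up to 10 seconds of out-of-order data before windows close
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sensor-data',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);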
14. Session Window for Activity Detection
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS events_in_session,
TIMESTAMPDIFF(SECOND, window_start, window_end) AS session_duration_seconds
FROM TABLE(
SESSION(TABLE clickstream PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id;
A session window groups events that arrive within a specified gap of each other, with the gap tracked separately per user. If no new event arrives for a user within 30 minutes, that user's window closes and emits results. Unlike tumbling or hopping windows, session windows have dynamic boundaries - they start when activity begins and end when it stops. Use this for user session analysis, detecting periods of machine activity, or grouping bursts of events that logically belong together.
15. Cumulate Window for Running Within-Day Totals
SELECT
window_start,
window_end,
product_id,
SUM(revenue) AS cumulative_revenue,
COUNT(*) AS cumulative_orders
FROM TABLE(
CUMULATE(TABLE sales, DESCRIPTOR(event_time), INTERVAL '1' HOUR, INTERVAL '1' DAY)
)
GROUP BY window_start, window_end, product_id;
A cumulate window emits running totals at regular intervals within a larger window. This query produces hourly snapshots of cumulative daily revenue per product - at 01:00 you see revenue from 00:00-01:00, at 02:00 you see revenue from 00:00-02:00, and so on until the full day resets. Use this for intra-day dashboards, progressive reports, or any case where stakeholders need to see accumulating totals throughout a time period without waiting for the full period to end.
CDC Patterns
16. CDC Changelog to Upsert Sink
-- Source table with CDC format
CREATE TABLE orders_cdc (
order_id STRING,
customer_id STRING,
amount DECIMAL(10, 2),
order_status STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
-- Sink table with upsert mode
CREATE TABLE orders_sink (
order_id STRING,
customer_id STRING,
amount DECIMAL(10, 2),
order_status STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders-processed',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO orders_sink
SELECT order_id, customer_id, amount, order_status, updated_at
FROM orders_cdc
WHERE order_status <> 'cancelled';
This pattern reads a CDC changelog (Debezium format), applies a filter, and writes to an upsert-capable sink. Flink interprets the Debezium envelope to understand inserts, updates, and deletes, then propagates the correct changelog operations downstream. The PRIMARY KEY declaration tells the sink to upsert on order_id. Use this as the foundation for any CDC-to-sink pipeline where you need to maintain a materialized, filtered view of a source table.
17. Slowly Changing Dimension Lookup from CDC
CREATE TABLE customers_cdc (
customer_id STRING,
customer_name STRING,
tier STRING,
updated_at TIMESTAMP(3),
WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'customers',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
SELECT
o.order_id,
o.amount,
o.event_time,
c.customer_name,
c.tier
FROM orders AS o
JOIN customers_cdc FOR SYSTEM_TIME AS OF o.event_time AS c
ON o.customer_id = c.customer_id;
This combines temporal join semantics with a CDC-sourced dimension table. As the customers_cdc table receives change events, Flink maintains a versioned history of each key. When an order arrives, the join looks up the customer record that was valid at the order’s event time. This gives you SCD Type 2 behavior without maintaining a separate history table. Use this when your dimension data comes from a CDC stream and you need point-in-time correct enrichment.
18. CDC Event Counting (Inserts vs Updates vs Deletes)
SELECT
window_start,
window_end,
table_name,
SUM(CASE WHEN op = 'c' THEN 1 ELSE 0 END) AS inserts,
SUM(CASE WHEN op = 'u' THEN 1 ELSE 0 END) AS updates,
SUM(CASE WHEN op = 'd' THEN 1 ELSE 0 END) AS deletes,
COUNT(*) AS total_changes
FROM TABLE(
TUMBLE(TABLE cdc_events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, table_name;
This pattern breaks down CDC events by operation type within time windows. The op field in Debezium events indicates the operation: c for create (insert), u for update, d for delete, and r for snapshot read. Counting these per minute gives you a real-time change rate dashboard for your source databases. Use this for operational monitoring, capacity planning, or detecting unusual write patterns that might indicate application issues or data quality problems.
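Note that the debezium-json format interprets and strips the Debezium envelope, so the op field is not directly selectable from a table declared that way. One approach is to read the topic with the plain json format and unpack the envelope yourself; a sketch, assuming the standard Debezium envelope fields (op, ts_ms, source) - adjust to your actual payload:
-- Read the raw Debezium envelope instead of letting Flink interpret it
CREATE TABLE cdc_events (
op STRING,                              -- 'c', 'u', 'd', or 'r'
ts_ms BIGINT,
source ROW<`db` STRING, `table` STRING>,
table_name AS source.`table`,
event_time AS TO_TIMESTAMP_LTZ(ts_ms, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'cdc.all-tables',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);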
Analytics
19. Sessionization (Assign Session IDs)
SELECT
user_id,
event_time,
event_type,
page_url,
session_start,
CONCAT(
CAST(user_id AS STRING),
'_',
DATE_FORMAT(session_start, 'yyyyMMddHHmmss')
) AS session_id
FROM (
SELECT
user_id,
event_time,
event_type,
page_url,
window_start AS session_start
FROM TABLE(
SESSION(TABLE clickstream PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id, event_time, event_type, page_url
);
This pattern assigns a deterministic session ID to every event by combining the user ID with the session window start time. The session window groups events with a 30-minute inactivity gap, and the window_start timestamp anchors the session identity. Every event within the same session receives the same session_id. Use this when downstream analytics systems need an explicit session identifier to group user activity - for example, feeding a data warehouse, building session-scoped aggregations, or powering user journey visualizations.
20. Funnel Analysis (MATCH_RECOGNIZE)
SELECT
user_id,
step1_time,
step2_time,
step3_time,
TIMESTAMPDIFF(SECOND, step1_time, step3_time) AS funnel_duration_seconds
FROM clickstream
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY event_time
MEASURES
A.event_time AS step1_time,
B.event_time AS step2_time,
C.event_time AS step3_time
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B C) WITHIN INTERVAL '1' HOUR
DEFINE
A AS A.page_url = '/product',
B AS B.page_url = '/cart',
C AS C.page_url = '/checkout/complete'
);
MATCH_RECOGNIZE applies complex event processing (CEP) patterns to a stream. This query defines a three-step conversion funnel: product view, add to cart, and checkout completion. The PATTERN (A B C) clause specifies the sequence, WITHIN INTERVAL '1' HOUR bounds the time allowed, and DEFINE maps each step to a filter condition. Flink evaluates this pattern continuously for each user. Use this for conversion funnel analysis, fraud sequence detection, or any scenario where you need to identify ordered sequences of events within a time bound. MATCH_RECOGNIZE is one of the most powerful features in Flink SQL and can replace many custom stateful applications.