Flink SQL: The Complete Guide to Stream Processing with SQL
Master Flink SQL for real-time stream processing. Learn dynamic tables, continuous queries, window functions, joins, and deployment patterns with practical examples.
Stream processing has historically required specialized knowledge: Java or Scala APIs, cluster tuning, stateful operator design. Flink SQL changes that equation. By expressing continuous computations in a language that most data engineers already know, it dramatically lowers the barrier to building real-time pipelines.
This guide covers everything you need to understand and use Flink SQL effectively - from the conceptual model that makes streaming SQL possible, to practical query patterns and deployment considerations.
The Core Abstraction: Dynamic Tables
Before writing a single query, it helps to understand why streaming SQL is conceptually different from batch SQL.
A traditional database table is a snapshot: a fixed set of rows at a point in time. A stream is the opposite - it is an unbounded sequence of events that never ends. Flink SQL bridges these two worlds through dynamic tables.
A dynamic table changes continuously as new events arrive. Querying it does not return a static result set; instead, it emits a continuous stream of result updates. This is the fundamental idea behind Flink SQL’s continuous query model: a SQL query runs indefinitely, and its output is itself a dynamic table that can feed into another query or be written to a sink.
There are two types of dynamic table:
- Append-only tables - rows are only added, never updated or deleted. A Kafka topic of click events is a natural append-only stream.
- Changelog tables - rows can be inserted, updated, or deleted. CDC (Change Data Capture) streams from databases like PostgreSQL or MySQL produce changelogs.
Understanding which type you are working with determines which SQL operations are valid and how output is emitted.
Setting Up: DDL Basics
In Flink SQL, you define sources and sinks using CREATE TABLE statements with a WITH clause that specifies the connector and its configuration.
-- Define a Kafka source
CREATE TABLE user_events (
    user_id BIGINT,
    event_type STRING,
    page STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-sql-consumer',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- Define a sink (e.g., a PostgreSQL table)
CREATE TABLE page_view_counts (
    page STRING,
    window_end TIMESTAMP(3),
    view_count BIGINT,
    PRIMARY KEY (page, window_end) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/analytics',
    'table-name' = 'page_view_counts'
);
The WATERMARK declaration is critical for event-time processing. It defines the watermark as the maximum observed event timestamp minus 5 seconds: a window is only considered complete once the watermark passes its end, so Flink tolerates events arriving up to 5 seconds out of order before finalizing results.
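To make the mechanics concrete, here is a small Python sketch - a conceptual model, not Flink's implementation, and the helper names are illustrative - of how a bounded-out-of-orderness watermark decides when a window can close:

```python
# Conceptual sketch of bounded-out-of-orderness watermarks (not Flink internals).
WATERMARK_DELAY = 5  # seconds, matching INTERVAL '5' SECOND

def watermark(max_seen_ts: float) -> float:
    """Watermark = highest event timestamp seen so far minus the delay."""
    return max_seen_ts - WATERMARK_DELAY

def window_is_complete(window_end: float, max_seen_ts: float) -> bool:
    """A window [start, end) may emit once the watermark passes its end."""
    return watermark(max_seen_ts) >= window_end

# Window [0s, 60s): an event at t=62s only advances the watermark to 57s...
assert not window_is_complete(60, 62)
# ...but an event at t=65s pushes the watermark to 60s, closing the window.
assert window_is_complete(60, 65)
```

The practical consequence: a larger delay tolerates more disorder but adds latency, because every window waits that much longer before emitting.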
Time Semantics: Event Time vs Processing Time
Flink SQL supports two time domains:
| Aspect | Processing Time | Event Time |
|---|---|---|
| Source | System clock when record is processed | Timestamp field in the record |
| Determinism | Non-deterministic - reruns differ | Deterministic - same data, same results |
| Out-of-order handling | None needed | Requires watermarks |
| Latency | Lowest | Slightly higher |
| Recommended for | Prototyping, latency-critical | Production pipelines |
For production work, always use event time. The determinism guarantee makes debugging, backfilling, and reprocessing far simpler.
To use processing time, declare a computed column:
CREATE TABLE orders (
    order_id BIGINT,
    amount DECIMAL(10,2),
    proc_time AS PROCTIME() -- processing-time attribute
) WITH ( ... );
Windowed Aggregations
Windowing is the mechanism that makes aggregation on unbounded streams tractable. Without windows, an aggregation like COUNT(*) would accumulate forever. Windows bound the aggregation to a finite slice of time.
Tumbling Windows
Tumbling windows divide the stream into non-overlapping, fixed-size time buckets.
SELECT
    page,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
    COUNT(*) AS view_count
FROM user_events
GROUP BY
    page,
    TUMBLE(event_time, INTERVAL '1' MINUTE);
Use tumbling windows when you need per-period metrics - hourly counts, daily totals - with no overlap.
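The bucket assignment itself is simple arithmetic. A hypothetical helper (not a Flink API) that rounds a timestamp down to its window boundary captures the idea:

```python
# Illustrative sketch: assign an event timestamp to its tumbling window by
# rounding down to the nearest window boundary (windows never overlap).
def tumble_window(ts_ms: int, size_ms: int) -> tuple[int, int]:
    start = ts_ms - (ts_ms % size_ms)
    return (start, start + size_ms)  # half-open interval [start, end)

# One-minute windows: an event at t=90.5s lands in [60s, 120s).
assert tumble_window(90_500, 60_000) == (60_000, 120_000)
```

Because each event maps to exactly one bucket, tumbling aggregations keep only one window's state per key at a time.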
Sliding Windows (Hopping Windows)
Sliding windows have a fixed size but advance by a smaller step, creating overlapping windows.
SELECT
    page,
    HOP_START(event_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE) AS window_start,
    HOP_END(event_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE) AS window_end,
    COUNT(*) AS view_count
FROM user_events
GROUP BY
    page,
    HOP(event_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE);
The first interval is the slide (how often a new window starts), the second is the window size. This query emits a fresh 5-minute rolling count every 30 seconds - useful for dashboards and anomaly detection.
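Unlike a tumbling window, each event now contributes to size/slide overlapping windows. A short Python sketch (illustrative only, not Flink internals) shows the window assignment:

```python
# Sketch: every event belongs to size/slide overlapping hop windows.
# Returns the [start, end) pairs that cover a given timestamp.
def hop_windows(ts: int, slide: int, size: int) -> list[tuple[int, int]]:
    last_start = ts - (ts % slide)   # latest window starting at or before ts
    starts = []
    start = last_start
    while start > ts - size:         # walk back while windows still cover ts
        starts.append(start)
        start -= slide
    return [(s, s + size) for s in reversed(starts)]

# 30s slide, 5-minute size: each event is counted in 10 overlapping windows.
windows = hop_windows(100, 30, 300)
assert len(windows) == 300 // 30
assert all(s <= 100 < e for s, e in windows)
```

This overlap is also the cost model: state and output volume scale with size/slide, so a 5-minute window sliding every second is 300x more work than the tumbling equivalent.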
Session Windows
Session windows group events that are close together in time, separated by gaps of inactivity.
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '10' MINUTE) AS session_start,
    SESSION_END(event_time, INTERVAL '10' MINUTE) AS session_end,
    COUNT(*) AS events_in_session
FROM user_events
GROUP BY
    user_id,
    SESSION(event_time, INTERVAL '10' MINUTE);
A session window closes when 10 minutes pass with no events from that user. Session windows are ideal for user journey analysis, where you want to group naturally bounded bursts of activity.
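The gap-merging semantics can be sketched in a few lines of Python (a simplified model that assumes sorted input; real session windows also merge out-of-order events):

```python
# Sketch of session-window semantics: consecutive events closer than the gap
# merge into one session; a quiet period of at least `gap` starts a new one.
def sessionize(timestamps: list[int], gap: int) -> list[list[int]]:
    sessions: list[list[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)   # within the gap: extend current session
        else:
            sessions.append([ts])     # gap exceeded: open a new session
    return sessions

# With a 600s (10-minute) gap, a 700s pause splits activity into two sessions.
assert sessionize([0, 100, 800, 900], 600) == [[0, 100], [800, 900]]
```

Note that session windows have data-driven, unpredictable sizes, which makes their state harder to bound than fixed windows.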
Cumulative Windows (Flink 1.13+)
A newer addition, cumulative windows are useful for “metrics so far within this period” - for example, running totals within a day that reset at midnight.
-- Cumulative windows use the windowing table-valued function (TVF) syntax;
-- this assumes orders has an event_time column with a watermark declared
SELECT
    user_id,
    window_start,
    window_end,
    SUM(amount) AS cumulative_revenue
FROM TABLE(
    CUMULATE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' HOUR, INTERVAL '1' DAY))
GROUP BY
    user_id,
    window_start,
    window_end;
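Conceptually, every event feeds all the expanding windows that cover it, each sharing the period's start. A Python sketch (illustrative names, integer hours for simplicity):

```python
# Sketch: a cumulative window with step `step` and max size `max_size` emits
# expanding windows [period_start, period_start + k*step) that share one start.
def cumulate_windows(ts: int, step: int, max_size: int) -> list[tuple[int, int]]:
    period_start = ts - (ts % max_size)
    return [
        (period_start, end)
        for end in range(period_start + step, period_start + max_size + 1, step)
        if ts < end   # the event only counts toward windows that cover it
    ]

# Hourly steps over a day: an event at hour 2 feeds windows ending at hours 3..24.
assert len(cumulate_windows(2, 1, 24)) == 22
assert cumulate_windows(2, 1, 24)[0] == (0, 3)
```

In practice Flink computes this incrementally rather than materializing every window, but the output shape - one growing total per step, resetting each period - is the same.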
Joins in Flink SQL
Joining streams is one of the most powerful - and most nuanced - capabilities in Flink SQL. Unlike batch SQL, stream joins must handle the fact that both sides are continuously arriving and (potentially) unbounded.
Regular Stream-Stream Join
SELECT
    o.order_id,
    o.amount,
    c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id;
Regular stream-stream joins keep every row from both sides in state, because a future row on either side could still produce a match. That state grows without bound - configure a state TTL ('table.exec.state.ttl') to expire old entries, or use an interval join (below) when matches are known to be time-bounded.
Interval Join
Interval joins are the idiomatic pattern for correlating two streams where matching events arrive within a known time window of each other.
SELECT
    a.user_id,
    a.action,
    b.action AS follow_up_action
FROM user_events a, user_events b
WHERE a.user_id = b.user_id
    AND b.event_time BETWEEN a.event_time AND a.event_time + INTERVAL '5' MINUTE;
This finds pairs of actions from the same user within 5 minutes. Flink can bound its state to the interval, making it memory-efficient.
Temporal Join (Point-in-Time Join)
A temporal join looks up the value of a slowly-changing dimension table as of the time of the stream event. This is critical for correctness when dimensions change over time (e.g., product prices, exchange rates).
-- Define a versioned dimension table (backed by a changelog, e.g. a CDC stream);
-- event-time temporal joins require a primary key and an event-time attribute
CREATE TABLE exchange_rates (
    currency STRING,
    rate DECIMAL(10,6),
    valid_from TIMESTAMP(3),
    WATERMARK FOR valid_from AS valid_from,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'format' = 'debezium-json',
    ...
);
-- Temporal join: use the rate that was valid at order time
-- Temporal join: use the rate that was valid at order time
SELECT
    o.order_id,
    o.amount,
    o.amount * r.rate AS amount_usd
FROM orders o
LEFT JOIN exchange_rates FOR SYSTEM_TIME AS OF o.event_time AS r
    ON o.currency = r.currency;
Without a temporal join, you would get the current rate, not the historical rate at the time of the transaction - a subtle but important correctness issue.
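The point-in-time lookup itself amounts to "find the latest version valid at or before the event time". A Python sketch with illustrative data (not Flink's state layout):

```python
import bisect

# Versions of a dimension row, sorted by their valid_from timestamp.
rate_versions = [(0, 1.00), (100, 1.05), (200, 1.10)]  # (valid_from, rate)

def rate_as_of(event_ts: int):
    """Return the rate of the latest version valid at or before event_ts."""
    valid_froms = [vf for vf, _ in rate_versions]
    i = bisect.bisect_right(valid_froms, event_ts) - 1
    return rate_versions[i][1] if i >= 0 else None

assert rate_as_of(150) == 1.05   # historical rate, not the latest
assert rate_as_of(250) == 1.10   # newest version applies to newer events
assert rate_as_of(-5) is None    # no version was valid yet
```

Flink maintains exactly this kind of versioned state per key, pruned by watermarks once older versions can no longer match any in-flight event.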
Lookup Join
A lookup join enriches a stream with data from an external system (a database or key-value store) at query time.
-- u.proc_time must be a processing-time attribute (proc_time AS PROCTIME())
SELECT
    u.user_id,
    u.event_type,
    p.plan_tier,
    p.company_name
FROM user_events u
JOIN user_profiles FOR SYSTEM_TIME AS OF u.proc_time AS p
    ON u.user_id = p.user_id;
Flink caches lookup results to avoid hitting the external system on every record. Configure the cache through the connector's lookup options (for the JDBC connector, 'lookup.cache.max-rows' and 'lookup.cache.ttl').
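The caching idea is straightforward; here is a Python sketch of it (an illustrative model, not Flink's cache - TtlCache and its loader are hypothetical names):

```python
# Sketch of lookup-join caching: serve repeated keys from a local cache and
# only call the external system again after the TTL expires.
class TtlCache:
    def __init__(self, ttl: float, loader):
        self.ttl = ttl
        self.loader = loader          # e.g., a database query by key
        self._store = {}              # key -> (value, fetched_at)

    def get(self, key, now: float):
        hit = self._store.get(key)
        if hit is not None and now - hit[1] < self.ttl:
            return hit[0]             # fresh hit: no external call
        value = self.loader(key)      # miss or expired: reload and re-cache
        self._store[key] = (value, now)
        return value

calls = []
cache = TtlCache(ttl=10, loader=lambda k: calls.append(k) or k.upper())
assert cache.get('a', now=0) == 'A'    # first access loads from the "database"
assert cache.get('a', now=5) == 'A'    # within TTL: served from cache
assert cache.get('a', now=12) == 'A'   # TTL expired: reloads
assert len(calls) == 2
```

The TTL is a correctness knob, not just a performance one: a longer TTL means enriched rows may carry dimension values that are stale by up to that duration.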
Aggregations and Grouping
Beyond windowed aggregations, Flink SQL supports standard GROUP BY on streaming tables that emit changelogs (i.e., they retract and reissue updated results).
-- Running count of events by type (emits retractions as counts update)
SELECT
    event_type,
    COUNT(*) AS total_events
FROM user_events
GROUP BY event_type;
This emits a +I (insert) row when a new event type is first seen, then -U / +U (update-before / update-after) pairs as the count changes. Downstream sinks that support upserts (Kafka with key, JDBC with primary key) handle this automatically.
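A small Python simulation makes the changelog shape visible (an illustrative model of the emitted rows, not Flink's internal encoding):

```python
# Sketch of changelog emission for a running COUNT(*) ... GROUP BY:
# first sighting of a key emits +I; later updates emit a -U/+U pair.
def changelog_count(event_types: list[str]) -> list[tuple[str, str, int]]:
    counts: dict[str, int] = {}
    out = []
    for etype in event_types:
        if etype not in counts:
            counts[etype] = 1
            out.append(('+I', etype, 1))
        else:
            old = counts[etype]
            counts[etype] = old + 1
            out.append(('-U', etype, old))      # retract the stale result
            out.append(('+U', etype, old + 1))  # emit the updated result
    return out

assert changelog_count(['click', 'click']) == [
    ('+I', 'click', 1),
    ('-U', 'click', 1),
    ('+U', 'click', 2),
]
```

This is why an upsert-capable sink matters: it collapses each -U/+U pair into a single overwrite keyed by the group, rather than appending every intermediate result.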
GROUPING SETS, ROLLUP, CUBE
Flink SQL supports multi-dimensional aggregations:
SELECT
    page,
    event_type,
    COUNT(*) AS cnt
FROM user_events
GROUP BY GROUPING SETS (
    (page, event_type),
    (page),
    ()
);
This produces subtotals per (page, event_type), per page, and a grand total in a single pass - useful for pre-aggregating dashboards.
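The "single pass" claim is easy to see in a Python sketch (illustrative helper, not Flink's operator): each row updates one counter per grouping set as it streams by.

```python
from collections import Counter

# Sketch: compute all grouping sets in a single scan of the rows.
def grouping_sets_count(rows, sets):
    counters = {s: Counter() for s in sets}
    for row in rows:                              # one pass over the data
        for s in sets:
            counters[s][tuple(row[c] for c in s)] += 1
    return counters

rows = [
    {'page': '/home', 'event_type': 'view'},
    {'page': '/home', 'event_type': 'click'},
    {'page': '/docs', 'event_type': 'view'},
]
sets = [('page', 'event_type'), ('page',), ()]
result = grouping_sets_count(rows, sets)
assert result[('page',)][('/home',)] == 2     # per-page subtotal
assert result[()][()] == 3                    # grand total
```

On a stream, each of these counters is a changelog-producing aggregation, so the earlier retraction semantics apply to every grouping set independently.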
Pattern Matching with MATCH_RECOGNIZE
MATCH_RECOGNIZE is a lesser-known but useful Flink SQL feature for detecting patterns across a sequence of events - essentially a regular expression over rows.
SELECT *
FROM user_events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(A.event_time) AS first_event_time,
        LAST(C.event_time) AS conversion_time
    PATTERN (A B* C)
    DEFINE
        A AS event_type = 'view_product',
        B AS event_type <> 'purchase',
        C AS event_type = 'purchase'
) AS pattern_matches;
This detects a “view then purchase” funnel: a product view followed eventually by a purchase, with any events in between.
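To build intuition for the pattern semantics, here is a simplified Python model of the same funnel (illustrative names; a real CEP engine handles overlapping matches, ordering, and state far more generally):

```python
# Sketch of PATTERN (A B* C) per user: a 'view_product' (A) opens a match,
# any non-purchase events (B*) extend it, and a 'purchase' (C) completes it.
def find_funnels(events):
    # events: (user_id, event_time, event_type), assumed time-ordered per user
    open_views = {}   # user_id -> event_time of the opening 'view_product'
    matches = []
    for user, ts, etype in events:
        if etype == 'view_product' and user not in open_views:
            open_views[user] = ts
        elif etype == 'purchase' and user in open_views:
            matches.append((user, open_views.pop(user), ts))
    return matches

events = [
    ('u1', 1, 'view_product'),
    ('u1', 2, 'add_to_cart'),   # matched by B: anything except 'purchase'
    ('u2', 3, 'purchase'),      # no preceding view for u2: no match
    ('u1', 4, 'purchase'),
]
assert find_funnels(events) == [('u1', 1, 4)]
```

The MEASURES clause corresponds to the (first_event_time, conversion_time) pair each match produces here.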
Deployment Patterns
Flink SQL Gateway
The Flink SQL Gateway is a REST API server that accepts SQL statements and submits them as Flink jobs. It enables:
- Submitting long-running streaming SQL jobs without a client session
- Integrating SQL submission into CI/CD pipelines
- Running multiple isolated SQL sessions concurrently
# Start the SQL Gateway
./bin/sql-gateway.sh start

# Create a session via REST; SQL statements are then submitted against it
curl -X POST http://localhost:8083/v1/sessions \
    -H 'Content-Type: application/json' \
    -d '{"properties": {"execution.runtime-mode": "streaming"}}'
Catalogs and Persistent Metadata
In a production deployment, you want your CREATE TABLE definitions to persist across sessions. Flink integrates with Hive Metastore and other catalogs to store table definitions persistently.
CREATE CATALOG my_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/etc/hive/conf'
);
USE CATALOG my_catalog;
USE my_database;
-- Now CREATE TABLE persists to the Hive Metastore
Managed Flink Services
Self-managing a Flink cluster requires deep operational expertise: JVM tuning, checkpoint storage management, upgrade coordination, and monitoring. Purpose-built platforms handle this infrastructure automatically, letting teams write SQL queries rather than manage clusters. Streamkap’s managed Flink environment, for example, couples the transformation layer directly with CDC ingestion - so you can define SQL transforms that run against your live change stream without provisioning any infrastructure.
Performance Considerations
A few practices make the difference between a Flink SQL job that runs well and one that struggles in production:
State size management: Every stateful operator (aggregations, joins) accumulates state. Understand what drives state growth - join keys, window sizes, group cardinality - and size your state backend and checkpointing accordingly.
Mini-batch optimization: For high-throughput aggregations, enable mini-batch processing to reduce the number of state accesses:
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';
Watermark alignment: When joining two Kafka topics with different throughputs, watermark skew can cause one side to buffer indefinitely. Use watermark alignment (Flink 1.15+) to keep sources in sync.
Operator chaining: By default, Flink chains compatible operators into a single task to reduce network overhead. Understand when chaining is broken (different parallelism, shuffle) and whether it is intentional.
Summary
Flink SQL makes stream processing accessible through a familiar interface while exposing the full power of a distributed stateful stream processor. The key concepts to internalize are:
- Dynamic tables are the bridge between SQL semantics and streaming reality
- Watermarks are how Flink handles out-of-order events in event-time processing
- Window types determine the shape of your time-based aggregations
- Join types (interval, temporal, lookup) each solve a distinct streaming join problem
- Changelog semantics enable full SQL expressiveness on mutable streams
For teams looking to go deeper on how these concepts apply to real-world CDC pipelines, see our guide on Change Data Capture architecture and how transforms fit into the broader pipeline.