Real-Time Aggregations in Flink SQL: COUNT, SUM, AVG Over Streams
Learn how to compute real-time aggregations in Flink SQL - windowed and non-windowed COUNT, SUM, AVG, MIN, MAX over streaming data with practical examples.
Aggregations are the beating heart of stream processing. Whether you need a live revenue counter, a per-minute error rate, or a rolling average latency over the last five minutes, the answer in Flink SQL almost always starts with GROUP BY and an aggregate function. What makes streaming aggregations fundamentally different from their batch SQL counterparts is the dimension of time: data never stops arriving, so you must decide whether to bound your computation with a window or let it run continuously.
This guide walks through every aggregation pattern Flink SQL offers, from tumbling windows to unbounded GROUP BY to row-level OVER clauses, with production-ready examples and the performance trade-offs you need to understand before deploying.
Windowed vs Non-Windowed Aggregations
The single most important architectural decision when writing a Flink SQL aggregation is whether to use a window. This choice determines three things at once: how much state your job will hold, what kind of results it produces, and which sinks it can write to.
Windowed aggregations divide the infinite stream into finite chunks of time. A tumbling window of one minute collects all events that arrive within that minute, computes the aggregate, emits a single result row when the window closes, and then discards the state. The result is append-only: each window produces exactly one new row, never updating a previous one.
Non-windowed aggregations (a plain GROUP BY without any window function) maintain a running result that updates with every incoming event. If you write SELECT product_id, SUM(amount) FROM orders GROUP BY product_id, Flink will emit an updated sum every time a new order arrives for that product. This means the downstream sink receives retractions (deletes of the old value) followed by insertions of the new value, which is called an updating or retract stream.
Use windowed aggregations when you need periodic summaries (dashboards, reports, alerting thresholds). Use non-windowed aggregations when you need a live, always-current answer (a running total, a real-time leaderboard) and your sink supports upsert semantics.
Windowed Aggregations
Flink SQL provides its core window types through dedicated table-valued functions: TUMBLE, HOP, and SESSION (a fourth, CUMULATE, covers cumulative windows). They all follow the same pattern: you apply the window function in the FROM clause and then GROUP BY the window’s start and end columns.
Tumbling Windows
A tumbling window assigns each event to exactly one non-overlapping, fixed-size interval. This is the most common window type for dashboards and periodic metrics.
SELECT
window_start,
window_end,
product_id,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, product_id;
This query computes per-product order counts, revenue, and average order value for every one-minute interval. When the minute closes, Flink emits one row per product and releases the state for that window. State is bounded: it only holds the current open window.
Sliding (Hop) Windows
A hop window produces overlapping intervals. You specify both the window size and the slide interval. This is useful when you want a smoother view of your data, for example a five-minute average that updates every minute.
SELECT
window_start,
window_end,
service_name,
COUNT(*) AS request_count,
SUM(CASE WHEN status_code >= 500 THEN 1 ELSE 0 END) AS error_count
FROM TABLE(
HOP(TABLE requests, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '5' MINUTES)
)
GROUP BY window_start, window_end, service_name;
Each event belongs to multiple windows (five, in this case), so state is proportionally larger than a tumbling window. A five-minute hop window with a one-minute slide holds five open windows at any given time.
Session Windows
Session windows group events that arrive within a configurable gap of each other. They are ideal for user-session analytics where activity is bursty and unpredictable.
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS actions,
MAX(event_time) AS last_activity
FROM TABLE(
SESSION(TABLE clickstream PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTES)
)
GROUP BY window_start, window_end, user_id;
A session closes when no event for that key arrives within the gap interval (30 minutes here). State per key stays open until the session ends, so high-cardinality keys with long gaps can accumulate significant state.
Non-Windowed (Continuous) Aggregations
When you omit a window, Flink SQL treats the aggregation as unbounded. Every incoming event triggers an updated result for its group.
SELECT
product_id,
COUNT(*) AS total_orders,
SUM(amount) AS lifetime_revenue
FROM orders
GROUP BY product_id;
This produces a continuously updating table: each new order for product_id = 42 causes Flink to retract the previous row for that product and insert a new one with the updated totals.
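Concretely, for product_id = 42 receiving two orders of 19.99 and 24.99, the emitted changelog would look roughly like this (a conceptual sketch; the exact encoding depends on the sink's changelog mode — +I is an insert, -U/+U are the retraction and the updated row):

```
+I (42, 1, 19.99)    -- first order arrives: insert the initial totals
-U (42, 1, 19.99)    -- second order arrives: retract the old row...
+U (42, 2, 44.98)    -- ...and insert the updated totals
```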
Retract Streams and Sink Compatibility
The updating nature of non-windowed aggregations has a critical downstream consequence. The sink must understand retractions. Sinks that support upsert semantics - JDBC connectors with a primary key, Upsert Kafka, Elasticsearch, and HBase - handle this correctly by overwriting the previous value. Append-only sinks like standard Kafka topics or file systems cannot process retractions and will either fail at job submission or produce duplicate, incorrect rows.
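As a sketch of what an upsert-capable sink looks like, here is a hypothetical Upsert Kafka table for the running product totals above (the table name, topic, and connection options are illustrative; the PRIMARY KEY declaration is what enables upsert semantics):

```sql
-- Hypothetical upsert sink; schema and connector options are illustrative
CREATE TABLE product_totals (
    product_id BIGINT,
    total_orders BIGINT,
    lifetime_revenue DECIMAL(18, 2),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'product-totals',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
```

With this table as the INSERT INTO target, Flink keys each record by product_id and the topic's latest record per key always reflects the current aggregate.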
If your target sink is append-only and you need a running aggregate, the practical solution is to add a window. Even a very short tumbling window (one second) converts the result into an append-only stream at the cost of slight latency.
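For example, the running revenue query from above can be rewritten with a one-second tumble so it emits one final, append-only row per product per second (assuming the orders table has an event_time column with a watermark defined):

```sql
-- Append-only variant: each closed 1-second window emits exactly one row
SELECT
    window_start,
    window_end,
    product_id,
    SUM(amount) AS revenue
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' SECOND)
)
GROUP BY window_start, window_end, product_id;
```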
State Growth
Non-windowed aggregations maintain state for every unique group key indefinitely. If your GROUP BY column has unbounded cardinality (like a user_id that grows over time), state grows without limit. Flink’s state TTL (time-to-live) configuration can mitigate this by expiring keys that have not been updated within a configurable duration, but you must accept that expired keys will reset their aggregates to zero if they reappear.
-- Configure state TTL in the table environment
SET 'table.exec.state.ttl' = '24h';
Built-in Aggregate Functions
Flink SQL ships with a full set of aggregate functions that work identically in windowed, non-windowed, and OVER contexts.
| Function | Description |
|---|---|
| COUNT(*) | Number of rows in the group |
| COUNT(column) | Number of non-null values |
| COUNT(DISTINCT column) | Number of distinct non-null values |
| SUM(column) | Sum of numeric values |
| AVG(column) | Arithmetic mean |
| MIN(column) | Minimum value |
| MAX(column) | Maximum value |
| FIRST_VALUE(column) | First value in the group (by processing order) |
| LAST_VALUE(column) | Last value in the group (by processing order) |
| STDDEV_POP(column) | Population standard deviation |
| STDDEV_SAMP(column) | Sample standard deviation |
| VAR_POP(column) | Population variance |
| VAR_SAMP(column) | Sample variance |
COUNT(DISTINCT column) deserves special attention. Flink must maintain the full set of distinct values in state, so for high-cardinality columns (millions of unique user IDs) this set can consume gigabytes of memory. Flink includes an optimization that restructures distinct aggregations to relieve this pressure when enabled:
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';
This splits the distinct aggregation into a two-level aggregation, which reduces data skew but does not approximate the count. For truly approximate counting on massive cardinality, a HyperLogLog UDF is the better tool.
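If you want to see what the optimizer is doing (or apply the pattern by hand), the split can be written manually: hash each key into a fixed number of buckets, count distinct values per bucket, then sum the per-bucket counts. Because the buckets partition the key space, the sum is still exact. The clicks table, its dt and user_id columns, and the 1024-bucket choice below are all illustrative:

```sql
-- Manual two-level split of a high-cardinality COUNT(DISTINCT);
-- table and column names are hypothetical
SELECT
    dt,
    SUM(bucket_uv) AS unique_users
FROM (
    SELECT
        dt,
        MOD(HASH_CODE(CAST(user_id AS STRING)), 1024) AS bucket,
        COUNT(DISTINCT user_id) AS bucket_uv
    FROM clicks
    GROUP BY dt, MOD(HASH_CODE(CAST(user_id AS STRING)), 1024)
)
GROUP BY dt;
```

The inner aggregation spreads the hot key's state across 1024 parallel groups; the outer SUM over small per-bucket counts is cheap.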
OVER Windows (Row-Level Aggregations)
OVER clauses compute an aggregate for each row based on a range of related rows, without collapsing multiple rows into one. This is essential for running totals, moving averages, and ranking queries.
SELECT
order_id,
customer_id,
amount,
SUM(amount) OVER (
PARTITION BY customer_id
ORDER BY event_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total,
AVG(amount) OVER (
PARTITION BY customer_id
ORDER BY event_time
ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
) AS moving_avg_last_10
FROM orders;
Each input row is emitted with additional computed columns. The running_total accumulates all previous orders for that customer. The moving_avg_last_10 averages the last ten orders. OVER windows produce an append-only stream (one output row per input row), so they are compatible with any sink.
Top-N Queries
A common pattern built on OVER windows is Top-N filtering, which selects the highest or lowest N rows per group.
SELECT *
FROM (
SELECT
product_id,
category,
total_sales,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY total_sales DESC) AS rn
FROM product_metrics
)
WHERE rn <= 5;
This returns the top five products by sales within each category, updated continuously as new sales data arrives.
Practical Examples
Real-Time Revenue Dashboard
Compute per-minute revenue broken down by payment method, feeding a live dashboard.
SELECT
window_start,
window_end,
payment_method,
SUM(amount) AS revenue,
COUNT(*) AS transactions,
AVG(amount) AS avg_transaction
FROM TABLE(
TUMBLE(TABLE payments, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, payment_method;
Live Error Rate Monitoring
Calculate a five-minute rolling error rate per service, sliding every 30 seconds.
SELECT
window_start,
window_end,
service,
CAST(SUM(CASE WHEN status >= 500 THEN 1 ELSE 0 END) AS DOUBLE)
/ COUNT(*) AS error_rate,
COUNT(*) AS total_requests
FROM TABLE(
HOP(TABLE http_logs, DESCRIPTOR(event_time), INTERVAL '30' SECONDS, INTERVAL '5' MINUTES)
)
GROUP BY window_start, window_end, service;
CDC-Based Inventory Tracking
When using change data capture to stream database changes, non-windowed aggregations can maintain a live inventory count. With a platform like Streamkap feeding CDC events into Flink, you can compute real-time stock levels directly from your transactional database without polling.
SELECT
warehouse_id,
product_id,
SUM(quantity_change) AS current_stock,
MAX(event_time) AS last_updated
FROM inventory_changes
GROUP BY warehouse_id, product_id;
This query maintains a running stock level per warehouse and product. Because it is non-windowed, it requires an upsert-capable sink such as a JDBC connector with a composite primary key of (warehouse_id, product_id).
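A matching sink definition might look like the following JDBC table (a sketch; the database URL, table name, and credentials handling are illustrative — the composite PRIMARY KEY is the part that matters for upsert behavior):

```sql
-- Hypothetical JDBC upsert sink for the inventory query above
CREATE TABLE stock_levels (
    warehouse_id BIGINT,
    product_id BIGINT,
    current_stock BIGINT,
    last_updated TIMESTAMP(3),
    PRIMARY KEY (warehouse_id, product_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://localhost:5432/inventory',
    'table-name' = 'stock_levels'
);
```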
State and Performance Considerations
Every aggregation in Flink SQL is backed by state, and understanding state characteristics is what separates a prototype from a production job.
Tumbling windows have the smallest state footprint. Only the current open window is held in memory, and state is released the moment the window fires.
Hop windows hold multiple overlapping windows simultaneously. A 5-minute window with a 30-second slide holds 10 open windows at once, multiplying the state by 10 compared to a tumble of the same size.
Session windows hold one open session per active key. State per key is small, but the number of concurrent keys can be large during peak traffic.
Non-windowed aggregations hold one state entry per unique group key forever (or until TTL expires). This is the most dangerous pattern for state growth. Always configure table.exec.state.ttl when using non-windowed aggregations over unbounded key spaces.
OVER windows with UNBOUNDED PRECEDING hold state proportional to the number of rows per partition key. If a single customer has millions of orders, that partition’s state grows accordingly. Use a bounded ROWS BETWEEN N PRECEDING AND CURRENT ROW to cap state where possible.
For all patterns, use RocksDB as the state backend for large state. The default heap-based backend keeps everything in JVM memory, which limits you to a few gigabytes before garbage collection becomes a bottleneck.
Common Pitfalls
Unbounded state with non-windowed aggregations. This is the most frequent production issue. A GROUP BY user_id without a window or state TTL will eventually exhaust your checkpoint storage and cause the job to fail. Always set table.exec.state.ttl or switch to a windowed aggregation.
Sink incompatibility with updating results. Attempting to write a non-windowed aggregation to an append-only sink fails at job submission (or, with some connectors, at runtime), which surprises many newcomers. Check your sink’s changelog mode support before writing the query.
Using processing time when event time is needed. If your events can arrive late or out of order, processing-time windows will silently produce incorrect results because late events land in the wrong window. Use event-time semantics with watermarks for correctness. Flink SQL makes this straightforward by declaring a WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS in your table DDL.
High-cardinality COUNT DISTINCT. Distinct counting on columns with millions of unique values creates massive state and slow checkpoints. Split the aggregation, use approximate algorithms, or pre-aggregate upstream to reduce cardinality.
Forgetting to define watermarks. Event-time windows require watermarks to know when a window can close. Without a watermark definition on the source table, Flink cannot make progress on event-time windows and results will never be emitted.
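Putting the last two pitfalls together, a source table with event-time semantics declares both the event-time column and its watermark in the DDL. The schema and connector options below are illustrative:

```sql
-- Hypothetical Kafka source with a 5-second watermark on event_time
CREATE TABLE orders (
    order_id BIGINT,
    product_id BIGINT,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
```

The WATERMARK clause tells Flink to tolerate up to five seconds of out-of-order arrival; any event-time window over this table can then close deterministically.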
Real-time aggregations in Flink SQL are deceptively simple to write but demand careful thought about state, time semantics, and downstream sink requirements. Managed Flink platforms like Streamkap handle the operational side - state backends, checkpointing, scaling - so you can focus on getting the SQL right. Start with windowed aggregations, add non-windowed queries only when you genuinely need continuously updating results, and always plan for state growth from day one.