Incremental Aggregation in Streaming Pipelines
How to compute running totals, counts, and metrics in real time using incremental aggregation. Covers non-windowed aggregations, changelog output, retraction handling, and state management in Apache Flink.
Batch analytics is conceptually simple: you have all the data, you run a query, you get a result. Streaming analytics inverts this model. Data arrives continuously, and you need results that update in real time as each new event arrives. Recomputing the full result from scratch on every event does not scale. Instead, you use incremental aggregation: maintaining a running result that gets updated with each new event, not recalculated from the beginning.
This article covers how incremental aggregation works in practice, with a focus on Apache Flink. We will walk through non-windowed aggregations, changelog semantics, the retraction mechanism that makes correctness possible, and the state management challenges that determine whether your aggregation job runs for months or crashes after a day.
The Core Idea: Update, Don’t Recompute
Imagine you need a real-time count of orders per customer. In batch SQL, you would write:
SELECT customer_id, COUNT(*) AS order_count
FROM orders
GROUP BY customer_id;
This scans the entire orders table every time you run it. In a streaming context, the equivalent Flink SQL looks identical:
SELECT customer_id, COUNT(*) AS order_count
FROM orders_stream
GROUP BY customer_id;
But the execution is fundamentally different. Flink does not re-scan all historical events. Instead, it maintains a state entry for each customer_id containing the current count. When a new order event arrives for customer_42, Flink reads the current count (say, 15), increments it to 16, stores the new count, and emits the updated result. The work per event is O(1) regardless of how many historical events exist.
This is incremental aggregation: the aggregation function is applied to each incoming event and the existing accumulated state, producing a new accumulated state and an output.
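The mechanics can be sketched in a few lines of Python. This is an illustration of the idea, not Flink's actual implementation; Flink keeps the per-key state in its managed state backend rather than a plain dictionary.

```python
# Minimal sketch of incremental per-key counting: O(1) work per event,
# independent of how many historical events exist.
counts = {}  # key -> running count (stands in for Flink's keyed state)

def on_event(customer_id):
    counts[customer_id] = counts.get(customer_id, 0) + 1
    return customer_id, counts[customer_id]  # emit the updated result

for order in ["customer_42"] * 16:
    result = on_event(order)
print(result)  # ('customer_42', 16)
```

Each event touches only its own key's entry; no historical events are re-read.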
Which Aggregations Are Incrementalizable?
Not all aggregation functions can be computed incrementally. The requirement is that the function must be expressible as a fold over the input stream. Functions that work:
- COUNT: State is a single counter. Increment by 1 per event.
- SUM: State is a running total. Add the new value.
- MIN/MAX: State is the current extreme value. Compare and update.
- AVG: State is (sum, count). Update both, divide when emitting.
Functions that do not work incrementally (without additional tricks):
- MEDIAN: Requires access to all values to find the middle one. Approximate versions (like t-digest) can be incremental.
- DISTINCT COUNT (exact): Requires storing all seen values. HyperLogLog provides an approximate incremental version.
- PERCENTILES (exact): Similar to median.
In practice, approximate algorithms make most aggregations incrementalizable. Flink provides built-in support for approximate distinct counts and has a rich ecosystem of user-defined aggregate functions (UDAGGs) for custom incremental logic.
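The (sum, count) accumulator for AVG illustrates why these functions fold cleanly. The sketch below is illustrative Python, not Flink's internal accumulator class, but it mirrors the create/accumulate/get-value lifecycle of a Flink aggregate function.

```python
# AVG is incrementalizable because its state is a fixed-size pair (sum, count);
# the average itself is only computed when a result is emitted.
def create_acc():
    return (0.0, 0)

def accumulate(acc, value):
    s, n = acc
    return (s + value, n + 1)

def get_value(acc):
    s, n = acc
    return s / n if n else None

acc = create_acc()
for v in [50.0, 30.0, 20.0]:
    acc = accumulate(acc, v)
print(get_value(acc))  # ≈ 33.33
```

MEDIAN has no such fixed-size accumulator: any fold state small enough to be practical necessarily discards values the exact median might need.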
Non-Windowed Aggregation: The Continuous Query
Most streaming tutorials focus on windowed aggregation: tumbling windows, sliding windows, session windows. These are important, but many real-world use cases call for non-windowed (also called “global” or “unbounded”) aggregation. You want the running total right now, not the total for the last 5 minutes.
Examples of non-windowed aggregation:
- Total revenue per customer, all time
- Current inventory count per SKU
- Number of active sessions per region
- Running average response time per API endpoint
In Flink SQL, any GROUP BY without a window function creates a non-windowed aggregation:
-- Running total revenue per customer (non-windowed)
SELECT
customer_id,
SUM(order_total) AS lifetime_revenue,
COUNT(*) AS total_orders,
MAX(event_time) AS last_order_time
FROM orders_stream
GROUP BY customer_id;
This query never “finishes.” It continuously updates as new events arrive. But here is the critical question: what does the output of this query look like?
Changelog Output and Upsert Semantics
A non-windowed aggregation produces a changelog stream, not an append-only stream. Each output record represents an update to the current state of the aggregation for a given key.
When customer_42 places their first order for $50, the output is:
+I (customer_42, 50.00, 1, 2026-02-25T10:00:00) -- Insert
When they place a second order for $30, the output is:
-U (customer_42, 50.00, 1, 2026-02-25T10:00:00) -- Update Before (retract old)
+U (customer_42, 80.00, 2, 2026-02-25T11:00:00) -- Update After (emit new)
The -U (retract) and +U (insert new) pair is how Flink communicates “the previous value I told you was wrong; here is the corrected value.” This is the changelog protocol.
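The protocol can be mimicked in a short sketch. The +I/-U/+U markers follow Flink's RowKind naming; the aggregation logic here is an illustration, not the engine's code.

```python
# Emits +I on the first result for a key, then a -U/+U pair on each update,
# mirroring Flink's changelog output for a non-windowed SUM/COUNT.
state = {}  # key -> (revenue_sum, order_count)

def process(customer_id, order_total):
    out = []
    old = state.get(customer_id)
    if old is not None:
        out.append(("-U", customer_id, *old))  # retract the previous result
    s, n = old or (0.0, 0)
    new = (s + order_total, n + 1)
    state[customer_id] = new
    out.append((("+I" if old is None else "+U"), customer_id, *new))
    return out

print(process("customer_42", 50.0))
# [('+I', 'customer_42', 50.0, 1)]
print(process("customer_42", 30.0))
# [('-U', 'customer_42', 50.0, 1), ('+U', 'customer_42', 80.0, 2)]
```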
Why Changelogs Matter for Downstream Systems
If your downstream system is a Kafka topic consumed by another Flink job, the changelog semantics are handled automatically. Flink understands the retraction protocol natively.
If your downstream system is a database or warehouse, you need upsert semantics: each output record should update the row with the matching key, not append a new row. Flink’s JDBC and Kafka upsert sinks handle this:
CREATE TABLE customer_metrics (
customer_id BIGINT,
lifetime_revenue DECIMAL(12, 2),
total_orders BIGINT,
last_order_time TIMESTAMP(3),
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'customer-metrics',
'key.format' = 'json',
'value.format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092'
);
INSERT INTO customer_metrics
SELECT
customer_id,
SUM(order_total) AS lifetime_revenue,
COUNT(*) AS total_orders,
MAX(event_time) AS last_order_time
FROM orders_stream
GROUP BY customer_id;
The upsert-kafka connector writes the latest value per key. Consumers of the customer-metrics topic can compact it to get the current state.
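Conceptually, compaction reduces the changelog to the latest value per key. What a consumer of the compacted topic effectively sees can be sketched as:

```python
# Replaying an upsert changelog: later records for a key overwrite earlier
# ones, so only the most recent value per key survives.
changelog = [
    ("customer_42", {"lifetime_revenue": 50.0, "total_orders": 1}),
    ("customer_7",  {"lifetime_revenue": 12.0, "total_orders": 1}),
    ("customer_42", {"lifetime_revenue": 80.0, "total_orders": 2}),
]

compacted = {}
for key, value in changelog:
    compacted[key] = value

print(compacted["customer_42"]["total_orders"])  # 2
```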
When using Streamkap to sink aggregated data to Snowflake or BigQuery, the platform handles the upsert logic automatically. You do not need to manage merge statements or deduplication queries; Streamkap writes the latest value per key using the destination’s native upsert mechanisms.
Retractions in Depth
Retractions are the mechanism that makes streaming aggregation correct in the face of updates and deletes. They are also the feature most likely to cause confusion when you first encounter them.
When Retractions Happen
Retractions occur whenever a downstream operator has already consumed a result that needs to be corrected. This happens in two scenarios:
1. Aggregation updates. Every time a non-windowed aggregation produces an updated value for a key, it retracts the previous value. This is the most common case.
2. CDC deletes flowing into aggregations. If a row is deleted in the source database and your CDC pipeline captures the delete, the aggregation needs to “un-count” that row. Flink handles this by processing the delete as a retraction to the aggregation operator, which updates the running total accordingly.
Consider a sum aggregation. Customer 42 has three orders: $50, $30, $20. The running sum is $100. Then the $30 order is canceled (deleted in the source database). The CDC stream sends a delete event, Flink retracts $30 from the sum, and the new output is $70. Without retractions, you would still report $100 forever.
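This retractable sum can be sketched as follows (an illustration; in Flink, user-defined aggregate functions expose this capability through a retract method on the accumulator):

```python
# A sum that supports retraction: inserts add, deletes subtract.
total = 0.0

def apply(op, amount):
    global total
    total += amount if op == "+I" else -amount  # "-D" retracts the value
    return total

apply("+I", 50.0)
apply("+I", 30.0)
apply("+I", 20.0)         # running sum: 100.0
print(apply("-D", 30.0))  # 70.0 after the $30 order is deleted
```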
Multi-Stage Retractions
When you chain aggregations, retractions propagate through the entire pipeline:
-- Stage 1: Revenue per customer
CREATE VIEW customer_revenue AS
SELECT customer_id, SUM(order_total) AS revenue
FROM orders_stream
GROUP BY customer_id;
-- Stage 2: Average revenue per region
SELECT
region,
AVG(revenue) AS avg_customer_revenue
FROM customer_revenue
JOIN customers ON customer_revenue.customer_id = customers.customer_id
GROUP BY region;
When a new order arrives for a customer, stage 1 emits a retraction of the old revenue and an insert of the new revenue. Stage 2 receives this retraction, adjusts its running average, and emits its own retraction and update. The correctness guarantee propagates end-to-end.
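Stage 2's running average absorbs such a retract/update pair with a retractable (sum, count) accumulator. The sketch below is illustrative and assumes a single region for simplicity.

```python
# Retractable AVG accumulator for stage 2: a -U removes the old revenue,
# a +U adds the new one, and the average stays correct throughout.
acc = {"sum": 0.0, "count": 0}

def apply(op, revenue):
    sign = 1 if op in ("+I", "+U") else -1
    acc["sum"] += sign * revenue
    acc["count"] += sign
    return acc["sum"] / acc["count"] if acc["count"] else None

apply("+I", 100.0)         # customer A's revenue
apply("+I", 200.0)         # customer B's revenue -> avg 150.0
apply("-U", 100.0)         # stage 1 retracts A's old revenue
print(apply("+U", 180.0))  # A's new revenue -> avg 190.0
```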
Sinks That Don’t Support Retractions
Not all sinks can process retractions. An append-only sink like a standard Kafka topic or a file sink cannot “un-write” a previously written record. If you connect a non-windowed aggregation to an append-only sink, Flink will throw an error at planning time (in SQL) or produce incorrect results (in the DataStream API).
Solutions:
- Use an upsert sink (upsert-kafka, JDBC with primary key, or a connector like Streamkap that handles upserts natively).
- Materialize the aggregation into a compacted Kafka topic where the latest value per key is the current state.
- Add a final windowed aggregation (like a 1-second tumbling window) that emits only the latest value, converting the changelog stream to an append stream. This adds latency but eliminates retractions.
State Management: The Scaling Bottleneck
Incremental aggregation trades compute for state. Instead of recomputing from scratch, you store intermediate results. The size of this state determines your job’s memory requirements, checkpoint duration, and recovery time.
State Size Estimation
For a COUNT or SUM per key, each state entry is small: the key bytes plus a fixed-size accumulator (8 bytes for a long, 16 bytes for a decimal). With 10 million distinct keys, that is roughly:
10M keys * (avg_key_size + 16 bytes) = ~260 MB for 10-byte keys
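The back-of-the-envelope arithmetic, ignoring backend overhead (RocksDB metadata and serialization framing add more in practice):

```python
# State size estimate for fixed-size accumulators: keys * (key + accumulator).
num_keys = 10_000_000
avg_key_bytes = 10
accumulator_bytes = 16  # e.g. a decimal sum

state_bytes = num_keys * (avg_key_bytes + accumulator_bytes)
print(state_bytes / 1e6)  # 260.0 (MB)
```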
For a COLLECT or LIST_AGG, state per key grows with the number of events per key. If one customer has 10 million orders and you are collecting all order IDs, that single key’s state entry is enormous. Avoid unbounded collection aggregations in production.
State TTL
Non-windowed aggregation state grows with the number of distinct keys and never shrinks on its own. If you aggregate by session_id and sessions last an average of 30 minutes, but you never clean up old session state, your state grows indefinitely.
Flink’s state TTL solves this:
SET 'table.exec.state.ttl' = '24h';
This tells Flink to expire state entries that have not been updated in 24 hours. When a state entry expires, the aggregation result for that key is silently dropped. If a late event arrives for an expired key, Flink treats it as a new key and starts the aggregation from scratch.
This is the correct trade-off for most real-time use cases. If your downstream system needs historically-complete aggregates, compute those in the warehouse on the full dataset. Use streaming aggregation for fresh, recent data.
State Backend Selection
Flink offers two primary state backends:
- HashMapStateBackend (in-memory): Fast but limited by JVM heap. Good for small state (under a few GB).
- EmbeddedRocksDBStateBackend: Stores state on local SSD with an in-memory cache. Can handle terabytes of state. Slower per-access but scales much further.
For non-windowed aggregation with millions of keys, RocksDB is almost always the right choice. The per-access overhead (a few microseconds vs. nanoseconds for HashMap) is negligible compared to the risk of running out of memory.
With Streamkap’s managed Flink, the state backend and checkpointing are configured automatically based on your job’s requirements. You do not need to tune RocksDB block cache sizes or checkpoint intervals manually.
Checkpoint Sizing and Recovery
Flink periodically checkpoints state to durable storage (S3, GCS, HDFS). Checkpoint size equals state size. If your aggregation state is 50 GB, each checkpoint writes 50 GB (incrementally, after the first full checkpoint).
Recovery time after a failure is proportional to the latest checkpoint size. A 50 GB checkpoint takes roughly 1-5 minutes to restore, depending on storage throughput. During this time, the job is not processing events, and consumer lag increases.
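A quick sanity check on that range, assuming restore throughput somewhere between 200 MB/s and 800 MB/s (these throughput figures are assumptions; measure your own storage):

```python
# Recovery time ≈ checkpoint size / restore throughput.
checkpoint_gb = 50
for throughput_mb_s in (200, 800):
    seconds = checkpoint_gb * 1000 / throughput_mb_s
    print(f"{throughput_mb_s} MB/s -> {seconds / 60:.1f} min")
```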
Monitor these metrics:
- lastCheckpointSize: If it is growing linearly over time, your state is growing unboundedly. Check your TTL settings.
- lastCheckpointDuration: If it exceeds your checkpoint interval, you are in trouble. Checkpoints will start failing.
- numRecordsInPerSecond vs. numRecordsOutPerSecond: If input exceeds output sustained, your job is falling behind.
Practical Pattern: Real-Time Dashboard Metrics
Here is a complete example of incremental aggregation feeding a real-time dashboard. The source is a CDC stream from PostgreSQL, captured by Streamkap, flowing through Flink, and landing in a Kafka topic that the dashboard reads.
-- Source: CDC stream from orders table
CREATE TABLE orders_cdc (
order_id BIGINT,
customer_id BIGINT,
product_category STRING,
order_total DECIMAL(10, 2),
status STRING,
created_at TIMESTAMP(3),
WATERMARK FOR created_at AS created_at - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'postgres.public.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
-- Aggregation: metrics per product category
CREATE TABLE category_metrics (
product_category STRING,
total_revenue DECIMAL(12, 2),
order_count BIGINT,
avg_order_value DECIMAL(10, 2),
last_updated TIMESTAMP(3),
PRIMARY KEY (product_category) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'dashboard-category-metrics',
'key.format' = 'json',
'value.format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092'
);
INSERT INTO category_metrics
SELECT
product_category,
SUM(order_total) AS total_revenue,
COUNT(*) AS order_count,
SUM(order_total) / COUNT(*) AS avg_order_value,
MAX(created_at) AS last_updated
FROM orders_cdc
WHERE status <> 'cancelled'
GROUP BY product_category;
This pipeline:
- Reads CDC events from PostgreSQL via Streamkap
- Filters out cancelled orders
- Incrementally aggregates revenue, count, and average per product category
- Writes the latest values to a compacted Kafka topic
- The dashboard reads this topic and displays current metrics
The entire pipeline maintains correctness even when orders are updated or cancelled in the source database, because Flink processes the CDC deletes and updates as retractions to the aggregation.
Common Pitfalls
Forgetting state TTL. Your job runs fine for a week, then OOMs because you aggregate by a high-churn key (like session_id or request_id) without TTL. Always set a TTL for non-windowed aggregations.
Connecting changelog output to append-only sinks. This produces incorrect results without any error in the DataStream API. In Flink SQL, the planner usually catches this, but not always. Validate your sink type matches your query semantics.
Aggregating before filtering. If you filter events after aggregation, the aggregation has already consumed state for events that will be discarded. Filter first, aggregate second. This reduces state size and improves throughput.
Using exact distinct counts at scale. COUNT(DISTINCT user_id) stores every distinct user ID in state. With 100 million users in a single group, that is gigabytes of state for one key. Use approximate distinct counts (HyperLogLog) for dashboards where a 1-2% error margin is acceptable.
Incremental aggregation is the foundation of real-time analytics. Get the state management and retraction handling right, and you have a pipeline that produces correct, continuously-updated metrics indefinitely. Get them wrong, and you have a pipeline that slowly drifts from reality until someone notices the numbers do not match the batch report.