
Engineering

February 25, 2026


Flink State Management: RocksDB, Heap, and Choosing the Right Backend

Master Flink state management - understand state backends, keyed vs operator state, TTL configuration, and how to choose between RocksDB and heap for your workload.

TL;DR:

  • Flink state is the memory that operators maintain between events - window contents, join buffers, aggregation accumulators - and choosing the right state backend determines your job's performance and scalability.
  • HashMapStateBackend (heap) is fastest for small state (<1 GB per task) but limited by JVM heap. EmbeddedRocksDBStateBackend scales to terabytes using disk but adds serialization overhead.
  • State TTL (time-to-live) is critical for production - without it, state from non-windowed operations grows unbounded until the job runs out of memory.

Stream processing differs from batch processing in one fundamental way: it remembers. Every window aggregation, every join between two streams, every deduplication check depends on the system holding onto data between events. In Apache Flink, this memory is called state, and how you manage it determines whether your job runs reliably for months or crashes at 3 AM on a Saturday.

This guide covers Flink’s state model from the ground up: the types of state available, how backends store it, when to pick RocksDB over heap, and the configuration details that separate stable production jobs from ticking time bombs.

Types of State

Flink organizes state into three categories based on how it is partitioned and distributed across parallel operator instances.

Keyed State

Keyed state is the most common form. It is partitioned by key, meaning each unique key gets its own isolated state instance. When you write a KeyedProcessFunction or use a keyed window, Flink automatically routes events with the same key to the same operator instance and provides access to that key’s state.

public class CountPerKey extends KeyedProcessFunction<String, Event, Result> {
    private ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("count", Types.LONG);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        Long current = countState.value();
        long newCount = (current == null) ? 1L : current + 1;
        countState.update(newCount);
        out.collect(new Result(event.getKey(), newCount));
    }
}

Because keyed state is partitioned, Flink can rescale it when parallelism changes. If you increase parallelism from 4 to 8, Flink redistributes key groups across the new set of tasks automatically.
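Flink's actual assignment logic lives in KeyGroupRangeAssignment; the sketch below mirrors its two-step structure (it uses plain hashCode() where Flink adds a murmur hash on top, so the concrete numbers differ from a real job):

```java
// Simplified sketch of key-group assignment. A key maps to a fixed key group
// (independent of current parallelism); key groups are then mapped onto
// subtasks. Rescaling only changes the group-to-subtask mapping, so whole
// key groups move between tasks and per-key state stays consistent.
class KeyGroupDemo {
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // Flink hashes key.hashCode() through a murmur hash first; omitted here.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default for small jobs
        int group = assignToKeyGroup("user-42", maxParallelism);
        // The key's group never changes; only its subtask does when rescaling 4 -> 8.
        System.out.println("group=" + group
            + " subtask@4=" + operatorIndex(group, maxParallelism, 4)
            + " subtask@8=" + operatorIndex(group, maxParallelism, 8));
    }
}
```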

Operator State

Operator state is not partitioned by key. Instead, each parallel instance of an operator maintains its own state. The Kafka consumer connector uses operator state to track partition offsets: each parallel consumer instance stores the offsets for the partitions it is assigned to.

Operator state requires you to implement CheckpointedFunction and define how state should be redistributed during rescaling (even-split or broadcast).
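The even-split semantics can be sketched without any Flink dependencies: pool every entry from the old subtasks and deal them round-robin to the new ones. This illustrates the redistribution contract, not Flink's implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Even-split redistribution of operator list state on rescale: all entries
// from the old subtasks are pooled and dealt round-robin across the new
// parallelism. (Broadcast redistribution would instead copy the full list
// to every new subtask.)
class EvenSplitDemo {
    static List<List<String>> redistribute(List<List<String>> oldState, int newParallelism) {
        List<List<String>> newState = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) newState.add(new ArrayList<>());
        int i = 0;
        for (List<String> subtaskState : oldState)
            for (String entry : subtaskState)
                newState.get(i++ % newParallelism).add(entry); // round-robin deal
        return newState;
    }
}
```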

Broadcast State

Broadcast state is a special form of operator state where the same data is replicated to every parallel instance. The canonical use case is a rules engine: a stream of rules is broadcast to all instances, while a high-volume data stream is processed against those rules. Every instance holds the full set of rules.

State Backends: Heap vs RocksDB

The state backend controls where and how Flink physically stores state. There are two production backends.

HashMapStateBackend (Heap)

The heap backend stores all state as Java objects in the JVM heap of each TaskManager. Access is a direct object lookup with zero serialization overhead during processing.

# flink-conf.yaml
state.backend: hashmap

Strengths: Fastest possible access. No serialization during reads and writes. Ideal for jobs where total state per TaskManager stays under a few gigabytes.

Limitations: State size is bounded by available JVM heap. Large state triggers aggressive garbage collection, increasing latency variance and potentially causing TaskManager timeouts. Full GC pauses on a 32 GB heap can exceed Flink’s heartbeat timeout.

EmbeddedRocksDBStateBackend

The RocksDB backend stores state in an embedded RocksDB instance on each TaskManager’s local disk. Only the active working set lives in an in-memory block cache; the rest is on SSD.

# flink-conf.yaml
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/ssd/flink-rocksdb

Strengths: State can grow to terabytes, limited by disk rather than heap. Incremental checkpoints are supported natively - only changed SST files are uploaded, reducing checkpoint size and duration.

Limitations: Every state access requires serialization and deserialization. Random reads hit disk if the block cache misses. Write amplification from RocksDB compaction consumes I/O bandwidth.

Choosing a Backend

The decision comes down to three factors: state size, latency sensitivity, and checkpoint behavior.

| Factor | HashMapStateBackend | EmbeddedRocksDBStateBackend |
| --- | --- | --- |
| State size per TM | < 5 GB | 5 GB to multiple TB |
| Read/write latency | Nanoseconds (object access) | Microseconds (serialization + potential disk I/O) |
| Checkpoint type | Full snapshot every time | Incremental (only changed data) |
| Checkpoint size | Entire state on each checkpoint | Delta since last checkpoint |
| GC pressure | High (all state on heap) | Low (state off-heap on disk) |
| Best for | Stateless or low-state transformations, small windows | Large joins, long windows, CDC aggregations |

For CDC pipelines that join or aggregate across many tables and millions of keys, RocksDB is almost always the right choice. The serialization cost is well worth the stability gain.

State TTL

State TTL is the single most important production configuration for stateful Flink jobs that do not use windows. Without it, a GROUP BY aggregation or an interval join accumulates state for every key it has ever seen, forever.

Configuration

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()
    .build();

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Types.LONG);
descriptor.enableTimeToLive(ttlConfig);
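Conceptually, TTL works by storing each value alongside a last-write timestamp and hiding entries whose timestamp is too old. Here is a simplified sketch of that behavior, assuming OnCreateAndWrite updates and NeverReturnExpired visibility (Flink's real TtlValue wrapper follows the same idea):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of TTL'd state semantics: writes refresh the timestamp
// (OnCreateAndWrite) and expired entries are hidden on read
// (NeverReturnExpired). Cleanup of the underlying bytes happens separately.
class TtlStateDemo {
    record TtlValue(long value, long lastWriteMillis) {}

    private final Map<String, TtlValue> store = new HashMap<>();
    private final long ttlMillis;

    TtlStateDemo(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void update(String key, long value, long nowMillis) {
        store.put(key, new TtlValue(value, nowMillis)); // write refreshes the timer
    }

    Long value(String key, long nowMillis) {
        TtlValue v = store.get(key);
        if (v == null || nowMillis - v.lastWriteMillis() >= ttlMillis) return null; // expired -> hidden
        return v.value();
    }
}
```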

Cleanup Strategies

  • On full snapshot: Expired entries are filtered out during checkpoint creation. Simple but only cleans during checkpoints.
  • Incremental cleanup on access: Each state access also probes and removes a configurable number of expired entries. Adds minor overhead per access but provides steady cleanup between checkpoints.
  • RocksDB compaction filter: A custom compaction filter runs during RocksDB’s background compaction process, removing expired entries at the storage layer. This is the most efficient strategy for RocksDB backends.
Enabling the compaction filter looks like this:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000) // refresh the cached timestamp every 1000 processed entries
    .build();

For production CDC workloads, combining OnCreateAndWrite updates with the RocksDB compaction filter gives you bounded state growth with minimal processing overhead.

RocksDB Tuning

The default RocksDB configuration works for moderate workloads, but high-throughput pipelines benefit from tuning three areas.

Block Cache

The block cache holds recently accessed data blocks in memory. A larger cache reduces disk reads.

state.backend.rocksdb.block.cache-size: 256mb

Size this based on your working set. If most reads hit the same keys repeatedly (common in CDC deduplication), a generous block cache dramatically reduces I/O.
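A back-of-envelope sizing helper makes "size this based on your working set" concrete. The per-entry overhead figure below is an illustrative assumption, not a RocksDB constant:

```java
// Rough working-set estimate for block cache sizing: hot keys times the
// per-entry footprint. The 64-byte overhead term (block/index metadata) is
// an assumption for illustration; measure your own workload for real numbers.
class CacheSizing {
    static long workingSetBytes(long hotKeys, long avgKeyBytes, long avgValueBytes, long overheadBytes) {
        return hotKeys * (avgKeyBytes + avgValueBytes + overheadBytes);
    }

    public static void main(String[] args) {
        // e.g. 1M hot keys, 32 B keys, 200 B values, ~64 B overhead per entry
        long bytes = workingSetBytes(1_000_000, 32, 200, 64);
        System.out.println(bytes / (1024 * 1024) + " MB working set");
    }
}
```

If the estimate lands well above the configured `block.cache-size`, expect cache misses and disk reads on the hot path.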

Write Buffer

RocksDB batches writes in memory before flushing to disk. More write buffer memory means fewer, larger flushes and less write amplification.

state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4

Bloom Filters

Bloom filters let RocksDB skip reading SST files that definitely do not contain a requested key. For workloads with many point lookups (which describes most keyed state access patterns), bloom filters are essential.

state.backend.rocksdb.bloom-filter.bits-per-key: 10
state.backend.rocksdb.bloom-filter.block-based-mode: false
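To see why this helps, here is a minimal Bloom filter sketch. Around 10 bits per key with 7 hash functions gives roughly a 1% false-positive rate, and a negative answer is always definitive - which is exactly what lets RocksDB skip an SST file without reading it:

```java
import java.util.BitSet;

// Minimal Bloom filter illustrating the point-lookup benefit: "definitely
// absent" answers are guaranteed, so a miss means the SST file can be
// skipped entirely. This is a teaching sketch, not RocksDB's implementation.
class BloomSketch {
    private static final int HASHES = 7;   // ~0.69 * bits-per-key, rounded
    private final BitSet bits;
    private final int size;

    BloomSketch(int expectedKeys) {
        this.size = expectedKeys * 10;      // 10 bits per key
        this.bits = new BitSet(size);
    }

    private int hash(String key, int i) {
        // Derive HASHES positions from one base hash; weak but fine for a demo.
        return Math.floorMod(key.hashCode() * 31 + i * 0x9E3779B9, size);
    }

    void add(String key) {
        for (int i = 0; i < HASHES; i++) bits.set(hash(key, i));
    }

    boolean mightContain(String key) {      // false = definitely absent
        for (int i = 0; i < HASHES; i++)
            if (!bits.get(hash(key, i))) return false;
        return true;
    }
}
```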

Managed Memory

Flink allocates a fraction of TaskManager memory as “managed memory” for RocksDB. This memory is shared across all RocksDB instances on a TaskManager for block caches and write buffers.

taskmanager.memory.managed.fraction: 0.4

Increasing this fraction gives RocksDB more room to cache data, at the cost of less heap for user code. For state-heavy jobs, 0.4 to 0.6 is typical.

Keyed State Types

Flink provides several keyed state primitives, each optimized for different access patterns.

  • ValueState: Stores a single value per key. Use for counters, last-seen timestamps, or any scalar per-key data. The most common state type.
  • ListState: Stores an appendable list per key. Use when you need to buffer multiple events per key, such as collecting events before emitting a batch. Beware of unbounded growth without TTL.
  • MapState<K, V>: Stores a key-value map per key. Use when each primary key has sub-keys. With RocksDB, each map entry is a separate RocksDB key, enabling efficient partial reads without deserializing the entire map.
  • ReducingState: Automatically applies a reduce function on each add. Use for running aggregations where you only need the reduced result, not individual elements. Keeps state size constant per key.
  • AggregatingState<IN, OUT>: Similar to ReducingState but allows different input and output types. Use for aggregations that transform types, such as computing an average from individual values.

For RocksDB, MapState is significantly more efficient than ValueState<Map<K,V>> because Flink serializes each map entry independently. With ValueState<Map>, the entire map is serialized and deserialized on every access.
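The difference is easy to demonstrate outside Flink. Using plain Java serialization as a stand-in for Flink's serializers, compare the bytes touched per access under the two layouts:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Bytes touched per access: ValueState<Map> rewrites the whole blob on every
// update, while a flattened layout (how RocksDB MapState stores entries)
// touches only the one entry being read or written.
class MapStateCost {
    static byte[] toBytes(Serializable o) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(o);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        for (int i = 0; i < 10_000; i++) map.put("sub-" + i, "value-" + i);

        int wholeMap = toBytes(map).length;                                   // ValueState<Map> cost
        int oneEntry = toBytes("sub-42").length + toBytes("value-42").length; // MapState cost

        System.out.println("whole map: " + wholeMap + " B, single entry: " + oneEntry + " B");
    }
}
```

With 10,000 sub-keys, the blob approach serializes several hundred kilobytes per access versus tens of bytes for one entry.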

State and Checkpointing

Checkpointing is how Flink makes state fault-tolerant. The state backend and checkpoint configuration are closely coupled.

How Checkpoints Work

During a checkpoint, each operator snapshots its state and uploads it to durable storage (S3, HDFS, GCS). If a task fails, Flink restores state from the latest completed checkpoint.

execution.checkpointing.interval: 60s
execution.checkpointing.min-pause: 30s
execution.checkpointing.timeout: 600s
state.checkpoints.dir: s3://my-bucket/flink-checkpoints/

Incremental Checkpoints

With the RocksDB backend, incremental checkpoints upload only the SST files that changed since the last checkpoint. For jobs with terabytes of state where only a fraction changes between intervals, this reduces checkpoint duration from minutes to seconds.

state.backend.incremental: true

The heap backend does not support incremental checkpoints. Every checkpoint serializes the full state. For jobs with more than a few gigabytes of state, this alone makes RocksDB the better choice.
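Because SST files are immutable, the incremental logic essentially reduces to a set difference over file names. The sketch below shows that core idea; real Flink also reference-counts shared files across checkpoints, which is omitted here:

```java
import java.util.HashSet;
import java.util.Set;

// Core idea behind incremental checkpoints: RocksDB SST files are immutable,
// so checkpoint N+1 uploads only files not already in durable storage.
// (Real Flink additionally tracks reference counts so shared files are
// deleted only when no retained checkpoint uses them.)
class IncrementalCheckpoint {
    static Set<String> filesToUpload(Set<String> localSstFiles, Set<String> alreadyUploaded) {
        Set<String> delta = new HashSet<>(localSstFiles);
        delta.removeAll(alreadyUploaded); // upload only the new SST files
        return delta;
    }
}
```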

Checkpoint vs Savepoint

Checkpoints are automatic and optimized for recovery. Savepoints are manually triggered, always full snapshots, and portable across job versions. Use savepoints for upgrades, schema changes, and migration between clusters.

Practical Example: CDC Aggregation Pipeline

Consider a pipeline that consumes CDC events from an orders database, joins them with a customer dimension table, and maintains running revenue totals per customer segment. This is a representative workload for platforms like Streamkap that manage CDC-to-analytics pipelines at scale.

// Configure RocksDB with incremental checkpoints
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental = true
env.getCheckpointConfig().setCheckpointInterval(60_000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

// State TTL for the join - expire unmatched orders after 7 days
StateTtlConfig joinTtl = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)
    .build();

// State TTL for aggregation - keep segment totals for 90 days
StateTtlConfig aggTtl = StateTtlConfig
    .newBuilder(Time.days(90))
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)
    .build();

Key decisions in this configuration:

  • RocksDB with incremental checkpoints: The join between orders and customers produces state proportional to the number of active orders and customer records, easily exceeding heap capacity.
  • Separate TTLs: Join state expires after 7 days (unmatched orders are likely stale), while aggregation state uses 90 days to preserve historical segment totals.
  • OnReadAndWrite for aggregation: Resets the timer whenever a segment’s total is read or updated, keeping active segments alive.
  • RocksDB compaction filter: Cleans expired state during background compaction without impacting event processing throughput.

Monitoring State

State-related failures are the most common cause of production Flink job instability. Proactive monitoring catches problems before they become outages.

Key Metrics

  • Keyed state entry count (with RocksDB, the estimate-num-keys native metric): Total number of keyed state entries. A monotonically increasing trend without TTL indicates unbounded state growth.
  • RocksDB block cache hit rate (rocksdb.block-cache-hit / rocksdb.block-cache-miss): Below 90% means your working set exceeds cache size. Increase managed memory or block cache.
  • Checkpoint duration (lastCheckpointDuration): Increasing checkpoint times signal growing state. If checkpoints approach the configured timeout, the job will fail.
  • Checkpoint size (lastCheckpointSize): Track this over time. A steady upward trend with incremental checkpoints enabled suggests state TTL is not expiring entries fast enough.
  • RocksDB pending compaction bytes (estimate-pending-compaction-bytes): High values mean compaction cannot keep up with writes, causing read amplification and increased latency.

Detecting Unbounded State Growth

The clearest signal is checkpoint size growing linearly over time. Plot lastCheckpointSize on a dashboard and set an alert for sustained growth over 24 hours. If the trend is linear and unbounded, you have a stateful operation missing TTL configuration.
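The alert rule can be sketched as a least-squares slope over recent checkpoint-size samples; the threshold below is illustrative, not a Flink or Streamkap API:

```java
// Detect sustained linear growth in checkpoint size: fit a least-squares
// slope to the samples and alert when it exceeds a threshold. Sample spacing
// is assumed uniform (e.g. one lastCheckpointSize reading per interval).
class GrowthDetector {
    static double slope(long[] sizes) {
        int n = sizes.length;
        double meanX = (n - 1) / 2.0, meanY = 0;
        for (long s : sizes) meanY += s;
        meanY /= n;
        double num = 0, den = 0;
        for (int i = 0; i < n; i++) {
            num += (i - meanX) * (sizes[i] - meanY);
            den += (i - meanX) * (i - meanX);
        }
        return num / den; // bytes of growth per sample
    }

    static boolean growthAlert(long[] sizes, double bytesPerSampleThreshold) {
        return slope(sizes) > bytesPerSampleThreshold;
    }
}
```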

Streamkap automates this monitoring for managed pipelines, tracking state size trends across all running jobs and alerting when growth patterns suggest missing or misconfigured TTL settings.

RocksDB-Specific Monitoring

Enable RocksDB native metrics for deeper visibility:

state.backend.rocksdb.metrics.actual-delayed-write-rate: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true

Watch actual-delayed-write-rate closely. A non-zero value means RocksDB is throttling writes because compaction cannot keep up. This directly impacts your pipeline’s throughput and end-to-end latency.

State management is not glamorous, but it is the foundation that determines whether your streaming pipeline is a reliable production system or a prototype that works until the data volume increases. Get the backend, TTL, and monitoring right, and your Flink jobs will run without surprises.