
Engineering

February 25, 2026

10 min read

Flink Memory Tuning: Preventing OutOfMemoryErrors in Production

Learn how to configure Flink memory to prevent OutOfMemoryErrors. Understand the Flink memory model, tune heap and off-heap settings, and diagnose memory issues in production.

TL;DR:

  • Flink's memory model divides TaskManager memory into JVM heap, managed memory (for RocksDB and sorting), network buffers, and JVM metaspace - each must be sized correctly.
  • The most common OOM cause is unbounded state growth from non-windowed GROUP BY or regular joins - fix with state TTL or windowed operations before tuning memory.
  • Start with taskmanager.memory.process.size and let Flink auto-calculate the split, then tune individual components only when profiling shows specific bottlenecks.

If there is one operational issue that Flink teams spend the most time debugging, it is memory. CPU bottlenecks surface clearly in metrics. Disk pressure gives obvious I/O wait signals. But memory failures in Flink tend to arrive without warning, crash a TaskManager, trigger a cascading restart, and leave you staring at a stack trace that could point to half a dozen root causes. The challenge is that Flink runs inside the JVM but manages substantial amounts of memory outside of it. Getting the balance wrong between heap, off-heap, managed memory, and network buffers is one of the most common causes of instability in production Flink deployments.

This guide walks through Flink’s memory model from top to bottom, explains how each component behaves under load, and gives you a systematic approach to diagnosing and fixing OutOfMemoryErrors.

The Flink Memory Model

Flink does not treat the JVM as a simple heap-based runtime. Instead, it carves up the total memory available to a TaskManager process into a carefully structured hierarchy. Understanding this hierarchy is the foundation for every tuning decision.

At the highest level, Process Memory is everything the operating system allocates to the TaskManager JVM process. This breaks down into three regions:

  • Flink Memory: The portion Flink actively manages and subdivides.
  • JVM Metaspace: Space for class metadata loaded by the JVM.
  • JVM Overhead: Memory the JVM itself needs for thread stacks, code cache, GC data structures, and other internal bookkeeping.

Flink Memory then further divides into:

  • Framework Heap: Memory for Flink’s own internal data structures (checkpoint coordination, network stack management).
  • Task Heap: Memory for your user-defined functions, operators, and any objects they allocate.
  • Managed Memory: Off-heap memory that Flink controls directly, used for RocksDB state backend caches, batch sorting, and hash tables.
  • Network Memory: Off-heap buffers used for shuffling data between operators across the network.

The key insight is that only Task Heap and Framework Heap live on the JVM heap. Everything else is off-heap, which means traditional JVM heap monitoring misses a large portion of actual memory consumption.

Memory Components in Detail

Framework Heap and Task Heap

Framework Heap defaults to 128 MB and rarely needs adjustment. It covers Flink’s internal coordination structures. Task Heap is where your application code lives. Every record deserialized, every intermediate collection built inside a ProcessFunction, and every user-created object allocates from this pool.

Task Heap is derived by subtraction: it gets whatever remains of Flink Memory after Framework Heap, Managed Memory, and Network Memory are taken out. If your operators do heavy in-memory processing, such as buffering records for windowed aggregations on the heap state backend, Task Heap is where pressure builds first.

Managed Memory

Managed memory is Flink’s off-heap allocation pool. It defaults to 40% of total Flink memory, which is a large share. This allocation makes sense when using RocksDB, because RocksDB’s block cache and write buffer memory are drawn from managed memory. For pure heap-based state backends, 40% is excessive and should be reduced.

Key configuration:

taskmanager.memory.managed.fraction: 0.4    # Default: 40% of Flink memory
taskmanager.memory.managed.size: 512m       # Or set an absolute size

Network Memory

Network memory holds the buffers used for data exchange between operators. Each buffer is 32 KB by default. The total network memory depends on parallelism and the number of shuffle connections:

taskmanager.memory.network.fraction: 0.1      # Default: 10% of Flink memory
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb

More parallel subtasks and more distinct shuffle channels require more buffers. If you see IOException: Insufficient number of network buffers, this is the component to increase.

JVM Metaspace and Overhead

JVM Metaspace stores class metadata. The default of 256 MB is usually sufficient, but jobs that dynamically load many classes (heavy use of reflection, Groovy scripts, or complex UDF libraries) can exceed it:

taskmanager.memory.jvm-metaspace.size: 256m

JVM Overhead covers thread stacks, code cache, and GC internal structures. It defaults to a fraction (10%) of the total process memory with a minimum of 192 MB and a maximum of 1 GB.

Default Configuration and Auto-Splitting

The recommended starting point for memory configuration is to set a single value and let Flink calculate everything else:

taskmanager.memory.process.size: 4096m

With this single setting, Flink applies its default fractions to derive every sub-component:

Component                      Default Allocation (4 GB process)
JVM Overhead (10%)             ~410 MB
JVM Metaspace                  256 MB
Flink Memory                   ~3430 MB
— Managed Memory (40%)         ~1372 MB
— Network Memory (10%)         ~343 MB
— Framework Heap               128 MB
— Task Heap                    ~1587 MB

This auto-split is a reasonable starting point for RocksDB workloads. For heap-only workloads, reduce managed memory and network memory to give more room to Task Heap.
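The derivation behind that table can be sketched in a few lines of Java. This is a simplified model using the default fractions described above; Flink's real calculation also enforces min/max bounds (for example on JVM overhead and network memory) that this sketch omits:

```java
// Sketch of Flink's default memory split for a 4 GB process.
// Assumes defaults: 10% JVM overhead, 256 MB metaspace, 40% managed,
// 10% network, 128 MB framework heap. Min/max clamping is omitted.
public class MemorySplitSketch {
    static final long MB = 1024 * 1024;

    public static void main(String[] args) {
        long process = 4096 * MB;
        long overhead = (long) (process * 0.10);            // ~410 MB
        long metaspace = 256 * MB;
        long flinkMemory = process - overhead - metaspace;  // ~3430 MB

        long managed = (long) (flinkMemory * 0.40);         // ~1372 MB
        long network = (long) (flinkMemory * 0.10);         // ~343 MB
        long frameworkHeap = 128 * MB;
        // Task Heap is the remainder of Flink Memory.
        long taskHeap = flinkMemory - managed - network - frameworkHeap; // ~1587 MB

        System.out.println("Flink memory: " + flinkMemory / MB + " MB");
        System.out.println("Managed:      " + managed / MB + " MB");
        System.out.println("Network:      " + network / MB + " MB");
        System.out.println("Task heap:    " + taskHeap / MB + " MB");
    }
}
```

Running the same arithmetic for your own process size is a quick way to sanity-check what Flink will actually give each component before deploying.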

An alternative approach is to set taskmanager.memory.flink.size instead, which excludes JVM overhead and metaspace from the calculation. Use this when you want precise control over the Flink-managed portion and your container already accounts for JVM overhead.

RocksDB Memory Management

When using the RocksDB state backend, memory tuning becomes critical because RocksDB operates entirely off-heap. Flink allocates a fixed amount of managed memory per TaskManager slot and configures RocksDB to stay within that budget.

RocksDB memory consumption comes from three sources:

  • Block Cache: Caches recently read SST file blocks. Larger caches mean fewer disk reads.
  • Write Buffers (MemTables): In-memory buffers where new writes accumulate before being flushed to disk.
  • Index and Filter Blocks: Bloom filters and index structures that can optionally be pinned in the block cache.

Flink controls these through the state.backend.rocksdb.memory.managed flag (enabled by default). When enabled, Flink uses a shared WriteBufferManager and Cache that draws from managed memory. The split between block cache and write buffers is roughly 2:1 by default.

For large-state workloads, tune managed memory based on the number of state entries and access patterns:

# For a CDC pipeline with large state
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.5    # Give more to RocksDB
state.backend.rocksdb.memory.managed: true

If RocksDB cache hit rates drop below 90%, you either need more managed memory or need to reduce state size. The state.backend.rocksdb.metrics.block-cache-hit and state.backend.rocksdb.metrics.block-cache-miss metrics tell you exactly where you stand.
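The hit-rate arithmetic itself is simple; a minimal sketch, with made-up counter values standing in for the hit/miss readings those metrics expose:

```java
// Sketch of the block-cache hit-rate check described above.
// The counter values are hypothetical; in practice they come from the
// metrics enabled via state.backend.rocksdb.metrics.block-cache-hit/miss.
public class CacheHitRateSketch {
    public static void main(String[] args) {
        long hits = 940_000;   // hypothetical block-cache-hit reading
        long misses = 60_000;  // hypothetical block-cache-miss reading

        double hitRate = (double) hits / (hits + misses);
        System.out.printf("hit rate: %.1f%%%n", hitRate * 100);

        if (hitRate < 0.90) {
            System.out.println("Below 90%: grow managed memory or shrink state");
        }
    }
}
```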

Write Amplification and Compaction

RocksDB compaction also consumes temporary memory. Heavy write workloads can spike memory usage during compaction cycles. If you see intermittent OOM errors that correlate with write bursts, consider limiting concurrent compactions:

state.backend.rocksdb.compaction.level.max-size-level-base: 256mb
state.backend.rocksdb.thread.num: 2

Diagnosing OutOfMemoryErrors

Not all OOM errors are the same. The error message tells you which memory region is exhausted:

java.lang.OutOfMemoryError: Java heap space

The JVM heap is full. This points to Task Heap or Framework Heap exhaustion. Common causes:

  • User code holding references to large collections.
  • Unbounded state on the heap state backend (HashMapStateBackend).
  • Memory leaks in UDFs that accumulate objects across invocations.

Diagnostic steps: Enable GC logging with -verbose:gc -XX:+PrintGCDetails (or -Xlog:gc* on JDK 9 and later). Look for Full GC cycles that fail to reclaim significant memory. Use heap dumps (-XX:+HeapDumpOnOutOfMemoryError) to identify the largest object graphs.

java.lang.OutOfMemoryError: Direct buffer memory

Off-heap direct memory is exhausted. This usually points to network buffer exhaustion. Check whether parallelism recently increased or new shuffle channels were added.

java.lang.OutOfMemoryError: Metaspace

The JVM ran out of metaspace. Increase taskmanager.memory.jvm-metaspace.size. This commonly happens after deploying jobs with many dependencies or dynamic class loading.

Container OOM Killed (exit code 137)

The operating system or container runtime killed the process because total memory usage exceeded the container limit. This happens when the sum of heap, off-heap, managed memory, and JVM overhead exceeds taskmanager.memory.process.size. It usually indicates a mismatch between Flink’s configured process size and the actual container memory limit, or a native memory leak (often from RocksDB or JNI code).
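One mitigation sketch, assuming the native leak scenario: the default jvm-overhead.max is 1 GB, which may under-reserve headroom for native allocations. Raising it keeps more of the process budget reserved for memory Flink does not directly account for. The values below are illustrative, not recommendations:

```yaml
# Illustrative: reserve more accounted headroom for native memory
# (RocksDB, JNI) inside taskmanager.memory.process.size.
taskmanager.memory.jvm-overhead.fraction: 0.15
taskmanager.memory.jvm-overhead.max: 2gb
```

Also verify that the container limit is at or above taskmanager.memory.process.size; if the container grants less than Flink budgets, a kill is inevitable under load.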

Tuning Strategies: Fix State Growth First

The single most important rule of Flink memory tuning is: fix unbounded state before touching memory configuration. The majority of production OOM errors trace back to state that grows without bound, not to misconfigured memory.

Common patterns that cause unbounded state:

  • Non-windowed GROUP BY: Without a window, the aggregation keeps state for every key forever.
  • Regular Joins: A regular (non-temporal, non-interval) join buffers both sides indefinitely.
  • Missing State TTL: Even windowed operations accumulate state if results are never cleaned up.

The fix is to apply state TTL or switch to windowed operations:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Expire entries 24 hours after creation or last write
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()   // also drop expired entries when taking full snapshots
    .build();

ValueStateDescriptor<MyState> descriptor = new ValueStateDescriptor<>("my-state", MyState.class);
descriptor.enableTimeToLive(ttlConfig);
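For the SQL patterns listed above (non-windowed GROUP BY, regular joins), where you cannot call enableTimeToLive yourself, the Table/SQL runtime exposes an equivalent idle-state TTL as a configuration option; the 24-hour value is illustrative:

```yaml
# Illustrative: expire idle SQL operator state after 24 hours
table.exec.state.ttl: 24 h
```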

Only after confirming that state growth is bounded should you move to memory tuning. The sequence is: (1) verify state size metrics are stable over time, (2) profile GC behavior and memory pool utilization, (3) adjust specific memory components based on evidence.

Network Buffers for High-Parallelism Jobs

Network buffer requirements scale with the product of upstream parallelism and downstream parallelism for each shuffle. A job with 64 parallel map operators feeding into 64 parallel reduce operators creates 4,096 shuffle channels, each needing both sender and receiver buffers.
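That demand can be sketched with back-of-the-envelope arithmetic, assuming 32 KB buffers and the defaults of 2 exclusive buffers per channel plus 8 floating buffers per gate. This counts only the receive side; senders hold buffers too, so treat it as a lower bound:

```java
// Rough estimate of receive-side network buffer demand for one
// all-to-all shuffle, using the defaults quoted in this article.
public class NetworkBufferSketch {
    public static void main(String[] args) {
        int upstream = 64, downstream = 64;
        int buffersPerChannel = 2;   // exclusive buffers per channel (default)
        int floatingPerGate = 8;     // floating buffers per input gate (default)
        long bufferBytes = 32 * 1024;

        long channels = (long) upstream * downstream;            // 4096 channels
        long receiveBuffers = channels * buffersPerChannel
                            + (long) downstream * floatingPerGate;
        long bytes = receiveBuffers * bufferBytes;

        System.out.println(channels + " channels, " + receiveBuffers
                + " receive buffers, ~" + bytes / (1024 * 1024) + " MB");
    }
}
```

At 64x64 parallelism this already lands in the hundreds of megabytes, which is why the default 10% network fraction runs out quickly on high-parallelism jobs.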

If you are running high-parallelism jobs and see network buffer exhaustion, you have two options:

  1. Increase network memory:

taskmanager.memory.network.fraction: 0.15
taskmanager.memory.network.min: 128mb

  2. Reduce per-channel buffering so the same memory serves more channels (at the cost of higher backpressure sensitivity):

taskmanager.network.memory.buffers-per-channel: 2       # Default: 2
taskmanager.network.memory.floating-buffers-per-gate: 8 # Default: 8

Reducing buffers-per-channel from 2 to 1 cuts the dedicated buffer memory in half but increases backpressure sensitivity. This is a valid trade-off when memory is tight and latency requirements are relaxed.

Practical Example: CDC Pipeline with RocksDB State

Consider a CDC pipeline that reads change events from PostgreSQL, enriches them with a lookup join against a dimension table, and writes to a downstream data warehouse. The state includes the dimension table snapshot and deduplication state for exactly-once delivery.

Expected characteristics:

  • Dimension table: 5 million rows, ~2 GB when serialized.
  • Deduplication state: 10 million keys with TTL of 24 hours.
  • Parallelism: 8.
  • Container memory: 16 GB per TaskManager.

Configuration:

taskmanager.memory.process.size: 16384m
taskmanager.memory.managed.fraction: 0.5       # ~7.3 GB of Flink memory for RocksDB
taskmanager.memory.network.fraction: 0.08      # ~1.2 GB for network buffers
taskmanager.memory.jvm-metaspace.size: 384m    # Extra room for CDC connectors
taskmanager.memory.jvm-overhead.fraction: 0.08 # Capped at 1 GB by the default jvm-overhead.max

# RocksDB tuning
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
state.backend.rocksdb.compaction.level.use-dynamic-size: true

This leaves roughly 6 GB for Task Heap, which is sufficient for CDC deserialization and enrichment logic. The ~7.3 GB of managed memory gives RocksDB a ~4.9 GB block cache and ~2.4 GB write buffer pool, enough to keep the hot portion of the dimension table cached and handle write bursts from deduplication state updates.

Platforms like Streamkap automate this kind of configuration. Rather than manually calculating memory splits for each pipeline, Streamkap profiles pipeline characteristics and sets memory parameters based on the state backend, operator count, parallelism, and expected state size.

Monitoring Memory in Production

Effective memory monitoring requires watching multiple metrics simultaneously. No single metric tells the full story.

GC Metrics

Track GC frequency and pause times through Flink’s built-in metrics:

  • Status.JVM.GarbageCollector.<collector>.Time: Total GC time. A sudden increase signals heap pressure.
  • Status.JVM.GarbageCollector.<collector>.Count: GC frequency. Frequent young-gen GCs are normal; frequent old-gen GCs indicate Task Heap exhaustion.

Memory Pool Utilization

  • Status.JVM.Memory.Heap.Used / Status.JVM.Memory.Heap.Max: Heap utilization. Sustained usage above 85% is a warning sign.
  • Status.JVM.Memory.NonHeap.Used: Covers metaspace and code cache.
  • Status.JVM.Memory.Direct.Used: Direct byte buffers (network memory).

RocksDB Cache Metrics

Enable RocksDB metrics to track cache efficiency:

state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.block-cache-hit: true
state.backend.rocksdb.metrics.block-cache-miss: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true

A block cache hit rate below 90% means RocksDB is reading from disk frequently. Either increase managed memory or investigate whether state access patterns have changed (e.g., scanning cold keys).

State Size Metrics

Monitor State Size through Flink’s checkpoint metrics. If state size grows linearly over time without plateauing, you have unbounded state, and no amount of memory tuning will save you.

Set up alerts on:

  • Heap utilization > 85% sustained for 5 minutes.
  • Block cache hit rate < 85% sustained for 10 minutes.
  • State size growth > 10% per hour without corresponding input growth.
  • Full GC pauses > 5 seconds or old-gen GC frequency > 1 per minute.

These thresholds give you early warning before an OOM crash. Combined with checkpoint size trending, they form a complete picture of whether your memory configuration is holding up under production load. The goal is never to eliminate GC entirely but to keep it predictable and proportional to your workload.