
Engineering

February 25, 2026

15 min read

Running Flink in Production: The Operations Guide

A complete guide to operating Apache Flink in production. Covers checkpointing, state backends, memory tuning, parallelism, monitoring, deployment models, and upgrade strategies.

TL;DR:

  • Checkpointing is Flink's fault-tolerance mechanism - configure it correctly before you go live, or accept that any failure means starting from scratch.
  • RocksDB is the right state backend for large state (>1 GB per task); heap state is faster but limited by JVM memory.
  • Memory configuration is the most common source of Flink production failures - understand the five memory pools before tuning.
  • Consumer lag and checkpoint duration are the two most important operational metrics to monitor.

Getting Apache Flink to work is one thing. Getting it to work reliably at 2 AM when something goes wrong is another. The gap between a development cluster and a production-grade deployment spans checkpointing strategy, state management, memory configuration, monitoring infrastructure, and operational procedures.

This guide covers everything you need to run Flink reliably: the mechanisms that make it fault-tolerant, the configuration knobs that matter most, how to deploy it, and what to do when things go wrong.

How Flink Fault Tolerance Works

Flink’s fault tolerance model is based on distributed snapshots - a technique that captures a consistent snapshot of all operator state across a distributed computation without stopping the job.

When a Flink job fails:

  1. Flink detects the failure (task manager crash, network timeout, OOM)
  2. The JobManager restarts affected tasks
  3. All tasks restore their state from the last successful checkpoint
  4. Sources seek back to the position corresponding to that checkpoint
  5. Processing resumes from the checkpointed position

The result is exactly-once processing semantics (assuming exactly-once-capable sources and sinks) without any coordination required at runtime beyond the checkpoint protocol itself.
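How aggressively Flink retries between steps 1 and 2 is governed by the restart strategy. A reasonable starting point is exponential backoff, sketched below with illustrative values - tune the backoff bounds for your workload and verify the key names against your Flink version's documentation:

```yaml
# flink-conf.yaml - retry with exponential backoff instead of hammering
# a failing dependency with immediate restarts (values are illustrative)
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 2min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
```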

The Checkpoint Protocol

Flink uses a variant of the Chandy-Lamport algorithm, known as asynchronous barrier snapshotting, to take distributed snapshots. Conceptually:

  1. The JobManager injects a barrier into each source partition
  2. Barriers flow downstream through the operator DAG along with data records
  3. When an operator receives barriers from all its input partitions, it snapshots its state and forwards the barrier
  4. When barriers reach all sinks, the checkpoint is complete and acknowledged to the JobManager

Barriers ensure that the snapshot captures a causally consistent state: all records that should be in the snapshot are, and no records that should not be are.
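The alignment behavior in step 3 is the subtle part: once a barrier arrives on one input channel, records arriving behind it on that channel must be held back until barriers arrive on all channels. The toy simulation below (plain Java, not Flink code) illustrates the bookkeeping for a two-input operator:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Toy simulation of exactly-once barrier alignment: an operator with two
// input channels buffers records that arrive behind a channel's barrier,
// and considers the snapshot taken once barriers arrived on all inputs.
public class BarrierAlignment {
    record Event(int channel, String payload, boolean isBarrier) {}

    public static void main(String[] args) {
        List<Event> arrivals = List.of(
            new Event(0, "a1", false),
            new Event(0, null, true),   // barrier on channel 0
            new Event(0, "a2", false),  // behind the barrier: must be buffered
            new Event(1, "b1", false),  // pre-barrier: processed normally
            new Event(1, null, true));  // barrier on channel 1 -> snapshot done

        boolean[] barrierSeen = new boolean[2];
        Queue<String> buffered = new ArrayDeque<>();
        StringBuilder preSnapshot = new StringBuilder();

        for (Event e : arrivals) {
            if (e.isBarrier()) {
                barrierSeen[e.channel()] = true;
            } else if (barrierSeen[e.channel()]) {
                buffered.add(e.payload()); // held back until snapshot completes
            } else {
                preSnapshot.append(e.payload()).append(' ');
            }
        }
        // Both barriers seen: the snapshot contains exactly the pre-barrier records.
        System.out.println("snapshot state: " + preSnapshot.toString().trim());
        System.out.println("replayed after snapshot: " + buffered);
    }
}
```

Records a1 and b1 land in the snapshot; a2 is buffered and processed only after the snapshot point, which is what makes the captured state causally consistent.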

Checkpointing Configuration

Checkpointing must be configured before your job goes to production. A job without checkpointing has no fault tolerance - any failure requires restarting from the source’s beginning or latest offset.

Basic Configuration

# flink-conf.yaml (or Flink SQL SET commands)

# Enable checkpointing every 60 seconds
execution.checkpointing.interval: 60000

# Require at least 10 seconds between checkpoints
execution.checkpointing.min-pause: 10000

# Checkpoint must complete within 10 minutes or it is cancelled
execution.checkpointing.timeout: 600000

# Keep 2 completed checkpoints (in case the latest is corrupt)
execution.checkpointing.num-retained: 2

# Exactly-once semantics (use AT_LEAST_ONCE for lower latency)
execution.checkpointing.mode: EXACTLY_ONCE

# External checkpoint storage (required for recovery across cluster restarts)
state.checkpoints.dir: s3://your-bucket/flink-checkpoints/

In Flink SQL:

SET 'execution.checkpointing.interval' = '60s';
SET 'execution.checkpointing.mode'     = 'EXACTLY_ONCE';
SET 'state.checkpoints.dir'            = 's3://your-bucket/flink-checkpoints/';

Choosing a Checkpoint Storage Backend

| Backend | Configuration | Best For |
| --- | --- | --- |
| JobManagerCheckpointStorage | Default, in-memory | Development only - lost on JM restart |
| FileSystemCheckpointStorage | state.checkpoints.dir: s3://... | Production - persists to durable storage |

Always use FileSystemCheckpointStorage with a durable store (S3, GCS, HDFS) in production. Checkpoints stored only on local disk are lost if the machine fails.

Savepoints: The Operational Swiss Army Knife

A savepoint is a manually triggered, externally managed snapshot. Use savepoints for:

  • Planned job stop and restart: Upgrade application code or configuration
  • Infrastructure migration: Move the job from one cluster to another
  • Schema evolution debugging: Inspect the state at a specific point in time
  • A/B testing: Fork the job into two versions from the same state

# Trigger a savepoint
flink savepoint <job-id> s3://your-bucket/flink-savepoints/

# Stop a job with a savepoint (atomic stop + save)
flink stop --savepointPath s3://your-bucket/flink-savepoints/ <job-id>

# Restart from a savepoint
flink run --fromSavepoint s3://your-bucket/flink-savepoints/savepoint-abc123 \
  ./my-flink-job.jar

Savepoint vs checkpoint summary:

| Attribute | Checkpoint | Savepoint |
| --- | --- | --- |
| Triggered by | Flink automatically | Operator manually |
| Lifecycle | Managed by Flink, auto-deleted | Persists until manually deleted |
| Primary purpose | Fault tolerance | Operational mobility |
| Format | Optimized for speed | Portable, operator-friendly |
| State compatibility | Strict - same job | Can accommodate minor changes |

State Backends

Every stateful operator in Flink must store its working state somewhere. The state backend determines where.

HashMapStateBackend (Heap)

State is stored in the JVM heap of each TaskManager. Reads and writes are in-memory - very fast.

When to use:

  • Small state (under ~500 MB per task slot)
  • Latency-critical applications
  • Simple jobs with limited state (pure filtering, stateless transforms)

When to avoid:

  • Large state: heap usage grows with state, increasing GC pressure and risk of OOM
  • Long-running sessions or long window sizes that accumulate significant state

state.backend: hashmap

EmbeddedRocksDBStateBackend

State is stored in RocksDB, an embedded key-value store that writes to local disk. State is spilled to disk rather than held in heap, enabling state that is orders of magnitude larger than available JVM memory.

When to use:

  • Large state (anything over a few hundred MB per task)
  • Long-horizon deduplication
  • Joins with high-cardinality keys
  • Session windows with long session gaps
  • Any job where heap OOM is a concern

When to avoid:

  • Ultra-low-latency requirements where disk access is a bottleneck
  • Very simple jobs where RocksDB’s overhead is not justified

state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/ssd/flink-rocksdb

Pair RocksDB with an SSD-backed local disk for best performance. Store checkpoints on S3 or GCS for durability.

RocksDB incremental checkpoints reduce checkpoint overhead significantly by only uploading state that has changed since the last checkpoint:

state.backend.incremental: true

Enable this for all RocksDB deployments - it can reduce checkpoint upload time by 80-90% for large state.

Memory Configuration

Memory misconfiguration is the most common cause of Flink production failures. Flink’s memory model divides TaskManager memory into five pools:

| Pool | Purpose | Config Key |
| --- | --- | --- |
| Framework Heap | Flink framework overhead | taskmanager.memory.framework.heap.size |
| Task Heap | Operator state (HashMapStateBackend) | taskmanager.memory.task.heap.size |
| Managed Memory | RocksDB state, batch sort | taskmanager.memory.managed.fraction |
| Network Memory | Network buffers for data exchange | taskmanager.memory.network.fraction |
| JVM Metaspace + Overhead | JVM internals, off-heap | taskmanager.memory.jvm-metaspace.size |

# Total process memory is the one number you usually set
taskmanager.memory.process.size: 4096m

# Flink automatically divides this based on fractions
# Override individual pools only if you have a specific reason:

# More managed memory for RocksDB-heavy jobs
taskmanager.memory.managed.fraction: 0.4

# More network memory for join-heavy or shuffle-heavy jobs
taskmanager.memory.network.fraction: 0.15
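To build intuition for how `process.size` is carved up, the sketch below applies the default fractions from recent Flink 1.x releases (overhead 10% clamped to [192 MB, 1 GB], metaspace 256 MB, network 10%, managed 40%, framework heap/off-heap 128 MB each). Exact defaults vary by version, so treat the numbers as approximate:

```java
// Back-of-the-envelope split of taskmanager.memory.process.size using
// Flink's default fractions (a sketch; verify defaults for your version).
public class MemoryBudget {
    public static void main(String[] args) {
        double processMb = 4096;

        // JVM overhead: 10% of process size, clamped to [192 MB, 1 GB]
        double overhead = Math.min(Math.max(processMb * 0.10, 192), 1024);
        double metaspace = 256;                       // jvm-metaspace default
        double flinkMemory = processMb - overhead - metaspace;

        double network = Math.min(Math.max(flinkMemory * 0.10, 64), 1024);
        double managed = flinkMemory * 0.40;          // managed.fraction default
        double frameworkHeap = 128, frameworkOffHeap = 128;
        double taskHeap = flinkMemory - network - managed
                        - frameworkHeap - frameworkOffHeap;

        System.out.printf("flink memory: %.0f MB%n", flinkMemory);
        System.out.printf("network: %.0f MB, managed: %.0f MB, task heap: %.0f MB%n",
                network, managed, taskHeap);
    }
}
```

The takeaway: of a 4 GB container, only roughly a third ends up as task heap, which is why "the container has 4 GB, why am I OOMing at 1.5 GB of state?" is such a common surprise.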

Common memory failure patterns:

  • java.lang.OutOfMemoryError: Java heap space - Task heap too small. Increase taskmanager.memory.task.heap.size or switch to RocksDB.
  • java.lang.OutOfMemoryError: GC overhead limit exceeded - Too much state in heap. Migrate to RocksDB.
  • java.lang.OutOfMemoryError: Direct buffer memory - Network memory too small. Increase taskmanager.memory.network.fraction.
  • Container killed by OOM killer - Total process memory exceeds container limits. Reduce taskmanager.memory.process.size or increase container limits.

Parallelism and Scaling

Parallelism is the number of parallel instances of each operator. Setting it correctly determines throughput capacity.

-- Set default parallelism for all operators in the session
SET 'parallelism.default' = '8';

-- Per-operator parallelism is available in the DataStream API via setParallelism();
-- in SQL, parallelism is set at the job level (see also 'table.exec.resource.default-parallelism')

Guidelines for setting parallelism:

  • Match parallelism to the number of Kafka partitions for source operators (higher parallelism than partitions leaves tasks idle)
  • For CPU-bound operators, match parallelism to available CPU cores
  • For stateful operators (joins, aggregations), balance state size per task - more parallelism means smaller state per task but more network shuffle overhead

Flink’s adaptive scheduler can automatically adjust parallelism when TaskManagers are added or removed:

jobmanager.scheduler: adaptive

Reactive mode (scheduler-mode: reactive, supported for standalone application clusters) builds on this and is particularly useful in Kubernetes deployments with horizontal pod autoscaling - Flink rescales to use every TaskManager that joins, expanding during peak load and contracting during quiet periods automatically.
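In Kubernetes, one way to drive this elasticity is a standard HorizontalPodAutoscaler on the TaskManager deployment. The manifest below is a sketch - the deployment name `flink-taskmanager` and the thresholds are assumptions to adapt to your setup:

```yaml
# Sketch: HPA scaling TaskManager pods; Flink's scheduler picks up the
# added/removed slots. Names and thresholds are illustrative.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-taskmanager
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-taskmanager
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```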

Deployment Models

Standalone Cluster

The simplest deployment: start a JobManager and one or more TaskManagers. Suitable for development and simple production workloads.

# Start standalone cluster
./bin/start-cluster.sh

# Submit a job
./bin/flink run ./my-job.jar

Limitations: Manual scaling, no bin-packing, no resource elasticity.

Kubernetes with the Flink Kubernetes Operator

The recommended deployment model for production. The Flink Kubernetes Operator manages Flink clusters as Kubernetes custom resources.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend: rocksdb
    state.checkpoints.dir: s3://my-bucket/checkpoints/
    execution.checkpointing.interval: "60s"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: s3://my-bucket/jobs/my-job.jar
    parallelism: 4
    upgradeMode: savepoint

The upgradeMode: savepoint field tells the operator to automatically take a savepoint before upgrading the job - preventing state loss on upgrades.
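Day-to-day interaction with the operator happens through kubectl. A hedged sketch of the common operations, assuming the FlinkDeployment above is saved as my-flink-job.yaml:

```shell
# Deploy or update the job - the operator reconciles the change
# (with upgradeMode: savepoint, it snapshots state before upgrading)
kubectl apply -f my-flink-job.yaml

# Inspect reconciliation and job status reported by the operator
kubectl get flinkdeployment my-flink-job -o yaml

# Follow JobManager logs (the operator names the JM deployment after the CR)
kubectl logs deployment/my-flink-job -f
```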

Self-operating Flink requires deep expertise: JVM tuning, checkpoint management, version upgrades, monitoring infrastructure, and 24/7 on-call capability. For teams without a dedicated platform engineering function, managed Flink services eliminate this operational burden.

Streamkap provides a managed Flink environment tightly integrated with the CDC pipeline layer - so the same platform that ingests changes from PostgreSQL or MySQL also runs Flink SQL transforms against those changes, with checkpointing, scaling, and monitoring handled automatically.

Monitoring: What to Watch

Essential Metrics

| Metric | Where to Find It | Alert Threshold |
| --- | --- | --- |
| lastCheckpointDuration | JobManager REST API | >50% of checkpoint interval |
| lastCheckpointSize | JobManager REST API | Sudden growth (state leak) |
| numberOfFailedCheckpoints | JobManager REST API | Any non-zero |
| records-lag-max (per topic) | Kafka consumer group | Sustained growth >5 min |
| numRestarts | JobManager REST API | Any non-zero in production |
| fullRestarts | JobManager REST API | Any non-zero |
| taskSlotsAvailable | TaskManager | Must be > 0 |
| heap_used | JVM metrics | >80% of max |
| rocksdb.estimate-live-data-size | RocksDB metrics | Unexpected growth |

Checkpoint Monitoring

Checkpoint health is the single most important indicator of a Flink job’s operational health. Monitor:

  • Checkpoint duration: Should be well under the checkpoint interval. If checkpointing takes longer than the interval, checkpoints queue up and the job stalls.
  • Checkpoint size: Should be roughly stable over time. Unexpected growth indicates state accumulation - a state leak or an unbounded join.
  • Checkpoint failure rate: Should be zero. Any checkpoint failure is worth investigating.

Access checkpoint status via the Flink REST API:

# Get checkpoint overview
curl http://jobmanager:8081/jobs/<job-id>/checkpoints

# Get checkpoint configuration
curl http://jobmanager:8081/jobs/<job-id>/checkpoints/config
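For ad-hoc scripting, the JSON response can be filtered with jq. The field path below matches the REST API's checkpoint statistics response in recent Flink versions - verify it against your cluster before wiring it into automation:

```shell
# Extract the latest completed checkpoint's end-to-end duration (ms),
# e.g. as input to a shell-based health check
curl -s http://jobmanager:8081/jobs/<job-id>/checkpoints \
  | jq '.latest.completed.end_to_end_duration'
```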

Setting Up Prometheus + Grafana

Flink ships with a Prometheus reporter that exposes metrics at a configurable endpoint:

metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
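On the Prometheus side, a static scrape config is enough to start (a sketch - hostnames are illustrative; in Kubernetes you would typically use pod discovery instead):

```yaml
# prometheus.yml - scrape every Flink process on the reporter port
scrape_configs:
  - job_name: flink
    static_configs:
      - targets:
          - jobmanager:9249
          - taskmanager-1:9249
          - taskmanager-2:9249
```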

A minimal Grafana dashboard should include panels for:

  • Checkpoint duration (line chart, alert on sustained high values)
  • Consumer lag per topic-partition (line chart, alert on growth)
  • Heap and managed memory utilization (gauge)
  • Restart count (counter, alert on any increment)
  • Records processed per second (throughput indicator)
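The alert panels translate into Prometheus alerting rules. The sketch below assumes the exported metric is named flink_jobmanager_job_lastCheckpointDuration, following Flink's scope-prefixed naming - confirm the exact name against your /metrics endpoint before relying on it:

```yaml
# Sketch of an alerting rule: checkpoint duration sustained above 30s
groups:
  - name: flink
    rules:
      - alert: FlinkCheckpointSlow
        expr: flink_jobmanager_job_lastCheckpointDuration > 30000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Checkpoint duration above 30s for 5 minutes"
```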

Upgrade Strategies

Rolling Upgrade (Stateless Jobs)

For stateless jobs or jobs with small state, a rolling upgrade is straightforward: take a savepoint, deploy the new version, and start from the savepoint.

# 1. Take savepoint
# 1. Take savepoint (the CLI prints the path in its output; extract it)
SAVEPOINT_PATH=$(flink savepoint <job-id> s3://bucket/savepoints/ | grep -o 's3://[^ ]*')

# 2. Cancel the job
flink cancel <job-id>

# 3. Deploy new version from savepoint
flink run --fromSavepoint $SAVEPOINT_PATH ./new-version.jar

Blue-Green Deployment (Zero Downtime)

For pipelines where even brief interruption is unacceptable:

  1. Start the new job version processing from the same Kafka offset (but writing to a parallel output)
  2. Validate that the new job’s output matches expectations
  3. Cut the downstream consumer over to the new output
  4. Stop the old job

This requires temporarily running two versions in parallel and doubles compute cost during the cutover window, but provides complete rollback capability.

State Compatibility Rules

Not all upgrades are state-compatible. Before upgrading a stateful job, verify:

  • Same operator UID: Flink uses operator UIDs to match saved state to operators. If UIDs change, state is orphaned.
  • Compatible state schema: Adding new state is usually fine; removing or changing the type of existing state may fail to restore.
  • Window type unchanged: Changing window size or type requires state migration.
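If you have intentionally removed a stateful operator, its saved state becomes orphaned and the restore fails by default. The flink CLI's --allowNonRestoredState flag opts in to skipping that state - use it deliberately, since it silently discards whatever the dropped operator had saved:

```shell
# Restore while skipping state that no longer maps to any operator
flink run --fromSavepoint s3://bucket/savepoints/savepoint-abc123 \
  --allowNonRestoredState ./new-version.jar
```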

Always assign explicit UIDs to stateful operators in production jobs:

// In the DataStream API
stream
  .keyBy(r -> r.userId)
  .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
  .aggregate(new MyAggregateFunction())
  .uid("my-tumbling-window-aggregation")  // explicit, stable UID
  .name("My Aggregation");

Common Failure Modes and Remediation

| Failure Mode | Symptom | Root Cause | Fix |
| --- | --- | --- | --- |
| Checkpoint timeout | Checkpoint X expired before completing | State too large, slow disk I/O, backpressure | Enable incremental checkpoints, increase timeout, investigate backpressure |
| OOM on heap | Task restarting, OutOfMemoryError | State in heap state backend growing unboundedly | Switch to RocksDB, reduce state TTL, investigate state leak |
| Consumer lag growth | Kafka lag increasing continuously | Insufficient parallelism or slow processing | Increase parallelism, investigate slow operators |
| Endless restarts | numRestarts incrementing repeatedly | Unhandled exception in operator, external service unavailable | Check task logs for exception, add error handling, fix dependency |
| Checkpoint skew | One subtask’s checkpoint takes much longer | Data skew causing one task to have disproportionate state | Investigate key distribution, add salt to skewed keys |
| Watermark stall | Windows not closing, output delayed | One source partition has no new events | Enable watermark alignment, check idle source detection |
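Several of these fixes mention state TTL. For Flink SQL jobs, a single session option bounds state growth for otherwise-unbounded joins and aggregations - at the cost of potentially incorrect results for keys idle longer than the TTL, so pick a duration that comfortably exceeds your data's update horizon:

```sql
-- Expire keyed state that has not been read or updated for 24 hours
SET 'table.exec.state.ttl' = '24 h';
```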

Production Readiness Checklist

Before promoting a Flink job to production:

  • Checkpointing enabled with external storage (S3/GCS/HDFS)
  • State backend chosen and configured for expected state size
  • Memory configured with appropriate values - no default JVM flags
  • Parallelism matches Kafka partition count for sources
  • All stateful operators have explicit UIDs
  • Savepoint procedure documented and tested
  • Prometheus metrics enabled and Grafana dashboard configured
  • Alerts configured for: checkpoint duration, consumer lag, restart count
  • DLQ configured for records that fail processing
  • Runbook exists for each alert condition
  • Upgrade procedure tested in staging with representative state size
  • Recovery procedure tested (fail a task, verify it recovers from checkpoint)

Running Flink in production is operationally intensive, which is why many teams opt for managed platforms that handle checkpointing, scaling, and upgrades automatically. Whether self-hosted or managed, the concepts in this guide apply - understanding them is essential for debugging, capacity planning, and building confidence that your streaming infrastructure will hold up when it matters most.

For guidance on the SQL layer running on top of this infrastructure, see Flink SQL: The Complete Guide to Stream Processing with SQL.