Flink Parallelism and Scaling: Right-Sizing Your Stream Processing
Learn how to set and tune Flink parallelism for optimal throughput. Understand task slots, operator chaining, key groups, and scaling strategies for production workloads.
Introduction
Apache Flink processes data by distributing work across parallel operator instances. The primary lever you have for scaling a Flink job up or down is parallelism — the number of concurrent subtasks that each operator runs. Set it too low and your pipeline cannot keep up with incoming data. Set it too high and you waste cluster resources, introduce unnecessary network shuffles, and may even degrade performance through excessive coordination overhead.
Getting parallelism right is not a one-time exercise. As data volumes grow, schemas evolve, and downstream systems change capacity, you need to revisit your parallelism settings. This guide covers the mechanics of how Flink distributes work, how to choose the right parallelism for each stage of your pipeline, and how to scale safely in production without losing data.
Parallelism Basics
Subtasks and the Job Graph
When you submit a Flink job, the runtime compiles your pipeline into a job graph of operators (source, map, filter, keyBy, sink, etc.). Each operator is then split into subtasks based on its parallelism setting. If a map operator has parallelism 8, Flink creates 8 subtask instances of that operator, each processing a disjoint slice of the data.
TaskManagers and Task Slots
Flink clusters consist of a JobManager (coordinator) and one or more TaskManagers (workers). Each TaskManager offers a fixed number of task slots, which represent a unit of resource allocation (memory, CPU share). A subtask occupies one slot.
Cluster Layout (3 TaskManagers, 4 slots each = 12 total slots)
TaskManager-1: [slot-0] [slot-1] [slot-2] [slot-3]
TaskManager-2: [slot-4] [slot-5] [slot-6] [slot-7]
TaskManager-3: [slot-8] [slot-9] [slot-10] [slot-11]
The total number of slots across all TaskManagers caps the maximum parallelism your cluster can support. If you set parallelism to 16 but only have 12 slots, the job will not start.
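The slots each TaskManager offers come from a single setting in flink-conf.yaml; the layout above assumes:

```
taskmanager.numberOfTaskSlots: 4
```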
Slot Sharing
By default, Flink uses slot sharing — subtasks from different operators within the same job can share a single slot. This means a source subtask, a map subtask, and a sink subtask can all run in the same slot, as long as they belong to the same slot sharing group. Slot sharing allows a job with parallelism 8 to run in just 8 slots total, rather than 8 slots per operator.
This is efficient because operators in a pipeline rarely all need peak resources at the same time. It also keeps data local: a source subtask and the downstream map subtask in the same slot exchange records without network transfer.
Setting Parallelism
Flink lets you control parallelism at multiple levels. Settings at narrower scopes override broader ones.
Global Default
Set in flink-conf.yaml:
parallelism.default: 4
Every operator in every job uses this unless overridden.
Execution Environment
Set programmatically when defining the job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
This overrides the global default for all operators in this job.
Per-Operator
Override for a specific operator when it has different throughput characteristics:
DataStream<Event> filtered = source
.filter(new HeavyFilter())
.setParallelism(16); // This operator needs more parallelism
DataStream<Result> mapped = filtered
.map(new LightTransform()); // Inherits job-level parallelism (8)
Per-operator parallelism is useful when one stage is a bottleneck. A CPU-intensive transformation might need higher parallelism than a simple pass-through source.
CLI Override
When submitting a job via the command line, you can override the environment-level parallelism:
flink run -p 12 my-job.jar
Operator Chaining
Flink optimizes execution by chaining adjacent operators into a single task when possible. Chained operators run in the same thread and exchange records via method calls rather than serialization and network buffers. This dramatically reduces overhead.
Chaining Requirements
Two operators can be chained when:
- They have the same parallelism
- They are connected by a forward data exchange (no shuffle or rebalance)
- Neither operator has disabled chaining
- They belong to the same slot sharing group
When to Disable Chaining
Sometimes you want to break a chain to isolate an operator for monitoring or resource control:
stream
.map(new CpuIntensiveTransform())
.disableChaining() // Force this into its own task
.setParallelism(16);
Disabling chaining introduces serialization overhead between operators. Only do this when you need isolated metrics for a specific operator or when one operator has fundamentally different resource requirements.
Impact on Performance
In a well-chained pipeline, a source-map-filter-sink chain with parallelism 8 runs as 8 tasks, each executing the full chain. Without chaining, the same pipeline would have 32 tasks (8 per operator) with network exchanges between each stage. Chaining can yield 2-5x throughput improvements for simple transformations.
Key Groups and State Distribution
When you use keyed state (e.g., after a keyBy() operation), Flink must distribute that state across parallel subtasks. It does this through key groups.
How Key Groups Work
Flink hashes each key to a key group — a fixed-size partition of the key space. The number of key groups is determined at job start:
max key groups = max parallelism (default: 128)
Key groups are then evenly distributed across the current parallelism. With max parallelism 128 and current parallelism 4, each subtask owns 32 key groups. When you rescale to parallelism 8, each subtask gets 16 key groups.
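These assignment rules can be sketched in plain Java. This is a simplified model — Flink's actual implementation (in KeyGroupRangeAssignment) applies murmur hashing on top of hashCode(), so real group numbers will differ — but the key-group-to-subtask formula matches the one Flink uses:

```java
// Simplified model of key-group assignment. Assumption: Flink additionally
// murmur-hashes key.hashCode(), so real group numbers differ from this sketch.
public class KeyGroupSketch {

    // Key -> key group in [0, maxParallelism)
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> owning subtask index in [0, parallelism)
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int group = keyGroupFor("user-42", maxParallelism);
        // With parallelism 4 each subtask owns 32 key groups; with 8, it owns 16.
        // A key stays in the same key group across rescaling; only the group's
        // owning subtask changes.
        System.out.println("group " + group
                + " -> subtask " + subtaskFor(group, maxParallelism, 4) + " at p=4"
                + ", subtask " + subtaskFor(group, maxParallelism, 8) + " at p=8");
    }
}
```

Because subtask assignment is a pure function of the key group, Flink can redistribute state at rescale time by moving whole key groups rather than rehashing individual keys.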
Setting Max Parallelism
The maxParallelism setting is immutable once the job starts. It determines how finely state can be redistributed during rescaling. If you set it too low, you cannot scale beyond that limit later without starting fresh.
env.setMaxParallelism(256);
Best practice: set max parallelism to a power of 2, significantly higher than your expected peak parallelism. A value of 128 or 256 works for most jobs. Going extremely high (e.g., 32768) wastes memory on key group metadata.
Choosing the Right Parallelism
There is no universal formula, but several factors guide the decision.
Kafka Source Partitions
For Kafka-sourced jobs, each Flink source subtask reads from one or more Kafka partitions. If your topic has 12 partitions, setting source parallelism above 12 wastes resources because extra subtasks will be idle. Match source parallelism to partition count, or use a factor of it.
Sink Throughput
Your downstream destination has a write throughput ceiling. A data warehouse that can ingest 10,000 rows/second does not benefit from a pipeline that pushes 50,000 rows/second — it just causes backpressure. Size parallelism so that sink throughput roughly matches what the destination can absorb.
Key Cardinality
If you have a keyBy() on a field with only 100 distinct values, parallelism above 100 is wasted for downstream keyed operators. Each key maps to exactly one subtask. With parallelism 200 and 100 keys, half your subtasks process nothing.
State Size
More parallelism means less state per subtask, which means faster checkpoints and smaller recovery times. If individual subtasks hold gigabytes of state and checkpoints are slow, increasing parallelism can help distribute the load.
A Starting Heuristic
parallelism = min(kafka_partitions, available_slots, sink_throughput / per_subtask_throughput)
Start there, then benchmark and adjust.
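The heuristic can be made concrete with a small calculation. The throughput figures below are illustrative assumptions, not measurements:

```java
// Hedged sketch of the starting heuristic above. All numbers are assumptions.
public class ParallelismHeuristic {

    static int suggest(int kafkaPartitions, int availableSlots,
                       double sinkRecordsPerSec, double perSubtaskRecordsPerSec) {
        // Subtasks needed to saturate (but not exceed) the sink's capacity
        int sinkBound = (int) Math.ceil(sinkRecordsPerSec / perSubtaskRecordsPerSec);
        return Math.min(kafkaPartitions, Math.min(availableSlots, sinkBound));
    }

    public static void main(String[] args) {
        // 16 Kafka partitions, 12 free slots, a sink that absorbs 10,000 rec/s,
        // and subtasks that each sustain ~1,500 rec/s -> bounded by the sink at 7
        System.out.println(suggest(16, 12, 10_000, 1_500)); // prints 7
    }
}
```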
Scaling Operations
Flink is a stateful system. You cannot simply change parallelism on a running job — state must be redistributed.
Savepoint-Based Rescaling
The standard approach:
- Trigger a savepoint:
flink savepoint <job-id> s3://savepoints/
- Cancel the job:
flink cancel <job-id>
- Restart with new parallelism:
flink run -s s3://savepoints/savepoint-xxx -p 16 my-job.jar
Flink reads the savepoint, redistributes key groups across the new parallelism, and resumes processing. There is a brief period of downtime during the switch. For CDC pipelines, this is safe because the source (Kafka or database WAL) retains data during the outage, and the pipeline catches up after restart.
Platforms like Streamkap automate this entire workflow — monitoring throughput, triggering savepoints, adjusting parallelism, and restoring the job without manual intervention.
Reactive Mode
Flink’s reactive mode adjusts job parallelism automatically based on available TaskManager slots. When you add TaskManagers, the job scales up. When TaskManagers are removed, it scales down.
scheduler-mode: reactive
Reactive mode works well with Kubernetes autoscalers: the HPA adds pods (TaskManagers), and Flink automatically adjusts parallelism to fill new slots. The limitation is that all operators share the same parallelism — you cannot set per-operator parallelism in reactive mode.
Adaptive Scheduler
The adaptive scheduler (introduced in Flink 1.13) is the machinery behind reactive mode and can also be used on its own. It can start a job even if not all requested slots are available, running at reduced parallelism and scaling up as slots become available. This is useful for jobs that need to start processing immediately, even at reduced throughput.
jobmanager.scheduler: adaptive
Data Skew
Even with correct parallelism, uneven data distribution can create hotspots where one subtask processes far more data than others.
Detecting Hotkeys
Monitor per-subtask metrics. If one subtask processes 10x the records of its peers, you have a skew problem. Common culprits include:
- A keyBy(userId) where one user generates 90% of events
- A keyBy(country) where one country dominates traffic
- NULL or default values concentrating on a single key
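The "10x its peers" rule of thumb can be checked programmatically against per-subtask counts pulled from your metrics system. This is a Flink-independent sketch, and the ratio threshold is an assumption to tune:

```java
// Spot skew from per-subtask record counts (e.g. scraped from
// numRecordsOutPerSecond). Flags any subtask processing far more than the
// mean; the threshold (e.g. 3x or 10x) is an assumption, not a Flink default.
public class SkewCheck {

    static boolean isSkewed(long[] perSubtaskRecords, double ratioThreshold) {
        long max = 0, sum = 0;
        for (long r : perSubtaskRecords) {
            max = Math.max(max, r);
            sum += r;
        }
        double mean = (double) sum / perSubtaskRecords.length;
        return mean > 0 && max / mean >= ratioThreshold;
    }

    public static void main(String[] args) {
        // One hot subtask among four -> skewed
        System.out.println(isSkewed(new long[]{900, 950, 9_800, 1_000}, 3.0)); // true
        // Evenly loaded subtasks -> not skewed
        System.out.println(isSkewed(new long[]{950, 1_000, 980, 1_020}, 3.0)); // false
    }
}
```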
Handling Skewed Partitions
Salting is the standard remedy. Append a random suffix to the key to distribute records across subtasks, then aggregate results in a second pass:
// Step 1: Salt the key to distribute load
DataStream<Tuple2<String, Long>> salted = events
.map(e -> Tuple2.of(e.getKey() + "-" + Math.floorMod(e.hashCode(), 10), e.getValue())) // floorMod avoids negative salts
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.sum(1);
// Step 2: Remove salt and aggregate
DataStream<Tuple2<String, Long>> result = salted
.map(t -> Tuple2.of(t.f0.substring(0, t.f0.lastIndexOf("-")), t.f1))
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.sum(1);
This trades a second shuffle for even distribution. The overhead is usually worth it when skew is severe.
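The salt/unsalt steps can be isolated as two plain-Java helpers (hypothetical names, independent of Flink). Math.floorMod keeps the salt non-negative even for negative hash codes, and stripping with lastIndexOf means original keys that themselves contain "-" still round-trip correctly:

```java
// Helpers matching the salting pattern above. Pure-Java sketch; names and the
// fanout of 10 are illustrative assumptions.
public class Salting {

    // Append a bounded salt derived from some per-record value (hash, random, ...)
    static String salt(String key, int fanout, int hash) {
        return key + "-" + Math.floorMod(hash, fanout); // floorMod avoids negative salts
    }

    // Strip the salt after the first aggregation pass. lastIndexOf ensures keys
    // that already contain '-' are recovered intact.
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('-'));
    }

    public static void main(String[] args) {
        String salted = salt("user-42", 10, -7);
        System.out.println(salted);         // user-42-3
        System.out.println(unsalt(salted)); // user-42
    }
}
```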
Other Approaches
- Pre-aggregate before keyBy: Use a non-keyed window or process function to batch records before the keyed stage, reducing the per-key volume.
- Filter hotkeys: Route known hotkeys to a separate processing path with dedicated resources.
- Increase partitions upstream: If the skew originates in Kafka, repartition the topic with more partitions and a better partitioning strategy.
Practical Example: Scaling a CDC Pipeline from 4 to 16
Consider a change data capture pipeline that reads from a PostgreSQL source via Kafka (16 partitions), applies transformations, and writes to a data warehouse.
Initial setup (parallelism = 4):
Source (4 subtasks, each reads 4 Kafka partitions)
-> Map/Transform (4 subtasks, chained with source)
-> Sink (4 subtasks)
At 4 subtasks, each source instance reads 4 partitions. Throughput: 2,000 records/second total. As the source table grows, volume increases to 12,000 records/second and the pipeline falls behind.
Scaling to 16:
# 1. Take savepoint
flink savepoint abc123 s3://savepoints/cdc-pipeline/
# 2. Stop the job
flink cancel abc123
# 3. Restart with new parallelism
flink run -s s3://savepoints/cdc-pipeline/savepoint-xxx \
-p 16 \
cdc-pipeline.jar
After scaling (parallelism = 16):
Source (16 subtasks, each reads 1 Kafka partition)
-> Map/Transform (16 subtasks, chained with source)
-> Sink (16 subtasks)
Each source subtask now reads exactly 1 partition. Throughput increases to ~12,000 records/second. Keyed state is redistributed from 4 subtasks to 16 — Flink reassigns key groups automatically from the savepoint.
Things to verify after rescaling:
- Checkpoint duration should decrease (less state per subtask)
- No subtask should show persistent backpressure
- Sink throughput should match the destination’s capacity
- Consumer lag on the Kafka source topic should be trending toward zero
Monitoring
Effective parallelism tuning requires visibility into per-subtask behavior. Key metrics to watch:
Per-Subtask Throughput
The numRecordsOutPerSecond metric on each subtask reveals whether work is evenly distributed. Large variance across subtasks of the same operator indicates data skew or partition imbalance.
Busy Time
The busyTimeMsPerSecond metric shows how much of each second a subtask spends doing actual work versus waiting. A subtask that is busy 950ms out of 1000ms is at capacity. If all subtasks are near 100% busy, you need more parallelism.
Backpressure per Operator
Flink’s backpressure monitor shows which operators are slow. Backpressure propagates upstream: if the sink is slow, the map operator before it shows high backpressure. Look at the first operator in the chain that shows backpressure — that is your bottleneck.
Source (OK) -> Map (OK) -> KeyedProcess (HIGH) -> Sink (OK)
^^^^^^^^^^^^^^^^^
Bottleneck: increase parallelism here
or optimize the keyed processing logic
Checkpoint Duration and Size
Monitor checkpoint duration per subtask. If one subtask takes 30 seconds while others take 2 seconds, that subtask likely holds disproportionate state (a sign of skew) or has slow state backend I/O.
Streamkap surfaces these metrics through built-in monitoring dashboards, making it straightforward to identify when a pipeline needs rescaling and to verify that scaling operations had the intended effect.
Practical Metric Queries
If you use Prometheus with Flink’s metric reporter, useful queries include:
# Per-subtask throughput variance
stddev(flink_taskmanager_job_task_numRecordsOutPerSecond) by (task_name)
# Subtasks at capacity (busy > 900ms per second)
flink_taskmanager_job_task_busyTimeMsPerSecond > 900
# Checkpoint alignment time by subtask (a proxy for checkpoint pressure)
flink_taskmanager_job_task_checkpointAlignmentTime
Summary
Parallelism is the fundamental scaling mechanism in Flink. Getting it right requires understanding the relationship between task slots, operator chaining, key groups, and your specific data characteristics. Start with your Kafka partition count, factor in sink capacity and key cardinality, then monitor per-subtask metrics to refine. When you need to scale, use savepoints to preserve state and redistribute cleanly. Watch for data skew as a hidden limiter, and address it with salting or pre-aggregation when it appears.