
Engineering

February 25, 2026

10 min read

Upgrading Flink Jobs Without Downtime: Schema Evolution and State Compatibility

Learn how to upgrade Flink jobs without data loss - savepoint-based upgrades, state compatibility rules, schema evolution, and blue-green deployment patterns.

TL;DR:

  • Flink job upgrades require stopping the job with a savepoint, deploying the new version, and restoring from the savepoint - the key challenge is ensuring state compatibility between the old and new job versions.
  • State compatibility rules require matching operator UIDs, compatible state serializers, and careful handling of operator graph changes (adding/removing operators).
  • Blue-green deployment with parallel jobs provides the safest upgrade path for critical pipelines, at the cost of temporarily running double resources.

Upgrading a stateless web service is straightforward: deploy the new container, route traffic, done. Upgrading a stateful stream processing job in Apache Flink is a fundamentally different challenge. Your Flink job carries gigabytes (sometimes terabytes) of accumulated state - windowed aggregations, join buffers, deduplication sets, source offsets - and all of it must survive the upgrade intact. Lose that state and you lose hours or days of computed results. Corrupt it and you produce silently wrong output.

This guide walks through the practical mechanics of upgrading Flink jobs without downtime or data loss: savepoint-based upgrades, state compatibility rules, schema evolution, blue-green deployments, and rollback strategies.

Savepoint-Based Upgrade: The Standard Path

The canonical Flink upgrade follows a stop-and-restart pattern anchored on savepoints. A savepoint is a consistent, self-contained snapshot of your job’s complete state - every operator’s keyed and operator state, plus the exact position in every source partition.

Step-by-Step Process

1. Trigger a savepoint on the running job:

flink savepoint <job-id> s3://my-bucket/savepoints/

This writes the full state snapshot to durable storage. The job continues running during the savepoint.

2. Stop the job with a savepoint (atomic stop-and-snapshot):

flink stop --savepointPath s3://my-bucket/savepoints/ <job-id>

The stop command triggers a savepoint and then gracefully shuts down the job after the savepoint completes. This is preferred over cancel -s because it drains in-flight records first, ensuring exactly-once semantics are preserved.

3. Deploy the new job version:

Update your JAR, container image, or job configuration. This is the window where the job is not processing data - your “downtime” window.

4. Restore from the savepoint:

flink run -s s3://my-bucket/savepoints/savepoint-abc123 \
  -d my-new-job.jar

Flink maps each operator in the new job graph to its corresponding state in the savepoint using operator UIDs. The job resumes processing from exactly where it left off.

The entire process typically takes seconds to minutes depending on state size and cluster resources. During the downtime window, source systems (Kafka, database WAL) retain their data, so no events are lost - they are simply replayed when the job restarts.

State Compatibility Rules

The critical question in any Flink upgrade is: will the new job version accept the old state? Three rules govern compatibility.

Rule 1: Operator UIDs Must Match

Flink identifies operators across job versions by their UID. When restoring from a savepoint, Flink looks up each operator’s UID in the savepoint metadata and maps it to the corresponding operator in the new job graph. If an operator has a different UID (or no UID at all), Flink cannot find its state, and by default the restore fails.

Rule 2: State Serializers Must Be Compatible

Each piece of operator state is stored using a specific serializer. The new job’s serializer for that state must be able to read data written by the old serializer. Flink’s built-in serializers cover many schema evolution cases automatically - POJO and Avro types in particular evolve without extra work. Custom serializers need explicit migration logic.
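The core idea behind “compatible” can be shown in plain Java with a version-tagged binary format: the new reader still decodes records written by the old writer, substituting defaults for fields the old format never had. This is a hypothetical sketch of the principle only - Flink formalizes the contract through its TypeSerializerSnapshot mechanism, which this example does not use.

```java
import java.io.*;

// Version-tolerant serialization sketch: the v2 reader accepts v1 records.
public class VersionedFormat {

    // v1 format: [version=1][userId][email]
    public static byte[] writeV1(String userId, String email) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(1);
            out.writeUTF(userId);
            out.writeUTF(email);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // v2 reader: checks the version tag and substitutes a default for the
    // region field that the v1 writer never produced.
    public static String[] readV2(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int version = in.readInt();
            String userId = in.readUTF();
            String email = in.readUTF();
            String region = version >= 2 ? in.readUTF() : "unknown";
            return new String[] { userId, email, region };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A serializer whose new version cannot perform this kind of backward-compatible read is exactly what makes a savepoint restore fail.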

Rule 3: State Type Must Not Change Fundamentally

You cannot change a ValueState<String> to a ListState<String> for the same state descriptor. The state access pattern is baked into the serialized format. If you need a different state type, you must use a new state name and handle migration in application logic.
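The “new state name plus application-level migration” pattern can be sketched in plain Java. The maps below stand in for Flink’s keyed state backends, and all names are illustrative - in a real job the old ValueState and new ListState would be two separate state descriptors read inside a process function.

```java
import java.util.*;

// Simulated lazy migration from a ValueState-style slot to a ListState-style
// slot under a NEW state name. Maps model the state backend; not Flink API.
public class StateMigration {
    // Old state: ValueState<String> under descriptor "recent-event"
    public static final Map<String, String> oldValueState = new HashMap<>();
    // New state: ListState<String> under a new descriptor "recent-events-v2"
    public static final Map<String, List<String>> newListState = new HashMap<>();

    // Migrate one key the first time it is touched after the upgrade.
    public static List<String> getOrMigrate(String key) {
        List<String> list = newListState.get(key);
        if (list == null) {
            list = new ArrayList<>();
            String old = oldValueState.remove(key); // drain the old slot
            if (old != null) {
                list.add(old);
            }
            newListState.put(key, list);
        }
        return list;
    }
}
```

Because migration happens per key on first access, it amortizes across normal processing instead of requiring a bulk state rewrite.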

Operator UID Best Practices

Operator UIDs are the single most important detail for upgrade safety, yet they are easy to overlook because Flink generates automatic UIDs based on graph topology - and those change whenever you modify the job graph.

Always Assign Explicit UIDs

DataStream<Event> enriched = source
    .keyBy(Event::getUserId)
    .process(new EnrichmentFunction())
    .uid("user-enrichment-v1")    // Explicit UID
    .name("User Enrichment");

Without the .uid() call, Flink assigns an auto-generated UID that changes if you add, remove, or reorder operators upstream. One missing UID can silently break savepoint restoration for an entire operator chain.

Naming Conventions

Adopt a consistent naming scheme for UIDs across your team:

<domain>-<operation>-<version>

Examples: orders-dedup-v1, payments-window-aggregate-v2, cdc-source-postgres-v1. The version suffix gives you an escape hatch - if you need to fundamentally change an operator’s state, increment the version and accept that old state for that operator will be dropped.

UID Registry

For large teams, maintain a simple registry (a YAML file in your repo, a wiki page, or a shared document) that tracks which UIDs are in use. This prevents accidental collisions when multiple engineers work on the same job.
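A registry file can be as small as this hypothetical YAML layout - the file name, fields, and entries are illustrative, not a required schema:

```yaml
# uid-registry.yaml -- one entry per operator UID in use
orders-pipeline:
  cdc-source-postgres-v1:
    owner: data-platform
    added: 2025-06-01
  orders-dedup-v1:
    owner: data-platform
    added: 2025-06-01
  payments-window-aggregate-v2:
    owner: payments-team
    added: 2025-11-14
    replaces: payments-window-aggregate-v1   # old state dropped at upgrade
```

Reviewing changes to this file in pull requests is a cheap way to catch UID collisions and accidental renames before they reach production.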

Schema Evolution in State

Real-world upgrades frequently involve changing the structure of your state classes - adding a field to a POJO, removing a deprecated column, changing a type.

Adding Fields

Adding a new field to a POJO state class is the safest schema change. Flink’s POJO serializer initializes the new field to its default value when restoring old state:

// Before
public class UserProfile {
    public String userId;
    public String email;
}

// After - adding a field
public class UserProfile {
    public String userId;
    public String email;
    public String region;  // New field, defaults to null on restore
}

Removing Fields

Removing a field from a POJO is also supported. Flink skips the serialized data for the removed field during deserialization. The data is effectively dropped:

// Before
public class UserProfile {
    public String userId;
    public String email;
    public String legacyId;  // Being removed
}

// After - field removed
public class UserProfile {
    public String userId;
    public String email;
}

Type Changes

Changing a field’s type (e.g., int to long, String to Enum) is not supported by Flink’s built-in schema evolution. This requires a custom serializer migration or a fresh state start for that operator. If you must change a type, the safest approach is to add the new field alongside the old one, populate it in your processing logic, and remove the old field in a subsequent release.
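The two-release approach looks like this in plain Java, with hypothetical field names. In release N the widened field is added alongside the old one and backfilled by the processing logic; release N+1 then removes the old field (a supported field removal):

```java
// Release N: both fields coexist. Old records restored from the savepoint
// have itemCountV2 == 0, so the processing logic copies the legacy value
// across once. Release N+1 deletes the int field.
public class OrderState {
    public int itemCount;     // old field (int) -- removed in release N+1
    public long itemCountV2;  // new field (long) that replaces it

    // Called when state is first read after the release-N upgrade.
    public void backfill() {
        if (itemCountV2 == 0L && itemCount != 0) {
            itemCountV2 = itemCount; // widening copy, no data loss
        }
    }
}
```

Both steps stay within Flink’s supported POJO evolution rules (add a field, then remove a field), which is why the two-release detour is safer than an in-place type change.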

Adding and Removing Operators

Adding New Operators

Adding a new operator to the job graph is always safe. The new operator has no prior state in the savepoint, so it starts fresh. This is the most common upgrade pattern - introducing new processing logic:

DataStream<Order> validated = orders
    .keyBy(Order::getId)
    .process(new ValidationFunction())
    .uid("order-validation-v1");

// New operator added in this release
DataStream<Order> enriched = validated
    .keyBy(Order::getId)
    .process(new FraudScoreFunction())  // Brand new
    .uid("fraud-scoring-v1");            // No prior state

Removing Operators

Removing an operator means its state in the savepoint has no corresponding operator in the new job. By default, Flink fails the restore to protect against accidental state loss. If the removal is intentional, use the --allowNonRestoredState flag:

flink run -s s3://my-bucket/savepoints/savepoint-abc123 \
  --allowNonRestoredState \
  -d my-new-job.jar

Use this flag with care. It silently drops state for any operator that cannot be matched, which could mask a legitimate UID mismatch bug.

Blue-Green Deployment

For pipelines where even seconds of downtime are unacceptable, blue-green deployment eliminates the stop-start gap entirely.

How It Works

  1. Launch the new job (green) alongside the old job (blue). Both consume from the same source topics or WAL streams.
  2. Verify the green job by comparing its output against the blue job’s output or against known-good test cases.
  3. Cut over by stopping the blue job and directing downstream consumers to the green job’s output.
  4. Clean up by deleting the blue job’s resources.

Sink Idempotency Is Required

During the overlap period, both jobs write to sinks. If your sink is not idempotent, you get duplicate data. Sinks that support upsert semantics (keyed inserts that overwrite on conflict) handle this naturally:

  • Idempotent sinks: Database upserts, keyed Kafka topics (log compaction), object storage with deterministic paths
  • Non-idempotent sinks: Append-only Kafka topics, insert-only databases, email/notification systems

For non-idempotent sinks, write the green job’s output to a separate staging topic or table during verification, then switch downstream consumers atomically.
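Why upsert semantics make the overlap safe can be shown with a toy model - a map standing in for a keyed upsert sink. This is an illustration of the semantics, not a real sink implementation:

```java
import java.util.*;

// Toy model of the blue-green overlap window: both jobs "write" the same
// keyed records to an upsert sink, modeled here as a map. Keyed
// overwrite-on-conflict semantics collapse the duplicates to one row per key.
public class UpsertSinkDemo {
    static final Map<String, String> sink = new LinkedHashMap<>();

    public static void upsert(String key, String value) {
        sink.put(key, value); // overwrite on conflict, like a keyed DB upsert
    }
}
```

As long as both job versions produce the same value for the same key, the sink ends up in the same state regardless of which job wrote last - which is precisely the property a non-idempotent append-only sink lacks.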

Trade-offs

Blue-green requires double the compute and storage resources during the overlap window. For large jobs, this can be significant. The benefit is zero-downtime and the ability to verify correctness before committing to the new version.

Canary Deployments

Canary deployments route a percentage of traffic to the new version while the majority continues through the old version. In Flink, this typically means splitting your source:

// Canary: route 5% of keys to new processing path
DataStream<Event> canary = source
    .keyBy(Event::getKey)
    .process(new CanaryRouter(0.05))  // 5% to new path
    .uid("canary-router-v1");
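The routing decision inside such a router might look like the following plain-Java sketch (the Flink ProcessFunction wrapper is omitted, and the class shape is an assumption - the key property is that routing is deterministic per key, so a given user consistently sees one code path):

```java
// Hypothetical sketch of a canary routing decision: hash the key into a
// fixed number of buckets so the same key always takes the same path, and
// send roughly the configured fraction of keys down the canary branch.
public class CanaryRouter {
    private static final int BUCKETS = 10_000;
    private final double fraction;

    public CanaryRouter(double fraction) {
        this.fraction = fraction;
    }

    // True => route this key to the new (canary) processing path.
    public boolean isCanary(String key) {
        // Math.floorMod keeps the bucket non-negative for negative hash codes.
        int bucket = Math.floorMod(key.hashCode(), BUCKETS);
        return bucket < fraction * BUCKETS;
    }
}
```

Hash-based routing beats random sampling here: per-key determinism means a key's state stays on one path, and the canary population is stable across restarts.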

Alternatively, use Kafka consumer group splitting - assign a subset of partitions to the new job and the remainder to the old job. Monitor error rates, latency, and output correctness on the canary partition set before rolling out fully.

Canary deployments add operational complexity (two active job versions, split output, monitoring for both) but provide the highest confidence for risky upgrades - major logic changes, serializer migrations, or Flink version bumps.

Practical Example: Upgrading a CDC Pipeline

Consider a CDC pipeline that captures changes from PostgreSQL, enriches them with a lookup table, and writes to Snowflake. You need to add a new transformation that masks PII fields before the Snowflake sink.

Current job graph:

PostgreSQL CDC Source → Enrichment → Snowflake Sink
       (uid: cdc-pg-v1)   (uid: enrich-v1)  (uid: sink-sf-v1)

New job graph:

PostgreSQL CDC Source → Enrichment → PII Masking → Snowflake Sink
       (uid: cdc-pg-v1)   (uid: enrich-v1)  (uid: pii-mask-v1)  (uid: sink-sf-v1)

The upgrade is safe because: all existing operator UIDs are preserved, the new PII masking operator has no prior state, and no serializers changed. The standard savepoint-based upgrade works here.

Platforms like Streamkap handle this pattern automatically - managing savepoints, deploying new pipeline versions, and restoring state without manual CLI commands. The platform validates state compatibility before deployment, catching UID mismatches or serializer conflicts before they cause a failed restore.

# Manual approach
flink stop --savepointPath s3://savepoints/ $JOB_ID
# Deploy new JAR
flink run -s s3://savepoints/savepoint-xyz -d pipeline-v2.jar

# With Streamkap: handled automatically via the platform UI

Rollback Strategy

Every upgrade should have a rollback plan. The savepoint you took before the upgrade is your safety net.

Restoring from the Original Savepoint

If the new version produces incorrect output or fails to start:

# Rollback: redeploy the OLD version from the ORIGINAL savepoint
flink run -s s3://my-bucket/savepoints/savepoint-abc123 \
  -d my-old-job.jar

This restores the old job to exactly where it was before the upgrade. Any data that arrived during the failed upgrade attempt is replayed from the source.

Data Replay Considerations

After a rollback, the job replays all events from the savepoint’s source offsets forward. Depending on how long the upgrade attempt lasted, this could be seconds or hours of data. Your sinks must handle this replay gracefully:

  • Idempotent sinks (upserts, keyed writes) handle replay automatically - duplicate writes overwrite with the same values.
  • Append-only sinks will produce duplicate records. You may need to truncate the sink’s output back to the savepoint timestamp and let the replayed data fill it back in.
  • External side effects (API calls, notifications) cannot be un-sent. Guard these with deduplication logic or at-least-once acknowledgment patterns.
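A minimal dedup guard for such side effects can be sketched as follows - each action carries a stable ID, and the guard fires the side effect only the first time that ID is seen. In a real job the seen-set would live in durable keyed state (with a TTL), not an in-memory set, and the send would be a real API call:

```java
import java.util.*;

// Sketch of a dedup guard for external side effects during replay.
public class SideEffectGuard {
    private final Set<String> seen = new HashSet<>();
    private int fired = 0;

    public void sendOnce(String notificationId) {
        if (seen.add(notificationId)) { // add() returns false for duplicates
            fired++;                    // stand-in for the real API call
        }
    }

    public int firedCount() { return fired; }
}
```

The stable ID is the important design choice: it must be derived from the event itself (say, an order ID plus a state transition), so the replayed event produces the same ID and gets suppressed.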

Savepoint Retention

Keep savepoints for at least 24-48 hours after a successful upgrade. Do not delete the pre-upgrade savepoint immediately - subtle correctness issues may only surface after the new version has processed a full cycle of real data. A disciplined retention policy, combined with automated state compatibility checks like those provided by Streamkap, gives you a reliable safety net for any upgrade scenario.

Key Takeaways

Flink job upgrades are not inherently dangerous - they are dangerous when approached without a plan. Assign explicit operator UIDs from day one. Validate state compatibility before every deployment. Use blue-green or canary patterns for critical pipelines. Always keep your pre-upgrade savepoint until you have confirmed the new version is correct in production. With these practices in place, upgrading stateful Flink jobs becomes a routine, low-risk operation rather than a high-stakes event.