
Engineering

February 25, 2026

14 min read

Your First Flink Job: A Beginner's Tutorial

A hands-on tutorial for writing your first Apache Flink job. Covers local environment setup, Flink SQL basics, connecting to Kafka, and monitoring your running job.

TL;DR:

  • You can run a fully functional Flink cluster on your laptop using Docker Compose in under five minutes.
  • Flink SQL lets you write streaming jobs without any Java or Scala code, using familiar SELECT, WHERE, and GROUP BY syntax.
  • Connecting Flink to Kafka requires a connector JAR and a CREATE TABLE statement that maps Kafka topics to Flink tables.
  • The Flink web UI gives you real-time visibility into job status, throughput, checkpointing, and backpressure.

Apache Flink is one of those technologies that sounds intimidating until you actually sit down and use it. The documentation is thorough but dense. The architecture diagrams show distributed systems with JobManagers, TaskManagers, and state backends. It is easy to assume you need a cluster of machines and months of experience to get started.

You do not. In this tutorial, you will go from zero to a running Flink job in about 30 minutes. We will set up a local environment with Docker, write a streaming SQL job, connect it to Kafka, and monitor it through Flink’s built-in web UI.

Prerequisites

You will need the following installed on your machine:

  • Docker and Docker Compose (Docker Desktop includes both)
  • A terminal (any shell will do)
  • A text editor (VS Code, Vim, whatever you prefer)
  • About 4 GB of free RAM (Flink and Kafka together need some breathing room)

No Java installation is required. We will use Flink SQL, which runs inside the Flink containers.

Setting Up Your Local Environment

We need three things running locally: a Flink cluster, a Kafka broker, and something to generate test data. Docker Compose makes this straightforward.

The Docker Compose File

Create a directory for this tutorial and add a docker-compose.yml file:

mkdir flink-tutorial && cd flink-tutorial
# docker-compose.yml
version: '3.8'

services:
  jobmanager:
    image: flink:1.19-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        state.backend: hashmap
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        execution.checkpointing.interval: 10s
    volumes:
      - ./sql:/opt/flink/sql
      - ./lib:/opt/flink/lib/custom

  taskmanager:
    image: flink:1.19-java11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
        state.backend: hashmap
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        execution.checkpointing.interval: 10s
    volumes:
      - ./lib:/opt/flink/lib/custom

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

This gives you:

  • JobManager: The Flink coordinator process, exposed on port 8081 for the web UI.
  • TaskManager: The Flink worker process with 4 task slots (enough for our tutorial).
  • Kafka: A single-node Kafka broker in KRaft mode (no ZooKeeper needed).

Download the Kafka Connector

Flink needs a connector JAR to talk to Kafka. Download it into a lib directory:

mkdir lib
curl -L -o lib/flink-sql-connector-kafka-3.1.0-1.19.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.19/flink-sql-connector-kafka-3.1.0-1.19.jar

Start Everything Up

docker compose up -d

Wait about 30 seconds for everything to initialize. You can check the status:

docker compose ps

All three services should show as “running.” Open your browser and go to http://localhost:8081. You should see the Flink web UI with one TaskManager and 4 available task slots.

Generating Test Data

Before we write a Flink job, we need data flowing into Kafka. Let’s create a simple topic and produce some JSON messages.

Create a Kafka Topic

docker compose exec kafka kafka-topics --create \
  --topic page-views \
  --bootstrap-server kafka:29092 \
  --partitions 4 \
  --replication-factor 1

Produce Test Messages

We will use the Kafka console producer to send JSON messages. In a real system, these would come from your application, a CDC pipeline, or another streaming source.

docker compose exec -T kafka kafka-console-producer \
  --bootstrap-server kafka:29092 \
  --topic page-views << 'EOF'
{"user_id": "user_001", "page": "/home", "action": "view", "ts": "2026-02-25T10:00:01Z"}
{"user_id": "user_002", "page": "/pricing", "action": "view", "ts": "2026-02-25T10:00:02Z"}
{"user_id": "user_001", "page": "/docs", "action": "view", "ts": "2026-02-25T10:00:05Z"}
{"user_id": "user_003", "page": "/home", "action": "view", "ts": "2026-02-25T10:00:07Z"}
{"user_id": "user_002", "page": "/pricing", "action": "click", "ts": "2026-02-25T10:00:10Z"}
{"user_id": "user_001", "page": "/signup", "action": "click", "ts": "2026-02-25T10:00:12Z"}
{"user_id": "user_004", "page": "/home", "action": "view", "ts": "2026-02-25T10:00:15Z"}
{"user_id": "user_003", "page": "/docs", "action": "view", "ts": "2026-02-25T10:00:18Z"}
{"user_id": "user_002", "page": "/signup", "action": "click", "ts": "2026-02-25T10:00:20Z"}
{"user_id": "user_005", "page": "/home", "action": "view", "ts": "2026-02-25T10:00:22Z"}
EOF

We now have 10 page view events in Kafka. Let’s process them with Flink.

Writing Your First Flink SQL Job

Flink SQL lets you write streaming jobs using familiar SQL syntax. Instead of writing Java code, you define sources and sinks as tables and write queries against them. Launch the SQL client inside the JobManager container:

docker compose exec jobmanager ./bin/sql-client.sh

You should see the Flink SQL CLI prompt. This is an interactive SQL shell connected to your Flink cluster.

Define the Source Table

First, tell Flink how to read from the Kafka topic:

CREATE TABLE page_views (
    user_id STRING,
    page STRING,
    action STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'page-views',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id' = 'flink-tutorial',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

Let’s break this down:

  • Column definitions (user_id STRING, etc.): These map to the fields in your JSON messages. Flink will automatically parse the JSON and extract these fields.
  • WATERMARK FOR ts AS ts - INTERVAL '5' SECOND: This tells Flink to use the ts field as the event time and to allow events that arrive up to 5 seconds late. Watermarks are how Flink tracks the progress of time in a stream.
  • WITH clause: Configuration for the Kafka connector. We specify the topic, broker address, consumer group, where to start reading, and the data format.
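To build intuition for the watermark clause, here is a simplified Python sketch of bounded-out-of-orderness watermarking. This is an illustration of the concept only, not Flink's actual implementation: the watermark trails the highest event time seen so far by the allowed lateness, and events that arrive behind the watermark are treated as late.

```python
from datetime import datetime, timedelta

ALLOWED_LATENESS = timedelta(seconds=5)  # mirrors INTERVAL '5' SECOND

def classify_events(event_times):
    """Return (timestamp, on_time) pairs; events behind the watermark are late."""
    watermark = datetime.min
    out = []
    for ts in event_times:
        out.append((ts, ts >= watermark))
        # Advance the watermark: max event time seen, minus allowed lateness
        watermark = max(watermark, ts - ALLOWED_LATENESS)
    return out

events = [
    datetime(2026, 2, 25, 10, 0, 10),
    datetime(2026, 2, 25, 10, 0, 1),   # 9s behind the max seen -> late
    datetime(2026, 2, 25, 10, 0, 20),
    datetime(2026, 2, 25, 10, 0, 3),   # 17s behind the max seen -> late
]
for ts, on_time in classify_events(events):
    print(ts.time(), "on time" if on_time else "late")
```

The key takeaway: a 5-second watermark delay does not slow the stream down; it only defines how far behind the newest event a straggler may arrive and still be counted.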

Run a Simple Query

Let’s start with the simplest possible query to verify everything works:

SELECT * FROM page_views;

You should see the 10 messages we produced earlier displayed in the terminal. Press Q to stop the query.

This is already a running Flink job. Behind the scenes, Flink is reading from Kafka, deserializing JSON, applying the schema, and streaming results to your terminal. The difference from a regular SQL query is that this one is continuous. If you produce more messages to the Kafka topic while the query is running, they will appear in the output immediately.

Windowed Aggregation

Now let’s do something more interesting. Count page views per page in 1-minute tumbling windows:

SELECT
    page,
    TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
    COUNT(*) AS view_count
FROM page_views
GROUP BY
    page,
    TUMBLE(ts, INTERVAL '1' MINUTE);

This query groups events into 1-minute windows based on the ts field and counts how many views each page received in each window. Because all our test data falls within the same minute, you will see one row per page.
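The window assignment itself is simple arithmetic: floor each event time to the window size. A quick Python sketch (an illustration only, not Flink code) reproduces the per-page counts for the ten test messages:

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW_SECONDS = 60  # matches TUMBLE(ts, INTERVAL '1' MINUTE)

def tumble_start(ts):
    # A tumbling window assigner floors the event time to the window size
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % WINDOW_SECONDS, tz=timezone.utc)

# (page, seconds past 10:00:00) for the ten test messages produced earlier
base = datetime(2026, 2, 25, 10, 0, 0, tzinfo=timezone.utc)
events = [
    ("/home", 1), ("/pricing", 2), ("/docs", 5), ("/home", 7), ("/pricing", 10),
    ("/signup", 12), ("/home", 15), ("/docs", 18), ("/signup", 20), ("/home", 22),
]
counts = Counter(
    (page, tumble_start(base + timedelta(seconds=s))) for page, s in events
)
for (page, start), n in sorted(counts.items()):
    print(page, start.isoformat(), n)
```

With all ten events inside the same minute, every page gets exactly one window starting at 10:00:00, which is why the query emits a single row per page.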

The output should look something like:

    page     window_start            window_end              view_count
    /home    2026-02-25 10:00:00.000 2026-02-25 10:01:00.000          4
    /pricing 2026-02-25 10:00:00.000 2026-02-25 10:01:00.000          2
    /docs    2026-02-25 10:00:00.000 2026-02-25 10:01:00.000          2
    /signup  2026-02-25 10:00:00.000 2026-02-25 10:01:00.000          2

Filtering Streams

You can filter a stream just like a regular table:

SELECT user_id, page, ts
FROM page_views
WHERE action = 'click';

This returns only click events. In a real application, you might use this to build a real-time click tracking pipeline.

Writing Results to a Kafka Sink

So far, we have been viewing results in the SQL client. In production, you want Flink to write results to another system: a Kafka topic, a database, or a data warehouse.

Define the Sink Table

Let’s create an output Kafka topic and a corresponding Flink table:

# In a separate terminal
docker compose exec kafka kafka-topics --create \
  --topic page-view-counts \
  --bootstrap-server kafka:29092 \
  --partitions 4 \
  --replication-factor 1

Back in the Flink SQL client:

CREATE TABLE page_view_counts (
    page STRING,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    view_count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'page-view-counts',
    'properties.bootstrap.servers' = 'kafka:29092',
    'format' = 'json'
);

Submit a Continuous Job

Now, insert the results of our windowed aggregation into the sink table:

INSERT INTO page_view_counts
SELECT
    page,
    TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
    COUNT(*) AS view_count
FROM page_views
GROUP BY
    page,
    TUMBLE(ts, INTERVAL '1' MINUTE);

This submits a long-running Flink job. Unlike the SELECT queries we ran earlier, this one does not display results in the terminal. Instead, it runs in the background, continuously reading from the page-views topic and writing aggregated results to the page-view-counts topic.

Verify the Output

Read from the output topic to see the results:

docker compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic page-view-counts \
  --from-beginning

You should see JSON messages with page view counts per window.

Monitoring Your Running Job

Open the Flink web UI at http://localhost:8081. Click on “Running Jobs” and you should see your INSERT job listed.

The Job Graph

Click on the job to see the job graph. This is a visual representation of your Flink pipeline:

Source: page_views  ──▶  GroupAggregate  ──▶  Sink: page_view_counts

Each box in the graph is an operator. Flink chains operators together when possible for efficiency. You can click on individual operators to see their metrics.

Key Metrics to Watch

For each operator, the web UI shows:

  • Records Received / Sent: How many records have flowed through this operator. This tells you whether data is actually moving.
  • Records Per Second: The current throughput. For our small test, this will be low. In production, you would see thousands or millions per second.
  • Busy Time: What percentage of time the operator is actively processing records vs. waiting for input. High busy time (above 90%) on a downstream operator can indicate backpressure.
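Everything the web UI shows is also exposed through the JobManager's REST API on the same port, which is handy for scripting health checks. Here is a small Python sketch, assuming the tutorial cluster is up at http://localhost:8081; the parsing helper is separated from the HTTP call so it can also be exercised against a canned response.

```python
import json
import urllib.request

FLINK_REST = "http://localhost:8081"  # JobManager REST API, same port as the web UI

def summarize(overview):
    """Turn a /jobs/overview response into one 'name: state' line per job."""
    return [f"{job['name']}: {job['state']}" for job in overview.get("jobs", [])]

def fetch_overview(base_url=FLINK_REST):
    # GET /jobs/overview returns a JSON document with a "jobs" array
    with urllib.request.urlopen(f"{base_url}/jobs/overview") as resp:
        return json.load(resp)

# With the cluster running, this prints each job and its state:
# print("\n".join(summarize(fetch_overview())))
```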

Checkpointing

Click the “Checkpoints” tab to see checkpoint history. Remember, we configured checkpointing every 10 seconds in our Docker Compose file:

execution.checkpointing.interval: 10s

Each checkpoint is a consistent snapshot of your job’s state. If the job crashes, Flink restarts it from the last successful checkpoint. You should see checkpoints completing every 10 seconds, each taking only a few milliseconds for our small workload.

Key checkpoint metrics:

  • Duration: How long the checkpoint took. This should be small relative to the checkpoint interval. If checkpoints take 8 seconds and your interval is 10 seconds, you have a problem.
  • State Size: How much state data was checkpointed. For our simple aggregation, this is tiny. For jobs with large state (like session windows over millions of users), this can be gigabytes.
  • Alignment Duration: How long operators waited for checkpoint barriers to align across parallel tasks. High alignment duration indicates data skew.

Backpressure

The web UI has a “BackPressure” tab that shows whether any operator is slowing down the pipeline. In a healthy job, all operators should show “OK” with low backpressure ratios. If a downstream operator (like a database sink) cannot keep up, you will see backpressure propagate upstream through the job graph.

Adding a More Complex Job

Let’s build something slightly more realistic: counting unique users per page per minute and detecting pages with unusual spikes.

-- Create a table for spike alerts
CREATE TABLE page_spike_alerts (
    page STRING,
    window_start TIMESTAMP(3),
    unique_users BIGINT,
    alert_message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'page-spike-alerts',
    'properties.bootstrap.servers' = 'kafka:29092',
    'format' = 'json'
);

-- Submit a job that detects pages with more than 2 unique users per minute
INSERT INTO page_spike_alerts
SELECT
    page,
    TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
    COUNT(DISTINCT user_id) AS unique_users,
    CONCAT('High traffic on ', page, ': ', CAST(COUNT(DISTINCT user_id) AS STRING), ' unique users') AS alert_message
FROM page_views
GROUP BY
    page,
    TUMBLE(ts, INTERVAL '1' MINUTE)
HAVING COUNT(DISTINCT user_id) > 2;

This job uses COUNT(DISTINCT user_id) to count unique visitors and HAVING to filter for windows with more than 2 unique users. The /home page should trigger an alert because it had 4 unique users in our test data.
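The same logic, sketched in plain Python for intuition (an illustration, not Flink code): collect the distinct users per page within one window, then keep only the pages above the threshold.

```python
from collections import defaultdict

THRESHOLD = 2  # mirrors HAVING COUNT(DISTINCT user_id) > 2

def spike_alerts(events, threshold=THRESHOLD):
    """events: (page, user_id) pairs from a single window.
    Returns an alert message per page whose distinct-user count
    exceeds the threshold."""
    users_by_page = defaultdict(set)
    for page, user in events:
        users_by_page[page].add(user)  # sets deduplicate repeat visitors
    return {
        page: f"High traffic on {page}: {len(users)} unique users"
        for page, users in users_by_page.items()
        if len(users) > threshold
    }

# The ten test events from earlier, reduced to (page, user_id)
events = [
    ("/home", "user_001"), ("/pricing", "user_002"), ("/docs", "user_001"),
    ("/home", "user_003"), ("/pricing", "user_002"), ("/signup", "user_001"),
    ("/home", "user_004"), ("/docs", "user_003"), ("/signup", "user_002"),
    ("/home", "user_005"),
]
print(spike_alerts(events))
```

Only /home crosses the threshold: it was visited by four distinct users, while every other page saw at most two.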

Testing With Continuous Data

To see your jobs process data in real time, produce more messages while the jobs are running. Open a new terminal and run:

# Produce a batch of new events
for i in $(seq 1 20); do
  user="user_$(printf '%03d' $((RANDOM % 10 + 1)))"
  pages=("/home" "/pricing" "/docs" "/signup" "/blog")
  page=${pages[$((RANDOM % 5))]}
  actions=("view" "click")
  action=${actions[$((RANDOM % 2))]}
  ts=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
  echo "{\"user_id\": \"$user\", \"page\": \"$page\", \"action\": \"$action\", \"ts\": \"$ts\"}"
done | docker compose exec -T kafka kafka-console-producer \
  --bootstrap-server kafka:29092 \
  --topic page-views

Watch the Flink web UI. You should see the “Records Received” counter tick up as new messages flow through the pipeline. The output topics will receive new results as windows close.

Cleaning Up

When you are done experimenting:

docker compose down -v

This stops all containers and removes the volumes.

Where to Go From Here

You have now built a working Flink pipeline that reads from Kafka, performs windowed aggregations, writes results to another Kafka topic, and monitors job health through the web UI. That covers the core workflow for most Flink projects.

Here are the natural next steps:

Explore More Flink SQL

Flink SQL supports a lot more than what we covered:

  • Joins: Join two streams together based on time windows or key equality.
  • Session windows: Group events by periods of activity with gaps.
  • Pattern matching (MATCH_RECOGNIZE): Detect complex event patterns across a stream.
  • Temporal joins: Join a stream against a slowly changing dimension table.

Connect to Real Data Sources

Replace the console producer with a real data source. If you have a PostgreSQL database, you can use Flink’s CDC connector to stream changes directly into Flink for processing. Or connect to an existing Kafka topic in your development environment.

Run Flink in Production

Running Flink locally is great for learning. Running it in production is a different story: you need to handle cluster sizing, high availability, state backend configuration, savepoint management, and upgrades.

Streamkap offers managed Flink alongside managed Kafka and real-time CDC connectors. Instead of operating Flink infrastructure, you write your SQL jobs and the platform handles the rest. This is especially useful for teams that want to add real-time stream processing to their data pipelines without dedicating engineering time to Flink operations.

Write a Custom Operator

If Flink SQL does not cover your use case, you can write custom operators in Java or Python. The DataStream API gives you full control over state management, timers, and event processing. Start with ProcessFunction and KeyedProcessFunction, which are the building blocks for most custom Flink logic.

Whatever direction you take, the fundamentals are the same: define your sources, write your processing logic, define your sinks, and let Flink handle the rest. The SQL you wrote in this tutorial is not a toy example. Windowed aggregations over Kafka streams are one of the most common production Flink use cases, and the approach scales from 10 messages to billions.