
Engineering

February 25, 2026

10 min read

Real-Time Log Analytics with Flink: From Raw Logs to Insights

Learn how to build real-time log analytics with Apache Flink. Covers log parsing, structured extraction, error rate monitoring, log-level aggregations, and alerting pipelines.

TL;DR:

• Flink processes raw log streams in real time, parsing unstructured text into structured fields that are immediately queryable.
• Tumbling and sliding window aggregations on log levels give you live error rate monitoring that catches incidents in seconds instead of minutes.
• Pattern detection with MATCH_RECOGNIZE identifies error cascades and repeated failure sequences before they become full outages.
• Streamkap's managed Flink and Kafka infrastructure lets your team build log analytics pipelines without operating distributed systems.

Your application is logging thousands of lines per second. Somewhere in that stream, right now, there is a stack trace that signals the beginning of an incident. The question is whether you will find it in 10 seconds or 10 minutes.

Most log analytics setups work like this: logs ship to a central store (Elasticsearch, Splunk, Datadog), someone runs a query when things look wrong, and then the team starts investigating. This is fine for post-incident analysis. It is not fine for catching incidents early.

Apache Flink offers a different approach. Instead of storing logs and querying them reactively, you process the log stream as it happens. Flink parses raw log lines into structured data, computes error rates over sliding windows, detects patterns that indicate cascading failures, and fires alerts the moment something goes wrong. The logs still go to your search tool for interactive investigation, but the real-time analytics layer runs in parallel, watching for trouble continuously.

In this guide, we will build a log analytics pipeline with Flink SQL, covering log ingestion, parsing and structuring, error rate monitoring, log-level aggregations, and alerting.

Log Ingestion Schema

Logs arrive in many formats. Some applications emit structured JSON. Others produce semi-structured text with a known pattern. Legacy systems might emit completely unstructured output. The first step is getting all of these into Kafka, where Flink can consume them.

For JSON-formatted logs (the ideal case), the Flink table definition is straightforward:

CREATE TABLE application_logs (
  log_timestamp  TIMESTAMP(3),
  log_level      STRING,        -- 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'
  service_name   STRING,
  instance_id    STRING,
  trace_id       STRING,
  span_id        STRING,
  message        STRING,
  error_class    STRING,        -- e.g., 'NullPointerException'
  stack_trace    STRING,
  extra_fields   MAP<STRING, STRING>,
  WATERMARK FOR log_timestamp AS log_timestamp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'application-logs',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

For text-formatted logs that arrive as raw strings, you ingest them as a single-column table and parse them in a subsequent step:

CREATE TABLE raw_logs (
  raw_line       STRING,
  kafka_timestamp TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR kafka_timestamp AS kafka_timestamp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'raw-application-logs',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'raw',
  'scan.startup.mode' = 'latest-offset'
);

Parsing Unstructured Logs

Many applications emit logs in a semi-structured format like this:

2026-02-25 14:32:01.456 ERROR [payment-service] [trace-abc123] PaymentProcessor - Failed to charge card: TimeoutException connecting to payment gateway

Flink’s REGEXP_EXTRACT function pulls structured fields from these lines:

CREATE VIEW parsed_logs AS
SELECT
  TO_TIMESTAMP(
    REGEXP_EXTRACT(raw_line, '^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})', 1),
    'yyyy-MM-dd HH:mm:ss.SSS'
  ) AS log_timestamp,
  REGEXP_EXTRACT(raw_line, '^\S+ \S+ (\w+)', 1) AS log_level,
  REGEXP_EXTRACT(raw_line, '\[([^\]]+)\]', 1) AS service_name,
  REGEXP_EXTRACT(raw_line, '\[([^\]]+)\]\s*\[([^\]]+)\]', 2) AS trace_id,
  REGEXP_EXTRACT(raw_line, '\] \w+ - (.+)$', 1) AS message,
  raw_line
FROM raw_logs
WHERE raw_line IS NOT NULL AND raw_line <> '';

This regex-based parsing is not as clean as consuming pre-structured JSON, but it works for legacy applications that you cannot easily modify. The important thing is that after this step, your downstream queries work with typed, structured fields regardless of the original log format.
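Before wiring patterns like these into a Flink view, it helps to test them against sample lines outside the cluster. Here is a small Python sketch that applies the same regexes to the example log line above; `parse_line` is a hypothetical helper, and the patterns are copied from the view:

```python
import re

# The same extraction patterns used by the parsed_logs view.
PATTERNS = {
    "log_timestamp": r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})",
    "log_level":     r"^\S+ \S+ (\w+)",
    "service_name":  r"\[([^\]]+)\]",
    "message":       r"\] \w+ - (.+)$",
}
TRACE_PATTERN = r"\[([^\]]+)\]\s*\[([^\]]+)\]"  # trace_id is capture group 2


def parse_line(raw_line: str) -> dict:
    """Mimic REGEXP_EXTRACT: return the captured group, or None (Flink's NULL)."""
    fields = {}
    for name, pattern in PATTERNS.items():
        m = re.search(pattern, raw_line)
        fields[name] = m.group(1) if m else None
    m = re.search(TRACE_PATTERN, raw_line)
    fields["trace_id"] = m.group(2) if m else None
    return fields


line = ("2026-02-25 14:32:01.456 ERROR [payment-service] [trace-abc123] "
        "PaymentProcessor - Failed to charge card: TimeoutException "
        "connecting to payment gateway")
parsed = parse_line(line)
# parsed["log_level"] is "ERROR", parsed["service_name"] is "payment-service"
```

Testing patterns against a corpus of real log lines before deployment catches format drift (extra brackets, missing milliseconds) that would otherwise silently produce NULL fields downstream.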

For applications that emit multiple log formats, you can use a CASE expression to apply different parsing logic based on the log format:

CREATE VIEW multi_format_parsed AS
SELECT
  CASE
    WHEN raw_line LIKE '{%'
    THEN -- JSON format: extract from JSON
      TO_TIMESTAMP(JSON_VALUE(raw_line, '$.timestamp'))
    ELSE -- Text format: extract with regex
      TO_TIMESTAMP(REGEXP_EXTRACT(raw_line, '^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})', 1), 'yyyy-MM-dd HH:mm:ss.SSS')
  END AS log_timestamp,
  CASE
    WHEN raw_line LIKE '{%'
    THEN JSON_VALUE(raw_line, '$.level')
    ELSE REGEXP_EXTRACT(raw_line, '^\S+ \S+ (\w+)', 1)
  END AS log_level,
  CASE
    WHEN raw_line LIKE '{%'
    THEN JSON_VALUE(raw_line, '$.service')
    ELSE REGEXP_EXTRACT(raw_line, '\[([^\]]+)\]', 1)
  END AS service_name,
  raw_line
FROM raw_logs;

Error Rate Monitoring

The most valuable thing you can do with a log stream is measure error rates in real time. A spike in error rate is the earliest signal that something is going wrong, often appearing before metrics dashboards or health checks catch the problem.

Here is a tumbling window query that computes error rates per service per minute:

CREATE VIEW error_rates_per_minute AS
SELECT
  service_name,
  TUMBLE_START(log_timestamp, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS total_logs,
  SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) AS error_count,
  SUM(CASE WHEN log_level = 'WARN' THEN 1 ELSE 0 END) AS warn_count,
  CAST(SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) AS DOUBLE)
    / NULLIF(COUNT(*), 0) AS error_rate,
  CAST(SUM(CASE WHEN log_level IN ('ERROR', 'WARN') THEN 1 ELSE 0 END) AS DOUBLE)
    / NULLIF(COUNT(*), 0) AS error_warn_rate
FROM application_logs
GROUP BY
  service_name,
  TUMBLE(log_timestamp, INTERVAL '1' MINUTE);

The NULLIF(COUNT(*), 0) is a defensive guard against division by zero. In practice a window group only exists when at least one log arrived, so a service that emits nothing in a given minute simply produces no row, but the guard keeps the query safe if the denominator logic ever changes. The error_rate field gives you the proportion of log lines that are errors, and error_warn_rate includes warnings, which often precede errors and serve as an early warning signal.
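The arithmetic behind the tumbling window is simple enough to sketch in plain Python, which is useful for reasoning about what the SQL produces. This is a simulation of the per-minute bucketing, not Flink code:

```python
from collections import defaultdict
from datetime import datetime


def minute_bucket(ts: datetime) -> datetime:
    # Tumbling 1-minute windows: truncate event time to the minute.
    return ts.replace(second=0, microsecond=0)


def error_rates(events):
    """events: iterable of (timestamp, service_name, log_level).
    Returns {(service, window_start): (total, errors, error_rate)}."""
    counts = defaultdict(lambda: [0, 0])
    for ts, service, level in events:
        key = (service, minute_bucket(ts))
        counts[key][0] += 1
        if level == "ERROR":
            counts[key][1] += 1
    return {k: (t, e, e / t if t else 0.0) for k, (t, e) in counts.items()}


events = [
    (datetime(2026, 2, 25, 14, 32, 1), "payment-service", "ERROR"),
    (datetime(2026, 2, 25, 14, 32, 5), "payment-service", "INFO"),
    (datetime(2026, 2, 25, 14, 32, 9), "payment-service", "INFO"),
    (datetime(2026, 2, 25, 14, 32, 30), "payment-service", "ERROR"),
]
rates = error_rates(events)
# One window (payment-service, 14:32) with 4 logs, 2 errors, rate 0.5
```

The key difference in the real pipeline is that Flink emits each window's result when the watermark passes the window end, rather than after all events are collected.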

For smoother trend lines on dashboards, use a sliding window:

CREATE VIEW error_rates_rolling AS
SELECT
  service_name,
  HOP_START(log_timestamp, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS window_start,
  CAST(SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) AS DOUBLE)
    / NULLIF(COUNT(*), 0) AS error_rate_10m,
  COUNT(*) AS total_logs_10m
FROM application_logs
GROUP BY
  service_name,
  HOP(log_timestamp, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE);

This 10-minute rolling window updates every minute and absorbs short-lived spikes, showing you the sustained error trend rather than momentary blips.

Log-Level Aggregations

Beyond error rates, log-level distributions tell you about the overall health of your services. A service that suddenly starts emitting 10x more DEBUG logs might have a configuration change that accidentally enabled verbose logging, consuming bandwidth and storage without providing value.

CREATE VIEW log_level_distribution AS
SELECT
  service_name,
  log_level,
  TUMBLE_START(log_timestamp, INTERVAL '5' MINUTE) AS window_start,
  COUNT(*) AS log_count
FROM application_logs
GROUP BY
  service_name,
  log_level,
  TUMBLE(log_timestamp, INTERVAL '5' MINUTE);

You can also track the top error messages per service to quickly identify the dominant failure modes:

CREATE VIEW top_errors_per_service AS
SELECT
  service_name,
  error_class,
  message,
  TUMBLE_START(log_timestamp, INTERVAL '5' MINUTE) AS window_start,
  COUNT(*) AS occurrence_count
FROM application_logs
WHERE log_level = 'ERROR'
GROUP BY
  service_name,
  error_class,
  message,
  TUMBLE(log_timestamp, INTERVAL '5' MINUTE);

This gives your on-call engineers an immediate answer to “what is breaking?” without having to search through logs manually. The top errors by occurrence count surface the most impactful issues first.
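The "top errors" ranking is just a grouped count followed by a sort, which can be sketched with Python's Counter. The sample events here are hypothetical:

```python
from collections import Counter

# Hypothetical sample of ERROR-level events within one 5-minute window,
# keyed by (service_name, error_class).
errors = [
    ("payment-service", "TimeoutException"),
    ("payment-service", "TimeoutException"),
    ("payment-service", "NullPointerException"),
    ("checkout-service", "TimeoutException"),
]

counts = Counter(errors)
top = counts.most_common(2)  # highest-occurrence error classes first
```

In the Flink version, a dashboard or downstream consumer typically does this final top-N sort over the windowed counts the view emits.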

Alerting on Error Patterns

Simple threshold alerts (“error rate > 5%”) catch obvious problems. But some incidents manifest as patterns rather than thresholds. A service that alternates between healthy and erroring every few minutes might have a failing health check that triggers restarts. A cascade of errors across multiple services within a short window often indicates a shared dependency failure.

Flink’s MATCH_RECOGNIZE detects these patterns:

CREATE VIEW error_cascade_alerts AS
SELECT
  first_service,
  second_service,
  third_service,
  first_error_time,
  last_error_time,
  TIMESTAMPDIFF(SECOND, first_error_time, last_error_time) AS cascade_duration_seconds
FROM error_rates_per_minute
MATCH_RECOGNIZE (
  ORDER BY window_start
  MEASURES
    A.service_name AS first_service,
    B.service_name AS second_service,
    C.service_name AS third_service,
    A.window_start AS first_error_time,
    C.window_start AS last_error_time
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B C) WITHIN INTERVAL '5' MINUTE
  DEFINE
    A AS A.error_rate > 0.10,
    B AS B.error_rate > 0.10 AND B.service_name <> A.service_name,
    C AS C.error_rate > 0.10 AND C.service_name <> A.service_name
      AND C.service_name <> B.service_name
) AS cascade;

This pattern fires when three different services all exceed 10% error rate within a 5-minute window. That is a strong signal that a shared dependency (database, message broker, external API) is having problems, and the alert should page your infrastructure team rather than individual service owners. One caveat: MATCH_RECOGNIZE requires its ORDER BY column to be an event-time attribute, so the source view should expose TUMBLE_ROWTIME(log_timestamp, INTERVAL '1' MINUTE) as window_start rather than TUMBLE_START, which returns a plain timestamp.
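To make the matching semantics concrete, here is a procedural Python sketch of the same A-B-C logic: scan elevated windows in time order and report when three distinct services exceed the threshold within five minutes. This is an approximation of the CEP behavior, not a reimplementation of Flink's matcher:

```python
from datetime import datetime, timedelta

THRESHOLD = 0.10
WITHIN = timedelta(minutes=5)


def find_cascades(rows):
    """rows: list of (window_start, service_name, error_rate), time-ordered.
    Returns (services, first_time, last_time) for each detected cascade."""
    elevated = [(t, s) for t, s, r in rows if r > THRESHOLD]
    cascades = []
    i = 0
    while i < len(elevated):
        seen, start = [], elevated[i][0]
        j = i
        while j < len(elevated) and elevated[j][0] - start <= WITHIN:
            t, s = elevated[j]
            if s not in seen:
                seen.append(s)
            if len(seen) == 3:
                cascades.append((tuple(seen), start, t))
                i = j  # roughly AFTER MATCH SKIP PAST LAST ROW
                break
            j += 1
        i += 1
    return cascades


rows = [
    (datetime(2026, 2, 25, 14, 30), "payment-service", 0.15),
    (datetime(2026, 2, 25, 14, 31), "checkout-service", 0.22),
    (datetime(2026, 2, 25, 14, 31), "inventory-service", 0.02),  # healthy: ignored
    (datetime(2026, 2, 25, 14, 33), "shipping-service", 0.30),
]
alerts = find_cascades(rows)
```

Three distinct services over threshold inside the 5-minute span yields exactly one cascade alert, matching the ONE ROW PER MATCH behavior.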

For simpler threshold-based alerts, filter the error rate view:

CREATE VIEW error_rate_alerts AS
SELECT
  service_name,
  window_start,
  error_rate,
  total_logs,
  error_count
FROM error_rates_per_minute
WHERE error_rate > 0.05
  AND total_logs >= 100;  -- avoid alerting on low-volume services

The total_logs >= 100 filter prevents false alerts from services that emit only a handful of logs per minute. If a service emits 3 logs and 1 is an error, that is a 33% error rate but probably not an incident.
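The same guard is easy to express as a predicate, which is handy for unit-testing alert thresholds before encoding them in SQL. A minimal sketch, with the thresholds from the view above as defaults:

```python
def should_alert(error_count: int, total_logs: int,
                 rate_threshold: float = 0.05, min_volume: int = 100) -> bool:
    """Mirror the alert filter: high error rate AND enough volume to matter."""
    if total_logs < min_volume:
        return False
    return error_count / total_logs > rate_threshold


# 1 error in 3 logs is a 33% rate, but below the volume floor: no alert.
low_volume = should_alert(error_count=1, total_logs=3)
# 600 errors in 10,000 logs is a sustained 6% rate: alert.
high_volume = should_alert(error_count=600, total_logs=10_000)
```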

Enriching Logs with Service Metadata

Raw log analytics tell you which service is failing. Enriched log analytics tell you which team owns it, what environment it runs in, and where to route the alert. A service registry table, kept current by Streamkap’s CDC streaming, provides this context:

CREATE TABLE service_registry (
  service_name    STRING,
  team_owner      STRING,
  environment     STRING,    -- 'production', 'staging', 'development'
  tier            STRING,    -- 'critical', 'standard', 'background'
  pagerduty_key   STRING,
  slack_channel   STRING,
  PRIMARY KEY (service_name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'service-registry-cdc',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

CREATE VIEW enriched_alerts AS
SELECT
  a.service_name,
  a.window_start,
  a.error_rate,
  a.error_count,
  s.team_owner,
  s.environment,
  s.tier,
  s.pagerduty_key,
  s.slack_channel
FROM error_rate_alerts a
JOIN service_registry s
  ON a.service_name = s.service_name
WHERE s.environment = 'production';

Now your alert pipeline knows not only that the payment-service has a high error rate, but also that it belongs to the payments team, runs in production, is a critical-tier service, and should route to a specific PagerDuty escalation policy and Slack channel. The WHERE s.environment = 'production' filter ensures you do not get paged for staging environment errors.
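Conceptually, this enrichment join is a keyed lookup plus a filter. Here is an illustrative Python version with a hypothetical in-memory registry snapshot standing in for the CDC-fed upsert table:

```python
# Hypothetical in-memory snapshot of the service registry.
SERVICE_REGISTRY = {
    "payment-service": {"team_owner": "payments", "environment": "production",
                        "tier": "critical", "slack_channel": "#payments-alerts"},
    "demo-service":    {"team_owner": "platform", "environment": "staging",
                        "tier": "standard", "slack_channel": "#platform"},
}


def enrich_alerts(alerts):
    """Join each alert with registry metadata, keeping production services only."""
    enriched = []
    for alert in alerts:
        meta = SERVICE_REGISTRY.get(alert["service_name"])
        if meta and meta["environment"] == "production":
            enriched.append({**alert, **meta})
    return enriched


alerts = [
    {"service_name": "payment-service", "error_rate": 0.12},
    {"service_name": "demo-service", "error_rate": 0.40},  # staging: dropped
]
routed = enrich_alerts(alerts)
```

The difference in the streaming version is that the upsert-kafka table keeps this "snapshot" continuously current, so ownership changes in the source database propagate to alert routing without a redeploy.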

Writing to Sinks

Processed log analytics need to land in two places: an alerting system for immediate action and an analytics store for dashboards and historical analysis.

For alerting, write to a Kafka topic that your notification system consumes:

CREATE TABLE alert_sink (
  service_name   STRING,
  window_start   TIMESTAMP(3),
  error_rate     DOUBLE,
  error_count    BIGINT,
  team_owner     STRING,
  tier           STRING,
  pagerduty_key  STRING,
  slack_channel  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'log-alerts',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

INSERT INTO alert_sink
SELECT
  service_name, window_start, error_rate, error_count,
  team_owner, tier, pagerduty_key, slack_channel
FROM enriched_alerts
WHERE tier = 'critical';

For dashboards and historical trending, write aggregates to a database:

CREATE TABLE log_metrics_sink (
  service_name STRING,
  window_start TIMESTAMP(3),
  total_logs   BIGINT,
  error_count  BIGINT,
  warn_count   BIGINT,
  error_rate   DOUBLE,
  PRIMARY KEY (service_name, window_start) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://analytics-db:5432/log_analytics',
  'table-name' = 'log_metrics',
  'driver' = 'org.postgresql.Driver'
);

INSERT INTO log_metrics_sink
SELECT
  service_name, window_start, total_logs,
  error_count, warn_count, error_rate
FROM error_rates_per_minute;

Scaling Log Analytics Pipelines

Log volume scales with your application infrastructure. Every new service, every new instance, every increase in traffic produces more logs. Your analytics pipeline must scale accordingly.

Partition by service name. This keeps all logs from a given service on the same Flink task, which is important for per-service aggregations and pattern detection. It also means a noisy service does not cause backpressure for other services.

Set state TTL aggressively. Log analytics queries use short windows (1 to 10 minutes). There is no reason to keep state around longer than the window duration plus watermark delay. A 15-minute state TTL is generous for most log analytics use cases.

Pre-filter before aggregation. If you only care about ERROR and WARN levels for alerting, filter out DEBUG and INFO logs before the aggregation step. This can reduce the volume that your aggregation operators handle by 90% or more in verbose applications.
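To see how much a pre-filter buys you, here is a small illustrative sketch with a hypothetical log-level mix typical of a verbose application:

```python
def pre_filter(events, keep_levels=("ERROR", "WARN")):
    """Drop DEBUG/INFO before aggregation; in verbose apps this is most of the volume."""
    return [e for e in events if e["log_level"] in keep_levels]


# Hypothetical per-minute mix for a chatty service: 1,000 events total.
events = (
    [{"log_level": "DEBUG"}] * 800
    + [{"log_level": "INFO"}] * 150
    + [{"log_level": "WARN"}] * 40
    + [{"log_level": "ERROR"}] * 10
)
kept = pre_filter(events)
# 50 of 1,000 events survive: a 95% reduction before the aggregation operators.
```

In Flink SQL the equivalent is a WHERE log_level IN ('ERROR', 'WARN') clause placed before the GROUP BY, so the filter runs in the source operator rather than after shuffling.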

Control log volume at the source. The best optimization for a log analytics pipeline is producing fewer logs. Work with application teams to ensure that DEBUG-level logging is disabled in production, that repetitive log messages use structured fields instead of string interpolation, and that health check logs are filtered or sampled.

Tying It All Together

Real-time log analytics with Flink gives your operations team a continuous, computed view of system health rather than a reactive search tool. Error rates update every minute. Pattern detection catches cascading failures within minutes of onset. Alert routing uses live service metadata to page the right team through the right channel.

This does not replace your log search tool. You still need Elasticsearch, Loki, or Datadog for interactive investigation once an alert fires. What Flink adds is the proactive layer: the system that watches the log stream constantly, computes the metrics that matter, and tells you something is wrong before a customer does.

Streamkap’s managed Flink and Kafka infrastructure handles the operational side of running these pipelines. You define your parsing logic, aggregation windows, and alerting rules in Flink SQL. Streamkap handles the clusters, checkpoints, and scaling. When your log volume doubles because you deployed 50 new microservices, the infrastructure grows with you.