Data Lineage in Streaming Pipelines
How to track data from source to dashboard in streaming systems. Covers OpenLineage, Apache Flink lineage, end-to-end tracking patterns, and using lineage for debugging production data issues.
Something is wrong with the revenue number on the executive dashboard. It is showing $2.3 million for today, but the finance team says it should be closer to $1.8 million. Where do you start?
Without data lineage, you start by guessing. You open the dashboard and find the SQL query. It reads from a Snowflake table called daily_revenue_summary. You check the dbt model that populates it. It reads from orders_fact, which is loaded by a streaming pipeline. You ask the data platform team which pipeline, and they say it is a Flink job that reads from a Kafka topic called order-events-enriched. That topic is produced by another Flink job that joins commerce.public.orders with commerce.public.customers. The orders topic comes from a CDC connector reading the production PostgreSQL database.
That investigation took two hours and four Slack conversations. With data lineage, you would click on daily_revenue_summary in your lineage tool, see the full upstream graph in seconds, and start debugging at the right layer immediately.
This article covers how to build and maintain data lineage for streaming pipelines, the tools and standards available, and the practical patterns that make lineage useful for debugging, compliance, and impact analysis.
What Lineage Captures
Data lineage is a directed graph where:
- Nodes are datasets (Kafka topics, database tables, files, API endpoints)
- Edges are jobs (Flink applications, Kafka Connect connectors, dbt models, Spark jobs)
Each edge represents a data flow: this job reads from these datasets and writes to those datasets. The graph shows the full path from raw sources to final consumption points.
Column-Level vs. Dataset-Level Lineage
Dataset-level lineage tells you: “Flink job X reads from topic A and writes to topic B.” This is the coarser granularity and the easier one to capture.
Column-level lineage tells you: “The revenue column in topic B is computed from the amount and discount columns in topic A.” This is more useful for debugging but harder to extract, especially from complex Flink SQL queries with expressions, joins, and aggregations.
Most teams start with dataset-level lineage and add column-level lineage for critical paths where debugging precision matters.
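Column-level mappings are expressed in OpenLineage with the ColumnLineageDatasetFacet. A minimal sketch of what the revenue example above would look like as a facet payload; the topic and column names here are illustrative, not from a real pipeline:

```python
# Sketch of a ColumnLineageDatasetFacet payload for the revenue example.
# The facet maps each output column to the input fields it is derived from.
# Names ("topic-a", "revenue", "amount", "discount") are illustrative.
column_lineage_facet = {
    "fields": {
        "revenue": {
            "inputFields": [
                {"namespace": "kafka", "name": "topic-a", "field": "amount"},
                {"namespace": "kafka", "name": "topic-a", "field": "discount"},
            ],
            "transformationDescription": "amount - discount",
            "transformationType": "EXPRESSION",
        }
    }
}

# Every input field that feeds the output column is listed explicitly,
# so a lineage backend can render column-to-column edges.
inputs_for_revenue = [
    f["field"] for f in column_lineage_facet["fields"]["revenue"]["inputFields"]
]
```

Because each output column carries its own list of input fields, a backend can answer "which source columns feed this metric?" without parsing any SQL.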
Operational vs. Design-Time Lineage
Design-time lineage is what you document when you build the pipeline: “this job is designed to read from topic A and write to topic B.” It is accurate when written but can drift from reality if someone changes the job without updating the documentation.
Operational lineage is captured at runtime: “at 14:32 today, this job actually read 1.2 million records from topic A and wrote 1.1 million records to topic B.” It is always accurate because it is observed, not declared.
The best lineage systems combine both: design-time lineage for planning and impact analysis, operational lineage for debugging and auditing.
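One concrete payoff of having both kinds is drift detection: compare what a job was declared to read against what it was observed reading. A minimal sketch, with dataset names taken from the example pipeline in this article:

```python
def lineage_drift(declared_inputs, observed_inputs):
    """Compare design-time (declared) inputs against operational
    (observed) inputs and report any drift between the two."""
    declared, observed = set(declared_inputs), set(observed_inputs)
    return {
        "undeclared": sorted(observed - declared),  # read at runtime, never documented
        "unused": sorted(declared - observed),      # documented, never actually read
    }

# Example: the job was documented to read orders and customers,
# but at runtime it also read a (hypothetical) refunds topic.
drift = lineage_drift(
    declared_inputs=["commerce.public.orders", "commerce.public.customers"],
    observed_inputs=["commerce.public.orders", "commerce.public.customers",
                     "commerce.public.refunds"],
)
```

Running this comparison on a schedule turns documentation drift from a silent problem into an alert.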
OpenLineage: The Emerging Standard
OpenLineage is an open-source standard for lineage event collection, originally developed at Datakin and now a Linux Foundation project. It defines a JSON event format that any job can emit to describe its lineage.
The OpenLineage Event Model
An OpenLineage event has three core components:
{
  "eventType": "COMPLETE",
  "eventTime": "2026-02-25T14:32:00Z",
  "run": {
    "runId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
  },
  "job": {
    "namespace": "streaming-platform",
    "name": "order-enrichment-flink-job"
  },
  "inputs": [
    {
      "namespace": "kafka",
      "name": "commerce.public.orders",
      "facets": {
        "schema": {
          "fields": [
            {"name": "order_id", "type": "BIGINT"},
            {"name": "customer_id", "type": "BIGINT"},
            {"name": "amount", "type": "DECIMAL"}
          ]
        }
      }
    },
    {
      "namespace": "kafka",
      "name": "commerce.public.customers"
    }
  ],
  "outputs": [
    {
      "namespace": "kafka",
      "name": "order-events-enriched",
      "facets": {
        "schema": {
          "fields": [
            {"name": "order_id", "type": "BIGINT"},
            {"name": "customer_name", "type": "STRING"},
            {"name": "amount", "type": "DECIMAL"}
          ]
        }
      }
    }
  ]
}
This single event tells the lineage system: “The order-enrichment Flink job completed a run. It read from commerce.public.orders and commerce.public.customers, and wrote to order-events-enriched.”
OpenLineage Event Types
Jobs emit events at three points:
- START: The job run began. Inputs and outputs are declared.
- RUNNING: Periodic heartbeat during long-running jobs (like streaming). Can include updated metrics.
- COMPLETE or FAIL: The job run finished. Includes final metrics and any error information.
For streaming jobs that run indefinitely, the RUNNING events are the most useful. They confirm the job is alive and can include operational facets like record counts and byte throughput.
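A RUNNING heartbeat is just a small JSON document POSTed to the lineage backend. A sketch of building and emitting one; the namespace, producer URI, and the `localhost:5000` Marquez endpoint are assumptions you would adjust for your deployment:

```python
import json
import urllib.request
from datetime import datetime, timezone

def build_running_event(job_name, run_id, input_name, row_count):
    """Build a minimal OpenLineage RUNNING event for a streaming job.
    The namespace values and the facet shape mirror the examples in
    this article; adjust them to your own naming conventions."""
    return {
        "eventType": "RUNNING",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": "streaming-platform", "name": job_name},
        "inputs": [{
            "namespace": "kafka",
            "name": input_name,
            "inputFacets": {"inputStatistics": {"rowCount": row_count}},
        }],
        # Producer URI identifies what emitted the event; this one is hypothetical.
        "producer": "https://example.com/lineage-sidecar",
    }

def emit(event, endpoint="http://localhost:5000/api/v1/lineage"):
    """POST the event to a lineage backend. Marquez listens on
    /api/v1/lineage by default; other backends differ."""
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(event).encode(),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)
```

A sidecar or scheduled task can call `build_running_event` with metrics scraped from the job and `emit` the result every few minutes.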
Facets: Extensible Metadata
OpenLineage uses “facets” to attach additional metadata to jobs, runs, and datasets. Standard facets include:
- SchemaDatasetFacet: Field names and types
- DataQualityMetricsInputDatasetFacet: Record counts, null counts, distinct counts
- ColumnLineageDatasetFacet: Column-level lineage mappings
- SQLJobFacet: The SQL query that defines the transformation
- SourceCodeLocationJobFacet: Git repo and file path for the job code
You can also define custom facets for domain-specific metadata.
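Custom facets follow the same envelope as standard ones: each facet object carries `_producer` and `_schemaURL` fields identifying who emitted it and where its schema lives. A sketch of a team-ownership facet; the facet name, its fields, and both URLs are our own inventions, not part of the standard:

```python
# Sketch of a custom facet attached to a job. Per the OpenLineage spec,
# every facet carries _producer and _schemaURL; the "ownership" facet
# name and its fields here are hypothetical, not standard facets.
custom_job_facets = {
    "ownership": {
        "_producer": "https://example.com/deploy-pipeline",          # hypothetical
        "_schemaURL": "https://example.com/schemas/ownership.json",  # hypothetical
        "team": "data-platform-team",
        "oncallChannel": "#data-platform-oncall",
    }
}
```

Backends that do not recognize a custom facet simply store it as opaque metadata, so adding one never breaks ingestion.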
Lineage Collection for Each Pipeline Component
Kafka Connect
Kafka Connect has emerging OpenLineage integration. The Marquez Kafka Connect lineage plugin emits OpenLineage events for source and sink connectors. When a connector starts, it emits a START event declaring which database tables (for source connectors) or which Kafka topics (for sink connectors) it interacts with.
For Streamkap-managed connectors, the platform tracks which source databases and tables are connected to which Kafka topics and which destinations receive data. This pipeline-level view forms the outer edges of your lineage graph: the CDC source and the final destination.
Apache Flink
Flink’s lineage story is the least mature component in the streaming stack. As of Flink 1.18+, there is a LineageGraph API that exposes job topology information, but native OpenLineage emission is not built in for most deployment modes.
Practical approaches for Flink lineage:
1. Static analysis of Flink SQL. Parse the SQL to extract input and output table references. This works for Flink SQL jobs and can be automated in CI:
import sqlparse

def extract_tables(sql):
    parsed = sqlparse.parse(sql)[0]
    inputs = []
    outputs = []
    # Simple heuristic: tables after FROM/JOIN are inputs,
    # tables after INSERT INTO are outputs
    tokens = [t for t in parsed.flatten() if not t.is_whitespace]
    for i, token in enumerate(tokens):
        if token.ttype is sqlparse.tokens.Keyword:
            if token.normalized in ('FROM', 'JOIN'):
                if i + 1 < len(tokens):
                    inputs.append(tokens[i + 1].value)
            elif token.normalized == 'INTO':
                if i + 1 < len(tokens):
                    outputs.append(tokens[i + 1].value)
    return inputs, outputs
This is a simplified example. Production SQL parsers need to handle CTEs, subqueries, and table functions. But the principle works: extract the DAG from the SQL at deployment time and emit it as a design-time lineage event.
2. Job annotations. Add lineage metadata to your Flink job’s configuration:
# flink-job-config.yaml
job:
  name: order-enrichment
  lineage:
    inputs:
      - kafka://commerce.public.orders
      - kafka://commerce.public.customers
    outputs:
      - kafka://order-events-enriched
  owner: data-platform-team
  description: "Joins orders with customer data, filters cancelled orders"
A deployment script reads this configuration and emits OpenLineage events to your lineage backend when the job is deployed or restarted.
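The conversion from annotation to event is mechanical. A sketch, with the parsed YAML represented as a plain dict (a real script would load the file with a YAML parser such as PyYAML):

```python
def annotations_to_event(config, event_type="START", run_id="run-1"):
    """Convert job lineage annotations into an OpenLineage event.
    URIs like kafka://topic are split into (namespace, name) pairs."""
    def to_dataset(uri):
        namespace, name = uri.split("://", 1)
        return {"namespace": namespace, "name": name}

    lineage = config["job"]["lineage"]
    return {
        "eventType": event_type,
        "run": {"runId": run_id},
        "job": {"namespace": "streaming-platform", "name": config["job"]["name"]},
        "inputs": [to_dataset(u) for u in lineage["inputs"]],
        "outputs": [to_dataset(u) for u in lineage["outputs"]],
    }

# Parsed form of the flink-job-config.yaml annotation above.
config = {"job": {
    "name": "order-enrichment",
    "lineage": {
        "inputs": ["kafka://commerce.public.orders",
                   "kafka://commerce.public.customers"],
        "outputs": ["kafka://order-events-enriched"],
    },
}}
event = annotations_to_event(config)
```

Because the annotation lives next to the job code, the lineage event is regenerated on every deploy and cannot drift from what was shipped.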
3. Flink metrics as operational lineage. Flink exposes per-source and per-sink metrics (records in, records out, bytes in, bytes out). A sidecar process can scrape these metrics and emit OpenLineage RUNNING events that include operational facets:
{
  "eventType": "RUNNING",
  "job": {"name": "order-enrichment"},
  "inputs": [{
    "name": "commerce.public.orders",
    "inputFacets": {
      "inputStatistics": {
        "rowCount": 1523847,
        "byteCount": 892341567
      }
    }
  }]
}
dbt and Warehouse Models
If your streaming pipeline feeds into a warehouse where dbt models run, dbt has native OpenLineage support via the dbt-openlineage integration. This connects the streaming lineage graph to the analytics lineage graph, giving you end-to-end visibility from source database to dashboard.
Lineage Backends: Where the Graph Lives
Marquez
Marquez is the reference implementation for OpenLineage. It provides:
- An HTTP API for collecting OpenLineage events
- A metadata store (PostgreSQL-backed) for the lineage graph
- A web UI for browsing and searching lineage
- REST APIs for querying upstream/downstream dependencies
Marquez is lightweight and focused specifically on lineage. It does not try to be a full data catalog.
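Querying the graph back out is a single HTTP call. A sketch of building a Marquez lineage query; Marquez identifies graph nodes as `dataset:<namespace>:<name>`, but check your Marquez version's API docs before relying on the exact format:

```python
import urllib.parse

def marquez_lineage_url(base, namespace, dataset, depth=5):
    """Build a Marquez lineage query URL for a dataset node.
    depth controls how many hops of the graph are returned."""
    node_id = f"dataset:{namespace}:{dataset}"
    query = urllib.parse.urlencode({"nodeId": node_id, "depth": depth})
    return f"{base}/api/v1/lineage?{query}"

url = marquez_lineage_url("http://localhost:5000", "snowflake",
                          "analytics.daily_revenue_summary")

# Fetching the graph requires a running Marquez instance, e.g.:
# graph = json.load(urllib.request.urlopen(url))["graph"]
```

The response is a node-and-edge graph that a UI or script can walk upstream from the dashboard table.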
DataHub
DataHub includes a lineage graph as part of its broader catalog functionality. If you are already using DataHub as your data catalog (as discussed in our streaming data catalog guide), adding lineage is a natural extension. DataHub can ingest OpenLineage events directly.
Atlan, Alation, and Commercial Options
Commercial catalog tools like Atlan and Alation also support lineage ingestion, often with richer visualization and collaboration features. The trade-off is cost and vendor lock-in.
End-to-End Lineage: Connecting the Dots
The most valuable lineage graph connects everything from the source database to the dashboard. Here is what a complete lineage path looks like for a typical Streamkap-based pipeline:
PostgreSQL (commerce.orders table)
│
▼ [Streamkap CDC Connector]
│
Kafka (commerce.public.orders topic)
│
▼ [Flink: order-enrichment job]
│
Kafka (order-events-enriched topic)
│
▼ [Streamkap Snowflake Connector]
│
Snowflake (raw.orders_enriched table)
│
▼ [dbt: orders_fact model]
│
Snowflake (analytics.orders_fact table)
│
▼ [dbt: daily_revenue_summary model]
│
Snowflake (analytics.daily_revenue_summary table)
│
▼ [Looker Dashboard]
│
Executive Revenue Dashboard
Each arrow represents a job that should emit lineage events. The complete graph lets you:
- Trace forward (impact analysis): “If I change the orders table schema, what dashboards are affected?”
- Trace backward (root cause): “The dashboard revenue number is wrong. Where in the pipeline did the error enter?”
- Assess blast radius: “If the Flink cluster goes down, which destinations stop receiving fresh data?”
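All three questions reduce to graph traversal. A minimal sketch over an adjacency map, with node names abbreviated from the pipeline path above (edges point downstream, from producer to consumer):

```python
from collections import deque

# Edges point downstream: producer dataset -> consumer datasets.
# Names abbreviate the PostgreSQL -> Kafka -> Snowflake -> Looker path above.
edges = {
    "pg.commerce.orders": ["kafka.commerce.public.orders"],
    "kafka.commerce.public.orders": ["kafka.order-events-enriched"],
    "kafka.order-events-enriched": ["sf.raw.orders_enriched"],
    "sf.raw.orders_enriched": ["sf.analytics.orders_fact"],
    "sf.analytics.orders_fact": ["sf.analytics.daily_revenue_summary"],
    "sf.analytics.daily_revenue_summary": ["looker.revenue_dashboard"],
}

def downstream(graph, start):
    """Breadth-first walk collecting everything reachable from start."""
    seen, queue = set(), deque([start])
    while queue:
        node = queue.popleft()
        for nxt in graph.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen

def upstream(graph, target):
    """Invert the edges; the upstream set is then a downstream walk."""
    inverted = {}
    for src, dsts in graph.items():
        for dst in dsts:
            inverted.setdefault(dst, []).append(src)
    return downstream(inverted, target)
```

`downstream(edges, "pg.commerce.orders")` answers the impact-analysis and blast-radius questions; `upstream(edges, "sf.analytics.daily_revenue_summary")` gives the root-cause search path.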
Using Lineage for Debugging
Lineage is most valuable during incidents. Here is a practical debugging workflow.
Scenario: Dashboard Shows Wrong Revenue
1. Start at the symptom. The daily_revenue_summary table shows $2.3M instead of the expected $1.8M.
2. Trace upstream in the lineage graph. Click on daily_revenue_summary in your lineage tool and follow the upstream path.
3. Check each layer.
   - orders_fact in Snowflake: does SELECT SUM(amount) FROM orders_fact WHERE date = today match the dashboard? If yes, the dbt model is correct; the problem is upstream.
   - raw.orders_enriched in Snowflake: check the raw ingested data. Are there duplicate records? Run SELECT order_id, COUNT(*) FROM raw.orders_enriched GROUP BY order_id HAVING COUNT(*) > 1.
   - order-events-enriched Kafka topic: check the Flink job metrics. Is the output record count higher than expected?
   - commerce.public.orders Kafka topic: check the CDC connector metrics. Are there unexpected events?
4. Identify the layer where the data diverges from expected. If the Kafka topic has duplicate events, the problem is in the CDC connector or the Flink job. If the raw Snowflake table has more records than the Kafka topic, the problem is in the sink connector.
5. Fix the root cause. Without lineage, you might fix the symptom (patch the dbt model to deduplicate) without addressing the root cause (the CDC connector is producing duplicates due to a rebalance).
Record-Level Tracing for Critical Paths
For pipelines where you need to trace individual records (e.g., financial transactions, regulatory data), add a correlation ID at the source:
-- In Flink, propagate the correlation ID through every transformation
SELECT
  order_id,
  correlation_id,  -- propagated from source CDC event
  customer_name,
  amount
FROM orders_enriched;
The correlation ID lets you query any intermediate system and find the specific records that contributed to a given output. This is expensive (extra storage, extra field in every schema) but invaluable for regulated industries.
Automating Lineage Collection
Manual lineage documentation drifts from reality. Automate collection wherever possible:
CI/CD Integration
When a Flink job is deployed, the deployment pipeline should:
- Parse the job’s SQL or configuration to extract inputs and outputs
- Emit an OpenLineage START event to the lineage backend
- Store the lineage metadata alongside the job artifact
#!/bin/bash
# deploy-flink-job.sh
# Extract lineage from job config
python emit_lineage.py --config flink-job-config.yaml --event-type START
# Deploy the job
flink run -d job.jar
# Confirm the job came up and emit a RUNNING heartbeat event
python emit_lineage.py --config flink-job-config.yaml --event-type RUNNING
Periodic Discovery
Run a scheduled job that:
- Lists all Kafka topics via the admin API
- Lists all consumer groups and their topic subscriptions
- Compares with the known lineage graph
- Alerts on new topics or consumers that are not documented
# discover_undocumented_topics.py
# (sketch: lineage_api, kafka_admin, and alert are your platform's clients)
known_topics = set(lineage_api.get_all_datasets(namespace="kafka"))
actual_topics = set(kafka_admin.list_topics())

undocumented = actual_topics - known_topics
if undocumented:
    alert(f"Undocumented Kafka topics found: {undocumented}")
This catches shadow pipelines (topics and consumers created without going through the standard deployment process).
Streamkap Pipeline Metadata
Streamkap’s management interface tracks every configured pipeline: which source it reads from, which transforms are applied, and which destination it writes to. This metadata forms the authoritative lineage for the CDC and sink portions of your streaming architecture. Export it periodically to your lineage backend to keep the graph current.
Lineage for Compliance
Regulations like GDPR, CCPA, and HIPAA require you to know where personal data flows. Lineage answers these questions:
- “Which systems contain customer email addresses?” Trace forward from the source table that has the email column.
- “If a customer requests deletion, which systems need to be updated?” Follow the lineage graph from the source to every downstream destination.
- “Can this team access this data?” Combine lineage with access control metadata to verify that only authorized teams are in the consumption path.
For streaming systems, this is especially important because data fans out to many consumers in real time. A single Kafka topic might feed ten different systems. Without lineage, you cannot enumerate all the places where a given piece of personal data ends up.
Getting Started
If you have no lineage today, here is a practical starting plan:
1. Document your top 5 critical pipelines manually. Draw the graph on a whiteboard or in a diagramming tool. Identify the sources, transformations, and destinations.
2. Deploy Marquez (or enable lineage in your existing catalog tool). It takes less than an hour with Docker Compose.
3. Add OpenLineage emission to one pipeline. Pick the most critical or most frequently debugged pipeline. Instrument the Kafka Connect connectors and add job annotations to the Flink job.
4. Use the lineage graph the next time there is an incident. If it helps you debug faster, expand to more pipelines. If it does not, re-evaluate what metadata you are capturing.
5. Automate collection in CI/CD. As you prove the value, bake lineage emission into your deployment pipeline so every new job is automatically tracked.
Data lineage is infrastructure that pays off during incidents, audits, and onboarding. You will not use it every day, but when you need it, nothing else can substitute for knowing exactly where your data came from and where it went.