Flink SQL CDC Connectors: Reading Database Changes with SQL
Learn how to read real-time database changes in Flink SQL using CDC connectors for PostgreSQL, MySQL, MongoDB, and more. Build streaming pipelines from database changelogs.
Every database records a trail of changes. Rows are inserted, updated, and deleted thousands of times per second, and that stream of mutations represents the most precise, real-time signal your data infrastructure can offer. The problem is that most analytics and downstream systems never see those changes as they happen. They wait for a batch job, a scheduled sync, or an explicit query to pull a snapshot of the current state.
Flink SQL CDC connectors bridge that gap. They let you declare a SQL table that reads directly from a database’s transaction log, surfacing every INSERT, UPDATE, and DELETE as a row in a streaming query. The result is a live, queryable changelog - one that you can filter, join, aggregate, and route using standard SQL, all in real time.
How CDC Connectors Work
Flink SQL CDC connectors are built on top of Debezium, the open-source change data capture engine. When you create a CDC source table in Flink SQL, the connector establishes a connection to the database’s replication mechanism: the Write-Ahead Log (WAL) in PostgreSQL, the binary log (binlog) in MySQL, the oplog or change streams in MongoDB, and equivalent constructs in Oracle and SQL Server.
The process follows a consistent pattern across databases:
- Initial snapshot: The connector reads the full current state of the table to establish a baseline. Every existing row is emitted as an INSERT event.
- Log streaming: Once the snapshot completes, the connector switches to reading the transaction log in real time. New writes to the database appear as changelog events within milliseconds.
- Changelog encoding: Each event is tagged with an operation type - insert, update (before and after images), or delete - so downstream operators know exactly what changed.
This architecture means the CDC table in Flink SQL is not a static snapshot. It is a continuously updating stream that reflects every mutation applied to the source database, in the exact order it occurred.
Supported Databases
Each database exposes its transaction log differently, and each CDC connector has specific prerequisites. Here is what you need to know for the most common sources.
PostgreSQL
PostgreSQL requires logical replication to be enabled at the server level. Set wal_level=logical in postgresql.conf and create a replication slot for the connector. The connector uses the pgoutput or decoderbufs logical decoding plugin to read changes. You also need a database user with the REPLICATION attribute.
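As a sketch, the server-side setup comes down to a handful of statements (the role name and password are placeholders):

```sql
-- Requires a server restart to take effect:
ALTER SYSTEM SET wal_level = 'logical';

-- Dedicated CDC user with the REPLICATION attribute:
CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
```

The connector can create its replication slot on startup, but the slot name must be unique per pipeline so two jobs never compete for the same position in the WAL.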
MySQL
MySQL CDC requires the binary log to be enabled with binlog_format=ROW and binlog_row_image=FULL. The connector reads the binlog to capture changes. The database user needs SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges. MySQL 5.7 and 8.x are both supported.
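A minimal sketch of the corresponding MySQL setup (the user name and host pattern are placeholders):

```sql
-- Verify the binlog configuration:
SHOW VARIABLES LIKE 'binlog_format';     -- expect ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- expect FULL

-- Grant the privileges the connector needs:
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secret';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'cdc_user'@'%';
```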
MongoDB
MongoDB uses change streams rather than a traditional WAL. The connector requires a replica set or sharded cluster (standalone instances do not support change streams). Change streams require MongoDB 3.6 or later, with 4.0+ recommended. The connector captures insert, update, replace, and delete operations from the oplog-backed change stream API.
Oracle
Oracle CDC uses LogMiner or XStream to read the redo logs. LogMiner is available without additional licensing but has higher overhead. XStream requires the Oracle GoldenGate license. The database must have archivelog mode enabled, and supplemental logging must be turned on for the tables you want to capture.
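The supplemental logging steps look roughly like this (schema and table names are illustrative; run with DBA privileges):

```sql
-- Database-level minimal supplemental logging:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Per-table full column logging for each captured table:
ALTER TABLE app.orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```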
SQL Server
SQL Server requires that CDC be explicitly enabled at both the database and table level using sys.sp_cdc_enable_db and sys.sp_cdc_enable_table. The connector reads from the CDC change tables that SQL Server populates from the transaction log. SQL Server Agent must be running to process the log.
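Enabling capture looks roughly like this (database, schema, and table names are illustrative):

```sql
USE production;
EXEC sys.sp_cdc_enable_db;

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'orders',
@role_name     = NULL;
```

Passing NULL for @role_name skips the gating role, meaning any user with SELECT on the change tables can read them; tighten this in environments with stricter access control.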
Creating CDC Source Tables in Flink SQL
Declaring a CDC source table in Flink SQL follows the standard CREATE TABLE DDL with a WITH clause that specifies the connector and its configuration. Here is an example for PostgreSQL:
CREATE TABLE orders (
order_id INT,
customer_id INT,
product STRING,
quantity INT,
total_price DECIMAL(10, 2),
order_status STRING,
created_at TIMESTAMP(3),
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'db.example.com',
'port' = '5432',
'username' = 'cdc_user',
'password' = 'secret',
'database-name' = 'production',
'schema-name' = 'public',
'table-name' = 'orders',
'slot.name' = 'flink_orders_slot',
'decoding.plugin.name' = 'pgoutput'
);
For MySQL, the connector name changes to mysql-cdc, and you specify server-id instead of slot.name:
CREATE TABLE inventory (
item_id INT,
sku STRING,
warehouse STRING,
quantity INT,
updated_at TIMESTAMP(3),
PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql.example.com',
'port' = '3306',
'username' = 'cdc_user',
'password' = 'secret',
'database-name' = 'warehouse',
'table-name' = 'inventory',
'server-id' = '5401'
);
The PRIMARY KEY ... NOT ENFORCED clause is important. It tells Flink which columns uniquely identify a row so that updates and deletes can be correctly matched to prior state, but Flink does not enforce the constraint at runtime.
Changelog Semantics
Every row emitted by a CDC source table carries an operation flag. Flink SQL represents these as four changelog kinds:
- +I (INSERT): A new row was created. This is the only event type emitted during the initial snapshot phase.
- -U (UPDATE_BEFORE): The previous state of a row before an update. This is the “retract” message - it tells downstream operators to undo any computation based on the old value.
- +U (UPDATE_AFTER): The new state of the row after the update. Paired with -U, this represents a complete update.
- -D (DELETE): The row was removed from the source table.
This four-event model is what makes Flink SQL CDC tables so powerful. Because the connector emits both the before and after images of every update, downstream aggregations can correctly retract stale values and apply new ones. Without the -U event, a running SUM would double-count every updated row.
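To see why the retract message matters, trace a hypothetical status change through a running aggregate (order values are illustrative):

```sql
SELECT order_status, SUM(total_price) AS revenue
FROM orders
GROUP BY order_status;

-- Changelog emitted by the source for one order moving from
-- 'pending' to 'shipped':
--   +I (order 1, 'pending', 50.00)  -> pending revenue becomes 50.00
--   -U (order 1, 'pending', 50.00)  -> pending revenue retracted to 0.00
--   +U (order 1, 'shipped', 50.00)  -> shipped revenue becomes 50.00
```

Without the -U step, the order would be counted under both statuses at once.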
You can observe the raw changelog by inserting into a Kafka sink that preserves the operation type, or by using the changelog-json format to inspect the events directly.
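As a sketch, a Kafka sink using the debezium-json format preserves the operation type for each event (the topic and broker address are assumptions):

```sql
CREATE TABLE orders_raw_changelog (
order_id INT,
order_status STRING,
total_price DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'orders-changelog',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);

INSERT INTO orders_raw_changelog
SELECT order_id, order_status, total_price FROM orders;
```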
Practical Examples
Real-Time Order Tracking from PostgreSQL
With the orders CDC table defined above, you can build a real-time dashboard query that tracks order status changes as they happen:
SELECT
order_status,
COUNT(*) AS order_count,
SUM(total_price) AS total_revenue
FROM orders
GROUP BY order_status;
Because this is a CDC table, the results update continuously. When an order moves from pending to shipped in the PostgreSQL source, Flink automatically retracts the count from pending and adds it to shipped - no manual refresh, no batch delay.
Inventory Sync from MySQL CDC
Using the inventory table, you can detect low-stock items in real time and write alerts to a Kafka topic:
INSERT INTO low_stock_alerts
SELECT
sku,
warehouse,
quantity,
updated_at
FROM inventory
WHERE quantity < 10;
Every time an inventory row is updated in MySQL and the quantity drops below 10, the alert is produced within seconds.
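The low_stock_alerts sink referenced above must accept updates, because rows can cross the threshold in both directions. One option is an upsert-kafka table - a sketch, with topic and broker address assumed:

```sql
CREATE TABLE low_stock_alerts (
sku STRING,
warehouse STRING,
quantity INT,
updated_at TIMESTAMP(3),
PRIMARY KEY (sku, warehouse) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'low-stock-alerts',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
```

When an item restocks above the threshold, the upsert-kafka sink emits a tombstone for its key, so the alert clears itself.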
Document Change Tracking from MongoDB
MongoDB CDC tables use a slightly different DDL because MongoDB documents do not have a fixed schema. You typically define the table with a STRING column for the document body:
CREATE TABLE user_profiles (
_id STRING,
document STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'mongo1.example.com:27017',
'username' = 'cdc_user',
'password' = 'secret',
'database' = 'app',
'collection' = 'user_profiles'
);
You can then parse the JSON document inside Flink SQL using built-in JSON functions to extract fields for downstream analytics or enrichment.
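For example, assuming the documents carry email and plan fields (the field names are hypothetical):

```sql
SELECT
_id,
JSON_VALUE(document, '$.email') AS email,
JSON_VALUE(document, '$.plan') AS plan
FROM user_profiles;
```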
CDC Tables and Temporal Joins
One of the most valuable patterns with CDC source tables is the temporal join. A temporal join lets you enrich a streaming fact table with a versioned dimension table, looking up the dimension value that was valid at the time the fact event occurred.
For example, suppose you have an orders CDC table and a products CDC table. If both declare an event-time attribute with a WATERMARK clause (for instance on the created_at and updated_at columns), and products has a primary key, you can join them using the product’s state as of the order’s event time:
SELECT
o.order_id,
o.product,
o.quantity,
p.price AS unit_price,
o.quantity * p.price AS line_total
FROM orders AS o
JOIN products FOR SYSTEM_TIME AS OF o.created_at AS p
ON o.product = p.product_id;
The FOR SYSTEM_TIME AS OF clause tells Flink to look up the version of the product row that was valid at the order’s event time. If the product’s price changes later, historical orders still reflect the price that was active at the time of purchase. This is only possible because the CDC table maintains a versioned history of every row - something a simple Kafka topic with append-only semantics cannot provide.
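For the lookup side of the join, Flink requires products to be a versioned table: a primary key plus an event-time attribute with a watermark. A minimal sketch, reusing the connection details from the earlier PostgreSQL example:

```sql
CREATE TABLE products (
product_id STRING,
price DECIMAL(10, 2),
updated_at TIMESTAMP(3),
WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'db.example.com',
'port' = '5432',
'username' = 'cdc_user',
'password' = 'secret',
'database-name' = 'production',
'schema-name' = 'public',
'table-name' = 'products',
'slot.name' = 'flink_products_slot',
'decoding.plugin.name' = 'pgoutput'
);
```

Note the dedicated replication slot: each CDC source table needs its own slot on the PostgreSQL server.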
CDC Tables and Aggregations
CDC changelog streams are ideally suited for computing running metrics. Because each update includes a retraction of the old value, Flink can maintain correct aggregations over a continuously mutating dataset.
Consider a query that computes hourly revenue totals per product. Group windows in Flink SQL expect append-only input, so over a changelog stream the hourly bucket is expressed with a regular GROUP BY on a truncated timestamp, which fully supports retractions:
SELECT
product,
DATE_FORMAT(updated_at, 'yyyy-MM-dd HH:00') AS hour_bucket,
SUM(total_price) AS hourly_revenue,
COUNT(*) AS order_count
FROM orders
GROUP BY
product,
DATE_FORMAT(updated_at, 'yyyy-MM-dd HH:00');
If an order’s total_price is corrected in the source database, Flink will retract the old value from the affected bucket and apply the corrected value - all without reprocessing historical data. This retract-and-accumulate behavior is the core advantage of changelog-based stream processing.
Self-Managed vs Managed CDC
Running Flink SQL CDC connectors in production requires more than just the Flink cluster. A typical self-managed deployment involves:
- Debezium connectors running inside Kafka Connect, which need their own cluster, monitoring, and configuration management.
- Apache Kafka to buffer and distribute the changelog events, with its own brokers, Zookeeper/KRaft controllers, topic management, and retention policies.
- Schema Registry to manage Avro or Protobuf schemas as your source tables evolve.
- Flink cluster with properly sized TaskManagers, checkpoint storage, and job lifecycle management.
Each of these components introduces operational surface area: upgrades, security patches, capacity planning, failure recovery, and on-call rotations. For a team running CDC from five or ten database tables, the infrastructure overhead can easily exceed the effort spent on the actual business logic.
This is where a managed platform like Streamkap changes the picture. Streamkap provides managed CDC connectors for over 30 databases, handling the Debezium, Kafka, and schema management layers for you. You configure a source database in the Streamkap UI, and the CDC stream is immediately available for downstream processing - including Flink SQL transformations. There is no Debezium to deploy, no Kafka Connect cluster to tune, and no schema registry to maintain.
The operational difference is stark. Self-managed CDC is a distributed systems project. Managed CDC is a configuration step.
Common Issues
Even well-architected CDC pipelines encounter recurring challenges. Understanding these upfront saves significant debugging time.
Snapshot Phase Performance
The initial snapshot can be the longest phase of a CDC connector’s lifecycle. For a table with hundreds of millions of rows, the snapshot may take hours and generate significant load on the source database. Strategies to mitigate this include running the snapshot during off-peak hours, using incremental snapshot mode (available in newer Flink CDC versions), and ensuring the source database has adequate read replica capacity. Managed platforms like Streamkap handle snapshot orchestration automatically, including parallelized reads and backpressure management.
Slot Management (PostgreSQL)
PostgreSQL replication slots retain WAL segments until the consumer acknowledges them. If a CDC connector goes offline or falls behind, unacknowledged WAL segments accumulate on disk and can fill the server’s storage. Monitoring the pg_replication_slots view and setting max_slot_wal_keep_size (PostgreSQL 13+) are essential safeguards. Inactive slots should be dropped promptly.
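Both checks can be done from SQL (the slot name matches the earlier DDL):

```sql
-- How much WAL each slot is holding back:
SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Cap slot-retained WAL (PostgreSQL 13+):
ALTER SYSTEM SET max_slot_wal_keep_size = '50GB';

-- Drop a slot that is no longer in use:
SELECT pg_drop_replication_slot('flink_orders_slot');
```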
Binlog Retention (MySQL)
MySQL purges binlog files based on binlog_expire_logs_seconds (or expire_logs_days in older versions). If the CDC connector is offline longer than the retention period, it loses its position in the log and must perform a full re-snapshot. Setting binlog retention to at least 72 hours and monitoring connector lag provide a reasonable safety margin.
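Checking and extending retention is a one-liner each (MySQL 8.x; the value is in seconds):

```sql
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';

-- Retain binlogs for 72 hours:
SET GLOBAL binlog_expire_logs_seconds = 259200;
```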
Schema Evolution
When columns are added, removed, or altered in the source database, the CDC connector must handle the schema change gracefully. Most Debezium-based connectors can handle additive changes (new columns) automatically, but dropping or renaming columns may require connector reconfiguration or a fresh snapshot. Testing schema changes in a staging environment before applying them to production is always advisable.
Connector Lag and Backpressure
If the Flink job cannot consume events as fast as the database produces them, the connector will accumulate lag. In PostgreSQL, this manifests as growing WAL retention. In MySQL, it appears as increasing binlog read latency. Monitoring connector lag metrics and configuring alerts for sustained delays allows you to scale Flink resources or optimize queries before the lag becomes critical.
CDC connectors in Flink SQL give you one of the most direct paths from a database mutation to a streaming computation. The SQL interface lowers the barrier to entry, the changelog semantics ensure correctness, and the connector ecosystem covers the databases most teams actually run. Whether you manage the infrastructure yourself or use a managed platform to handle the operational complexity, the core pattern remains the same: declare a table, point it at a transaction log, and write SQL against the live stream of changes.