Streaming with Change Data Capture into BigQuery
BigQuery is Google Cloud's data warehouse and a popular destination for streaming data from operational and transactional sources using Change Data Capture (CDC).
As companies mature their data pipelines and move from slow batch processing to real-time streaming with CDC, they must choose between running open source software themselves and working with a managed service such as Streamkap.
In this blog, we will look at how to stream or replicate data into BigQuery with open source systems as well as some important considerations such as inserts vs upserts, schema drift, snapshotting/backfilling, adding metadata to the BigQuery tables, cost control, and monitoring.
Why use BigQuery for Streaming CDC Data?
Here are a few features that make BigQuery well suited to change data.
- Real-time data analysis: BigQuery supports low-latency ingestion from sources, which allows for real-time analytics.
- Scalability: BigQuery can handle very large volumes of data at a very low cost. This means you can retain more data for longer, for example to look at the data at precise points in time.
- Integration with other Google Cloud tools: If you are already using Google Cloud then BigQuery is an obvious choice for compatibility with other Google Cloud tools, such as Cloud SQL, Cloud Pub/Sub, Cloud Data Fusion, and Cloud Functions.
- Affordable pricing: BigQuery has a pay-as-you-go pricing model, which means you only pay for the resources you use. Loading and storing streaming data is very low cost, and so is querying, provided you have optimized your tables so that SQL queries run faster. Check out the Google docs on table partitions and clustering as well as this helpful guide How to Use Partitions and Clusters in BigQuery Using SQL.
What do we mean by Streaming CDC (Change Data Capture)?
- ETL (extract, transform, load) refers to extracting data from one or more source databases, transforming it, and loading it into one or more destination tables. With Streaming ETL this happens continuously, whereas with Batch ETL it happens at intervals. See our blog on Batch vs Real-Time Stream Processing.
- Change Data Capture is the process of capturing changed data from each data source and sending these to a destination system.
- Popular data sources for operational data with changed data streams include PostgreSQL, MongoDB, MySQL, SQL Server & Oracle. It's also possible to read and write to common storage systems such as Google Cloud Storage (GCS).
- Learn more at Change Data Capture for Streaming ETL
Which platforms can stream CDC data into Google BigQuery?
- Open Source solutions Apache Kafka & Apache Flink with Debezium connectors can enable you to stream real-time change data capture into BigQuery.
- Managed solutions such as Confluent make it easier to deploy a replication service but still require additional setup, configuration, and monitoring. Streamkap provides a managed platform built on Kafka, Debezium, and production-ready connectors, allowing you to start ingesting data into BigQuery within minutes. Streamkap removes the resources, learning curve, and ongoing support required to maintain your own platform.
Snapshotting or backfilling refers to the initial load of historical data from the source table to the BigQuery table when you create the pipeline.
Usually, the system will start with the backfill as the initial load and then switch to streaming mode and begin processing the CDC stream. It's also possible to carry out ad-hoc snapshots after the pipeline has been created.
Debezium supports all of this, and it's fairly simple to snapshot when the pipelines are created. Adding tables afterwards, however, is a different and more involved process: you need to add a signal table to the source database and insert rows into it to let Debezium know a snapshot is required. With Streamkap, you can add tables and trigger snapshots without this extra process, just by adding the table within our app.
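The signal-based snapshot described above can be sketched as follows. This assumes Debezium's signaling table layout (`id`, `type`, `data`) and a connector that has been configured to watch that table. The table name `debezium_signal`, the helper `build_snapshot_signal`, and the `%s` placeholder style (as used by drivers such as psycopg2) are illustrative assumptions.

```python
import json
import uuid


def build_snapshot_signal(tables, signal_table="debezium_signal"):
    """Return (sql, params) for an INSERT asking Debezium to snapshot `tables`.

    `signal_table` is a hypothetical name; use whatever table your
    connector is configured to read signals from.
    """
    payload = json.dumps({"data-collections": list(tables), "type": "incremental"})
    sql = f"INSERT INTO {signal_table} (id, type, data) VALUES (%s, %s, %s)"
    # The id only needs to be unique per signal; a random UUID is one option.
    params = (str(uuid.uuid4()), "execute-snapshot", payload)
    return sql, params


sql, params = build_snapshot_signal(["public.orders"])
print(sql)
print(params[1], params[2])
```

Executing the returned statement against the source database with your usual driver is what generates the signal; nothing is sent to Debezium directly.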
Inserts & Upserts
BigQuery supports both Inserts and Upserts and the method you choose will depend upon the data quality, model, freshness, and cost factors.
This is possible via:
- INSERTS via the Storage Write API.
- The MERGE DML statement, which can combine an INSERT and an UPDATE in one atomic operation, i.e. an UPSERT.
INSERTS add a new record for every change event and never overwrite or update existing rows. The benefit of INSERTS is that they are often cheaper to perform, faster, and allow for point-in-time analysis. If you work with INSERTS you will likely need to clean up the older data from time to time.
An UPSERT, as mentioned above, combines an UPDATE and an INSERT. If there is a match on a key, the existing row is overwritten; if there is no match, the event is inserted.
For example, there is a diagram below showing this in simplified terms.
- Starting with 3 rows here, represented as Green, Blue & Clear
- New rows to process are Blue & Pink
- The resulting table has an updated row for Blue and a new row for Pink.
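The same example can be written as a small in-memory sketch to contrast the two modes. The row shape (`id`, `v`) and the function names are made up for illustration; this only mimics the semantics and does not call BigQuery.

```python
def apply_inserts(table, new_rows):
    """Append-only: every incoming row becomes a new record."""
    return table + new_rows


def apply_upserts(table, new_rows, key="id"):
    """Overwrite rows whose key matches, append the rest."""
    merged = {row[key]: row for row in table}
    for row in new_rows:
        merged[row[key]] = row
    return list(merged.values())


table = [{"id": "green", "v": 1}, {"id": "blue", "v": 1}, {"id": "clear", "v": 1}]
incoming = [{"id": "blue", "v": 2}, {"id": "pink", "v": 1}]

# Inserts keep both versions of Blue, so the table grows to 5 rows.
print(len(apply_inserts(table, incoming)))
# Upserts leave 4 rows: Blue is updated in place and Pink is added.
print(apply_upserts(table, incoming))
```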
Running MERGE frequently as part of streaming, however, can incur significant costs, since BigQuery charges based on the volume of data processed. While the volume scanned can be reduced with partitioning and clustering, MERGE is still likely to generate significant costs, and you would be better off using INSERTS with a periodic job to clean up the history.
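For reference, a MERGE-based upsert of the kind discussed above might be generated like this. It is a minimal sketch: the table names, key, and column names are hypothetical, and it assumes the staging table holds at most one row per key.

```python
def build_merge_sql(target, staging, key, columns):
    """Build a BigQuery MERGE that updates matching keys and inserts new ones."""
    all_cols = [key] + list(columns)
    set_clause = ", ".join(f"{c} = s.{c}" for c in columns)
    col_list = ", ".join(all_cols)
    val_list = ", ".join(f"s.{c}" for c in all_cols)
    return (
        f"MERGE `{target}` AS t\n"
        f"USING `{staging}` AS s\n"
        f"ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({val_list})"
    )


print(build_merge_sql("proj.ds.orders", "proj.ds.orders_staging", "id", ["status", "total"]))
```

Each run scans the target table (or the partitions the join touches), which is where the cost noted above comes from.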
Handling Large Message Sizes > 10MB
BigQuery has a limit of 10MB for inserting data via the Storage Write API. Some sources, such as MongoDB, may produce messages larger than this limit, which would therefore fail to ingest into BigQuery. In streaming systems, these failed messages are typically held in a Dead Letter Queue (DLQ), which you will need to monitor and inspect before deciding how to handle them. Streamkap detects messages larger than 10MB and automatically loads them into BigQuery for you via a different method.
When inserting data into BigQuery, you may wish to configure additional metadata columns to be added to your BigQuery tables. These help when diagnosing an issue as well as when building out your models further downstream. The following is a useful list:
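A pre-check for oversized messages could look like the sketch below. It measures the JSON-serialized size as a rough proxy (the actual wire format differs), and the function name and the exact byte value used for "10MB" are assumptions.

```python
import json

# Assumed interpretation of the 10MB limit; check the current quota docs.
MAX_REQUEST_BYTES = 10 * 1024 * 1024


def split_by_size(records, limit=MAX_REQUEST_BYTES):
    """Separate records that fit under `limit` from those that do not."""
    ok, oversized = [], []
    for record in records:
        size = len(json.dumps(record).encode("utf-8"))
        (ok if size <= limit else oversized).append(record)
    return ok, oversized


small = {"id": 1}
large = {"id": 2, "blob": "x" * 200}
ok, oversized = split_by_size([small, large], limit=100)
print(len(ok), len(oversized))
```

Records in `oversized` are the ones you would route to a DLQ or load through another path.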
- SOURCE_TS_MS - This is the timestamp of the row within the source table from the source database. Timestamp in milliseconds.
- OFFSET - This is the record offset in Kafka.
- __DELETED - 0 or 1. Allows you to handle deleted records.
- INSERTED_TS_MS - This is a BigQuery-generated timestamp in milliseconds when the record is inserted in BigQuery. Data may arrive in BigQuery out of sequence at times due to the nature of streaming and distributed systems. If you are using this data to build other models, the INSERTED_TS_MS timestamp is more useful than SOURCE_TS_MS.
The standard approach these days is ELT (extract, load, transform) rather than ETL (extract, transform, load). This means that data is loaded into BigQuery and transformations are performed within the database. This is often done with scheduled SQL queries, with dbt being a popular product.
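To illustrate why INSERTED_TS_MS is the safer column for incremental models, here is a small sketch with made-up rows in which one change arrives late. The helper name and the values are hypothetical.

```python
def rows_since(rows, watermark, column):
    """Return rows whose `column` is strictly greater than the watermark."""
    return [r for r in rows if r[column] > watermark]


rows = [
    {"id": 1, "SOURCE_TS_MS": 1000, "INSERTED_TS_MS": 1100},
    {"id": 2, "SOURCE_TS_MS": 2000, "INSERTED_TS_MS": 2100},
    # Changed at the source before row 2, but delayed in transit.
    {"id": 3, "SOURCE_TS_MS": 1500, "INSERTED_TS_MS": 2600},
]

# Suppose the previous run had processed rows 1 and 2 only.
by_inserted = rows_since(rows, 2100, "INSERTED_TS_MS")
by_source = rows_since(rows, 2000, "SOURCE_TS_MS")

print([r["id"] for r in by_inserted])  # the late row is picked up
print([r["id"] for r in by_source])    # the late row is missed
```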
With Streaming, you can maintain the same approach as ELT. For various reasons, however, you may wish to handle transformations within the pipeline and this is becoming increasingly popular again with streaming. Common transformations in streaming pipelines are Aggregations, Filtering and Masking. Stream processing can be carried out with several technologies such as ksqlDB & Flink SQL.
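The three transformation types can be sketched in plain Python as below. In practice this logic would live in ksqlDB or Flink SQL; the event fields (`status`, `email`) and the function names are hypothetical.

```python
import hashlib
from collections import Counter


def keep(event):
    """Filtering: drop events that should never reach the warehouse."""
    return event["status"] != "test"


def mask(event):
    """Masking: replace the email with a one-way hash."""
    digest = hashlib.sha256(event["email"].encode("utf-8")).hexdigest()
    return {**event, "email": digest}


def process(events):
    """Filter and mask each event, and count events per status (aggregation)."""
    counts = Counter()
    out = []
    for event in events:
        if not keep(event):
            continue
        masked = mask(event)
        counts[masked["status"]] += 1
        out.append(masked)
    return out, counts


events = [
    {"status": "paid", "email": "a@example.com"},
    {"status": "test", "email": "qa@example.com"},
    {"status": "paid", "email": "b@example.com"},
]
out, counts = process(events)
print(len(out), dict(counts))
```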
A result of using Inserts as an ingestion method with Change Data Capture (CDC) is that you have a full audit of every change that has happened from the source databases. This can be very useful for point-in-time analysis or understanding how values have changed over time. On the flip side, you may not need this raw data and therefore would like to delete the stale data.
Here is an example script to clean up old records, which you may want to run once daily using BigQuery Scheduled Queries.
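A minimal sketch of such a cleanup is shown below as a Python helper that produces the SQL text. It assumes an INT64 `INSERTED_TS_MS` column and a single key column, and it has not been validated against a live BigQuery project; the table name, key, retention window, and function name are all hypothetical. The statement only removes old rows that have been superseded by a newer row for the same key, so the latest version of each record is kept.

```python
def build_cleanup_sql(table, key="id", retention_days=7):
    """Build a DELETE that removes superseded rows older than the retention window."""
    days = int(retention_days)
    if days < 1:
        raise ValueError("retention_days must be at least 1")
    return (
        f"DELETE FROM `{table}` stale\n"
        f"WHERE stale.INSERTED_TS_MS < UNIX_MILLIS(\n"
        f"        TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY))\n"
        f"  AND EXISTS (\n"
        f"    SELECT 1 FROM `{table}` newer\n"
        f"    WHERE newer.{key} = stale.{key}\n"
        f"      AND newer.INSERTED_TS_MS > stale.INSERTED_TS_MS)"
    )


print(build_cleanup_sql("proj.ds.orders", key="id", retention_days=30))
```

The printed statement is what you would paste into a scheduled query after checking it against your own schema.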
Although Upserts may be preferred over Inserts, the cost of using Upserts is significantly greater than using inserts and a scheduled deletion of old records.
Monitoring Streaming Data Pipelines with BigQuery
Due to the nature of distributed systems, monitoring is often the biggest issue for companies and the reason they turn to managed solutions for their production pipelines. Alternatively, if you are looking at self-hosting, you should consider monitoring tools such as Datadog, Grafana, and AKHQ (previously known as KafkaHQ). All of these will help you monitor the health of Kafka, Debezium, and your streaming pipelines, although you will need to customize them to be production ready.
Schema Drift with BigQuery
Handling schema drift is essential for production pipelines.
- Not all open source destination connectors support schema drift, so it's worth checking before you install them whether they cover each type of schema change.
- Streamkap supports the handling of schema drift on all data sources and ensures these schema changes are evolved in the destination.
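At its simplest, detecting drift means comparing the fields of an incoming record with the columns the destination table already has. The sketch below shows only that comparison; the function name and column names are hypothetical, and a real connector would also need to handle type changes and apply the resulting DDL.

```python
def detect_drift(table_columns, record):
    """Report fields the table lacks and columns the record omits."""
    known = set(table_columns)
    incoming = set(record)
    return {
        "added": sorted(incoming - known),
        "missing": sorted(known - incoming),
    }


columns = ["id", "status", "total"]
record = {"id": 7, "status": "paid", "currency": "EUR"}
print(detect_drift(columns, record))
```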
Which Approach Should I Choose?
The decision to use open source versus managed solutions for building real-time streaming pipelines depends on a variety of factors, including your organization's resources, expertise, and specific use case requirements.
Open source provides the benefit of being adaptable, so you can craft a solution that meets your requirements precisely. Nevertheless, building and managing an open source data streaming platform takes considerable ongoing effort, particularly in production and when you need to scale the system.
Managed serverless platforms such as Streamkap provide the benefit of allowing organizations to start streaming operational and transactional data immediately, with enterprise-grade scalability, and fault tolerance while only paying for what you use. If you want to learn more, get in touch with us.