Managing Streaming Pipelines with Terraform
How to define CDC pipelines, connectors, and transforms as Terraform code — with CI/CD integration, drift detection, and secrets management.
Clicking through a UI to configure CDC pipelines works fine when you have three connectors. When you have thirty — across dev, staging, and production — it becomes a liability. Configuration drifts between environments, changes happen without audit trails, and recreating a setup after an incident means screenshots and tribal knowledge.
Terraform solves this by treating your streaming infrastructure the same way you treat your application infrastructure: as code in a repository, reviewed by peers, applied by automation.
This guide walks through defining streaming pipelines in Terraform, setting up CI/CD, handling secrets safely, and detecting drift.
Why Infrastructure as Code for Streaming
Before jumping into HCL, here’s what you get from managing pipelines as code versus a UI:
| Concern | UI-Based | Terraform |
|---|---|---|
| Audit trail | Maybe an activity log | Git history with full diffs |
| Environment parity | Manual recreation | terraform workspace or separate state files |
| Rollback | Hope you remember the old config | git revert + terraform apply |
| Code review | Not possible | Standard PR workflow |
| Disaster recovery | Rebuild from memory | terraform apply from code |
| Scaling | Click-per-connector | Loops, modules, variables |
If you’re already using Terraform for databases, Kubernetes clusters, or cloud resources, adding streaming pipelines to the same workflow is a natural extension. For background on pipeline architecture patterns, see Data Pipeline Architecture.
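The rollback row in the table, for instance, collapses to a couple of commands once pipeline config lives in Git (a sketch; the commit hash is whatever introduced the bad change):

```shell
# Roll back a bad pipeline change by reverting the commit that introduced it
git revert <commit-sha>      # restores the previous HCL as a new commit
terraform plan -input=false  # review the diff back to the old configuration
terraform apply -input=false
```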
Project Structure
A typical Terraform project for streaming pipelines:
streaming-infra/
├── environments/
│ ├── dev/
│ │ ├── main.tf
│ │ ├── variables.tf
│ │ ├── terraform.tfvars
│ │ └── backend.tf
│ ├── staging/
│ │ ├── main.tf
│ │ ├── variables.tf
│ │ ├── terraform.tfvars
│ │ └── backend.tf
│ └── production/
│ ├── main.tf
│ ├── variables.tf
│ ├── terraform.tfvars
│ └── backend.tf
├── modules/
│ ├── cdc-pipeline/
│ │ ├── main.tf
│ │ ├── variables.tf
│ │ └── outputs.tf
│ └── streaming-agent/
│ ├── main.tf
│ ├── variables.tf
│ └── outputs.tf
└── .github/
└── workflows/
└── terraform.yml
The modules/ directory contains reusable pipeline definitions. Each environment directory contains environment-specific variables and a backend configuration for remote state.
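For example, each environment's terraform.tfvars pins the values that differ between dev, staging, and production (the hostnames and account below are illustrative):

```hcl
# environments/dev/terraform.tfvars (illustrative values)
source_db_host    = "orders-db.dev.internal.example.com"
snowflake_account = "xy12345.us-east-1"
```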
Defining a CDC Pipeline in HCL
Here’s what a CDC pipeline module looks like. The exact resource names depend on your platform’s Terraform provider, but the structure is consistent:
# modules/cdc-pipeline/variables.tf

variable "pipeline_name" {
  description = "Name for this CDC pipeline"
  type        = string
}

variable "source_type" {
  description = "Source database type (postgresql, mysql, mongodb)"
  type        = string
}

variable "source_config" {
  description = "Source connection configuration"
  type = object({
    hostname = string
    port     = number
    database = string
    username = string
    password = string # Injected from secrets manager
    tables   = list(string)
  })
  sensitive = true
}

variable "destination_type" {
  description = "Destination type (snowflake, bigquery, clickhouse, kafka)"
  type        = string
}

variable "destination_config" {
  description = "Destination connection configuration"
  type        = map(string)
  sensitive   = true
}

variable "transforms" {
  description = "Optional transforms to apply"
  type = list(object({
    type   = string
    config = map(string)
  }))
  default = []
}
# modules/cdc-pipeline/main.tf

terraform {
  required_providers {
    streamingplatform = {
      source  = "your-org/streamingplatform" # Placeholder; use your platform provider's registry address
      version = "~> 1.0"
    }
  }
}

resource "streamingplatform_source" "this" {
  name = "${var.pipeline_name}-source"
  type = var.source_type

  config = {
    hostname = var.source_config.hostname
    port     = var.source_config.port
    database = var.source_config.database
    username = var.source_config.username
    password = var.source_config.password
    tables   = join(",", var.source_config.tables)

    # CDC-specific settings
    snapshot_mode    = "initial"
    slot_name        = "${var.pipeline_name}_slot"
    publication_name = "${var.pipeline_name}_pub"
  }
}

resource "streamingplatform_destination" "this" {
  name   = "${var.pipeline_name}-destination"
  type   = var.destination_type
  config = var.destination_config
}

resource "streamingplatform_pipeline" "this" {
  name           = var.pipeline_name
  source_id      = streamingplatform_source.this.id
  destination_id = streamingplatform_destination.this.id

  dynamic "transform" {
    for_each = var.transforms
    content {
      type   = transform.value.type
      config = transform.value.config
    }
  }
}
Then in your environment file:
# environments/production/main.tf

module "orders_pipeline" {
  source = "../../modules/cdc-pipeline"

  pipeline_name    = "orders-to-snowflake"
  source_type      = "postgresql"
  destination_type = "snowflake"

  source_config = {
    hostname = "orders-db.internal.example.com"
    port     = 5432
    database = "orders"
    # secret_string is a JSON string, so it must be decoded before indexing
    username = jsondecode(data.aws_secretsmanager_secret_version.orders_db.secret_string)["username"]
    password = jsondecode(data.aws_secretsmanager_secret_version.orders_db.secret_string)["password"]
    tables   = ["public.orders", "public.order_items", "public.customers"]
  }

  destination_config = {
    account   = "xy12345.us-east-1"
    database  = "ANALYTICS"
    schema    = "RAW"
    warehouse = "LOADING_WH"
    role      = "LOADER_ROLE"
    username  = jsondecode(data.aws_secretsmanager_secret_version.snowflake.secret_string)["username"]
    password  = jsondecode(data.aws_secretsmanager_secret_version.snowflake.secret_string)["password"]
  }

  transforms = [
    {
      type = "mask"
      config = {
        fields = "customers.email,customers.phone"
        method = "hash"
      }
    }
  ]
}
This defines a complete CDC pipeline: PostgreSQL source streaming changes to Snowflake, with PII masking applied to customer data. For more on masking patterns, see Data Masking in Streaming Pipelines.
Managing Secrets
Never put credentials in .tfvars files or commit them to Git. Here are the standard approaches:
AWS Secrets Manager
data "aws_secretsmanager_secret_version" "orders_db" {
  secret_id = "streaming/orders-db-credentials"
}

locals {
  db_creds = jsondecode(data.aws_secretsmanager_secret_version.orders_db.secret_string)
}

# Use in source config:
#   username = local.db_creds["username"]
#   password = local.db_creds["password"]
HashiCorp Vault
data "vault_generic_secret" "orders_db" {
  path = "secret/streaming/orders-db"
}

# Use: data.vault_generic_secret.orders_db.data["password"]
Environment Variables
For simpler setups, use TF_VAR_ prefixed environment variables:
export TF_VAR_db_password="$(aws secretsmanager get-secret-value \
  --secret-id streaming/orders-db \
  --query SecretString --output text | jq -r .password)"

terraform apply
Whichever approach you use, mark sensitive variables and outputs:
variable "db_password" {
  type      = string
  sensitive = true # Prevents display in plan/apply output
}

output "pipeline_id" {
  value = module.orders_pipeline.pipeline_id
  # Not sensitive — safe to display
}
CI/CD Integration
The standard pattern: terraform plan on pull requests, terraform apply on merge to main.
# .github/workflows/terraform.yml
name: Streaming Infrastructure

on:
  pull_request:
    paths:
      - 'streaming-infra/**'
  push:
    branches: [main]
    paths:
      - 'streaming-infra/**'

permissions:
  id-token: write       # Required for OIDC role assumption
  contents: read
  pull-requests: write  # Required to post plan comments

env:
  TF_VERSION: '1.7.0'
  AWS_REGION: 'us-east-1'

jobs:
  plan:
    if: github.event_name == 'pull_request'
    runs-on: ubuntu-latest
    strategy:
      matrix:
        environment: [dev, staging, production]
    steps:
      - uses: actions/checkout@v4

      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}

      - name: Terraform Init
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: terraform init -input=false

      - name: Terraform Plan
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: |
          terraform plan -input=false -no-color -out=tfplan
          terraform show -no-color tfplan > tfplan.txt

      - name: Comment plan on PR
        uses: actions/github-script@v7
        with:
          script: |
            const plan = require('fs').readFileSync(
              'streaming-infra/environments/${{ matrix.environment }}/tfplan.txt', 'utf8'
            );
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `### Plan for \`${{ matrix.environment }}\`\n\`\`\`\n${plan}\n\`\`\``
            });

  apply:
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    strategy:
      max-parallel: 1  # Apply sequentially: dev → staging → production
      matrix:
        environment: [dev, staging, production]
    steps:
      - uses: actions/checkout@v4

      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}

      - name: Terraform Init & Apply
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: |
          terraform init -input=false
          terraform apply -input=false -auto-approve
Key points:
- max-parallel: 1 ensures environments are updated sequentially. If dev fails, staging and production don’t get applied.
- Plans are posted as PR comments so reviewers can see exactly what will change.
- Credentials use OIDC role assumption, not long-lived access keys.
Drift Detection
Configuration drift happens when someone changes a pipeline through the UI or API outside of Terraform. This is the streaming equivalent of SSH-ing into a server and editing config files.
Set up a scheduled workflow to detect drift:
# .github/workflows/drift-detection.yml
name: Drift Detection

on:
  schedule:
    - cron: '0 8 * * 1-5' # Weekdays at 8 AM UTC

permissions:
  id-token: write # Required for OIDC role assumption

jobs:
  detect-drift:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        environment: [dev, staging, production]
    steps:
      - uses: actions/checkout@v4

      - uses: hashicorp/setup-terraform@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: us-east-1

      - name: Terraform Plan (Drift Check)
        id: plan
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: |
          terraform init -input=false
          terraform plan -input=false -detailed-exitcode -no-color 2>&1 | tee plan.txt
        continue-on-error: true

      - name: Alert on drift
        if: steps.plan.outputs.exitcode == '2'
        run: |
          # Exit code 2 = changes detected (drift)
          # Send to Slack, PagerDuty, etc.
          curl -X POST "${{ secrets.SLACK_WEBHOOK }}" \
            -H 'Content-Type: application/json' \
            -d "{\"text\": \"Drift detected in ${{ matrix.environment }} streaming infra\"}"
Terraform’s -detailed-exitcode flag returns exit code 2 when the plan detects differences between code and reality. This is your signal that someone made an out-of-band change.
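Outside CI, the same check works from a terminal (a sketch of interpreting the exit codes):

```shell
# -detailed-exitcode: 0 = no changes, 1 = error, 2 = changes (drift)
terraform plan -input=false -detailed-exitcode -no-color
case $? in
  0) echo "In sync: no drift" ;;
  1) echo "Plan errored" ;;
  2) echo "Drift detected: code and reality differ" ;;
esac
```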
State Management Considerations
Streaming resources are different from typical cloud infrastructure in one important way: recreating them has side effects. If Terraform destroys and recreates a CDC source connector, it may:
- Drop and recreate the replication slot, triggering a full initial snapshot
- Lose track of the last-read position in the source database’s transaction log
- Cause downstream consumers to see duplicate data during re-sync
Protect against accidental recreation:
resource "streamingplatform_source" "this" {
  # ...

  lifecycle {
    prevent_destroy = true # Require explicit removal from state
  }
}
For state backend configuration, always use remote state with locking:
# environments/production/backend.tf
terraform {
  backend "s3" {
    bucket         = "mycompany-terraform-state"
    key            = "streaming/production/terraform.tfstate"
    region         = "us-east-1"
    dynamodb_table = "terraform-state-lock"
    encrypt        = true
  }
}
The DynamoDB lock table prevents two engineers (or two CI runs) from applying changes simultaneously. For more on managing streaming infrastructure, see Terraform for Data Pipelines.
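If a CI run is killed mid-apply, it can leave a stale lock behind. Terraform prints the lock ID in the resulting error, and you can release it explicitly:

```shell
# Release a stale state lock; LOCK_ID comes from Terraform's error message
terraform force-unlock LOCK_ID
```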
Scaling with Modules and Loops
When you have many similar pipelines, use for_each to avoid repetition:
locals {
  pipelines = {
    orders = {
      source_tables      = ["public.orders", "public.order_items"]
      destination_schema = "ORDERS_RAW"
    }
    customers = {
      source_tables      = ["public.customers", "public.addresses"]
      destination_schema = "CUSTOMERS_RAW"
    }
    inventory = {
      source_tables      = ["public.products", "public.stock_levels"]
      destination_schema = "INVENTORY_RAW"
    }
  }
}

module "cdc_pipeline" {
  for_each = local.pipelines

  source = "../../modules/cdc-pipeline"

  pipeline_name    = "${each.key}-to-snowflake"
  source_type      = "postgresql"
  destination_type = "snowflake"

  source_config = {
    hostname = var.source_db_host
    port     = 5432
    database = each.key
    username = local.db_creds["username"]
    password = local.db_creds["password"]
    tables   = each.value.source_tables
  }

  destination_config = {
    account   = var.snowflake_account
    database  = "ANALYTICS"
    schema    = each.value.destination_schema
    warehouse = "LOADING_WH"
    role      = "LOADER_ROLE"
    username  = local.snowflake_creds["username"]
    password  = local.snowflake_creds["password"]
  }
}
Three pipelines defined in a few lines, all sharing the same module, credentials, and destination account. Adding a fourth is a three-line change.
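A useful companion to the for_each pattern: collect every instance's ID into a single output map (this assumes the module exposes a pipeline_id output, as referenced earlier in this guide):

```hcl
# environments/production/outputs.tf
output "pipeline_ids" {
  description = "Pipeline IDs keyed by pipeline name"
  value       = { for name, m in module.cdc_pipeline : name => m.pipeline_id }
}
```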
Adopting Terraform Incrementally
You don’t need to migrate everything at once. A practical adoption path:
- Start with terraform import: import your existing pipelines into Terraform state without changing them
- Write the HCL to match: define resources that reflect current production state
- Verify with terraform plan: it should show no changes if your code matches reality
- New pipelines in Terraform only: enforce the policy that all new pipelines go through code
- Migrate existing pipelines gradually: as you modify old pipelines, move them into Terraform
This approach avoids the risk of a big-bang migration while building team familiarity with the workflow.
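The import-then-verify loop can be sketched as follows (the resource address and pipeline ID are hypothetical; your provider's documentation defines the actual import ID format):

```shell
# Bring an existing pipeline under Terraform's control without changing it
terraform import module.orders_pipeline.streamingplatform_pipeline.this pl-12345

# After writing matching HCL, a clean plan confirms code matches reality
terraform plan -input=false
# Expected: "No changes. Your infrastructure matches the configuration."
```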
Picking the Right Level of Automation
Not every team needs the full CI/CD setup on day one. Start where the pain is:
- Just version control: Store your pipeline configs in Git, apply manually. You get audit trails and rollback.
- Plan on PR: Add the GitHub Actions plan step. Reviewers can see what will change before approving.
- Full CI/CD: Automated apply on merge. Best for teams with multiple environments and frequent pipeline changes.
The point isn’t to follow a specific workflow — it’s to stop treating pipeline configuration as ephemeral UI state and start treating it as durable, reviewable code.
Ready to manage your streaming pipelines as code? Streamkap’s platform supports API-driven configuration, making it straightforward to define and deploy CDC pipelines through Terraform or any infrastructure-as-code tool. Start a free trial or learn more about the platform.