
Tutorials & How-To

May 22, 2025

14 min read

Managing Streaming Pipelines with Terraform

How to define CDC pipelines, connectors, and transforms as Terraform code — with CI/CD integration, drift detection, and secrets management.

Clicking through a UI to configure CDC pipelines works fine when you have three connectors. When you have thirty — across dev, staging, and production — it becomes a liability. Configuration drifts between environments, changes happen without audit trails, and recreating a setup after an incident means screenshots and tribal knowledge.

Terraform solves this by treating your streaming infrastructure the same way you treat your application infrastructure: as code in a repository, reviewed by peers, applied by automation.

This guide walks through defining streaming pipelines in Terraform, setting up CI/CD, handling secrets safely, and detecting drift.

Why Infrastructure as Code for Streaming

Before jumping into HCL, here’s what you get from managing pipelines as code versus a UI:

| Concern | UI-Based | Terraform |
|---|---|---|
| Audit trail | Maybe an activity log | Git history with full diffs |
| Environment parity | Manual recreation | terraform workspace or separate state files |
| Rollback | Hope you remember the old config | git revert + terraform apply |
| Code review | Not possible | Standard PR workflow |
| Disaster recovery | Rebuild from memory | terraform apply from code |
| Scaling | Click-per-connector | Loops, modules, variables |

If you’re already using Terraform for databases, Kubernetes clusters, or cloud resources, adding streaming pipelines to the same workflow is a natural extension. For background on pipeline architecture patterns, see Data Pipeline Architecture.

Project Structure

A typical Terraform project for streaming pipelines:

streaming-infra/
├── environments/
│   ├── dev/
│   │   ├── main.tf
│   │   ├── variables.tf
│   │   ├── terraform.tfvars
│   │   └── backend.tf
│   ├── staging/
│   │   ├── main.tf
│   │   ├── variables.tf
│   │   ├── terraform.tfvars
│   │   └── backend.tf
│   └── production/
│       ├── main.tf
│       ├── variables.tf
│       ├── terraform.tfvars
│       └── backend.tf
├── modules/
│   ├── cdc-pipeline/
│   │   ├── main.tf
│   │   ├── variables.tf
│   │   └── outputs.tf
│   └── streaming-agent/
│       ├── main.tf
│       ├── variables.tf
│       └── outputs.tf
└── .github/
    └── workflows/
        └── terraform.yml

The modules/ directory contains reusable pipeline definitions. Each environment directory contains environment-specific variables and a backend configuration for remote state.

Defining a CDC Pipeline in HCL

Here’s what a CDC pipeline module looks like. The exact resource names depend on your platform’s Terraform provider, but the structure is consistent:

# modules/cdc-pipeline/variables.tf

variable "pipeline_name" {
  description = "Name for this CDC pipeline"
  type        = string
}

variable "source_type" {
  description = "Source database type (postgresql, mysql, mongodb)"
  type        = string
}

variable "source_config" {
  description = "Source connection configuration"
  type = object({
    hostname = string
    port     = number
    database = string
    username = string
    password = string  # Injected from secrets manager
    tables   = list(string)
  })
  sensitive = true
}

variable "destination_type" {
  description = "Destination type (snowflake, bigquery, clickhouse, kafka)"
  type        = string
}

variable "destination_config" {
  description = "Destination connection configuration"
  type        = map(string)
  sensitive   = true
}

variable "transforms" {
  description = "Optional transforms to apply"
  type = list(object({
    type   = string
    config = map(string)
  }))
  default = []
}

# modules/cdc-pipeline/main.tf

terraform {
  required_providers {
    streamingplatform = {
      source  = "registry.terraform.io/your-provider"
      version = "~> 1.0"
    }
  }
}

resource "streamingplatform_source" "this" {
  name = "${var.pipeline_name}-source"
  type = var.source_type

  config = {
    hostname = var.source_config.hostname
    port     = var.source_config.port
    database = var.source_config.database
    username = var.source_config.username
    password = var.source_config.password
    tables   = join(",", var.source_config.tables)

    # CDC-specific settings
    snapshot_mode     = "initial"
    slot_name         = "${var.pipeline_name}_slot"
    publication_name  = "${var.pipeline_name}_pub"
  }
}

resource "streamingplatform_destination" "this" {
  name = "${var.pipeline_name}-destination"
  type = var.destination_type

  config = var.destination_config
}

resource "streamingplatform_pipeline" "this" {
  name           = var.pipeline_name
  source_id      = streamingplatform_source.this.id
  destination_id = streamingplatform_destination.this.id

  dynamic "transform" {
    for_each = var.transforms
    content {
      type   = transform.value.type
      config = transform.value.config
    }
  }
}
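
The module should also expose the IDs of what it creates — a later section references module.orders_pipeline.pipeline_id, which requires an output. A minimal sketch of outputs.tf (the .id attribute names depend on your provider's schema):

```hcl
# modules/cdc-pipeline/outputs.tf

output "pipeline_id" {
  description = "ID of the created pipeline"
  value       = streamingplatform_pipeline.this.id
}

output "source_id" {
  description = "ID of the CDC source connector"
  value       = streamingplatform_source.this.id
}
```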

Then in your environment file:

# environments/production/main.tf

module "orders_pipeline" {
  source = "../../modules/cdc-pipeline"

  pipeline_name    = "orders-to-snowflake"
  source_type      = "postgresql"
  destination_type = "snowflake"

  source_config = {
    hostname = "orders-db.internal.example.com"
    port     = 5432
    database = "orders"
    username = jsondecode(data.aws_secretsmanager_secret_version.orders_db.secret_string)["username"]
    password = jsondecode(data.aws_secretsmanager_secret_version.orders_db.secret_string)["password"]
    tables   = ["public.orders", "public.order_items", "public.customers"]
  }

  destination_config = {
    account   = "xy12345.us-east-1"
    database  = "ANALYTICS"
    schema    = "RAW"
    warehouse = "LOADING_WH"
    role      = "LOADER_ROLE"
    username  = jsondecode(data.aws_secretsmanager_secret_version.snowflake.secret_string)["username"]
    password  = jsondecode(data.aws_secretsmanager_secret_version.snowflake.secret_string)["password"]
  }

  transforms = [
    {
      type = "mask"
      config = {
        fields = "customers.email,customers.phone"
        method = "hash"
      }
    }
  ]
}

This defines a complete CDC pipeline: PostgreSQL source streaming changes to Snowflake, with PII masking applied to customer data. For more on masking patterns, see Data Masking in Streaming Pipelines.

Managing Secrets

Never put credentials in .tfvars files or commit them to Git. Here are the standard approaches:

AWS Secrets Manager

data "aws_secretsmanager_secret_version" "orders_db" {
  secret_id = "streaming/orders-db-credentials"
}

locals {
  db_creds = jsondecode(data.aws_secretsmanager_secret_version.orders_db.secret_string)
}

# Use in source config:
# username = local.db_creds["username"]
# password = local.db_creds["password"]

HashiCorp Vault

data "vault_generic_secret" "orders_db" {
  path = "secret/streaming/orders-db"
}

# Use: data.vault_generic_secret.orders_db.data["password"]

Environment Variables

For simpler setups, use TF_VAR_ prefixed environment variables:

export TF_VAR_db_password="$(aws secretsmanager get-secret-value \
  --secret-id streaming/orders-db \
  --query SecretString --output text | jq -r .password)"

terraform apply

Whichever approach you use, mark sensitive variables and outputs:

variable "db_password" {
  type      = string
  sensitive = true  # Prevents display in plan/apply output
}

output "pipeline_id" {
  value = module.orders_pipeline.pipeline_id
  # Not sensitive — safe to display
}

CI/CD Integration

The standard pattern: terraform plan on pull requests, terraform apply on merge to main.

# .github/workflows/terraform.yml

name: Streaming Infrastructure

on:
  pull_request:
    paths:
      - 'streaming-infra/**'
  push:
    branches: [main]
    paths:
      - 'streaming-infra/**'

env:
  TF_VERSION: '1.7.0'
  AWS_REGION: 'us-east-1'

jobs:
  plan:
    if: github.event_name == 'pull_request'
    runs-on: ubuntu-latest
    strategy:
      matrix:
        environment: [dev, staging, production]
    steps:
      - uses: actions/checkout@v4

      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}

      - name: Terraform Init
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: terraform init -input=false

      - name: Terraform Plan
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: |
          set -o pipefail
          terraform plan -input=false -no-color | tee tfplan.txt

      - name: Comment plan on PR
        uses: actions/github-script@v7
        with:
          script: |
            const plan = require('fs').readFileSync(
              'streaming-infra/environments/${{ matrix.environment }}/tfplan.txt', 'utf8'
            );
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `### Plan for \`${{ matrix.environment }}\`\n\`\`\`\n${plan}\n\`\`\``
            });

  apply:
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    strategy:
      matrix:
        environment: [dev, staging, production]
      max-parallel: 1  # Apply sequentially: dev → staging → production
    steps:
      - uses: actions/checkout@v4

      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}

      - name: Terraform Init & Apply
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: |
          terraform init -input=false
          terraform apply -input=false -auto-approve

Key points:

  • max-parallel: 1 ensures environments are updated sequentially. If dev fails, staging and production don’t get applied.
  • Plans are posted as PR comments so reviewers can see exactly what will change.
  • Credentials use OIDC role assumption, not long-lived access keys.
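
For OIDC role assumption to work, the IAM role's trust policy must allow GitHub's OIDC provider to assume it. A sketch of that role defined in Terraform — the account ID, role name, and repository are hypothetical; scope the sub condition as tightly as your branch strategy allows:

```hcl
# Hypothetical IAM role assumed by the GitHub Actions workflow via OIDC
resource "aws_iam_role" "terraform_ci" {
  name = "streaming-terraform-ci"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = "sts:AssumeRoleWithWebIdentity"
      Principal = {
        Federated = "arn:aws:iam::123456789012:oidc-provider/token.actions.githubusercontent.com"
      }
      Condition = {
        StringEquals = {
          "token.actions.githubusercontent.com:aud" = "sts.amazonaws.com"
        }
        StringLike = {
          "token.actions.githubusercontent.com:sub" = "repo:myorg/streaming-infra:*"
        }
      }
    }]
  })
}
```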

Drift Detection

Configuration drift happens when someone changes a pipeline through the UI or API outside of Terraform. This is the streaming equivalent of SSH-ing into a server and editing config files.

Set up a scheduled workflow to detect drift:

# .github/workflows/drift-detection.yml

name: Drift Detection

on:
  schedule:
    - cron: '0 8 * * 1-5'  # Weekdays at 8 AM UTC

jobs:
  detect-drift:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        environment: [dev, staging, production]
    steps:
      - uses: actions/checkout@v4

      - uses: hashicorp/setup-terraform@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: us-east-1

      - name: Terraform Plan (Drift Check)
        id: plan
        working-directory: streaming-infra/environments/${{ matrix.environment }}
        run: |
          terraform init -input=false
          terraform plan -input=false -detailed-exitcode -no-color
        continue-on-error: true

      - name: Alert on drift
        if: steps.plan.outputs.exitcode == '2'
        run: |
          # Exit code 2 = changes detected (drift)
          # Send to Slack, PagerDuty, etc.
          curl -X POST "${{ secrets.SLACK_WEBHOOK }}" \
            -H 'Content-Type: application/json' \
            -d "{\"text\": \"Drift detected in ${{ matrix.environment }} streaming infra\"}"

Terraform’s -detailed-exitcode flag returns exit code 2 when the plan detects differences between code and reality. This is your signal that someone made an out-of-band change.
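
Once drift is detected, you have two choices: revert the out-of-band change or adopt it. A sketch of both, run from the affected environment directory:

```shell
# Option 1: the code is the source of truth -- reapply it,
# reverting whatever was changed through the UI or API
terraform apply

# Option 2: the out-of-band change should stay -- update the HCL
# to match reality, then confirm the plan is clean
terraform plan -detailed-exitcode   # exit code 0 once code matches reality
```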

State Management Considerations

Streaming resources are different from typical cloud infrastructure in one important way: recreating them has side effects. If Terraform destroys and recreates a CDC source connector, it may:

  • Drop and recreate the replication slot, triggering a full initial snapshot
  • Lose track of the last-read position in the source database’s transaction log
  • Cause downstream consumers to see duplicate data during re-sync

Protect against accidental recreation:

resource "streamingplatform_source" "this" {
  # ...

  lifecycle {
    prevent_destroy = true  # Require explicit removal from state
  }
}

For state backend configuration, always use remote state with locking:

# environments/production/backend.tf

terraform {
  backend "s3" {
    bucket         = "mycompany-terraform-state"
    key            = "streaming/production/terraform.tfstate"
    region         = "us-east-1"
    dynamodb_table = "terraform-state-lock"
    encrypt        = true
  }
}

The DynamoDB lock table prevents two engineers (or two CI runs) from applying changes simultaneously. For more on managing streaming infrastructure, see Terraform for Data Pipelines.

Scaling with Modules and Loops

When you have many similar pipelines, use for_each to avoid repetition:

locals {
  pipelines = {
    orders = {
      source_tables = ["public.orders", "public.order_items"]
      destination_schema = "ORDERS_RAW"
    }
    customers = {
      source_tables = ["public.customers", "public.addresses"]
      destination_schema = "CUSTOMERS_RAW"
    }
    inventory = {
      source_tables = ["public.products", "public.stock_levels"]
      destination_schema = "INVENTORY_RAW"
    }
  }
}

module "cdc_pipeline" {
  for_each = local.pipelines
  source   = "../../modules/cdc-pipeline"

  pipeline_name    = "${each.key}-to-snowflake"
  source_type      = "postgresql"
  destination_type = "snowflake"

  source_config = {
    hostname = var.source_db_host
    port     = 5432
    database = each.key
    username = local.db_creds["username"]
    password = local.db_creds["password"]
    tables   = each.value.source_tables
  }

  destination_config = {
    account   = var.snowflake_account
    database  = "ANALYTICS"
    schema    = each.value.destination_schema
    warehouse = "LOADING_WH"
    role      = "LOADER_ROLE"
    username  = local.snowflake_creds["username"]
    password  = local.snowflake_creds["password"]
  }
}

Three pipelines defined in a few lines, all sharing the same module, credentials, and destination account. Adding a fourth is a small addition to the locals map.
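
Because the module uses for_each, its instances are addressed by key (for example, module.cdc_pipeline["orders"]), and you can aggregate attributes across all of them with a for expression — a sketch assuming the module exposes a pipeline_id output:

```hcl
output "pipeline_ids" {
  description = "Pipeline IDs keyed by pipeline name"
  value       = { for name, m in module.cdc_pipeline : name => m.pipeline_id }
}
```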

Adopting Terraform Incrementally

You don’t need to migrate everything at once. A practical adoption path:

  1. Start with terraform import: Import your existing pipelines into Terraform state without changing them
  2. Write the HCL to match: Define resources that reflect current production state
  3. Verify with terraform plan: Should show no changes if your code matches reality
  4. New pipelines in Terraform only: Enforce the policy that all new pipelines go through code
  5. Migrate existing pipelines gradually: As you modify old pipelines, move them into Terraform

This approach avoids the risk of a big-bang migration while building team familiarity with the workflow.
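
Step 1 looks like this in practice — a sketch with hypothetical resource IDs; check your provider's documentation for the exact import ID format:

```shell
# Import an existing source, destination, and pipeline into state
# (addresses match the module shown earlier; IDs are hypothetical)
terraform import 'module.orders_pipeline.streamingplatform_source.this' src-abc123
terraform import 'module.orders_pipeline.streamingplatform_destination.this' dst-def456
terraform import 'module.orders_pipeline.streamingplatform_pipeline.this' pipe-789
terraform plan   # should report "No changes" once the HCL matches
```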

Picking the Right Level of Automation

Not every team needs the full CI/CD setup on day one. Start where the pain is:

  • Just version control: Store your pipeline configs in Git, apply manually. You get audit trails and rollback.
  • Plan on PR: Add the GitHub Actions plan step. Reviewers can see what will change before approving.
  • Full CI/CD: Automated apply on merge. Best for teams with multiple environments and frequent pipeline changes.

The point isn’t to follow a specific workflow — it’s to stop treating pipeline configuration as ephemeral UI state and start treating it as durable, reviewable code.


Ready to manage your streaming pipelines as code? Streamkap’s platform supports API-driven configuration, making it straightforward to define and deploy CDC pipelines through Terraform or any infrastructure-as-code tool. Start a free trial or learn more about the platform.