
AI & Agents

May 22, 2025

12 min read

Building a Real-Time Embedding Pipeline with CDC and Vector Stores

Learn how to build a streaming embedding pipeline that captures database changes, generates embeddings, and upserts to vector stores in real time.

Most teams build their first vector search prototype with a batch script: dump a table, run it through an embedding model, load the results into Pinecone or Weaviate. It works for demos. It falls apart in production.

The problem is staleness. If a customer updates their profile, a product description changes, or a support ticket gets resolved, your vector store doesn’t reflect that until the next batch run. For retrieval-augmented generation (RAG) and agent workflows, stale embeddings mean wrong answers.

A streaming embedding pipeline fixes this by connecting change data capture to your embedding model and vector store in a continuous flow. Every insert, update, and delete propagates through the pipeline automatically.

Architecture Overview

The pipeline has four stages:

  1. CDC capture — Stream row-level changes from your source database
  2. Transform and prepare — Extract the text fields to embed, concatenate them, and handle formatting
  3. Embed — Call an embedding model (API or local) to generate vectors
  4. Upsert/delete — Write the vector to your store, or remove it if the source row was deleted
PostgreSQL/MySQL → CDC Stream → Text Extraction → Embedding API → Vector Store
                                                                    (Pinecone,
                                                                     Weaviate,
                                                                     Qdrant,
                                                                     pgvector)

Each stage needs to handle the three CDC event types differently:

  • INSERT — Generate embedding, upsert to vector store with the row’s primary key as the vector ID
  • UPDATE — Re-generate embedding for the changed row, upsert (overwrite) in vector store
  • DELETE — Remove the vector by ID from the store

Extracting Text from CDC Events

A CDC event contains the full row (or the changed fields, depending on configuration). You need to decide which fields to embed.
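To make the examples below concrete, here's a simplified Debezium-style update event (the exact envelope depends on your connector and configuration; field names here follow Debezium's conventions):

```python
# Simplified Debezium-style UPDATE event; a real payload carries more
# source metadata, but these are the fields the pipeline below relies on
sample_event = {
    "op": "u",                        # "c" = create, "u" = update, "d" = delete
    "ts_ms": 1716400000000,           # when the change was captured
    "source": {"table": "products"},
    "before": {"id": 42, "name": "Widget", "category": "Hardware",
               "description": "A small metal widget", "price": 9.99},
    "after":  {"id": 42, "name": "Widget", "category": "Hardware",
               "description": "A small rust-proof metal widget", "price": 9.99},
}
```

The `before` and `after` row states are what let you decide, later on, whether a change actually touched the fields you embed.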

For a product catalog, you might concatenate name, description, and category:

def extract_text(event: dict) -> str:
    """Build the text string to embed from a CDC event."""
    row = event["after"]  # post-change row state

    parts = []
    if row.get("name"):
        parts.append(f"Product: {row['name']}")
    if row.get("category"):
        parts.append(f"Category: {row['category']}")
    if row.get("description"):
        parts.append(row["description"])

    return "\n".join(parts)

Keep the text format consistent. If you embed "Product: Widget\nCategory: Hardware\nA small metal widget" during initial load but "Widget - Hardware - A small metal widget" during streaming updates, your search quality will degrade because the same content produces different embeddings.

Generating Embeddings

API-Based Models (OpenAI, Cohere, Voyage)

API models are the fastest path to production. Here’s a batched embedding function:

import openai
from typing import List

client = openai.OpenAI()

def embed_batch(texts: List[str], model: str = "text-embedding-3-small") -> List[List[float]]:
    """Generate embeddings for a batch of texts."""
    response = client.embeddings.create(
        input=texts,
        model=model
    )
    return [item.embedding for item in response.data]

Local Models (Sentence Transformers)

For higher throughput or data privacy requirements, run a local model:

from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")  # 384 dimensions

def embed_batch_local(texts: List[str]) -> List[List[float]]:
    """Generate embeddings using a local model."""
    return model.encode(texts, normalize_embeddings=True).tolist()

Local models eliminate per-token costs and API latency, but you need GPU infrastructure. A single NVIDIA T4 can process roughly 500-1000 embeddings per second with all-MiniLM-L6-v2.

Model Selection Tradeoffs

Model                            Dimensions   Speed             Cost              Quality
OpenAI text-embedding-3-small    1536         ~500/s (API)      $0.02/1M tokens   Good
OpenAI text-embedding-3-large    3072         ~300/s (API)      $0.13/1M tokens   Better
Cohere embed-english-v3          1024         ~400/s (API)      $0.10/1M tokens   Better
all-MiniLM-L6-v2 (local)         384          ~800/s (T4 GPU)   GPU cost only     Acceptable
BGE-large-en-v1.5 (local)        1024         ~200/s (T4 GPU)   GPU cost only     Good

For most production workloads, start with an API model and switch to local once you understand your volume and latency requirements.

Batching for API Rate Limits

Embedding APIs have rate limits. OpenAI allows 3,000 requests per minute on the standard tier. Since each request can contain multiple texts (up to about 8,000 tokens total), batching is essential.

import time
from collections import deque

class BatchEmbedder:
    def __init__(self, batch_size: int = 64, flush_interval_ms: int = 500):
        self.batch_size = batch_size
        self.flush_interval_ms = flush_interval_ms
        self.buffer: deque = deque()
        self.last_flush = time.monotonic()

    def add(self, event_id: str, text: str):
        self.buffer.append((event_id, text))
        if self.should_flush():
            return self.flush()
        return []

    def should_flush(self) -> bool:
        if len(self.buffer) >= self.batch_size:
            return True
        elapsed_ms = (time.monotonic() - self.last_flush) * 1000
        return elapsed_ms >= self.flush_interval_ms and len(self.buffer) > 0

    def flush(self) -> list:
        batch = []
        while self.buffer and len(batch) < self.batch_size:
            batch.append(self.buffer.popleft())

        if not batch:
            return []  # nothing buffered; skip the empty API call

        ids = [item[0] for item in batch]
        texts = [item[1] for item in batch]
        embeddings = embed_batch(texts)

        self.last_flush = time.monotonic()
        return list(zip(ids, embeddings))

The two flush triggers — batch size and time interval — balance throughput and latency. During high-change periods (bulk imports, migrations), the size trigger dominates. During quiet periods, the time trigger ensures changes don’t sit in the buffer too long.
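The interaction between the two triggers can be restated as a pure function (a sketch mirroring `should_flush` above), which makes the behavior easy to reason about and test:

```python
def flush_due(buffer_len: int, elapsed_ms: float,
              batch_size: int = 64, flush_interval_ms: int = 500) -> bool:
    """Pure restatement of the two flush triggers: size OR time."""
    if buffer_len >= batch_size:
        return True  # size trigger: a full batch is waiting
    # time trigger: drain stragglers, but never flush an empty buffer
    return buffer_len > 0 and elapsed_ms >= flush_interval_ms

# During a bulk import the size trigger fires first:
assert flush_due(buffer_len=64, elapsed_ms=10)
# During quiet periods the time trigger drains stragglers:
assert flush_due(buffer_len=3, elapsed_ms=600)
# An empty buffer never flushes:
assert not flush_due(buffer_len=0, elapsed_ms=10_000)
```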

Upserting to Vector Stores

The upsert logic needs to handle all three event types. Here’s pseudocode that works across vector store APIs:

# Metadata buffered per row ID, because the embedder may flush this
# event in a later batch alongside other events' vectors
PENDING_METADATA: dict = {}

def process_cdc_event(event: dict, embedder: BatchEmbedder, vector_store):
    operation = event["op"]  # "c" (create), "u" (update), "d" (delete)
    row = event["after"] or event["before"]
    row_id = str(row["id"])

    if operation == "d":
        # Delete: remove from vector store immediately
        vector_store.delete(ids=[row_id])
        return

    # Insert or Update: extract text, embed, upsert
    text = extract_text(event)

    # Include metadata for filtered search; keyed by row ID so each
    # flushed vector gets the metadata from its own source event
    PENDING_METADATA[row_id] = {
        "source_table": event["source"]["table"],
        "updated_at": event["ts_ms"],
        "category": event["after"].get("category", ""),
    }

    results = embedder.add(row_id, text)
    for vec_id, embedding in results:
        vector_store.upsert(
            id=vec_id,
            vector=embedding,
            metadata=PENDING_METADATA.pop(vec_id),
        )

Store metadata alongside vectors. You’ll need it for filtered queries (e.g., “search only products in the Electronics category”) and for debugging which source record a vector corresponds to.
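To illustrate why, here's a toy in-memory store with metadata filtering at query time (a sketch; real clients like Pinecone or Qdrant expose richer filter syntax, and `ToyVectorStore` is purely hypothetical):

```python
class ToyVectorStore:
    """In-memory stand-in for a vector store that filters on metadata."""

    def __init__(self):
        self.vectors = {}  # id -> (vector, metadata)

    def upsert(self, id, vector, metadata):
        self.vectors[id] = (vector, metadata)

    def delete(self, ids):
        for i in ids:
            self.vectors.pop(i, None)

    def query(self, vector, top_k=5, filter=None):
        def score(v):
            # dot product == cosine similarity for normalized vectors
            return sum(a * b for a, b in zip(vector, v))
        candidates = [
            (i, score(vec)) for i, (vec, meta) in self.vectors.items()
            if not filter or all(meta.get(k) == v for k, v in filter.items())
        ]
        return sorted(candidates, key=lambda x: -x[1])[:top_k]

store = ToyVectorStore()
store.upsert("1", [1.0, 0.0], {"category": "Electronics"})
store.upsert("2", [0.9, 0.1], {"category": "Hardware"})

# Without the category in metadata, this filter would be impossible
hits = store.query([1.0, 0.0], filter={"category": "Electronics"})
```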

Handling Updates: When to Re-Embed

Not every column change requires re-embedding. If a product’s price changes but its name and description stay the same, re-embedding wastes money and API quota.

Track which columns affect the embedding text:

EMBEDDING_COLUMNS = {"name", "description", "category"}

def needs_re_embedding(event: dict) -> bool:
    """Check if the changed columns affect the embedding."""
    if event["op"] == "c":  # new row always needs embedding
        return True
    if event["op"] == "d":  # deletes don't need embedding
        return False

    before = event.get("before", {})
    after = event.get("after", {})

    for col in EMBEDDING_COLUMNS:
        if before.get(col) != after.get(col):
            return True
    return False

This optimization can cut embedding costs by 50-80% depending on your update patterns, since most row changes affect non-text columns like timestamps, counters, or status flags.

Cost Estimation at Scale

Here’s a realistic cost model for a streaming embedding pipeline:

Assumptions:

  • 500K rows in the source table (initial load)
  • 50K row changes per day (ongoing)
  • Average 400 tokens per row
  • Using OpenAI text-embedding-3-small ($0.02/1M tokens)
  • 30% of updates affect embedding-relevant columns

Initial load:

500,000 rows × 400 tokens = 200M tokens
200M tokens × $0.02/1M = $4.00

Daily ongoing:

50,000 changes × 30% needing re-embed = 15,000 embeddings
15,000 × 400 tokens = 6M tokens
6M tokens × $0.02/1M = $0.12/day = $3.60/month

Vector store (Pinecone Serverless):

500K vectors × 1536 dimensions ≈ 3GB storage
~$10/month at low query volume

Total: ~$14/month after the $4 initial load. Compare this to running a nightly batch job on a GPU instance ($50-200/month) that produces stale results.
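The arithmetic above is easy to sanity-check in a few lines:

```python
PRICE_PER_M_TOKENS = 0.02  # text-embedding-3-small

# Initial load: 500K rows at ~400 tokens each
initial_tokens = 500_000 * 400
initial_cost = initial_tokens / 1_000_000 * PRICE_PER_M_TOKENS
assert initial_cost == 4.00

# Daily ongoing: 50K changes, 30% of which need re-embedding
daily_tokens = int(50_000 * 0.30) * 400
daily_cost = daily_tokens / 1_000_000 * PRICE_PER_M_TOKENS
assert round(daily_cost, 2) == 0.12
assert round(daily_cost * 30, 2) == 3.60
```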

Handling Initial Load vs Streaming

Your pipeline needs two modes: initial backfill (embed all existing rows) and streaming (embed only changes). The backfill path should:

  1. Snapshot the table at a point in time
  2. Process rows in large batches (256-512 per API call)
  3. Use parallel workers to saturate your API rate limit
  4. Record the snapshot position (LSN for PostgreSQL, binlog position for MySQL)
  5. Switch to streaming from that position with no gaps

This is the same pattern described in CDC snapshot strategies, applied to embeddings specifically.
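The gap-free handoff can be simulated end to end with an in-memory log (everything here — `changelog`, `apply_change`, the positions — is a stand-in for your database's WAL or binlog, not a real connector API):

```python
# Append-only changelog standing in for the WAL/binlog; index = position (LSN)
changelog = []

def apply_change(table, row):
    table[row["id"]] = row
    changelog.append(row)

table = {}
for i in range(3):
    apply_change(table, {"id": i, "name": f"product-{i}"})

# Steps 1 and 4: snapshot the table AND record the log position it reflects
snapshot_rows = list(table.values())
snapshot_position = len(changelog)

# Changes keep arriving while the backfill runs
apply_change(table, {"id": 3, "name": "product-3"})

# Steps 2-3: backfill the snapshot (embedding stubbed as collecting IDs)
embedded_ids = {row["id"] for row in snapshot_rows}

# Step 5: stream from the recorded position; no gap, no double work
for row in changelog[snapshot_position:]:
    embedded_ids.add(row["id"])

assert embedded_ids == {0, 1, 2, 3}
```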

Error Handling and Retries

Embedding APIs fail. Rate limits hit. Vector store writes time out. Your pipeline needs to handle all of these without losing events.

Key patterns:

  • Retry with exponential backoff for transient API errors (429, 500, 503)
  • Dead letter queue for events that fail after max retries — don’t block the pipeline
  • Idempotent upserts — vector stores naturally support this since upsert overwrites by ID
  • Checkpointing — track which CDC events have been fully processed (embedded and upserted) so you can resume after crashes
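The first pattern can be sketched as a small retry wrapper (`TransientError` is a hypothetical stand-in; in practice you would catch your client's rate-limit and 5xx exceptions):

```python
import time

class TransientError(Exception):
    """Stand-in for a 429/500/503 from the embedding API."""

def with_backoff(fn, max_retries=5, base_delay=0.01):
    """Retry fn with exponential backoff; re-raise after max_retries."""
    for attempt in range(max_retries):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries - 1:
                raise  # caller routes the event to the dead letter queue
            time.sleep(base_delay * (2 ** attempt))

calls = {"n": 0}

def flaky_embed():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("rate limited")
    return [[0.1, 0.2]]

assert with_backoff(flaky_embed) == [[0.1, 0.2]]
assert calls["n"] == 3  # failed twice, succeeded on the third attempt
```

In production, `base_delay` would be on the order of a second, with jitter added so parallel workers don't retry in lockstep.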

For a deeper look at exactly-once processing patterns, see idempotency in streaming pipelines.

Keeping Embeddings Consistent Across Model Changes

When you upgrade your embedding model, every vector in the store becomes incompatible. You can’t mix embeddings from different models — cosine similarity between them is meaningless.

The safest approach:

  1. Create a new vector collection/index
  2. Run backfill with the new model into the new collection
  3. Run streaming updates in parallel to both old and new collections
  4. Switch reads to the new collection
  5. Delete the old collection

This is essentially a blue-green deployment for your vector store. It adds temporary storage cost but avoids any search quality degradation during the transition.
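The cutover sequence can be sketched with two in-memory collections (toy dicts standing in for real indexes; the vectors are placeholders):

```python
old_index, new_index = {}, {}  # toy collections keyed by vector ID

def dual_write(vec_id, old_vec, new_vec):
    """Step 3: streaming updates go to BOTH collections during migration."""
    old_index[vec_id] = old_vec
    new_index[vec_id] = new_vec

# Steps 1-2: backfill existing rows into the new collection with the new model
for vec_id in ("a", "b"):
    old_index[vec_id] = [0.0] * 3  # old model: 3 dimensions
    new_index[vec_id] = [0.0] * 4  # new model: 4 dimensions

# A streaming change arrives mid-migration: write to both collections
dual_write("c", old_vec=[0.1] * 3, new_vec=[0.1] * 4)

# Step 4: flip reads once the new collection is complete
read_index = new_index
assert set(read_index) == {"a", "b", "c"}

# Step 5: drop the old collection
old_index.clear()
```

Note the dimension mismatch between collections: that's exactly why vectors from different models can never share an index.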

Production Checklist

Before going live, verify:

  • Delete handling — Deletes in the source DB remove vectors from the store
  • Column filtering — Only embedding-relevant column changes trigger re-embedding
  • Batch flushing — Buffer flushes on both size and time triggers
  • Rate limit handling — Backoff on 429 responses from embedding API
  • Monitoring — Track embedding latency, vector store lag, API error rates
  • Cost alerts — Set budget alerts on your embedding API account
  • Initial load tested — Backfill completes cleanly and streaming picks up without gaps

Building Streaming Embedding Pipelines That Scale

The shift from batch to streaming embeddings mirrors the broader shift from batch to streaming data processing. The architecture is more complex, but the payoff — vector stores that stay current with your source data — is what makes RAG pipelines actually reliable.

Start with a single table and an API embedding model. Measure your change volume, embedding costs, and end-to-end latency. Then expand to more tables and consider local models as your volume grows.


Ready to stream database changes into your embedding pipeline? Streamkap captures row-level changes from PostgreSQL, MySQL, and MongoDB and delivers them to your embedding infrastructure with sub-second latency. Start a free trial or learn more about real-time data for AI.