
AI & Agents

May 22, 2025

12 min read

Streaming CDC Events to Vector Databases for Real-Time AI

How to build a CDC pipeline to vector databases — covering embedding generation, incremental updates, delete handling, chunking strategies, and metadata filtering.

Vector databases power similarity search, recommendation engines, and retrieval-augmented generation (RAG). But there is a gap most teams overlook: the data in the vector database gets stale the moment the source database changes.

If you load vectors in a nightly batch job, your search results, recommendations, and AI agent responses are up to 24 hours behind your actual data. For applications where accuracy matters — customer support bots, product search, fraud detection — this staleness directly affects quality.

The solution is straightforward: use CDC to stream database changes to your vector database in real time. Every insert, update, and delete in the source triggers an embedding update in the vector store. This article covers the architecture, implementation details, and trade-offs.

Why Batch Embedding Falls Short

The typical batch approach:

  1. Run a scheduled job (daily, hourly)
  2. Read all rows from the source database (or rows changed since last run)
  3. Generate embeddings for each row
  4. Upsert embeddings into the vector database
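As a rough sketch, the batch job amounts to the following (illustrative only — `rows`, `embed`, and `vector_db` stand in for your database client, embedding model, and vector store):

```python
# Naive scheduled batch job — hypothetical clients, for illustration only.
def batch_sync(rows, embed, vector_db):
    for row in rows:
        text = f"{row['name']}. {row['description']}"
        vector_db.upsert(
            id=str(row['id']),
            vector=embed(text),   # re-embeds every row, changed or not
            metadata={'category': row['category']},
        )
```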

This works for initial loads but has problems at scale:

Staleness: Between batch runs, the vector database does not reflect reality. A product marked as discontinued is still returned in similarity search. A corrected support article still has the old, incorrect embedding.

Waste: Re-embedding unchanged rows is expensive. Embedding API costs scale linearly with the number of tokens processed. If only 1% of rows changed, you are paying 100x more than necessary.

Database load: Full-table scans for the batch export put load on your production database during the extraction phase.

Complexity of change detection: Tracking “what changed since last run” requires reliable timestamps, which not all tables have. Missing a change means the vector store drifts from the source.

CDC eliminates all of these by streaming each change as it happens, so you only process what actually changed.

Architecture

Source DB ──> CDC Engine ──> Kafka ──> Embedding Service ──> Vector DB
                                              │
                                              ▼
                                       Embedding API
                                       (OpenAI, Cohere,
                                        local model)

Component Responsibilities

CDC engine: Captures every INSERT, UPDATE, and DELETE from the source database’s transaction log and publishes structured change events to Kafka.

Kafka: Buffers and orders change events, providing at-least-once delivery to the downstream consumer (exactly-once if the consumer processes transactionally). If the embedding service is slow or down, events queue up and are processed when it recovers.

Embedding service: Consumes CDC events, decides whether to generate a new embedding, calls the embedding model, and writes the result to the vector database. This is where most of the logic lives.

Vector database: Stores embeddings with metadata and serves similarity search queries.

Deciding What to Embed

Not every column in a database row should be embedded. Embeddings capture semantic meaning from text — they are not useful for numeric IDs, timestamps, or boolean flags.

Column Classification

For each table, classify columns into three categories:

Category | Examples | Treatment
Embed | name, description, content, title | Concatenate and pass to embedding model
Metadata | price, category, status, created_at | Store as vector metadata for filtering
Ignore | internal_id, audit columns, binary data | Do not send to vector DB

Example: Product Catalog

def prepare_for_embedding(cdc_event):
    row = cdc_event['after']

    # Text to embed — only semantically meaningful columns
    text_to_embed = f"{row['name']}. {row['description']}"
    if row.get('features'):
        text_to_embed += f". Features: {row['features']}"

    # Metadata for filtering (not embedded)
    metadata = {
        'product_id': row['id'],
        'category': row['category'],
        'price': row['price'],
        'in_stock': row['in_stock'],
        'brand': row['brand'],
        'updated_at': row['updated_at'],
    }

    return text_to_embed, metadata

At query time, you can filter by metadata before or after the vector search:

# "Find products similar to this query, but only in the 'electronics' category"
results = vector_db.query(
    vector=query_embedding,
    filter={"category": "electronics", "in_stock": True},
    top_k=10,
)

Embedding Generation

When to Re-Embed

Not every CDC event requires a new embedding. Generating embeddings is the most expensive step (API cost + latency), so minimize unnecessary calls:

# Columns that affect the embedding
EMBEDDED_COLUMNS = {'name', 'description', 'features'}

def needs_re_embedding(cdc_event):
    if cdc_event['op'] == 'c':  # INSERT — always embed
        return True
    if cdc_event['op'] == 'd':  # DELETE — no embedding needed
        return False
    if cdc_event['op'] == 'u':  # UPDATE — check which columns changed
        before = cdc_event.get('before', {})
        after = cdc_event['after']
        changed_columns = {
            k for k in after
            if before.get(k) != after.get(k)
        }
        return bool(changed_columns & EMBEDDED_COLUMNS)
    return False

If only metadata columns changed (e.g., price updated), skip the embedding API call and just update the metadata in the vector database:

def process_cdc_event(event):
    if event['op'] == 'd':
        vector_db.delete(id=str(event['key']))
        return

    text, metadata = prepare_for_embedding(event)

    if needs_re_embedding(event):
        embedding = embedding_model.encode(text)
        vector_db.upsert(
            id=str(event['key']),
            vector=embedding,
            metadata=metadata,
        )
    else:
        # Only update metadata — much cheaper
        vector_db.update(
            id=str(event['key']),
            metadata=metadata,
        )

Embedding Model Options

Option | Latency | Cost | Quality | Notes
OpenAI text-embedding-3-small | 50-200 ms | $0.02/1M tokens | Good | Managed, easy to use
OpenAI text-embedding-3-large | 50-200 ms | $0.13/1M tokens | Better | Higher-dimensional
Cohere embed-v3 | 50-200 ms | $0.10/1M tokens | Good | Strong multilingual support
Local (sentence-transformers) | 5-20 ms | Infrastructure cost | Varies | No API dependency, full control
Local (ONNX-optimized) | 2-10 ms | Infrastructure cost | Varies | Fastest for high throughput

For CDC workloads, local models are often the better choice. You avoid API rate limits, reduce latency, and eliminate per-token costs. A single GPU instance running a sentence-transformer model can embed thousands of records per second.

Batching Embedding Calls

If using an API-based embedding model, batch multiple CDC events into a single API call:

import time

class EmbeddingBatcher:
    def __init__(self, model, batch_size=64, flush_interval_s=1.0):
        self.model = model
        self.batch_size = batch_size
        self.flush_interval = flush_interval_s
        self.buffer = []
        self.last_flush = time.monotonic()

    def add(self, event_id, text, metadata):
        self.buffer.append((event_id, text, metadata))
        # Flush when the batch is full or the flush interval has elapsed,
        # so low-traffic periods do not leave events stuck in the buffer
        if (len(self.buffer) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.flush_interval):
            return self.flush()
        return []

    def flush(self):
        self.last_flush = time.monotonic()
        if not self.buffer:
            return []

        texts = [item[1] for item in self.buffer]
        embeddings = self.model.encode_batch(texts)  # Single API call

        results = [
            (event_id, embedding, metadata)
            for (event_id, _text, metadata), embedding in zip(self.buffer, embeddings)
        ]
        self.buffer.clear()
        return results

Chunking Strategies for Database Records

Embedding models have token limits (typically 512-8192 tokens). Most database records fit within this limit — a product name and description is usually under 200 tokens. But some tables have long text fields (article bodies, support ticket histories, legal documents).

Short Records: Single Vector

If the concatenated embedded text fits within the model’s context window, embed it as a single vector:

Database row → "Product Name. Description. Features." → 1 vector

One row = one vector = one document ID. Simple, clean, and easy to manage with CDC.

Long Records: Chunked Vectors

For long text fields, split into chunks and store multiple vectors per source row:

Database row → "Article title. Paragraph 1..." → chunk_0
            → "...Paragraph 2... Paragraph 3..." → chunk_1
            → "...Paragraph 4. Paragraph 5..."   → chunk_2

Each chunk gets its own vector ID, but all chunks share the source row’s primary key as metadata:

def chunk_and_embed(event):
    row = event['after']
    text = f"{row['title']}. {row['body']}"
    source_id = str(row['id'])

    chunks = split_into_chunks(text, max_tokens=500, overlap=50)

    vectors = []
    for i, chunk in enumerate(chunks):
        embedding = model.encode(chunk)
        vectors.append({
            'id': f"{source_id}:chunk_{i}",
            'vector': embedding,
            'metadata': {
                'source_id': source_id,
                'chunk_index': i,
                'total_chunks': len(chunks),
                'category': row['category'],
            }
        })
    return vectors
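The code above assumes a `split_into_chunks` helper that is not shown. A minimal whitespace-based sketch follows — note that splitting on words is only a rough proxy; a real pipeline would count tokens with the embedding model's own tokenizer:

```python
def split_into_chunks(text, max_tokens=500, overlap=50):
    # Word-based approximation of token-budgeted chunking with overlap.
    # Consecutive chunks share `overlap` words so sentences that straddle
    # a boundary appear in both chunks.
    words = text.split()
    step = max_tokens - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_tokens]))
        if start + max_tokens >= len(words):
            break
    return chunks
```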

Handling Updates to Chunked Records

When a long text field is updated, the number of chunks may change. A simple approach:

  1. Delete all existing chunks for this source row
  2. Re-chunk and re-embed the updated text
  3. Upsert the new chunks

def update_chunked_record(event):
    source_id = str(event['key'])

    # Delete old chunks — filter by source_id metadata
    vector_db.delete(filter={"source_id": source_id})

    # Generate new chunks
    if event['op'] != 'd':
        vectors = chunk_and_embed(event)
        vector_db.upsert(vectors=vectors)

This is safe because CDC events for the same row arrive in order within a partition. There is no race condition between the delete and the insert.
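The ordering guarantee comes from Kafka's key-based partitioning: every message with the same key hashes to the same partition. A toy illustration of the idea (Kafka's default partitioner actually uses murmur2; MD5 here just demonstrates the principle):

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    # All events for the same primary key map to the same partition,
    # so they are consumed in the order they were produced.
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```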

Handling Deletes

When a row is deleted from the source database, the corresponding vectors must be removed from the vector database. Stale vectors for deleted records will surface in search results and pollute RAG pipelines.

Single-Vector Deletes

Straightforward — delete by the vector ID (which matches the source primary key):

if event['op'] == 'd':
    vector_db.delete(id=str(event['key']))

Chunked-Vector Deletes

Delete all chunks associated with the source row:

if event['op'] == 'd':
    vector_db.delete(filter={"source_id": str(event['key'])})

Most vector databases support deletion by metadata filter. If yours does not, maintain a mapping of source row ID to chunk IDs in a separate store.
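A sketch of that fallback mapping — an in-memory dict here for illustration; in production this would live in a durable store such as Redis or a SQL table:

```python
# Hypothetical fallback when the vector DB cannot delete by metadata filter:
# track which chunk IDs belong to each source row so deletes can be issued
# by exact ID.
chunk_ids_by_source = {}

def register_chunks(source_id, chunk_ids):
    chunk_ids_by_source[source_id] = list(chunk_ids)

def delete_source(vector_db, source_id):
    for chunk_id in chunk_ids_by_source.pop(source_id, []):
        vector_db.delete(id=chunk_id)
```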

Soft Deletes

If your source database uses soft deletes (setting a deleted_at column instead of removing the row), the CDC engine emits an UPDATE event, not a DELETE. Your embedding service must check for this:

def process_event(event):
    if event['op'] == 'u' and event['after'].get('deleted_at') is not None:
        # Soft delete — remove from vector DB
        vector_db.delete(id=str(event['key']))
        return
    # ... normal processing

Vector Database Options

Database | Type | Upsert Support | Metadata Filtering | Hosted Option
Pinecone | Managed | Yes | Yes (rich filtering) | Yes (only)
Weaviate | Open source | Yes | Yes (GraphQL) | Yes
Qdrant | Open source | Yes | Yes (payload filtering) | Yes
Milvus | Open source | Yes | Yes | Yes (Zilliz)
pgvector | PostgreSQL extension | Yes (via SQL) | Yes (SQL WHERE) | Any PG host
Chroma | Open source | Yes | Yes | No

For CDC workloads, the key requirement is efficient upsert-by-ID. Every vector database in this list supports it. The differences are in query performance, filtering capabilities, and operational overhead.

pgvector is worth calling out: if your destination is already PostgreSQL, you can stream CDC events to a separate PostgreSQL instance with pgvector. This keeps your stack simple and lets you use SQL for both vector search and metadata filtering.

Putting It All Together

A complete CDC-to-vector pipeline handles the full lifecycle:

  1. Initial load: Backfill existing records from the source database into the vector store
  2. Ongoing CDC: Stream every insert, update, and delete in real time
  3. Smart embedding: Only re-embed when text columns change; update metadata for non-text changes
  4. Delete propagation: Remove vectors when source rows are deleted (hard or soft)
  5. Monitoring: Track embedding latency, vector count drift (source rows vs vectors), and search quality
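The drift check from step 5 can be as simple as comparing counts. A sketch, assuming one vector per source row — with chunked records, compare against the count of distinct source_id values instead:

```python
def count_drift(source_rows: int, vectors: int, tolerance: float = 0.01) -> bool:
    # True when the vector count deviates from the source row count by more
    # than the tolerated fraction — a signal to investigate or re-backfill.
    if source_rows == 0:
        return vectors != 0
    return abs(vectors - source_rows) / source_rows > tolerance
```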

The result is a vector database that reflects the current state of your source data — not a stale snapshot from the last batch run. For AI applications like RAG, agent context, and recommendation engines, this freshness is the difference between useful and unreliable.

Keeping Vectors Fresh at Scale

The architecture described here scales well. CDC events are naturally incremental — you only process what changed. Embedding costs are proportional to the rate of change, not the size of the dataset. And because Kafka buffers events, temporary slowdowns in the embedding service or vector database do not cause data loss.

The biggest operational concern is embedding model latency during traffic spikes. If your source database has a burst of writes (bulk import, migration), the embedding service must keep up or the CDC consumer lag grows. Auto-scaling the embedding service based on consumer lag is the standard solution.
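A lag-based scaling rule can be sketched as follows — all numbers here are hypothetical; tune the drain target and replica bounds to your workload:

```python
import math

def desired_replicas(consumer_lag: int,
                     events_per_sec_per_replica: float,
                     drain_target_s: float = 60.0,
                     min_replicas: int = 1,
                     max_replicas: int = 20) -> int:
    # Scale to enough replicas to drain the current lag within the target
    # window, clamped to the allowed replica range.
    needed = math.ceil(consumer_lag / (events_per_sec_per_replica * drain_target_s))
    return max(min_replicas, min(max_replicas, needed))
```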


Ready to keep your vector database in sync with your source data? Streamkap streams CDC events from your databases in real time, giving you a reliable foundation for embedding pipelines and AI applications. Start a free trial or learn more about real-time data for AI.