Streaming CDC Events to Vector Databases for Real-Time AI
How to build a CDC pipeline to vector databases — covering embedding generation, incremental updates, delete handling, chunking strategies, and metadata filtering.
Vector databases power similarity search, recommendation engines, and retrieval-augmented generation (RAG). But there is a gap most teams overlook: the data in the vector database gets stale the moment the source database changes.
If you load vectors in a nightly batch job, your search results, recommendations, and AI agent responses are up to 24 hours behind your actual data. For applications where accuracy matters — customer support bots, product search, fraud detection — this staleness directly affects quality.
The solution is straightforward: use CDC to stream database changes to your vector database in real time. Every insert, update, and delete in the source triggers an embedding update in the vector store. This article covers the architecture, implementation details, and trade-offs.
Why Batch Embedding Falls Short
The typical batch approach:
- Run a scheduled job (daily, hourly)
- Read all rows from the source database (or rows changed since last run)
- Generate embeddings for each row
- Upsert embeddings into the vector database
This works for initial loads but has problems at scale:
Staleness: Between batch runs, the vector database does not reflect reality. A product marked as discontinued is still returned in similarity search. A corrected support article still has the old, incorrect embedding.
Waste: Re-embedding unchanged rows is expensive. Embedding API costs scale linearly with the number of tokens processed. If only 1% of rows changed, you are paying 100x more than necessary.
Database load: Full-table scans for the batch export put load on your production database during the extraction phase.
Complexity of change detection: Tracking “what changed since last run” requires reliable timestamps, which not all tables have. Missing a change means the vector store drifts from the source.
CDC eliminates all of these by streaming each change as it happens, so you only process what actually changed.
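To make this concrete, here is a minimal sketch of the change events a CDC engine emits. The field names (`op`, `key`, `before`, `after`) follow the Debezium-style envelope used in the examples throughout this article; exact shapes vary by CDC engine and connector configuration.

```python
# A Debezium-style change event envelope (field names are illustrative;
# exact shapes vary by CDC engine and connector configuration).
insert_event = {
    "op": "c",                  # c = create, u = update, d = delete
    "key": 42,                  # source row primary key
    "before": None,             # row state before the change
    "after": {"id": 42, "name": "USB-C Hub", "price": 39.99},
}

update_event = {
    "op": "u",
    "key": 42,
    "before": {"id": 42, "name": "USB-C Hub", "price": 39.99},
    "after": {"id": 42, "name": "USB-C Hub", "price": 34.99},
}

def changed_columns(event):
    """Columns whose value differs between the before and after images."""
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {k for k in after if before.get(k) != after.get(k)}

print(changed_columns(update_event))  # {'price'}
```

Because each event carries both row images, the consumer can see exactly which columns changed without querying the source database.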
Architecture
```
Source DB ──> CDC Engine ──> Kafka ──> Embedding Service ──> Vector DB
                                              │
                                              ▼
                                        Embedding API
                                       (OpenAI, Cohere,
                                         local model)
```
Component Responsibilities
CDC engine: Captures every INSERT, UPDATE, and DELETE from the source database’s transaction log and publishes structured change events to Kafka.
Kafka: Buffers and orders change events, delivering them at least once to the downstream consumer; because vector upserts are keyed by row ID, redelivered events are harmless. If the embedding service is slow or down, events queue up and are processed when it recovers.
Embedding service: Consumes CDC events, decides whether to generate a new embedding, calls the embedding model, and writes the result to the vector database. This is where most of the logic lives.
Vector database: Stores embeddings with metadata and serves similarity search queries.
Deciding What to Embed
Not every column in a database row should be embedded. Embeddings capture semantic meaning from text — they are not useful for numeric IDs, timestamps, or boolean flags.
Column Classification
For each table, classify columns into three categories:
| Category | Examples | Treatment |
|---|---|---|
| Embed | name, description, content, title | Concatenate and pass to embedding model |
| Metadata | price, category, status, created_at | Store as vector metadata for filtering |
| Ignore | internal_id, audit columns, binary data | Do not send to vector DB |
Example: Product Catalog
```python
def prepare_for_embedding(cdc_event):
    row = cdc_event['after']

    # Text to embed — only semantically meaningful columns
    text_to_embed = f"{row['name']}. {row['description']}"
    if row.get('features'):
        text_to_embed += f". Features: {row['features']}"

    # Metadata for filtering (not embedded)
    metadata = {
        'product_id': row['id'],
        'category': row['category'],
        'price': row['price'],
        'in_stock': row['in_stock'],
        'brand': row['brand'],
        'updated_at': row['updated_at'],
    }
    return text_to_embed, metadata
```
At query time, you can filter by metadata before or after the vector search:
```python
# "Find products similar to this query, but only in the 'electronics' category"
results = vector_db.query(
    vector=query_embedding,
    filter={"category": "electronics", "in_stock": True},
    top_k=10,
)
```
Embedding Generation
When to Re-Embed
Not every CDC event requires a new embedding. Generating embeddings is the most expensive step (API cost + latency), so minimize unnecessary calls:
```python
# Columns that affect the embedding
EMBEDDED_COLUMNS = {'name', 'description', 'features'}

def needs_re_embedding(cdc_event):
    if cdc_event['op'] == 'c':  # INSERT — always embed
        return True
    if cdc_event['op'] == 'd':  # DELETE — no embedding needed
        return False
    if cdc_event['op'] == 'u':  # UPDATE — check which columns changed
        before = cdc_event.get('before', {})
        after = cdc_event['after']
        changed_columns = {
            k for k in after
            if before.get(k) != after.get(k)
        }
        return bool(changed_columns & EMBEDDED_COLUMNS)
    return False
```
If only metadata columns changed (e.g., price updated), skip the embedding API call and just update the metadata in the vector database:
```python
def process_cdc_event(event):
    if event['op'] == 'd':
        vector_db.delete(id=str(event['key']))
        return

    text, metadata = prepare_for_embedding(event)

    if needs_re_embedding(event):
        embedding = embedding_model.encode(text)
        vector_db.upsert(
            id=str(event['key']),
            vector=embedding,
            metadata=metadata,
        )
    else:
        # Only update metadata — much cheaper
        vector_db.update(
            id=str(event['key']),
            metadata=metadata,
        )
```
Embedding Model Options
| Option | Latency | Cost | Quality | Notes |
|---|---|---|---|---|
| OpenAI text-embedding-3-small | 50-200ms | $0.02/1M tokens | Good | Managed, easy to use |
| OpenAI text-embedding-3-large | 50-200ms | $0.13/1M tokens | Better | Higher dimensional |
| Cohere embed-v3 | 50-200ms | $0.10/1M tokens | Good | Good multilingual |
| Local (sentence-transformers) | 5-20ms | Infrastructure cost | Varies | No API dependency, full control |
| Local (ONNX-optimized) | 2-10ms | Infrastructure cost | Varies | Fastest for high throughput |
For CDC workloads, local models are often the better choice. You avoid API rate limits, reduce latency, and eliminate per-token costs. A single GPU instance running a sentence-transformer model can embed thousands of records per second.
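A back-of-envelope calculation makes the cost side concrete. Using the API pricing from the table above, and an assumed average of 200 tokens per record (an illustrative figure; measure your own rows), the monthly embedding bill for a CDC workload scales with the change rate, not the table size:

```python
# Back-of-envelope embedding cost for a CDC workload, using the API
# prices from the table above. Tokens-per-record is an assumption.
def monthly_api_cost(changes_per_day, avg_tokens_per_record, price_per_million_tokens):
    tokens_per_month = changes_per_day * 30 * avg_tokens_per_record
    return tokens_per_month / 1_000_000 * price_per_million_tokens

# 100k changed rows/day at ~200 tokens each with text-embedding-3-small
cost = monthly_api_cost(100_000, 200, 0.02)
print(f"${cost:.2f}/month")  # $12.00/month
```

At this scale the API is cheap; the calculus shifts toward local models when change rates reach millions of rows per day or when API latency and rate limits become the bottleneck.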
Batching Embedding Calls
If using an API-based embedding model, batch multiple CDC events into a single API call:
```python
class EmbeddingBatcher:
    def __init__(self, model, batch_size=64, flush_interval_s=1.0):
        self.model = model
        self.batch_size = batch_size
        # Flushed when the buffer fills; a timer in the consumer loop
        # should also call flush() every flush_interval seconds so small
        # batches do not sit in the buffer indefinitely.
        self.flush_interval = flush_interval_s
        self.buffer = []

    def add(self, event_id, text, metadata):
        self.buffer.append((event_id, text, metadata))
        if len(self.buffer) >= self.batch_size:
            return self.flush()
        return []

    def flush(self):
        if not self.buffer:
            return []
        texts = [item[1] for item in self.buffer]
        embeddings = self.model.encode_batch(texts)  # Single API call
        results = []
        for (event_id, text, metadata), embedding in zip(self.buffer, embeddings):
            results.append((event_id, embedding, metadata))
        self.buffer.clear()
        return results
```
Chunking Strategies for Database Records
Embedding models have token limits (typically 512-8192 tokens). Most database records fit within this limit — a product name and description is usually under 200 tokens. But some tables have long text fields (article bodies, support ticket histories, legal documents).
Short Records: Single Vector
If the concatenated embedded text fits within the model’s context window, embed it as a single vector:
```
Database row → "Product Name. Description. Features." → 1 vector
```
One row = one vector = one document ID. Simple, clean, and easy to manage with CDC.
Long Records: Chunked Vectors
For long text fields, split into chunks and store multiple vectors per source row:
```
Database row → "Article title. Paragraph 1..."    → chunk_0
             → "...Paragraph 2... Paragraph 3..." → chunk_1
             → "...Paragraph 4. Paragraph 5..."   → chunk_2
```
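The chunking step itself can be sketched as a greedy sliding window. This version approximates tokens with whitespace-split words for simplicity; a production pipeline should count tokens with the embedding model's own tokenizer:

```python
def split_into_chunks(text, max_tokens=500, overlap=50):
    """Greedy sliding-window chunker. Approximates tokens with
    whitespace words; production code should use the embedding
    model's tokenizer for accurate counts."""
    words = text.split()
    if not words:
        return []
    chunks = []
    step = max_tokens - overlap  # advance leaves `overlap` words shared
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_tokens]))
        if start + max_tokens >= len(words):
            break
    return chunks

doc = " ".join(f"word{i}" for i in range(1200))
chunks = split_into_chunks(doc, max_tokens=500, overlap=50)
print(len(chunks))  # 3
```

The overlap keeps sentences that straddle a chunk boundary retrievable from both sides, at the cost of a small amount of duplicate embedding work.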
Each chunk gets its own vector ID, but all chunks share the source row’s primary key as metadata:
```python
def chunk_and_embed(event):
    row = event['after']
    text = f"{row['title']}. {row['body']}"
    source_id = str(row['id'])

    chunks = split_into_chunks(text, max_tokens=500, overlap=50)

    vectors = []
    for i, chunk in enumerate(chunks):
        embedding = model.encode(chunk)
        vectors.append({
            'id': f"{source_id}:chunk_{i}",
            'vector': embedding,
            'metadata': {
                'source_id': source_id,
                'chunk_index': i,
                'total_chunks': len(chunks),
                'category': row['category'],
            }
        })
    return vectors
```
Handling Updates to Chunked Records
When a long text field is updated, the number of chunks may change. A simple approach:
- Delete all existing chunks for this source row
- Re-chunk and re-embed the updated text
- Upsert the new chunks
```python
def update_chunked_record(event):
    source_id = str(event['key'])

    # Delete old chunks — filter by source_id metadata
    vector_db.delete(filter={"source_id": source_id})

    # Generate new chunks
    if event['op'] != 'd':
        vectors = chunk_and_embed(event)
        vector_db.upsert(vectors=vectors)
```
This is safe because CDC events for the same row arrive in order within a partition. There is no race condition between the delete and the insert.
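That ordering guarantee exists because the CDC producer keys each event by the row's primary key, and Kafka's partitioner sends all records with the same key to the same partition. The simplified illustration below uses SHA-256 rather than Kafka's actual murmur2 hash, but the property it demonstrates is the same:

```python
# Events for the same row must land in the same partition so deletes
# and upserts stay ordered. Kafka's default partitioner does this by
# hashing the record key (murmur2 in real Kafka; SHA-256 here purely
# for illustration).
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p1 = partition_for("product:42", 12)
p2 = partition_for("product:42", 12)
assert p1 == p2  # same key, same partition: per-row ordering holds
```

The corollary: if you repartition the topic or change the key scheme, in-flight events for the same row can interleave across partitions, so do so only when the pipeline is drained.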
Handling Deletes
When a row is deleted from the source database, the corresponding vectors must be removed from the vector database. Stale vectors for deleted records will surface in search results and pollute RAG pipelines.
Single-Vector Deletes
Straightforward — delete by the vector ID (which matches the source primary key):
```python
if event['op'] == 'd':
    vector_db.delete(id=str(event['key']))
```
Chunked-Vector Deletes
Delete all chunks associated with the source row:
```python
if event['op'] == 'd':
    vector_db.delete(filter={"source_id": str(event['key'])})
```
Most vector databases support deletion by metadata filter. If yours does not, maintain a mapping of source row ID to chunk IDs in a separate store.
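That mapping can be as simple as a key-value record per source row. A minimal sketch, using an in-memory dict as a stand-in for a real store (Redis, DynamoDB, or a SQL table) and hypothetical method names of my own choosing:

```python
class ChunkIndex:
    """Tracks which chunk IDs belong to each source row, for vector
    databases that cannot delete by metadata filter. The dict stands
    in for a durable key-value store."""
    def __init__(self):
        self._chunks = {}

    def record(self, source_id, chunk_ids):
        # Called after upserting a row's chunks.
        self._chunks[source_id] = list(chunk_ids)

    def ids_to_delete(self, source_id):
        # Called on delete or update: returns (and forgets) old chunk IDs.
        return self._chunks.pop(source_id, [])

index = ChunkIndex()
index.record("42", ["42:chunk_0", "42:chunk_1"])
print(index.ids_to_delete("42"))  # ['42:chunk_0', '42:chunk_1']
```

Update the index in the same transaction (or at least the same consumer offset commit) as the vector upsert, so a crash cannot leave chunks the index does not know about.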
Soft Deletes
If your source database uses soft deletes (setting a deleted_at column instead of removing the row), the CDC engine emits an UPDATE event, not a DELETE. Your embedding service must check for this:
```python
def process_event(event):
    if event['op'] == 'u' and event['after'].get('deleted_at') is not None:
        # Soft delete — remove from vector DB
        vector_db.delete(id=str(event['key']))
        return
    # ... normal processing
```
Vector Database Options
| Database | Type | Upsert Support | Metadata Filtering | Hosted Option |
|---|---|---|---|---|
| Pinecone | Managed | Yes | Yes (rich filtering) | Yes (only) |
| Weaviate | Open source | Yes | Yes (GraphQL) | Yes |
| Qdrant | Open source | Yes | Yes (payload filtering) | Yes |
| Milvus | Open source | Yes | Yes | Yes (Zilliz) |
| pgvector | PostgreSQL extension | Yes (via SQL) | Yes (SQL WHERE) | Any PG host |
| Chroma | Open source | Yes | Yes | No |
For CDC workloads, the key requirement is efficient upsert-by-ID. Every vector database in this list supports it. The differences are in query performance, filtering capabilities, and operational overhead.
pgvector is worth calling out: if your destination is already PostgreSQL, you can stream CDC events to a separate PostgreSQL instance with pgvector. This keeps your stack simple and lets you use SQL for both vector search and metadata filtering.
Putting It All Together
A complete CDC-to-vector pipeline handles the full lifecycle:
- Initial load: Backfill existing records from the source database into the vector store
- Ongoing CDC: Stream every insert, update, and delete in real time
- Smart embedding: Only re-embed when text columns change; update metadata for non-text changes
- Delete propagation: Remove vectors when source rows are deleted (hard or soft)
- Monitoring: Track embedding latency, vector count drift (source rows vs vectors), and search quality
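The vector-count-drift check from the monitoring step is worth automating. A minimal sketch, where the 1% alert threshold and the `chunks_per_row` correction factor are illustrative choices, not fixed recommendations:

```python
def count_drift(source_rows: int, vector_count: int,
                chunks_per_row: float = 1.0) -> float:
    """Relative drift between the expected and actual vector counts.
    chunks_per_row corrects for chunked tables (average chunks per
    source row); use 1.0 for single-vector tables."""
    expected = source_rows * chunks_per_row
    if expected == 0:
        return 0.0
    return abs(vector_count - expected) / expected

# Alert if the vector store has drifted more than 1% from the source
drift = count_drift(source_rows=1_000_000, vector_count=987_500)
print(f"{drift:.2%}")  # 1.25%
```

Persistent drift usually means missed deletes or a consumer that crashed between the embedding call and the vector upsert; both are worth a periodic reconciliation job.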
The result is a vector database that reflects the current state of your source data — not a stale snapshot from the last batch run. For AI applications like RAG, agent context, and recommendation engines, this freshness is the difference between useful and unreliable.
Keeping Vectors Fresh at Scale
The architecture described here scales well. CDC events are naturally incremental — you only process what changed. Embedding costs are proportional to the rate of change, not the size of the dataset. And because Kafka buffers events, temporary slowdowns in the embedding service or vector database do not cause data loss.
The biggest operational concern is embedding model latency during traffic spikes. If your source database has a burst of writes (bulk import, migration), the embedding service must keep up or the CDC consumer lag grows. Auto-scaling the embedding service based on consumer lag is the standard solution.
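The scaling decision itself reduces to simple arithmetic on consumer lag. A sketch of the control logic, where the per-replica throughput, catch-up target, and replica bounds are all illustrative assumptions to be replaced with measured values:

```python
import math

def desired_replicas(consumer_lag: int, events_per_sec_per_replica: float,
                     target_catchup_s: int = 300,
                     min_replicas: int = 1, max_replicas: int = 20) -> int:
    """Replicas needed to drain the current lag within target_catchup_s.
    Throughput-per-replica and the bounds are illustrative assumptions."""
    needed = math.ceil(
        consumer_lag / (events_per_sec_per_replica * target_catchup_s)
    )
    return max(min_replicas, min(max_replicas, needed))

# A bulk import leaves 600k pending events; each replica embeds ~100/s
print(desired_replicas(600_000, 100))  # 20 (capped at max_replicas)
```

In Kubernetes this maps naturally onto a lag-driven autoscaler (e.g. KEDA's Kafka scaler) rather than hand-rolled code, but the underlying formula is the same.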
Ready to keep your vector database in sync with your source data? Streamkap streams CDC events from your databases in real time, giving you a reliable foundation for embedding pipelines and AI applications. Start a free trial or learn more about real-time data for AI.