CDC to Elasticsearch: Building Real-Time Search from Database Changes
How to stream database changes to Elasticsearch for real-time search — covering index mapping, document ID strategies, handling deletes, and zero-downtime reindexing.
Search is one of the most latency-sensitive features in any application. When a user updates their profile, adds a product listing, or modifies an order, they expect search results to reflect that change within seconds — not minutes or hours after a batch reindex job runs.
CDC (change data capture) solves this by streaming every database write to Elasticsearch as it happens. Instead of running periodic full-table scans or managing complex application-level dual writes, you get a reliable, decoupled pipeline that keeps your search index in sync with your source of truth.
This guide covers the technical details of building this pipeline correctly — from index mapping to delete handling to zero-downtime reindexing.
Architecture Overview
The basic flow:
Source Database ──> CDC Engine ──> Message Broker ──> Sink Connector ──> Elasticsearch
(PostgreSQL,        (reads WAL/    (Kafka)            (transforms &      (search index)
 MySQL, etc.)        binlog)                           writes)
Each component has a job:
- CDC engine: Reads the database transaction log and emits structured change events (insert, update, delete) with before/after row snapshots
- Message broker: Buffers events, provides ordering guarantees per partition, and decouples the source from the sink
- Sink connector: Reads from the broker, transforms events into Elasticsearch operations, and writes them in bulk
- Elasticsearch: Indexes documents and serves search queries
The separation between components is important. If Elasticsearch goes down for maintenance, CDC events queue up in the broker. When Elasticsearch comes back, the sink connector drains the backlog and catches up. Your source database is never affected.
Index Mapping Strategy
The first decision: how should your Elasticsearch index mapping relate to your database schema?
One Table, One Index (Simple Case)
The most straightforward approach. Each database table maps to one Elasticsearch index.
PUT /products
{
"mappings": {
"properties": {
"id": { "type": "integer" },
"name": { "type": "text", "analyzer": "standard" },
"description": { "type": "text", "analyzer": "standard" },
"price": { "type": "float" },
"category": { "type": "keyword" },
"in_stock": { "type": "boolean" },
"updated_at": { "type": "date" }
}
}
}
CDC events from the products table map directly to documents in the products index. The mapping is clean and easy to reason about.
Denormalized Documents (Common Case)
Search indexes usually need denormalized documents — a product document that includes its category name, brand details, and review count, not just foreign key IDs. In the database, this data is spread across multiple tables. In Elasticsearch, it should be a single document.
You have two approaches:
Option A: Transform in the stream processing layer
Join related data in your stream processing layer before writing to Elasticsearch. This is the cleanest approach but requires maintaining stream-table joins.
-- Stream processing pseudocode (e.g., in a Streaming Agent)
SELECT
p.id,
p.name,
p.description,
p.price,
c.name AS category_name,
b.name AS brand_name,
COALESCE(r.avg_rating, 0) AS avg_rating
FROM products_stream p
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN brands b ON p.brand_id = b.id
LEFT JOIN review_aggregates r ON p.id = r.product_id
Option B: Enrich at the sink
The sink connector looks up related data when processing each CDC event. Simpler to set up but adds latency and load on the source database.
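A minimal sketch of Option B, assuming a hypothetical `lookup_category_name` helper and a small in-process cache to soften the lookup load; `CATEGORY_NAMES` stands in for the real categories table or lookup query:

```python
from functools import lru_cache

# Stand-in for the real categories table (in practice: a SQL query
# against a read replica, or a call to a lookup service).
CATEGORY_NAMES = {7: "tools"}

@lru_cache(maxsize=10_000)
def lookup_category_name(category_id):
    # Cached so repeated events for the same category don't hit the source.
    return CATEGORY_NAMES.get(category_id, "unknown")

def enrich(cdc_event):
    """Turn a bare products row into a denormalized search document."""
    row = cdc_event["after"]
    return {
        "id": row["id"],
        "name": row["name"],
        "price": row["price"],
        "category_name": lookup_category_name(row["category_id"]),
    }
```

Note the cache trades freshness for load: if a category is renamed, already-cached entries stay stale until evicted, which is one reason stream-side joins (Option A) are usually preferred.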
For most production systems, Option A is better. Stream processing handles the joins once, and the sink receives fully denormalized documents.
Nested vs Flat Fields
When a document has one-to-many relationships (a product with multiple variants), you choose between:
Flat arrays: Simple but limited query capability.
{
"id": 1,
"name": "T-Shirt",
"variant_sizes": ["S", "M", "L", "XL"],
"variant_colors": ["red", "blue"]
}
You cannot query “find t-shirts where size S is available in red” because the arrays are independent.
Nested objects: Full query capability but higher indexing cost.
{
"id": 1,
"name": "T-Shirt",
"variants": [
{ "size": "S", "color": "red", "in_stock": true },
{ "size": "M", "color": "blue", "in_stock": false }
]
}
With the nested field type, you can query “find t-shirts where a variant with size S AND color red is in stock.”
The trade-off: nested documents are indexed as separate Lucene documents internally. A product with 50 variants creates 51 Lucene documents. This increases indexing time and index size.
Document ID Mapping
The _id field in Elasticsearch determines how documents are uniquely identified. Getting this right is critical for idempotent writes.
Single-Column Primary Key
Use the primary key value directly:
Database row: { id: 42, name: "Widget", ... }
Elasticsearch: PUT /products/_doc/42
Composite Primary Key
Concatenate with a delimiter that cannot appear in the key values:
Database row: { order_id: 100, line_item_id: 3, ... }
Elasticsearch: PUT /order_items/_doc/100:3
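A small helper for building composite `_id` values, sketched in Python. The escaping scheme is an illustrative choice, not a fixed convention; the point is that `("a:b", "c")` and `("a", "b:c")` must not collide:

```python
def make_doc_id(row, key_columns, delimiter=":"):
    """Build an Elasticsearch _id from one or more primary key columns.

    Escapes backslashes and the delimiter inside key values so that
    distinct composite keys can never produce the same _id string.
    """
    parts = []
    for col in key_columns:
        value = str(row[col])
        escaped = value.replace("\\", "\\\\").replace(delimiter, "\\" + delimiter)
        parts.append(escaped)
    return delimiter.join(parts)
```

For the row above, `make_doc_id(row, ["order_id", "line_item_id"])` yields `"100:3"`.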
Why This Matters
When the CDC engine emits an update event, the sink writes to the same _id. Elasticsearch treats this as an upsert — if the document exists, it is replaced; if not, it is created. This means:
- Retries are safe: If the sink crashes after writing but before committing the consumer offset, it re-reads the event and writes the same document to the same _id. No duplicates.
- Out-of-order events: If events arrive out of order (due to partitioning or retries), the last write wins. Include a version field (like a database timestamp or LSN) and use optimistic concurrency control if ordering matters.
Optimistic Concurrency with Version Numbers
PUT /products/_doc/42?version=1027&version_type=external
{
"name": "Updated Widget",
"updated_at": "2025-05-22T10:30:00Z"
}
With version_type=external, Elasticsearch only accepts the write if the provided version is greater than the stored version. Out-of-order updates with lower versions are rejected. Use the database LSN or a monotonically increasing sequence as the version.
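Sketched with the Python elasticsearch client (8.x-style `document=` keyword assumed), using the event's LSN as the external version; the `key`, `lsn`, and `after` field names are illustrative:

```python
def index_with_external_version(es, event, index="products"):
    """Write a CDC event using its LSN as an external version.

    Elasticsearch rejects the write with a 409 if the stored version is
    already >= event["lsn"], so stale out-of-order updates are dropped
    instead of overwriting newer data.
    """
    return es.index(
        index=index,
        id=str(event["key"]),
        document=event["after"],
        version=event["lsn"],          # monotonically increasing LSN
        version_type="external",
    )
```

The 409 responses this produces are expected noise, not errors; the bulk error handling below treats them as safe skips.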
Handling Deletes
When a row is deleted from the source database, the CDC engine emits a delete event. How you handle it in Elasticsearch depends on your requirements.
Hard Deletes
Delete the document from Elasticsearch:
DELETE /products/_doc/42
This is simple but has implications:
- The document is gone from search results immediately
- You lose the ability to audit what was deleted
- Frequent deletes create “tombstones” in Lucene segments, increasing merge pressure
Soft Deletes
Update the document with a deleted flag and filter it out at query time:
POST /products/_update/42
{
"doc": {
"deleted": true,
"deleted_at": "2025-05-22T10:30:00Z"
}
}
Then create a filtered alias:
POST /_aliases
{
"actions": [
{
"add": {
"index": "products",
"alias": "products_active",
"filter": { "term": { "deleted": false } }
}
}
]
}
Point your search queries at products_active. Deleted documents are invisible to search but preserved in the index. Note that the term filter only matches documents where the field is explicitly false, so the sink must index deleted: false on every live document. Run a periodic cleanup job to hard-delete documents that have been soft-deleted for longer than your retention period.
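The cleanup job can be a scheduled delete_by_query over documents soft-deleted past the retention window (30 days here, purely as an example):

POST /products/_delete_by_query
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "deleted": true } },
        { "range": { "deleted_at": { "lt": "now-30d" } } }
      ]
    }
  }
}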
For more on this topic, see the guide on CDC soft deletes and tombstones.
Tombstone Events in Kafka
Kafka supports tombstone messages — records with a key but a null value. CDC engines emit these for delete operations. Configure your sink connector to translate tombstones into Elasticsearch delete operations:
Kafka record: key=42, value=null → DELETE /products/_doc/42
Kafka record: key=42, value={...} → PUT /products/_doc/42 { ... }
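A sketch of that translation, assuming Kafka records exposed as simple (key, value) pairs with the value already deserialized:

```python
def cdc_record_to_bulk_actions(record, index="products"):
    """Translate one Kafka CDC record into Elasticsearch bulk action lines.

    A tombstone (value is None) becomes a delete action; anything else
    becomes an index (upsert) action followed by the document body.
    """
    key, value = record
    if value is None:  # tombstone: the row was deleted upstream
        return [{"delete": {"_index": index, "_id": str(key)}}]
    return [{"index": {"_index": index, "_id": str(key)}}, value]
```

These action lines slot directly into the _bulk request format shown below.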
Bulk Indexing from CDC Events
Elasticsearch performs best with bulk operations. Instead of indexing one document at a time, batch CDC events and send them in a single _bulk request.
Bulk Request Format
POST /_bulk
{ "index": { "_index": "products", "_id": "42" } }
{ "name": "Widget", "price": 29.99, "category": "tools" }
{ "index": { "_index": "products", "_id": "43" } }
{ "name": "Gadget", "price": 49.99, "category": "electronics" }
{ "delete": { "_index": "products", "_id": "44" } }
Batching Strategy
The sink connector should accumulate events and flush them as a bulk request when either:
- The batch reaches a size threshold (e.g., 500-2000 documents)
- A time threshold expires (e.g., every 1-5 seconds)
import time

class ElasticsearchSink:
    def __init__(self, es_client, batch_size=1000, flush_interval_ms=2000):
        self.es = es_client
        self.batch_size = batch_size
        self.flush_interval_ms = flush_interval_ms
        self.buffer = []     # bulk action/document lines
        self.doc_count = 0   # documents buffered (a delete counts as one)
        self.last_flush = time.time()

    def process(self, cdc_event):
        if cdc_event['op'] == 'd':  # delete: single action line
            self.buffer.append({'delete': {'_index': 'products', '_id': cdc_event['key']}})
        else:  # insert or update: action line plus document body
            self.buffer.append({'index': {'_index': 'products', '_id': cdc_event['key']}})
            self.buffer.append(cdc_event['after'])
        self.doc_count += 1
        if self.should_flush():
            self.flush()

    def should_flush(self):
        # Count documents, not buffer lines: index ops add two lines,
        # deletes add one, so raw buffer length would overcount.
        return (
            self.doc_count >= self.batch_size
            or (time.time() - self.last_flush) * 1000 > self.flush_interval_ms
        )

    def flush(self):
        if not self.buffer:
            return
        response = self.es.bulk(body=self.buffer)
        if response['errors']:
            self.handle_errors(response)
        self.buffer.clear()
        self.doc_count = 0
        self.last_flush = time.time()
Error Handling in Bulk Requests
Bulk requests can partially fail — some operations succeed while others fail. Check each item in the response:
- 429 (Too Many Requests): Elasticsearch is overloaded. Back off and retry.
- 409 (Version Conflict): Expected with optimistic concurrency. The document was already updated with a newer version. Skip.
- 400 (Bad Request): Mapping error — the document has a field type that conflicts with the index mapping. Send to a dead letter queue and alert.
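A sketch of that per-item triage, with the retry and dead-letter hooks left as hypothetical callables supplied by the surrounding sink:

```python
def handle_bulk_response(response, retry, send_to_dlq):
    """Triage each item of a partially failed bulk response.

    Each item is shaped like {"index": {...}} or {"delete": {...}};
    `retry` and `send_to_dlq` are callbacks owned by the sink.
    """
    for item in response.get("items", []):
        action, result = next(iter(item.items()))  # e.g. ("index", {...})
        status = result.get("status", 200)
        if status == 429:
            retry(result)        # overloaded: back off and retry
        elif status == 409:
            pass                 # stale external version: safe to skip
        elif status >= 400:
            send_to_dlq(result)  # mapping error etc.: dead letter + alert
```

Only retry 429s through this path; blindly retrying a 400 would loop forever on a document that can never index.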
Zero-Downtime Reindexing
When you need to change your index mapping (new fields, different analyzers, updated tokenizers), you cannot modify an existing mapping for fields that already have data. You need to create a new index and migrate to it.
The Process
1. Create the new index with the updated mapping:
PUT /products_v2
{
  "mappings": {
    "properties": {
      "name": { "type": "text", "analyzer": "english" },
      "description": { "type": "text", "analyzer": "english" },
      "tags": { "type": "keyword" },
      ...
    }
  }
}
2. Start dual-writing: Configure your CDC sink to write to both products_v1 and products_v2. New changes go to both indexes from this point forward.
3. Backfill the new index: Use the Elasticsearch Reindex API or a separate backfill job to copy existing documents from products_v1 to products_v2:
POST /_reindex
{
  "source": { "index": "products_v1" },
  "dest": { "index": "products_v2" }
}
Since CDC events have been flowing to products_v2 since step 2, any changes that happened during the backfill are already captured. The dual-write ensures the new index converges.
4. Switch the alias:
POST /_aliases
{
  "actions": [
    { "remove": { "index": "products_v1", "alias": "products" } },
    { "add": { "index": "products_v2", "alias": "products" } }
  ]
}
This is atomic — search queries switch from products_v1 to products_v2 instantly.
5. Stop dual-writing and remove the old index after confirming the new one is healthy.
Monitoring the Pipeline
Key metrics to track:
| Metric | What it tells you | Alert threshold |
|---|---|---|
| Consumer lag | How far behind the sink is from the source | > 10,000 events or > 60 seconds |
| Bulk request latency | Elasticsearch indexing performance | p99 > 5 seconds |
| Bulk error rate | Documents failing to index | > 0.1% |
| Refresh interval alignment | How soon indexed docs are searchable | Check refresh_interval setting |
| Search latency | End-user impact | p99 > 500ms (application dependent) |
The most important metric is end-to-end latency — the time from when a row is committed in the source database to when the corresponding document is searchable in Elasticsearch. This combines CDC capture latency, broker transit time, sink processing time, and Elasticsearch refresh interval.
Elasticsearch’s default refresh interval is 1 second. This means a document is searchable at most ~1 second after it is indexed. If your CDC pipeline adds 2-3 seconds, your total end-to-end latency is around 3-4 seconds — fast enough for most search use cases.
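End-to-end latency can be measured directly if the CDC event carries a source commit timestamp (Debezium events expose one as ts_ms, for example); the `source_ts_ms` field name here is an assumption about your event format:

```python
import time

def end_to_end_lag_ms(cdc_event, refresh_interval_ms=1000, now_ms=None):
    """Estimate source-commit-to-searchable latency for one event.

    Measures now minus the event's source commit timestamp, then adds
    the index refresh interval as the worst-case wait until the
    document actually appears in search results.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    pipeline_lag_ms = now_ms - cdc_event["source_ts_ms"]
    return pipeline_lag_ms + refresh_interval_ms
```

Sampling this per batch and exporting a p99 gives you the one number that matters most from the monitoring table above.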
Bringing It All Together
Building a real-time search pipeline from CDC to Elasticsearch is not just about moving data. It requires thinking about document design (denormalized, with proper ID mapping), delete strategies (soft vs hard), bulk write optimization, and a reindexing process that does not require downtime.
The reward is search results that reflect the current state of your database within seconds, without any application code changes and without the source database bearing the load of direct Elasticsearch writes.
Ready to build real-time search from your database? Streamkap streams CDC events from PostgreSQL, MySQL, MongoDB, and other sources directly to Elasticsearch with built-in bulk optimization and error handling. Start a free trial or learn more about available connectors.