PostgreSQL to Elasticsearch: Real-Time Search Index Sync
Keep Elasticsearch search indexes in sync with PostgreSQL using CDC. Learn how to build a real-time sync pipeline for full-text search, autocomplete, and faceted navigation.
PostgreSQL is one of the most trusted relational databases in the world, but it was not built for full-text search, autocomplete suggestions, or faceted navigation over millions of records. That is where Elasticsearch comes in. By pairing PostgreSQL as your system of record with Elasticsearch as a purpose-built search engine, you get the best of both worlds: ACID transactions for writes and sub-second, relevance-ranked search for reads.
The hard part is keeping those two systems in sync. Every product update, every new review, every price change in PostgreSQL needs to appear in Elasticsearch within seconds, or your users see stale search results and your business loses trust. This guide walks through how to build a reliable, real-time sync pipeline from PostgreSQL to Elasticsearch using change data capture (CDC).
The Dual-Write Problem
The most common first attempt at syncing PostgreSQL and Elasticsearch is the dual write. The application writes to PostgreSQL and then immediately writes to Elasticsearch in the same request handler. It looks simple in code:
# Dual write — looks simple, breaks in production
def update_product(product_id, data):
    # Write 1: the system of record
    db.execute("UPDATE products SET name = %s WHERE id = %s",
               (data["name"], product_id))
    # Write 2: the search index — no transaction spans both systems
    es.index(index="products", id=product_id, body=data)
This approach has three fundamental problems. First, if the Elasticsearch write fails after the PostgreSQL write succeeds, your data is now inconsistent and there is no transaction spanning both systems. Second, if your application crashes between the two writes, the Elasticsearch update is silently lost. Third, every service that modifies PostgreSQL data must remember to also update Elasticsearch, which turns into a maintenance burden as your team and codebase grow.
Even wrapping both calls in a try/except with retry logic does not fully solve the problem. You are fighting distributed systems physics: there is no atomic commit across PostgreSQL and Elasticsearch. Eventually, the two will drift apart, and debugging why search results do not match the database becomes a recurring headache.
CDC eliminates this entire class of bugs. Instead of modifying application code, CDC reads directly from PostgreSQL’s write-ahead log (WAL), the same log the database uses for crash recovery and replication. Every committed INSERT, UPDATE, and DELETE is captured at the database level and forwarded to Elasticsearch. The application only writes to PostgreSQL and never needs to know that Elasticsearch exists.
Architecture Patterns
There are several ways to wire CDC from PostgreSQL to Elasticsearch. The right choice depends on your data complexity and operational requirements.
Direct CDC to Elasticsearch
The simplest pattern connects a CDC connector directly to Elasticsearch with no intermediate message broker. A tool like Debezium reads the PostgreSQL WAL and writes documents to Elasticsearch via its bulk API. This works well when you are syncing individual tables without complex joins.
CDC via Kafka to Elasticsearch
Introducing Kafka (or a Kafka-compatible broker) between the CDC source and the Elasticsearch sink provides durability, replayability, and fan-out. CDC events land in Kafka topics, and a separate Elasticsearch sink connector consumes them. If Elasticsearch goes down for maintenance, events queue in Kafka and are delivered when the cluster recovers. No data loss.
CDC with Stream Processing for Denormalization
When your Elasticsearch documents require data from multiple PostgreSQL tables (products + categories + reviews, for example), you need a stream processing layer between CDC and Elasticsearch. Apache Flink SQL or ksqlDB can join CDC streams in real time and emit a single denormalized document per entity. This is the most powerful pattern and the one you will likely need for any non-trivial search use case.
A platform like Streamkap provides managed connectors for both PostgreSQL CDC and Elasticsearch, along with built-in Flink-based stream processing for denormalization, removing the need to operate Kafka and Flink clusters yourself.
PostgreSQL CDC Setup
Before any CDC tool can read changes from PostgreSQL, the database must be configured for logical replication.
Set WAL Level
Edit postgresql.conf or use ALTER SYSTEM:
ALTER SYSTEM SET wal_level = 'logical';
-- Requires a restart to take effect
Logical WAL level records enough information to reconstruct row-level changes, including the full before and after images of each row.
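After the restart, it is worth confirming the setting actually took effect before configuring a connector:

```sql
SHOW wal_level;
-- Returns "logical" once the restart has picked up the change
```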
Create a Replication Slot
Replication slots ensure PostgreSQL retains WAL segments until the CDC consumer has acknowledged them:
SELECT pg_create_logical_replication_slot('streamkap_slot', 'pgoutput');
Create a Publication
A publication defines which tables are included in the CDC stream:
-- All tables
CREATE PUBLICATION streamkap_pub FOR ALL TABLES;
-- Or specific tables only
CREATE PUBLICATION streamkap_pub FOR TABLE products, categories, reviews;
Dedicated Replication User
Create a user with the minimum required privileges:
CREATE ROLE cdc_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
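The GRANT above covers only tables that exist today. To ensure tables created later are also readable by the CDC user without a manual re-grant, you can set default privileges as well:

```sql
-- Automatically apply SELECT to tables created in the future
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO cdc_user;
```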
Monitor the replication slot lag regularly. If the CDC consumer falls behind, PostgreSQL will retain WAL segments and disk usage will grow until the consumer catches up.
Relational to Document Mapping
The biggest conceptual shift when syncing PostgreSQL to Elasticsearch is moving from normalized relational tables to denormalized JSON documents. In PostgreSQL, you might have a products table, a categories table, and a reviews table linked by foreign keys. In Elasticsearch, the ideal structure is a single document per product that embeds its category name and aggregated review data.
Denormalization Strategies
Pre-join in the stream processor. Use Flink SQL or a similar engine to join the CDC streams from products, categories, and reviews into a single enriched event before it reaches Elasticsearch. This keeps Elasticsearch indexing simple and fast.
Nested objects. For one-to-many relationships like product variants or multiple images, Elasticsearch nested objects let you store arrays of structured data inside a single document while still querying them independently:
{
  "product_id": 42,
  "name": "Wireless Headphones",
  "category": "Electronics",
  "variants": [
    { "sku": "WH-BLK", "color": "Black", "price": 79.99 },
    { "sku": "WH-WHT", "color": "White", "price": 79.99 }
  ]
}
Parent-child (join field). When the parent entity updates frequently but the children rarely change (or vice versa), Elasticsearch’s join field type avoids reindexing all children when only the parent changes. This is useful for scenarios like a seller profile linked to thousands of product listings.
Handling Joins Across CDC Streams
When a category name changes in PostgreSQL, the CDC stream emits a change event for the categories table. But every product document in Elasticsearch that references that category also needs updating. A stream processor handles this by maintaining state for each join key and re-emitting enriched product documents whenever any input stream changes.
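The mechanics can be sketched in plain Python. This is a deliberately simplified in-memory model (the `ProductEnricher` class and event shapes are hypothetical); a real engine like Flink keeps this state in a fault-tolerant backend and handles watermarks, retractions, and scale-out:

```python
class ProductEnricher:
    """Toy stateful join: keeps the latest category names and product rows,
    and re-emits enriched product documents when either stream changes."""

    def __init__(self):
        self.categories = {}    # category_id -> latest name
        self.products = {}      # product_id -> latest row
        self.by_category = {}   # category_id -> set of product_ids

    def on_category_event(self, row):
        # A category rename re-emits every product that references it.
        self.categories[row["id"]] = row["name"]
        return [self._enrich(self.products[pid])
                for pid in self.by_category.get(row["id"], set())]

    def on_product_event(self, row):
        self.products[row["id"]] = row
        self.by_category.setdefault(row["category_id"], set()).add(row["id"])
        return [self._enrich(row)]

    def _enrich(self, row):
        # Join the product row with the current category name.
        return {**row, "category": self.categories.get(row["category_id"])}
```

Each returned document would be sent to Elasticsearch, so renaming a category fans out into one updated document per affected product.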
Elasticsearch Index Design
Good index design is critical for both search quality and write performance.
Mapping Types and Analyzers
Define explicit mappings rather than relying on dynamic mapping. This gives you control over how each field is indexed and searched:
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": { "type": "keyword" },
          "autocomplete": {
            "type": "text",
            "analyzer": "autocomplete_analyzer"
          }
        }
      },
      "category": { "type": "keyword" },
      "price": { "type": "float" },
      "description": { "type": "text", "analyzer": "english" },
      "created_at": { "type": "date" },
      "in_stock": { "type": "boolean" }
    }
  }
}
For autocomplete, define a custom analyzer with edge n-grams:
{
  "settings": {
    "analysis": {
      "analyzer": {
        "autocomplete_analyzer": {
          "type": "custom",
          "tokenizer": "autocomplete_tokenizer",
          "filter": ["lowercase"]
        }
      },
      "tokenizer": {
        "autocomplete_tokenizer": {
          "type": "edge_ngram",
          "min_gram": 2,
          "max_gram": 20,
          "token_chars": ["letter", "digit"]
        }
      }
    }
  }
}
Use keyword type for fields used in filtering and aggregations (category, status, brand). Use text type with appropriate analyzers for fields users search by free text.
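A hypothetical query against the mapping above shows the two field types working together: free-text matching on the n-grammed autocomplete subfield, with an exact-match filter on the keyword field:

```json
GET /products/_search
{
  "query": {
    "bool": {
      "must": { "match": { "name.autocomplete": "head" } },
      "filter": { "term": { "category": "Electronics" } }
    }
  }
}
```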
Refresh Interval
By default, Elasticsearch refreshes every second, making new documents searchable. For write-heavy CDC workloads, increase this to reduce indexing overhead:
PUT /products/_settings
{
  "index": {
    "refresh_interval": "5s"
  }
}
A 5-second refresh interval is a reasonable trade-off for most search applications. Users rarely notice the difference between 1-second and 5-second search freshness, but your cluster will handle significantly higher write throughput.
Handling Updates and Deletes
CDC streams emit three event types: inserts, updates, and deletes. Each requires different handling in Elasticsearch.
Inserts and Full Replacements
For inserts and updates where the CDC event contains the full after-image of the row, use the Elasticsearch index API with the PostgreSQL primary key as the document ID. If the document already exists, it is replaced entirely:
PUT /products/_doc/42
{
  "name": "Wireless Headphones",
  "category": "Electronics",
  "price": 79.99,
  "in_stock": true
}
This idempotent approach simplifies error handling. If a CDC event is delivered twice (which can happen during connector restarts), the second write simply overwrites the first with identical data.
Partial Updates
When only a subset of fields change and the CDC event includes just the modified columns, use the update API to avoid overwriting the entire document:
POST /products/_update/42
{
  "doc": {
    "price": 69.99
  }
}
Partial updates are more efficient but require that the Elasticsearch document already exists. If the document was somehow deleted, a partial update will fail. Most CDC pipelines use full document replacement for simplicity and reliability.
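If you do rely on partial updates, Elasticsearch's doc_as_upsert option closes that failure window: when the target document is missing, the partial doc is indexed as the initial document instead of the update failing:

```json
POST /products/_update/42
{
  "doc": { "price": 69.99 },
  "doc_as_upsert": true
}
```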
Deletes
When a DELETE event arrives from PostgreSQL, remove the corresponding Elasticsearch document by ID:
DELETE /products/_doc/42
If the document does not exist, Elasticsearch returns a 404, which is safe to ignore in your CDC consumer. Order matters: delete events must be processed after any preceding updates for the same document. Partitioning Kafka topics by primary key guarantees this ordering within a partition.
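Putting the three event types together, a consumer can translate each CDC event into the matching action. The event envelope below is a simplified Debezium-style shape (an "op" of c/u/d, an "after" row image, and the primary key in "key"; the exact envelope varies by connector), and the _op_type values follow the elasticsearch-py bulk helper convention:

```python
def to_bulk_action(event, index="products"):
    """Translate a simplified CDC event into an Elasticsearch bulk action.

    event["op"] is "c" (insert), "u" (update), or "d" (delete);
    event["key"] carries the primary key; event["after"] is the row image.
    """
    doc_id = event["key"]["id"]
    if event["op"] == "d":
        # Deletes remove the document by ID; a 404 on a missing doc is safe.
        return {"_op_type": "delete", "_index": index, "_id": doc_id}
    # Inserts and updates are idempotent full replacements keyed by the PK,
    # so redelivered events simply overwrite with identical data.
    return {"_op_type": "index", "_index": index, "_id": doc_id,
            "_source": event["after"]}
```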
Practical Example: Product Catalog Sync
Consider an e-commerce platform with three PostgreSQL tables:
-- Source tables
CREATE TABLE categories (
  id SERIAL PRIMARY KEY,
  name TEXT
);

CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  name TEXT,
  category_id INT REFERENCES categories(id),
  price NUMERIC,
  description TEXT,
  in_stock BOOLEAN
);

CREATE TABLE reviews (
  id SERIAL PRIMARY KEY,
  product_id INT REFERENCES products(id),
  rating INT,
  body TEXT,
  created_at TIMESTAMP
);
The goal is a single Elasticsearch document per product that includes the category name and average review rating.
A Flink SQL job joins the three CDC streams:
INSERT INTO es_products
SELECT
  p.id AS product_id,
  p.name,
  c.name AS category,
  p.price,
  p.description,
  p.in_stock,
  AVG(r.rating) AS avg_rating,
  COUNT(r.id) AS review_count
FROM products_cdc p
LEFT JOIN categories_cdc c ON p.category_id = c.id
LEFT JOIN reviews_cdc r ON r.product_id = p.id
GROUP BY p.id, p.name, c.name, p.price, p.description, p.in_stock;
When a new review is submitted, the Flink job recalculates the average rating and emits an updated product document to Elasticsearch. When a category is renamed, every product in that category gets a refreshed document. The application code does not change at all. It just keeps writing to PostgreSQL.
Performance Tuning
High-throughput CDC pipelines can push thousands of events per second to Elasticsearch. These settings keep your cluster healthy under load.
Bulk Size
The Elasticsearch sink connector should batch documents into bulk requests. A bulk size of 1,000 to 5,000 documents per request is a good starting point. Smaller batches increase per-document overhead; larger batches risk timeouts and memory pressure.
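Sink connectors handle this batching internally, but the logic is worth seeing. A minimal sketch (the `batched` helper is hypothetical):

```python
def batched(actions, size=1000):
    """Group a stream of bulk actions into fixed-size batches,
    flushing the final partial batch at the end of the stream."""
    batch = []
    for action in actions:
        batch.append(action)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        # Flush whatever is left so no trailing events are stranded.
        yield batch
```

A production sink would also flush on a time interval, so a trickle of events does not wait indefinitely for a full batch.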
Index Settings for Write-Heavy Workloads
During initial snapshot (when CDC first captures the full state of your PostgreSQL tables), temporarily adjust index settings:
PUT /products/_settings
{
  "index": {
    "number_of_replicas": 0,
    "refresh_interval": "30s"
  }
}
After the snapshot completes and you transition to streaming incremental changes, restore production settings:
PUT /products/_settings
{
  "index": {
    "number_of_replicas": 1,
    "refresh_interval": "5s"
  }
}
Thread Pool and Queue Size
If you see rejected execution exceptions in Elasticsearch logs, increase the write thread pool queue size. However, this is usually a sign that your cluster needs more data nodes or your bulk size needs tuning.
Translog Settings
For CDC workloads where durability is handled by the upstream Kafka or replication slot, you can relax Elasticsearch’s translog durability to async for higher write throughput at the cost of a small data loss window if an Elasticsearch node crashes:
PUT /products/_settings
{
  "index": {
    "translog.durability": "async",
    "translog.sync_interval": "5s"
  }
}
Monitoring
A CDC pipeline is only as reliable as your ability to detect when it falls behind or breaks.
Sync Lag
Track the time difference between when a change is committed in PostgreSQL and when the corresponding document becomes searchable in Elasticsearch. Streamkap exposes this as a built-in metric for its managed CDC pipelines. If you are running your own stack, compare the PostgreSQL WAL LSN (log sequence number) with the last acknowledged LSN in the CDC consumer.
Document Count Reconciliation
Periodically compare counts:
-- PostgreSQL
SELECT COUNT(*) FROM products WHERE deleted_at IS NULL;
// Elasticsearch
GET /products/_count
A persistent mismatch indicates dropped events, duplicate processing, or a stuck replication slot. Automate this check and alert when the counts diverge beyond a threshold (for example, more than 0.1% difference).
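The threshold check itself is simple; a hypothetical helper for the alerting job:

```python
def counts_diverged(pg_count, es_count, threshold=0.001):
    """Return True when the counts differ by more than the given
    fraction of the PostgreSQL count (0.001 = 0.1%)."""
    if pg_count == 0:
        # An empty source table should mean an empty index.
        return es_count != 0
    return abs(pg_count - es_count) / pg_count > threshold
```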
Elasticsearch Cluster Health
Monitor these key indicators:
- Cluster status (green/yellow/red): yellow means replicas are unassigned, red means primary shards are missing.
- Indexing rate (documents per second): a sudden drop may indicate a CDC pipeline failure.
- Search latency (p50, p95, p99): rising latency may mean the cluster is overwhelmed by writes.
- JVM heap pressure: sustained heap above 75% leads to long garbage collection pauses that affect both indexing and search.
Replication Slot Monitoring in PostgreSQL
A stalled CDC consumer causes the replication slot to retain WAL segments indefinitely, eventually filling the disk:
SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots;
Set up alerts for replication slot lag exceeding a threshold (for example, 1 GB). If the consumer is down and cannot be recovered quickly, you may need to drop the slot to prevent a disk-full outage, then rebuild the Elasticsearch index from a snapshot.
Wrapping Up
Syncing PostgreSQL to Elasticsearch with CDC gives you reliable, real-time search without touching your application code. The pattern is straightforward: PostgreSQL’s WAL captures every change, CDC streams those changes through an optional processing layer for denormalization, and Elasticsearch indexes the resulting documents within seconds. The dual-write approach may look simpler in a proof of concept, but CDC is the only pattern that scales to production without eventual consistency drift.
The operational investment is in setting up logical replication, designing your Elasticsearch mappings, and monitoring the pipeline end to end. Managed platforms like Streamkap reduce that investment significantly by handling the CDC infrastructure, connectors, and stream processing, letting your team focus on building great search experiences rather than maintaining data plumbing.