Build an AI Agent with Real-Time Streaming Context
Step-by-step tutorial: set up CDC from PostgreSQL, stream to Redis, and build a Python agent that reads fresh data instead of querying the source DB.
Most AI agent tutorials show the agent calling a function that queries a database. It works, but it has a cost: every agent interaction adds load to your production database, queries take 50-200ms, and you’re coupling your agent’s performance to your database’s availability.
There’s a better pattern for read-heavy agent workloads: use CDC (change data capture) to stream database changes into a fast cache, then have the agent read from the cache instead. The agent gets sub-millisecond lookups, the source database sees zero additional load, and the data stays fresh within seconds.
This tutorial walks through the full flow: PostgreSQL → CDC → Redis → Python agent.
What We’re Building
┌──────────────┐     CDC      ┌───────┐   Stream    ┌───────┐
│  PostgreSQL  │ ──────────── │ Kafka │ ─────────── │ Redis │
│ (source DB)  │    events    │       │  consumer   │(cache)│
└──────────────┘              └───────┘             └───────┘
                                                        │
                                                  Sub-ms reads
                                                        │
                                                  ┌───────────┐
                                                  │  Python   │
                                                  │   Agent   │
                                                  └───────────┘
The agent is a Python function-calling setup (works with OpenAI, Anthropic, or any LLM with tool use) that has access to customer and order data through Redis lookups instead of PostgreSQL queries.
Step 1: Set Up the Source Database
Start with a PostgreSQL database that has the tables your agent needs. For this tutorial, we’ll use a simple e-commerce schema:
-- Enable logical replication (required for CDC)
ALTER SYSTEM SET wal_level = 'logical';
-- Restart PostgreSQL after this change
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
plan VARCHAR(50) DEFAULT 'free',
mrr_cents INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
status VARCHAR(50) DEFAULT 'pending',
total_cents INTEGER NOT NULL,
items JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Create publication for CDC
CREATE PUBLICATION agent_context FOR TABLE customers, orders;
The key prerequisite is wal_level = 'logical', which tells PostgreSQL to include enough detail in its write-ahead log for CDC to reconstruct row-level changes. For a deeper dive, see PostgreSQL Change Data Capture.
Step 2: Stream Changes to Redis
In production, you’d use a managed CDC platform to handle the PostgreSQL → Kafka → Redis pipeline. For this tutorial, here’s a simplified Python consumer that reads from a Kafka topic (populated by CDC) and writes to Redis:
# cache_sync.py — Kafka consumer that keeps Redis in sync with PostgreSQL
import json
import redis
from kafka import KafkaConsumer
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
consumer = KafkaConsumer(
"cdc.public.customers",
"cdc.public.orders",
bootstrap_servers="localhost:9092",
group_id="redis-cache-sync",
auto_offset_reset="earliest",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
def sync_customer(event: dict):
"""Write customer data to Redis as a hash."""
op = event.get("op") # c=create, u=update, d=delete
after = event.get("after") # New row state
if op == "d":
customer_id = event["before"]["id"]
r.delete(f"customer:{customer_id}")
r.delete(f"customer:email:{event['before']['email']}")
return
customer_id = after["id"]
# Store as hash for field-level access
r.hset(f"customer:{customer_id}", mapping={
"id": customer_id,
"email": after["email"],
"name": after["name"],
"plan": after["plan"],
"mrr_cents": after["mrr_cents"],
"updated_at": after["updated_at"],
})
    # Secondary index: lookup by email. On updates, drop the old index
    # entry first in case the email changed, or it would point at a stale key.
    before = event.get("before")
    if op == "u" and before and before.get("email") != after["email"]:
        r.delete(f"customer:email:{before['email']}")
    r.set(f"customer:email:{after['email']}", customer_id)
def sync_order(event: dict):
"""Write order data and maintain per-customer order lists."""
op = event.get("op")
after = event.get("after")
if op == "d":
order_id = event["before"]["id"]
customer_id = event["before"]["customer_id"]
r.delete(f"order:{order_id}")
        r.srem(f"customer:{customer_id}:orders", order_id)  # the key is a set (sadd below), so srem, not lrem
return
order_id = after["id"]
customer_id = after["customer_id"]
r.hset(f"order:{order_id}", mapping={
"id": order_id,
"customer_id": customer_id,
"status": after["status"],
"total_cents": after["total_cents"],
"items": json.dumps(after["items"]),
"created_at": after["created_at"],
})
    # Track order IDs per customer in a set; sadd is idempotent,
    # so re-processing the same event can't create duplicates
r.sadd(f"customer:{customer_id}:orders", order_id)
HANDLERS = {
"cdc.public.customers": sync_customer,
"cdc.public.orders": sync_order,
}
for message in consumer:
handler = HANDLERS.get(message.topic)
if handler:
handler(message.value)
This consumer does three things:
- Writes customer data as Redis hashes — each field is individually accessible
- Creates secondary indexes — email-to-ID mapping for lookups by email
- Maintains per-customer order sets — so you can quickly get all orders for a customer
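The handlers assume a Debezium-style event envelope: `op` flags the operation, while `before` and `after` carry the row state on either side of the change. A tiny helper (hypothetical, for illustration only, not part of the consumer above) makes the convention explicit:

```python
def effective_row(event: dict) -> dict:
    """Return the row state a handler should act on: `before` for
    deletes, `after` for creates and updates."""
    return event["before"] if event.get("op") == "d" else event["after"]

# Minimal sketches of the assumed envelope shape
create_event = {"op": "c", "before": None, "after": {"id": 1, "email": "a@example.com"}}
delete_event = {"op": "d", "before": {"id": 1, "email": "a@example.com"}, "after": None}

print(effective_row(create_event))  # {'id': 1, 'email': 'a@example.com'}
print(effective_row(delete_event))  # {'id': 1, 'email': 'a@example.com'}
```

Real CDC payloads carry more metadata (source, transaction info, timestamps), but `op`/`before`/`after` is the part the sync handlers depend on.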
For a more detailed look at CDC-to-Redis patterns, see CDC to Redis: Real-Time Cache Synchronization.
Step 3: Build the Agent’s Tool Functions
Now the interesting part. The agent needs functions (tools) it can call to get customer and order data. These functions read from Redis, not PostgreSQL:
# agent_tools.py — Functions the agent can call for real-time context
import json
import redis
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
def get_customer(customer_id: int | None = None, email: str | None = None) -> dict | None:
"""Look up a customer by ID or email.
Returns customer data from the CDC-fed Redis cache.
Typical latency: <1ms.
"""
if email and not customer_id:
customer_id = r.get(f"customer:email:{email}")
if not customer_id:
return None
data = r.hgetall(f"customer:{customer_id}")
if not data:
return None
return {
"id": int(data["id"]),
"email": data["email"],
"name": data["name"],
"plan": data["plan"],
"mrr_cents": int(data["mrr_cents"]),
"updated_at": data["updated_at"],
}
def get_customer_orders(customer_id: int, limit: int = 10) -> list[dict]:
"""Get recent orders for a customer.
Returns orders from the CDC-fed Redis cache.
Typical latency: <2ms for 10 orders.
"""
order_ids = r.smembers(f"customer:{customer_id}:orders")
if not order_ids:
return []
orders = []
pipeline = r.pipeline() # Batch Redis commands
for order_id in order_ids:
pipeline.hgetall(f"order:{order_id}")
results = pipeline.execute()
for data in results:
if data:
orders.append({
"id": int(data["id"]),
"status": data["status"],
"total_cents": int(data["total_cents"]),
"items": json.loads(data["items"]),
"created_at": data["created_at"],
})
# Sort by created_at descending, limit results
orders.sort(key=lambda o: o["created_at"], reverse=True)
return orders[:limit]
def get_order(order_id: int) -> dict | None:
"""Look up a specific order by ID."""
data = r.hgetall(f"order:{order_id}")
if not data:
return None
return {
"id": int(data["id"]),
"customer_id": int(data["customer_id"]),
"status": data["status"],
"total_cents": int(data["total_cents"]),
"items": json.loads(data["items"]),
"created_at": data["created_at"],
}
# Tool definitions for the LLM (OpenAI function calling format)
TOOL_DEFINITIONS = [
{
"type": "function",
"function": {
"name": "get_customer",
"description": "Look up a customer by ID or email address",
"parameters": {
"type": "object",
"properties": {
"customer_id": {"type": "integer", "description": "Customer ID"},
"email": {"type": "string", "description": "Customer email"},
},
},
},
},
{
"type": "function",
"function": {
"name": "get_customer_orders",
"description": "Get recent orders for a customer",
"parameters": {
"type": "object",
"properties": {
"customer_id": {"type": "integer", "description": "Customer ID"},
"limit": {"type": "integer", "description": "Max orders to return", "default": 10},
},
"required": ["customer_id"],
},
},
},
{
"type": "function",
"function": {
"name": "get_order",
"description": "Look up a specific order by ID",
"parameters": {
"type": "object",
"properties": {
"order_id": {"type": "integer", "description": "Order ID"},
},
"required": ["order_id"],
},
},
},
]
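The tool results above return money as integer cents (`total_cents`, `mrr_cents`). If you want the agent's tool output to show dollar amounts, a small helper (hypothetical, not part of agent_tools.py) formats cents without introducing floating-point rounding:

```python
def format_cents(cents: int) -> str:
    """Render integer cents as a dollar string, e.g. 12999 -> '$129.99'."""
    sign = "-" if cents < 0 else ""
    dollars, remainder = divmod(abs(cents), 100)
    return f"{sign}${dollars}.{remainder:02d}"

print(format_cents(12999))  # $129.99
print(format_cents(500))    # $5.00
```

Keeping the stored values as integers and formatting only at the edge is the usual way to avoid 0.1 + 0.2 style float artifacts in monetary data.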
Step 4: Wire Up the Agent
Here’s a minimal agent loop using OpenAI’s API (the pattern is similar for Anthropic or any function-calling LLM):
# agent.py — Minimal agent with streaming context
import json
from openai import OpenAI
from agent_tools import (
get_customer, get_customer_orders, get_order,
TOOL_DEFINITIONS
)
client = OpenAI()
TOOL_DISPATCH = {
"get_customer": get_customer,
"get_customer_orders": get_customer_orders,
"get_order": get_order,
}
SYSTEM_PROMPT = """You are a customer support agent. You have access to
real-time customer and order data. Use the available tools to look up
information before answering questions. Be specific and reference actual
data from the tools."""
def run_agent(user_message: str, conversation: list | None = None):
if conversation is None:
conversation = [{"role": "system", "content": SYSTEM_PROMPT}]
conversation.append({"role": "user", "content": user_message})
while True:
response = client.chat.completions.create(
model="gpt-4o",
messages=conversation,
tools=TOOL_DEFINITIONS,
)
message = response.choices[0].message
conversation.append(message)
# If no tool calls, we have the final response
if not message.tool_calls:
return message.content, conversation
# Execute each tool call
for tool_call in message.tool_calls:
fn_name = tool_call.function.name
fn_args = json.loads(tool_call.function.arguments)
result = TOOL_DISPATCH[fn_name](**fn_args)
conversation.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result),
})
# Example usage
if __name__ == "__main__":
reply, _ = run_agent(
"What's the status of the most recent order for customer jane@example.com?"
)
print(reply)
When the user asks about Jane’s order, the agent:
- Calls get_customer(email="jane@example.com") — Redis lookup, <1ms
- Calls get_customer_orders(customer_id=42) — Redis pipeline, <2ms
- Returns a natural-language answer with the actual order status
Total data access time: under 3ms. Compare that to the direct DB query approach below.
Comparison: Cache vs. Direct DB Queries
Let’s compare the streaming context approach with the simpler “just query the database” approach:
# Direct DB approach (for comparison)
import psycopg2
def get_customer_direct(email: str) -> dict | None:
    """Query PostgreSQL directly — adds load to source DB."""
    conn = psycopg2.connect("postgresql://...")
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT id, email, name, plan, mrr_cents FROM customers WHERE email = %s",
            (email,),
        )
        row = cur.fetchone()
    finally:
        conn.close()  # Always release the connection, even if the query fails
    return dict(zip(["id", "email", "name", "plan", "mrr_cents"], row)) if row else None
Here’s how they stack up:
| Factor | Direct DB Query | CDC-Fed Cache |
|---|---|---|
| Read latency | 50-200ms (network + query) | <1ms (local Redis) |
| Source DB load | +1 query per agent tool call | Zero |
| Data freshness | Real-time (current state) | 1-5 seconds behind |
| Failure mode | Agent fails if DB is down | Agent uses last-known state |
| Connection management | Need connection pooling | Redis connection (simple) |
| Cost at scale | DB compute scales with agent usage | Fixed CDC + cache cost |
| Setup complexity | Low (just a query) | Medium (CDC pipeline + cache sync) |
The direct approach is simpler and gives you perfectly current data. But the cost model is wrong for agents: every agent interaction turns into database queries. If you have 1,000 concurrent agents each making 3-5 tool calls per interaction, that’s 3,000-5,000 queries hitting your production database in a single round of interactions.
The cache approach has a fixed cost. Whether you have 10 agents or 10,000, the CDC pipeline streams the same volume of changes. The cache handles read scaling independently.
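The scaling argument can be made concrete with a back-of-envelope calculation (the numbers are illustrative, matching the figures above):

```python
def direct_db_queries(concurrent_agents: int, tool_calls_per_interaction: int) -> int:
    """Source-DB queries for one round of interactions when every
    tool call is a direct query: load scales with agent count."""
    return concurrent_agents * tool_calls_per_interaction

def cdc_event_volume(row_changes_per_second: int, seconds: int) -> int:
    """CDC pipeline volume depends only on the source's write rate,
    not on how many agents read the cache."""
    return row_changes_per_second * seconds

# 1,000 agents x 3-5 tool calls: 3,000-5,000 direct queries per round
print(direct_db_queries(1_000, 3), direct_db_queries(1_000, 5))  # 3000 5000
# CDC volume over the same minute is fixed regardless of reader count
print(cdc_event_volume(50, 60))  # 3000
```

The first function grows with agent adoption; the second does not, which is the whole argument for moving agent reads off the source database.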
When Direct Queries Still Make Sense
The cache approach isn’t always better. Use direct queries when:
- You need transactional consistency: The agent needs to read-then-write in a transaction (e.g., reserving inventory)
- The data changes constantly and 1-5 second staleness matters: Real-time bidding, live pricing
- You have very low agent volume: <100 interactions/day doesn’t justify the CDC infrastructure
- You need complex ad-hoc queries: The cache only serves the access patterns you built into it
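The bullets above can be collapsed into a simple routing heuristic. This is a sketch with made-up thresholds (5-second staleness budget, 100 interactions/day), not a prescription:

```python
def choose_read_path(
    needs_transaction: bool,
    staleness_budget_seconds: float,
    interactions_per_day: int,
    has_precomputed_access_pattern: bool,
) -> str:
    """Pick 'direct' or 'cache' per the trade-offs discussed above."""
    if needs_transaction:
        return "direct"   # read-then-write must hit the source DB
    if staleness_budget_seconds < 5:
        return "direct"   # typical CDC lag of 1-5s would be too stale
    if interactions_per_day < 100:
        return "direct"   # volume too low to justify the CDC infrastructure
    if not has_precomputed_access_pattern:
        return "direct"   # the cache only serves patterns built into it
    return "cache"

print(choose_read_path(False, 30.0, 5_000, True))  # cache
```

In practice you can mix paths per tool: point lookups go to the cache while the occasional ad-hoc query goes direct.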
For a broader discussion of these trade-offs, see Real-Time Data for AI Agents.
Making It Production-Ready
The tutorial above covers the core pattern. For production, you’ll want to add:
Health Checks and Freshness Monitoring
import time

def check_cache_freshness() -> dict:
    """Check how fresh the cached data is."""
    # The consumer records the timestamp of its last applied CDC event here
    last_sync = r.get("meta:last_sync_timestamp")
if not last_sync:
return {"status": "unknown", "lag_seconds": None}
lag = time.time() - float(last_sync)
return {
"status": "healthy" if lag < 30 else "degraded",
"lag_seconds": round(lag, 1),
}
Fallback to Direct Query
import logging

logger = logging.getLogger(__name__)

def get_customer_with_fallback(email: str) -> dict | None:
    """Try the cache first; fall back to a direct query if the cache is stale."""
    freshness = check_cache_freshness()
    if freshness["status"] == "healthy":
        return get_customer(email=email)
    # Cache is degraded — fall back to a direct query
    logger.warning(f"Cache lag: {freshness['lag_seconds']}s, using direct query")
    return get_customer_direct(email=email)
Cache Rebuild
If the cache gets corrupted or you add new fields, you need to rebuild it from scratch. With Kafka, reset the consumer group offset to the beginning:
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group redis-cache-sync \
--topic cdc.public.customers \
--reset-offsets --to-earliest \
--execute
Then restart the consumer. It replays all events and rebuilds the cache. This is one of the biggest advantages of having Kafka in the pipeline — you never need to do a full database dump to rebuild downstream stores.
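Replay only works because the Step 2 handlers use idempotent writes (hset, sadd, and set overwrite rather than append). A toy demonstration, with plain Python structures standing in for Redis:

```python
# Dict/set stand-ins for the Redis hash and set written in Step 2
cache_hashes: dict[str, dict] = {}
cache_sets: dict[str, set] = {}

def apply_order_event(order_id: int, customer_id: int, status: str) -> None:
    """hset + sadd equivalents: overwriting writes, so replays are harmless."""
    cache_hashes[f"order:{order_id}"] = {"status": status}  # like hset
    cache_sets.setdefault(f"customer:{customer_id}:orders", set()).add(order_id)  # like sadd

apply_order_event(101, 42, "shipped")
apply_order_event(101, 42, "shipped")  # same event delivered twice on replay

print(cache_sets["customer:42:orders"])  # {101} — no duplicate order IDs
```

Had the consumer used a Redis list (lpush) for per-customer orders, every replay would append duplicates; choosing idempotent data structures is what makes offset resets a safe rebuild mechanism.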
Going Beyond Key-Value Lookups
The Redis hash approach works well for point lookups (get customer by ID, get order by ID). For more complex access patterns:
- Search over customer data: Stream CDC events to Elasticsearch for full-text search
- Semantic search for RAG: Stream to a vector database (Pinecone, Weaviate, Qdrant) with embeddings computed on the fly
- Aggregations: Use Streaming Agents to compute running totals, averages, or windows before writing to the cache
The architecture stays the same — CDC as the source of change events, a processing layer in the middle, and a fast read store at the end. Only the read store and the processing logic change.
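Because every sink consumes the same change events, adding a read store is mostly a matter of registering another handler. A minimal fan-out sketch (the sink functions here are placeholders, not real client code):

```python
from typing import Callable

# Placeholder sinks: in practice these would write to Redis,
# Elasticsearch, or a vector database as described above
def to_cache(event: dict) -> str:
    return f"cache:{event['after']['id']}"

def to_search(event: dict) -> str:
    return f"search:{event['after']['id']}"

SINKS: list[Callable[[dict], str]] = [to_cache, to_search]

def fan_out(event: dict) -> list[str]:
    """Deliver one CDC event to every registered read store."""
    return [sink(event) for sink in SINKS]

print(fan_out({"op": "c", "after": {"id": 7}}))  # ['cache:7', 'search:7']
```

With Kafka in the middle, each sink typically runs as its own consumer group instead of a single fan-out loop, so sinks fail and recover independently; the sketch just shows the shape of the routing.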
For more on connecting agents to live data through context layers, see Context Layer for AI Agents.
Ready to give your agents real-time context without loading your production database? Streamkap streams CDC events from PostgreSQL, MySQL, and MongoDB to any downstream store — Redis, vector databases, or data warehouses — with sub-second latency. Start a free trial or learn more about streaming for AI agents.