<--- Back to all resources

Tutorials & How-To

May 22, 2025

15 min read

Build an AI Agent with Real-Time Streaming Context

Step-by-step tutorial: set up CDC from PostgreSQL, stream to Redis, and build a Python agent that reads fresh data instead of querying the source DB.

Most AI agent tutorials show the agent calling a function that queries a database. It works, but it has a cost: every agent interaction adds load to your production database, queries take 50-200ms, and you’re coupling your agent’s performance to your database’s availability.

There’s a better pattern for read-heavy agent workloads: use CDC (change data capture) to stream database changes into a fast cache, then have the agent read from the cache instead. The agent gets sub-millisecond lookups, the source database sees zero additional load, and the data stays fresh within seconds.

This tutorial walks through the full flow: PostgreSQL → CDC → Redis → Python agent.

What We’re Building

┌──────────────┐     CDC      ┌───────┐    Stream    ┌───────┐
│  PostgreSQL  │ ──────────── │ Kafka │ ──────────── │ Redis │
│  (source DB) │   events     │       │   consumer   │(cache)│
└──────────────┘              └───────┘              └───────┘

                                                    Sub-ms reads

                                                   ┌───────────┐
                                                   │  Python   │
                                                   │  Agent    │
                                                   └───────────┘

The agent is a Python function-calling setup (works with OpenAI, Anthropic, or any LLM with tool use) that has access to customer and order data through Redis lookups instead of PostgreSQL queries.

Step 1: Set Up the Source Database

Start with a PostgreSQL database that has the tables your agent needs. For this tutorial, we’ll use a simple e-commerce schema:

-- Enable logical replication (required for CDC)
ALTER SYSTEM SET wal_level = 'logical';
-- Restart PostgreSQL after this change

CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    plan VARCHAR(50) DEFAULT 'free',
    mrr_cents INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    status VARCHAR(50) DEFAULT 'pending',
    total_cents INTEGER NOT NULL,
    items JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Create publication for CDC
CREATE PUBLICATION agent_context FOR TABLE customers, orders;

The key prerequisite is wal_level = 'logical', which tells PostgreSQL to include enough detail in its write-ahead log for CDC to reconstruct row-level changes. For a deeper dive, see PostgreSQL Change Data Capture.

Step 2: Stream Changes to Redis

In production, you’d use a managed CDC platform to handle the PostgreSQL → Kafka → Redis pipeline. For this tutorial, here’s a simplified Python consumer that reads from a Kafka topic (populated by CDC) and writes to Redis:

# cache_sync.py — Kafka consumer that keeps Redis in sync with PostgreSQL

import json
import redis
from kafka import KafkaConsumer

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

consumer = KafkaConsumer(
    "cdc.public.customers",
    "cdc.public.orders",
    bootstrap_servers="localhost:9092",
    group_id="redis-cache-sync",
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

def sync_customer(event: dict):
    """Write customer data to Redis as a hash."""
    op = event.get("op")  # c=create, u=update, d=delete
    after = event.get("after")  # New row state

    if op == "d":
        customer_id = event["before"]["id"]
        r.delete(f"customer:{customer_id}")
        r.delete(f"customer:email:{event['before']['email']}")
        return

    customer_id = after["id"]

    # Store as hash for field-level access
    r.hset(f"customer:{customer_id}", mapping={
        "id": customer_id,
        "email": after["email"],
        "name": after["name"],
        "plan": after["plan"],
        "mrr_cents": after["mrr_cents"],
        "updated_at": after["updated_at"],
    })

    # Secondary index: lookup by email
    r.set(f"customer:email:{after['email']}", customer_id)


def sync_order(event: dict):
    """Write order data and maintain per-customer order lists."""
    op = event.get("op")
    after = event.get("after")

    if op == "d":
        order_id = event["before"]["id"]
        customer_id = event["before"]["customer_id"]
        r.delete(f"order:{order_id}")
        r.lrem(f"customer:{customer_id}:orders", 0, order_id)
        return

    order_id = after["id"]
    customer_id = after["customer_id"]

    r.hset(f"order:{order_id}", mapping={
        "id": order_id,
        "customer_id": customer_id,
        "status": after["status"],
        "total_cents": after["total_cents"],
        "items": json.dumps(after["items"]),
        "created_at": after["created_at"],
    })

    # Maintain a list of order IDs per customer
    # Use a set to avoid duplicates on re-processing
    r.sadd(f"customer:{customer_id}:orders", order_id)


HANDLERS = {
    "cdc.public.customers": sync_customer,
    "cdc.public.orders": sync_order,
}

for message in consumer:
    handler = HANDLERS.get(message.topic)
    if handler:
        handler(message.value)

This consumer does three things:

  1. Writes customer data as Redis hashes — each field is individually accessible
  2. Creates secondary indexes — email-to-ID mapping for lookups by email
  3. Maintains per-customer order sets — so you can quickly get all orders for a customer

For a more detailed look at CDC-to-Redis patterns, see CDC to Redis: Real-Time Cache Synchronization.

Step 3: Build the Agent’s Tool Functions

Now the interesting part. The agent needs functions (tools) it can call to get customer and order data. These functions read from Redis, not PostgreSQL:

# agent_tools.py — Functions the agent can call for real-time context

import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)


def get_customer(customer_id: int = None, email: str = None) -> dict | None:
    """Look up a customer by ID or email.

    Returns customer data from the CDC-fed Redis cache.
    Typical latency: <1ms.
    """
    if email and not customer_id:
        customer_id = r.get(f"customer:email:{email}")
        if not customer_id:
            return None

    data = r.hgetall(f"customer:{customer_id}")
    if not data:
        return None

    return {
        "id": int(data["id"]),
        "email": data["email"],
        "name": data["name"],
        "plan": data["plan"],
        "mrr_cents": int(data["mrr_cents"]),
        "updated_at": data["updated_at"],
    }


def get_customer_orders(customer_id: int, limit: int = 10) -> list[dict]:
    """Get recent orders for a customer.

    Returns orders from the CDC-fed Redis cache.
    Typical latency: <2ms for 10 orders.
    """
    order_ids = r.smembers(f"customer:{customer_id}:orders")
    if not order_ids:
        return []

    orders = []
    pipeline = r.pipeline()  # Batch Redis commands
    for order_id in order_ids:
        pipeline.hgetall(f"order:{order_id}")

    results = pipeline.execute()
    for data in results:
        if data:
            orders.append({
                "id": int(data["id"]),
                "status": data["status"],
                "total_cents": int(data["total_cents"]),
                "items": json.loads(data["items"]),
                "created_at": data["created_at"],
            })

    # Sort by created_at descending, limit results
    orders.sort(key=lambda o: o["created_at"], reverse=True)
    return orders[:limit]


def get_order(order_id: int) -> dict | None:
    """Look up a specific order by ID."""
    data = r.hgetall(f"order:{order_id}")
    if not data:
        return None

    return {
        "id": int(data["id"]),
        "customer_id": int(data["customer_id"]),
        "status": data["status"],
        "total_cents": int(data["total_cents"]),
        "items": json.loads(data["items"]),
        "created_at": data["created_at"],
    }


# Tool definitions for the LLM (OpenAI function calling format)
TOOL_DEFINITIONS = [
    {
        "type": "function",
        "function": {
            "name": "get_customer",
            "description": "Look up a customer by ID or email address",
            "parameters": {
                "type": "object",
                "properties": {
                    "customer_id": {"type": "integer", "description": "Customer ID"},
                    "email": {"type": "string", "description": "Customer email"},
                },
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_customer_orders",
            "description": "Get recent orders for a customer",
            "parameters": {
                "type": "object",
                "properties": {
                    "customer_id": {"type": "integer", "description": "Customer ID"},
                    "limit": {"type": "integer", "description": "Max orders to return", "default": 10},
                },
                "required": ["customer_id"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_order",
            "description": "Look up a specific order by ID",
            "parameters": {
                "type": "object",
                "properties": {
                    "order_id": {"type": "integer", "description": "Order ID"},
                },
                "required": ["order_id"],
            },
        },
    },
]

Step 4: Wire Up the Agent

Here’s a minimal agent loop using OpenAI’s API (the pattern is similar for Anthropic or any function-calling LLM):

# agent.py — Minimal agent with streaming context

import json
from openai import OpenAI
from agent_tools import (
    get_customer, get_customer_orders, get_order,
    TOOL_DEFINITIONS
)

client = OpenAI()

TOOL_DISPATCH = {
    "get_customer": get_customer,
    "get_customer_orders": get_customer_orders,
    "get_order": get_order,
}

SYSTEM_PROMPT = """You are a customer support agent. You have access to
real-time customer and order data. Use the available tools to look up
information before answering questions. Be specific and reference actual
data from the tools."""


def run_agent(user_message: str, conversation: list = None):
    if conversation is None:
        conversation = [{"role": "system", "content": SYSTEM_PROMPT}]

    conversation.append({"role": "user", "content": user_message})

    while True:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=conversation,
            tools=TOOL_DEFINITIONS,
        )

        message = response.choices[0].message
        conversation.append(message)

        # If no tool calls, we have the final response
        if not message.tool_calls:
            return message.content, conversation

        # Execute each tool call
        for tool_call in message.tool_calls:
            fn_name = tool_call.function.name
            fn_args = json.loads(tool_call.function.arguments)

            result = TOOL_DISPATCH[fn_name](**fn_args)

            conversation.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(result),
            })


# Example usage
if __name__ == "__main__":
    reply, _ = run_agent(
        "What's the status of the most recent order for customer jane@example.com?"
    )
    print(reply)

When the user asks about Jane’s order, the agent:

  1. Calls get_customer(email="jane@example.com") — Redis lookup, <1ms
  2. Calls get_customer_orders(customer_id=42) — Redis pipeline, <2ms
  3. Returns a natural language answer with the actual order status

Total data access time: under 3ms. Compare that to the direct DB query approach below.

Comparison: Cache vs. Direct DB Queries

Let’s compare the streaming context approach with the simpler “just query the database” approach:

# Direct DB approach (for comparison)
import psycopg2

def get_customer_direct(email: str) -> dict:
    """Query PostgreSQL directly — adds load to source DB."""
    conn = psycopg2.connect("postgresql://...")
    cur = conn.cursor()
    cur.execute(
        "SELECT id, email, name, plan, mrr_cents FROM customers WHERE email = %s",
        (email,)
    )
    row = cur.fetchone()
    conn.close()
    return dict(zip(["id", "email", "name", "plan", "mrr_cents"], row)) if row else None

Here’s how they stack up:

FactorDirect DB QueryCDC-Fed Cache
Read latency50-200ms (network + query)<1ms (local Redis)
Source DB load+1 query per agent tool callZero
Data freshnessReal-time (current state)1-5 seconds behind
Failure modeAgent fails if DB is downAgent uses last-known state
Connection managementNeed connection poolingRedis connection (simple)
Cost at scaleDB compute scales with agent usageFixed CDC + cache cost
Setup complexityLow (just a query)Medium (CDC pipeline + cache sync)

The direct approach is simpler and gives you perfectly current data. But the cost model is wrong for agents: every agent interaction becomes a database transaction. If you have 1,000 concurrent agents each making 3-5 tool calls per interaction, that’s 3,000-5,000 queries hitting your production database per batch of interactions.

The cache approach has a fixed cost. Whether you have 10 agents or 10,000, the CDC pipeline streams the same volume of changes. The cache handles read scaling independently.

When Direct Queries Still Make Sense

The cache approach isn’t always better. Use direct queries when:

  • You need transactional consistency: The agent needs to read-then-write in a transaction (e.g., reserving inventory)
  • The data changes constantly and 1-5 second staleness matters: Real-time bidding, live pricing
  • You have very low agent volume: <100 interactions/day doesn’t justify the CDC infrastructure
  • You need complex ad-hoc queries: The cache only serves the access patterns you built into it

For a broader discussion of these trade-offs, see Real-Time Data for AI Agents.

Making It Production-Ready

The tutorial above covers the core pattern. For production, you’ll want to add:

Health Checks and Freshness Monitoring

def check_cache_freshness() -> dict:
    """Check how fresh the cached data is."""
    # Store the last CDC event timestamp
    last_sync = r.get("meta:last_sync_timestamp")
    if not last_sync:
        return {"status": "unknown", "lag_seconds": None}

    lag = time.time() - float(last_sync)
    return {
        "status": "healthy" if lag < 30 else "degraded",
        "lag_seconds": round(lag, 1),
    }

Fallback to Direct Query

def get_customer_with_fallback(customer_id: int) -> dict | None:
    """Try cache first, fall back to direct query if cache is stale."""
    freshness = check_cache_freshness()

    if freshness["status"] == "healthy":
        return get_customer(customer_id=customer_id)

    # Cache is degraded — fall back to direct query
    logger.warning(f"Cache lag: {freshness['lag_seconds']}s, using direct query")
    return get_customer_direct(customer_id=customer_id)

Cache Rebuild

If the cache gets corrupted or you add new fields, you need to rebuild it from scratch. With Kafka, reset the consumer group offset to the beginning:

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group redis-cache-sync \
  --topic cdc.public.customers \
  --reset-offsets --to-earliest \
  --execute

Then restart the consumer. It replays all events and rebuilds the cache. This is one of the biggest advantages of having Kafka in the pipeline — you never need to do a full database dump to rebuild downstream stores.

Going Beyond Key-Value Lookups

The Redis hash approach works well for point lookups (get customer by ID, get order by ID). For more complex access patterns:

  • Search over customer data: Stream CDC events to Elasticsearch for full-text search
  • Semantic search for RAG: Stream to a vector database (Pinecone, Weaviate, Qdrant) with embeddings computed on the fly
  • Aggregations: Use Streaming Agents to compute running totals, averages, or windows before writing to the cache

The architecture stays the same — CDC as the source of change events, a processing layer in the middle, and a fast read store at the end. Only the read store and the processing logic change.

For more on connecting agents to live data through context layers, see Context Layer for AI Agents.


Ready to give your agents real-time context without loading your production database? Streamkap streams CDC events from PostgreSQL, MySQL, and MongoDB to any downstream store — Redis, vector databases, or data warehouses — with sub-second latency. Start a free trial or learn more about streaming for AI agents.