March 10, 2026

How to Build a Real-Time AI Agent with Streaming Data

A practical tutorial for building an AI agent that acts on real-time database changes. From CDC setup to agent framework integration, step by step.

This tutorial walks through building an AI agent that has real-time access to database data. Not a nightly snapshot, not yesterday’s data, but data that is current to within seconds.

We will build a customer support agent that knows the current state of orders, accounts, and support history. When a customer asks “where is my order?”, the agent does not look at a stale copy from last night’s ETL job. It queries data that was streamed from the production database moments ago.

The architecture: PostgreSQL (source) to Streamkap (CDC) to Redis (cache) to LangChain (agent framework). We will also cover adding MCP for direct agent-to-data access.

Prerequisites

Before starting, you will need:

  • A PostgreSQL database with customer and order data (version 12 or later, with logical replication enabled)
  • A Streamkap account (free trial available)
  • A Redis instance (local Docker or managed, like AWS ElastiCache or Upstash)
  • Python 3.10 or later
  • An API key for your preferred LLM (OpenAI, Anthropic, or similar)

Step 1: Prepare Your Source Database

Your PostgreSQL database needs logical replication enabled for CDC to work. This is what allows Streamkap to read the write-ahead log (WAL) and capture changes without running queries against your tables, which keeps the impact on query performance small.

Check if logical replication is enabled:

SHOW wal_level;

If the result is not logical, update your PostgreSQL configuration:

ALTER SYSTEM SET wal_level = 'logical';

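Then restart PostgreSQL. On managed databases (RDS, Cloud SQL, Aurora), you set this through the cloud provider’s parameter group or configuration settings instead, for example the rds.logical_replication parameter on Amazon RDS and Aurora or the cloudsql.logical_decoding flag on Cloud SQL.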

Next, make sure the tables you want to stream have primary keys. CDC needs primary keys to identify which row changed:

-- Example schema for our tutorial
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) NOT NULL,
    name VARCHAR(255),
    plan VARCHAR(50) DEFAULT 'free',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    status VARCHAR(50) DEFAULT 'pending',
    total_amount DECIMAL(10,2),
    items JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE support_tickets (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    subject VARCHAR(500),
    status VARCHAR(50) DEFAULT 'open',
    priority VARCHAR(20) DEFAULT 'normal',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

Create a dedicated replication user (do not use your application’s database user for CDC):

CREATE USER streamkap_cdc WITH REPLICATION LOGIN PASSWORD 'your_secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO streamkap_cdc;
GRANT USAGE ON SCHEMA public TO streamkap_cdc;

Step 2: Set Up CDC with Streamkap

Log into Streamkap and create a new source connector:

  1. Select PostgreSQL as the source type
  2. Enter your database connection details (host, port, database name, the replication user credentials)
  3. Select the tables to stream: customers, orders, support_tickets
  4. Choose your snapshot mode (initial snapshot captures existing data, then switches to streaming changes)

Streamkap will validate the connection, create a replication slot, and start capturing changes. Within a few seconds, you should see data flowing in the Streamkap dashboard.

Next, create a destination connector. For this tutorial, we will use Redis:

  1. Select Redis as the destination type
  2. Enter your Redis connection details
  3. Configure key mapping (for example, customers:{id} as the key pattern for customer records)

Streamkap will start streaming data from PostgreSQL to Redis. Each change in your source tables appears in Redis within 1 to 2 seconds.
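To make the key mapping concrete, here is a minimal sketch of how a single change event could be applied to a key-value store. The event shape (op, table, before, after) is an assumption modeled on Debezium-style change events, not Streamkap's documented payload, and a plain dict stands in for Redis so the example runs anywhere.

```python
import json


def record_key(table: str, row: dict) -> str:
    """Build a key such as 'customers:1' from the table name and primary key."""
    return f"{table}:{row['id']}"


def apply_change_event(store: dict, event: dict) -> None:
    """Apply one change event to a dict that stands in for Redis.

    Hypothetical event shape:
    {"op": "c" | "u" | "d" | "r", "table": str, "before": row, "after": row}
    """
    op = event["op"]
    if op in ("c", "u", "r"):  # create, update, or snapshot read
        row = event["after"]
        store[record_key(event["table"], row)] = json.dumps(row)
    elif op == "d":  # delete: drop the key so readers never see a removed row
        store.pop(record_key(event["table"], event["before"]), None)
    else:
        raise ValueError(f"unknown op: {op!r}")


store = {}
apply_change_event(store, {
    "op": "c", "table": "customers", "before": None,
    "after": {"id": 1, "email": "a@example.com", "plan": "free"},
})
apply_change_event(store, {
    "op": "u", "table": "customers",
    "before": {"id": 1, "email": "a@example.com", "plan": "free"},
    "after": {"id": 1, "email": "a@example.com", "plan": "pro"},
})
print(store["customers:1"])
```

Because the key is derived from the primary key, an update overwrites the previous value instead of creating a second copy, which is why Step 1 insists on primary keys.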

You can verify data is flowing by checking Redis:

redis-cli
> KEYS customers:*
> GET customers:1

Step 3: Build the Agent Data Access Layer

Now we need Python code that lets our agent query the real-time data in Redis. Create a data access module:

# data_access.py
import json
import redis

class CustomerDataStore:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    def get_customer(self, customer_id: int) -> dict | None:
        """Get current customer data."""
        data = self.redis.get(f"customers:{customer_id}")
        if data:
            return json.loads(data)
        return None

    def get_customer_by_email(self, email: str) -> dict | None:
        """Look up a customer by email."""
        # Full keyspace scan: fine for a prototype, too slow for production.
        # For production, create a Redis secondary index on email.
        for key in self.redis.scan_iter("customers:*"):
            raw = self.redis.get(key)
            if raw is None:  # key was deleted between SCAN and GET
                continue
            data = json.loads(raw)
            if data.get("email") == email:
                return data
        return None

    def get_orders(self, customer_id: int) -> list[dict]:
        """Get all orders for a customer, sorted by most recent."""
        orders = []
        for key in self.redis.scan_iter("orders:*"):
            raw = self.redis.get(key)
            if raw is None:  # key was deleted between SCAN and GET
                continue
            data = json.loads(raw)
            if data.get("customer_id") == customer_id:
                orders.append(data)
        return sorted(orders, key=lambda x: x.get("created_at", ""), reverse=True)

    def get_recent_order(self, customer_id: int) -> dict | None:
        """Get the most recent order for a customer."""
        orders = self.get_orders(customer_id)
        return orders[0] if orders else None

    def get_support_tickets(self, customer_id: int) -> list[dict]:
        """Get open support tickets for a customer."""
        tickets = []
        for key in self.redis.scan_iter("support_tickets:*"):
            raw = self.redis.get(key)
            if raw is None:  # key was deleted between SCAN and GET
                continue
            data = json.loads(raw)
            if data.get("customer_id") == customer_id and data.get("status") == "open":
                tickets.append(data)
        return tickets

For production use, replace the scan_iter patterns with Redis secondary indexes (RediSearch) or use a different data store like Elasticsearch that supports efficient queries across multiple fields.
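If you stay on plain Redis, one common alternative to scanning is a hand-maintained secondary index built from sets. The sketch below assumes that whatever writes order records also calls save_order (for example a small consumer sitting between the stream and Redis). The idx:orders_by_customer key name is made up for illustration. The r argument is any client exposing set, sadd, smembers, and mget, such as a redis-py client created with decode_responses=True.

```python
import json


def orders_index_key(customer_id: int) -> str:
    """Name of the set holding every order key for one customer (hypothetical naming)."""
    return f"idx:orders_by_customer:{customer_id}"


def save_order(r, order: dict) -> None:
    """Write the order and register its key in the per-customer set."""
    key = f"orders:{order['id']}"
    r.set(key, json.dumps(order))
    r.sadd(orders_index_key(order["customer_id"]), key)


def get_orders_indexed(r, customer_id: int) -> list:
    """Fetch a customer's orders with one SMEMBERS and one MGET, no keyspace scan."""
    keys = sorted(r.smembers(orders_index_key(customer_id)))
    if not keys:
        return []
    # MGET returns None for keys that have since been deleted, so skip those.
    rows = [json.loads(raw) for raw in r.mget(keys) if raw is not None]
    return sorted(rows, key=lambda o: o.get("created_at", ""), reverse=True)
```

The cost of a lookup now depends on how many orders one customer has rather than on the size of the whole keyspace. The trade-off is that deletes must also remove the key from the set, which this sketch leaves out.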

Step 4: Build the Agent with LangChain

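Install the required packages. The agent code below imports AgentExecutor and create_tool_calling_agent from langchain.agents, which is where they live in the 0.2 and 0.3 release lines. Newer major releases may move these imports, so pin the version:

pip install "langchain<1.0" langchain-openai redis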

Now create the agent that uses our data access layer as tools:

# agent.py
import json  # used by every tool below to serialize results

from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from data_access import CustomerDataStore

# Initialize the data store
store = CustomerDataStore()

# Define tools the agent can use
@tool
def lookup_customer(customer_id: int) -> str:
    """Look up a customer by their ID. Returns customer details including
    name, email, plan, and account creation date."""
    customer = store.get_customer(customer_id)
    if customer:
        return json.dumps(customer, indent=2)
    return "Customer not found."

@tool
def lookup_customer_by_email(email: str) -> str:
    """Look up a customer by their email address."""
    customer = store.get_customer_by_email(email)
    if customer:
        return json.dumps(customer, indent=2)
    return "No customer found with that email."

@tool
def get_order_status(customer_id: int) -> str:
    """Get the most recent order for a customer, including its
    current status and items."""
    order = store.get_recent_order(customer_id)
    if order:
        return json.dumps(order, indent=2)
    return "No orders found for this customer."

@tool
def get_all_orders(customer_id: int) -> str:
    """Get all orders for a customer, sorted by most recent first."""
    orders = store.get_orders(customer_id)
    if orders:
        return json.dumps(orders, indent=2)
    return "No orders found for this customer."

@tool
def get_open_tickets(customer_id: int) -> str:
    """Get all open support tickets for a customer."""
    tickets = store.get_support_tickets(customer_id)
    if tickets:
        return json.dumps(tickets, indent=2)
    return "No open tickets for this customer."

# Create the agent
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a customer support agent for an e-commerce company.
You have access to real-time customer data, order information, and support tickets.

When helping a customer:
1. Look up their account first to understand their context
2. Check their recent orders if the question is about an order
3. Check open tickets to avoid duplicate issues
4. Be specific and reference actual data (order IDs, statuses, dates)
5. If you need to escalate, explain why clearly

The data you access is real-time, streamed from the production database.
You can trust that it reflects the current state of the system."""),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}")
])

tools = [
    lookup_customer,
    lookup_customer_by_email,
    get_order_status,
    get_all_orders,
    get_open_tickets,
]

agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# Run the agent
if __name__ == "__main__":
    result = executor.invoke({
        "input": "I'm customer #42 and I want to know where my order is."
    })
    print(result["output"])

Run it:

python agent.py

The agent will typically call lookup_customer(42) to get the customer’s context, then get_order_status(42) to find the latest order, and respond with the current status, all based on data that was streamed from your production database in the last few seconds. The model chooses the tool sequence, so the exact calls can vary between runs.

Step 5: Add MCP for Direct Agent Access

The Model Context Protocol (MCP) provides a standardized way for agents to access data sources. Instead of writing custom tools for each data access pattern, MCP defines a protocol that any agent framework can use.

Streamkap provides an MCP server that exposes your streaming data directly. To set it up:

  1. Enable MCP in your Streamkap dashboard for the relevant connectors
  2. Note the MCP server endpoint URL

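Then configure your agent to use the MCP server. One way to do this from LangChain is the langchain-mcp-adapters package (pip install langchain-mcp-adapters). The endpoint URL and transport below are placeholders, so use the values shown in your Streamkap dashboard:

# agent_with_mcp.py
import asyncio

from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_mcp_adapters.client import MultiServerMCPClient

from agent import llm, prompt  # reuse the model and prompt from Step 4

async def main():
    # Connect to Streamkap's MCP server
    client = MultiServerMCPClient({
        "streamkap": {
            "url": "https://mcp.streamkap.com/your-endpoint",
            "transport": "streamable_http",
        }
    })

    # The MCP server exposes tools for querying your streamed data
    # You get tools like:
    # - query_customers: Search customers by any field
    # - query_orders: Search orders with filters
    # - get_data_freshness: Check how recent the data is
    tools = await client.get_tools()

    # Use these tools with your agent just like the custom tools above
    agent = create_tool_calling_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
    result = await executor.ainvoke({"input": "Where is the latest order for customer #42?"})
    print(result["output"])

if __name__ == "__main__":
    asyncio.run(main())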

The advantage of MCP over custom tools is standardization. Your agent does not need custom code for each data source. The MCP server describes its capabilities, and the agent uses them dynamically.
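To show what "describes its capabilities" means on the wire, here is a small sketch of the JSON-RPC 2.0 messages MCP uses for tool discovery (tools/list) and invocation (tools/call). The sample response is illustrative only: the query_orders name comes from the list above, but its input schema is an assumption made up for this example.

```python
import json


def list_tools_request(request_id: int) -> dict:
    """JSON-RPC 2.0 request asking an MCP server which tools it offers."""
    return {"jsonrpc": "2.0", "id": request_id, "method": "tools/list"}


def call_tool_request(request_id: int, name: str, arguments: dict) -> dict:
    """JSON-RPC 2.0 request invoking one tool by name."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


def tool_names(list_response: dict) -> list:
    """Extract the tool names from a tools/list response."""
    return [t["name"] for t in list_response["result"]["tools"]]


# Illustrative response; the schema is a guess, not Streamkap's actual contract.
sample_response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"tools": [{
        "name": "query_orders",
        "description": "Search orders with filters",
        "inputSchema": {
            "type": "object",
            "properties": {"customer_id": {"type": "integer"}},
            "required": ["customer_id"],
        },
    }]},
}

print(tool_names(sample_response))
print(json.dumps(call_tool_request(2, "query_orders", {"customer_id": 42})))
```

An agent framework reads the name, description, and inputSchema of each tool and hands them to the model, which is why no per-source glue code is needed.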

Step 6: Monitor Data Freshness and Agent Performance

A real-time agent is only as good as its monitoring. Set up three types of checks:

Data Freshness Monitoring

Track the lag between when a change happens in PostgreSQL and when it is available in Redis:

# monitoring.py
import time
from data_access import CustomerDataStore

def check_data_freshness(store: CustomerDataStore) -> float:
    """Measure the time between a write and its availability.
    Returns lag in seconds."""
    test_key = "freshness_check"

    # Write a timestamp to PostgreSQL (through your application)
    write_time = time.time()
    # ... write to PostgreSQL ...

    # Poll Redis for the change
    max_wait = 10  # seconds
    start = time.time()
    while time.time() - start < max_wait:
        data = store.redis.get(test_key)
        if data and float(data) >= write_time:
            return time.time() - write_time
        time.sleep(0.1)

    return -1  # Timeout, data did not arrive

Streamkap also provides built-in lag monitoring in its dashboard, so you do not need to build this from scratch. Use it to set alerts when lag exceeds your SLA (for example, alert if data is more than 5 seconds old).
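If you also want an alert inside your own code, the check can be as small as the sketch below. It assumes each record carries the source commit time in milliseconds (a hypothetical source_ts_ms field, in the style of Debezium's ts_ms) and that you note the time the record arrived. The 5-second SLA matches the example above.

```python
import time


def lag_seconds(source_ts_ms: int, received_at: float) -> float:
    """Seconds between the source commit time and arrival at the destination."""
    return max(0.0, received_at - source_ts_ms / 1000.0)


def breaches_sla(lags: list, sla_seconds: float = 5.0) -> bool:
    """True when the worst lag in the window exceeds the SLA."""
    return bool(lags) and max(lags) > sla_seconds


# Simulate three records that were committed 0.8 s, 1.5 s, and 6.2 s ago.
now = time.time()
window = [lag_seconds(int((now - age) * 1000), now) for age in (0.8, 1.5, 6.2)]
if breaches_sla(window):
    print(f"ALERT: worst lag {max(window):.1f}s exceeds the 5s SLA")
```

Alerting on the worst lag in a short window is deliberately strict. If a single slow record should not page anyone, swap max for a high percentile.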

Agent Decision Quality

Log every agent interaction and periodically review:

import json
import logging
import time

logger = logging.getLogger("agent_decisions")

def log_agent_decision(customer_id, question, response, tools_used, data_age_seconds):
    logger.info(json.dumps({
        "customer_id": customer_id,
        "question": question,
        "response": response,
        "tools_used": tools_used,
        "data_age_seconds": data_age_seconds,
        "timestamp": time.time()
    }))

Review these logs weekly to identify patterns: Are agents giving wrong answers? Are they using stale data? Are there questions they cannot answer because they lack access to certain data?
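A weekly review is easier with a small summary script. This sketch assumes each log line is exactly the JSON object written by log_agent_decision (that is, the logging formatter emits only the message). The 5-second staleness threshold is an arbitrary choice for illustration.

```python
import json
from collections import Counter


def summarize_decisions(log_lines: list, stale_after: float = 5.0) -> dict:
    """Count interactions, stale-data interactions, and tool usage."""
    tool_counts = Counter()
    total = 0
    stale = 0
    for line in log_lines:
        entry = json.loads(line)
        total += 1
        tool_counts.update(entry.get("tools_used", []))
        if entry.get("data_age_seconds", 0) > stale_after:
            stale += 1
    return {"total": total, "stale": stale, "tool_counts": dict(tool_counts)}


sample_lines = [
    json.dumps({"customer_id": 42, "tools_used": ["lookup_customer", "get_order_status"],
                "data_age_seconds": 1.2}),
    json.dumps({"customer_id": 7, "tools_used": ["lookup_customer"],
                "data_age_seconds": 9.0}),
]
print(summarize_decisions(sample_lines))
```

A rising stale count points at the pipeline. A tool that is never called usually points at its description or the prompt.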

System Health

Monitor standard infrastructure metrics:

  • Redis memory usage and connection count
  • CDC connector status (running, paused, failed)
  • Agent response time (p50, p95, p99)
  • Tool call error rates
  • LLM API latency and error rates
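For the response-time percentiles, the standard library is enough while you are prototyping. The sketch below assumes you already collect per-request durations in milliseconds. In production a metrics system would normally compute these for you.

```python
import statistics


def latency_percentiles(samples_ms: list) -> dict:
    """Return p50, p95, and p99 from a list of response times."""
    # quantiles(n=100) returns the 99 cut points between percentiles 1..99.
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


samples = [float(ms) for ms in range(1, 102)]  # 1 ms through 101 ms
print(latency_percentiles(samples))
```

Watching p95 and p99 alongside p50 matters for agents because a single slow tool call or LLM retry shows up in the tail long before it moves the median.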

What to Add for Production

This tutorial gives you a working prototype. For production, add these:

Authentication and authorization. The agent should verify that a customer is who they claim to be before revealing account data. Integrate with your auth system.
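One way to enforce this is to bind tools to the customer ID that your auth system verified, so the model never supplies the ID itself. The sketch below is framework-agnostic. make_order_tool and its arguments are hypothetical names, and fetch_recent_order stands in for store.get_recent_order from Step 3.

```python
import json
from typing import Callable, Optional


def make_order_tool(
    verified_customer_id: int,
    fetch_recent_order: Callable[[int], Optional[dict]],
) -> Callable[[], str]:
    """Return a zero-argument tool bound to the authenticated customer.

    Because the ID is captured here and not passed by the model, a message
    such as "I'm customer #7" cannot redirect the lookup to another account.
    """
    def get_my_order_status() -> str:
        order = fetch_recent_order(verified_customer_id)
        return json.dumps(order) if order else "No orders found for this customer."

    return get_my_order_status


# Stand-in data source for the demo; in the tutorial this would be Redis.
orders = {42: {"id": 4821, "status": "shipped"}}
tool_for_session = make_order_tool(42, orders.get)
print(tool_for_session())
```

You would build one set of bound tools per authenticated session and pass those to the agent in place of the ID-taking tools from Step 4.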

Rate limiting. Prevent runaway agents from overwhelming your data store with queries. Set per-agent and per-customer query limits.
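A token bucket is one simple way to implement such limits in the agent process. This is a minimal single-process sketch. The class names are illustrative, and limits shared across several agent processes would need a shared store instead.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate_per_sec` tokens per second."""

    def __init__(self, rate_per_sec: float, capacity: int, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = float(capacity)
        self.clock = clock
        self.last = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill for the elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class PerKeyLimiter:
    """One bucket per key, for example per customer or per agent."""

    def __init__(self, rate_per_sec: float, capacity: int, clock=time.monotonic):
        self.rate, self.capacity, self.clock = rate_per_sec, capacity, clock
        self.buckets = {}

    def allow(self, key: str) -> bool:
        if key not in self.buckets:
            self.buckets[key] = TokenBucket(self.rate, self.capacity, self.clock)
        return self.buckets[key].allow()


limiter = PerKeyLimiter(rate_per_sec=5, capacity=10)
if not limiter.allow("customer:42"):
    print("Too many queries, try again shortly.")
```

Call limiter.allow(...) at the top of each tool and return a short refusal string when it is False, so the model sees the limit as a tool result and not as an exception.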

Error handling. What happens when Redis is down? When the CDC stream falls behind? When the LLM API returns an error? Build fallback behavior for each failure mode.
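For tool failures, a small wrapper can retry once and then hand the model a clear message, not a stack trace. This is a sketch under simple assumptions: every exception is treated as retryable and there is no backoff, both of which you would likely refine.

```python
import functools
import logging
from typing import Callable

logger = logging.getLogger("agent_tools")


def with_fallback(fn: Callable[..., str], fallback_message: str,
                  retries: int = 1) -> Callable[..., str]:
    """Wrap a tool so failures are logged, retried, then replaced by a message."""

    @functools.wraps(fn)  # keep the name and docstring the agent framework reads
    def wrapper(*args, **kwargs) -> str:
        for attempt in range(retries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception("tool %s failed (attempt %d)", fn.__name__, attempt + 1)
        return fallback_message

    return wrapper


def get_order_status_raw(customer_id: int) -> str:
    """Stand-in for a tool whose data store is unreachable."""
    raise ConnectionError("Redis is unreachable")


safe_get_order_status = with_fallback(
    get_order_status_raw,
    "Order data is temporarily unavailable. Offer to escalate to a human agent.",
)
print(safe_get_order_status(42))
```

Returning a message keeps the conversation going and lets the system prompt's escalation rule apply, where an unhandled exception would end the run.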

Data governance. Not all data should be accessible to all agents. Use field-level access controls to prevent agents from exposing sensitive information (payment details, internal notes).
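A simple form of field-level control is an allowlist applied before any record reaches the model. The policy below is a hypothetical example built from the tutorial schema. Fields not listed, and tables not listed, are dropped by default.

```python
# Hypothetical policy: which fields an agent may see, per table.
ALLOWED_FIELDS = {
    "customers": {"id", "name", "email", "plan", "created_at"},
    "orders": {"id", "customer_id", "status", "total_amount", "items", "created_at"},
}


def redact(table: str, record: dict) -> dict:
    """Keep only allowlisted fields; unknown tables yield an empty record."""
    allowed = ALLOWED_FIELDS.get(table, set())
    return {field: value for field, value in record.items() if field in allowed}


raw = {"id": 1, "name": "Ada", "email": "ada@example.com", "card_last4": "4242"}
print(redact("customers", raw))
```

An allowlist fails closed: a column added to the source table later stays hidden from the agent until someone adds it to the policy.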

Caching with TTLs. For data that does not change frequently (customer plan, account creation date), add an in-memory cache in the agent process to reduce Redis queries. Use short TTLs (30 to 60 seconds) so the cache stays fresh.
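A minimal in-process TTL cache might look like the sketch below. The class and method names are illustrative, it is not thread-safe, and it never evicts expired keys, so treat it as a starting point for a prototype.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Cache values for `ttl_seconds`, then reload them on the next access."""

    def __init__(self, ttl_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.entries = {}  # key -> (stored_at, value)

    def get_or_load(self, key: str, loader: Callable[[], Any]) -> Any:
        now = self.clock()
        hit = self.entries.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]
        value = loader()  # cache miss or expired entry: go to the data store
        self.entries[key] = (now, value)
        return value


cache = TTLCache(ttl_seconds=30.0)
plan = cache.get_or_load("customers:1:plan", lambda: "pro")  # loader stands in for Redis
print(plan)
```

Keep fast-changing fields such as order status out of this cache. A 30-second TTL on them would undo the freshness the pipeline provides.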

Scaling. A single Redis instance handles thousands of queries per second. But if you have hundreds of agents, consider Redis Cluster or a purpose-built read replica architecture. Streamkap can stream to multiple destinations simultaneously if you need to shard your agent data access.

Observability. Add distributed tracing so you can follow a customer request from the agent through the tools, data store, and back. This is essential for debugging production issues.

The Result

When everything is connected, you have an AI agent that:

  1. Receives a customer question
  2. Queries data that was captured from your production database within the last few seconds (typically 1 to 2 seconds in this setup)
  3. Makes decisions based on current information
  4. Responds with specific, accurate details (real order numbers, real statuses, real dates)

No batch jobs. No stale caches. No manual data refreshes. The agent always has access to what is true right now.

That is the difference between a demo agent and a production agent. Demo agents can say “let me check on that.” Production agents can say “your order #4821 shipped yesterday via FedEx and is scheduled for delivery on Thursday.”

The streaming infrastructure makes that possible. Set it up once, and every agent you build benefits from real-time data access.

