
Tutorials & How-To

March 12, 2026

10 min read

How to Give Your AI Agent Real-Time Database Access

Step-by-step guide to connecting AI agents to live database data using CDC and MCP. Build agents that act on current state, not stale snapshots.

TL;DR: AI agents making decisions on stale data make wrong decisions. This tutorial shows how to connect agents to live database changes using CDC pipelines and the Model Context Protocol (MCP).

Your AI agent just told a customer their order is “on its way.” The order was canceled two hours ago. The agent didn’t lie — it checked the database. But the database it checked was a snapshot from last night’s batch sync.

This is the stale data problem, and it’s the single biggest gap between demo-quality agents and production-grade ones. An agent is only as good as the data it can see. Feed it yesterday’s state and it will make yesterday’s decisions.

This tutorial walks through a practical setup: streaming live database changes into stores your agents can query, then connecting those agents through the Model Context Protocol (MCP) so they always work with current state.

The stale data problem

Most agent architectures today follow a pattern that worked fine for dashboards but breaks down for autonomous decision-making:

  1. Source database holds the current truth
  2. A batch job copies data to a warehouse every few hours (or overnight)
  3. The agent queries the warehouse
  4. The agent acts on what it finds

The gap between steps 1 and 3 is where things go wrong. During that gap, orders get canceled, inventory changes, prices update, customers churn, and fraud happens. The agent has no idea.

What stale data costs in practice:

  • A support agent promises a refund for an order that was already refunded by a human rep 30 minutes ago — doubling the payout
  • A fraud detection agent flags a transaction as suspicious based on account balances that changed an hour ago
  • An operations agent spins up extra infrastructure because it’s reading load metrics from a stale cache, not the current state

The fix isn’t better models or smarter prompts. It’s fresher data.

Architecture overview

The target architecture streams every relevant database change into a store the agent can query with sub-second latency. Here’s the high-level flow:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Source DB   │────▶│  Change Data │────▶│   Stream     │────▶│ Agent-Ready  │
│  (Postgres,  │     │  Capture     │     │  Processing  │     │   Store      │
│   MySQL,     │     │  (Log-based) │     │              │     │ (Redis, ES,  │
│   Mongo)     │     │              │     │              │     │  Pinecone)   │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘


                                                               ┌──────────────┐
                                                               │   AI Agent   │
                                                               │  (via MCP)   │
                                                               └──────────────┘

Each component’s role:

  • Source database — Where your application writes. Postgres, MySQL, MongoDB, DynamoDB — wherever your production data lives.
  • Change data capture — Reads the database’s transaction log to detect every insert, update, and delete as it happens. No polling. No queries against your production DB.
  • Stream processing — Filters, transforms, and routes events. Drop columns with PII. Rename fields. Route order events to one store and inventory events to another.
  • Agent-ready store — A low-latency data store optimized for the queries your agent actually makes. Redis for key-value lookups. Elasticsearch for search. A vector database for semantic retrieval.

The critical property: end-to-end latency under 250 milliseconds. A database row changes, and within a quarter-second, the agent can see it.

Step 1: Set up the source connector

The first step is connecting to your source database’s change stream. Streamkap supports log-based capture for Postgres, MySQL, MongoDB, DynamoDB, and SQL Server out of the box.

For this tutorial, we’ll use Postgres as the source.

Prerequisites:

  • A Postgres instance with logical replication enabled (wal_level = logical)
  • A dedicated replication user with appropriate permissions
  • Network access from Streamkap to your database (direct, SSH tunnel, or VPC peering)

In the Streamkap UI:

  1. Navigate to Sources and select PostgreSQL
  2. Enter your connection details (host, port, database, credentials)
  3. Select the tables you want to stream — pick the ones your agent needs
  4. Configure the replication slot name (e.g., streamkap_agent_slot)
  5. Save and start the connector

Within seconds, Streamkap starts reading the Postgres write-ahead log. Every insert, update, and delete on your selected tables becomes a streaming event.

{
  "op": "u",
  "table": "orders",
  "before": {
    "order_id": "ord_8834",
    "status": "processing",
    "updated_at": "2026-03-12T10:30:00Z"
  },
  "after": {
    "order_id": "ord_8834",
    "status": "canceled",
    "updated_at": "2026-03-12T10:45:12Z"
  }
}

That event fires the moment the UPDATE statement commits. No waiting for a batch window.
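A consumer of these events can act on that before/after structure directly. Here's a minimal sketch, in plain Python, of detecting a status change from an event shaped like the payload above (the field names follow that example; this is illustrative, not Streamkap's internal handling):

```python
# Hypothetical sketch: reacting to a CDC change event shaped like the
# example payload above. Field names (op, before, after) follow that example.

def status_changed(event: dict) -> bool:
    """Return True if the row's status field changed in this update event."""
    if event.get("op") != "u":  # only updates carry both before and after state
        return False
    before = event.get("before") or {}
    after = event.get("after") or {}
    return before.get("status") != after.get("status")

event = {
    "op": "u",
    "table": "orders",
    "before": {"order_id": "ord_8834", "status": "processing"},
    "after": {"order_id": "ord_8834", "status": "canceled"},
}

print(status_changed(event))  # True: processing -> canceled
```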

Step 2: Configure the destination

Your agent needs a store it can query fast. The right choice depends on what your agent does:

Agent type        Best destination       Why
Support agent     Redis                  Key-value lookups by customer ID or order ID in <1ms
Search agent      Elasticsearch          Full-text search across product catalogs, knowledge bases
RAG agent         Pinecone / Weaviate    Vector similarity search on embeddings
Analytics agent   ClickHouse / BigQuery  Aggregation queries on event streams

Setting up a Redis destination in Streamkap:

  1. Navigate to Destinations and select Redis
  2. Enter your Redis connection details
  3. Choose a key format — for agent lookups, {table}:{primary_key} works well (e.g., orders:ord_8834)
  4. Map your source tables to Redis key patterns
  5. Save and start the connector

Now every database change flows through to Redis automatically. Your agent can look up any order, customer, or inventory record by key and get the current state — not a stale copy.
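To make the {table}:{primary_key} scheme concrete, here's a small sketch of how change events map onto those keys. A plain dict stands in for Redis so the example runs without a server; with a real client you'd call something like redis.set(key, value) instead:

```python
# Hypothetical sketch of the {table}:{primary_key} key scheme described above.
# A plain dict stands in for Redis so this runs without a server.
import json

store = {}  # stand-in for Redis

def write_change(table: str, primary_key: str, row: dict) -> str:
    key = f"{table}:{primary_key}"   # e.g. orders:ord_8834
    store[key] = json.dumps(row)     # latest state wins
    return key

def lookup(table: str, primary_key: str):
    raw = store.get(f"{table}:{primary_key}")
    return json.loads(raw) if raw else None

write_change("orders", "ord_8834", {"status": "processing"})
write_change("orders", "ord_8834", {"status": "canceled"})  # CDC update overwrites
print(lookup("orders", "ord_8834"))  # {'status': 'canceled'}
```

Because each change overwrites the same key, the store always holds the current state of the row, which is exactly what the agent needs for point lookups.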

Step 3: Add transforms for agent-friendly data

Raw database rows aren’t always what your agent needs. You might want to:

  • Drop sensitive columns — Strip PII fields like SSN or email before they reach the agent store
  • Flatten nested JSON — Convert Postgres JSONB into flat key-value pairs
  • Add computed fields — Calculate days_since_last_order or account_risk_score in-stream
  • Filter events — Only send orders with status changes, not every column update

Streamkap’s built-in transforms handle this in the pipeline — no external stream processing cluster required.

Example transform configuration:

transforms:
  - type: filter
    condition: "table == 'orders' AND after.status != before.status"
  - type: drop_columns
    columns: ["customer_email", "customer_phone", "shipping_address"]
  - type: add_field
    name: "event_type"
    value: "order_status_change"

The agent now sees clean, filtered, purpose-built data. No raw database dumps. No PII exposure.
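For intuition, here's a hypothetical Python sketch of what those three transforms (filter, drop_columns, add_field) do to an event, applied in order. Streamkap runs this in-pipeline; the code below only illustrates the semantics:

```python
# Hypothetical sketch of the filter / drop_columns / add_field transforms
# from the config above, applied to a CDC event in order.

def apply_transforms(event: dict):
    # filter: only pass order events whose status actually changed
    if event["table"] != "orders":
        return None
    if event["after"].get("status") == event["before"].get("status"):
        return None

    # drop_columns: strip PII before the event reaches the agent store
    for col in ("customer_email", "customer_phone", "shipping_address"):
        event["before"].pop(col, None)
        event["after"].pop(col, None)

    # add_field: tag the event for downstream consumers
    event["event_type"] = "order_status_change"
    return event

event = {
    "table": "orders",
    "before": {"order_id": "ord_8834", "status": "processing",
               "customer_email": "a@example.com"},
    "after": {"order_id": "ord_8834", "status": "canceled",
              "customer_email": "a@example.com"},
}
out = apply_transforms(event)
print(out["event_type"], "customer_email" in out["after"])  # order_status_change False
```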

Step 4: Connect your agent via MCP

The Model Context Protocol (MCP) gives AI agents a standard way to discover and query external data sources. Instead of writing custom API integrations for every data store, your agent connects to an MCP server that exposes the data as tools and resources.

Streamkap ships an MCP server that lets agents:

  • Query pipeline status — Is the pipeline running? When was the last event processed?
  • Check data freshness — How old is the data in the agent store? Is it within the latency SLA?
  • Inspect specific records — Pull the current state of a record from the destination store

Connecting the MCP server to your agent:

{
  "mcpServers": {
    "streamkap": {
      "command": "npx",
      "args": ["-y", "@streamkap/mcp-server"],
      "env": {
        "STREAMKAP_API_KEY": "sk_live_your_api_key",
        "STREAMKAP_WORKSPACE": "your-workspace-id"
      }
    }
  }
}

Once connected, your agent can call MCP tools like:

streamkap.get_pipeline_status(pipeline_id="pipe_orders_redis")
→ { "status": "running", "last_event": "2026-03-12T10:45:12Z", "lag_ms": 142 }

streamkap.check_data_freshness(destination="redis", key="orders:ord_8834")
→ { "last_updated": "2026-03-12T10:45:12Z", "age_seconds": 3 }

The agent doesn’t just get data — it gets confidence in that data. It knows the pipeline is running, it knows how fresh the record is, and it can decide whether to act or wait.
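That "act or wait" decision can be a simple gate. Here's a sketch using the shape of the check_data_freshness response shown above; the 5-second threshold is an example SLA, not a Streamkap default:

```python
# Hypothetical sketch: gating an agent action on data freshness, using the
# shape of the check_data_freshness response above. The SLA is an assumption.

FRESHNESS_SLA_SECONDS = 5

def safe_to_act(freshness: dict) -> bool:
    """Act only when the record is within the freshness SLA."""
    return freshness.get("age_seconds", float("inf")) <= FRESHNESS_SLA_SECONDS

fresh = {"last_updated": "2026-03-12T10:45:12Z", "age_seconds": 3}
stale = {"last_updated": "2026-03-12T04:00:00Z", "age_seconds": 24312}

print(safe_to_act(fresh), safe_to_act(stale))  # True False
```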

Step 5: Build the agent logic

With fresh data flowing and MCP connected, your agent can make decisions grounded in current state. Here’s a pattern for a support agent:

async def handle_customer_query(agent, customer_id: str, question: str):
    # Check pipeline health first
    pipeline = await agent.mcp.call(
        "streamkap.get_pipeline_status",
        pipeline_id="pipe_orders_redis"
    )

    if pipeline["lag_ms"] > 5000:
        return ("Let me check with a team member — our systems are "
                "updating. I'll get back to you shortly.")

    # Fetch current order state from Redis (via agent's data tools)
    orders = await agent.mcp.call(
        "streamkap.query_destination",
        destination="redis",
        pattern=f"orders:*:customer:{customer_id}"
    )

    # Agent now has current order data to reason about
    context = {
        "customer_orders": orders,
        "data_freshness": pipeline["lag_ms"],
        "question": question
    }

    return await agent.reason(context)

The key details:

  1. Check pipeline health before acting. If the pipeline is lagging, the agent knows its data might be stale and can say so honestly instead of acting on bad information.
  2. Query the destination store directly. The agent reads from Redis, not the source database. No load on your production DB.
  3. Include freshness in the context. The agent knows the data is 142ms old, not 6 hours old. It can act with confidence.

Example: Three agents, one pipeline

A single Streamkap pipeline can feed multiple agents. Here’s how one setup powers three different use cases:

Customer support agent

Source: Postgres (orders, customers, products tables)
Destination: Redis
Query pattern: Look up orders by customer ID, check current status, verify refund eligibility

The agent answers “Where’s my order?” with the status from 200ms ago, not 8 hours ago. It sees that order ord_8834 was canceled at 10:45am and doesn’t promise delivery.

Fraud detection agent

Source: Postgres (transactions, accounts, risk_signals tables)
Destination: Elasticsearch
Query pattern: Search recent transactions by account, check velocity patterns, cross-reference risk signals

The agent flags a burst of transactions against an account because it sees each transaction within 250ms of it hitting the database. Batch-based fraud detection would miss the pattern until the next sync window.

Operations agent

Source: MongoDB (service_metrics, deployments, incidents collections)
Destination: Redis + ClickHouse
Query pattern: Current service health from Redis, trend analysis from ClickHouse

The agent notices request latency climbing on the payments service, checks the deployment history, and finds a new release rolled out 4 minutes ago. It opens a rollback ticket before any customer reports a problem.

Monitoring and reliability

Streaming pipelines need monitoring just like any production system. Streamkap tracks:

  • End-to-end latency — Time from database commit to destination write, measured per event
  • Pipeline lag — How far behind the pipeline is from the source database’s current position
  • Error rates — Failed transforms, destination write errors, schema mismatches
  • Throughput — Events per second, useful for capacity planning

Set up alerts on pipeline lag. If your agent SLA requires data fresher than 1 second, alert when lag exceeds 500ms so you have time to investigate before breaching the SLA.
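That alerting rule (alert well before the SLA is breached) reduces to a simple threshold check. A sketch, using the 1-second SLA and 500ms alert line from the example above:

```python
# Hypothetical sketch of the alerting rule described above: warn at half
# the SLA so there's time to investigate before an actual breach.

SLA_MS = 1000    # agent requires data fresher than 1 second
ALERT_MS = 500   # alert threshold, half the SLA

def lag_status(lag_ms: int) -> str:
    if lag_ms > SLA_MS:
        return "breach"
    if lag_ms > ALERT_MS:
        return "warn"
    return "ok"

print(lag_status(142), lag_status(700), lag_status(1500))  # ok warn breach
```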

Through MCP, agents can check these metrics themselves. A well-built agent monitors its own data supply and degrades gracefully when freshness drops below acceptable thresholds.

What this replaces

Before this architecture, teams typically built one of these:

Direct database queries from the agent — Works for demos. Falls apart at scale. Every agent query hits your production database. Slow queries from the agent cause latency for your users. No way to filter or transform data before the agent sees it.

Batch ETL to a warehouse — The classic approach. Data lands in BigQuery or Snowflake every few hours. The agent queries the warehouse. Works for analytics agents. Unacceptable for anything time-sensitive.

Custom webhook handlers — Build a webhook for every table change, write a handler that updates a cache, maintain the whole thing yourself. Works but costs engineering months and breaks when schemas change.

The streaming pipeline approach gives you the freshness of direct queries, the separation of batch ETL, and none of the maintenance burden of custom webhooks.

Getting started

Here’s the shortest path from zero to a working agent-data pipeline:

  1. Sign up at app.streamkap.com — free trial, no credit card
  2. Connect your source — Postgres, MySQL, MongoDB, or DynamoDB
  3. Pick a destination — Redis for key-value agents, Elasticsearch for search agents
  4. Start the pipeline — Data flows within minutes
  5. Connect the MCP server — Give your agent access to the pipeline
  6. Build your agent — Query fresh data through MCP tools

The whole setup takes under 30 minutes. Most of that time is configuring database permissions.

For more on how Streamkap fits into agent architectures, see the Streamkap for AI agents page. For broader patterns around feeding ML and AI systems with fresh data, check out our AI/ML pipeline solutions.


Ready to give your agents real-time context? Streamkap streams database changes to agent-accessible stores in under 250ms — with a built-in MCP server for direct agent integration. Start a free trial or explore Streamkap for agents.