How to Give Your AI Agent Real-Time Database Access
Step-by-step guide to connecting AI agents to live database data using CDC and MCP. Build agents that act on current state, not stale snapshots.
Your AI agent just told a customer their order is “on its way.” The order was canceled two hours ago. The agent didn’t lie — it checked the database. But the database it checked was a snapshot from last night’s batch sync.
This is the stale data problem, and it’s the single biggest gap between demo-quality agents and production-grade ones. An agent is only as good as the data it can see. Feed it yesterday’s state and it will make yesterday’s decisions.
This tutorial walks through a practical setup: streaming live database changes into stores your agents can query, then connecting those agents through the Model Context Protocol (MCP) so they always work with current state.
The stale data problem
Most agent architectures today follow a pattern that worked fine for dashboards but breaks down for autonomous decision-making:
- Source database holds the current truth
- A batch job copies data to a warehouse every few hours (or overnight)
- The agent queries the warehouse
- The agent acts on what it finds
The gap between steps 1 and 3 is where things go wrong. During that gap, orders get canceled, inventory changes, prices update, customers churn, and fraud happens. The agent has no idea.
What stale data costs in practice:
- A support agent promises a refund for an order that was already refunded by a human rep 30 minutes ago — doubling the payout
- A fraud detection agent flags a transaction as suspicious based on account balances that changed an hour ago
- An operations agent spins up extra infrastructure because it’s reading load metrics from a stale cache, not the current state
The fix isn’t better models or smarter prompts. It’s fresher data.
Architecture overview
The target architecture streams every relevant database change into a store the agent can query with sub-second latency. Here’s the high-level flow:
```
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Source DB   │────▶│ Change Data  │────▶│   Stream     │────▶│ Agent-Ready  │
│  (Postgres,  │     │   Capture    │     │  Processing  │     │   Store      │
│   MySQL,     │     │ (Log-based)  │     │              │     │ (Redis, ES,  │
│   Mongo)     │     │              │     │              │     │  Pinecone)   │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘
                                                                      │
                                                                      ▼
                                                               ┌──────────────┐
                                                               │   AI Agent   │
                                                               │  (via MCP)   │
                                                               └──────────────┘
```
Each component’s role:
- Source database — Where your application writes. Postgres, MySQL, MongoDB, DynamoDB — wherever your production data lives.
- Change data capture — Reads the database’s transaction log to detect every insert, update, and delete as it happens. No polling. No queries against your production DB.
- Stream processing — Filters, transforms, and routes events. Drop columns with PII. Rename fields. Route order events to one store and inventory events to another.
- Agent-ready store — A low-latency data store optimized for the queries your agent actually makes. Redis for key-value lookups. Elasticsearch for search. A vector database for semantic retrieval.
The critical property: end-to-end latency under 250 milliseconds. A database row changes, and within a quarter-second, the agent can see it.
Step 1: Set up the source connector
The first step is connecting to your source database’s change stream. Streamkap supports log-based capture for Postgres, MySQL, MongoDB, DynamoDB, and SQL Server out of the box.
For this tutorial, we’ll use Postgres as the source.
Prerequisites:
- A Postgres instance with logical replication enabled (`wal_level = logical`)
- A dedicated replication user with appropriate permissions
- Network access from Streamkap to your database (direct, SSH tunnel, or VPC peering)
In the Streamkap UI:
- Navigate to Sources and select PostgreSQL
- Enter your connection details (host, port, database, credentials)
- Select the tables you want to stream — pick the ones your agent needs
- Configure the replication slot name (e.g., `streamkap_agent_slot`)
- Save and start the connector
Within seconds, Streamkap starts reading the Postgres write-ahead log. Every insert, update, and delete on your selected tables becomes a streaming event.
```json
{
  "op": "u",
  "table": "orders",
  "before": {
    "order_id": "ord_8834",
    "status": "processing",
    "updated_at": "2026-03-12T10:30:00Z"
  },
  "after": {
    "order_id": "ord_8834",
    "status": "canceled",
    "updated_at": "2026-03-12T10:45:12Z"
  }
}
```
That event fires the moment the UPDATE statement commits. No waiting for a batch window.
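Downstream, the stream processor materializes events like this into the agent-ready store. Here is a minimal sketch of that logic in Python, using a plain dict to stand in for Redis (the function name and `key_field` parameter are illustrative, not Streamkap API):

```python
def apply_cdc_event(store: dict, event: dict, key_field: str) -> None:
    """Apply one CDC event to a key-value store keyed as '{table}:{pk}'.

    Debezium-style op codes: "c" = insert, "u" = update, "d" = delete.
    `store` is a plain dict here, standing in for a Redis client.
    """
    table = event["table"]
    if event["op"] == "d":
        # Deletes carry the old row in "before"; remove the key
        store.pop(f"{table}:{event['before'][key_field]}", None)
    else:
        # Inserts and updates carry the new row in "after"
        row = event["after"]
        store[f"{table}:{row[key_field]}"] = row

# Applying the cancellation event above leaves the store holding current state
store = {"orders:ord_8834": {"order_id": "ord_8834", "status": "processing"}}
event = {
    "op": "u",
    "table": "orders",
    "before": {"order_id": "ord_8834", "status": "processing"},
    "after": {"order_id": "ord_8834", "status": "canceled"},
}
apply_cdc_event(store, event, key_field="order_id")
print(store["orders:ord_8834"]["status"])  # canceled
```

Because each event carries the full `after` row, the store always holds the latest committed state, with no joins or diffing needed at read time.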
Step 2: Configure the destination
Your agent needs a store it can query fast. The right choice depends on what your agent does:
| Agent type | Best destination | Why |
|---|---|---|
| Support agent | Redis | Key-value lookups by customer ID or order ID in <1ms |
| Search agent | Elasticsearch | Full-text search across product catalogs, knowledge bases |
| RAG agent | Pinecone / Weaviate | Vector similarity search on embeddings |
| Analytics agent | ClickHouse / BigQuery | Aggregation queries on event streams |
Setting up a Redis destination in Streamkap:
- Navigate to Destinations and select Redis
- Enter your Redis connection details
- Choose a key format — for agent lookups, `{table}:{primary_key}` works well (e.g., `orders:ord_8834`)
- Map your source tables to Redis key patterns
- Save and start the connector
Now every database change flows through to Redis automatically. Your agent can look up any order, customer, or inventory record by key and get the current state — not a stale copy.
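In application code, that lookup is a single key read. A sketch, assuming records are stored as JSON strings under `{table}:{primary_key}` keys (the helper name is ours; any client with a Redis-like `get()` works):

```python
import json

def lookup_current_state(client, table: str, primary_key: str):
    """Fetch the live record at '{table}:{primary_key}', or None if absent.

    `client` is anything with a Redis-like get() — e.g. redis.Redis(...)
    in production, or a dict-backed stub in tests.
    """
    raw = client.get(f"{table}:{primary_key}")
    return json.loads(raw) if raw else None

# With a real client this would look like:
#   import redis
#   r = redis.Redis(host="localhost", port=6379, decode_responses=True)
#   order = lookup_current_state(r, "orders", "ord_8834")
```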
Step 3: Add transforms for agent-friendly data
Raw database rows aren’t always what your agent needs. You might want to:
- Drop sensitive columns — Strip PII fields like SSN or email before they reach the agent store
- Flatten nested JSON — Convert Postgres JSONB into flat key-value pairs
- Add computed fields — Calculate `days_since_last_order` or `account_risk_score` in-stream
- Filter events — Only send orders with status changes, not every column update
Streamkap’s built-in transforms handle this in the pipeline — no external stream processing cluster required.
Example transform configuration:
```yaml
transforms:
  - type: filter
    condition: "table == 'orders' AND after.status != before.status"
  - type: drop_columns
    columns: ["customer_email", "customer_phone", "shipping_address"]
  - type: add_field
    name: "event_type"
    value: "order_status_change"
```
The agent now sees clean, filtered, purpose-built data. No raw database dumps. No PII exposure.
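The same three transforms can be expressed in plain Python, which is a useful way to reason about what the pipeline does to each event (an illustrative re-implementation, not Streamkap's engine):

```python
PII_COLUMNS = {"customer_email", "customer_phone", "shipping_address"}

def transform(event: dict):
    """Mirror the config above: keep only order status changes,
    strip PII columns, and tag the event. Returns None for dropped events."""
    if event.get("table") != "orders":
        return None
    before = event.get("before") or {}
    after = dict(event["after"])
    if after.get("status") == before.get("status"):
        return None  # filter: not a status change
    for col in PII_COLUMNS:
        after.pop(col, None)  # drop_columns
    after["event_type"] = "order_status_change"  # add_field
    return after
```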
Step 4: Connect your agent via MCP
The Model Context Protocol (MCP) gives AI agents a standard way to discover and query external data sources. Instead of writing custom API integrations for every data store, your agent connects to an MCP server that exposes the data as tools and resources.
Streamkap ships an MCP server that lets agents:
- Query pipeline status — Is the pipeline running? When was the last event processed?
- Check data freshness — How old is the data in the agent store? Is it within the latency SLA?
- Inspect specific records — Pull the current state of a record from the destination store
Connecting the MCP server to your agent:
```json
{
  "mcpServers": {
    "streamkap": {
      "command": "npx",
      "args": ["-y", "@streamkap/mcp-server"],
      "env": {
        "STREAMKAP_API_KEY": "sk_live_your_api_key",
        "STREAMKAP_WORKSPACE": "your-workspace-id"
      }
    }
  }
}
```
Once connected, your agent can call MCP tools like:
```
streamkap.get_pipeline_status(pipeline_id="pipe_orders_redis")
→ { "status": "running", "last_event": "2026-03-12T10:45:12Z", "lag_ms": 142 }

streamkap.check_data_freshness(destination="redis", key="orders:ord_8834")
→ { "last_updated": "2026-03-12T10:45:12Z", "age_seconds": 3 }
```
The agent doesn’t just get data — it gets confidence in that data. It knows the pipeline is running, it knows how fresh the record is, and it can decide whether to act or wait.
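That confidence can be made explicit as a gate in agent code. A minimal sketch, assuming a `check_data_freshness`-style response like the one above (the threshold and helper are ours, not Streamkap defaults):

```python
MAX_AGE_SECONDS = 5  # illustrative freshness budget for a support agent

def fresh_enough(freshness: dict) -> bool:
    """True if a record's age, per a freshness-check response, is within budget.
    Missing age information is treated as stale."""
    return freshness.get("age_seconds", float("inf")) <= MAX_AGE_SECONDS
```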
Step 5: Build the agent logic
With fresh data flowing and MCP connected, your agent can make decisions grounded in current state. Here’s a pattern for a support agent:
```python
async def handle_customer_query(agent, customer_id: str, question: str):
    # Check pipeline health first
    pipeline = await agent.mcp.call(
        "streamkap.get_pipeline_status",
        pipeline_id="pipe_orders_redis"
    )
    if pipeline["lag_ms"] > 5000:
        return ("Let me check with a team member — our systems are "
                "updating. I'll get back to you shortly.")

    # Fetch current order state from Redis (via agent's data tools)
    orders = await agent.mcp.call(
        "streamkap.query_destination",
        destination="redis",
        pattern=f"orders:*:customer:{customer_id}"
    )

    # Agent now has current order data to reason about
    context = {
        "customer_orders": orders,
        "data_freshness": pipeline["lag_ms"],
        "question": question
    }
    return await agent.reason(context)
```
The key details:
- Check pipeline health before acting. If the pipeline is lagging, the agent knows its data might be stale and can say so honestly instead of acting on bad information.
- Query the destination store directly. The agent reads from Redis, not the source database. No load on your production DB.
- Include freshness in the context. The agent knows the data is 142ms old, not 6 hours old. It can act with confidence.
Example: Three agents, one pipeline
A single Streamkap pipeline can feed multiple agents. Here’s how one setup powers three different use cases:
Customer support agent
- Source: Postgres (orders, customers, products tables)
- Destination: Redis
- Query pattern: Look up orders by customer ID, check current status, verify refund eligibility
The agent answers “Where’s my order?” with the status from 200ms ago, not 8 hours ago. It sees that order ord_8834 was canceled at 10:45am and doesn’t promise delivery.
Fraud detection agent
- Source: Postgres (transactions, accounts, risk_signals tables)
- Destination: Elasticsearch
- Query pattern: Search recent transactions by account, check velocity patterns, cross-reference risk signals
The agent flags a burst of transactions against an account because it sees each transaction within 250ms of it hitting the database. Batch-based fraud detection would miss the pattern until the next sync window.
Operations agent
- Source: MongoDB (service_metrics, deployments, incidents collections)
- Destination: Redis + ClickHouse
- Query pattern: Current service health from Redis, trend analysis from ClickHouse
The agent notices request latency climbing on the payments service, checks the deployment history, and finds a new release rolled out 4 minutes ago. It opens a rollback ticket before any customer reports a problem.
Monitoring and reliability
Streaming pipelines need monitoring just like any production system. Streamkap tracks:
- End-to-end latency — Time from database commit to destination write, measured per event
- Pipeline lag — How far behind the pipeline is from the source database’s current position
- Error rates — Failed transforms, destination write errors, schema mismatches
- Throughput — Events per second, useful for capacity planning
Set up alerts on pipeline lag. If your agent SLA requires data fresher than 1 second, alert when lag exceeds 500ms so you have time to investigate before breaching the SLA.
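That two-tier policy (warn at 500ms, breach at 1s) is straightforward to encode in whatever drives your alerting. A sketch using the example SLA above (the thresholds and function are illustrative, not Streamkap defaults):

```python
FRESHNESS_SLA_MS = 1000   # the agent requires data fresher than 1 second
ALERT_THRESHOLD_MS = 500  # alert early, leaving time to investigate

def classify_lag(lag_ms: int) -> str:
    """Map current pipeline lag to an alerting state: ok, warn, or breach."""
    if lag_ms >= FRESHNESS_SLA_MS:
        return "breach"
    if lag_ms >= ALERT_THRESHOLD_MS:
        return "warn"
    return "ok"
```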
Through MCP, agents can check these metrics themselves. A well-built agent monitors its own data supply and degrades gracefully when freshness drops below acceptable thresholds.
What this replaces
Before this architecture, teams typically built one of these:
Direct database queries from the agent — Works for demos. Falls apart at scale. Every agent query hits your production database. Slow queries from the agent cause latency for your users. No way to filter or transform data before the agent sees it.
Batch ETL to a warehouse — The classic approach. Data lands in BigQuery or Snowflake every few hours. The agent queries the warehouse. Works for analytics agents. Unacceptable for anything time-sensitive.
Custom webhook handlers — Build a webhook for every table change, write a handler that updates a cache, maintain the whole thing yourself. Works but costs engineering months and breaks when schemas change.
The streaming pipeline approach gives you the freshness of direct queries, the separation of batch ETL, and none of the maintenance burden of custom webhooks.
Getting started
Here’s the shortest path from zero to a working agent-data pipeline:
- Sign up at app.streamkap.com — free trial, no credit card
- Connect your source — Postgres, MySQL, MongoDB, or DynamoDB
- Pick a destination — Redis for key-value agents, Elasticsearch for search agents
- Start the pipeline — Data flows within minutes
- Connect the MCP server — Give your agent access to the pipeline
- Build your agent — Query fresh data through MCP tools
The whole setup takes under 30 minutes. Most of that time is configuring database permissions.
For more on how Streamkap fits into agent architectures, see the Streamkap for AI agents page. For broader patterns around feeding ML and AI systems with fresh data, check out our AI/ML pipeline solutions.
Ready to give your agents real-time context? Streamkap streams database changes to agent-accessible stores in under 250ms — with a built-in MCP server for direct agent integration. Start a free trial or explore Streamkap for agents.