AI & Agents

May 22, 2025

MCP Servers Backed by Streaming Data

How to build an MCP server that exposes live pipeline state to AI tools like Claude and Cursor — architecture, authentication, and practical examples.

The Model Context Protocol (MCP) gives AI tools a standardized way to call functions and read data from external systems. Instead of building a custom integration for every AI assistant, you build one MCP server and any compatible client — Claude, Cursor, or a custom agent — can use it.

This article covers how to build an MCP server backed by streaming data: live pipeline state, customer data, and operational metrics flowing through CDC into a fast read store, then exposed through MCP tools that AI assistants can call.

For background on MCP itself, see Model Context Protocol Explained.

Why Streaming Data + MCP

An MCP server needs to answer questions fast. When Claude asks “What’s the current pipeline status?” or “Show me this customer’s recent orders,” the response needs to come back in milliseconds, not seconds.

You could have the MCP server query your production databases directly. But that means:

  • Every AI tool call becomes a database query on your source systems
  • Response times depend on database load and query complexity
  • The MCP server needs credentials to every source database
  • Complex joins across multiple databases become your problem

The streaming approach flips this: CDC captures changes from your source databases and writes them to a read-optimized store (Redis, a search index, or a materialized view). The MCP server only reads from this store.

Source DBs ──CDC──→ Kafka ──→ Read Store ──→ MCP Server ──→ AI Tools
                              (Redis, Postgres
                               read replica,
                               Elasticsearch)

For more on how CDC feeds downstream caches, see CDC to Redis: Real-Time Cache Synchronization.
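
To make the write side of this flow concrete, here is a minimal sketch of a consumer applying change events to the read store. It assumes a Debezium-style envelope (`op`, `before`, `after`) and uses a plain dict as a stand-in for Redis. The key names mirror the `customer:{id}` and `customer:email:{email}` layout that the data store reads later in this article, and `apply_change_event` is a hypothetical helper, not part of any library.

```python
from typing import Any

# In-memory stand-in for the read store; a real consumer would issue
# the equivalent Redis commands (HSET, SET, DEL).
ReadStore = dict[str, Any]


def apply_change_event(store: ReadStore, event: dict) -> None:
    """Apply one CDC event for a customers table to the read store.

    Assumes a Debezium-style envelope: "op" is "c" (create), "u" (update),
    "d" (delete), or "r" (snapshot read); "before"/"after" hold row images.
    """
    op = event["op"]
    before = event.get("before") or {}
    after = event.get("after") or {}

    if op in ("c", "r", "u"):
        # Drop the old email index entry if the email changed.
        old_email = before.get("email")
        if old_email and old_email != after.get("email"):
            store.pop(f"customer:email:{old_email}", None)
        store[f"customer:{after['id']}"] = dict(after)
        if after.get("email"):
            store[f"customer:email:{after['email']}"] = str(after["id"])
    elif op == "d":
        store.pop(f"customer:{before['id']}", None)
        if before.get("email"):
            store.pop(f"customer:email:{before['email']}", None)
    else:
        raise ValueError(f"Unsupported op: {op!r}")


store: ReadStore = {}
apply_change_event(store, {"op": "c", "before": None,
                           "after": {"id": 7, "email": "a@example.com"}})
apply_change_event(store, {"op": "u",
                           "before": {"id": 7, "email": "a@example.com"},
                           "after": {"id": 7, "email": "b@example.com"}})
```

Because the consumer maintains both the record and its email index, the MCP server can resolve either lookup with a single key read and never touches the source database.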

MCP Server Architecture

An MCP server exposes three types of primitives:

  • Tools: Functions the AI can call (like get_pipeline_status or lookup_customer)
  • Resources: Data the AI can read (like configuration files or documentation)
  • Prompts: Pre-built prompt templates the AI can use

For a streaming data platform, the most useful are tools and resources.

Project Structure

mcp-streaming-server/
├── src/
│   ├── server.py           # MCP server entry point
│   ├── tools/
│   │   ├── pipelines.py    # Pipeline management tools
│   │   ├── customers.py    # Customer data tools
│   │   └── metrics.py      # Operational metrics tools
│   ├── resources/
│   │   └── docs.py         # Documentation resources
│   ├── auth.py             # JWT authentication
│   └── data/
│       └── store.py        # Read store client (Redis, etc.)
├── pyproject.toml
└── config.yaml

Building the Server

Here’s an MCP server using the Python SDK that exposes streaming pipeline data:

# src/server.py

import asyncio
import os

from mcp.server import Server
from mcp.server.stdio import stdio_server

from tools.pipelines import register_pipeline_tools
from tools.customers import register_customer_tools
from tools.metrics import register_metrics_tools
from resources.docs import register_resources
from data.store import DataStore

server = Server("streaming-platform")
# Read the store location from the environment so the same code runs locally and deployed
store = DataStore(os.environ.get("REDIS_URL", "redis://localhost:6379"))

# Register all tool groups and the documentation resources
register_pipeline_tools(server, store)
register_customer_tools(server, store)
register_metrics_tools(server, store)
register_resources(server)


async def main():
    # stdio transport: the client launches this process and talks over stdin/stdout
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())

Pipeline Tools

These tools let AI assistants inspect CDC pipelines: list them, view details and metrics, and read recent errors. All three are read-only.

# src/tools/pipelines.py

import json
from mcp.server import Server
from mcp.types import Tool, TextContent
from data.store import DataStore


def register_pipeline_tools(server: Server, store: DataStore):

    @server.list_tools()
    async def list_tools():
        return [
            Tool(
                name="list_pipelines",
                description="List all CDC pipelines with their current status",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "status_filter": {
                            "type": "string",
                            "enum": ["running", "paused", "error", "all"],
                            "description": "Filter by pipeline status",
                            "default": "all",
                        }
                    },
                },
            ),
            Tool(
                name="get_pipeline_detail",
                description=(
                    "Get detailed info about a specific pipeline including "
                    "source, destination, throughput, and lag"
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "pipeline_id": {
                            "type": "string",
                            "description": "Pipeline ID",
                        }
                    },
                    "required": ["pipeline_id"],
                },
            ),
            Tool(
                name="get_pipeline_errors",
                description="Get recent errors for a pipeline",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "pipeline_id": {"type": "string"},
                        "limit": {"type": "integer", "default": 10},
                    },
                    "required": ["pipeline_id"],
                },
            ),
        ]

    @server.call_tool()
    async def call_tool(name: str, arguments: dict):
        if name == "list_pipelines":
            status_filter = arguments.get("status_filter", "all")
            pipelines = await store.get_pipelines(status_filter)
            return [TextContent(
                type="text",
                text=json.dumps(pipelines, indent=2),
            )]

        elif name == "get_pipeline_detail":
            pipeline = await store.get_pipeline(arguments["pipeline_id"])
            if not pipeline:
                return [TextContent(type="text", text="Pipeline not found")]

            # Enrich with real-time metrics
            metrics = await store.get_pipeline_metrics(arguments["pipeline_id"])
            pipeline["metrics"] = metrics
            return [TextContent(
                type="text",
                text=json.dumps(pipeline, indent=2),
            )]

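        elif name == "get_pipeline_errors":
            errors = await store.get_pipeline_errors(
                arguments["pipeline_id"],
                limit=arguments.get("limit", 10),
            )
            return [TextContent(
                type="text",
                text=json.dumps(errors, indent=2),
            )]

        # Fail loudly instead of implicitly returning None for an unknown tool
        raise ValueError(f"Unknown tool: {name}")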
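
One caveat when splitting tools across modules: the low-level `Server` in the Python SDK appears to keep a single handler per request type, so if every `register_*_tools` function applies `@server.list_tools()` and `@server.call_tool()`, only the last group registered would stay reachable. A minimal sketch of one way around this is a shared registry that each group adds to, with one dispatch point. `ToolRegistry` and the two handlers are hypothetical names used for illustration.

```python
import asyncio
import json
from typing import Awaitable, Callable

ToolHandler = Callable[[dict], Awaitable[str]]


class ToolRegistry:
    """Collects handlers from every tool group behind one dispatch point."""

    def __init__(self) -> None:
        self._handlers: dict[str, ToolHandler] = {}

    def register(self, name: str, handler: ToolHandler) -> None:
        if name in self._handlers:
            raise ValueError(f"Duplicate tool name: {name}")
        self._handlers[name] = handler

    def names(self) -> list[str]:
        return sorted(self._handlers)

    async def dispatch(self, name: str, arguments: dict) -> str:
        handler = self._handlers.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return await handler(arguments)


# Hypothetical handlers standing in for the pipeline and customer groups.
async def list_pipelines(arguments: dict) -> str:
    return json.dumps({"status_filter": arguments.get("status_filter", "all")})


async def lookup_customer(arguments: dict) -> str:
    return json.dumps({"customer_id": arguments["customer_id"]})


registry = ToolRegistry()
registry.register("list_pipelines", list_pipelines)
registry.register("lookup_customer", lookup_customer)

result = asyncio.run(registry.dispatch("list_pipelines", {}))
```

With this shape, the server would define `@server.call_tool()` once, call `registry.dispatch(name, arguments)` inside it, and wrap the returned string in a `TextContent`.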

The Data Store Layer

The data store reads from wherever your CDC pipeline writes. Here’s an example backed by Redis:

# src/data/store.py

import json
import redis.asyncio as redis
from typing import Optional


class DataStore:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    async def get_pipelines(self, status_filter: str = "all") -> list[dict]:
        """Get all pipelines, optionally filtered by status.

        Pipeline state is kept in sync by a CDC consumer that watches
        the platform's internal pipeline metadata tables.
        """
        pipeline_ids = await self.redis.smembers("pipelines:all")
        pipelines = []

        pipe = self.redis.pipeline()
        for pid in pipeline_ids:
            pipe.hgetall(f"pipeline:{pid}")

        results = await pipe.execute()
        for data in results:
            if data and (status_filter == "all" or data.get("status") == status_filter):
                pipelines.append(data)

        return sorted(pipelines, key=lambda p: p.get("name", ""))

    async def get_pipeline(self, pipeline_id: str) -> Optional[dict]:
        data = await self.redis.hgetall(f"pipeline:{pipeline_id}")
        return data if data else None

    async def get_pipeline_metrics(self, pipeline_id: str) -> dict:
        """Get real-time metrics for a pipeline.

        Metrics are updated every few seconds by a streaming consumer
        that processes Kafka consumer group lag and throughput data.
        """
        metrics = await self.redis.hgetall(f"pipeline:{pipeline_id}:metrics")
        return {
            "events_per_second": float(metrics.get("eps", 0)),
            "consumer_lag": int(metrics.get("lag", 0)),
            "last_event_at": metrics.get("last_event_at", "unknown"),
            "bytes_per_second": float(metrics.get("bps", 0)),
        }

    async def get_pipeline_errors(self, pipeline_id: str, limit: int = 10) -> list[dict]:
        """Get recent errors from the pipeline's dead letter queue."""
        raw_errors = await self.redis.lrange(
            f"pipeline:{pipeline_id}:errors", 0, limit - 1
        )
        return [json.loads(e) for e in raw_errors]

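    async def get_customer(
        self, customer_id: Optional[str] = None, email: Optional[str] = None
    ) -> Optional[dict]:
        """Look up a customer by ID, or by email via a secondary index key."""
        if email and not customer_id:
            customer_id = await self.redis.get(f"customer:email:{email}")
        if not customer_id:
            return None
        data = await self.redis.hgetall(f"customer:{customer_id}")
        # HGETALL returns an empty dict for a missing key, so map that to None
        return data if data else None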

Authentication

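MCP servers should authenticate clients, especially when they expose production data. JWTs are a common way to carry identity and scopes. For HTTP-based transports, the MCP specification defines an authorization framework based on OAuth 2.1; the JWT check below is a simpler sketch of the same idea: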

# src/auth.py

import os
from functools import wraps

import jwt  # PyJWT

# Load the signing key from the environment or a secrets manager; never hard-code it.
# MCP_JWT_SECRET is an illustrative variable name.
SECRET_KEY = os.environ["MCP_JWT_SECRET"]
ALGORITHM = "HS256"


def verify_token(token: str) -> dict:
    """Verify a JWT and return its claims."""
    try:
        # PyJWT checks the signature and "exp"; "require" rejects tokens with no "exp"
        return jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            options={"require": ["exp"]},
        )
    except jwt.ExpiredSignatureError as e:
        raise ValueError("Token expired") from e
    except jwt.InvalidTokenError as e:
        raise ValueError(f"Invalid token: {e}") from e


def require_scope(scope: str):
    """Decorator to require a specific scope on a tool."""
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            token = kwargs.get("_auth_token")
            if not token:
                raise PermissionError("Authentication required")

            claims = verify_token(token)
            scopes = claims.get("scopes", [])
            if scope not in scopes and "admin" not in scopes:
                raise PermissionError(f"Missing scope: {scope}")

            return await fn(*args, **kwargs)
        return wrapper
    return decorator

Scope your tools based on what they can do:

Scope           | Allowed Operations
pipelines:read  | List pipelines, view status and metrics
pipelines:write | Pause, resume, restart pipelines
data:read       | Look up customer/order data
data:write      | None (agents shouldn’t write to source data)
admin           | Everything

For most AI assistant use cases, pipelines:read and data:read are sufficient. Reserve write scopes for specific automation workflows where the agent is expected to take action.
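
The scope table can be expressed as a lookup that is checked before any tool runs. This is a minimal sketch under a few assumptions: `TOOL_SCOPES` and `is_allowed` are hypothetical names, `pause_pipeline` is an illustrative write tool that this article does not implement, and tools with no declared scope are denied rather than left open.

```python
# Required scope per tool, following the scope table. "pause_pipeline" is a
# hypothetical write tool included only to show a write scope.
TOOL_SCOPES = {
    "list_pipelines": "pipelines:read",
    "get_pipeline_detail": "pipelines:read",
    "get_pipeline_errors": "pipelines:read",
    "lookup_customer": "data:read",
    "pause_pipeline": "pipelines:write",
}


def is_allowed(tool_name: str, granted_scopes: list[str]) -> bool:
    """Return True if the granted scopes permit calling the tool."""
    required = TOOL_SCOPES.get(tool_name)
    if required is None:
        # Deny tools with no declared scope rather than defaulting to open.
        return False
    return required in granted_scopes or "admin" in granted_scopes


# Typical scopes for an AI assistant, per the recommendation above.
assistant_scopes = ["pipelines:read", "data:read"]
```

Keeping the mapping in one place makes it easy to audit which tools an assistant token can reach, and a new tool stays inaccessible until someone assigns it a scope.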

Exposing Resources

MCP resources are static or semi-static data that the AI can read for context. Good candidates include connector reference docs and troubleshooting guides:

# src/resources/docs.py

from mcp.server import Server
from mcp.types import Resource

# Placeholder Markdown; load the real documents from files or a docs service
PIPELINE_TYPES_DOC = "# Pipeline Types\n\n(placeholder)"
TROUBLESHOOTING_DOC = "# Troubleshooting\n\n(placeholder)"

DOCS = {
    "docs://pipeline-types": PIPELINE_TYPES_DOC,
    "docs://troubleshooting": TROUBLESHOOTING_DOC,
}


def register_resources(server: Server):

    @server.list_resources()
    async def list_resources():
        return [
            Resource(
                uri="docs://pipeline-types",
                name="Pipeline Types Reference",
                description="Available source and destination connector types",
                mimeType="text/markdown",
            ),
            Resource(
                uri="docs://troubleshooting",
                name="Troubleshooting Guide",
                description="Common pipeline errors and how to resolve them",
                mimeType="text/markdown",
            ),
        ]

    @server.read_resource()
    async def read_resource(uri):
        # The SDK passes a URL object rather than a str, so normalize before the lookup
        key = str(uri).rstrip("/")
        if key not in DOCS:
            raise ValueError(f"Unknown resource: {uri}")
        # Return the Markdown text itself (resource content), not a TextContent
        return DOCS[key]

Resources give the AI context it needs to interpret tool results correctly. If the get_pipeline_errors tool returns an error code, the troubleshooting resource tells the AI what that code means and how to fix it.

Hosting Considerations

Local (stdio) Mode

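For development and personal use, run the MCP server as a local process. Claude Desktop and Cursor launch it and communicate over stdio. Point the command at the script itself so that src/ lands on the import path and the tools.* and data.* imports resolve. The path shown is a placeholder for your own checkout:

{
  "mcpServers": {
    "streaming": {
      "command": "python",
      "args": ["/path/to/mcp-streaming-server/src/server.py"],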
      "env": {
        "REDIS_URL": "redis://localhost:6379",
        "AUTH_TOKEN": "your-dev-token"
      }
    }
  }
}

Remote (SSE/HTTP) Mode

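For team use, run the MCP server as a deployed service. Newer revisions of the MCP specification favor the Streamable HTTP transport over standalone SSE, so check which transports your clients support. An SSE setup looks like this: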

# For remote hosting with Server-Sent Events
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route

from server import server  # the Server instance defined in src/server.py

transport = SseServerTransport("/messages/")

async def handle_sse(request):
    async with transport.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await server.run(
            streams[0], streams[1],
            server.create_initialization_options()
        )
    # Starlette expects the endpoint to return a response once the stream closes
    return Response()

app = Starlette(routes=[
    Route("/sse", endpoint=handle_sse),
    # handle_post_message is an ASGI app, so mount it instead of routing it as an endpoint
    Mount("/messages/", app=transport.handle_post_message),
])

Deploy behind a reverse proxy with TLS. The MCP server itself should be in the same network as your Redis/read store to minimize latency.

Scaling

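MCP servers are typically stateless with respect to data: everything they serve lives in the read store. This means you can run multiple instances behind a load balancer, all reading from the same Redis cluster. Transport sessions are the exception. With SSE, a client's POSTs to /messages must reach the instance that holds its event stream, so enable session affinity on the load balancer.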

For team deployments, expect low concurrency: one connection per developer using Claude or Cursor. A single instance can handle dozens of concurrent connections easily. Don’t over-engineer the scaling until you actually need it.

What to Expose (and What Not To)

Good tools for an MCP server backed by streaming data:

Tool                                | Why
Pipeline status/metrics             | Debugging and monitoring during development
Customer/entity lookups             | Context for support agents and internal tools
Schema information                  | Understanding table structures during development
Error logs and DLQ contents         | Debugging failed events
Connector configuration (read-only) | Understanding pipeline topology

What to keep out:

Don’t Expose                         | Why
Raw database credentials             | Security risk, even with scoping
Write operations on source databases | Agents shouldn’t modify production data through MCP
Bulk data exports                    | MCP is for interactive queries, not ETL
Admin operations without approval    | Pipeline deletion, credential rotation, etc.
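
One place the "no bulk exports" rule can leak is a client-supplied `limit`. In the data store shown earlier, `get_pipeline_errors` passes `limit - 1` to `LRANGE`, and Redis reads an end index of -1 as "through the last element", so `limit=0` would return the entire error list. A minimal sketch of a guard follows; `clamp_limit` is a hypothetical helper and the maximum of 100 is an assumed value.

```python
def clamp_limit(raw, default: int = 10, maximum: int = 100) -> int:
    """Coerce a client-supplied limit into the range 1..maximum."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(raw, bool) or not isinstance(raw, int):
        return default
    return max(1, min(raw, maximum))
```

Calling `clamp_limit(arguments.get("limit"))` in the tool handler before querying the store keeps every response small enough for an interactive conversation.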

Practical Example: Debugging a Pipeline with Claude

Here’s what an interaction looks like once the MCP server is connected to Claude:

User: “The orders pipeline seems slow. What’s going on?”

Claude (calls list_pipelines with status_filter="running"):

I can see 4 running pipelines. The orders-to-snowflake pipeline shows a consumer lag of 45,000 events, which is significantly higher than the other pipelines (all under 100). Let me get more details.

Claude (calls get_pipeline_detail with pipeline_id="orders-to-snowflake"):

The pipeline is processing 850 events/second, but the source is producing about 1,200 events/second, so lag is growing. The destination is Snowflake with a batch size of 1,000 and flush interval of 10 seconds.

Claude (calls get_pipeline_errors):

There are 12 recent errors, all SCHEMA_MISMATCH — it looks like a new column was added to the orders table that doesn’t exist in the Snowflake destination. The pipeline is retrying these events, which is slowing down overall throughput.

This kind of interactive debugging is where MCP shines. The AI can correlate information across multiple tool calls and explain what it finds in plain language. For more on how MCP connects AI agents to live data through CDC, see MCP and CDC: Connecting AI Agents to Live Data.
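
The arithmetic behind Claude's second observation is simple enough to put in a tool or a dashboard. The sketch below assumes steady produce and consume rates, which real pipelines rarely have, and `lag_trend` is a hypothetical helper name.

```python
def lag_trend(produce_rate: float, consume_rate: float, current_lag: int) -> dict:
    """Summarize whether consumer lag is growing and how fast."""
    net = produce_rate - consume_rate  # events/second added to the backlog
    report = {"lag_growth_per_second": net, "growing": net > 0}
    if net > 0:
        # Time the current backlog would take to build up at this rate.
        report["seconds_to_accumulate"] = round(current_lag / net, 1)
    return report


# Numbers from the conversation above.
trend = lag_trend(produce_rate=1200, consume_rate=850, current_lag=45_000)
```

With the figures from the example, the backlog grows by 350 events per second, so a lag of 45,000 events corresponds to roughly two minutes of falling behind at that rate.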

Keeping the Read Store Fresh

The MCP server is only as good as the data it reads. The underlying CDC pipeline needs to be reliable and low-latency. Key things to monitor:

  • End-to-end latency: Time from source DB write to read store update. Target <5 seconds.
  • Consumer lag: If the Kafka consumer falls behind, the MCP server serves stale data.
  • Read store health: Redis memory usage, connection count, response times.

Add a health check tool to the MCP server itself so the AI can self-diagnose data freshness issues:

Tool(
    name="check_data_freshness",
    description="Check how fresh the data in the read store is",
    inputSchema={"type": "object", "properties": {}},
)

When the last sync timestamp is more than 30 seconds old, the AI can tell the user “Note: the data I’m seeing might be up to X seconds behind” instead of silently serving stale information.
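
The logic behind such a tool can be sketched as a small function. This assumes the CDC consumer records its last sync time as an ISO 8601 timestamp with an explicit UTC offset; `freshness_report` and `STALE_AFTER_SECONDS` are hypothetical names, and reading the timestamp from the read store is left out.

```python
from datetime import datetime, timezone
from typing import Optional

STALE_AFTER_SECONDS = 30  # threshold mentioned in the text


def freshness_report(last_sync_iso: str, now: Optional[datetime] = None) -> dict:
    """Compare the last sync timestamp with the current time.

    Assumes an ISO 8601 timestamp with an explicit offset,
    for example "2025-05-22T12:00:00+00:00".
    """
    now = now or datetime.now(timezone.utc)
    last_sync = datetime.fromisoformat(last_sync_iso)
    age = max(0.0, (now - last_sync).total_seconds())
    stale = age > STALE_AFTER_SECONDS
    message = (
        f"Note: the data I'm seeing might be up to {age:.0f} seconds behind"
        if stale else "Data is fresh"
    )
    return {"age_seconds": age, "stale": stale, "message": message}


# Fixed clock so the example is reproducible.
now = datetime(2025, 5, 22, 12, 1, 0, tzinfo=timezone.utc)
report = freshness_report("2025-05-22T12:00:15+00:00", now=now)
```

Returning the age alongside a ready-made message lets the assistant either quote the warning directly or reason about the number itself.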


Ready to connect your AI tools to live pipeline data? Streamkap provides a managed MCP server that exposes pipeline status, metrics, and data lookups to Claude, Cursor, and any MCP-compatible AI assistant. Start a free trial or learn more about Streamkap for AI agents.