<--- Back to all resources

AI & Agents

May 22, 2025

11 min read

Event-Driven Agent Orchestration: Triggering AI Agents from Database Changes

Learn how to use CDC events to trigger AI agent workflows — from architecture patterns to filtering, batching, error handling, and dead letter queues.

Most AI agent systems are request-driven: a user asks a question, the agent processes it, and returns a response. But some of the highest-value agent use cases aren’t triggered by users at all — they’re triggered by data changes.

A new order arrives → a fraud detection agent evaluates it. A support ticket is created → a classification agent routes it. Inventory drops below a threshold → a reorder agent initiates a purchase. These are event-driven agents, and the trigger mechanism is change data capture.

Why Event-Driven Agents

Request-driven agents wait for someone to ask. Event-driven agents act when something happens. The difference matters for workflows that need to be:

  • Immediate — Fraud detection is useless if it runs 5 minutes after the order is placed
  • Automatic — Nobody should manually trigger a ticket classification agent
  • Exhaustive — Every qualifying event gets processed, not just the ones someone remembers to check

Polling-based approaches (check the database every N seconds for new rows) create latency, waste resources during quiet periods, and miss events during high-throughput periods. CDC-based event streaming captures every change with sub-second latency and zero impact on the source database.

Architecture: CDC to Agent Dispatcher

The core architecture has four components:

Source Database → CDC Stream → Event Router → Agent Dispatcher → Agents
                                    │                              │
                                    │                              ↓
                                    │                        Agent Results
                                    │                              │
                                    ↓                              ↓
                              Filtered Out                  Result Store /
                              (discarded)                   Action Queue

CDC Stream

Captures every INSERT, UPDATE, and DELETE from your source database tables. Each event contains the full row state (before and after the change), the operation type, the table name, and a timestamp.

Event Router

Examines each CDC event and decides:

  • Should this event trigger an agent? (filtering)
  • Which agent should handle it? (routing)
  • Should this event be batched with others? (aggregation)

Agent Dispatcher

Takes routed events and calls the appropriate agent. Handles concurrency limits, retries, timeouts, and dead letter queuing.

Agents

The actual AI logic — LLM calls with appropriate context and tools. Each agent is specialized for one workflow.

Event Routing Rules

Not every database change needs an agent. In a typical e-commerce system, you might have 100,000 CDC events per hour, but only 5,000 of those should trigger agent processing. The event router is your first line of defense against wasted LLM calls.

Table-Level Filtering

The simplest filter: only process events from specific tables.

AGENT_TABLES = {
    "orders": "fraud_check_agent",
    "support_tickets": "classification_agent",
    "inventory": "reorder_agent",
}

def route_event(event: dict) -> str | None:
    table = event["source"]["table"]
    return AGENT_TABLES.get(table)

Operation-Level Filtering

You often only care about certain operation types:

TABLE_OPERATIONS = {
    "orders": ["c"],                 # only new orders (create)
    "support_tickets": ["c", "u"],   # new and updated tickets
    "inventory": ["u"],              # only inventory changes (update)
}

def should_process(event: dict) -> bool:
    table = event["source"]["table"]
    op = event["op"]  # c=create, u=update, d=delete
    allowed_ops = TABLE_OPERATIONS.get(table, [])
    return op in allowed_ops

Column-Level Filtering

For updates, check whether the columns that changed are relevant. An inventory table update to last_audited_at shouldn’t trigger the reorder agent — only changes to quantity_on_hand should:

RELEVANT_COLUMNS = {
    "inventory": {"quantity_on_hand", "reserved_quantity"},
    "support_tickets": {"status", "priority", "description"},
}

def has_relevant_changes(event: dict) -> bool:
    if event["op"] != "u":
        return True  # inserts and deletes are always relevant

    table = event["source"]["table"]
    relevant = RELEVANT_COLUMNS.get(table, set())

    before = event.get("before", {})
    after = event.get("after", {})

    for col in relevant:
        if before.get(col) != after.get(col):
            return True
    return False

Business Logic Filtering

The most powerful filters apply domain-specific rules:

def should_fraud_check(event: dict) -> bool:
    """Only fraud-check orders over $500 or from new customers."""
    order = event["after"]
    if order["total_amount"] > 500:
        return True
    if order.get("is_first_order", False):
        return True
    return False

This reduces your agent call volume dramatically. If 80% of orders are under $500 from returning customers, you’ve cut fraud check costs by 80%.

Agent Workflow Examples

Example 1: Fraud Detection on New Orders

CDC Event: INSERT on orders table
  → Filter: amount > $500 OR new customer
  → Enrich: fetch customer history, IP geolocation, device fingerprint
  → Agent: Evaluate fraud risk (LLM with fraud detection prompt)
  → Action: If risk > 0.8, flag for manual review
            If risk > 0.95, auto-hold the order
            Otherwise, approve

The enrichment step is key. The CDC event gives you the order data, but the agent needs additional context. This is where a real-time data enrichment pipeline pays off — customer history and risk signals are pre-computed and available in a cache, not queried at agent-call time.

Example 2: Support Ticket Classification

CDC Event: INSERT on support_tickets table
  → Filter: all new tickets (no filtering needed)
  → Agent: Classify ticket (category, priority, suggested assignee)
  → Action: Update ticket with classification
            Route to appropriate queue
            If urgent, send Slack notification

Example 3: Inventory Reorder

CDC Event: UPDATE on inventory table
  → Filter: quantity_on_hand changed AND quantity_on_hand < reorder_point
  → Enrich: fetch supplier info, lead times, demand forecast
  → Agent: Calculate reorder quantity, select supplier
  → Action: Create purchase order draft
            Notify procurement team

Batching Events

Some agent workflows perform better on batches. Instead of classifying one support ticket at a time, you might classify 10 at once — the LLM can see patterns across tickets and classify them more consistently.

import asyncio
from collections import defaultdict

class EventBatcher:
    def __init__(self, batch_size: int = 10, max_wait_ms: int = 2000):
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.buffers: dict[str, list] = defaultdict(list)
        self.timers: dict[str, asyncio.Task] = {}

    async def add(self, agent_name: str, event: dict):
        self.buffers[agent_name].append(event)

        if len(self.buffers[agent_name]) >= self.batch_size:
            await self.flush(agent_name)
        elif agent_name not in self.timers:
            self.timers[agent_name] = asyncio.create_task(
                self._timer_flush(agent_name)
            )

    async def _timer_flush(self, agent_name: str):
        await asyncio.sleep(self.max_wait_ms / 1000)
        if self.buffers[agent_name]:
            await self.flush(agent_name)

    async def flush(self, agent_name: str):
        batch = self.buffers[agent_name]
        self.buffers[agent_name] = []

        if agent_name in self.timers:
            self.timers[agent_name].cancel()
            del self.timers[agent_name]

        await dispatch_batch(agent_name, batch)

Batching also helps with LLM rate limits. If you’re processing 1,000 events per minute but your LLM API allows 60 requests per minute, batching 20 events per call keeps you within limits.

Error Handling and Dead Letter Queues

Agent calls fail. The LLM returns an error, the agent produces invalid output, or a downstream action (creating a purchase order) fails. Your pipeline needs to handle this without losing events.

Retry Strategy

import asyncio
from typing import Callable

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay_ms: int = 1000,
) -> dict:
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except RetryableError as e:
            if attempt == max_retries:
                raise
            delay = base_delay_ms * (2 ** attempt) / 1000
            await asyncio.sleep(delay)
    raise MaxRetriesExceeded()

Not all errors should be retried:

Error TypeRetry?Example
Rate limit (429)Yes, with backoffLLM API throttling
Server error (500, 503)Yes, with backoffTemporary provider issue
TimeoutYes, onceSlow LLM response
Invalid inputNoMalformed CDC event
Business logic errorNoAgent output doesn’t parse
Authentication errorNoExpired API key

Dead Letter Queue

Events that fail after max retries go to a dead letter queue (DLQ). This pattern is well established in stream processing and applies directly to agent orchestration:

async def process_event(event: dict):
    agent_name = route_event(event)
    if not agent_name:
        return  # filtered out

    try:
        result = await retry_with_backoff(
            lambda: call_agent(agent_name, event)
        )
        await store_result(result)
    except MaxRetriesExceeded:
        await send_to_dlq(event, agent_name, error="max_retries_exceeded")
    except NonRetryableError as e:
        await send_to_dlq(event, agent_name, error=str(e))

Monitor your DLQ size. A growing DLQ means something systemic is wrong — maybe the LLM prompt needs updating, or a downstream API is down.

Ordering and Concurrency

CDC events arrive in order per table (and per partition key). But agent processing takes variable time — a fraud check might take 500ms for one order and 3 seconds for another. If you process events concurrently, results may arrive out of order.

For most agent workflows, this is fine. Classifying ticket #42 before ticket #41 doesn’t cause problems.

But some workflows require ordering. If a customer updates their order three times in quick succession, the fraud check agent should evaluate the final state, not an intermediate one.

Pattern: Entity-keyed partitioning

Process events for the same entity (customer, order, account) sequentially, but different entities in parallel:

from collections import defaultdict
import asyncio

class OrderedDispatcher:
    def __init__(self, max_concurrency: int = 50):
        self.entity_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def dispatch(self, event: dict):
        entity_key = self._get_entity_key(event)

        async with self.semaphore:  # limit total concurrency
            async with self.entity_locks[entity_key]:  # serialize per entity
                await process_event(event)

    def _get_entity_key(self, event: dict) -> str:
        table = event["source"]["table"]
        row = event["after"] or event["before"]
        # Use the primary key as the entity key
        return f"{table}:{row['id']}"

This gives you ordering guarantees where they matter and parallelism everywhere else.

Monitoring and Observability

Track these metrics for your event-driven agent system:

Pipeline health:

  • Events received per second (by table)
  • Events filtered out per second
  • Events dispatched to agents per second
  • DLQ size (should be near zero)

Agent performance:

  • Agent call latency (p50, p95, p99)
  • Agent success rate per agent type
  • Agent output quality (if you have evaluation metrics)

Business metrics:

  • Fraud detection rate and false positive rate
  • Ticket classification accuracy
  • Reorder agent cost savings vs manual process
# Example: structured logging for each agent call
import structlog

logger = structlog.get_logger()

async def call_agent_with_metrics(agent_name: str, event: dict):
    start = time.monotonic()
    try:
        result = await call_agent(agent_name, event)
        duration_ms = (time.monotonic() - start) * 1000
        logger.info(
            "agent_call_success",
            agent=agent_name,
            duration_ms=duration_ms,
            event_table=event["source"]["table"],
            event_op=event["op"],
        )
        return result
    except Exception as e:
        duration_ms = (time.monotonic() - start) * 1000
        logger.error(
            "agent_call_failure",
            agent=agent_name,
            duration_ms=duration_ms,
            error=str(e),
            event_table=event["source"]["table"],
        )
        raise

Scaling Considerations

As event volume grows, you’ll hit bottlenecks in this order:

  1. LLM API rate limits — Batch events, use smaller models for simple classifications, cache repeated patterns
  2. Agent dispatcher throughput — Scale horizontally with partitioned consumers
  3. Context retrieval — Pre-compute and cache context so agents don’t query source databases. See agent decision latency budgets for the full breakdown.
  4. Result storage — Agent outputs (classifications, risk scores, decisions) need a fast write path back to your operational systems

The CDC capture layer scales independently of the agent layer. You can have one CDC stream feeding multiple agent types at different rates, each with their own concurrency limits and batching strategies.

When NOT to Use Event-Driven Agents

Not every automation needs an LLM. If your “agent” is really just a rules engine (if amount > $500 and country in blocklist, flag for review), write it as a streaming transformation without the LLM call. It will be faster, cheaper, and more predictable.

Use LLM-backed agents when:

  • The logic requires natural language understanding (ticket classification)
  • The decision space is too complex for rules (nuanced fraud patterns)
  • You need to generate natural language output (customer communications)
  • The rules change frequently and are easier to express as prompts than code

Putting Event-Driven Agents Into Production

Start with one workflow. Pick the one with the clearest signal (well-defined trigger event), the most tolerance for errors (not financial transactions on day one), and measurable impact (you can compare agent decisions to human decisions).

Build the filtering layer first — getting the right events to the right agents matters more than optimizing the agent itself. A perfectly tuned fraud model is useless if it processes every $5 order along with the $5,000 ones.


Ready to trigger AI agents from database changes? Streamkap captures real-time events from your databases and routes them to agent workflows with sub-second latency. Start a free trial or learn more about event-driven agent infrastructure.