Event-Driven Agent Orchestration: Triggering AI Agents from Database Changes
Learn how to use CDC events to trigger AI agent workflows — from architecture patterns to filtering, batching, error handling, and dead letter queues.
Most AI agent systems are request-driven: a user asks a question, the agent processes it, and returns a response. But some of the highest-value agent use cases aren’t triggered by users at all — they’re triggered by data changes.
A new order arrives → a fraud detection agent evaluates it. A support ticket is created → a classification agent routes it. Inventory drops below a threshold → a reorder agent initiates a purchase. These are event-driven agents, and the trigger mechanism is change data capture.
Why Event-Driven Agents
Request-driven agents wait for someone to ask. Event-driven agents act when something happens. The difference matters for workflows that need to be:
- Immediate — Fraud detection is useless if it runs 5 minutes after the order is placed
- Automatic — Nobody should manually trigger a ticket classification agent
- Exhaustive — Every qualifying event gets processed, not just the ones someone remembers to check
Polling-based approaches (check the database every N seconds for new rows) create latency, waste resources during quiet periods, and miss events during high-throughput periods. CDC-based event streaming captures every change with sub-second latency and zero impact on the source database.
Architecture: CDC to Agent Dispatcher
The core architecture has four components:
```
Source Database → CDC Stream → Event Router → Agent Dispatcher → Agents
                                    │                               │
                                    ↓                               ↓
                              Filtered Out                   Agent Results
                               (discarded)                         │
                                                                   ↓
                                                            Result Store /
                                                             Action Queue
```
CDC Stream
Captures every INSERT, UPDATE, and DELETE from your source database tables. Each event contains the full row state (before and after the change), the operation type, the table name, and a timestamp.
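The exact envelope depends on your CDC tool; a Debezium-style event for an inventory update looks roughly like this (field names are illustrative, not a spec):

```python
# A Debezium-style change event for an UPDATE on the inventory table.
# The envelope varies by CDC tool; these field names are illustrative.
sample_event = {
    "op": "u",  # c=create, u=update, d=delete
    "ts_ms": 1718000000000,  # capture timestamp (epoch millis)
    "source": {"db": "shop", "table": "inventory"},
    "before": {"id": 7, "quantity_on_hand": 12, "reserved_quantity": 2},
    "after": {"id": 7, "quantity_on_hand": 3, "reserved_quantity": 2},
}
```

Having both `before` and `after` images is what makes column-level filtering (below) possible without an extra database read.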
Event Router
Examines each CDC event and decides:
- Should this event trigger an agent? (filtering)
- Which agent should handle it? (routing)
- Should this event be batched with others? (aggregation)
Agent Dispatcher
Takes routed events and calls the appropriate agent. Handles concurrency limits, retries, timeouts, and dead letter queuing.
Agents
The actual AI logic — LLM calls with appropriate context and tools. Each agent is specialized for one workflow.
Event Routing Rules
Not every database change needs an agent. In a typical e-commerce system, you might have 100,000 CDC events per hour, but only 5,000 of those should trigger agent processing. The event router is your first line of defense against wasted LLM calls.
Table-Level Filtering
The simplest filter: only process events from specific tables.
```python
AGENT_TABLES = {
    "orders": "fraud_check_agent",
    "support_tickets": "classification_agent",
    "inventory": "reorder_agent",
}

def route_event(event: dict) -> str | None:
    table = event["source"]["table"]
    return AGENT_TABLES.get(table)
```
Operation-Level Filtering
You often only care about certain operation types:
```python
TABLE_OPERATIONS = {
    "orders": ["c"],                # only new orders (create)
    "support_tickets": ["c", "u"],  # new and updated tickets
    "inventory": ["u"],             # only inventory changes (update)
}

def should_process(event: dict) -> bool:
    table = event["source"]["table"]
    op = event["op"]  # c=create, u=update, d=delete
    allowed_ops = TABLE_OPERATIONS.get(table, [])
    return op in allowed_ops
```
Column-Level Filtering
For updates, check whether the columns that changed are relevant. An inventory table update to last_audited_at shouldn’t trigger the reorder agent — only changes to quantity_on_hand should:
```python
RELEVANT_COLUMNS = {
    "inventory": {"quantity_on_hand", "reserved_quantity"},
    "support_tickets": {"status", "priority", "description"},
}

def has_relevant_changes(event: dict) -> bool:
    if event["op"] != "u":
        return True  # inserts and deletes are always relevant
    table = event["source"]["table"]
    relevant = RELEVANT_COLUMNS.get(table, set())
    before = event.get("before", {})
    after = event.get("after", {})
    for col in relevant:
        if before.get(col) != after.get(col):
            return True
    return False
```
Business Logic Filtering
The most powerful filters apply domain-specific rules:
```python
def should_fraud_check(event: dict) -> bool:
    """Only fraud-check orders over $500 or from new customers."""
    order = event["after"]
    if order["total_amount"] > 500:
        return True
    if order.get("is_first_order", False):
        return True
    return False
```
This reduces your agent call volume dramatically. If 80% of orders are under $500 from returning customers, you’ve cut fraud check costs by 80%.
Agent Workflow Examples
Example 1: Fraud Detection on New Orders
```
CDC Event: INSERT on orders table
  → Filter: amount > $500 OR new customer
  → Enrich: fetch customer history, IP geolocation, device fingerprint
  → Agent:  evaluate fraud risk (LLM with fraud detection prompt)
  → Action: if risk > 0.8,  flag for manual review
            if risk > 0.95, auto-hold the order
            otherwise, approve
```
The enrichment step is key. The CDC event gives you the order data, but the agent needs additional context. This is where a real-time data enrichment pipeline pays off — customer history and risk signals are pre-computed and available in a cache, not queried at agent-call time.
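A sketch of that enrichment step: reading pre-computed customer context from a cache instead of querying the source database. The `customer_ctx:` key scheme and field names are assumptions for illustration:

```python
import json

def enrich_order_event(event: dict, cache) -> dict:
    """Attach pre-computed customer context to an order event.

    `cache` is anything with a .get(key) method returning a JSON
    string or None — a Redis client in production, a dict in tests.
    """
    order = event["after"]
    raw = cache.get(f"customer_ctx:{order['customer_id']}")
    context = json.loads(raw) if raw else {}  # tolerate cache misses
    return {
        "order": order,
        "customer_history": context.get("history", []),
        "risk_signals": context.get("risk_signals", {}),
    }
```

On a cache miss the agent still runs, just with less context — a deliberate trade-off that keeps enrichment off the critical path.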
Example 2: Support Ticket Classification
```
CDC Event: INSERT on support_tickets table
  → Filter: all new tickets (no filtering needed)
  → Agent:  classify ticket (category, priority, suggested assignee)
  → Action: update ticket with classification
            route to appropriate queue
            if urgent, send Slack notification
```
Example 3: Inventory Reorder
```
CDC Event: UPDATE on inventory table
  → Filter: quantity_on_hand changed AND quantity_on_hand < reorder_point
  → Enrich: fetch supplier info, lead times, demand forecast
  → Agent:  calculate reorder quantity, select supplier
  → Action: create purchase order draft
            notify procurement team
```
Batching Events
Some agent workflows perform better on batches. Instead of classifying one support ticket at a time, you might classify 10 at once — the LLM can see patterns across tickets and classify them more consistently.
```python
import asyncio
from collections import defaultdict

class EventBatcher:
    def __init__(self, batch_size: int = 10, max_wait_ms: int = 2000):
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.buffers: dict[str, list] = defaultdict(list)
        self.timers: dict[str, asyncio.Task] = {}

    async def add(self, agent_name: str, event: dict):
        self.buffers[agent_name].append(event)
        if len(self.buffers[agent_name]) >= self.batch_size:
            await self.flush(agent_name)
        elif agent_name not in self.timers:
            # First event of a new batch: start the max-wait timer
            self.timers[agent_name] = asyncio.create_task(
                self._timer_flush(agent_name)
            )

    async def _timer_flush(self, agent_name: str):
        await asyncio.sleep(self.max_wait_ms / 1000)
        # Remove ourselves first so flush() doesn't cancel the running task
        self.timers.pop(agent_name, None)
        if self.buffers[agent_name]:
            await self.flush(agent_name)

    async def flush(self, agent_name: str):
        batch = self.buffers[agent_name]
        self.buffers[agent_name] = []
        timer = self.timers.pop(agent_name, None)
        if timer is not None:
            timer.cancel()
        if batch:
            await dispatch_batch(agent_name, batch)
```
Batching also helps with LLM rate limits. If you’re processing 1,000 events per minute but your LLM API allows 60 requests per minute, batching 20 events per call keeps you within limits.
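The sizing is a ceiling division; a one-line helper makes the constraint explicit:

```python
import math

def min_batch_size(events_per_minute: int, rate_limit_rpm: int) -> int:
    """Smallest batch size that keeps the request rate under the limit."""
    return math.ceil(events_per_minute / rate_limit_rpm)
```

For 1,000 events/min against a 60 requests/min limit this gives 17, so batching 20 per call leaves headroom for retries.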
Error Handling and Dead Letter Queues
Agent calls fail. The LLM returns an error, the agent produces invalid output, or a downstream action (creating a purchase order) fails. Your pipeline needs to handle this without losing events.
Retry Strategy
```python
import asyncio
from typing import Callable

class RetryableError(Exception):
    """Transient failure worth retrying (rate limit, 5xx, timeout)."""

class MaxRetriesExceeded(Exception):
    """Raised when all retry attempts are exhausted."""

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay_ms: int = 1000,
) -> dict:
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except RetryableError as e:
            if attempt == max_retries:
                raise MaxRetriesExceeded() from e
            # Exponential backoff: 1s, 2s, 4s, ...
            delay = base_delay_ms * (2 ** attempt) / 1000
            await asyncio.sleep(delay)
```
Not all errors should be retried:
| Error Type | Retry? | Example |
|---|---|---|
| Rate limit (429) | Yes, with backoff | LLM API throttling |
| Server error (500, 503) | Yes, with backoff | Temporary provider issue |
| Timeout | Yes, once | Slow LLM response |
| Invalid input | No | Malformed CDC event |
| Business logic error | No | Agent output doesn’t parse |
| Authentication error | No | Expired API key |
Dead Letter Queue
Events that fail after max retries go to a dead letter queue (DLQ). This pattern is well established in stream processing and applies directly to agent orchestration:
```python
async def process_event(event: dict):
    agent_name = route_event(event)
    if not agent_name:
        return  # filtered out

    try:
        result = await retry_with_backoff(
            lambda: call_agent(agent_name, event)
        )
        await store_result(result)
    except MaxRetriesExceeded:
        await send_to_dlq(event, agent_name, error="max_retries_exceeded")
    except NonRetryableError as e:
        await send_to_dlq(event, agent_name, error=str(e))
```
Monitor your DLQ size. A growing DLQ means something systemic is wrong — maybe the LLM prompt needs updating, or a downstream API is down.
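What a DLQ record might look like — in production the sink would be a Kafka topic or a database table; a list stands in here so the record shape is visible, and the field names are illustrative:

```python
import json
import time

DEAD_LETTERS: list[dict] = []  # stand-in for a real DLQ sink

def send_to_dlq_record(event: dict, agent_name: str, error: str) -> dict:
    record = {
        "agent": agent_name,
        "error": error,
        "failed_at": time.time(),
        # Keep the raw event so it can be replayed after the fix ships.
        "event": json.dumps(event),
    }
    DEAD_LETTERS.append(record)
    return record
```

Storing the raw event verbatim is the point of a DLQ: once the systemic issue is fixed, the events can be replayed through the same pipeline.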
Ordering and Concurrency
CDC events arrive in order per table (and per partition key). But agent processing takes variable time — a fraud check might take 500ms for one order and 3 seconds for another. If you process events concurrently, results may arrive out of order.
For most agent workflows, this is fine. Classifying ticket #42 before ticket #41 doesn’t cause problems.
But some workflows require ordering. If a customer updates their order three times in quick succession, the fraud check agent should evaluate the final state, not an intermediate one.
Pattern: Entity-keyed partitioning
Process events for the same entity (customer, order, account) sequentially, but different entities in parallel:
```python
from collections import defaultdict
import asyncio

class OrderedDispatcher:
    def __init__(self, max_concurrency: int = 50):
        # Note: this dict grows with distinct entities; a long-running
        # process should evict locks for idle entities periodically.
        self.entity_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def dispatch(self, event: dict):
        entity_key = self._get_entity_key(event)
        async with self.semaphore:  # limit total concurrency
            async with self.entity_locks[entity_key]:  # serialize per entity
                await process_event(event)

    def _get_entity_key(self, event: dict) -> str:
        table = event["source"]["table"]
        # For deletes, "after" is empty — fall back to "before"
        row = event["after"] or event["before"]
        # Use the primary key as the entity key
        return f"{table}:{row['id']}"
```
This gives you ordering guarantees where they matter and parallelism everywhere else.
Monitoring and Observability
Track these metrics for your event-driven agent system:
Pipeline health:
- Events received per second (by table)
- Events filtered out per second
- Events dispatched to agents per second
- DLQ size (should be near zero)
Agent performance:
- Agent call latency (p50, p95, p99)
- Agent success rate per agent type
- Agent output quality (if you have evaluation metrics)
Business metrics:
- Fraud detection rate and false positive rate
- Ticket classification accuracy
- Reorder agent cost savings vs manual process
```python
# Example: structured logging for each agent call
import time

import structlog

logger = structlog.get_logger()

async def call_agent_with_metrics(agent_name: str, event: dict):
    start = time.monotonic()
    try:
        result = await call_agent(agent_name, event)
        duration_ms = (time.monotonic() - start) * 1000
        logger.info(
            "agent_call_success",
            agent=agent_name,
            duration_ms=duration_ms,
            event_table=event["source"]["table"],
            event_op=event["op"],
        )
        return result
    except Exception as e:
        duration_ms = (time.monotonic() - start) * 1000
        logger.error(
            "agent_call_failure",
            agent=agent_name,
            duration_ms=duration_ms,
            error=str(e),
            event_table=event["source"]["table"],
        )
        raise
```
Scaling Considerations
As event volume grows, you’ll hit bottlenecks in this order:
- LLM API rate limits — Batch events, use smaller models for simple classifications, cache repeated patterns
- Agent dispatcher throughput — Scale horizontally with partitioned consumers
- Context retrieval — Pre-compute and cache context so agents don’t query source databases. See agent decision latency budgets for the full breakdown.
- Result storage — Agent outputs (classifications, risk scores, decisions) need a fast write path back to your operational systems
The CDC capture layer scales independently of the agent layer. You can have one CDC stream feeding multiple agent types at different rates, each with their own concurrency limits and batching strategies.
When NOT to Use Event-Driven Agents
Not every automation needs an LLM. If your “agent” is really just a rules engine (if amount > $500 and country in blocklist, flag for review), write it as a streaming transformation without the LLM call. It will be faster, cheaper, and more predictable.
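That example rule needs no model at all — it is a plain predicate evaluated inline in the stream (blocklist contents are illustrative):

```python
BLOCKLIST = {"XX", "YY"}  # illustrative country codes

def flag_for_review(order: dict) -> bool:
    """Deterministic rules engine: no LLM call, microseconds per event."""
    return order["total_amount"] > 500 and order["country"] in BLOCKLIST
```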
Use LLM-backed agents when:
- The logic requires natural language understanding (ticket classification)
- The decision space is too complex for rules (nuanced fraud patterns)
- You need to generate natural language output (customer communications)
- The rules change frequently and are easier to express as prompts than code
Putting Event-Driven Agents Into Production
Start with one workflow. Pick the one with the clearest signal (well-defined trigger event), the most tolerance for errors (not financial transactions on day one), and measurable impact (you can compare agent decisions to human decisions).
Build the filtering layer first — getting the right events to the right agents matters more than optimizing the agent itself. A perfectly tuned fraud model is useless if it processes every $5 order along with the $5,000 ones.
Ready to trigger AI agents from database changes? Streamkap captures real-time events from your databases and routes them to agent workflows with sub-second latency. Start a free trial or learn more about event-driven agent infrastructure.