Real-Time Currency Conversion in Streaming Data Pipelines
How to implement accurate currency conversion in real-time streaming pipelines. Covers temporal joins for point-in-time rates, rate source integration, and handling edge cases.
If you run a streaming pipeline that processes transactions in multiple currencies, you have probably hit this problem: the numbers don’t add up. Your daily revenue report in USD doesn’t match the sum of the individual transactions when you convert them manually. The gap might be small on any single row, but across thousands of transactions per hour, the errors compound fast. The root cause is almost always the same - your pipeline is applying an exchange rate from the wrong point in time.
Currency conversion sounds simple. Multiply the amount by the rate, done. But in a streaming context, where events arrive continuously and exchange rates shift throughout the day, getting it right requires careful thinking about time, state, and data freshness.
The Correctness Problem
Consider a transaction that happened at 09:15 UTC for 1,000 EUR. The EUR/USD rate at 09:15 was 1.0842. By the time your pipeline processes that event at 09:47, the rate has moved to 1.0867. If you grab the “current” rate at processing time, you get $1,086.70 instead of the correct $1,084.20. That’s a $2.50 error on a single transaction.
Now scale that up. A mid-size e-commerce platform might process 50,000 multi-currency transactions per day. If rates shift by even 0.2% between event time and processing time, you’re looking at systematic errors in the tens of thousands of dollars per day. For financial reporting, this is unacceptable. Auditors will flag it, reconciliation will fail, and your finance team will lose trust in your data pipeline.
The core principle is straightforward: a transaction must be converted using the exchange rate that was valid at the time the transaction occurred, not the time it was processed. This is called point-in-time accuracy, and it is the foundation of correct currency conversion in streaming systems.
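The arithmetic from the example above is easy to verify. A quick sketch with Python's decimal module (fixed-point, as financial code should use):

```python
from decimal import Decimal

amount_eur = Decimal("1000")
rate_at_event = Decimal("1.0842")       # EUR/USD at 09:15, when the transaction occurred
rate_at_processing = Decimal("1.0867")  # EUR/USD at 09:47, when the event was processed

correct_usd = amount_eur * rate_at_event       # 1084.20
wrong_usd = amount_eur * rate_at_processing    # 1086.70
error_usd = wrong_usd - correct_usd            # 2.50 on a single transaction
```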
Batch pipelines sometimes get away with using a daily closing rate for all transactions in a given day. That works when you process data once at end-of-day. But in a streaming pipeline where results need to be available within seconds or minutes, you need finer granularity, and you need the conversion to happen inline as events flow through.
Temporal Joins: The Right Tool for the Job
Apache Flink provides a first-class solution to this problem through temporal joins. A temporal join lets you join a fact stream (your transactions) against a versioned table (exchange rates) using the event timestamp of the fact record. Instead of joining against the latest version of the rate table, Flink looks up the rate that was valid as of the transaction’s timestamp.
Here’s how the data model works. You have two streams:
- Transactions stream - each event carries an amount, a source currency, a target currency, and an event timestamp.
- Exchange rates stream - each event carries a currency pair, a rate, and a timestamp marking when that rate became effective.
The rates stream is declared as a temporal table in Flink, which means Flink maintains a versioned history of each currency pair’s rate over time. When a transaction event arrives, Flink looks at the transaction’s event timestamp, finds the most recent rate for that currency pair as of that moment, and performs the join.
Flink SQL Implementation
Let’s walk through the SQL. First, define the transactions table:
CREATE TABLE transactions (
    txn_id STRING,
    amount DECIMAL(18, 4),
    source_currency STRING,
    target_currency STRING,
    txn_time TIMESTAMP(3),
    WATERMARK FOR txn_time AS txn_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);
Next, define the exchange rates as a temporal table. The key detail here is the PRIMARY KEY and the WATERMARK - these tell Flink to version the table by currency pair over time:
CREATE TABLE exchange_rates (
    currency_pair STRING,
    rate DECIMAL(18, 8),
    rate_time TIMESTAMP(3),
    WATERMARK FOR rate_time AS rate_time - INTERVAL '10' SECOND,
    PRIMARY KEY (currency_pair) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'exchange-rates',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
Note the upsert-kafka connector: Flink's plain kafka connector is append-only and rejects PRIMARY KEY declarations, and the versioned table needs a primary key so Flink knows which rows are versions of the same currency pair.
Now, the temporal join itself:
SELECT
    t.txn_id,
    t.amount AS original_amount,
    t.source_currency,
    t.target_currency,
    r.rate,
    t.amount * r.rate AS converted_amount,
    t.txn_time
FROM transactions AS t
JOIN exchange_rates FOR SYSTEM_TIME AS OF t.txn_time AS r
    ON CONCAT(t.source_currency, '/', t.target_currency) = r.currency_pair;
The FOR SYSTEM_TIME AS OF t.txn_time clause is what makes this a temporal join. Flink looks up r.rate as it was at t.txn_time, not at the current processing moment. Every transaction gets the historically correct rate without any custom windowing or state management code.
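Outside of Flink, the same as-of semantics can be modeled in a few lines of Python: keep each pair's rate history sorted by effective time and binary-search for the latest rate at or before the event timestamp. This is an illustrative model of what the temporal join computes, not how Flink implements it internally:

```python
import bisect

# Rate history per currency pair: sorted list of (effective_time, rate).
# Timestamps are illustrative ISO-8601 strings, which sort lexicographically.
history = {
    "EUR/USD": [
        ("2024-03-01T09:00:00", 1.0838),
        ("2024-03-01T09:10:00", 1.0842),
        ("2024-03-01T09:45:00", 1.0867),
    ]
}

def rate_as_of(pair, event_time):
    """Return the most recent rate effective at or before event_time."""
    versions = history[pair]
    # Rightmost version whose effective_time <= event_time
    i = bisect.bisect_right(versions, (event_time, float("inf"))) - 1
    if i < 0:
        raise LookupError(f"no rate for {pair} at {event_time}")
    return versions[i][1]
```

A transaction at 09:15 picks up the 09:10 rate, not the later 09:45 one: `rate_as_of("EUR/USD", "2024-03-01T09:15:00")` returns 1.0842.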
Watermarks and Late Data
The WATERMARK declarations are worth paying attention to. The 5-second watermark on transactions tells Flink to tolerate events arriving up to 5 seconds out of order relative to their event time. The 10-second watermark on exchange rates gives a bit more slack, since rate feeds may have slightly higher propagation delay.
If a transaction arrives significantly late - say, 30 seconds after its event timestamp - and the watermark has already advanced past that point, Flink may drop it. You can handle this with Flink’s side output for late events, routing them to a separate topic for reprocessing. In practice, tuning watermarks involves balancing latency (how long you’re willing to wait) against completeness (how many late events you can tolerate missing).
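The mechanics are simple to model: the watermark trails the maximum event time seen so far by the declared delay, and an event whose timestamp is at or behind the current watermark is late. A rough sketch of the bookkeeping (not Flink's actual implementation):

```python
class WatermarkTracker:
    """Models a bounded-out-of-orderness watermark, as declared by
    WATERMARK FOR ts AS ts - INTERVAL 'n' SECOND in Flink SQL."""

    def __init__(self, max_delay_seconds):
        self.max_delay = max_delay_seconds
        self.max_event_time = float("-inf")

    def watermark(self):
        return self.max_event_time - self.max_delay

    def observe(self, event_time):
        """Feed an event timestamp (epoch seconds); return True if the
        event is late, i.e. the watermark has already passed it."""
        late = event_time <= self.watermark()
        self.max_event_time = max(self.max_event_time, event_time)
        return late

tracker = WatermarkTracker(max_delay_seconds=5)
tracker.observe(100.0)           # on time; watermark advances to 95.0
tracker.observe(103.0)           # on time; watermark advances to 98.0
is_late = tracker.observe(97.0)  # 97.0 <= 98.0, so this event is late
```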
Exchange Rate Sources
The accuracy of your conversions depends entirely on the quality and frequency of your rate data. There are three main categories of sources, each with different trade-offs.
Central Bank Reference Rates
The European Central Bank (ECB) publishes daily reference rates for about 30 currency pairs against the euro. These rates are free, well-documented, and widely accepted for regulatory reporting. The downside is that they update only once per business day, around 16:00 CET. For many analytical and reporting use cases, daily rates are sufficient. But if your business needs intraday accuracy, you will need something more frequent.
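The ECB feed is a small XML file (eurofxref-daily.xml on ecb.europa.eu), so ingestion is little more than a daily fetch and parse. A parsing sketch against a trimmed sample payload (the sample values are illustrative, not real quotes):

```python
import xml.etree.ElementTree as ET

# Trimmed, illustrative sample of the ECB daily reference rate XML
SAMPLE = """<gesmes:Envelope xmlns:gesmes="http://www.gesmes.org/xml/2002-08-01"
    xmlns="http://www.ecb.int/vocabulary/2002-08-01/eurofxref">
  <Cube>
    <Cube time="2024-03-01">
      <Cube currency="USD" rate="1.0842"/>
      <Cube currency="JPY" rate="162.53"/>
    </Cube>
  </Cube>
</gesmes:Envelope>"""

def parse_ecb_rates(xml_text):
    """Return (date, {currency: rate_vs_EUR}) from the ECB daily XML.
    Matches elements by local name to sidestep namespace handling."""
    root = ET.fromstring(xml_text)
    date, rates = None, {}
    for cube in root.iter():
        if not cube.tag.endswith("Cube"):
            continue
        if "time" in cube.attrib:
            date = cube.attrib["time"]
        if "currency" in cube.attrib:
            rates[cube.attrib["currency"]] = float(cube.attrib["rate"])
    return date, rates
```

Publishing the parsed rates to a Kafka topic then follows the same pattern as the API poller shown below.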
Commercial API Providers
Services like Open Exchange Rates, CurrencyLayer, and Fixer.io provide rates via REST APIs with update frequencies ranging from every hour to every minute on premium plans. These are a good middle ground between free central bank rates and enterprise-grade feeds.
To integrate an API source into a streaming pipeline, you typically run a small polling service that fetches rates on a schedule and publishes them to a Kafka topic. A simple approach:
import requests
import json
from kafka import KafkaProducer
from datetime import datetime, timezone

producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def fetch_and_publish():
    resp = requests.get(
        'https://openexchangerates.org/api/latest.json',
        params={'app_id': 'YOUR_APP_ID'},
        timeout=10
    )
    resp.raise_for_status()
    data = resp.json()
    base = data['base']  # USD
    # Use the timestamp the API reports for the rates,
    # not local wall-clock time at fetch
    rate_time = datetime.fromtimestamp(
        data['timestamp'], tz=timezone.utc
    ).isoformat()
    for currency, rate in data['rates'].items():
        producer.send('exchange-rates', value={
            'currency_pair': f'{base}/{currency}',
            'rate': rate,
            'rate_time': rate_time
        })
    producer.flush()
Run this on a cron schedule matching your API plan’s update frequency.
Internal Treasury Systems
For financial-grade accuracy, most large organizations maintain their own exchange rate tables in an internal treasury or finance system. These rates may come from a Bloomberg terminal, Reuters feed, or a negotiated rate with a counterparty bank. The rates live in a database - Oracle, SQL Server, PostgreSQL - and get updated throughout the trading day.
This is where Change Data Capture (CDC) becomes valuable. Instead of polling the database on a schedule, you can capture every rate update as it happens and stream it directly into your pipeline. If you’re using Streamkap, you can set up a CDC connector against your treasury database and route rate changes to a Kafka topic in real time. Every time a trader updates a rate, that change flows into your exchange rates topic within seconds, keeping your temporal table current without any custom integration code.
Handling Edge Cases
Real-world currency conversion has several edge cases that will bite you if you don’t plan for them.
Weekends and Holidays
Major forex markets are closed from Friday 22:00 UTC through Sunday 22:00 UTC. During this window, no new rates are published. A transaction that occurs on Saturday (for example, an e-commerce purchase) should use the last known rate from Friday’s close.
With temporal joins, this works automatically. Since there is no newer rate event between Friday evening and Monday morning, the lookup for any Saturday or Sunday transaction returns the Friday rate. No special handling required.
The same applies to market holidays. If a particular currency pair doesn’t trade on a given day, the previous day’s rate carries forward.
Exotic Currency Pairs and Cross Rates
Most rate feeds publish rates against a single base currency, typically USD or EUR. If you need to convert between two non-base currencies - say, Brazilian real (BRL) to Japanese yen (JPY) - you won’t find a direct BRL/JPY rate in most feeds.
The standard approach is triangulation through the base currency:
SELECT
    t.txn_id,
    t.amount,
    t.amount * (r_target.rate / r_source.rate) AS converted_amount
FROM transactions AS t
JOIN exchange_rates FOR SYSTEM_TIME AS OF t.txn_time AS r_source
    ON CONCAT('USD/', t.source_currency) = r_source.currency_pair
JOIN exchange_rates FOR SYSTEM_TIME AS OF t.txn_time AS r_target
    ON CONCAT('USD/', t.target_currency) = r_target.currency_pair;
Here, both the source and target currencies are converted through USD as the intermediary. The result is equivalent to a direct pair conversion, with a small caveat: you accumulate the spread from two conversions rather than one. For reporting purposes, this is standard practice and well-understood by finance teams.
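Numerically, with invented rates: suppose the feed quotes USD/BRL = 5.0000 (5 BRL per USD) and USD/JPY = 150.00. Converting 1,000 BRL to JPY through USD:

```python
from decimal import Decimal

amount_brl = Decimal("1000")
usd_brl = Decimal("5.0000")   # 5 BRL per 1 USD (illustrative, not a real quote)
usd_jpy = Decimal("150.00")   # 150 JPY per 1 USD (illustrative)

# Same formula as the SQL: amount * (target rate / source rate)
amount_jpy = amount_brl * (usd_jpy / usd_brl)

# Equivalently in two explicit steps: BRL -> USD -> JPY
amount_usd = amount_brl / usd_brl           # 200 USD
amount_jpy_two_step = amount_usd * usd_jpy  # 30000 JPY
```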
Rate Feed Outages
Rate feeds go down. APIs hit rate limits. Network blips happen. When your exchange rates topic stops receiving updates for an extended period, your temporal table still holds the last known rates. Transactions will continue to be converted using stale rates, which is usually better than dropping them entirely.
However, you should monitor for this condition. A straightforward approach is to track the maximum rate_time in your rates stream and alert when it falls behind wall-clock time by more than your expected update interval. You can also add a rate_staleness column to your output that records how old the rate was at the time of conversion, so downstream consumers can flag transactions that used potentially stale data.
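A minimal version of that staleness check might look like this (the 120-second threshold is an arbitrary example; pick one matched to your feed's update interval):

```python
import time

class RateStalenessMonitor:
    """Tracks the newest rate_time seen and flags when the feed goes quiet."""

    def __init__(self, max_staleness_seconds):
        self.max_staleness = max_staleness_seconds
        self.latest_rate_time = None

    def on_rate(self, rate_time):
        """Call for every rate event; rate_time is epoch seconds."""
        if self.latest_rate_time is None or rate_time > self.latest_rate_time:
            self.latest_rate_time = rate_time

    def staleness(self, now=None):
        """Seconds between the newest rate and wall-clock time."""
        now = time.time() if now is None else now
        if self.latest_rate_time is None:
            return float("inf")
        return now - self.latest_rate_time

    def is_stale(self, now=None):
        return self.staleness(now) > self.max_staleness

monitor = RateStalenessMonitor(max_staleness_seconds=120)
monitor.on_rate(1_000_000.0)  # last rate seen at epoch second 1,000,000
```

In production the `is_stale` check would feed an alerting system, and the `staleness` value is what you would emit as the rate_staleness column on converted transactions.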
Rounding and Precision
Financial amounts should always use fixed-point decimal types, not floating-point. In Flink SQL, use DECIMAL(18, 4) for monetary amounts and DECIMAL(18, 8) for rates. This avoids the classic floating-point representation errors that produce values like 1084.1999999997 instead of 1084.20.
Most financial standards require rounding the final converted amount to the precision of the target currency (2 decimal places for USD, EUR, GBP; 0 for JPY; 3 for KWD). Apply rounding as the last step after multiplication.
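With Python's decimal module the final rounding step looks like the sketch below. The minor-unit table covers only the currencies mentioned above, and half-up rounding is shown as one common choice; whether to use half-up or banker's rounding is a policy decision for your finance team:

```python
from decimal import Decimal, ROUND_HALF_UP

# Decimal places in each currency's minor unit (per ISO 4217)
MINOR_UNITS = {"USD": 2, "EUR": 2, "GBP": 2, "JPY": 0, "KWD": 3}

def round_to_currency(amount, currency):
    """Round a converted amount to the target currency's precision."""
    places = MINOR_UNITS[currency]
    quantum = Decimal(1).scaleb(-places)  # e.g. 0.01 for USD, 1 for JPY
    return amount.quantize(quantum, rounding=ROUND_HALF_UP)

# Multiply at full precision first, round only as the last step
converted = Decimal("1000") * Decimal("1.08425")  # 1084.25000
```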
Multi-Currency Aggregation Patterns
Once individual transactions are converted, you often need to aggregate them - total revenue by region, sum of payments by merchant, daily volume by currency pair. There are two common patterns.
Convert-Then-Aggregate
Convert each transaction to the target currency at the point-in-time rate, then aggregate the converted amounts. This is the most accurate approach because each transaction uses its own historically correct rate.
CREATE TABLE converted_transactions AS
SELECT
    t.txn_id,
    t.amount * r.rate AS amount_usd,
    t.txn_time
FROM transactions AS t
JOIN exchange_rates FOR SYSTEM_TIME AS OF t.txn_time AS r
    ON CONCAT(t.source_currency, '/USD') = r.currency_pair;

-- Then aggregate
SELECT
    TUMBLE_START(txn_time, INTERVAL '1' HOUR) AS window_start,
    SUM(amount_usd) AS total_revenue_usd
FROM converted_transactions
GROUP BY TUMBLE(txn_time, INTERVAL '1' HOUR);
Aggregate-Then-Convert
Aggregate amounts in their original currencies first, then convert the totals. This is simpler and uses fewer rate lookups, but the conversion of an aggregated total uses a single rate for the entire window rather than per-transaction rates. The difference is usually small for short windows (1 hour or less) but can diverge noticeably for daily aggregations during volatile market conditions.
For most production systems, convert-then-aggregate is the preferred pattern. The per-transaction conversion cost is negligible in Flink, and it produces results that are defensible in an audit.
Putting It All Together
A well-designed currency conversion pipeline has a few distinct layers: rate ingestion, temporal enrichment, and downstream aggregation. The rate ingestion layer captures rates from your chosen source - whether that is a CDC stream from an internal treasury database, a scheduled API poll, or a central bank feed - and publishes them to a Kafka topic. The enrichment layer, running in Flink, performs temporal joins to attach the correct rate to each transaction. The aggregation layer rolls up converted amounts into the metrics your business cares about.
Streamkap fits naturally into this architecture. With built-in CDC connectors for databases like PostgreSQL, MySQL, and SQL Server, you can stream treasury rate updates directly into Kafka without writing custom extraction code. From there, Flink handles the temporal join logic, and the converted results flow into your warehouse or analytics platform in real time.
The key takeaway is that currency conversion in streaming is not a math problem - it is a time problem. Get the temporal alignment right, and the rest follows.