
Engineering

February 25, 2026

12 min read

Building a Real-Time Customer 360 View with CDC and Apache Flink

Learn how to build a continuously updated Customer 360 profile using change data capture and Apache Flink. Covers multi-source CDC, identity resolution, profile aggregation, serving the unified view, and keeping it fresh.

TL;DR:

  • A Customer 360 built on CDC and Flink stays current with every database change rather than waiting for nightly batch refreshes.
  • Identity resolution in Flink uses keyed state to merge records from different source systems by matching on email, phone, or external IDs.
  • Profile aggregation combines orders, support tickets, product usage, and behavioral data into a single unified record per customer.
  • Streamkap handles the CDC ingestion from multiple databases and delivers normalized events to Kafka, removing the need to manage Debezium or Kafka Connect yourself.

The idea behind Customer 360 is simple: take all the data you have about a customer, scattered across dozens of systems, and bring it together into a single, unified profile. The profile should include everything relevant. Their account information from the CRM. Their purchase history from the order management system. Their support tickets from the helpdesk. Their product usage from the application database. Their payment history from the billing system.

The hard part has never been the idea. It is the execution. Most Customer 360 implementations are batch ETL jobs that run nightly, pulling data from each source, matching records, resolving conflicts, and loading the result into a warehouse or a dedicated profile store. By the time the batch finishes, the data is already hours old. A customer calls support and the agent sees yesterday’s view. A marketing campaign targets users based on stale engagement data. A risk model evaluates a customer with outdated transaction history.

Building a Customer 360 on change data capture and stream processing fixes this. Instead of periodically pulling snapshots, you capture every change as it happens and update the unified profile in real time. The profile is always current. When a customer places an order, the profile reflects it within seconds, not hours.

This article covers how to build that system using Streamkap for CDC ingestion and Apache Flink for identity resolution, profile aggregation, and serving.

The Source Data Problem

A typical company has customer data spread across at least five to ten different systems. Each system has its own schema, its own identifier for the customer, and its own update cadence.

Here is a realistic example:

System                          Customer ID           Key Data
CRM (Salesforce/HubSpot)        crm_contact_id        Name, email, company, lifecycle stage
Billing (Stripe)                stripe_customer_id    Payment method, subscription tier, invoices
Order Management (PostgreSQL)   user_id               Orders, shipping addresses, order status
Support (Zendesk)               requester_email       Tickets, satisfaction scores, last contact
Product (PostgreSQL)            account_id            Feature usage, last login, plan limits
Marketing (HubSpot)             email                 Campaign engagement, lead score

Notice the ID problem. The CRM uses one ID, billing uses another, the product database uses a third, and the support system uses email as the identifier. Before you can build a unified profile, you have to figure out which records across these systems belong to the same person. This is identity resolution, and it is the core technical challenge of any Customer 360 project.

CDC Ingestion with Streamkap

The first step is getting change events out of each source system and into Kafka. This is where Streamkap fits in.

For each database that holds customer data, you configure a Streamkap source connector. Streamkap reads the database’s transaction log (the WAL for PostgreSQL, the binlog for MySQL, the oplog for MongoDB) and produces a stream of change events to a Kafka topic. Each event represents a single row-level change: an insert, update, or delete.

For SaaS systems like Salesforce, Zendesk, or Stripe that do not expose a transaction log, you can use API-based connectors or webhook ingestion to get changes into Kafka.

The result is a set of Kafka topics, each carrying a stream of changes from one source system:

orders.public.orders         -> order change events
orders.public.customers      -> customer record changes
billing.stripe.customers     -> Stripe customer updates
billing.stripe.subscriptions -> subscription changes
support.zendesk.tickets      -> support ticket events
product.public.accounts      -> account usage updates
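Downstream, the Flink job sees each of these topics as a stream of structured change events. As a rough sketch (the exact payload depends on the connector configuration and serialization format, so the field names here are assumptions rather than a documented schema), each event might deserialize into something like:

```java
import java.util.Map;

// Assumed shape of a normalized change event consumed by the Flink job.
// Field names are illustrative, not Streamkap's documented format.
public class ChangeEvent {
    public String sourceSystem;        // e.g. "orders", "billing"
    public String table;               // e.g. "public.orders"
    public String operation;           // "INSERT", "UPDATE", or "DELETE"
    public Map<String, Object> before; // row state before the change (null for inserts)
    public Map<String, Object> after;  // row state after the change (null for deletes)
    public long eventTimestamp;        // when the change was committed at the source

    public boolean isInsert() { return "INSERT".equals(operation); }
    public boolean isDelete() { return "DELETE".equals(operation); }
}
```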

Streamkap handles schema management for these topics. When a source table schema changes (a new column is added, a column type is altered), Streamkap manages the schema evolution so the Flink job does not break. This is a meaningful operational benefit. In a self-managed Debezium setup, schema changes are one of the most common causes of pipeline failures.

Identity Resolution

With change events flowing from all sources into Kafka, the next step is to match records that belong to the same customer.

The simplest form of identity resolution uses a shared key that exists across systems. If every system stores the customer’s email address, you can key the Flink stream by email and merge records on that basis. In practice, it is rarely that clean. Some systems have email, others have phone numbers, others have external IDs from another system.

A practical approach is to maintain a mapping table that links identifiers across systems. This mapping might already exist in your CRM or a master data management system. If so, you capture changes to it via CDC just like any other source.

Here is how identity resolution works in Flink:

public class IdentityResolver
    extends KeyedProcessFunction<String, SourceEvent, ResolvedEvent> {

    // Maps source-specific IDs to a canonical customer ID
    private MapState<String, String> idMappings;

    @Override
    public void open(Configuration parameters) {
        idMappings = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("id-mappings", String.class, String.class));
    }

    @Override
    public void processElement(SourceEvent event, Context ctx,
                               Collector<ResolvedEvent> out) throws Exception {
        String sourceKey = event.sourceSystem + ":" + event.sourceId;
        String canonicalId = idMappings.get(sourceKey);

        if (canonicalId == null && event.email != null) {
            // Try to resolve using email as a fallback
            canonicalId = idMappings.get("email:" + event.email);
            if (canonicalId != null) {
                // Cache the source-specific ID so future lookups resolve directly
                idMappings.put(sourceKey, canonicalId);
            }
        }

        if (canonicalId == null) {
            // New customer, generate a canonical ID
            canonicalId = generateCanonicalId(event);
            // Store all known identifiers for this customer
            idMappings.put(sourceKey, canonicalId);
            if (event.email != null) {
                idMappings.put("email:" + event.email, canonicalId);
            }
            if (event.phone != null) {
                idMappings.put("phone:" + event.phone, canonicalId);
            }
        }

        out.collect(new ResolvedEvent(canonicalId, event));
    }
}

The stream is keyed by email (or another common identifier). The idMappings state accumulates all the different identifiers for each customer over time. When an event arrives from a new source system, the resolver checks whether any of the event’s identifiers (source ID, email, phone) are already mapped to a canonical ID. If so, the event is tagged with that canonical ID. If not, a new canonical customer is created.

This approach handles the common cases. For more advanced identity resolution involving fuzzy matching (name similarity, address normalization), you would either pre-process the data in a separate Flink job or use an external identity resolution service and call it via async I/O.
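As a flavor of what that pre-processing might look like, here is a minimal, self-contained sketch of normalization plus a name-similarity check. The normalization rules and the 0.2 distance threshold are illustrative assumptions, not tuned values:

```java
public class FuzzyMatcher {
    // Normalize an email so trivial variants compare equal.
    // Lowercasing and trimming are illustrative rules, not a full spec.
    public static String normalizeEmail(String email) {
        return email == null ? null : email.trim().toLowerCase();
    }

    // Classic Levenshtein edit distance, used here for name similarity.
    public static int editDistance(String a, String b) {
        int[][] d = new int[a.length() + 1][b.length() + 1];
        for (int i = 0; i <= a.length(); i++) d[i][0] = i;
        for (int j = 0; j <= b.length(); j++) d[0][j] = j;
        for (int i = 1; i <= a.length(); i++) {
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                d[i][j] = Math.min(Math.min(d[i - 1][j] + 1, d[i][j - 1] + 1),
                                   d[i - 1][j - 1] + cost);
            }
        }
        return d[a.length()][b.length()];
    }

    // Treat two names as a merge candidate when the edit distance is small
    // relative to the name length. The 0.2 threshold is an assumption.
    public static boolean namesLikelyMatch(String a, String b) {
        int distance = editDistance(a.toLowerCase(), b.toLowerCase());
        int maxLen = Math.max(a.length(), b.length());
        return maxLen > 0 && (double) distance / maxLen <= 0.2;
    }
}
```

In a real pipeline, fuzzy matches like these would typically produce merge candidates for review rather than automatic merges, since false positives join two different people's data.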

Handling Merge Events

Sometimes you discover that two canonical IDs actually represent the same person. A customer used different email addresses in different systems, and you only discover the connection when they link their accounts. In this case, you need to merge two profiles.

Flink can handle this by processing merge events that come from either the application or a manual data quality process:

if (event.type.equals("IDENTITY_MERGE")) {
    String keepId = event.data.getString("keepCanonicalId");
    String mergeId = event.data.getString("mergeCanonicalId");

    // Collect the affected keys first: mutating MapState while iterating
    // over entries() is unsafe with the heap state backend
    List<String> keysToRepoint = new ArrayList<>();
    for (Map.Entry<String, String> entry : idMappings.entries()) {
        if (entry.getValue().equals(mergeId)) {
            keysToRepoint.add(entry.getKey());
        }
    }

    // Re-point all identifiers from mergeId to keepId
    for (String key : keysToRepoint) {
        idMappings.put(key, keepId);
    }

    // Emit a merge event so downstream aggregation can combine the profiles
    out.collect(new ResolvedEvent(keepId, createMergeEvent(keepId, mergeId)));
}

This re-maps all identifiers from the obsolete canonical ID to the surviving one and emits a merge event that the downstream profile aggregation will handle.
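On the aggregation side, handling that merge event means folding one profile's state into the other. A minimal sketch, using a trimmed-down profile with only a few fields: counters add together and timestamps take the most recent value. Which rule applies to which field is a per-field decision, and the choices below are assumptions:

```java
public class ProfileMerge {
    // A trimmed-down profile with just enough fields to show the merge.
    public static class Profile {
        public int totalOrders;
        public double totalSpend;
        public long lastOrderTimestamp;
        public int openTickets;
    }

    // Fold the absorbed profile into the surviving one: counters add,
    // timestamps keep the most recent value.
    public static Profile merge(Profile keep, Profile absorbed) {
        keep.totalOrders += absorbed.totalOrders;
        keep.totalSpend += absorbed.totalSpend;
        keep.openTickets += absorbed.openTickets;
        keep.lastOrderTimestamp =
            Math.max(keep.lastOrderTimestamp, absorbed.lastOrderTimestamp);
        return keep;
    }
}
```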

Profile Aggregation

After identity resolution, every event carries a canonical customer ID. The next step is to aggregate these events into a single, unified profile per customer.

The profile is a structured document that combines data from all sources:

public class CustomerProfile {
    public String canonicalId;
    public String name;
    public String email;
    public String phone;
    public String company;
    public String lifecycleStage;

    // Billing
    public String subscriptionTier;
    public String paymentStatus;
    public double lifetimeValue;

    // Orders
    public int totalOrders;
    public double totalSpend;
    public long lastOrderTimestamp;

    // Support
    public int openTickets;
    public int totalTickets;
    public double avgSatisfactionScore;
    public long lastSupportContact;

    // Product usage
    public long lastLoginTimestamp;
    public Map<String, Integer> featureUsageCounts;

    // Metadata
    public long lastUpdated;
    public Set<String> dataSources;
}

The aggregation function maintains this profile in Flink’s keyed state and updates the relevant fields when events arrive from each source:

public class ProfileAggregator
    extends KeyedProcessFunction<String, ResolvedEvent, CustomerProfile> {

    private ValueState<CustomerProfile> profileState;

    @Override
    public void open(Configuration parameters) {
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("profile", CustomerProfile.class));
    }

    @Override
    public void processElement(ResolvedEvent event, Context ctx,
                               Collector<CustomerProfile> out) throws Exception {
        CustomerProfile profile = profileState.value();
        if (profile == null) {
            profile = new CustomerProfile();
            profile.canonicalId = event.canonicalId;
            profile.dataSources = new HashSet<>();
        }

        applyEvent(profile, event);
        profile.lastUpdated = ctx.timestamp();
        profile.dataSources.add(event.sourceEvent.sourceSystem);

        profileState.update(profile);
        out.collect(profile);
    }

    private void applyEvent(CustomerProfile profile, ResolvedEvent event) {
        SourceEvent source = event.sourceEvent;
        switch (source.sourceSystem) {
            case "crm":
                applyCrmUpdate(profile, source);
                break;
            case "billing":
                applyBillingUpdate(profile, source);
                break;
            case "orders":
                applyOrderUpdate(profile, source);
                break;
            case "support":
                applySupportUpdate(profile, source);
                break;
            case "product":
                applyProductUpdate(profile, source);
                break;
        }
    }
}

Each apply*Update method knows which profile fields to update based on the source event. For example:

private void applyOrderUpdate(CustomerProfile profile, SourceEvent source) {
    if (source.operation.equals("INSERT")) {
        double amount = source.data.getDouble("total_amount");
        profile.totalOrders++;
        profile.totalSpend += amount;
        profile.lifetimeValue += amount;
        profile.lastOrderTimestamp = source.eventTimestamp;
    }
}

private void applySupportUpdate(CustomerProfile profile, SourceEvent source) {
    if (source.operation.equals("INSERT")) {
        profile.totalTickets++;
        profile.openTickets++;
    }
    if (source.data.has("status") && source.data.getString("status").equals("closed")) {
        profile.openTickets = Math.max(0, profile.openTickets - 1);
        if (source.data.has("satisfaction_score")) {
            // Incremental running average; uses totalTickets as the sample
            // count, which assumes every closed ticket carries a score
            double current = profile.avgSatisfactionScore;
            int count = profile.totalTickets;
            double newScore = source.data.getDouble("satisfaction_score");
            profile.avgSatisfactionScore = current + (newScore - current) / count;
        }
    }
    profile.lastSupportContact = source.eventTimestamp;
}

Conflict Resolution

When multiple sources provide the same field (e.g., both CRM and billing have the customer’s email), you need a strategy for which value wins.

Common strategies:

Timestamp-based. The most recently updated value wins. This works when all sources include reliable timestamps.

Source priority. You assign a priority to each source for each field. For contact information, the CRM might be authoritative. For payment details, the billing system is authoritative. This is the most common approach:

private void applyCrmUpdate(CustomerProfile profile, SourceEvent source) {
    // CRM is authoritative for name, email, company, lifecycle stage
    if (source.data.has("name")) {
        profile.name = source.data.getString("name");
    }
    if (source.data.has("email")) {
        profile.email = source.data.getString("email");
    }
    if (source.data.has("company")) {
        profile.company = source.data.getString("company");
    }
    if (source.data.has("lifecycle_stage")) {
        profile.lifecycleStage = source.data.getString("lifecycle_stage");
    }
}

Field-level rules. Different sources own different fields. The CRM owns name and company. The billing system owns subscriptionTier and paymentStatus. This prevents conflicts entirely because no two sources write to the same field.
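The timestamp-based strategy above can be sketched as a per-field last-writer-wins guard. This is a self-contained illustration, not Flink code; in the actual job the same check would sit inside the profile aggregator, comparing the event's source timestamp against a per-field timestamp kept in state:

```java
import java.util.HashMap;
import java.util.Map;

public class LastWriterWins {
    private final Map<String, Object> values = new HashMap<>();
    private final Map<String, Long> updatedAt = new HashMap<>();

    // Accept the new value only if it is newer than what we already hold.
    // Returns true when the update wins.
    public boolean set(String field, Object value, long sourceTimestamp) {
        Long current = updatedAt.get(field);
        if (current != null && current >= sourceTimestamp) {
            return false; // stale update from a slower source; keep existing value
        }
        values.put(field, value);
        updatedAt.put(field, sourceTimestamp);
        return true;
    }

    public Object get(String field) {
        return values.get(field);
    }
}
```

Note that this only works as well as the source timestamps themselves: if one system's clocks or CDC timestamps are unreliable, source priority is the safer choice.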

Serving the Unified Profile

The aggregated profile needs to be accessible to the systems that consume it: customer support tools, marketing platforms, personalization engines, risk models, and internal dashboards.

Flink writes the updated profile to a serving layer every time it changes. The choice of serving layer depends on your access patterns:

Low-latency key-value lookups (a support agent pulling up a customer profile): Write to Redis, DynamoDB, or another key-value store.

// Sink to Redis via Kafka topic and a Redis consumer
profileStream.sinkTo(
    KafkaSink.<CustomerProfile>builder()
        .setBootstrapServers("kafka-broker:9092")
        .setRecordSerializer(new ProfileSerializer("customer-profiles"))
        .build()
);

A separate consumer reads from the customer-profiles topic and writes to Redis. You can also use Flink’s Redis sink connector directly if your setup supports it.

Analytical queries (dashboards, reporting, segmentation): Write to a data warehouse like Snowflake, BigQuery, or ClickHouse. Streamkap can handle the Kafka-to-warehouse delivery, closing the loop from CDC source to unified profile in the warehouse.

Event-driven consumers (triggering workflows when a profile changes): Keep the profile updates in Kafka. Downstream services subscribe and react. For example, a marketing automation service might trigger an onboarding campaign when a profile’s lifecycleStage changes from “lead” to “customer.”
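The trigger logic in that consumer needs to detect the transition, not just the current value, which means remembering the last stage seen per customer. A minimal sketch (the "lead" and "customer" stage names come from the example above; the detector itself is an assumption about how such a consumer might be built):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class LifecycleTrigger {
    // Remembers the last lifecycle stage seen per canonical customer ID
    // so a transition, rather than a current value, can be detected.
    private final Map<String, String> lastStage = new HashMap<>();

    // Returns true exactly when the stage moves from "lead" to "customer",
    // the transition the onboarding campaign fires on.
    public boolean isLeadToCustomer(String canonicalId, String newStage) {
        String previous = lastStage.put(canonicalId, newStage);
        return Objects.equals(previous, "lead") && "customer".equals(newStage);
    }
}
```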

In practice, you often write to multiple serving layers simultaneously. The Kafka topic acts as the central distribution point, and different consumers handle different destinations.

Keeping It Fresh

The “real-time” in a real-time Customer 360 requires that the pipeline stays healthy and up to date. Here is what to watch:

Source lag. Monitor the lag between when a change occurs in the source database and when it appears in Kafka. Streamkap provides visibility into CDC lag per source connector. If lag increases, it might mean the source database is under heavy load or a connector needs attention.

Flink consumer lag. Monitor the offset lag between what Kafka has received and what Flink has processed. If Flink falls behind, you need to increase parallelism or investigate a slow operator.

Profile freshness. Track the lastUpdated timestamp on profiles. If a profile’s lastUpdated is more than a few seconds behind current time for an active customer, something in the pipeline is delayed.
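The freshness check itself is simple arithmetic on the lastUpdated field. A sketch, where the 5-second threshold is an illustrative assumption rather than a universal target:

```java
public class FreshnessCheck {
    // Flag a profile as stale when its lastUpdated timestamp trails the
    // current time by more than the allowed lag. The 5-second threshold
    // is an illustrative assumption; tune it to your pipeline's SLO.
    public static final long MAX_LAG_MILLIS = 5_000;

    public static long lagMillis(long lastUpdatedMillis, long nowMillis) {
        return nowMillis - lastUpdatedMillis;
    }

    public static boolean isStale(long lastUpdatedMillis, long nowMillis) {
        return lagMillis(lastUpdatedMillis, nowMillis) > MAX_LAG_MILLIS;
    }
}
```

In practice you would run this check only against customers with recent source activity; an inactive customer's profile is legitimately old.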

State size. The profile state grows with the number of customers and the amount of data per profile. Monitor Flink’s state size metrics and checkpoint durations. If checkpoints are taking too long, consider trimming historical data from the profile (moving detailed history to the warehouse and keeping only summaries in the real-time profile).

Backfilling Historical Data

When you first deploy the Customer 360 pipeline or add a new data source, you need to load the existing data, not just future changes. This is the backfill problem.

Streamkap handles this with snapshot mode on CDC connectors. When you create a new source connector, Streamkap first reads the current state of all rows in the configured tables and produces them to Kafka as insert events. After the snapshot completes, it switches to streaming mode and captures only incremental changes going forward.

From Flink’s perspective, the backfill events look identical to normal CDC events. The identity resolution and profile aggregation logic processes them the same way. The only difference is the volume: the initial backfill produces a burst of events that settles down once the snapshot is complete.

You should size your Flink job’s parallelism to handle the backfill throughput, which is typically much higher than steady-state throughput. After the backfill completes, you can scale down if needed. With Streamkap’s managed Flink, this scaling is a configuration change rather than a provisioning project.
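A back-of-envelope sizing calculation makes this concrete. Every number below is a hypothetical input, not a measured figure; you would substitute your own snapshot size and observed per-subtask throughput:

```java
public class BackfillSizing {
    // How many parallel subtasks are needed to finish a snapshot backfill
    // within a target window, assuming throughput scales linearly with
    // parallelism (a simplification that ignores source-side bottlenecks).
    public static int requiredParallelism(long snapshotRows,
                                          long rowsPerSecondPerSubtask,
                                          long targetSeconds) {
        double subtasks =
            (double) snapshotRows / (rowsPerSecondPerSubtask * targetSeconds);
        return (int) Math.ceil(subtasks);
    }
}
```

For example, a hypothetical 50 million row snapshot at 10,000 rows per second per subtask, targeted to finish within one hour, needs a parallelism of 2.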

Schema Evolution and Maintenance

Source databases change. New columns are added, old ones renamed, data types adjusted. In a batch ETL world, you discover these changes when the nightly job fails. In a CDC-based pipeline, you discover them when the event schema changes.

Streamkap manages schema evolution at the CDC layer. When a source table adds a column, Streamkap updates the schema in the schema registry and includes the new field in subsequent events. Older events without the field are still valid.

In the Flink job, use defensive deserialization. If you are working with JSON, the @JsonIgnoreProperties(ignoreUnknown = true) annotation on your POJOs ensures that new fields do not cause deserialization errors. When you want to incorporate the new field into the profile, you update the Flink job’s code and redeploy. Flink restores from the last checkpoint and continues processing with the updated logic.

For schema changes that remove or rename columns, the Flink job needs to handle the absence of the field gracefully. Null checks and default values are sufficient for most cases.
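The defensive-read pattern can be sketched independently of any particular deserialization library. Here the event is modeled as a plain Map; the helper names are illustrative, but the same null-check-with-default idea applies to whatever representation your deserializer produces:

```java
import java.util.Map;

public class DefensiveRead {
    // Read a field that may be missing after a schema change, falling
    // back to a default instead of failing.
    public static String getString(Map<String, Object> event,
                                   String field, String defaultValue) {
        Object value = event.get(field);
        return value instanceof String ? (String) value : defaultValue;
    }

    public static long getLong(Map<String, Object> event,
                               String field, long defaultValue) {
        Object value = event.get(field);
        return value instanceof Number ? ((Number) value).longValue() : defaultValue;
    }
}
```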

The Full Pipeline

Here is the end-to-end architecture of a real-time Customer 360:

  1. Source databases (PostgreSQL, MySQL, MongoDB, SaaS APIs) hold customer data.
  2. Streamkap CDC connectors capture every change and produce events to Kafka topics, one per source table. Schema evolution is managed automatically.
  3. Kafka acts as the durable event bus between ingestion and processing.
  4. Flink identity resolution matches records across sources and assigns a canonical customer ID.
  5. Flink profile aggregation maintains a unified profile per customer in keyed state, updating it with every incoming event.
  6. Flink output writes the updated profile to Kafka.
  7. Serving consumers read from Kafka and write to Redis (for real-time lookups), Snowflake (for analytics), and other downstream systems.

Each layer is independently scalable. Adding a new data source means adding a new Streamkap connector and a new input branch in the Flink job. Adding a new consumer means subscribing to the profile Kafka topic. The core identity resolution and aggregation logic does not change when you add sources or consumers.

Batch vs. Streaming Customer 360

The batch approach to Customer 360 is well understood. Run an ETL job nightly that queries all source systems, performs matching and deduplication, and loads the result into a warehouse table. This works if your use cases tolerate stale data.

The streaming approach with CDC and Flink is better when:

  • Support agents need to see the latest customer state during a live conversation.
  • Risk models need to evaluate customers based on their most recent transactions.
  • Personalization engines need to react to behavior changes within minutes.
  • Compliance requires an audit trail of every change to the customer record with timestamps.

The operational cost of running the streaming pipeline is where teams used to hesitate. Managing Debezium connectors, Kafka Connect, Kafka clusters, and Flink clusters is a lot of infrastructure. Streamkap reduces that surface area significantly by managing the CDC, Kafka, and Flink layers as a single platform. You define the sources, write the Flink logic, and configure the destinations. The infrastructure runs without your intervention.

A real-time Customer 360 is one of the highest-value applications of CDC and stream processing. It touches every part of the business that interacts with customers. And with Streamkap handling the data plumbing, your team can focus on the matching rules, aggregation logic, and serving patterns that are specific to your business rather than on keeping connectors and clusters running.