Modern data architectures utilize real-time data capture, transformation, movement, and loading solutions to build data lakes, analytical warehouses, and big data repositories. These solutions enable the analysis of data from various sources without impacting the operations that produce them. To achieve this, establishing a continuous, scalable, elastic, and robust data flow is essential, and the most prevalent technique for doing so is CDC (Change Data Capture). CDC monitors the production of small sets of data, captures them automatically, and delivers them to one or more recipients, including analytical data repositories. The major benefit is the elimination of the D+1 (next-day batch) delay in analysis: data is detected at the source as soon as it is produced and is replicated to the destination right away.
This article will demonstrate the most common participants in CDC scenarios, on both the source and the destination side. For the data source (origin), we will explore CDC for SQL databases and CSV files. For the data destination, we will use a columnar database (a typical high-performance analytical database scenario) and a Kafka topic (a standard approach for streaming data to the cloud and/or to multiple real-time data consumers).
Overview
This article will provide a sample for the following interoperability scenario:
- The SQLCDCAdapter will utilize the SQLInboundAdapter to listen for new records in the SQL database and extract them through a JDBC connection using SQL.
- The SQLCDCAdapter will encapsulate the captured data within a message and dispatch it to the CDCProcess (a Business Process using BPL notation).
- The CDCProcess receives the SQL data as a message and employs the SQL Operation to persist the data into IRIS and the Kafka Operation to transmit the captured data to a Kafka topic.
- The SQLOperation will persist the message data into an InterSystems IRIS Persistent Class modeled as columnar storage. Columnar storage is an option that offers superior query performance for analytical data.
- The Kafka Operation will transform the message to JSON and send it to a Kafka Topic, where a cloud data lake or any other subscriber can consume it.
- These flows execute in real time, establishing a continuous data pipeline.
- The BAM Service will calculate business metrics from the columnar table in real time.
- A BI Dashboard will display the resulting business metrics to the user instantaneously.
Installing the Sample
The iris-cdc-sample (https://openexchange.intersystems.com/package/iris-cdc-sample) is a sample application that implements the scenario described above. To install it, proceed with these steps:
1. Clone/git pull the repo into any local directory:
$ git clone https://github.com/yurimarx/iris-cdc-sample.git
2. Open the terminal in this directory and run the command below:
$ docker-compose build
3. Run the IRIS container with your project:
$ docker-compose up -d
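Once the containers are running, the IRIS Management Portal will be available at http://localhost:52795 (the host port mapped to the container's 52773 web port in docker-compose.yml).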
Sample Components
This sample uses the following containers:
- iris: InterSystems IRIS platform, including the following:
- IRIS Columnar Database (to store the captured data).
- IRIS Interoperability with a production environment to execute the CDC (change data capture) process. The production captures data from an external database (PostgreSQL), persists it into IRIS, and additionally transmits it to a Kafka topic.
- IRIS BAM (Business Activity Monitoring) to calculate real-time sales metrics by product and display them in a dashboard.
- salesdb: A PostgreSQL database containing sales data to be captured in real time.
- zookeeper: A service used to manage the Kafka broker.
- kafka: The Kafka broker with the sales topic, utilized to receive and distribute sales data as real-time events.
- kafka-ui: A Kafka Web interface for the administration and operation of topics and events.
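All of these services are defined in the project's docker-compose.yml: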
services:
iris:
build:
context: .
dockerfile: Dockerfile
restart: always
command: --check-caps false --ISCAgent false
ports:
- 1972
- 52795:52773
- 53773
volumes:
- ./:/home/irisowner/dev/
networks:
- cdc-network
salesdb:
image: postgres:14-alpine
container_name: sales_db
restart: always
environment:
POSTGRES_USER: sales_user
POSTGRES_PASSWORD: welcome1
POSTGRES_DB: sales_db
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
- postgres_data:/var/lib/postgresql/data/
ports:
- "5433:5432"
networks:
- cdc-network
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
hostname: zookeeper
networks:
- cdc-network
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
hostname: kafka
networks:
- cdc-network
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
hostname: kafka-ui
networks:
- cdc-network
ports:
- "8080:8080"
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local_kafka_cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
volumes:
postgres_data:
driver: local
networks:
cdc-network:
driver: bridge
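Note the host port mappings defined above: PostgreSQL is published on host port 5433 (container port 5432), the Kafka broker on 9092, and the Kafka web UI on 8080. These are the endpoints used later for inserting test data and inspecting topics.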
Creating the Columnar Table
Columnar tables are employed to store non-normalized data like the following:
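For example, a denormalized sales record set might look like this (illustrative values; in this sample, the rows come from the PostgreSQL sales table):

ProductName | StoreName      | SalesValue
Laptop      | Downtown Store | 1200.00
Laptop      | Airport Store  | 1150.00
Phone       | Downtown Store | 800.00
Phone       | Airport Store  | 790.00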
Since Product Name and Store Name values are frequently repeated, storing the data in a columnar format (as columns rather than rows) conserves storage space and yields superior retrieval performance for analytical queries. Historically, this type of processing required building BI cubes; columnar storage eliminates the need to replicate operational data into cubes.
Now, follow these steps to establish the Sales columnar table for our sample:
1. Create a new ObjectScript class Sales within the dc.cdc package.
2. Write the following source code:
Class dc.cdc.Sales Extends %Persistent [ DdlAllowed, Final ]
{

Parameter STORAGEDEFAULT = "columnar";

Parameter USEEXTENTSET = 1;

Property ProductName As %String;

Property StoreName As %String;

Property SalesValue As %Double;

}
3. The parameter STORAGEDEFAULT = "columnar" ensures the dc_cdc.Sales table uses columnar storage, rather than the traditional row format.
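As a quick illustration of the kind of analytical query that columnar storage accelerates, the aggregation below reads only the ProductName and SalesValue columns instead of whole rows (a minimal example against the table just created):

SELECT ProductName, SUM(SalesValue) AS TotalSales
FROM dc_cdc.Sales
GROUP BY ProductName
ORDER BY TotalSales DESC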
Creating the Business Operation to Save Captured Data
After the SalesSqlService captures the sales data into a StreamContainer (no implementation required; the service is configured in the production setup under the "Executing the CDC" section below), we need a Business Operation to process the StreamContainer, extract the sales data captured from PostgreSQL, and save it into the Sales table. Perform the steps below:
1. Create the class SalesOperation within the dc.cdc package.
2. Write the source code below:
Class dc.cdc.SalesOperation Extends Ens.BusinessOperation
{

Method ProcessSalesData(pRequest As Ens.StreamContainer, Output pResponse As Ens.StringResponse) As %Status
{
    Set tSC = $$$OK
    Set pResponse = ##class(Ens.StringResponse).%New()
    Try {
        // Read the full JSON content from the incoming stream
        Set tStream = pRequest.Stream
        Do tStream.Rewind()
        Set content = ""
        While 'tStream.AtEnd {
            Set content = content _ tStream.Read(4096)
        }
        // Parse the JSON and copy its fields into a new Sales row
        Set tDynamicObject = {}.%FromJSON(content)
        Set sales = ##class(dc.cdc.Sales).%New()
        Set sales.ProductName = tDynamicObject."product_name"
        Set sales.StoreName = tDynamicObject."store_name"
        Set sales.SalesValue = tDynamicObject."sales_value"
        Set tSC = sales.%Save()
        // Return the JSON string so the process can forward it to Kafka
        Set pResponse.StringValue = tDynamicObject.%ToJSON()
    } Catch (ex) {
        Set tSC = ex.AsStatus()
        Set pResponse.StringValue = "Error while saving sales data!"
        $$$LOGERROR("Error while saving sales data: " _ ex.DisplayString())
    }
    Quit tSC
}

XData MessageMap
{
<MapItems>
    <MapItem MessageType="Ens.StreamContainer">
        <Method>ProcessSalesData</Method>
    </MapItem>
</MapItems>
}

}
3. The ProcessSalesData method will receive messages of the StreamContainer type (due to the MessageMap definition).
4. The method will read the captured sales data into a JSON string, load it into a DynamicObject, create a Sales object, set its property values, and save it into the Sales table.
5. Finally, the method will return the JSON string representing the sales data within the response.
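For reference, the JSON carried by each StreamContainer is expected to look like the sample below (illustrative values; the field names come from the dynamic-object accessors in the code above):

{
  "id": 1,
  "product_name": "Laptop",
  "store_name": "Downtown Store",
  "sales_value": 1200.00
}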
Creating BAM Service to Monitor Sales
InterSystems IRIS for Interoperability includes BAM capability, enabling you to monitor real-time business data processed in production using an Analytics Dashboard. To create the BAM service, follow the steps below:
1. Create a new class called SalesMetric extending Ens.BusinessMetric in the dc.cdc package.
2. Write the following source code:
Class dc.cdc.SalesMetric Extends Ens.BusinessMetric
{

Property TotalSales As Ens.DataType.Metric(UNITS = "$US") [ MultiDimensional ];

/// One metric instance is created per product
Query MetricInstances() As %SQLQuery
{
SELECT DISTINCT ProductName FROM dc_cdc.Sales
}

Method OnCalculateMetrics() As %Status
{
    // ..%Instance holds the product this calculation run refers to
    Set product = ..%Instance
    Set SalesSum = 0.0
    &sql(
        SELECT SUM(SalesValue) INTO :SalesSum FROM dc_cdc.Sales WHERE ProductName = :product
    )
    // SUM returns NULL when there are no matching rows; default to 0
    If SalesSum = "" Set SalesSum = 0
    Set ..TotalSales = SalesSum
    Quit $$$OK
}

}
3. The TotalSales property enables real-time monitoring of the sum of sales by Product.
4. The Query MetricInstances defines which products should be monitored.
5. The Method OnCalculateMetrics computes the sum of sales for each product.
6. This class will be utilized in a Dashboard to display total sales by product in real time.
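For example, if the Sales table contains three Laptop rows worth $100, $200, and $50, and two Phone rows worth $80 and $120 (illustrative values), MetricInstances yields two metric instances, Laptop and Phone, and OnCalculateMetrics sets TotalSales to $350 and $200, respectively.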
Executing the CDC (Change Data Capture) Process and Production
Our final production diagram with all the required ETL (Extract, Transform, and Load) processes is shown below:
Follow the next steps:
1. Go to the CDC Production: http://localhost:52795/csp/user/EnsPortal.ProductionConfig.zen?PRODUCTION=dc.cdc.CDCProduction
2. Create a new EnsLib.JavaGateway.Service named Java (it is required for SalesSqlService).
3. Create a Business Service called SalesSqlService (SQLCDCService) and configure the following parameters:
a. DSN (connection string for PostgreSQL): jdbc:postgresql://sales_db:5432/sales_db.
b. Credentials: Create a credential named pg_cred with the username (sales_user) and password (welcome1) used to access PostgreSQL.
c. Target Config Names: SalesProcess (the CDC Process).
d. Query (to select the data to be consumed): select * from sales.
e. Key Field Name (the column IRIS uses to track already captured rows): id.
f. Java Gateway Service (required because the CDC adapter uses JDBC): Java (Java Gateway for this production).
g. JDBC Driver: org.postgresql.Driver.
h. JDBC Classpath (a driver to connect with PostgreSQL, copied via the Dockerfile script): /home/irisowner/dev/postgresql-42.7.8.jar.
4. Create a new dc.cdc.SalesMetric called SalesMetric.
5. Create a new EnsLib.Kafka.Operation named SalesKafkaOperation (the Kafka Operation) with the following parameters:
a. ClientID: iris
b. Servers: kafka:9092
6. Create a new dc.cdc.SalesOperation called SalesOperation.
7. Create a Business Process named SalesProcess. Implement the BPL logic as follows:
a. Final diagram:
b. Create two Context properties:
i. Sales with type Ens.StringResponse to store the sales data as a JSON String.
ii. KafkaMessage with type EnsLib.Kafka.Message (to be used to dispatch the captured data to the Kafka topic sales-topic).
c. Create a call named Save to Sales Table and set the following:
i. Target: SalesOperation
ii. Request Message Class: Ens.StreamContainer (data captured as stream)
iii. Request Actions:
iv. Response Message Class: Ens.StringResponse (the stream will be converted to a JSON String representation of the captured data)
v. Response Actions:
d. Build a Code block and write ObjectScript code that populates the KafkaMessage with the properties required to publish the sales data (as a JSON string) as an event on sales-topic in the Kafka broker:
Set context.KafkaMessage.topic = "sales-topic"
Set context.KafkaMessage.value = context.Sales.StringValue
Set context.KafkaMessage.key = "iris"
e. Create a call named Send To Kafka Sales Topic and set the following:
i. Target: SalesKafkaOperation
ii. Request Message Class: %Library.Persistent (KafkaMessage is Persistent)
iii. Request Actions:
f. Create an Assign named Send Response with the following:
i. Property: response.StringValue
ii. Value: "Process finished!"
Seeing the CDC Results
After enabling the CDCProduction, insert some records into the PostgreSQL sales table using your database administration tool (DBeaver or PgAdmin, for example) and observe the resulting production messages.
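For example, rows like the ones below should be captured within seconds (a sketch; the exact schema lives in init.sql, so the column names here are assumed from the JSON fields read by SalesOperation and the id key field):

INSERT INTO sales (product_name, store_name, sales_value)
VALUES
  ('Laptop', 'Downtown Store', 1200.00),
  ('Phone', 'Airport Store', 790.00);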
Consult the sequence diagram to understand the CDC process (click any message header link):
Seeing BAM Monitoring in an Analytics Dashboard
When you capture data in real time, you naturally want to see the results in a dashboard instantly. Follow the steps below to achieve it:
1. Go to Analytics > User Portal:
2. Click Add Dashboard:
3. Set the properties below and click OK:
a. Folder: Ens/Analytics
b. Dashboard Name: Sales BAM
4. Click Widgets:
5. Click the plus button:
6. Configure the Widget as shown below:
7. Adjust the new widget to cover the entire dashboard area.

8. Now, select the WidgetSales:

9. Choose Controls:
10. Click the plus button:
11. Configure the control as illustrated below (to see the total sales in real time, with automatic refreshing):

12. Now, when new values are captured, the dashboard will display the updated values for TotalSales immediately.
To Learn More
The InterSystems documentation can help you delve deeper into CDC, BAM, Kafka, and Interoperability productions. Visit the pages below to discover more:
- BAM: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EGIN_options#EGIN_options_bam
- Kafka: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ITECHREF_kafka
- SQL Adapters (CDC for SQL tables): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ESQL_intro
- Creating ETL/CDC productions: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=PAGE_interop_languages
- BPL (visual low code business process): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EBPL_use