现代数据架构利用实时数据捕获、转换、移动和加载解决方案来构建数据湖、分析仓库和大数据存储库。它能够分析来自不同来源的数据,而不会影响使用这些数据的操作。要实现这一目标,必须建立连续、可扩展、弹性和稳健的数据流。最常用的方法是 CDC(变更数据捕获)技术。CDC 监控小型数据集的生产,自动捕获这些数据,并将其传送到一个或多个接收方,包括分析数据存储库。这样做的主要好处是消除了分析中的 D+1 延迟,因为数据一产生就会在源端被检测到,随后被复制到目的地。
本文将展示 CDC 场景中最常见的两种数据源,既可以是源数据源,也可以是目的地数据源。对于数据源(origin),我们将探讨 SQL 数据库和 CSV 文件中的 CDC。对于数据目的地,我们将使用列式数据库(典型的高性能分析数据库场景)和 Kafka 主题(将数据流传输到云和/或多个实时数据消费者的标准方法)。
概述
本文将为以下互操作场景提供一个示例:

- SQLCDCAdapter 将利用 SQLInboundAdapter 监听 SQL 数据库中的新记录,并在 JDBC 连接和 SQL 语言的帮助下提取这些记录。
- SQLCDCAdapter 会将捕获的数据封装到消息中,并将其发送到 CDCProcess(使用 BPL 符号的业务流程)。
- CDC 进程以消息形式接收 SQL 数据,并使用 SQL 操作将数据持久化到 IRIS,使用 Kafka 操作将捕获的数据传输到 Kafka 主题。
- SQLOperation 会将消息数据持久化到 InterSystems IRIS 持久类(以列式存储为模型)中。列式存储是一种可为分析数据提供卓越查询性能的选项。
- Kafka操作会将消息转换为JSON格式,并将其发送至Kafka主题,供云数据湖或任何其他订阅者使用。
- 这些数据流实时执行,建立了连续的数据流。
- BAM 服务将实时计算列式表中的业务指标。
- BI 仪表板将向用户即时显示由此得出的业务指标。
安装示例
iris-cdc-sample(https://openexchange.intersystems.com/package/iris-cdc-sample) 是一个示例应用程序,用于实现上述场景。要安装它,请执行以下步骤:
1.Clone/git 拉取该 repo 到任何本地目录:
$ git clone https://github.com/yurimarx/iris-cdc-sample.git
2.在此目录下打开终端,运行以下命令:
$ docker-compose build
3.使用您的项目运行 IRIS 容器:
$ docker-compose up -d
示例组件
本示例使用以下容器:
- iris:InterSystems IRIS 平台,包括下一个:
- IRIS 列式数据库(用于存储捕获的数据)。
- IRIS 与生产环境的互操作性,用于执行 CDC(变更数据捕获)流程。生产环境从外部数据库(PostgreSQL)捕获数据,将其持久化到 IRIS 中,并将其传输到 Kafka 主题。
- IRIS BAM(业务活动监控)按产品计算实时销售指标,并将其显示在仪表板上。
- salesdb:PostgreSQL 数据库,包含实时捕获的销售数据。
- zookeeper:用于管理 Kafka 代理的服务。
- kafka:带有销售主题的 Kafka 代理,用于接收和分发作为实时事件的销售数据。
- kafka-ui:用于管理和操作主题和事件的 Kafka Web 界面。
services:
iris:
build:
context: .
dockerfile: Dockerfile
restart: always
command: --check-caps false --ISCAgent false
ports:
- 1972
- 52795:52773
- 53773
volumes:
- ./:/home/irisowner/dev/
networks:
- cdc-network
salesdb:
image: postgres:14-alpine
container_name: sales_db
restart: always
environment:
POSTGRES_USER: sales_user
POSTGRES_PASSWORD: welcome1
POSTGRES_DB: sales_db
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
- postgres_data:/var/lib/postgresql/data/
ports:
- "5433:5432"
networks:
- cdc-network
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
hostname: zookeeper
networks:
- cdc-network
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
hostname: kafka
networks:
- cdc-network
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
hostname: kafka-ui
networks:
- cdc-network
ports:
- "8080:8080"
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local_kafka_cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
volumes:
postgres_data:
driver: local
networks:
cdc-network:
driver: bridge
创建列式表
列式表用于存储非规范化数据,如下所示:
由于 "产品名称 "和 "商店名称 "的值经常重复,因此以列格式(列而不是行)存储数据可以节省存储空间,并获得出色的数据检索性能。以往,这种类型的处理需要创建 BI 立方体。但是,列式存储解决了这一问题,无需将操作数据复制到立方体中。
现在,请按照以下步骤为我们的示例建立销售列式表:
1.在 dc.cdc 包内创建一个新的 ObjectScript 类 Sales。
2.编写以下源代码:
Class dc.cdc.Sales Extends %Persistent [ DdlAllowed, Final ]
{
Parameter STORAGEDEFAULT = "columnar"
Parameter USEEXTENTSET = 1
Property ProductName As %String
Property StoreName As %String
Property SalesValue As %Double
}
3.参数 STORAGEDEFAULT = "columnar" 确保 dc_cdc.Sales 表使用列式存储,而不是传统的行格式。
创建业务操作以保存捕获的数据
使用 SalesSqlService 将销售数据捕获到 StreamContainer(无需执行;配置在 "执行 CDC "部分的生产设置中完成)后,我们需要一个业务操作来处理 StreamContainer,从 PostgreSQL 中提取销售数据,并将其保存到销售表中。执行以下步骤:
1. 在 dc.cdc 包内创建类 SalesOperation。
2. 编写以下源代码:
Class dc.cdc.SalesOperation Extends Ens.BusinessOperation
{
Method ProcessSalesData(pRequest As Ens.StreamContainer, Output pResponse As Ens.StringResponse) As %Status
{
Set tSC = $$$OK
Set pResponse = ##class(Ens.StringResponse).%New()
Try {
Set tStream = pRequest.Stream
Do tStream.Rewind()
Set content = ""
While 'tStream.AtEnd {
Set content = content _ tStream.Read(4096)
}
Set tDynamicObject = {}.%FromJSON(content)
Set sales = ##class(dc.cdc.Sales).%New()
Set sales.ProductName = tDynamicObject."product_name"
Set sales.StoreName = tDynamicObject."store_name"
Set sales.SalesValue = tDynamicObject."sales_value"
Set tSC = sales.%Save()
Set pResponse.StringValue = tDynamicObject.%ToJSON()
} Catch (ex) {
Set tSC = ex.AsStatus()
Set pResponse.StringValue = "Error while saving sales data!"
$$$LOGERROR("Error while saving sales data: " _ ex.DisplayString())
}
Quit tSC
}
XData MessageMap
{
<MapItems>
<MapItem MessageType="Ens.StreamContainer">
<Method>ProcessSalesData</Method>
</MapItem>
</MapItems>
}
}
3.ProcessSalesData 方法将接收 StreamContainer 类型的消息(由于使用了 MessageMap 定义)。
4.该方法将把捕获的销售数据读入一个 JSON 字符串,把 JSON 加载到一个 DynamicObject 中,创建一个销售对象,设置其属性值,并把它保存到销售表中。
5.最后,该方法将返回代表响应中销售数据的 JSON 字符串。
创建 BAM 服务以监控销售情况
InterSystems IRIS for Interoperability 包含 BAM 功能,可让您使用分析仪表板监控生产中处理的实时业务数据。要创建 BAM 服务,请按以下步骤操作:
1.在 dc.cdc 包中创建一个名为 SalesMetric 的新类,扩展 Ens.BusinessMetric。
2.编写以下源代码:
Class dc.cdc.SalesMetric Extends Ens.BusinessMetric
{
Property TotalSales As Ens.DataType.Metric(UNITS = "$US") [ MultiDimensional ]
Query MetricInstances() As %SQLQuery
{
SELECT distinct(ProductName) FROM dc_cdc.Sales
}
Method OnCalculateMetrics() As %Status
{
Set product = ..%Instance
Set SalesSum = 0.0
&sql(
select sum(SalesValue) into :SalesSum from dc_cdc.Sales where ProductName = :product
)
Set ..TotalSales = SalesSum
Quit $$$OK
}
}
3.通过 TotalSales 属性,可以实时监控各产品的销售总额。
4.查询 MetricInstances 定义了应监控的产品。
5.方法 OnCalculateMetrics 计算每个产品的销售额总和。
6.该类将用于仪表盘,以实时生成各产品的总销售额。
执行 CDC - 变更数据捕获流程和生产
下图是我们的最终生产图,其中包含所有必要的 ETL(提取、转换和加载)流程:

请按照以下步骤操作:
1.转到 CDC 生产:http://localhost:52795/csp/user/EnsPortal.ProductionConfig.zen?PRODUCTIO...
2. 创建名为 Java 的新 EnsLib.JavaGateway.Service(SalesSqlService 需要它)。创建名为 Java 的新 EnsLib.JavaGateway.Service(SalesSqlService 需要它)。
3. 生成名为 SalesSqlService (SQLCDCService) 的业务服务并配置以下参数:
a.DSN (PostgreSQL 的连接字符串):jdbc:postgresql://sales_db:5432/sales_db。
b. 凭据:
c. 目标配置名称:
d. 查询(选择要消耗的数据):select * from sales。
e. 关键字段名(IRIS 用于跟踪已捕获行的列):id。
f. Java 网关服务(由于 CDC 适配器使用 JDBC,因此需要):Java (Java Gateway for this production).
g. JDBC 驱动程序:org.postgresql.Driver.
h. JDBC Classpath (一个驱动程序的路径)。JDBC Classpath(用于连接 PostgreSQL 的驱动程序,通过 Dockerfile 脚本复制):/home/irisowner/dev/postgresql-42.7.8.jar。
4.创建一个新的 dc.ccdc.SalesMetric,名为 SalesMetric。
5. 生成一个新的 EnsLib.Kafka.Operation,并将其命名为 SalesKafkaOperation(Kafka 操作),参数如下:
a.ClientID: iris
b. Servers: kafka:90服务器:kafka:9092
6.新建一个名为 SalesOperation.的 dc.cdc.SalesOperation
7. 开发一个名为 SalesProcess 的业务流程。BPL 实现逻辑如下:
a.最终图:

b. 创建两个上下文属性:
i.销售,类型为 Ens.StringResponse,用于将销售数据存储为 JSON 字符串。
ii.KafkaMessage 类型为 EnsLib.Kafka.Message(用于将捕获的数据分派到 Kafka 主题 sales-topic)。
c.生成一个调用,保存到销售表,并设置以下内容:
i.目标:销售操作
ii.请求信息类:Ens.StreamContainer(以流形式捕获数据)
iii.请求操作:

iv.响应信息类:Ens.StringResponse(数据流将转换为捕获数据的 JSON 字符串表示形式)
v.响应操作

d.构建代码块并编写 ObjectScript 代码,为 Kafka 消息填充必要的属性,以便将销售数据(作为 JSON 字符串)作为事件发布到 Kafka Broker 的 sales-topic 中:
Set context.KafkaMessage.topic = "sales-topic"
Set context.KafkaMessage.value = context.Sales.StringValue
Set context.KafkaMessage.key = "iris"
e.设计发送到 Kafka 销售主题的调用和设计:
i.目标:SalesKafkaOperation
ii.请求信息类:%Library.Persistent(KafkaMessage 是持久的)
iii.请求操作:

f.创建名为 "发送响应 "的指定,内容如下:
i.属性:response.StringValue
ii.值:"处理完成!"
查看 CDC 结果
启用 CDCProduction 后,使用数据库管理工具(DBeaver 或 PgAdmin)在 PostgreSQL 销售表中完成一些记录,并观察生产信息的结果。

请参考序列图了解 CDC 流程(点击任何报文标题链接):

在分析仪表板中查看 BAM 监控
当您实时捕获数据时,自然希望立即在仪表板中看到结果。请按照以下步骤操作:
1. 转到分析 > 用户门户(Analytics > User Portal):

单击“添加仪表盘”(Add Dashboar ):

3.设置以下属性并单击确定:
a. 文件夹:Ens/Analytics
b.仪表板名称:销售 BAM
4.单击“小工具(Widgets)”:

单击加号按钮:

如下图所示配置 “小工具(Widgets)”:
7 调整新部件,使其覆盖整个仪表板区域。

现在,选择 WidgetSales:

9. 选择 "控制(Controls)":
点击加号按钮
如下图所示配置控件(实时查看销售总额,并自动刷新):

现在,当采集到新值时,仪表盘将立即显示 TotalSales 的更新值。
了解更多信息:
InterSystems 文档可帮助您深入了解 CDC、BAM、Kafka 和互操作性产品。请访问以下页面了解更多信息:
- BAM:https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EGIN_options#EGIN_options_bam
- Kafka: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ITECHREF_kafka
- SQL 适配器(用于 SQL 表的 CDC): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ESQL_intro
- 创建 ETL/CDC 产品: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=PAGE_interop_languages
- BPL(可视化低代码业务流程): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EBPL_use