Find

Announcement
· 11 hr ago

Leading PTFE Manufacturers in India for Quality Products


Looking for reliable ptfe manufacturers in India who deliver high-quality products? Goa Polymer is a trusted name known for its precision, durability, and customer-focused solutions. Whether you need PTFE sheets, rods, gaskets, or customized components, they offer products designed to meet industrial standards. Their advanced production processes ensure strong performance and long-lasting use. Choosing the right manufacturer matters when it comes to safety and efficiency, and Goa Polymer stands out with its commitment to quality and timely service. If you want dependable PTFE products for your business, this brand is a great option to explore.

Discussion (0)1
Log in or sign up to continue
Article
· 13 hr ago 10m read

使用 InterSystems IRIS 互操作性进行数据流处理

现代数据架构利用实时数据捕获、转换、移动和加载解决方案来构建数据湖、分析仓库和大数据存储库。它能够分析来自不同来源的数据,而不会影响使用这些数据的操作。要实现这一目标,必须建立连续、可扩展、弹性和稳健的数据流。最常用的方法是 CDC(变更数据捕获)技术。CDC 监控小型数据集的生产,自动捕获这些数据,并将其传送到一个或多个接收方,包括分析数据存储库。这样做的主要好处是消除了分析中的 D+1 延迟,因为数据一产生就会在源端被检测到,随后被复制到目的地。

本文将展示 CDC 场景中最常见的两种数据源,既可以是源数据源,也可以是目的地数据源。对于数据源(origin),我们将探讨 SQL 数据库和 CSV 文件中的 CDC。对于数据目的地,我们将使用列式数据库(典型的高性能分析数据库场景)和 Kafka 主题(将数据流传输到云和/或多个实时数据消费者的标准方法)。

概述

本文将为以下互操作场景提供一个示例:

  1. SQLCDCAdapter 将利用 SQLInboundAdapter 监听 SQL 数据库中的新记录,并在 JDBC 连接和 SQL 语言的帮助下提取这些记录。
  2. SQLCDCAdapter 会将捕获的数据封装到消息中,并将其发送到 CDCProcess(使用 BPL 符号的业务流程)。
  3. CDC 进程以消息形式接收 SQL 数据,并使用 SQL 操作将数据持久化到 IRIS,使用 Kafka 操作将捕获的数据传输到 Kafka 主题。
  4. SQLOperation 会将消息数据持久化到 InterSystems IRIS 持久类(以列式存储为模型)中。列式存储是一种可为分析数据提供卓越查询性能的选项。
  5. Kafka操作会将消息转换为JSON格式,并将其发送至Kafka主题,供云数据湖或任何其他订阅者使用。
  6. 这些数据流实时执行,建立了连续的数据流。
  7. BAM 服务将实时计算列式表中的业务指标。
  8. BI 仪表板将向用户即时显示由此得出的业务指标。

安装示例

iris-cdc-sample(https://openexchange.intersystems.com/package/iris-cdc-sample) 是一个示例应用程序,用于实现上述场景。要安装它,请执行以下步骤:

1.Clone/git 拉取该 repo 到任何本地目录:

$ git clone https://github.com/yurimarx/iris-cdc-sample.git

2.在此目录下打开终端,运行以下命令:

$ docker-compose build

3.使用您的项目运行 IRIS 容器:

$ docker-compose up -d


示例组件

本示例使用以下容器:

  • iris:InterSystems IRIS 平台,包括下一个:
    • IRIS 列式数据库(用于存储捕获的数据)。
    • IRIS 与生产环境的互操作性,用于执行 CDC(变更数据捕获)流程。生产环境从外部数据库(PostgreSQL)捕获数据,将其持久化到 IRIS 中,并将其传输到 Kafka 主题。
    • IRIS BAM(业务活动监控)按产品计算实时销售指标,并将其显示在仪表板上。
  • salesdb:PostgreSQL 数据库,包含实时捕获的销售数据。
  • zookeeper:用于管理 Kafka 代理的服务。
  • kafka:带有销售主题的 Kafka 代理,用于接收和分发作为实时事件的销售数据。
  • kafka-ui:用于管理和操作主题和事件的 Kafka Web 界面。
services:
  iris:
    build:
      context: .
      dockerfile: Dockerfile
    restart: always
    command: --check-caps false --ISCAgent false
    ports:
      - 1972
      - 52795:52773
      - 53773
    volumes:
      - ./:/home/irisowner/dev/
    networks:
      - cdc-network
  salesdb:
    image: postgres:14-alpine
    container_name: sales_db
    restart: always
    environment:
      POSTGRES_USER: sales_user
      POSTGRES_PASSWORD: welcome1
      POSTGRES_DB: sales_db
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
      - postgres_data:/var/lib/postgresql/data/
    ports:
      - "5433:5432"
    networks:
      - cdc-network


  zookeeper:
      image: confluentinc/cp-zookeeper:7.5.0
      container_name: zookeeper
      hostname: zookeeper
      networks:
        - cdc-network
      ports:
        - "2181:2181"
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    hostname: kafka
    networks:
      - cdc-network
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka-ui:
      image: provectuslabs/kafka-ui:latest
      container_name: kafka-ui
      hostname: kafka-ui
      networks:
        - cdc-network
      ports:
        - "8080:8080"
      depends_on:
        - kafka
      environment:
        KAFKA_CLUSTERS_0_NAME: local_kafka_cluster
        KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
volumes:
  postgres_data:
    driver: local


networks:
  cdc-network:
    driver: bridge

创建列式表

列式表用于存储非规范化数据,如下所示:

产品名称

商店名称

销售价值(价格)

橙色

商店 1

120

橙色

1 号店

200

香蕉

2 号店

100

香蕉

1 号店

120

橙子

2 号店

110

由于 "产品名称 "和 "商店名称 "的值经常重复,因此以列格式(列而不是行)存储数据可以节省存储空间,并获得出色的数据检索性能。以往,这种类型的处理需要创建 BI 立方体。但是,列式存储解决了这一问题,无需将操作数据复制到立方体中。


现在,请按照以下步骤为我们的示例建立销售列式表:

1.在 dc.cdc 包内创建一个新的 ObjectScript 类 Sales。

2.编写以下源代码:

Class dc.cdc.Sales Extends %Persistent [ DdlAllowed, Final ]
{

Parameter STORAGEDEFAULT = "columnar";
Parameter USEEXTENTSET = 1;
Property ProductName As %String;
Property StoreName As %String;
Property SalesValue As %Double;
}

3.参数 STORAGEDEFAULT = "columnar" 确保 dc_cdc.Sales 表使用列式存储,而不是传统的行格式。

创建业务操作以保存捕获的数据

使用 SalesSqlService 将销售数据捕获到 StreamContainer(无需执行;配置在 "执行 CDC "部分的生产设置中完成)后,我们需要一个业务操作来处理 StreamContainer,从 PostgreSQL 中提取销售数据,并将其保存到销售表中。执行以下步骤:
1. 在 dc.cdc 包内创建类 SalesOperation。
2. 编写以下源代码:

Class dc.cdc.SalesOperation Extends Ens.BusinessOperation
{

Method ProcessSalesData(pRequest As Ens.StreamContainer, Output pResponse As Ens.StringResponse) As %Status
{
    Set tSC = $$$OK
    Set pResponse = ##class(Ens.StringResponse).%New()

    Try {
        
        Set tStream = pRequest.Stream
        
        Do tStream.Rewind()

        Set content = ""
        While 'tStream.AtEnd {
            Set content = content _ tStream.Read(4096) 
        }

        Set tDynamicObject = {}.%FromJSON(content)
        
        Set sales = ##class(dc.cdc.Sales).%New()
        Set sales.ProductName = tDynamicObject."product_name"
        Set sales.StoreName = tDynamicObject."store_name"
        Set sales.SalesValue = tDynamicObject."sales_value"
        Set tSC = sales.%Save()

        Set pResponse.StringValue = tDynamicObject.%ToJSON()
        
    } Catch (ex) {
        Set tSC = ex.AsStatus()
        Set pResponse.StringValue = "Error while saving sales data!"
        $$$LOGERROR("Error while saving sales data: " _ ex.DisplayString())
    }

    Quit tSC
}

XData MessageMap
{
<MapItems>
  <MapItem MessageType="Ens.StreamContainer">
    <Method>ProcessSalesData</Method>
  </MapItem>
</MapItems>
}

}

3.ProcessSalesData 方法将接收 StreamContainer 类型的消息(由于使用了 MessageMap 定义)。

4.该方法将把捕获的销售数据读入一个 JSON 字符串,把 JSON 加载到一个 DynamicObject 中,创建一个销售对象,设置其属性值,并把它保存到销售表中。

5.最后,该方法将返回代表响应中销售数据的 JSON 字符串。

 

创建 BAM 服务以监控销售情况

InterSystems IRIS for Interoperability 包含 BAM 功能,可让您使用分析仪表板监控生产中处理的实时业务数据。要创建 BAM 服务,请按以下步骤操作:
1.在 dc.cdc 包中创建一个名为 SalesMetric 的新类,扩展 Ens.BusinessMetric。
2.编写以下源代码:

Class dc.cdc.SalesMetric Extends Ens.BusinessMetric
{

Property TotalSales As Ens.DataType.Metric(UNITS = "$US") [ MultiDimensional ];
Query MetricInstances() As %SQLQuery
{
  SELECT distinct(ProductName) FROM dc_cdc.Sales
}

/// Calculate and update the set of metrics for this class
Method OnCalculateMetrics() As %Status
{
    Set product = ..%Instance
    Set SalesSum = 0.0
    &sql(
        select sum(SalesValue) into :SalesSum from dc_cdc.Sales where ProductName = :product 
    )

    Set ..TotalSales = SalesSum

    Quit $$$OK
}

}

3.通过 TotalSales 属性,可以实时监控各产品的销售总额。
4.查询 MetricInstances 定义了应监控的产品。
5.方法 OnCalculateMetrics 计算每个产品的销售额总和。
6.该类将用于仪表盘,以实时生成各产品的总销售额。

执行 CDC - 变更数据捕获流程和生产


下图是我们的最终生产图,其中包含所有必要的 ETL(提取、转换和加载)流程:

请按照以下步骤操作:

1.转到 CDC 生产http://localhost:52795/csp/user/EnsPortal.ProductionConfig.zen?PRODUCTIO...
2. 创建名为 Java 的新 EnsLib.JavaGateway.Service(SalesSqlService 需要它)。创建名为 Java 的新 EnsLib.JavaGateway.Service(SalesSqlService 需要它)。
3. 生成名为 SalesSqlService (SQLCDCService) 的业务服务并配置以下参数:

a.DSN (PostgreSQL 的连接字符串):jdbc:postgresql://sales_db:5432/sales_db。
b. 凭据:
c. 目标配置名称:
d. 查询(选择要消耗的数据):select * from sales。
e. 关键字段名(IRIS 用于跟踪已捕获行的列):id。
f. Java 网关服务(由于 CDC 适配器使用 JDBC,因此需要):Java (Java Gateway for this production).
g. JDBC 驱动程序:org.postgresql.Driver.
h. JDBC Classpath (一个驱动程序的路径)。JDBC Classpath(用于连接 PostgreSQL 的驱动程序,通过 Dockerfile 脚本复制):/home/irisowner/dev/postgresql-42.7.8.jar。

4.创建一个新的 dc.ccdc.SalesMetric,名为 SalesMetric。
5. 生成一个新的 EnsLib.Kafka.Operation,并将其命名为 SalesKafkaOperation(Kafka 操作),参数如下:

a.ClientID: iris
b. Servers: kafka:90服务器:kafka:9092

6.新建一个名为 SalesOperation.的 dc.cdc.SalesOperation

7. 开发一个名为 SalesProcess 的业务流程。BPL 实现逻辑如下:

a.最终图:

b. 创建两个上下文属性:

i.销售,类型为 Ens.StringResponse,用于将销售数据存储为 JSON 字符串。
ii.KafkaMessage 类型为 EnsLib.Kafka.Message(用于将捕获的数据分派到 Kafka 主题 sales-topic)。

c.生成一个调用,保存到销售表,并设置以下内容:

i.目标:销售操作
ii.请求信息类:Ens.StreamContainer(以流形式捕获数据)
iii.请求操作:

iv.响应信息类:Ens.StringResponse(数据流将转换为捕获数据的 JSON 字符串表示形式)

v.响应操作

d.构建代码块并编写 ObjectScript 代码,为 Kafka 消息填充必要的属性,以便将销售数据(作为 JSON 字符串)作为事件发布到 Kafka Broker 的 sales-topic 中:

Set context.KafkaMessage.topic = "sales-topic"
 Set context.KafkaMessage.value = context.Sales.StringValue 
 Set context.KafkaMessage.key = "iris"

e.设计发送到 Kafka 销售主题的调用和设计:

i.目标:SalesKafkaOperation
ii.请求信息类:%Library.Persistent(KafkaMessage 是持久的)
iii.请求操作:

f.创建名为 "发送响应 "的指定,内容如下:

i.属性:response.StringValue
ii.值:"处理完成!"

查看 CDC 结果

启用 CDCProduction 后,使用数据库管理工具(DBeaver 或 PgAdmin)在 PostgreSQL 销售表中完成一些记录,并观察生产信息的结果。

请参考序列图了解 CDC 流程(点击任何报文标题链接):

在分析仪表板中查看 BAM 监控

当您实时捕获数据时,自然希望立即在仪表板中看到结果。请按照以下步骤操作:
1. 转到分析 > 用户门户(
Analytics > User Portal)

单击“添加仪表盘”(Add Dashboar ):

3.设置以下属性并单击确定:

a.   文件夹:Ens/Analytics
b.仪表板名称:销售 BAM

4.单击“小工具(Widgets

单击加号按钮:

如下图所示配置 “小工具(Widgets”:

7 调整新部件,使其覆盖整个仪表板区域。

现在,选择 WidgetSales:

9. 选择 "控制(Controls)":

点击加号按钮

如下图所示配置控件(实时查看销售总额,并自动刷新):

现在,当采集到新值时,仪表盘将立即显示 TotalSales 的更新值。

了解更多信息:

InterSystems 文档可帮助您深入了解 CDC、BAM、Kafka 和互操作性产品。请访问以下页面了解更多信息:

  1. BAM:https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EGIN_options#EGIN_options_bam
  2. Kafka: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ITECHREF_kafka
  3. SQL 适配器(用于 SQL 表的 CDC): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ESQL_intro
  4. 创建 ETL/CDC 产品: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=PAGE_interop_languages
  5. BPL(可视化低代码业务流程): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EBPL_use
Discussion (0)1
Log in or sign up to continue
Digest
· 14 hr ago

InterSystems 开发者社区中文版:每周摘要(2025年11月17日-23日)

十一月 17 - 23, 2025Week at a GlanceInterSystems Developer Community
Article
· 18 hr ago 4m read

Connexion d'InterSystems IRIS à une base de données relationnelle via ODBC

InterSystems IRIS est une plateforme de données haute performance offrant une persistance native et la prise en charge de divers modèles de données (objet, document et relationnel). Cependant, dans de nombreux environnements d'entreprise, l'intégration avec les systèmes de gestion de bases de données relationnelles (RDBMS) existants est nécessaire. Le RDBMS que vous souhaitez interroger importe peu ; par souci de simplicité, je prendrai Microsoft Access comme exemple dans cet article. L'une des méthodes les plus faciles et standardisées pour établir cette connectivité consiste à utiliser ODBC. 

Puisque j'utilise Access comme exemple, les deux premières étapes seront effectuées automatiquement. Mais si vous utilisez un logiciel plus complexe que le bon vieux Access, vous devrez les configurer vous-même.

Étape 1 : Installation du pilote ODBC de la base de données externe

InterSystems IRIS est le client dans ce scénario. Il nécessite donc l’installation du pilote du DBMS cible sur le même ordinateur que celui où IRIS est exécuté. Téléchargement du pilote. Assurez-vous que la version et l’architecture (32 bits ou 64 bits) du pilote sont compatibles avec le système d’exploitation et l’architecture de votre serveur IRIS. Installez le pilote sur la machine hébergeant votre instance InterSystems IRIS. Après l’installation, vérifiez que le pilote est bien répertorié dans l’utilitaire Administrateur de sources de données ODBC sur la machine hôte IRIS.


Étape 2 : Configurer le nom de la source de données ODBC (DSN)

Pour simplifier la connexion à IRIS, il est possible de créer un DSN système pointant directement vers l’instance de DBMS externe. Pour cela, lancez l’Administrateur de sources de données ODBC sur le serveur hôte IRIS. Accédez à l’onglet DSN système (à privilégier par rapport au DSN utilisateur pour les applications serveur comme IRIS). Cliquez sur Ajouter. Sélectionnez le pilote ODBC installé à l’étape 1. Configurez le DSN en fournissant les informations nécessaires :

  • Nom de la source de données
  • Serveur/Hôte - l’adresse IP ou le nom d’hôte du DBMS externe.
  • Port
  • Nom de la base de données

Utilisez la fonction de test de connexion intégrée au pilote. En cas de succès, vous avez vérifié que le serveur IRIS peut accéder à la base de données externe via le pilote.

Étape 3 : Configurer SQL Gateway dans InterSystems IRIS

SQL Gateway est la fonctionnalité d’IRIS qui gère les connexions externes et rend les données accessibles via son propre moteur SQL. Pour le configurer accédez au Portail de gestion, allez dans Administration système → Configuration → Connectivité → Connexions passerelle SQL.

Créez une nouvelle connexion et  : cliquez sur « Créer une nouvelle connexion » et

  • sélectionnez ODBC comme Type de connexion
  • choisissez un nom qui sera utilisé en interne par IRIS
  • saisissez le nom du DSN système que vous avez créé à l’étape 2
  • saisissez les informations d’identification requises pour accéder au DBMS externe
  • enregistrez la connexion.

IRIS tentera d’établir la connexion et indiquera si la tentative a réussi ou échoué.

Étape 4 : Interroger les données externes

Une fois le SQL Gateway est créé, vous pouvez traiter les tables DBMS externes comme s’il s’agissait de tables locales dans IRIS, en utilisant le SQL standard d’IRIS. Vous pouvez désormais exécuter des requêtes directement sur les données DBMS externes :

select *
  from ACCESS_LINK.Person

Principaux avantages :

  • Requêtes unifiées - les développeurs peuvent utiliser une interface SQL unique (le moteur SQL d’IRIS) pour interroger à la fois les données natives d’IRIS et les données DBMS externes.
  • Virtualisation des données - les données restent dans la base de données externe ; IRIS récupère simplement les résultats via la connexion ODBC à la demande.
  • Interopérabilité - cette configuration permet au code IRIS (ObjectScript, Python ou composants de production) d’interagir facilement avec des sources de données externes existantes ou spécialisées.

En suivant ces étapes, vous pouvez intégrer InterSystems IRIS avec succès dans un environnement de données hétérogène, en tirant parti de la puissance d'ODBC pour une connectivité relationnelle fiable et standardisée.

Discussion (0)1
Log in or sign up to continue
Question
· 23 hr ago

Production Status Details

Hi,

I want to get the status of a Production (Running, Stopped), Queue length, Time the Production started

I did this:

>zn "test"
>set status=##class(Ens.Director).GetProductionSummary(.pinfo)
>write status
1
>zwrite pinfo
pinfo("PKG.FoundationProduction")=$lb("Stopped","","",0)
pinfo("Report.Print.EnsemblePrintService")=$lb("Stopped","2021-02-09 23:15:57.538","2021-02-09 23:16:15.264",0)
pinfo("TC.hmf.Production")=$lb("Stopped","","",0)
pinfo("TC.hmf.System.Production")=$lb("Running","2025-10-30 10:18:24.057","",1)
 

When I run the System Management Portal (Interoperability>Production Configuration)  shows that for the namespace "test" there is this production:

"TC.hmf.System.Production"

The questions I have:

1. I understand that there is only one production per namespace.
What are the remaining lines?
If there is only one production, "TC.hmf.System.Production", how can I get the name of the production programmatically and then print that line only?

2. Method "GetProductionSummary" is not documented
"Running",C,"",1
I understand that it means the production is
Running
Started "Running","2025-10-30 10:18:24.057"
Queue Length 1
Correct?
What is the 3rd parameter? ("")

Thanks for any help

1 new Comment
Discussion (1)1
Log in or sign up to continue