Article
· May 5 10m read

How to produce and consume RabbitMQ messages on IRIS

RabbitMQ is a message broker that allows producers (those who send a data message) and consumers (those who receive a data message) to establish asynchronous, real-time, and high-performance massive data flows. RabbitMQ supports AMQP (Advanced Message Queuing Protocol), an open standard application layer protocol. 
The main reasons to employ RabbitMQ include the following:

  • You can improve the performance of the applications using an asynchronous approach.
  • It lets you decouple and reduce dependencies between services, microservices, and applications with the help of a data message mediator, meaning that there is no need for producers and consumers of exchanged data to know each other.
  • It allows the long-running processing of sent data (with the results) to be delivered after utilizing a response queue.
  • It helps you migrate from monolithic to microservices, where microservices exchange data via Rabbit in a decoupled and asynchronous way.
  • It offers reliability and resilience by making it possible for messages to be stored and forwarded. A message can be delivered multiple times until it is processed.
  • Message queueing is the key to scaling your application. As the workload increases, you will only have to add more workers to handle the queues faster.
  • It works well with data streaming applications.
  • It is beneficial for IoT applications.
  • It is a must for Bots’ communication.

RabbitMQ basic concepts

We will utilize a temperature monitor as a case to detail RabbitMQ concepts:


Diagrama O conteúdo gerado por IA pode estar incorreto.

  • Producer: it is a software program that sends messages.
  • Exchange: it receives a message from the producer and routes it to zero or more queues.
  • Queue: it is a message store/buffer supporting small and big binary messages.
  • Consumer: it is a program that primarily waits for messages to be received.
  • Message: it is the data sent and received with header and body sections.
  • Message  Broker: it is AMQP message middleware software that allows the exchange of data between producers and consumers.

     
There are a few existing types of possible exchanges (source: https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html):  

RabbitMQ Topic Exchange 

  • Direct: The message is routed to the queues whose binding key is the exact match to the routing key of the message. For example, if the queue is bound to the exchange with the binding key pdfprocess, a message published to the exchange with a routing key pdfprocess will be routed to that queue.
  • Fanout: A fanout exchange routes messages to all the queues bound to them.
  • Topic: The topic exchange does a wildcard match between the routing key and the routing pattern specified in the binding.
  • Headers: Headers make exchanges using the message header attributes for routing.

Install an example of IRIS and RabbitMQ running a temperature monitoring case

To learn more about IRIS and RabbitMQ, we will play with a sample from Open Exchange:

  1. Go to the https://openexchange.intersystems.com/package/rabbit-iris-sample.
  2. For Docker installation, clone/git pull the repo into any local directory:
    git clone https://github.com/yurimarx/rabbit-iris-sample.git
     
  3. Open the terminal in this directory and run the following:
    docker-compose build
     
  4. Run the IRIS container with your project:  
    docker-compose up -d
  5. For ZPM (IPM) installation, use the line below:
    USER>zpm “rabbit-iris-sample”

Configure the RabbitMQ

1. Proceed to the RabbitMQ console (http://localhost:15672) and log in with the Username "guest" and Password "guest" to see the initial page:


Interface gráfica do usuário, Aplicativo O conteúdo gerado por IA pode estar incorreto.

2. Move to the tab "Exchange" > section "Add a new exchange". Set the "temperature" value for the Name field and select the option Topic on the field Type. Then click "Add exchange":

Interface gráfica do usuário, Aplicativo O conteúdo gerado por IA pode estar incorreto.

3. Go to the tab "Queues and Streams" > section "Add a new queue". Set the "temperature" value for the Name field and click "Add queue":

Interface gráfica do usuário, Aplicativo, Teams O conteúdo gerado por IA pode estar incorreto.

4. In the tab "Queues and Streams", click the temperature queue:

Interface gráfica do usuário, Aplicativo O conteúdo gerado por IA pode estar incorreto.


5. Head to the section "Add binding to this queue" and set the "temperature" value for the From exchange field. Then set "temperature.current" to the Routing key field and click the button "Bind":


Interface gráfica do usuário, Texto O conteúdo gerado por IA pode estar incorreto.

6. Now, the queue and exchange are connected and ready to manage messages with the routing key "temperature.current":

Interface gráfica do usuário, Aplicativo O conteúdo gerado por IA pode estar incorreto.

Start the IRIS production and see the results

1. Proceed to the sample production (http://localhost:52795/csp/user/EnsPortal.ProductionConfig.zen?PRODUCTIO...) and click the button “Start”:


Interface gráfica do usuário, Texto, Aplicativo O conteúdo gerado por IA pode estar incorreto.

2. The production has started successfully:


Interface gráfica do usuário, Texto, Aplicativo, Email O conteúdo gerado por IA pode estar incorreto.

3. At this point, move to the [SQL](http://localhost:52795/csp/sys/exp/%25CSP.UI.Portal.SQL.Home.zen?$NAMESPACE=USER&$NAMESPACE=USER) to query the temperature table and see the results:


Interface gráfica do usuário, Texto, Aplicativo O conteúdo gerado por IA pode estar incorreto.

4. Check the production business services, business operations, and the produced messages to observe how the samples work.
 

InterSystems IRIS RabbitMQ (AMQP) services, operations, and message types to consume and produce messages

Overview

Interface gráfica do usuário O conteúdo gerado por IA pode estar incorreto.

  1. The Business Service GetCurrentTemperature uses the Inbound Adapter dc.rabbit.TemperatureInboundAdapter to get the current temperature for a latitude/longitude localization. It consumes the external API (https://api.open-meteo.com/v1/forecast?latitude="_..Latitude_"&longitude="_..Longitude_"&current=temperature_2m&temperature_unit=fahrenheit&forecast_days=1) and sends the result to SendTemperatureOperation.
  2. The SendTemperatureOperation receives the current temperature and utilizes the EnsLib.RabbitMQ.OutboundAdapter to publish it on the RabbitMQ topic "Temperature".
  3. The RabbitTemperatureClientService consumes the RabbitMQ "Temperature" topic, gets the temperature data, and sends it to RabbitTemperatureOperation.
  4. The RabbitTemperatureOperation receives the temperature data and saves it in the IRIS database in the table dc.rabbit.Temperature.

The temperature Inbound Adapter

Inbound adapters are employed by Business Services to connect to external data sources, APIs, File systems, Web services, etc. In our case, the Inbound Adapter will connect to an external API to get the temperature for latitude/longitude locations. Check out the source code comments below:

/// Extends the Ens.InboundAdapter to create a new custom adapter
Class dc.rabbit.TemperatureInboundAdapter Extends Ens.InboundAdapter
{

Property Latitude As %String;
Property Longitude As %String;
Property SSLConfig As %String;
/// To access HTTPS endpoints
Parameter SERVER = "api.open-meteo.com";
/// To the user config SSL config and the Latitude and Longitude parameters
Parameter SETTINGS = "SSLConfig:Basic:sslConfigSelector,Latitude:Basic,Longitude:Basic";
/// Method utilized to validate the configurations before starting the service using this adapter
Method OnInit() As %Status
{
    If (..SSLConfig = "" || ..Latitude = "" || ..Longitude = "") {
        return $$$ERROR(5001, "SSLConfig required")
    }
    
    Quit $$$OK
}

/// Method with the business implementation
Method OnTask() As %Status
{
    If ((..SSLConfig = "" || ..Latitude = "" || ..Longitude = "")) {
        Return $$$OK
    }
    
    Set tSC = 1
    // HTTP Request
    #dim httprequest as %Net.HttpRequest
    #dim httpResponse as %Net.HttpResponse
    Try {
        Set httprequest = ##class(%Net.HttpRequest).%New()    
        Do httprequest.ServerSet(..#SERVER)
        Do httprequest.SSLConfigurationSet(..SSLConfig)
        
        /// URL to be requested
        Set requestString = "/v1/forecast?latitude="_..Latitude_"&longitude="_..Longitude_"&current=temperature_2m&temperature_unit=fahrenheit&forecast_days=1"
        /// Requests the data from the temperature API
        Do httprequest.Get(requestString)
        Set httpResponse = httprequest.HttpResponse
        
        /// If not OK, returns error
        If (httpResponse.StatusCode '=200) {
            $$$ThrowStatus($$$ERROR(5001, "HTTP StatusCode = "_httpResponse.StatusCode))
        }
        
        /// Gets the response and creates a dc.rabbit.TemperatureMessage to send to the Business Operation configure to receive the message
        Set apiResult = {}.%FromJSON(httpResponse.Data)
        Set temperature = ##class(dc.rabbit.TemperatureMessage).%New()
        Set temperature.Latitude = ..Latitude
        Set temperature.Longitude = ..Longitude
        Set temperature.Temperature = apiResult.current."temperature_2m"
        Set temperature.TemperatureDate = apiResult.current.time    
        $$$ThrowOnError(..BusinessHost.ProcessInput(temperature))
        
    } Catch ex {
        Do ex.Log()
        Set tSC = ex.AsStatus()
    }

    /// Gets the current temperature again after the time configured in the service using this adapter (pulling time)
    Set ..BusinessHost.%WaitForNextCallInterval=1
    Quit tSC
}
}

The service dc.rabbit.TemperatureService

This service (dc.rabbit.TemperatureInboundAdapter) was designed to get the current temperature and send it to the SendTemperatureOperation (pay attention to the following comments):

/// Service to get the current temperature and send it to the configured operation
Class dc.rabbit.TemperatureService Extends Ens.BusinessService
{

Property Adapter As dc.rabbit.TemperatureInboundAdapter;
/// Uses the TemperatureInboundAdapter
Parameter ADAPTER = "dc.rabbit.TemperatureInboundAdapter";
/// Receives the temperature data from the inbound adapter and sends it to the operation
Method OnProcessInput(pInput As dc.rabbit.TemperatureMessage, pOutput As %RegisteredObject) As %Status
{
    /// Creates a RabbitMQ message to send the temperature data as the content inside the RabbitMQ message
    #dim rabbitMesssage As EnsLib.RabbitMQ.Message
    Set rabbitMesssage = ##class(EnsLib.RabbitMQ.Message).%New()
    Do pInput.%JSONExportToString(.content)
    /// Sets the body of the message with the temperature data
    Do rabbitMesssage.SetEncodedContent(content)
    Set rabbitMesssage.appId = "IRIS"
    Set rabbitMesssage.exchange = "temperature" /// Sets the RabbitMQ exchange that will receive the message
    Set rabbitMesssage.routingKey = "temperature.current" /// Sets the right routing key to the exchange route of the message to the "Temperature" topic
    Return ..SendRequestAsync("SendTemperatureOperation", rabbitMesssage) /// Sends the Rabbit message to the operation and passes the message to the Rabbit
}
}

 

The Business Operation SendTemperatureOperation

It is a business operation from IRIS(EnsLib.RabbitMQ.Operation) utilized to publish a message on the RabbitMQ topic. It receives an EnsLib.RabbitMQ.Message and sends it to the RabbitMQ server configured on RabbitMQ settings:


Interface gráfica do usuário, Aplicativo O conteúdo gerado por IA pode estar incorreto.

The message destination is defined in the source code that created the message (inside the GetCurrentTemperature service):


Texto O conteúdo gerado por IA pode estar incorreto.

The properties exchange and routing key are needed to route the message to the right topic.

The Business service RabbitTemperatureClientService

This business service subscribes to a RabbitMQ topic, configured on RabbitMQ Settings, to receive messages and dispatch them for business operations or business processes configured on Target Config Names:


Interface gráfica do usuário, Texto, Aplicativo, chat ou mensagem de texto O conteúdo gerado por IA pode estar incorreto.


The BusinessOperation dc.rabbit.RabbitTemperatureOperation

This Business Operation receives a message from RabbitTemperatureClientService and saves it in the IRIS Database:


Class dc.rabbit.RabbitTemperatureOperation Extends Ens.BusinessOperation
{

Property Adapter As Ens.OutboundAdapter;
Parameter ADAPTER = "Ens.OutboundAdapter";
Method ProcessRabbitTemperature(pInput As EnsLib.RabbitMQ.Message, Output pOutput As %RegisteredObject) As %Status
{
    Set content = {}.%FromJSON(pInput.encodedContent)

    Set temperature = ##class(Temperature).%New()
    Set temperature.Latitude = content.Latitude
    Set temperature.Longitude = content.Longitude
    Set temperature.Temperature = content.Temperature
    Set temperature.TemperatureDate = content.TemperatureDate
    Set tSC = temperature.%Save()

    Return tSC
}

XData MessageMap
{
<MapItems>
  <MapItem MessageType="EnsLib.RabbitMQ.Message">
    <Method>ProcessRabbitTemperature</Method>
  </MapItem>
</MapItems>
}
}

The EnsLib.RabbitMQ.Message class
This class from IRIS is used to transport and configure which RabbitMQ exchange will be chosen to receive or publish messages:
/// Receives the temperature data from the inbound adapter and sends it to the operation
Method OnProcessInput(pInput As dc.rabbit.TemperatureMessage, pOutput As %RegisteredObject) As %Status
{
    /// Creates a RabbitMQ message to send the temperature data as the content inside the RabbitMQ message
    #dim rabbitMesssage As EnsLib.RabbitMQ.Message
    Set rabbitMesssage = ##class(EnsLib.RabbitMQ.Message).%New()
    Do pInput.%JSONExportToString(.content)
    /// Sets the body of the message with the temperature data
    Do rabbitMesssage.SetEncodedContent(content)
    Set rabbitMesssage.appId = "IRIS"
    Set rabbitMesssage.exchange = "temperature" /// Sets the RabbitMQ exchange that will receive the message
    Set rabbitMesssage.routingKey = "temperature.current" /// Sets the right routing key to the exchange route of the message to the "Temperature" topic
    Return ..SendRequestAsync("SendTemperatureOperation", rabbitMesssage) /// Sends the Rabbit message to the operation and passes the message to the Rabbit
}

More information about this class can be found below:
https://docs.intersystems.com/iris20243/csp/docbook/Doc.View.cls?KEY=EME...  

 
More information

Additional details about operating RabbitMQ adapters and utility classes may be found on the following resources:

  1. Using RabbitMQ on productions: https://docs.intersystems.com/iris20243/csp/docbook/DocBook.UI.Page.cls?...
  2. Using RabbitMQ on any part of your source code: https://docs.intersystems.com/iris20243/csp/docbook/DocBook.UI.Page.cls?...
  3. My sample: https://openexchange.intersystems.com/package/rabbit-iris-sample
Discussion (0)1
Log in or sign up to continue