Announcement
Neerav Adam Verma · Sep 17, 2018

PubSub - Publish & Subscribe Messages

Hello,
Fellow community members, here is an extensible, reusable, generic pub-sub model.
An initial calling service reads a CSV file, parses each record, and transforms it into a generic JSON message.
The messages are then routed by a pub-sub business process to the target endpoints (business operations)
configured as subscribers for each topic, as shown in the image below.


The steps and flow to implement this are as follows.
Step 1 : Service
a. Read a CSV file and loop through all records
b. In each iteration, build one JSON message and add it to the JSON array of messages
The message structure is as follows.
Property RequestJSONArray As array Of %Text(MAXLEN = 10000);
Property Topic As %String(MAXLEN = 100);

c. End of loop
d. Send the JSON array to the BP

Here is the service code for reference

Class File.BS.FileServiceBS Extends Ens.BusinessService
{

Parameter ADAPTER = "EnsLib.File.InboundAdapter";

Method OnProcessInput(
    pInput As %FileCharacterStream,
    Output pOutput As %RegisteredObject) As %Status
{
    set DELIMITER = "^"
    set counter = 1 // line counter; line 1 is the header row
    set JSONDynamicBPRequest = ##class(Ent.Library.JSON.BP.JSONDynamicBPRequest).%New()
    set JSONDynamicBPRequest.Topic = "TESTCSV"
    while 'pInput.AtEnd {
        set line = pInput.ReadLine()
        // Skip the header row; build one JSON message per data record
        if (counter > 1) {
            set req = ##class(%DynamicObject).%New()
            set req.ClaimId = $piece(line,DELIMITER,1)
            set req.Company = $piece(line,DELIMITER,2)
            set req.ClaimProvider = $piece(line,DELIMITER,3)
            set req.PrimaryInsuranceCarrier = $piece(line,DELIMITER,4)
            set req.EncounterDate = $piece(line,DELIMITER,5)
            $$$TRACE("req.%ToJSON() "_req.%ToJSON())
            // Store the JSON text in the array, keyed by line number
            do JSONDynamicBPRequest.RequestJSONArray.SetAt(req.%ToJSON(), counter)
            kill req
        }
        set counter = counter + 1
    }
    // Send the whole array to the pub-sub BP and return its status
    set tSC = ..SendRequestSync("JSONDynamicBP", JSONDynamicBPRequest)
    $$$TRACE("JSONDynamicBP loaded : "_(counter - 1)_" "_$ZDATETIME($ZTIMESTAMP,3,1,6))
    quit tSC
}

}

Step 2 : Business Process
a. The BP receives the topic and the array of JSON records
b. It calls the PubsubBo to get all the subscribers for this topic
c. It loops through the JSON records
d. For each record, it loops through the targets
e. It sends the message to each target via the Call activity of the BP
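
The BP steps above could be sketched as a custom-code business process along the following lines. This is only a rough sketch and not the downloadable code: the class name Demo.PubSub.JSONDynamicBP, the GetTargets() helper with its hard-coded target names, and the use of Ens.StringContainer as the outgoing message are all assumptions made for illustration. In the version described here, the subscriber lookup is a call to the PubsubBo and the send is a BPL Call activity.

```objectscript
/// Hypothetical custom-code equivalent of the BPL flow (class name is illustrative)
Class Demo.PubSub.JSONDynamicBP Extends Ens.BusinessProcess
{

Method OnRequest(pRequest As Ent.Library.JSON.BP.JSONDynamicBPRequest, Output pResponse As Ens.Response) As %Status
{
    set tSC = $$$OK
    // Steps a/b: resolve the subscriber targets for the topic.
    // GetTargets() is a placeholder for the call to the PubSub BO.
    set targets = ..GetTargets(pRequest.Topic)

    // Step c: loop through the JSON records
    set key = ""
    for {
        set json = pRequest.RequestJSONArray.GetNext(.key)
        quit:key=""
        // Step d: loop through the targets for each record
        for i=1:1:$listlength(targets) {
            // Assumption: wrap the JSON text in an Ens.StringContainer
            set msg = ##class(Ens.StringContainer).%New(json)
            // Step e: send the record to the target, no response required
            set tSC = ..SendRequestAsync($list(targets,i),msg,0)
            quit:$$$ISERR(tSC)
        }
        quit:$$$ISERR(tSC)
    }
    quit tSC
}

/// Placeholder: returns a $list of config item names subscribed to pTopic.
Method GetTargets(pTopic As %String) As %List
{
    // Hard-coded for illustration only
    quit $select(pTopic="TESTCSV":$listbuild("TargetBO1","TargetBO2"),1:"")
}

}
```

Doing the fan-out in the BP rather than in the operation keeps the per-target send in one place, so extra logic (filtering, per-target transforms) can be added inside the inner loop.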

Notes :
- This above example can be extended and scaled as per requirement.
- Each json msg can have a topic instead of the topic being outside of array etc.
- You can use your own messages instead of json too if you need.
- You can even send messages via the pubsub bo instead of just getting targets. (version attached)


Download Code
If you have any issues, please email me at nv@nv-enterprises.biz


Replies

The Pub/Sub operation only gives you a list of subscribers; you decide what to do with it.

I have a sample Pub/Sub service/operation; I'll publish it in a few hours.

There is a small modification I now want to make.

Basically, I want to read the Event Log (Ens_Util.Log) for entries whose type is not Info, Trace, etc.,

and on every new warning, error, etc., send the error message to my BP (which is the pub-sub BP).

Is there any service that continuously polls for new additions to this log, besides the SQL inbound adapter?

Hi Neerav,

I am working on a production (PubSubService Production) that I created exclusively to manage publish/subscribe operations (not business operations). The purpose of my production is to receive the publisher's request, get its subscribers based on topics, and send them a message to perform some updates at the subscriber level. I also want to receive an acknowledgement of the updates from each subscriber. It would be very helpful to have the code you implemented, to get a better architectural understanding.

Thanks in Advance

LeadCacheDeveloper 

please share code if possible.

Thanks


Step 2 : BP
a. The BP receives the topic and the array of JSON records
b. It calls the PubsubBo to get all the subscribers for this topic
c. It loops through the JSON records
d. For each record, it loops through the targets
e. It sends the message to each target via the Call activity of the BP

PFA: screenshot of subscribers to a topic
Note: This can be adapted as required. Each JSON message can carry its own topic instead of one topic outside the array.
You can use your own message classes instead of JSON if you need to.
You can even send messages via the PubsubBo itself instead of just getting targets (version attached).
In short, all sorts of adjustments can be made, as this is very generic and reusable.

Link to the Code
Download Code

If you have any issues, please email me at nv@nv-enterprises.biz

Hi Neerav, have you had a chance to check the email I sent you a couple of weeks ago?

Please let me know, @Neerav Verma

Regards,

The example I made aims to provide a reusable, generic pub-sub model that passes a generic JSON message from start to finish; each BP/BO then transforms or parses the JSON as it needs once it receives it. Do let me know if you have any queries.
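
As an illustration of that last point, a subscriber could parse the JSON text it receives roughly like this. It is a minimal sketch under stated assumptions: the class name Demo.PubSub.ClaimBO is hypothetical, and it assumes the JSON arrives wrapped in an Ens.StringContainer, which may not match the message class used in the attached code.

```objectscript
/// Hypothetical subscriber operation (class name is illustrative)
Class Demo.PubSub.ClaimBO Extends Ens.BusinessOperation
{

Method OnMessage(pRequest As Ens.StringContainer, Output pResponse As Ens.Response) As %Status
{
    set tSC = $$$OK
    try {
        // Parse the generic JSON text back into a dynamic object
        set claim = ##class(%DynamicObject).%FromJSON(pRequest.StringValue)
        // Use only the fields this subscriber cares about
        $$$TRACE("ClaimId="_claim.ClaimId_" Company="_claim.Company)
    } catch ex {
        // Malformed JSON surfaces here as an exception
        set tSC = ex.AsStatus()
    }
    quit tSC
}

}
```

Because every subscriber receives the same JSON text, each one can read just the properties it needs and ignore the rest.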


So this is how the flow works.
1. Service
a. Read a CSV file and loop through all records
b. In each iteration, build one JSON message and add it to the JSON array of messages (which is just an array of text, since JSON is just text)
Property RequestJSONArray As array Of %Text(MAXLEN = 10000);
Property Topic As %String(MAXLEN = 100);


c. End of loop
d. Send the array to the BP
Here are a few lines of the service code

Class File.BS.FileServiceBS Extends Ens.BusinessService
{

Parameter ADAPTER = "EnsLib.File.InboundAdapter";

Method OnProcessInput(
    pInput As %FileCharacterStream,
    Output pOutput As %RegisteredObject) As %Status
{
    set DELIMITER = "^"
    set counter = 1 // line counter; line 1 is the header row
    set JSONDynamicBPRequest = ##class(Ent.Library.JSON.BP.JSONDynamicBPRequest).%New()
    set JSONDynamicBPRequest.Topic = "TESTCSV"
    // For a quick test, add "&& (counter < 3)" to the condition to stop after the first data record
    while 'pInput.AtEnd {
        set line = pInput.ReadLine()
        // Skip the header row; build one JSON message per data record
        if (counter > 1) {
            set req = ##class(%DynamicObject).%New()
            set req.ClaimId = $piece(line,DELIMITER,1)
            set req.Company = $piece(line,DELIMITER,2)
            set req.ClaimProvider = $piece(line,DELIMITER,3)
            set req.PrimaryInsuranceCarrier = $piece(line,DELIMITER,4)
            set req.EncounterDate = $piece(line,DELIMITER,5)
            $$$TRACE("req.%ToJSON() "_req.%ToJSON())
            // Store the JSON text in the array, keyed by line number
            do JSONDynamicBPRequest.RequestJSONArray.SetAt(req.%ToJSON(), counter)
            kill req
        }
        set counter = counter + 1
    }
    // Send the whole array to the pub-sub BP and return its status
    set tSC = ..SendRequestSync("JSONDynamicBP", JSONDynamicBPRequest)
    $$$TRACE("JSONDynamicBP loaded : "_(counter - 1)_" "_$ZDATETIME($ZTIMESTAMP,3,1,6))
    quit tSC
}

}

Thanks to all, and especially to Eduard.

I used a mix of Eduard's code, the docs, EnsLib.PubSub.PubSubOperation, and some of my own code.

From my BP, I call the PubSubBo, whose response gives me the targets in a string array.

- The BP then sends to all targets from a call inside a loop over the targets array; the target of the call is set dynamically from the loop element in the context.

This gives me room to add extra logic, but most importantly it doesn't look odd in Visual Trace if a target is a BP.
If a target is a BP and the message is forwarded to it from a normal PubSub BO, Visual Trace shows a backward arrow, which is hard to explain to non-technical people because it looks as if the message is going backward rather than forward. So forwarding to the targets from a BP is a better and cleaner option.

I can share the code with anyone who needs it.