Question
· Feb 4

Process multiple messages by increasing pool size while maintaining order with keys

At the moment we're creating multiple BPLs are using a router (or another BPL) to direct to these based on a unique key modulo the amount of BPLs available, e.g. if we have 3 BPLs created.

Message key = 1 mod 3 + 1 -> BPL02
Message key = 2 mod 3 + 1 -> BPL03
Message key = 3 mod 3 + 1 -> BPL01

FIFO only matters in that each messages for each key is processed in order.
What we were considering doing is increasing the pool size to 3 and programmatically creating a BPL on each thread that processes messages that would be directed to it, rather than having to create multiple BPLs into the production.

Everything I've found so far suggests just keeping the pool size as one to maintain FIFO and I can't really find much about how that may be over-ridden, or if it's even possible or advised. So not really sure if I'm completely barking up the wrong tree here.

Any tips or advice appreciated.

 

Product version: IRIS 2024.1
Discussion (7)1
Log in or sign up to continue

There's no guarantee that BP with PoolSize=1 would process messages in enqueued order. That is only true for BOs, for BPs it's only true that message processing would start in the enqueued order. That might be (or might be not) a good enough guarantee for you depending on your use case.

An approach I saw used to guarantee FIFO for BP is to add an intermediate BP with PoolSize=1 which sends a sync request to your target BP and waits for an answer.  

Can you elaborate on a Message key, please? Is it a random or a categorical division? Why three specifically and not some other number?

What we were considering doing is increasing the pool size to 3 and programmatically creating a BPL on each thread that processes messages that would be directed to it, rather than having to create multiple BPLs into the production.

What do you want to achieve with that change?

The change would mean that the number of processes could be increased programatically and while a production is running.

If the processed was stopped, queues drained and the pool size increased we'd then be able to have more processes running if we'd noticed a spike in messages (or wanted to increase for future). This would be easier than adding new BPLs into the production as it would only require a single setting change.

How about the following setup:

  1. In Business Service or first router BP set a global: ^data(mrn, timestamp) = messageId
  2. In your FIFO BP before sending to BO check: 
    • set nexttimestamp=$o(^data(mrn,""),1,nextmessageId)
  3. If nextmessageId equals current messageid that means there's no message in a pipeline for the same patient with earlier timestamp so we can send it out.
    • Kill  ^data(mrn, nexttimestamp) so next message can be processed 
  4. If nextmessageId does not equal current messageid, compare timestamps:
    • If timestamps are equal, send the message anyway and don't kill the subscript - we have more than one message with the same timestamp. If it happens often, value should be a list of ids.
    • If nexttimestamp is earlier than timestamp, it means there are some other messages in a pipeline with the same MRN, sleep for 10 seconds and check again.

Notes:

  1. You'll need to adjust this based on what you want to do if one of the messages errors before being deleted from the ^data global, options:
    • Processing of messages for this patient effectively stops.
    • Add an additional check in (4) - get other message header and check if it's in a final state (completed, errored, etc) - if so clear ^data subscript and continue.
    • Add an additional check in (4) -  if we waited more than X seconds, continue.
  2. This can be wrapped as Custom Functions and called from rules, BPs.
  3. Locks might help with ensuring consistency.

The advantage here is that you can scale to any number of jobs immediately and since you enforce FIFO only at the end, most of the processing can be parallelized.

Took me a couple of reads to understand what you're saying. That's a very elegant solution and one that indeed would ensure consistency with only the occasional "wasted" sleep.

Your final point about locks and consistency does give me some slight pause. Due to the speed of messages coming in I can see the possibility of collisions or deadlocks and while fairly small may be enough to consider going to the simpler approach of just creating additional BPLs we're currently using.

In any case, thank you very much for the guidance.

An approach using an operation that, on message : 

  • enqueue incoming message in a queue named after patient identifier
  • dequeue and process message from all existing patient queues

If PoolSize = 1, it will process 1 message at a time.

If PoolSize > 1, multiple jobs will process incoming messages from other operations in sequence for each patient

Class dc.interop.PatientMessageOperation Extends Ens.BusinessOperation
{

Parameter SETTINGS = "DeQueueTimeout,MinProcessTime,MaxProcessTime";
Property DeQueueTimeout As %Integer [ InitialExpression = 1 ];
Property MinProcessTime As %Integer [ InitialExpression = 1 ];
Property MaxProcessTime As %Integer [ InitialExpression = 5 ];
Method HandlePatientMessage(request As test.src.ut.ks.lib.interop.PatientMessage, Output response As %Persistent) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
        $$$TOE(sc,..EnqueueRequest(request))
        $$$TOE(sc,..ProcessPatientMessages())
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method EnqueueRequest(request As PatientMessage) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim queue as Ens.Queue
    #Dim queueName as %String
    #Dim s as EnsLib.File.InboundAdapter
    
    s sc = $$$OK
    try {
        s queueName = ..%ConfigName_".patient."_request.PatientId
        $$$TOE(sc,##class(Ens.Queue).Create(queueName))
        s ..%RequestHeader.TargetQueueName = queueName        
        $$$TOE(sc,##class(Ens.Queue).EnQueue(..%RequestHeader))   
        s $$$EnsRuntimeAppData(..%ConfigName,"queues",queueName) = 1
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessPatientMessages() As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
      s queueName = ""
      s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      while queueName '= "" {
        $$$TOE(sc,..ProcessQueuedMessages(queueName))
        s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      }
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessQueuedMessages(queueName As %String) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim header as Ens.MessageHeader
    #Dim timedOut as %Boolean
    s sc = $$$OK
    try {
        $$$TOE(sc,##class(Ens.Queue).DeQueue(queueName,.header,..DeQueueTimeout,.timedOut,1))
        while 'timedOut && $isobject(header) {          
          $$$TOE(sc,..ProcessHeader(header))
          $$$TOE(sc,##class(Ens.Queue).DeQueue(queueName,.header,..DeQueueTimeout,.timedOut,1))
        }
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessHeader(header As Ens.MessageHeader) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim msg as PatientMessage
    
    s sc = $$$OK
    try {
        s msg = $classmethod(header.MessageBodyClassName,"%OpenId",header.MessageBodyId)
        $$$TOE(sc,..ProcessMessage(msg))
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessMessage(msg As PatientMessage) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim processTime as %Integer
    s sc = $$$OK
    try {
        $$$TRACE("job "_$job_" is processing message for patient "_msg.PatientId_" seq "_msg.Seq)
        s processTime = $random(..MaxProcessTime-..MinProcessTime)+..MinProcessTime
        hang processTime
        $$$TRACE("job "_$job_" processed message for patient "_msg.PatientId_" seq "_msg.Seq_" in "_processTime_" seconds")
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method OnTearDown() As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
      s queueName = ""
      s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      while queueName '= "" {
        $$$TOE(sc,##class(Ens.Queue).Delete(queueName,"*"))
        s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      }
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

XData MessageMap
{
<MessageMap>
     <MapItem MessageType="ut.ks.lib.interop.PatientMessage">
      <Method>HandlePatientMessage</Method>
     </MapItem>
    </MessageMap>
}

}

The message class

Class ut.ks.lib.interop.PatientMessage Extends Ens.MessageBody
{

Property PatientId As %Integer;
Property Seq As %Integer;
Property Payload As %String;
Storage Default
{
<Data name="PatientMessageDefaultData">
<Subscript>"PatientMessage"</Subscript>
<Value name="1">
<Value>PatientId</Value>
</Value>
<Value name="2">
<Value>Seq</Value>
</Value>
<Value name="3">
<Value>Payload</Value>
</Value>
</Data>
<DefaultData>PatientMessageDefaultData</DefaultData>
<Type>%Storage.Persistent</Type>
}

}

Sample test method

ClassMethod SendMessages(patientCount As %Integer = 100, messageCount = 1000) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
        
        for i = 1:1:messageCount {
            s msg = ##class(PatientMessage).%New()
            s msg.PatientId = $random(patientCount)+1
            s msg.Seq = $increment(seq(msg.PatientId))
            s msg.Payload = "message "_msg.Seq_" for "_msg.PatientId
            $$$TOE(sc,##class(EnsLib.Testing.Service).SendTestRequest("PatientOperation",msg))
        }        
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}