User bio
404 bio not found
Member since Jun 6, 2021
Replies:

An approach using an operation that, on message : 

  • enqueue incoming message in a queue named after patient identifier
  • dequeue and process message from all existing patient queues

If PoolSize = 1, it will process 1 message at a time.

If PoolSize > 1, multiple jobs will process incoming messages from other operations in sequence for each patient

Class dc.interop.PatientMessageOperation Extends Ens.BusinessOperation
{

Parameter SETTINGS = "DeQueueTimeout,MinProcessTime,MaxProcessTime";
Property DeQueueTimeout As %Integer [ InitialExpression = 1 ];
Property MinProcessTime As %Integer [ InitialExpression = 1 ];
Property MaxProcessTime As %Integer [ InitialExpression = 5 ];
Method HandlePatientMessage(request As test.src.ut.ks.lib.interop.PatientMessage, Output response As %Persistent) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
        $$$TOE(sc,..EnqueueRequest(request))
        $$$TOE(sc,..ProcessPatientMessages())
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method EnqueueRequest(request As PatientMessage) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim queue as Ens.Queue
    #Dim queueName as %String
    #Dim s as EnsLib.File.InboundAdapter
    
    s sc = $$$OK
    try {
        s queueName = ..%ConfigName_".patient."_request.PatientId
        $$$TOE(sc,##class(Ens.Queue).Create(queueName))
        s ..%RequestHeader.TargetQueueName = queueName        
        $$$TOE(sc,##class(Ens.Queue).EnQueue(..%RequestHeader))   
        s $$$EnsRuntimeAppData(..%ConfigName,"queues",queueName) = 1
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessPatientMessages() As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
      s queueName = ""
      s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      while queueName '= "" {
        $$$TOE(sc,..ProcessQueuedMessages(queueName))
        s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      }
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessQueuedMessages(queueName As %String) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim header as Ens.MessageHeader
    #Dim timedOut as %Boolean
    s sc = $$$OK
    try {
        $$$TOE(sc,##class(Ens.Queue).DeQueue(queueName,.header,..DeQueueTimeout,.timedOut,1))
        while 'timedOut && $isobject(header) {          
          $$$TOE(sc,..ProcessHeader(header))
          $$$TOE(sc,##class(Ens.Queue).DeQueue(queueName,.header,..DeQueueTimeout,.timedOut,1))
        }
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessHeader(header As Ens.MessageHeader) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim msg as PatientMessage
    
    s sc = $$$OK
    try {
        s msg = $classmethod(header.MessageBodyClassName,"%OpenId",header.MessageBodyId)
        $$$TOE(sc,..ProcessMessage(msg))
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method ProcessMessage(msg As PatientMessage) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim processTime as %Integer
    s sc = $$$OK
    try {
        $$$TRACE("job "_$job_" is processing message for patient "_msg.PatientId_" seq "_msg.Seq)
        s processTime = $random(..MaxProcessTime-..MinProcessTime)+..MinProcessTime
        hang processTime
        $$$TRACE("job "_$job_" processed message for patient "_msg.PatientId_" seq "_msg.Seq_" in "_processTime_" seconds")
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

Method OnTearDown() As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
      s queueName = ""
      s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      while queueName '= "" {
        $$$TOE(sc,##class(Ens.Queue).Delete(queueName,"*"))
        s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
      }
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

XData MessageMap
{
<MessageMap>
     <MapItem MessageType="ut.ks.lib.interop.PatientMessage">
      <Method>HandlePatientMessage</Method>
     </MapItem>
    </MessageMap>
}

}

The message class

Class ut.ks.lib.interop.PatientMessage Extends Ens.MessageBody
{

Property PatientId As %Integer;
Property Seq As %Integer;
Property Payload As %String;
Storage Default
{
<Data name="PatientMessageDefaultData">
<Subscript>"PatientMessage"</Subscript>
<Value name="1">
<Value>PatientId</Value>
</Value>
<Value name="2">
<Value>Seq</Value>
</Value>
<Value name="3">
<Value>Payload</Value>
</Value>
</Data>
<DefaultData>PatientMessageDefaultData</DefaultData>
<Type>%Storage.Persistent</Type>
}

}

Sample test method

ClassMethod SendMessages(patientCount As %Integer = 100, messageCount = 1000) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
        
        for i = 1:1:messageCount {
            s msg = ##class(PatientMessage).%New()
            s msg.PatientId = $random(patientCount)+1
            s msg.Seq = $increment(seq(msg.PatientId))
            s msg.Payload = "message "_msg.Seq_" for "_msg.PatientId
            $$$TOE(sc,##class(EnsLib.Testing.Service).SendTestRequest("PatientOperation",msg))
        }        
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

You can override the Page() method of %CSP.Page and redirect output to a stream, process the resulting stream and write it to the original device the page is using to send data back to client.

Here is a quick and dirty example, using IO-Redirect package available on OpenExchange.

The redirecting page : 

Class test.src.RedirectedPage Extends %CSP.Page
{

ClassMethod Page(skipHeader As %Boolean = 1) As %Status [ ServerOnly = 1 ]
{
    #dim sc as %Status
    #dim ex as %Exception.AbstractException
    #dim pageStream,processedPageStream As %Stream.Object
    #dim len as %Integer
    #dim buffer as %String
    
    s sc = $$$OK
    try {
    
    Set pageStream = ##class(%Stream.GlobalCharacter).%New()
    Do ##class(IORedirect.Redirect).ToStream(pageStream)
    $$$TOE(sc,##super(skipHeader))
    Do ##class(IORedirect.Redirect).RestoreIO()
    Set pageStream = ##class(IORedirect.Redirect).Get()
    $$$TOE(sc,..ProcessPageStream(pageStream,.processedPageStream))
    while 'processedPageStream.AtEnd {
        s len = 32768
        s buffer = processedPageStream.Read(.len,.sc)
        $$$TOE(sc,sc)
        write buffer
    }
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

ClassMethod ProcessPageStream(pageStream As %Stream.Object, Output processedPageStream As %Stream.Object) As %Status
{
    #dim sc as %Status
    #dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
        s processedPageStream = ##class(%Stream.TmpCharacter).%New()
        $$$TOE(sc,processedPageStream.CopyFrom(pageStream))
        d processedPageStream.Write("<div><span>original page had "_pageStream.Size_" bytes </span></div>")
    } catch (ex) {
      s sc = ex.AsStatus()
    }
    return sc
}

}

The original page :

Class test.src.OriginalPage Extends test.src.RedirectedPage
{

ClassMethod OnPage() As %Status [ ServerOnly = 1 ]
{
    &html<
        <div>
         <span>Hello, world, again</span>
        </div>
    >
    return $$$OK
}
Open Exchange applications:
Certifications & Credly badges:
Robert has no Certifications & Credly badges yet.
Global Masters badges:
Followers:
Following:
Robert has not followed anybody yet.