User bio
404 bio not found
Member since Jun 6, 2021
Posts:
Replies:
One approach is to use a business operation that, for each incoming message:
- enqueues the incoming message in a queue named after the patient identifier
- dequeues and processes the messages from all existing patient queues
If PoolSize = 1, the operation processes one message at a time.
If PoolSize > 1, several jobs process the incoming messages, and the messages of each patient are taken from that patient's queue in sequence.
/// Business operation that re-queues every incoming PatientMessage into a queue
/// named after the patient identifier, then drains all known patient queues.
/// The names of the queues created by this operation are recorded under
/// $$$EnsRuntimeAppData(..%ConfigName,"queues",queueName).
/// NOTE(review): nothing in this class prevents two pool jobs from dequeuing from the
/// same patient queue at the same time - confirm the per-patient ordering when PoolSize > 1.
Class dc.interop.PatientMessageOperation Extends Ens.BusinessOperation
{

Parameter SETTINGS = "DeQueueTimeout,MinProcessTime,MaxProcessTime";

/// Timeout passed to Ens.Queue.DeQueue() when polling a patient queue.
Property DeQueueTimeout As %Integer [ InitialExpression = 1 ];

/// Lower bound, in seconds, of the simulated processing time of one message.
Property MinProcessTime As %Integer [ InitialExpression = 1 ];

/// Upper bound (inclusive), in seconds, of the simulated processing time of one message.
Property MaxProcessTime As %Integer [ InitialExpression = 5 ];

/// Entry point declared in the MessageMap: enqueues the request in its patient queue,
/// then processes whatever is waiting in all patient queues.
/// response is never assigned by this method.
/// Returns the first error status encountered, or $$$OK.
Method HandlePatientMessage(request As ut.ks.lib.interop.PatientMessage, Output response As %Persistent) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    s sc = $$$OK
    try {
        $$$TOE(sc,..EnqueueRequest(request))
        $$$TOE(sc,..ProcessPatientMessages())
    } catch (ex) {
        s sc = ex.AsStatus()
    }
    return sc
}

/// Creates the queue "<ConfigName>.patient.<PatientId>", redirects the current request
/// header to it, enqueues the header and records the queue name in the runtime registry.
Method EnqueueRequest(request As ut.ks.lib.interop.PatientMessage) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim queueName as %String
    s sc = $$$OK
    try {
        s queueName = ..%ConfigName_".patient."_request.PatientId
        $$$TOE(sc,##class(Ens.Queue).Create(queueName))
        s ..%RequestHeader.TargetQueueName = queueName
        $$$TOE(sc,##class(Ens.Queue).EnQueue(..%RequestHeader))
        // remember the queue so that ProcessPatientMessages() and OnTearDown() can find it
        s $$$EnsRuntimeAppData(..%ConfigName,"queues",queueName) = 1
    } catch (ex) {
        s sc = ex.AsStatus()
    }
    return sc
}

/// Walks the registry of patient queues and drains each of them in turn.
/// Stops at the first queue whose processing returns an error.
Method ProcessPatientMessages() As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim queueName as %String
    s sc = $$$OK
    try {
        s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",""))
        while queueName '= "" {
            $$$TOE(sc,..ProcessQueuedMessages(queueName))
            s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
        }
    } catch (ex) {
        s sc = ex.AsStatus()
    }
    return sc
}

/// Dequeues headers from queueName and processes them one by one until a
/// DeQueue() call times out or returns no header.
Method ProcessQueuedMessages(queueName As %String) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim header as Ens.MessageHeader
    #Dim timedOut as %Boolean
    s sc = $$$OK
    try {
        $$$TOE(sc,##class(Ens.Queue).DeQueue(queueName,.header,..DeQueueTimeout,.timedOut,1))
        while 'timedOut && $isobject(header) {
            $$$TOE(sc,..ProcessHeader(header))
            $$$TOE(sc,##class(Ens.Queue).DeQueue(queueName,.header,..DeQueueTimeout,.timedOut,1))
        }
    } catch (ex) {
        s sc = ex.AsStatus()
    }
    return sc
}

/// Opens the message body referenced by header and hands it to ProcessMessage().
/// Returns the %OpenId error status when the body cannot be opened.
Method ProcessHeader(header As Ens.MessageHeader) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim msg as ut.ks.lib.interop.PatientMessage
    s sc = $$$OK
    try {
        // fetch the open status so that a missing body is reported as such
        // instead of surfacing later as an <INVALID OREF>
        s msg = $classmethod(header.MessageBodyClassName,"%OpenId",header.MessageBodyId,,.sc)
        $$$ThrowOnError(sc)
        $$$TOE(sc,..ProcessMessage(msg))
    } catch (ex) {
        s sc = ex.AsStatus()
    }
    return sc
}

/// Simulates the processing of one message: traces, hangs for a random number of
/// seconds between MinProcessTime and MaxProcessTime (both inclusive), traces again.
Method ProcessMessage(msg As ut.ks.lib.interop.PatientMessage) As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim processTime as %Integer
    #Dim span as %Integer
    s sc = $$$OK
    try {
        $$$TRACE("job "_$job_" is processing message for patient "_msg.PatientId_" seq "_msg.Seq)
        s span = ..MaxProcessTime - ..MinProcessTime
        s processTime = ..MinProcessTime
        // $random(0) raises <FUNCTION>, so only randomize when Max > Min;
        // span+1 makes MaxProcessTime itself reachable
        if span > 0 {
            s processTime = processTime + $random(span + 1)
        }
        hang processTime
        $$$TRACE("job "_$job_" processed message for patient "_msg.PatientId_" seq "_msg.Seq_" in "_processTime_" seconds")
    } catch (ex) {
        s sc = ex.AsStatus()
    }
    return sc
}

/// Deletes every patient queue recorded in the registry and removes its registry entry,
/// so that no stale queue name is left behind for the next run.
Method OnTearDown() As %Status
{
    #Dim sc as %Status
    #Dim ex as %Exception.AbstractException
    #Dim queueName as %String
    #Dim nextName as %String
    s sc = $$$OK
    try {
        s queueName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",""))
        while queueName '= "" {
            // read the successor before the current node is killed
            s nextName = $order($$$EnsRuntimeAppData(..%ConfigName,"queues",queueName))
            $$$TOE(sc,##class(Ens.Queue).Delete(queueName,"*"))
            kill $$$EnsRuntimeAppData(..%ConfigName,"queues",queueName)
            s queueName = nextName
        }
    } catch (ex) {
        s sc = ex.AsStatus()
    }
    return sc
}

XData MessageMap
{
<MessageMap>
<MapItem MessageType="ut.ks.lib.interop.PatientMessage">
<Method>HandlePatientMessage</Method>
</MapItem>
</MessageMap>
}

}
The message class
/// Message body carrying one patient-related message.
/// Handled by dc.interop.PatientMessageOperation, which queues it per PatientId.
Class ut.ks.lib.interop.PatientMessage Extends Ens.MessageBody
{
/// Patient identifier; the operation uses it to build the name of the per-patient queue.
Property PatientId As %Integer;
/// Sequence number of the message; the sample sender increments it per patient.
Property Seq As %Integer;
/// Free-text content of the message.
Property Payload As %String;
Storage Default
{
<Data name="PatientMessageDefaultData">
<Subscript>"PatientMessage"</Subscript>
<Value name="1">
<Value>PatientId</Value>
</Value>
<Value name="2">
<Value>Seq</Value>
</Value>
<Value name="3">
<Value>Payload</Value>
</Value>
</Data>
<DefaultData>PatientMessageDefaultData</DefaultData>
<Type>%Storage.Persistent</Type>
}
}
Sample test method
/// Sends messageCount test messages to the config item "PatientOperation" through
/// EnsLib.Testing.Service. Each message is assigned a random patient identifier in
/// 1..patientCount and the next sequence number of that patient.
/// Stops at the first failed send and returns its status; returns $$$OK otherwise.
ClassMethod SendMessages(patientCount As %Integer = 100, messageCount = 1000) As %Status
{
    #Dim status as %Status = $$$OK
    #Dim err as %Exception.AbstractException
    try {
        for i = 1:1:messageCount {
            // draw the patient first, then take that patient's next sequence number
            set patientId = $random(patientCount) + 1
            set message = ##class(PatientMessage).%New()
            set message.PatientId = patientId
            set message.Seq = $increment(seq(patientId))
            set message.Payload = "message "_message.Seq_" for "_message.PatientId
            $$$TOE(status,##class(EnsLib.Testing.Service).SendTestRequest("PatientOperation",message))
        }
    } catch (err) {
        set status = err.AsStatus()
    }
    return status
}
Another approach would be to write a custom router class extending EnsLib.MsgRouter.RoutingEngine, overriding OnRequest() method.
In OnRequest(), before calling ##super(), build a JSON %DynamicAbstractObject from the input request stream and assign it to a property of the router.
Use the property in the router rules.
Open Exchange applications:
Certifications & Credly badges:
Robert has no Certifications & Credly badges yet.
Global Masters badges:




Followers:
Following:
Robert has not followed anybody yet.
You can query the message body class table with a left join to Ens.MessageHeader to get the identifiers of the orphaned message bodies (bodies that no message header references):
select %NOLOCK bod.Id from MySample.BodyTable bod left join Ens.MessageHeader hdr on bod.Id=hdr.MessageBodyId and hdr.MessageBodyClassName='MySample.BodyTable' where hdr.MessageBodyId is NULL