· Dec 30, 2017

Ensemble Message Suspense/Work Queue?

Synopsis: I need to "sideline" messages for subsequent selection and data enrichment while maintaining manageability within the Ensemble framework.

Messages will arrive in Ensemble via a Business Service. In a Business Process, those matching certain criteria will generate a query and be "sidelined" until the response arrives via a separate Business Service. The response may take some time, and responses are not guaranteed to arrive in FIFO order. Once a response is received, it will trigger the data enrichment: I'll need to select the message(s) that match the response, update them with the returned data, forward them to a Business Operation and mark them as "completed" in the work queue. The order of delivery of the enriched messages is not important; they must be sent as soon as they're populated with the enrichment data.

My initial design is to build a persistent class with indexed properties needed for selection/retrieval, along with one defined as type EnsLib.HL7.Message* for the message to be enriched. My problem with this design is that these "work queue" messages exist outside of the Ensemble management/monitoring framework and a custom management mechanism will be required to monitor and maintain the work queue.

Am I missing some inherent functionality in Ensemble that supports this requirement natively? I don't want to reinvent the wheel ...

*EDIT: Actually, the message object type may vary in the future, but the initial project requires HL7v2.

Discussion (8)

Hi Jeffrey,

Your description matches pretty well what Ensemble calls "Workflow".

It uses specialized Request and Response messages that are designed to allow a significant time gap between them.
It was originally designed for human interaction, but to my understanding it implements exactly your "sideline".
The human interaction is simply a case where a reply may take a long time.
There is also an example in the ENSDEMO namespace: Demo.Workflow.Production



You'll need to develop the criteria yourself as it's heavily dependent upon your business logic. There's no common solution there. Generally you need to:

  1. Pass some unique identifier with your request (as one of request properties or in the filename or as a header, etc.).
  2. In your business service, parse the response and determine which request it answers.
  3. Send the deferred response.
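
The three steps above could look roughly like the sketch below. It is only an illustration: the class names, the ^Demo.Deferred.Pending global and the "key|payload" reply format are made up for the example, and the sketch assumes the query goes out through a business operation that calls DeferResponse() while the reply comes back through a separate business service that calls SendDeferredResponse().

```objectscript
/// Hypothetical operation: sends the query, then defers its response.
Class Demo.Deferred.QueryOperation Extends Ens.BusinessOperation
{

Method OnMessage(pRequest As Ens.StringRequest, Output pResponse As Ens.Response) As %Status
{
    // Step 1: the unique identifier travels with the request.
    // Assumption: StringValue carries that identifier (e.g. an encounter number).
    set key = pRequest.StringValue

    // ... transmit the query to the external system here ...

    // Ask Ensemble to hold the response and hand back a token for it.
    set sc = ..DeferResponse(.token)
    quit:$$$ISERR(sc) sc

    // Remember which token belongs to which identifier (hypothetical global).
    set ^Demo.Deferred.Pending(key) = token
    quit $$$OK
}

}

/// Hypothetical service: receives the late reply and releases the deferred response.
Class Demo.Deferred.ReplyService Extends Ens.BusinessService
{

Method OnProcessInput(pInput As Ens.StringContainer, Output pOutput As %RegisteredObject) As %Status
{
    // Step 2: work out which request this reply answers.
    // Assumption: the reply arrives as "key|payload".
    set raw = pInput.StringValue
    set key = $piece(raw, "|", 1)
    set token = $get(^Demo.Deferred.Pending(key))
    quit:token="" $$$ERROR($$$GeneralError, "No deferred request for key '"_key_"'")

    // Step 3: send the deferred response to whoever is waiting on the token.
    set response = ##class(Ens.StringResponse).%New()
    set response.StringValue = $piece(raw, "|", 2, $length(raw, "|"))
    set sc = ..SendDeferredResponse(token, response)
    kill:$$$ISOK(sc) ^Demo.Deferred.Pending(key)
    quit sc
}

}
```

The business process that called the operation simply keeps waiting for its response; nothing on that side needs to know the reply was deferred. The global is used only to keep the sketch short.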

I usually have a Deferred token table where I keep track of tokens and objects to which they relate:

Include Ensemble

/// Stored Deferred token.
/// Assumption: one BH generates only one type of Deferred token.
Class Production.DeferredToken Extends %Persistent
{

/// Token
Property token As %String;

Index IDKEY On token [ IdKey ];

/// Object's class name
Property objectContext As %Dictionary.CacheClassname;

/// Object's primary key
Property objectId As %String;

Index objectIndex On (objectContext, objectId) [ Unique ];

/// Update ts (UTC).
/// $$$ts is a custom macro that is not shown here; presumably it returns the current UTC timestamp.
Property updatedOn As %TimeStamp [ Required, SqlComputeCode = { set {*} = $$$ts}, SqlComputed, SqlComputeOnChange = (%%INSERT, %%UPDATE) ];

/// Ensemble session
Property sessionId As %Integer [ InitialExpression = {$get($$$JobSessionId)} ];

/// Ensemble BH (the config item name is a string, not a number)
Property configName As %String(MAXLEN = 128) [ InitialExpression = {$get($$$EnsJobLocal("ConfigName"))} ];

/// Ensemble BH class
Property ensembleContext As %Dictionary.CacheClassname [ InitialExpression = {$get($$$ConfigClassName($get($$$EnsJobLocal("ConfigName")," ")))} ];

/// Tries to store a new token if possible
/// w ##class(Production.DeferredToken).add(token, class, id)
ClassMethod add(token As %String, class As %String = "", id As %Integer = "") As %Status
{
    // Reject a class that does not exist
    return:((class'="") && ('##class(%Dictionary.ClassDefinition).%ExistsId(class))) $$$ERROR($$$ClassDoesNotExist, class)
    // Reject an object that does not exist
    return:((class'="") && (id'="") && ('$classmethod(class, "%ExistsId", id))) $$$ERROR($$$GeneralError, $$$FormatText("Object '%2' from class '%1' does not exist", class, id))
    // Reject a duplicate token
    return:..IDKEYExists(token) $$$ERROR($$$GeneralError, $$$FormatText("Token '%1' already exists", token))
    set obj = ..%New()
    set obj.objectContext = class
    set obj.objectId = id
    set obj.token = token
    set sc = obj.%Save()
    return sc
}

}

Use %ExistsId, %DeleteId, objectIndexExists and objectIndexOpen to manipulate stored tokens.
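
As a rough illustration of those calls, here is a minimal sketch. The wrapper class Demo.Deferred.TokenUsage and the object "Demo.Order" with ID 42 are hypothetical; objectIndexExists and objectIndexOpen are the methods the class compiler generates for the unique index objectIndex.

```objectscript
/// Hypothetical wrapper class, only there to make the calls compilable.
Class Demo.Deferred.TokenUsage Extends %RegisteredObject
{

ClassMethod demo(token As %String) As %Status
{
    // Store the token together with the object it relates to.
    // Assumption: class Demo.Order exists and has an object with ID 42.
    set sc = ##class(Production.DeferredToken).add(token, "Demo.Order", 42)
    return:$$$ISERR(sc) sc

    // Is a token stored for that object? Its ID (the token) comes back in the third argument.
    if ##class(Production.DeferredToken).objectIndexExists("Demo.Order", 42, .storedToken) {
        write "Stored token: ", storedToken, !
    }

    // Open the stored row through the unique index.
    set obj = ##class(Production.DeferredToken).objectIndexOpen("Demo.Order", 42, , .sc)
    return:$$$ISERR(sc) sc
    write "Session: ", obj.sessionId, !

    // Once the deferred response has been sent, the row can go.
    if ##class(Production.DeferredToken).%ExistsId(token) {
        set sc = ##class(Production.DeferredToken).%DeleteId(token)
    }
    return sc
}

}
```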

Thank you so much for the detailed response, Eduard.

Upon further reflection, though, I'm not sure this is exactly the solution I'm looking for. There may be multiple "sidelined" messages that are eligible for update from a single response; consider the situation where lab orders may be created in an external system for delivery to a Lab system, but the patient is registered in the Lab via a separate interface that is driven by a request made of the registration system by Ensemble. All orders for a given patient encounter would need to be held until the registration is received, but only one event will trigger the release (and enrichment) of all orders for that encounter.

Assuming tokens are unique across all deferred responses, I'll need to create a token management system that supports a one-to-many relationship between the "public" token sent to/received from the external system and the "private" tokens that identify deferred messages eligible for update from a given deferred response. If a single token can satisfy the response requirement for multiple messages, though, that may not be necessary. If that capability is mentioned in the documentation, I haven't come across it yet.
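
One way the one-to-many mapping described above might be kept is sketched below. Everything in it is an assumption made for illustration: the class Demo.Deferred.TokenGroup, its properties and the release() method are not part of Ensemble, and the sketch assumes each sidelined message has its own deferred token while all of them share one "public" key such as an encounter number.

```objectscript
/// Hypothetical mapping of one "public" key to many deferred tokens.
Class Demo.Deferred.TokenGroup Extends %Persistent
{

/// Key exchanged with the external system (e.g. an encounter number)
Property publicKey As %String [ Required ];

/// "Private" token returned by DeferResponse() for one sidelined message
Property token As %String(MAXLEN = 250) [ Required ];

Index publicKeyIndex On publicKey;

Index tokenIndex On token [ Unique ];

/// Sends the same response to every token held for publicKey.
/// host is the business host (for example the receiving service) making the call.
ClassMethod release(host As Ens.Host, publicKey As %String, response As %Persistent) As %Status
{
    set sc = $$$OK
    set rs = ##class(%SQL.Statement).%ExecDirect(, "SELECT ID, token FROM Demo_Deferred.TokenGroup WHERE publicKey = ?", publicKey)
    return:rs.%SQLCODE<0 $$$ERROR($$$SQLError, rs.%SQLCODE, rs.%Message)

    // Collect the rows first so that nothing is deleted while the cursor is open.
    set rows = ""
    while rs.%Next() {
        set rows = rows _ $listbuild($listbuild(rs.%Get("ID"), rs.%Get("token")))
    }

    set ptr = 0
    while $listnext(rows, ptr, row) {
        // Release one waiting message, then forget its token.
        set sc = host.SendDeferredResponse($list(row, 2), response)
        quit:$$$ISERR(sc)
        set sc = ..%DeleteId($list(row, 1))
        quit:$$$ISERR(sc)
    }
    return sc
}

}
```

With something like this, the service that receives the registration would call release($this, encounterId, response) once, and every order held for that encounter would get its response.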

I encountered a somewhat similar problem. Here are the conditions:

  • Several types of business objects
  • Business objects may depend upon other objects, so it's like a dependency graph
  • Updates arrive for objects and they need to be approved by humans
  • Humans approve updates randomly
  • But updates should be applied top to bottom (so the base object first, then all objects which depend on it, then all objects which depend on them, and so on)
  • Updates arrive continuously

To solve this, we used deferred responses and the class I posted above, except that it also had a dependsOn property containing a list of tokens that had to be satisfied before the current token (that is, the message corresponding to it) could be returned.

It looked like this:

Property dependsOn As List Of %String(SQLPROJECTION = "table/column", STORAGEDEFAULT = "array");
// Index dependsOnIndex On dependsOn (ELEMENTS);

So our process was:

  1. If we found a new dependency, we added it to the list; if a dependency was satisfied, we removed it from all lists.
  2. If there were no more dependencies, then the update would be fired.
  3. Then there was a check to see if the current update was a dependency for something else.

Projecting the list property as a table helped. Collection properties are also indexable, so it all worked quite fast.
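
The process above might be sketched as follows. This is not the code from the original project: the class Demo.Deferred.Resolver and its satisfy() method are hypothetical, and the sketch assumes that Production.DeferredToken carries the dependsOn property shown earlier and that the FOR SOME %ELEMENT predicate can be used against it.

```objectscript
/// Hypothetical helper for the dependency bookkeeping.
Class Demo.Deferred.Resolver Extends %RegisteredObject
{

/// Marks token as satisfied: removes it from every dependsOn list and
/// returns, in ready, the tokens that have no dependencies left.
ClassMethod satisfy(token As %String, Output ready As %List) As %Status
{
    set ready = ""
    set sc = $$$OK

    // Find every stored token whose dependsOn list contains the satisfied one.
    set sql = "SELECT token FROM Production.DeferredToken WHERE FOR SOME %ELEMENT(dependsOn) (%VALUE = ?)"
    set rs = ##class(%SQL.Statement).%ExecDirect(, sql, token)
    return:rs.%SQLCODE<0 $$$ERROR($$$SQLError, rs.%SQLCODE, rs.%Message)

    // Collect the IDs first so that rows are not modified while the cursor is open.
    set ids = ""
    while rs.%Next() {
        set ids = ids _ $listbuild(rs.%Get("token"))
    }

    set ptr = 0
    while $listnext(ids, ptr, id) {
        set obj = ##class(Production.DeferredToken).%OpenId(id, , .sc)
        quit:$$$ISERR(sc)

        // Step 1: the dependency is satisfied, so take it off this list.
        set key = obj.dependsOn.Find(token)
        do:key'="" obj.dependsOn.RemoveAt(key)
        set sc = obj.%Save()
        quit:$$$ISERR(sc)

        // Step 2: nothing left to wait for, so this update can be fired.
        set:obj.dependsOn.Count()=0 ready = ready _ $listbuild(obj.token)
    }
    return sc
}

}
```

The caller would then send the deferred response for each token in ready and call satisfy() again for it, which covers step 3.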