Responding to multiple processes in OnResponse (Observer Pattern)

Following on from https://community.intersystems.com/post/custom-business-process-change-sendrequestsync-sendrequestasync we are refactoring a number of business processes to use OnRequest/SendRequestAsync/OnResponse mechanisms.

To prevent overloading some of our datasources we currently implement a simple caching system using locks similar to the code below.

Method OnRequest()
{    
    Set key = ..getKey(request)
    
    Lock +^DataCache(key)
    if ('..cacheValid(key))
    {
        Set status = ..SendRequestSync("DataProcess", dataRequest, .dataResponse)
    
        Set ^DataCache(key,"Expiry")=timestamp+seconds
    
        Lock -^DataCache(key,"Data")= dataResponse.data    
    }
    
    Set response = ..getResponseFromCache(key)
    
    Quit status
}

Some of the datasources can be slow to respond therefore using SendRequestSync in this manner can require us to need many more items in the pool than should be necessary.

What I'd like to do is implement something akin to an Observer pattern. Is that possible where I can notify each invoker of our caching process (the observers) when I receive a response from the "DataProcess"?

Method OnRequest()
{
    Do ..Observers.Insert(request)
    
    if ('..cacheValid(key) && (..cacheStatus(key)'="WaitingForResponse"))
    {
        Do ..SendRequestAsync("DataProcess",dataRequest,1,key))    
        Do ..setCacheStatus(key,"WaitingForResponse")
    }
}

Method OnResponse()
{
    Do ..addResponseToCache(completionKey,callResponse)
    
    for i=1:1:..Observers.Count()
    {
        //notifyObserver will trigger "OnResponse" in the calling process
        Do ..notifyObserver(..Observers.GetAt(i),..getCachedItem(completionKey))
    }
}

 

 

  • 0
  • 0
  • 241
  • 0
  • 1

Answers

Here's a sample BP that calls BO asynchronously. BO returns  current request state, upon which BP decides to wait some more, report an error or process an answer:

Class test.BP Extends Ens.BusinessProcess
{

/// Operation name
Property Operation As %String(MAXLEN = 250) [ Required ];

/// How long to wait for an answer. 0 - forever.
Property MaxProcessTime As %Integer(MINVAL = 0) [ InitialExpression = 3600 ];

Parameter SETTINGS = "MaxProcessTime:Basic,Operation:Basic:selector?context={Ens.ContextSearch/ProductionItems?targets=1&productionName=@productionId}";

/// Identifier for a first request
Parameter callCOMPLETIONKEY = "*call*";

/// Identifier for a state request
Parameter getStateCOMPLETIONKEY = "*getState*";

/// Alarm identifier
Parameter alarmCOMPLETIONKEY = "*alarm*";

Method OnRequest(pRequest As Ens.StringRequest, Output pResponse As Ens.StringResponse) As %Status
{
    #dim msg As Ens.StringRequest = pRequest.%ConstructClone(1)
    quit ..SendRequestAsync(..Operation, msg, $$$YES, ..#callCOMPLETIONKEY)
}

/// Process Async reply from Operation
Method OnResponse(request As Ens.StringRequest, ByRef response As Ens.StringResponse, callrequest As Ens.StringRequest, callresponse As Ens.StringResponse, pCompletionKey As %String) As %Status
{
    #dim sc As %Status = $$$OK
    
    // Got an error
    if $$$ISERR(callresponse.status)
    {
        set response = callresponse
        quit $$$OK
    }
    
    // Got primary answer
    if (pCompletionKey = ..#callCOMPLETIONKEY) || (pCompletionKey = ..#alarmCOMPLETIONKEY)
    {
        quit ..OnResponseFromCallOrAlarm(request, .response, callrequest, callresponse, pCompletionKey)
    }
    
    // Got getState
    if (pCompletionKey = ..#getStateCOMPLETIONKEY)
    {
        
        // Current processing state (1 - received; 2 - in work; 3 - done)
        #dim status As %String = callresponse.StringValue

        // If not 3, run getState again
        if (status '= "3")
        {        
            // Check how much time passed since we started
            set processTime = $system.SQL.DATEDIFF("s", ..%TimeCreated, $$$ts)
            
            if ((..MaxProcessTime=0) || (processTime<..MaxProcessTime)) {
                // Let's run getState again in 30 seconds
                #dim alarmMsg As Ens.AlarmRequest = ##class(Ens.AlarmRequest).%New()
                set alarmMsg.Duration = "PT30S"
                quit ..SendRequestAsync("Ens.Alarm", alarmMsg, $$$YES, ..#alarmCOMPLETIONKEY)
            } else {
                // Timeout
                set response = ##class(Ens.StringResponse).%New()
                set response.status = $$$ERROR($$$GeneralError, "Timeout")
                quit $$$OK    
            }
        }
        else
        {
            quit ..OnResponseFromGetState3(request, .response, callrequest, callresponse, pCompletionKey)
        }
    }
    
    // unrecognized pCompletionKey value
    quit $$$ERROR($$$InvalidArgument)
}

/// OnResponse for alarm or call - run get state
Method OnResponseFromCallOrAlarm(request As Ens.StringRequest, ByRef response As Ens.StringResponse, callrequest As Ens.StringRequest, callresponse As Ens.StringResponse, pCompletionKey As %String) As %Status [ Private ]
{
    #dim msg As Ens.StringRequest = pRequest.%ConstructClone(1)
    quit ..SendRequestAsync(..Operation, msg, $$$YES, ..#getStateCOMPLETIONKEY)
}

/// OnResponse for an answer
Method OnResponseFromGetState3(request As Ens.StringRequest, ByRef response Ens.StringResponse, callrequest As Ens.StringRequest, callresponse As Ens.StringResponse, pCompletionKey As %String) As %Status [ Private ]
{
    // Process complete response
    quit $$$OK
}

OnResponse would be an automatic observer, and can in turn notify anything else. You can use original request id or SessionID to identify what you need to notify.