Written by

Product Manager at InterSystems
Article Ben Schlanger · 8 hr ago 4m read

Selectively Aborting Messages from an Interoperability Queue

Back when I was a Developer Support Engineer in the InterSystems Worldwide Response Center (WRC), I would occasionally receive cases involving large message backlogs in a production. The customer would discover that a subset of queued messages were no longer useful and would ask a question similar to the following:

Is there an established best practice for mass-aborting messages in a specific component's queue that match a particular set of criteria?

For example, imagine a business operation has accumulated tens of thousands of queued messages, and it becomes apparent that all messages originating from a specific source service should be discarded. In that situation, the customer may want to selectively abort only those messages while allowing the remaining queue contents to continue processing normally.

There is no built-in "abort all matching messages" button for this use case. Instead, we often recommend creating a small utility that can iterate through a queue, identify messages that match a custom filter, and then abort only those messages.

 

A typical script consists of the following steps:

    1) Set which queue it should abort messages from

    2) Have some filtering logic for identifying which messages to selectively abort

    3) Use the Ens.Queue::AbortItem() API to abort the message

    4) [optional] Have some limit to how many messages it aborts per run (just to prevent any run from going too long)

 

The advantage of this approach is its flexibility. The filtering logic can be tailored to virtually any requirement. For example, you could identify messages based on:

  • Source configuration name
  • Message status
  • Message type
  • Creation time
  • Session ID
  • Custom message header attributes

In the example below, which I put into a custom class named Local.WRC.QueueCleanup.cls, the filter identifies messages that originated from the service Local.Production.FromPharmacy and are currently in a queued state (Status = 2). Any matching messages found in the target queue are aborted using the AbortItem() API.

It could be run with a command like: 

set tSC = ##class(Local.WRC.QueueCleanup).SelectiveQueueAbort("Local.Production.UpdateManager","Local.WRC.QueueCleanup","SampleFilterFromPharmacy",500)  

/// Example boolean filter that returns True for messages which were queued up by
/// an inbound service named "Local.Production.FromPharmacy" and which are in a Queued state
ClassMethod SampleFilterFromPharmacy(pHeaderId) As %Boolean
{
    Set tHeader = ##class(Ens.MessageHeader).%OpenId(pHeaderId)
    Quit ((tHeader.Status = 2)&&(tHeader.SourceConfigName = "Local.Production.FromPharmacy"))
}

/// This method will iterate over an active Ensemble queue and call AbortItem on any message which passes the filter check
/// You must provide a filter class method - this should only take in the Ens.MessageHeader ID value and return a Boolean
/// Example call: set tSC = ##class(Local.WRC.QueueCleanup).SelectiveQueueAbort("Local.Production.UpdateManager","Local.WRC.QueueCleanup","SampleFilterFromPharmacy",500)
ClassMethod SelectiveQueueAbort(pQueueName As %String, pFilterClass As %String, pFilterMethod As %String, maxItems As %Integer = 10000) As %Status
{
        Set tSC = $$$OK
         
        try
        {
            Set tStatement = ##class(%SQL.Statement).%New()
            Set tSC = tStatement.%PrepareClassQuery("Ens.Queue","EnumerateItem")
             
            Set tRS = tStatement.%Execute(pQueueName)
             
            set i = 0 
            while tRS.%Next()
            {
            	Quit:i>=maxItems
                Set tHeaderId = tRS.%Get("MessageId")
                 
                Set tAbort = $ClassMethod(pFilterClass,pFilterMethod,tHeaderId)
                 
                if (tAbort)
                {
                    Set tSC = ##class(Ens.Queue).AbortItem(pQueueName, tRS.%Get("Priority"), tRS.%Get("Index"))
                    Quit:$$$ISERR(tSC)
                }
            	set i = i +1
            }
        }
        catch Ex
        {
            Set tSC = Ex.AsStatus()
        }
         
        Quit tSC
}

Although this example focuses on filtering by source configuration name, the overall pattern can be adapted to many different queue-cleanup scenarios. The key idea is separating the queue traversal logic from the filter logic, allowing you to create reusable cleanup utilities while keeping the selection criteria easy to customize.

As always, be sure to test your script in a lower environment before you consider running it in your Live production environment. Then, before running SelectiveQueueAbort() in your Live environment, you may even want to validate the filter logic independently and confirm that it selects exactly the messages you intend to remove. A filter that is too broad could unintentionally abort messages that should still be processed.

Comments