Question
· Jul 15

Fire-and-forget async/background non-blocking tasks

I need to implement a retry policy for an incoming message queue containing thousands of relatively small messages.

Successfully processed messages should be immediately removed from the queue.

If an error occurs while processing a message, the message should be sent back to the end of the queue, and the pause before re-processing it should increase geometrically (1, 2, 4, 8, 16 seconds, and so on). In languages that support the async/await pattern, I'd simply create a delayed timer that triggers a fire-and-forget task, which would avoid blocking the main thread. How can this be implemented in IRIS?

Product version: IRIS 2025.1

Hi Timo,

Yesterday I experimented a lot with both Python and workers (and failed).

Here is an example. In this code I want the workers to start as background tasks, so the program should first print "This should be printed first", and then each worker (each started with a random delay) should print its own message. This obviously doesn't happen because of the queue.Sync() call, but I don't want to wait for all the workers to complete.

Class DelayedTest Extends %RegisteredObject
{

ClassMethod Callback(interval As %String) As %Status
{
    Hang interval
    Write "Interval = ", interval, !
    Return $$$OK
}

Method RunWorkers()
{
    #Dim queue as %SYSTEM.WorkMgr
    Set queue = ##class(%SYSTEM.WorkMgr).%New()
    For i = 1:1:5
    {
        Set status = queue.Queue("..Callback", $RANDOM(5) + 1) // Minimal delay is 1 second
        $$$ThrowOnError(status)
    }

    Set status = queue.Sync()
    $$$ThrowOnError(status)
}

ClassMethod Main()
{
    #Dim d As DelayedTest = ##class(DelayedTest).%New()
    Do d.RunWorkers() 
    Write "This should be printed first" 
}

}

Also, I'm not sure that Hang is appropriate here to emulate some delay. If it works like Sleep it should block the main thread.

Hi Dmitrii,

Why not Sync from main()? 

Class User.DelayedTest Extends %RegisteredObject
{

ClassMethod Callback(interval As %String) As %Status
{
    Hang interval
    Write "Interval = ", interval, !
    Return $$$OK
}

/// Returns the work queue through the Output argument so the caller decides when to Sync()
Method RunWorkers(Output queue As %SYSTEM.WorkMgr)
{
    Set queue = ##class(%SYSTEM.WorkMgr).%New()
    For i = 1:1:5
    {
        Set status = queue.Queue("..Callback", $RANDOM(5) + 1) // Minimal delay is 1 second
        $$$ThrowOnError(status)
    }
}

ClassMethod Main()
{
    #Dim d As DelayedTest = ##class(DelayedTest).%New()
    Do d.RunWorkers(.queue)
    Write "This should be printed first",!
    Set status = queue.Sync()
    $$$ThrowOnError(status)
    Write "Exiting...",!
}

}
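
For comparison, here is the same "submit now, wait later" shape as a minimal plain-Python sketch using `concurrent.futures`. It is only an analogy to the WorkMgr code above, not IRIS code: `callback`, `run_workers`, and `main` are hypothetical names, and the messages are collected in a list instead of printed so the ordering is easy to check.

```python
import concurrent.futures
import time


def callback(interval):
    """Simulate work; the sleep blocks only the worker thread."""
    time.sleep(interval)
    return f"Interval = {interval}"


def run_workers(executor, intervals):
    """Queue one task per interval; submit() returns immediately with a Future."""
    return [executor.submit(callback, seconds) for seconds in intervals]


def main(intervals=(0.3, 0.1, 0.2)):
    lines = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = run_workers(executor, intervals)
        # The tasks are already running, but the caller is not blocked yet.
        lines.append("This should be printed first")
        # Rough counterpart of Sync(): wait here, collecting results as tasks finish.
        for future in concurrent.futures.as_completed(futures):
            lines.append(future.result())
    lines.append("Exiting...")
    return lines
```

Calling `print("\n".join(main()))` shows the "printed first" line before any worker output, because the waiting happens in the caller rather than inside `run_workers`.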

Hi Dmitrii,

From the discussion in the comments it seems like you're trying to use "Workers", but did you try to use the Event mechanism for your use-case?

See also this related article (and this one as well).

By the way here's a Docs reference of using this from Python.

I don't know the wider context here, but IRIS Interoperability uses this Event mechanism behind the scenes for its components and for the messages and queues it manages. So if you are already using IRIS Interoperability, you could implement this at the higher level of the Business Components in your Interoperability Production, rather than in lower-level code that uses the Event class directly.

The Workers mechanism you tried to use is intended more for distributing parallel work, and the Event API is more for messaging and queuing scenarios, which sounds more like your use-case.

This article might also be of interest to you as it discusses moving from "Workers" to "Events".

I finally managed to solve the problem in Python. It's not perfect but it works:

Class User.Timer Extends %RegisteredObject
{

Property Executor [ Private ];

Method Initialize(maxWorkers As %Integer = 4) [ Language = python ]
{
import concurrent.futures

self.Executor = concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers)
}

Method Close() [ Language = python ]
{
if self.Executor:
    self.Executor.shutdown()
}

Method Greet(name)
{
    Write "Hello ", name, !
}

Method OnCallback0(instance As %RegisteredObject, method As %String) [ Private ]
{
    Do $METHOD(instance, method)
}

Method OnCallback1(instance As %RegisteredObject, method As %String, arg1) [ Private ]
{
    Do $METHOD(instance, method, arg1)
}

Method OnCallback2(instance As %RegisteredObject, method As %String, arg1, arg2) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2)
}

Method OnCallback3(instance As %RegisteredObject, method As %String, arg1, arg2, arg3) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2, arg3)
}

Method OnCallback4(instance As %RegisteredObject, method As %String, arg1, arg2, arg3, arg4) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2, arg3, arg4)
}

Method OnCallback5(instance As %RegisteredObject, method As %String, arg1, arg2, arg3, arg4, arg5) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2, arg3, arg4, arg5)
}

Method InternalRun(delayMs As %Integer, wait As %Boolean, instance As %RegisteredObject, method As %String, args... As %List) [ Internal, Language = python ]
{
import time
import iris

if not self.Executor:
    raise Exception("The 'Initialize' method has not been called.")

def worker_function():
    time.sleep(delayMs / 1000)
    if len(args) == 0:
        self.OnCallback0(instance, method)
    elif len(args) == 1:
        self.OnCallback1(instance, method, args[0])
    elif len(args) == 2:
        self.OnCallback2(instance, method, args[0], args[1])
    elif len(args) == 3:
        self.OnCallback3(instance, method, args[0], args[1], args[2])
    elif len(args) == 4:
        self.OnCallback4(instance, method, args[0], args[1], args[2], args[3])
    elif len(args) == 5:
        self.OnCallback5(instance, method, args[0], args[1], args[2], args[3], args[4])
    else:
        raise Exception("Too many arguments.")
    return 0


future = self.Executor.submit(worker_function)

# wait == 0 means fire-and-forget
try:
    if (wait == 1):
        rv = future.result()

except Exception as e:
    print(f"{e}")
}

/// delayMs  - the parameter specifies the timer delay in milliseconds
/// wait     - if the parameter is false, the process will not wait for the Future result to be returned (fire-and-forget)
/// instance - the object whose method should be called after the delay
/// method   - specifies the object's callback method name
/// args     - the callback method arguments (up to 5)
Method Run(delayMs As %Integer, wait As %Boolean, instance As %RegisteredObject, method As %String, args... As %List)
{
    Do ..InternalRun(delayMs, wait, instance, method, args...)
}

ClassMethod Test()
{
    Set obj = ##class(Timer).%New()
    Do obj.Initialize()
    Do obj.Run(1000, 0, obj, "Greet", "John")
    Do obj.Run(2000, 0, obj, "Greet", "Jessica")
    Write "If 'wait == 0' this line will be printed first", !
    Do obj.Close()
}

}
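
To tie this back to the retry policy in the original question, here is a minimal plain-Python sketch of the geometric backoff itself. It is an illustration under assumptions, not IRIS code: `backoff_delay` and `process_with_retries` are hypothetical names, the queue is an in-memory `queue.Queue`, `handler` is any callable that raises on failure, and the `max_attempts` limit is my addition (the question does not say when to give up). Each failed message is handed to a `threading.Timer`, which puts it back at the end of the queue once the delay has elapsed, so the consumer loop keeps working on other messages in the meantime.

```python
import queue
import threading


def backoff_delay(attempt, base=1.0, factor=2.0):
    """Delay before retry number `attempt` (0-based): base, 2*base, 4*base, ..."""
    return base * factor ** attempt


def process_with_retries(messages, handler, base=1.0, max_attempts=5):
    """Process every message; return those that failed `max_attempts` times."""
    pending = queue.Queue()
    for message in messages:
        pending.put((message, 0))          # (message, attempts so far)
    remaining = len(messages)              # messages neither done nor given up
    failed = []

    while remaining:
        # Blocks only when every unfinished message is waiting on a timer.
        message, attempt = pending.get()
        try:
            handler(message)
            remaining -= 1                 # success: the message leaves the queue
        except Exception:
            if attempt + 1 >= max_attempts:
                failed.append(message)     # give up on this message
                remaining -= 1
            else:
                # Fire-and-forget: the timer thread re-queues the message
                # at the end of the queue after the backoff delay.
                timer = threading.Timer(
                    backoff_delay(attempt, base),
                    pending.put,
                    args=((message, attempt + 1),),
                )
                timer.daemon = True
                timer.start()
    return failed
```

With the defaults, `[backoff_delay(n) for n in range(5)]` gives the 1, 2, 4, 8, 16 second sequence from the question. One timer thread per failed message is fine for a sketch; with thousands of messages failing at once, a single scheduler that keeps the due times in a heap would be lighter.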