Send digest emails for workflow tasks with Publish/Subscribe

Primary tabs

Ensemble supports publish and subscribe message delivery. Publish and subscribe refers to the technique of routing a message to one or more subscribers based on the fact that those subscribers have previously registered to be notified about messages on a specific topic.

This article demonstrates how several Ensemble capabilities can work together:

In this article we would send emails about:

  • New workflow tasks
  • Unassigned workflow tasks
  • Uncompleted workflow tasks
  • Ensemble alerts

Email recipients would be determined using Publish/Subscribe operation and each user would receive only digest email whenever possible.

Publish/Subscribe

Ensemble supports publish and subscribe message delivery. Publish and subscribe refers to the technique of routing a message to one or more subscribers based on the fact that those subscribers have previously registered to be notified about messages on a specific topic.

Before everything else two  Publish/Subscribe domains (one for Workflow tasks and one for Ensemble Alerts) should be defined with the structure of topics, you need for your project. The process is described in the documentation.

Workflow

To test workflow you'll need workflow tasks. Demo.Workflow production could be used to generate test tasks. Article on Community about workflow.

Email Service

We start with a Service. It does most of the work:

  • Queries workflow data to get New, Unassigned and Uncompleted tasks
  • Calls Subscription operation to get recipients
  • Writes task info and recipients into a temporary table Test.PPGEmail
  • Forms digest email for each recipient
  • Sends emails to email operation

It has several configurable parameters:

  • ApprovalDomain - Subscriptions domain for approval tasks
  • SendBO - Operation that would actually send new messages to subscribers (for example emails)
  • SubscriptionBO - Subscription operation
  • UnacceptedTime - Time (in hours) while task can be unaccepted without notification
  • UncompletedTime - Time (in hours) a task can be uncompleted without notification
  • URL - Base workflow portal url. Could be a dashboard url or a link to a custom portal, for example, Ensemble Workflow UI

Query workflow data

Ensemble workflow stores data in EnsLib_Workflow.TaskResponse table. Let's query it:

Query newWorkflowTasks(time) As %SQLQuery
{
SELECT TaskStatus_RoleName As topic, "%Subject" As text, "%Message" As message
FROM EnsLib_Workflow.TaskResponse
WHERE "%Status" = 'Unassigned' AND TaskStatus_TimeCreated >= :time
ORDER BY TaskStatus_RoleName
}

Query unassignedWorkflowTasks(hours) As %SQLQuery
{
SELECT TaskStatus_RoleName As topic, "%Subject" As text, "%Message" As message
FROM EnsLib_Workflow.TaskResponse
WHERE "%Status" = 'Unassigned' AND DATEDIFF('hh', TaskStatus_TimeCreated, NOW()) >= :hours
ORDER BY TaskStatus_RoleName
}

Query uncompletedWorkflowTasks(time) As %SQLQuery
{
SELECT TaskStatus_RoleName As topic, "%Subject" As text, "%Message" As message
FROM EnsLib_Workflow.TaskResponse
WHERE "%Status" = 'Assigned' AND DATEDIFF('hh', TaskStatus_TimeCreated, NOW()) >= :hours
ORDER BY TaskStatus_RoleName
}

/// Populates Test.PPGEmail with "messages" to send
Method generateWorkflowDataByType(type As %String(VALUELIST="new,unassigned,uncompleted")) As %Status
{
    
    #dim sc As %Status = $$$OK
    set prevTopic = ""
    
    if type = "new" {
        set query = "newWorkflowTasks"
        set arg = ..getTime()
    } elseif type = "unassigned" {
        set query = "unassignedWorkflowTasks"
        set arg = ..UnacceptedTime
    } elseif type = "uncompleted" {
        set query = "uncompletedWorkflowTasks"
        set arg = ..UncompletedTime
    }
    
    #dim rs = $classmethod(,query _ "Func", arg)

    while rs.%Next() {
        set topic = rs.topic _ "." _ type
        if prevTopic'=topic {
            set emails = ..determineEmails(..ApprovalDomain, topic)
            set prevTopic = topic
        }
        
        set message = rs.message
        set:message'="" message = " - "_ message
        
        set sc = ##class(Test.PPGEmail).add(..ApprovalDomain, topic, emails, rs.text _ message)
        quit:$$$ISERR(sc)
    }
    quit sc
}

/// Get email addresses by domain and topic.
Method determineEmails(domain As %String, topic As %String) As %List
{
	set subRequest = ##class(EnsLib.PubSub.Request).%New()
	set subRequest.Topic = topic
	set subRequest.DomainName = domain	
	
	do ..SendRequestSync(..SubscriptionBO, subRequest, .subResponse,, "Get subscribers for domain: " _ domain _ ", topic: " _ topic)
	
	set mails = ""
	for i=1:1:subResponse.TargetList.Count() {
		#dim target As EnsLib.PubSub.Target
		set target = subResponse.TargetList.GetAt(i)
		set mails = mails _ $lb(target.Address)	
	}
	return mails
}

In determineEmails method, Subscription operation is called to get a list of subscribers.

Test.PPGEmail

Query data populates temporary Test.PPGEmail table:

/// Class to store subscriptions in PPG
Class Test.PPGEmail Extends %Persistent
{

/// Domain
Property domain As %String(MAXLEN = 100);

/// Topic
Property topic As %String(MAXLEN = 1000) [ Required ];

/// Where to send current subscription. STORAGEDEFAULT and SQLPROJECTION allow SQL access
Property emails As list Of %String(SQLPROJECTION = "table/column", STORAGEDEFAULT = "array");

// Not sure if it improves performance
// Index emailsIndex On emails(ELEMENTS);

/// Text
Property text As %String(MAXLEN = "");

/// Add subscription
/// w ##class(Test.PPGEmail).add("APPROVAL", "Contract approval", $lb("1@1.com","2@2.com"), "text")
ClassMethod add(domain As %String, topic As %String, emails As %List, text As %String) As %Status
{
    set obj = ..%New()
    set obj.domain = domain
    set obj.topic = topic
    set obj.text = text
    
    if $listvalid(emails) {
        for i=1:1:$ll(emails){
            do obj.emails.Insert($lg(emails,i))
        }
    }
    return obj.%Save()
}

Query flushTable() As %Query
{
SELECT * FROM Test.PPGEmail
}

/// !!! PPG storage
Storage Default
{
<Data name="PPGEmailDefaultData">
    <Value name="1">
        <Value>%%CLASSNAME</Value>
    </Value>
    <Value name="2">
        <Value>domain</Value>
    </Value>
    <Value name="3">
        <Value>topic</Value>
    </Value>
    <Value name="4">
        <Value>text</Value>
    </Value>
</Data>
<Data name="emails">
    <Attribute>emails</Attribute>
    <Structure>subnode</Structure>
    <Subscript>"emails"</Subscript>
</Data>
<DataLocation>^||Test.PPGEmailD</DataLocation>
<DefaultData>PPGEmailDefaultData</DefaultData>
<Description>
    <![CDATA[!!! PPG storage]]>
</Description>
<IdLocation>^||Test.PPGEmailD</IdLocation>
<IndexLocation>^||Test.PPGEmailI</IndexLocation>
<StreamLocation>^||Test.PPGEmailS</StreamLocation>
<Type>%Library.CacheStorage</Type>
}

}

There are two tricks here, first it's emails property:

Property emails As list Of %String(SQLPROJECTION = "table/column", STORAGEDEFAULT = "array");

SQLPROJECTION and STORAGEDEFAULT  property parameters allow us to query it later by email (and especially GROUP BY email).

Second, we store data in process private globals so we can run several similar services without caring about

Generating digest email

As each task has several recipients and each recipient has a lot of tasks we need to send digest emails.

/// Generate mails and send to BO
Method sendEmails()
{
    &sql(DECLARE C1 CURSOR FOR
         SELECT DISTINCT emails  
         INTO :email
         FROM Test.PPGEmail_emails
    )
 
    &sql(OPEN C1)
    &sql(FETCH C1)

    While (SQLCODE = 0) {
        set text = ..generateEmailText(email, ..url)
                  
         set msg = ##class(Test.Alert).%New()
        set msg.topic = "Your tasks"
        set msg.to = email
        set msg.text = text
        if ..SendBO '="" {
            do ..SendRequestAsync(..SendBO, msg ,"Email with wf tasks for: " _ email)
        }
        &sql(FETCH C1)
    }
    &sql(CLOSE C1)
}

With our storage schema for emails property getting distinct emails is very fast, next we iterate over recipients, build email for each and pass it to email operation. Instead of email, it could be Telegram Operation or anything else really.

To get list of tasks by email we can use FOR SOME %ELEMENT syntax:

SELECT "domain", topic, text  
FROM Test.PPGEmail
WHERE FOR SOME %ELEMENT(emails) (%VALUE=:email)
ORDER BY "domain", topic

Email Operation

Finally, everything is ready to send alerts. The operation receives either our Test.Alert message or Ensemble alert and sends email to a list of recipients. For Ensemble alerts, it also gets subscribers from Publish operation. Check the code for sources, but there's nothing interesting there to justify copy&pasting code into this article.

Conclusion

Ensemble can implement various Publish/Subscribe flows. Digest emails can be easily generated using various storage mechanisms, allowing for M:N relationships between data and recipients.

Links

Comments

Hey

Its really hard to gather any flow or re-use. 

Did you made any operation that uses Ens.PubSubRouting?

Both Service and Operation presented in the article use EnsLib.PubSub.PubSubOperation to get subscribers.

Here's getting a list of emails by domain and topic:

/// Get email addresses by domain and topic.
Method determineEmails(domain As %String, topic As %String) As %List
{
    set subRequest = ##class(EnsLib.PubSub.Request).%New()
    set subRequest.Topic = topic
    set subRequest.DomainName = domain    
    
    do ..SendRequestSync(..SubscriptionBO, subRequest, .subResponse,, "Get subscribers for domain: " _ domain _ ", topic: " _ topic)
    
    set mails = ""
    for i=1:1:subResponse.TargetList.Count() {
        #dim target As EnsLib.PubSub.Target
        set target = subResponse.TargetList.GetAt(i)
        set mails = mails _ $lb(target.Address)    
    }
    return mails
}

Neerav, pls see your pub/sub post. I left my comments there last week. Look forward having your code implementation to understand pub/sub processing.

Thanks