Article
· Jan 24 9m read

Database Driven IRIS Production with Custom Inbound Adapters

The traditional use of an IRIS production is for an inbound adapter to receive input from an external source, send that input to an IRIS service, then have that service send that input through the production.

With a custom inbound adapter though, we can make an IRIS production do more. We can use an IRIS production to process data from our own database without any external trigger.

By using an IRIS production in this way, your data processing tasks get to leverage all the built-in features of an IRIS production, including:

  • Advanced tracking and monitoring
  • Multithreaded processing for scalability
  • Configuration-based business logic
  • Built-in IRIS operations to quickly connect to external systems
  • Quick recovery from system failures

The documentation for making a custom inbound adapter can be found at: https://docs.intersystems.com/hs20231/csp/docbook/DocBook.UI.Page.cls?KEY=EGDV_adv#EGDV_adv_adapterdev_inbound

Let’s look at 3 examples of a simple production configured to process “Fish” objects from a database.

In the first example we will make a data driven production that will continually process data.

In the second example we will modify this production to process data only during specific times.

In the third example we will modify this production to process data only when triggered via a system task.

Example 1: Continual Data Processing

This example is a simple production configured to continually process “Fish” objects from a database. All the production does is continually look for new fish objects, convert those fish objects to JSON, and then spit that JSON out to a file.

First, we make the Fish object we intend to process:

Class Sample.Fish Extends (%Persistent, Ens.Util.MessageBodyMethods, %JSON.Adaptor, %XML.Adaptor)
{

Parameter ENSPURGE As %Boolean = 0;
Property Type As %String;
Property Size As %Numeric;
Property FirstName As %String;
Property Status As %String [ InitialExpression = "Initialized" ];
Index StatusIndex On Status;
}

Status is important, as it is how we will distinguish unprocessed Fish from processed ones.

Setting ENSPURGE to 0 will prevent this object from being purged along with the message headers in the future.

Second, we make a custom adapter to look for new Fish:

Class Sample.Adapter.FishMonitorAdapter Extends Ens.InboundAdapter
{

/// Fish status value the adapter will query for. All matching fish will have their status set to SetFishStatus and then will be sent to the service.
Property GetFishStatus As %String [ InitialExpression = "Initialized", Required ];
/// Fish status value the adapter will set fish to before they are sent to the service.
Property SetFishStatus As %String [ InitialExpression = "Processed", Required ];
Parameter SETTINGS = "GetFishStatus:Basic,SetFishStatus:Basic";
Parameter SERVICEINPUTCLASS = "Sample.Fish";
Method OnTask() As %Status
{
	//Cursor to search for any matching fish
	set getFishStatus = ..GetFishStatus
	&sql(declare fishCursor cursor for
		select ID into :fishId
		from Sample.Fish
		where Status = :getFishStatus)
	
	//Execute the cursor
	&sql(open fishCursor)
	if SQLCODE < 0 {
		throw ##class(%Exception.SQL).CreateFromSQLCODE(SQLCODE,%msg)
	}
	for {
		&sql(fetch fishCursor)
		quit:SQLCODE'=0
		//For each matching fish, change its Status and send it to the service (BusinessHost)
		set fishObj = ##class(Sample.Fish).%OpenId(fishId)
		set fishObj.Status = ..SetFishStatus
		$$$ThrowOnError(fishObj.%Save())
		$$$ThrowOnError(..BusinessHost.ProcessInput(fishObj))
	}
	//Remember the last fetch result; closing the cursor overwrites SQLCODE
	set fetchCode = SQLCODE, fetchMsg = $get(%msg)
	&sql(close fishCursor)
	if fetchCode < 0 {
		throw ##class(%Exception.SQL).CreateFromSQLCODE(fetchCode,fetchMsg)
	}
	
	quit $$$OK
}

}

The OnTask() method searches for any fish matching the configured GetFishStatus value. For each fish it finds, it changes the fish's status to the configured SetFishStatus value and then passes the fish to the service’s ProcessInput method.

Third, we make a custom service to use this adapter:

Class Sample.Service.FishMonitorService Extends Ens.BusinessService
{

/// Configuration item to which to send messages
Property TargetConfigName As Ens.DataType.ConfigName;
Parameter SETTINGS = "TargetConfigName:Basic";
Parameter ADAPTER = "Sample.Adapter.FishMonitorAdapter";
Method OnProcessInput(pInput As Sample.Fish, Output pOutput As %RegisteredObject) As %Status
{
    //Nothing to do if no target is configured
    quit:..TargetConfigName="" $$$OK
    //Send fish to configured target
    quit ..SendRequestAsync(..TargetConfigName, pInput)
}

}

This service takes fish as input and passes them via an async request to the configured target.

Fourth, we make a custom business process to convert the fish to JSON:

Class Sample.Process.FishToJSONProcess Extends Ens.BusinessProcess
{

/// Configuration item to which to send messages
Property TargetConfigName As Ens.DataType.ConfigName;
Parameter SETTINGS = "TargetConfigName:Basic";
Method OnRequest(pRequest As Sample.Fish, Output pResponse As Ens.Response) As %Status
{
	//Convert the fish to a JSON stream, returning any export error to the caller
	set tSC = pRequest.%JSONExportToStream(.jsonFishStream)
	quit:$$$ISERR(tSC) tSC
	//Make a new stream container with JSON stream
	set tRequest = ##class(Ens.StreamContainer).%New(jsonFishStream)
	//Send stream container to configured target
	quit ..SendRequestAsync(..TargetConfigName, tRequest, 0)
}

Method OnResponse(request As Ens.Request, ByRef response As Ens.Response, callrequest As Ens.Request, callresponse As Ens.Response, pCompletionKey As %String) As %Status
{
    quit $$$OK
}

}

The OnRequest() method is the only one that does anything. It accepts a fish, generates a JSON stream from it, packages that stream into an Ens.StreamContainer, and then passes that stream container via an async request to the configured target.

Finally, we configure the production:

Class Sample.DataProduction Extends Ens.Production
{

XData ProductionDefinition
{
<Production Name="Sample.DataProduction" LogGeneralTraceEvents="false">
  <Description></Description>
  <ActorPoolSize>2</ActorPoolSize>
  <Item Name="Sample.Service.FishMonitorService" Category="" ClassName="Sample.Service.FishMonitorService" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Host" Name="TargetConfigName">Sample.Process.FishToJSONProcess</Setting>
  </Item>
  <Item Name="Sample.Process.FishToJSONProcess" Category="" ClassName="Sample.Process.FishToJSONProcess" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Host" Name="TargetConfigName">EnsLib.File.PassthroughOperation</Setting>
  </Item>
  <Item Name="EnsLib.File.PassthroughOperation" Category="" ClassName="EnsLib.File.PassthroughOperation" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Adapter" Name="FilePath">C:\temp\fish\</Setting>
  </Item>
</Production>
}

}

All that is left to do is to test it. For this we just need to open a terminal window and make a new fish object.
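
As a minimal sketch, the new fish could be created with commands like the following, entered at the terminal prompt in the namespace where the production runs (leave out the comment lines when typing). The property values are hypothetical sample data; Status does not need to be set, because its InitialExpression already makes it "Initialized", which is the value the adapter looks for by default.

```objectscript
// Hypothetical sample values; any Type, Size, and FirstName will do
set fish = ##class(Sample.Fish).%New()
set fish.Type = "Trout"
set fish.Size = 12.5
set fish.FirstName = "Bubbles"
// %Save() returns a %Status; writing it prints 1 when the fish was stored
write fish.%Save()
```

Once the fish is saved, the next OnTask() call of the adapter should pick it up.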

Looking at the production messages we can see the fish was found and processed:

We can inspect the trace of both messages:

And looking at the output folder (C:\temp\fish\) we can see the output file:

 

Example 2: Schedule-Based Data Processing

For use cases where we only want to process data at specific times, like overnight, we can configure the service to run on a schedule.

To modify example 1 to run on a schedule we first make a Schedule Spec. The documentation on how to do this can be found here: https://docs.intersystems.com/iris20231/csp/docbook/DocBook.UI.PortalHelpPage.cls?KEY=Ensemble%2C%20Schedule%20Editor
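
For orientation, a schedule specification is a comma-separated list of START and STOP actions with date and time patterns. Assuming the schedule named "Midnight Processing" is meant to run the service every day from midnight until 1 AM, its specification string might look like this sketch:

```text
START:*-*-*T00:00:00,STOP:*-*-*T01:00:00
```

The asterisks in the date part stand for "every year, month, and day", so the same window repeats daily.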

Then we change the service configuration to use this schedule:

Class Sample.DataProduction Extends Ens.Production
{

XData ProductionDefinition
{
<Production Name="Sample.DataProduction" LogGeneralTraceEvents="false">
  <Description></Description>
  <ActorPoolSize>2</ActorPoolSize>
  <Item Name="Sample.Service.FishMonitorService" Category="" ClassName="Sample.Service.FishMonitorService" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="@Midnight Processing">
    <Setting Target="Host" Name="TargetConfigName">Sample.Process.FishToJSONProcess</Setting>
  </Item>
  <Item Name="Sample.Process.FishToJSONProcess" Category="" ClassName="Sample.Process.FishToJSONProcess" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Host" Name="TargetConfigName">EnsLib.File.PassthroughOperation</Setting>
  </Item>
  <Item Name="EnsLib.File.PassthroughOperation" Category="" ClassName="EnsLib.File.PassthroughOperation" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Adapter" Name="FilePath">C:\temp\fish\</Setting>
  </Item>
</Production>
}

}

Now when we look at the “Jobs” tab of the service we see there are no jobs running:

From now on this service will only ever have jobs running between midnight and 1 AM.

 

Example 3: Event-Based Data Processing with Task Manager

For use cases where we only want to process data once at a specific time or when a particular event takes place, we can configure the service to only execute when a system task is run.

To modify example 1 to run only when triggered by a task, we first make a custom task to trigger the service:

Class Sample.Task.TriggerServiceTask Extends %SYS.Task.Definition
{

/// The name of the Business Service this task should run.
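Property BusinessServiceName As %String [ Required ];
Method OnTask() As %Status
{
	#dim pBusinessService As Ens.BusinessService
	//Get an instance of the configured service from the running production
	$$$ThrowOnError(##class(Ens.Director).CreateBusinessService(..BusinessServiceName, .pBusinessService))
	//Run one polling cycle of the service, which calls the adapter's OnTask()
	Quit pBusinessService.OnTask()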
}

}

Second, we configure a new system task. Documentation on how to configure system tasks can be found here: https://docs.intersystems.com/iris20233/csp/docbook/Doc.View.cls?KEY=GSA_manage_taskmgr

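The only part of the configuration that is specific to this example is the task type: select Sample.Task.TriggerServiceTask and set its business service name to the service to trigger (here, Sample.Service.FishMonitorService).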

In addition, I am configuring the task to be an on-demand task, but you could set up a schedule instead.

Finally, we configure the production:

Class Sample.DataProduction Extends Ens.Production
{

XData ProductionDefinition
{
<Production Name="Sample.DataProduction" LogGeneralTraceEvents="false">
  <Description></Description>
  <ActorPoolSize>2</ActorPoolSize>
  <Item Name="Sample.Service.FishMonitorService" Category="" ClassName="Sample.Service.FishMonitorService" PoolSize="0" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Host" Name="TargetConfigName">Sample.Process.FishToJSONProcess</Setting>
  </Item>
  <Item Name="Sample.Process.FishToJSONProcess" Category="" ClassName="Sample.Process.FishToJSONProcess" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Host" Name="TargetConfigName">EnsLib.File.PassthroughOperation</Setting>
  </Item>
  <Item Name="EnsLib.File.PassthroughOperation" Category="" ClassName="EnsLib.File.PassthroughOperation" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
    <Setting Target="Adapter" Name="FilePath">C:\temp\fish\</Setting>
  </Item>
</Production>
}

}

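Note that we set the PoolSize of Sample.Service.FishMonitorService to 0. With a pool size of 0 the production does not start a job for the service, so the adapter's OnTask() only runs when the task invokes it.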

All that is left to do is to test it. For this we just need to open a terminal window and make a new fish object.

Looking at the production messages we can see the fish has not been processed yet:

Then we run the on-demand task to trigger the service:
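
For quick testing without the Task Manager, the same two calls the task makes could also be issued by hand. This is only a sketch: it assumes the production is running in the current namespace, and the variable names sc and service are illustrative. Leave out the comment lines when typing at the terminal prompt.

```objectscript
// Get an instance of the service from the running production
set sc = ##class(Ens.Director).CreateBusinessService("Sample.Service.FishMonitorService", .service)
// Run one polling cycle, which calls the adapter's OnTask()
if $system.Status.IsOK(sc) { set sc = service.OnTask() }
// Print OK on success, otherwise the error text
write $select($system.Status.IsOK(sc):"OK", 1:$system.Status.GetErrorText(sc))
```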

Now looking at the production messages we can see the service was triggered causing the fish to be found and processed:

We can inspect the trace of both messages:

And looking at the output folder (C:\temp\fish\) we can see the output file:

 

Conclusion

The examples above are quite simple, but you could configure the productions to do far more. Including…

Basically, anything that could be done in a typical IRIS production could be done here too.
