Article
· Jan 14 6m read

The Power of Embedded Python in Interoperability - a mongoDB CDC Use-Case

Using embedded Python while building your InterSystems-based solution can add very powerful and deep capabilities to your toolbox.

I'd like to share one sample use-case I encountered - enabling a CDC (Change Data Capture) for a mongoDB Collection - capturing those changes, digesting them through an Interoperability flow, and eventually updating an EMR via a REST API.

The Core Functionality - "Change Watching" 👀

The basic idea is to use the PyMongo package, which, among other things, enables tracking changes within a mongo database via it's change_streams.

The important part is in the Inbound Adapter I created that has an OnInit() method that includes this line:

self.changeStream = client.get_database(...).get_collection(...).watch()

I'm using the watch() method here to monitor the changes. It watches a specific Database and a specific Collection.

I set these through the Business Service settings:

Then in the OnTask() method I'm using this to capture the changes:

while self.changeStream.alive:
    change = self.changeStream.try_next()
    if change is not None:

Here I'm using the alive property and try_next() method to get the change details that occurred.

For every change I'm creating an IRIS Stream object with the JSON content and sending that to the Business Service ProcessInput() method.

In the Business Service I create a Request imported from the JSON payload, and send that down to the Router.

The general flow can be illustrated like this -

Here's a video demonstration:

https://www.youtube.com/embed/Z_6rq-bhB_s
[This is an embedded link, but you cannot view embedded content directly on the site because you have declined the cookies necessary to access it. To view embedded content, you would need to accept all cookies in your Cookies Settings]

Some More Technical Details 👩‍💻

The Business Service needs to "massage" the JSON a little because it includes some "fuzzy" JSON, before I can allow it to be imported as valid JSON.

For example one can look like this:

 
Change JSON Sample

You can see for example the Timestamp and new UUID parts which are not valid JSON.

You can see that the actual "Document" is under the fullDocument part, and for this example I used a specific schema.

My document would look like this:

 
Sample JSON Document for this demo

In your case you can either change this to fit your schema, or even consider adapting my sample to use a more "dynamic" approach where you can define a setting that would be used to dynamically import the JSON to a variable class name (vs. my hard-coded one).

[In any case, to load your JSON schema and create a class from it, you may use the Sample.OpenAPIClassGenerator class I included in my sample (adapted from @Guillaume Rongier's OpenAPI Definition Class Generator (and call the ProcessFile() method I added on your JSON schema file).]

In essence this is the main part of this functionality: Adapter + Business Service that send off messages with changes that were made on a mongo DB Collection.

Widening the Picture 👩‍⚕️

For demonstration purposes, and to make a full flow that "tells a story" (which was the actual use-case I had) I also added a target for this CDC, which is a REST API of a mock EMR, which takes patient data and inserts it into a Patient Table.

You'll find this part under the Demo.EMR package (Data.Patient for the Table, and Util.API for the REST API); and the Demo.Int package for the Business Operation part.

By the way you can reuse this part for any other needs you might find when you need some mock API to send Patient data for testing as a target destination.

[Note the "mock EMR" table and API include data elements which I didn't use in this demo, like email address and phone number]

A Sample Full Flow (with screenshots) 📷

So here is a sample flow of how this would work -

1. Add a document to your mongo Collection.

 
Shell commands (Docker exec to connect to mongo container, and mongosh commands, to initiate ReplicaSet, and a Document to a Collection)

2. Examine your "EMR" table and see the new data from Mongo inserted into it.

 
DBeaver viewing the data in the Patient table

3. Examine the "Behind the Scenes" inside InterSystems IRIS taking care of this flow.

 
Visual Trace of messages flow

Take it for a Ride 🏎

The Open Exchange app includes all the related code, and setup instructions for a full Docker Container based demo per above (with all the related parts, including mongo containers).

[Note Mongo has a notion of a ReplicaSet and only with this feature would change_streams work, for simplicity the demo above assumes mongo1 (there are also 2 and 3) is the "primary"].

 

Again - this is just one example of how Embedded Python can get you up and running with your interoperability challenges very quickly and easily.

Discussion (0)1
Log in or sign up to continue