Introduction to Interoperability on Python (IoP) - Part 2

Hi Community,

In the first part of this series, we examined the fundamentals of Interoperability on Python (IoP), specifically how it enables us to build interoperability elements such as business services, processes, and operations in pure Python.

Now, we are ready to take things a step further. Real-world integration scenarios extend beyond simple message handoffs. They involve scheduled polling, custom message structures, decision logic, filtering, and configuration handling. In this article, we will delve into these more advanced IoP capabilities and demonstrate how to create and run a more complex interoperability flow using only Python.

To make it practical, we will build a comprehensive example: The Reddit Post Analyzer Production. The concept is straightforward: continuously retrieving the latest submissions from a chosen subreddit, filtering them based on popularity, adding extra tags to them, and sending them off for storage or further analysis.

The ultimate goal here is a reliable, self-running data ingestion pipeline. All major parts (the Business Service, Business Process, and Business Operation) are implemented in Python, showcasing how to use IoP as a Python-first integration methodology.

We will cover the following topics in this article:

✅ Defining message models using @dataclass
✅ Pulling live data from Reddit (Business Service)
✅ Filtering and enriching posts (Business Process)
✅ Handling the final delivery (Business Operation)
✅ Using structured logging across the pipeline
✅ Migrating IoP classes into IRIS using settings.py
✅ Overview of the IoP Director utility class


Let's begin with the application folder structure:

reddit_iop/
 ├─ messages.py
 ├─ services/
    └─ service_reddit.py
 ├─ processes/
    └─ process_reddit.py
 ├─ operations/
    └─ operation_store.py
 ├─ settings.py    
    

✅ Defining Message Models Using @dataclass (messages.py)

A central concept in any integration framework is the Message. In InterSystems IRIS, messages are first-class objects (they can be traced, inspected, and persisted as they move through a production). One of the strengths of IoP is that we can define these messages as typed Python classes using @dataclass. It means that we can avoid creating ObjectScript message classes and instead benefit from clean, IDE-friendly Python models.

In IoP, Message is the base class for anything passed between components. We will build upon it to create our own strongly-typed message objects with the help of Python dataclasses. These data models will flow through the Business Service, Business Process, and Business Operation.

from iop import Message
from dataclasses import dataclass

@dataclass
class RedditPostMessage(Message):
    Title: str = ""
    URL: str = ""
    Author: str = ""
    Score: int = 0
    Tag: str = ""
    Status: str = ""

By using the @dataclass decorator on a class that inherits from iop.Message, we achieve several advanced benefits with minimal code:

  • Automatic Properties: The @dataclass decorator automatically generates the __init__, __repr__, and __eq__ methods from the type-hinted fields (Title: str, Score: int, etc.).
  • Strong Typing: Type hints document the expected data type of each field, so IDEs and static checkers can flag mismatches before the code runs. Note that Python itself does not enforce the hints at runtime.
  • IoP Integration: The iop.Message inheritance ensures that the Python class is compiled into a persistent, ObjectScript-compatible class within InterSystems IRIS. It means that every message sent is automatically saved in the database for auditing and visual tracing (a key feature of the IRIS platform).
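
To see what the decorator generates, here is a minimal sketch that runs without IRIS. The empty Message class below is only a stand-in for iop.Message (an assumption made to keep the snippet self-contained); inside a production, the real base class is used instead.

```python
from dataclasses import dataclass, asdict


class Message:
    """Stand-in for iop.Message so the sketch runs outside IRIS (assumption)."""


@dataclass
class RedditPostMessage(Message):
    Title: str = ""
    URL: str = ""
    Author: str = ""
    Score: int = 0
    Tag: str = ""
    Status: str = ""


# __init__ is generated from the fields; omitted fields take their defaults.
post = RedditPostMessage(Title="Hello", Score=42)

# __eq__ compares two messages field by field.
same = post == RedditPostMessage(Title="Hello", Score=42)

# asdict() turns the message into a plain dictionary, e.g. for logging.
as_dict = asdict(post)

print(post)
print(same, as_dict["Score"])
```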

✅ Pulling Live Data from Reddit (service_reddit.py)

In an interoperability production, the Business Service acts as the gateway that brings data into the system. For our demonstration, the service will continuously poll Reddit’s /new.json endpoint and feed new submissions into the processing pipeline.

This component uses an inbound adapter to schedule and execute periodic API calls. Each time the adapter runs, it requests the latest posts from the specified subreddit, wraps the relevant fields in our RedditPostMessage dataclass, and forwards it to the next stage in the flow.

Key responsibilities of this service include:

  • Initiating the data flow at defined intervals
  • Connecting to Reddit and retrieving the newest submissions
  • Converting raw API response data into a strongly-typed RedditPostMessage
  • Logging errors cleanly without interrupting the production
  • Forwarding well-structured messages to the Business Process layer

This configuration mirrors a real-world integration pattern where an external data source continuously feeds the integration engine. By combining the IoP inbound adapter with a Python-based message model, we achieve a reliable and traceable ingest layer that is independent of ObjectScript.

from iop import BusinessService
from messages import RedditPostMessage
import requests, time

class RedditService(BusinessService):
    # Required so that the service is scheduled by the inbound adapter
    @staticmethod
    def get_adapter_type():
        # By default, the adapter invokes the service every 5 seconds
        return "Ens.InboundAdapter"
    #Initializes loop settings
    def on_init(self):
        self.subreddit =  "technology"
        self.poll_interval = 10
        self.base_url = f"https://www.reddit.com/r/{self.subreddit}/new.json?limit=5"
        self.headers = {"User-Agent": "IRIS-IoP-Reddit-Agent"}
    
    #Infinite polling loop to fetch events
    def on_process_input(self, _):
        while True:
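            try:
                # A timeout keeps a stalled connection from blocking the loop
                response = requests.get(self.base_url, headers=self.headers, timeout=10)
                # Raise on HTTP errors (e.g., 429) so they are logged below
                response.raise_for_status()
                posts = response.json()["data"]["children"]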
                
                for post in posts:
                    data = post["data"]

                    msg = RedditPostMessage(
                        Title=data["title"],
                        URL="https://reddit.com" + data["permalink"],
                        Author=data["author"],
                        Score=data["score"]
                    )
                    #Sends message to Process component
                    self.send_request_sync("RedditProcess", msg)

                self.log_info(f"[RedditService] Pulled {len(posts)} posts")

            except Exception as e:
                self.log_error(f"[RedditService ERROR] {e}")

            time.sleep(self.poll_interval)
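
The conversion step inside the loop can also be isolated into a small pure function, which makes it testable without network access or IRIS. The sketch below is one possible shape: the function name extract_posts and the sample payload are assumptions made for illustration, and missing fields fall back to defaults instead of raising KeyError.

```python
from typing import Any


def extract_posts(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return one dict per post with the fields RedditPostMessage expects."""
    posts = []
    for child in payload.get("data", {}).get("children", []):
        data = child.get("data", {})
        posts.append(
            {
                "Title": data.get("title", ""),
                "URL": "https://reddit.com" + data.get("permalink", ""),
                "Author": data.get("author", ""),
                "Score": int(data.get("score", 0)),
            }
        )
    return posts


# Hand-written sample shaped like the /new.json response (not real data).
sample = {
    "data": {
        "children": [
            {"data": {"title": "Post A", "permalink": "/r/technology/a",
                      "author": "alice", "score": 12}},
            {"data": {"title": "Post B", "permalink": "/r/technology/b"}},
        ]
    }
}

fields = extract_posts(sample)
print(fields)
```

Each dictionary can then be unpacked into the message, for example RedditPostMessage(**fields[0]).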

✅ Filtering and Enriching Posts (process_reddit.py)

The Business Process acts as the central nervous system of the production. This is where raw Reddit submissions are converted into meaningful information, and where key business rules such as filtering, decision-making, and routing are executed.

Once the Business Service publishes a RedditPostMessage, the process assesses its contents and determines the next course of action.

In this example, the process checks whether the submission meets specific criteria (e.g., a minimum score or specific keywords). Posts that pass the filter are augmented and forwarded to our Business Operation, while those that don’t are logged and dropped to keep the workflow clean and efficient.

from iop import BusinessProcess
from messages import RedditPostMessage

class RedditProcess(BusinessProcess):
    def on_init(self):
        self.log_info("Hello World init")
    #Entry point for incoming messages.
    def on_request(self, request: RedditPostMessage) -> RedditPostMessage:
        title = request.Title
        score = request.Score

        self.log_info(f"[Process] Received: {title} | Score: {score}")

        # Filter logic: process only trending posts
        min_score = 5
        if score < min_score:
            self.log_info(f"[Process] Skipped low score ({score}) post")
            response = RedditPostMessage(Status="FilteredLowScore")
            return response

        # Enrichment
        request.Tag = self._tag_topic(title)

        self.log_info(f"[Process] Tagged topic: {request.Tag}")

        # Forward to operation
        return self.send_request_sync("RedditStoreOperation", request)
    
    #Detects topic from keywords
    def _tag_topic(self, title: str) -> str:
        keywords = {
            "AI": "Artificial Intelligence",
            "health": "Healthcare",
            "python": "Programming",
            "data": "Data Engineering",
        }
        for key, tag in keywords.items():
            if key.lower() in title.lower():
                return tag
        return "General"

  • Filtering and Early Exit: The if score < min_score: block demonstrates conditional processing. If the message does not meet the requirements (low score), the process logs the skip and returns a RedditPostMessage whose Status is set to "FilteredLowScore", terminating that message's journey early without sending it downstream.
  • Data Enrichment: The line request.Tag = self._tag_topic(title) shows how to modify the message object (which is a Python object in memory). The _tag_topic function performs simple business logic (categorization) and adds the result to the message, making the data more valuable for the storage component.
  • Internal Methods: Python enables clean object-oriented design, as demonstrated by _tag_topic. This function, encapsulated within the class, keeps the main on_request method clean and focused on orchestration.
  • Continuing the Pipeline: If the post passes the filter, the augmented message is passed to the Operation using self.send_request_sync(), ensuring the flow remains synchronous for full traceability in the visual message tracer.
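
One caveat of the substring check in _tag_topic is that a short keyword such as "AI" also matches inside longer words (for example, "said"). The sketch below shows a possible whole-word variant using regular expressions, together with the threshold check as a standalone function. The names tag_topic and passes_filter are assumptions for illustration, not part of IoP.

```python
import re

KEYWORDS = {
    "AI": "Artificial Intelligence",
    "health": "Healthcare",
    "python": "Programming",
    "data": "Data Engineering",
}


def tag_topic(title: str) -> str:
    """Return the tag of the first keyword found as a whole word, else 'General'."""
    for key, tag in KEYWORDS.items():
        # \b anchors the match to word boundaries, so "AI" does not match "said".
        if re.search(rf"\b{re.escape(key)}\b", title, flags=re.IGNORECASE):
            return tag
    return "General"


def passes_filter(score: int, min_score: int = 5) -> bool:
    """Mirror the process's threshold check: keep posts at or above min_score."""
    return score >= min_score


print(tag_topic("She said the launch was delayed"))
print(tag_topic("New AI model released"))
print(passes_filter(3), passes_filter(5))
```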

✅ Handling the Final Delivery (operation_store.py)

The Business Operation is the final component in the production pipeline that interacts with external systems. This could be a database, a file system, a remote API, a message queue, or any other destination for processed data.

Once a message reaches this layer, it is considered fully processed and ready for persistence, storage, or further consumption. In our demonstration, the operation logs the post details and simulates saving them. In a real-world scenario, however, this is where you would execute SQL inserts, make REST calls, or send messages to other systems.

from iop import BusinessOperation
from messages import RedditPostMessage


class RedditStoreOperation(BusinessOperation):
    def on_init(self):
        self.log_info("Operation init")
        
    #standard entry point for operations.    
    def on_message(self, request: RedditPostMessage) -> RedditPostMessage:
        self.log_info(
            f"[Store] Title: {request.Title} | Score: {request.Score} | Tag: {request.Tag}"
        )
        # Mock DB or File writing here
        # Real system: SQL insert / Kafka / FHIR server POST
        # Simulates saving to a database, file, or external system.
        response = RedditPostMessage(Status="Saved")
        #returning the status to close the loop
        return response

  • Input Handling: The method signature on_message(self, request: RedditPostMessage) clearly defines the expected input type, reinforcing the contract set by the custom message.
  • External Integration Point: This is the most crucial architectural point. Any Python package installed in the environment, including requests, numpy, pandas, and specialized connectors such as pyodbc or boto3, can be used here. The developer is free to use the entire Python ecosystem to interact with any external system.
  • Returning Status: The operation executes its task (mocked as logging) and returns a RedditPostMessage with Status set to "Saved" to the calling Process. Since the Service called the Process synchronously, this final status can be traced all the way back to the Service's on_process_input method, confirming the end-to-end completion.
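
As an illustration of what the mocked storage step could look like, here is a minimal sketch that persists a post with Python's built-in sqlite3 module. The table name reddit_posts and the function save_post are assumptions chosen for the example; a real operation would target whichever database or API the project uses. Because the service polls the same endpoint repeatedly, the sketch keys rows on the URL so that a post seen twice is updated rather than duplicated.

```python
import sqlite3


def save_post(conn: sqlite3.Connection, title: str, url: str,
              author: str, score: int, tag: str) -> str:
    """Insert one post and return a status string, as the operation would."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS reddit_posts (
               url TEXT PRIMARY KEY, title TEXT, author TEXT,
               score INTEGER, tag TEXT)"""
    )
    # INSERT OR REPLACE keeps a re-polled post from creating duplicate rows.
    conn.execute(
        "INSERT OR REPLACE INTO reddit_posts (url, title, author, score, tag) "
        "VALUES (?, ?, ?, ?, ?)",
        (url, title, author, score, tag),
    )
    conn.commit()
    return "Saved"


conn = sqlite3.connect(":memory:")  # in-memory database for the demonstration
status = save_post(conn, "Post A", "https://reddit.com/r/technology/a",
                   "alice", 12, "General")
save_post(conn, "Post A", "https://reddit.com/r/technology/a",
          "alice", 15, "General")  # same URL, newer score
rows = conn.execute("SELECT title, score FROM reddit_posts").fetchall()
print(status, rows)
```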

✅ Using Structured Logging Across the Pipeline

The IoP framework includes its own logging system, and the Python API provides a way to leverage Python’s logging capabilities while fully integrating with IRIS logs.

Every IoP component inherits the logging functionality from its base class. You can access it directly via the logger property or use the built-in convenience methods, such as log_info(), log_warning(), and log_error(), to record messages at the appropriate level.

def on_init(self):
    # Using convenience methods
    self.log_info("Component initialized")
    self.log_error("An error occurred")
    self.log_warning("Warning message")
    self.log_alert("Critical alert")
    self.trace("Debug trace message")

    # Using logger property
    self.logger.info("Info via logger")
    self.logger.error("Error via logger")
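
To keep log lines consistent across the service, process, and operation, the message text can be built by a small shared helper before it is passed to log_info() or the logger. The function below is a hypothetical helper, not part of IoP; it only formats a string.

```python
def format_event(component: str, event: str, **fields: object) -> str:
    """Build a '[Component] event | key=value ...' line with fields in a stable order."""
    parts = [f"{key}={fields[key]!r}" for key in sorted(fields)]
    suffix = " | " + " ".join(parts) if parts else ""
    return f"[{component}] {event}{suffix}"


line = format_event("Process", "Received", title="Post A", score=12)
print(line)
```

Inside a component, this would be used as self.log_info(format_event("Process", "Received", title=title, score=score)).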

✅ Migrating IoP Classes into IRIS Using settings.py

This is the “glue” that links your Python classes to production items in IRIS. The IoP framework utilizes the settings.py file to define and apply configuration details, which are then reflected directly in the InterSystems Management Portal.

from services.service_reddit import RedditService
from processes.process_reddit import RedditProcess
from operations.operation_store import RedditStoreOperation

CLASSES = {
    "Reddit.Ingestion.Service": RedditService,
    "Reddit.Ingestion.Process": RedditProcess,
    "Reddit.Ingestion.Store": RedditStoreOperation
}

PRODUCTIONS = [
    {
        "Reddit.Ingestion.Production": {
            "@TestingEnabled": "false",
            "Item": [
                {
                    "@Name": "RedditService",
                    "@ClassName": "Reddit.Ingestion.Service",
                    "@Enabled": "true",
                    "Setting": [
                        {
                            "@Target": "Host",
                            "@Name": "subreddit",
                            "#text": "technology"
                        },
                        {
                            "@Target": "Host",
                            "@Name": "poll_interval",
                            "#text": "15"
                        }
                    ]
                },
                {
                    "@Name": "RedditProcess",
                    "@ClassName": "Reddit.Ingestion.Process",
                    "@Enabled": "true",
                    "Setting": [
                        {
                            "@Target": "Host",
                            "@Name": "MIN_SCORE",
                            "#text": "200"
                        }
                    ]
                },
                {
                    "@Name": "RedditStoreOperation",
                    "@ClassName": "Reddit.Ingestion.Store",
                    "@Enabled": "true"
                }
            ]
        }
    }
]

  • Dynamic Setting Injection: The Setting array within the PRODUCTIONS definition is the mechanism for externalizing configuration. When the production is loaded, IRIS reads these values and makes them available to the Python components via the self.get_setting() method or the self.Settings property.
  • Live Configuration Change: A significant advantage of using this framework is that administrators can modify the subreddit, poll_interval, or MIN_SCORE settings directly in the InterSystems IRIS Management Portal without needing to restart the production. The on_init method will be triggered to re-read these settings, enabling dynamic operational control.
  • Clear Structure: The CLASSES dictionary acts as a mapping layer, simplifying the connection between the ObjectScript-facing Production XML (@ClassName) and the underlying Python implementation. This abstraction is vital for large, multi-language projects.
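
Because the link between a production item and its Python class is just a matching string, a typo in @ClassName is easy to miss. The sketch below is a hypothetical pre-migration check (the function find_unmapped_items is not part of IoP) that lists the items whose @ClassName has no entry in CLASSES. It uses trimmed-down stand-ins for the two objects defined in settings.py.

```python
def find_unmapped_items(classes: dict, productions: list) -> list[str]:
    """Return the names of production items whose @ClassName is not a key in classes."""
    missing = []
    for production in productions:
        for definition in production.values():
            for item in definition.get("Item", []):
                if item.get("@ClassName") not in classes:
                    missing.append(item.get("@Name", "<unnamed>"))
    return missing


# Trimmed-down stand-ins for the CLASSES and PRODUCTIONS objects in settings.py.
classes = {"Reddit.Ingestion.Service": object, "Reddit.Ingestion.Process": object}
productions = [
    {
        "Reddit.Ingestion.Production": {
            "Item": [
                {"@Name": "RedditService", "@ClassName": "Reddit.Ingestion.Service"},
                {"@Name": "RedditStoreOperation", "@ClassName": "Reddit.Ingestion.Store"},
            ]
        }
    }
]

print(find_unmapped_items(classes, productions))
```

An item may also point at a native IRIS class rather than a Python one, so a reported name is a prompt to double-check, not necessarily an error.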

Using the iop command, we can migrate our Python components directly into IRIS, making them available as production items in the InterSystems environment.

iop --migrate /path/to/reddit_iop/settings.py

✅ Overview of the IoP Director Utility Class

The IoP Director class provides utility methods to manage productions and components in IRIS directly from Python.

Production Management:

  • start_production(production_name=None) – Starts a production 
  • stop_production() – Stops the current production
  • restart_production() – Restarts the current production
  • shutdown_production() – Gracefully shuts down the production
  • status_production() – Gets current production status (returns a dictionary)

Business Service Management:

  • create_business_service(target) – Creates a new service instance
  • get_business_service(target) – Retrieves an existing service instance
  • test_component(target, message=None, classname=None, body=None) – Tests any production component

Production Logging:

  • log_production() – Monitors logs in real time
  • log_production_top(top) – Displays the last N log entries

Production Configuration:

  • set_default_production(production_name) – Sets the default production
  • get_default_production() – Gets the current default production name

The Director class makes it easy to control, monitor, and test your IoP productions without leaving the Python environment.

To initiate a production, you can use the start_production() method of the Director class.
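
As a rough sketch of how the methods listed above can be combined, the helper below starts a production and then returns its status. It accepts the director as a parameter so that it can be tried outside IRIS with a test double. FakeDirector and the keys of its status dictionary are invented for the demonstration; the import shown in the closing comment assumes that Director is exposed by the iop package.

```python
def start_and_check(director, production_name: str) -> dict:
    """Start the named production, then return the status dictionary."""
    director.start_production(production_name)
    return director.status_production()


class FakeDirector:
    """Test double exposing the two methods used above (not part of IoP)."""

    def __init__(self):
        self.started = None

    def start_production(self, production_name=None):
        self.started = production_name

    def status_production(self):
        # The keys here are invented for the demonstration.
        return {"Production": self.started, "Status": "running"}


status = start_and_check(FakeDirector(), "Reddit.Ingestion.Production")
print(status)

# Inside an IRIS-enabled environment, the real class would be passed instead
# (assuming it is importable as below):
#     from iop import Director
#     start_and_check(Director, "Reddit.Ingestion.Production")
```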




Production Overview

The following production has been created with the help of the iop --migrate command:

Below you can see the business service details (%classname refers to the class name in our service_reddit.py file, %module refers to the Python file name, and %classpaths contains the path to the Python file).


To view the messages, click on the Business Service, then navigate to the Messages tab.


Click on a message to view its visual trace.

Messages are received by RedditService, forwarded to RedditProcess, and then, based on the process logic, sent to RedditStoreOperation.

Conclusion

With Interoperability on Python, you can now do the following:

  • Build complete production pipelines entirely in Python
  • Leverage modern tooling such as dataclasses, type hints, and IDE support
  • Integrate with virtually any API (Reddit, Twitter, FHIR, and more)
  • Deploy Python components alongside ObjectScript components

It provides a solid foundation for creating real-time data pipelines in domains such as healthcare, finance, IoT, and social media (all powered by Python within InterSystems IRIS).

The Reddit Post Analyzer Production serves as a blueprint for advanced IoP development. By utilizing custom dataclass messages, implementing robust polling services, applying conditional logic and enrichment within the Business Process, and externalizing configuration through settings.py, we have revealed how Python can evolve from a utility language into a core pillar of a high-performance enterprise integration platform.

Thanks!
