Article
· Jan 8 15m de lecture

Introduction à l'interopérabilité sur Python (IoP) - Partie 2

Salut la Communauté,

Dans la première partie de cette série, on a vu les bases de l'interopérabilité sur Python Interoperability on Python (IoP), et surtout comment ça nous permet de construire des éléments d'interopérabilité comme des services métier, des processus et des opérations uniquement à l'aide de Python.

Maintenant, on est prêts à aller plus loin. Les scénarios d'intégration dans le monde réel vont au-delà du simple transfert de messages. Ils impliquent des interrogations programmées, des structures de messages personnalisées, une logique de décision, un filtrage et une gestion de la configuration. Dans cet article, on va se pencher sur ces fonctionnalités IoP plus avancées et montrer comment créer et exécuter un flux d'interopérabilité plus complexe uniquement à l'aide de Python.

Pour que ce soit plus concret, on va construire un exemple complet: La Reddit Post Analyzer Production (production d'analyseur de posts Reddit). Le concept est simple : récupérer en continu les dernières publications d'un subreddit choisi, les filtrer en fonction de leur popularité, leur ajouter des balises supplémentaires et les envoyer pour stockage ou analyse plus approfondie.

L'objectif final est ici de disposer d'un pipeline d'ingestion de données fiable et autonome. Tous les éléments principaux (service métier, processus métier et opération métier) sont implémentés en Python, ce qui montre comment utiliser l'IoP à l'aide de la méthodologie d'intégration axée sur Python.

Dans cet article, nous aborderons les sujets suivants:

✅ Définition des modèles de message à l'aide de @dataclass
✅ Extraction de données en temps réel depuis Reddit (service métier)
✅ Filtrage et enrichissement des publications (processus métier)
✅ Gestion de la livraison finale (opération métier)
✅ Utilisation de la journalisation structurée dans l'ensemble du pipeline
✅ Migration des classes IOP vers IRIS à l'aide de settings.py
✅ Présentation de la classe utilitaire IOP Director


Commençons par la structure du dossier de l'application:

reddit_iop/
 ├─ messages.py
 ├─ services/
    └─ service_reddit.py
 ├─ processes/
    └─ process_reddit.py
 ├─ operations/
    └─ operation_store.py
 ├─ settings.py    
    

✅ Définition des modèles de message à l'aide de @dataclass (messages.py)

Le concept central de tout cadre d'intégration est le Message.Dans InterSystems IRIS, les messages sont des objets de premier ordre (ils peuvent être tracés, inspectés et conservés tout au long de leur parcours dans la production). L'un des points forts de l'IoP est qu'il permet de définir ces messages comme des classes Python typisées à l'aide de @dataclass. Cela signifie que nous pouvons éviter de créer des classes de messages ObjectScript et bénéficier à la place de modèles Python propres et compatibles avec les IDE.

Dans IoP, Message est la classe de base pour tout ce qui est soumis à la transmission entre les composants. Nous nous appuierons sur celle-ci pour créer nos propres objets de message fortement typés au moyen des classes de données Python. Ces modèles de données circuleront à travers le service métier, le processus métier et l'opération métier.

from iop import Message
from dataclasses import dataclass

@dataclass
class RedditPostMessage(Message):
    Title: str = ""
    URL: str = ""
    Author: str = ""
    Score: int = 0
    Tag: str = ""
    Status: str = ""

À l'aide du décorateur @dataclass sur une classe qui hérite de iop.Message, nous obtenons plusieurs avantages avancés au moyen du code minimal:

  • Propriétés automatiques: dataclasses génére automatiquement les méthodes __init__, __repr__, les méthodes de comparaison en fonction des champs de type hinté (Title: str, Score: int, etc.).
  • Typage fort: Les annotations de type garantissent que tous les composants sont conscients du type de données attendu, ce qui améliore la qualité du code et prévient les erreurs d'exécution.
  • Intégration IoP: L'héritage de iop.Message garantit que la classe Python est compilée en une classe persistante compatible ObjectScript au sein d'InterSystems IRIS. Cela signifie que chaque message envoyé est automatiquement sauvegardé dans la base de données pour l'audit et le traçage visuel (une fonctionnalité clé de la plateforme IRIS).

✅ Extraction de données en temps réel depuis Reddit (service métier) (service_reddit.py)

Dans une production interopérable, le service métier agit comme une passerelle qui achemine les données vers le système. Pour notre démonstration, le service interrogera en continu le point de terminaison de Reddit /new.json et alimentera le pipeline de traitement avec les nouvelles soumissions.

Ce composant utilise un adaptateur entrant pour planifier et exécuter des appels API périodiques. Chaque fois que l'adaptateur s'exécute, il demande les derniers messages du subreddit spécifié, encapsule les champs pertinents dans notre classe de données RedditPostMessage et les transmet à l'étape suivante du flux.

Les principales responsabilités de ce service sont les suivantes:

  • Initialisation du flux de données à intervalles définis
  • Connexion à Reddit et récupération des dernières soumissions
  • Conversion des données brutes de réponse API en un RedditPostMessage fortement typé
  • Enregistrement des erreurs de manière claire sans interrompre la production
  • Transfert de messages bien structurés vers la couche processus métier

Cette configuration reflète un modèle d'intégration réel dans lequel une source de données externe alimente en continu le moteur d'intégration. En combinant l'adaptateur entrant IoP au moyen d'un modèle de message basé sur Python, nous obtenons une couche d'ingestion fiable et traçable, indépendante d'ObjectScript.

from iop import BusinessService
from messages import RedditPostMessage
import requests, time

class RedditService(BusinessService):
    #Nécessaire pour planifier le service
    def get_adapter_type():
        # Ceci est obligatoire pour planifier le service
        # Par défaut, le service sera exécuté toutes les 5 secondes
        return "Ens.InboundAdapter"
    #Initialisation des paramètres de boucle
    def on_init(self):
        self.subreddit =  "technology"
        self.poll_interval = 10
        self.base_url = f"https://www.reddit.com/r/{self.subreddit}/new.json?limit=5"
        self.headers = {"User-Agent": "IRIS-IoP-Reddit-Agent"}
    
    #Boucle de sondage infinie pour récupérer les événements
    def on_process_input(self, _):
        while True:
            try:
                response = requests.get(self.base_url, headers=self.headers)
                posts = response.json()["data"]["children"]
                
                for post in posts:
                    data = post["data"]

                    msg = RedditPostMessage(
                        Title=data["title"],
                        URL="https://reddit.com" + data["permalink"],
                        Author=data["author"],
                        Score=data["score"]
                    )
                    #Envoie du message au composant de Processus
                    self.send_request_sync("RedditProcess", msg)

                self.log_info(f"[RedditService] Pulled {len(posts)} posts")

            except Exception as e:
                self.log_error(f"[RedditService ERROR] {e}")

            time.sleep(self.poll_interval)

✅ Filtrage et enrichissement des publications (processus métier) (process_reddit.py)

Le processus métier agit comme le système nerveux central de la production. C'est là que les soumissions Reddit brutes sont converties en information significative et que les règles métier clés telles que le filtrage, la prise de décision et le routage sont exécutées.

Une fois que le service métier publie un RedditPostMessage,  le processus évalue son contenu et détermine la marche à suivre.

Dans cet exemple, le processus vérifie si la soumission répond à des critères spécifiques (par exemple, un score minimum ou des mots-clés spécifiques). Les publications qui réussissent au filtre sont enrichies et transmises à notre service opérationnel, tandis que celles qui échouent sont enregistrées et supprimées afin de garantir la propreté et l'efficacité du flux de travail.

from iop import BusinessProcess
from messages import RedditPostMessage

class RedditProcess(BusinessProcess):
    def on_init(self):
        self.log_info("Hello World init")
    #Point de saisie pour les messages entrants.
    def on_request(self, request: RedditPostMessage) -> RedditPostMessage:
        title = request.Title
        score = request.Score

        self.log_info(f"[Process] Received: {title} | Score: {score}")

        # Logique de filtrage : traitement uniquement des publications tendance
        min_score = 5
        if score < min_score:
            self.log_info(f"[Process] Skipped low score ({score}) post")
            response = RedditPostMessage(Status="FilteredLowScore")
            return response

        # Enrichissement
        request.Tag = self._tag_topic(title)

        self.log_info(f"[Process] Tagged topic: {request.Tag}")

        # Passage à l'opération
        return self.send_request_sync("RedditStoreOperation", request)
    
    #Détection du sujet à partir de mots-clés
    def _tag_topic(self, title: str) -> str:
        keywords = {
            "AI": "Artificial Intelligence",
            "health": "Healthcare",
            "python": "Programming",
            "data": "Data Engineering",
        }
        for key, tag in keywords.items():
            if key.lower() in title.lower():
                return tag
        return "General"
  • Filtrage et sortie anticipée: Le bloc if score < min_score: illustre le traitement conditionnel. Si le message ne répond pas aux exigences (score faible), le processus enregistre le saut et renvoie une simple StatusResponse, mettant fin prématurément au parcours de ce message sans l'envoyer en aval.
  • Enrichissement des données: La ligne request.Tag = self._tag_topic(title) montre comment modifier  l'objet message (qui est un objet Python en mémoire). La fonction _tag_topic exécute une Logique métier simple (catégorisation) et ajoute le résultat au message, rendant les données plus utiles pour le composant de stockage.
  • Méthodes internes: Python permet une conception orientée objet propre, comme le montre _tag_topic. Cette fonction, encapsulée dans la classe, permet de garder la méthode principale on_request propre et axée sur l'orchestration.
  • Poursuite du pipeline: si la transmission du message passe le filtre, le message enrichi est transmis à l'opération à l'aide de self.send_request_sync(), ce qui garantit que le flux reste synchrone pour une traçabilité complète dans le traceur de messages visuel.

✅ Gestion de la livraison finale (operation_store.py)

Les opération métier constituent le dernier élément du pipeline de production qui interagit au moyen des systèmes externes. Il peut s'agir d'une base de données, d'un système de fichiers, d'une API distante, d'une file d'attente de messages ou de toute autre destination pour les données traitées.

Une fois qu'un message atteint cette couche, il est considéré comme entièrement traité et prêt à être persistant, stocké ou consommé ultérieurement. Dans notre démonstration, l'opération enregistre les détails de la publication et simule leur enregistrement. Cependant, dans un scénario réel, c'est à ce stade que vous exécuteriez des insertions SQL, des appels REST ou enverriez des messages à d'autres systèmes.

from iop import BusinessOperation
from dataclasses import dataclass
from messages import RedditPostMessage


class RedditStoreOperation(BusinessOperation):
    def on_init(self):
        self.log_info("Operation init")
        
    #saisie d'entrée standard pour les opérations.    
    def on_message(self, request: RedditPostMessage) -> RedditPostMessage:
        self.log_info(
            f"[Store] Title: {request.Title} | Score: {request.Score} | Tag: {request.Tag}"
        )
        # Base de données fictive ou écriture de fichier ici
        # Système réel: SQL insert / Kafka / FHIR server POST
        # Simule l'enregistrement dans une base de données, un fichier ou un système externe.
        response = RedditPostMessage(Status="Saved")
        #renvoie le statut pour fermer la boucle
        return response
  • Traitement des entrées: La signature de la méthode on_message(self, request: RedditPostMessage) définit clairement le type d'entrée attendu, renforçant ainsi le contrat établi par le message personnalisé.
  • Point d'intégration externe: il s'agit du point architectural le plus crucial. Tous les paquets Python, y compris requests, numpy, pandas et des connecteurs spécialisés tels que pyodbc ou boto3, sont disponibles ici. Le développeur est libre d'utiliser l'ensemble de l'écosystème Python à l'aide de n'importe quel système externe.
  • État de retour: l'opération exécute sa tâche avec succès (simulée comme une journalisation) et renvoie une StatusResponse au processus appelant. Étant donné que le service a appelé le processus de manière synchrone, cet état final peut être retracé jusqu'à la méthode on_process_input du service, confirmant ainsi l'achèvement de bout en bout.

✅ Utilisation de la journalisation structurée dans l'ensemble du pipeline

Le framework IoP comprend son propre système de journalisation, et l'API Python fournit une manière d'exploiter les capacités de journalisation de Python tout en s'intégrant pleinement aux journaux IRIS.

Chaque composant IoP hérite de la fonctionnalité de journalisation de sa classe de base. Vous pouvez y accéder directement via la propriété logger ou à l'aide des méthodes pratiques intégrées, telles que log_info(), log_warn(), et log_error(), pour enregistrer les messages au niveau approprié.

def on_init(self):
    # Recours à des méthodes pratiques
    self.log_info("Component initialized")
    self.log_error("An error occurred")
    self.log_warning("Warning message")
    self.log_alert("Critical alert")
    self.trace("Debug trace message")

    # Recours à la propriété logger
    self.logger.info("Info via logger")
    self.logger.error("Error via logger")

✅ Migration des classes IOP vers IRIS à l'aide desettings.py

Il s'agit du "lien" qui relie vos classes Python aux éléments de production dans IRIS. Le framework IoP utilise le fichier settings.py pour définir et appliquer les détails de configuration, qui sont ensuite directement reflétés dans le portail de gestion InterSystems.

from services.service_reddit import RedditService
from processes.process_reddit import RedditProcess
from operations.operation_store import RedditStoreOperation

CLASSES = {
    "Reddit.Ingestion.Service": RedditService,
    "Reddit.Ingestion.Process": RedditProcess,
    "Reddit.Ingestion.Store": RedditStoreOperation
}

PRODUCTIONS = [
    {
        "Reddit.Ingestion.Production": {
            "@TestingEnabled": "false",
            "Item": [
                {
                    "@Name": "RedditService",
                    "@ClassName": "Reddit.Ingestion.Service",
                    "@Enabled": "true",
                       "Setting": [
                    {
                        "@Target": "Host",
                        "@Name": "subreddit",
                        "#text": "technology"
                    },
                      {
                        "@Target": "Host",
                        "@Name": "poll_interval",
                        "#text": "15"
                    },
                ]                
                },
                {
                    "@Name": "RedditProcess",
                    "@ClassName": "Reddit.Ingestion.Process",
                    "@Enabled": "true",
                         "Setting": [
                    {
                        "@Target": "Host",
                        "@Name": "MIN_SCORE",
                        "#text": "200"
                    }
                ]          
                },
                {
                    "@Name": "RedditStoreOperation",
                    "@ClassName": "Reddit.Ingestion.Store",
                    "@Enabled": "true"
                }
            ]
        }
    }
]
  • Injection dynamique des paramètres: La matrice Setting dans la définition PRODUCTIONS est le mécanisme permettant d'externaliser la configuration. Lorsque la production est chargée, IRIS lit ces valeurs et les met à la disposition des composants Python via la méthode self.get_setting() ou la propriété self.Settings.
  • Modification de la configuration en direct: L'un des avantages significatifs de l'utilisation de ce framework est que les administrateurs peuvent modifier les paramètres SUBREDDIT, POLL_INTERVAL, ou MIN_SCORE directement dans le portail de gestion InterSystems IRIS sans avoir à redémarrer la production. La méthode on_init sera déclenchée pour relire ces paramètres, permettant ainsi  un contrôle opérationnel dynamique.
  • Structure claire: le dictionnaire CLASSES agit comme une couche de mappage, simplifiant la connexion entre le XML de production orienté ObjectScript (@ClassName) et l'implémentation Python sous-jacente. Cette abstraction est essentielle pour les projets multilingues de grande envergure.

En utilisant la commande iop, nous pouvons migrer nos composants Python directement vers IRIS, les rendant ainsi disponibles en tant qu'éléments de production dans l'environnement InterSystems.

iop --migrate /path/to/reddit_iop/settings.py

✅ Présentation de la classe utilitaire IOP Director

La classe IOP Director fournit des méthodes utilitaires permettant de gérer les productions et les composants dans IRIS directement depuis Python.

Gestion de la production:

  • start_production(production_name=None) – Démarrage d'une production 
  • stop_production() – Arrêt de la production en cours
  • restart_production() – Redémarrage de la production en cours
  • shutdown_production() – Arrêt progressif de la production
  • status_production() – Obtention de l'état actuel de la production (renvoie un dictionnaire)

Gestion des services métier:

  • create_business_service(target) – Création d'une nouvelle instance de service
  • get_business_service(target) – Récupération d'une instance de service existante
  • test_component(target, message=None, classname=None, body=None) – Test de n'importe quel composant de production

Journalisation de la production:

  • log_production() – Surveillance des journaux en temps réel
  • log_production_top(top) – Affichage des N dernières saisies du journal

Configuration de la production:

  • set_default_production(production_name) – Définition de la production par défaut
  • get_default_production() – Obtention du nom de la production par défaut actuelle

La classe Director facilite le contrôle, la surveillance et le test de vos productions IoP sans quitter l'environnement Python.

Pour lancer une production, vous pouvez utiliser la méthode start_production() de la classe Director.




Aperçu de la production

La production suivante a été créée au moyen de la commande iop --migrate:

Vous trouverez ci-dessous les détails du service métier (%classname fait référence au nom de la classe dans notre fichier service_reddit.py file, %module u moyen du nom du fichier  Python et %classpaths contient le chemin d'accès au fichier Python)


Pour afficher les messages, cliquez sur les acctivités métier Business Service, puis accédez à l'onglet Message.


Cliquez sur un message pour afficher sa trace visuelle.

Les messages sont reçus par RedditService, transférés à RedditProcess, et ensuite, en fonction de la logique du processus, envoyés à RedditStoreOperation.

Conclusion

Au moyen de l'interopérabilité sur Python, vous pouvez désormais effectuer les opérations suivantes:

  • Créer des pipelines de production complets entièrement en Python
  • Tirer parti d'outils modernes tels que les classes de données, les indications de type et la prise en charge IDEt.
  • Intégrer pratiquement n'importe quelle API (Reddit, Twitter, FHIR, etc.)
  • Déployer des composants Python parallèlement à des composants ObjectScripts

Cette solution fournit une base solide pour la création de pipelines de données en temps réel dans des domaines tels que la santé, la finance, l'IoT et les réseaux sociaux (tous alimentés par Python dans InterSystems IRIS).

La production Reddit Post Analyzer Production sert de modèle pour le développement avancé de l'IoP. En utilisant des messages de classe de données personnalisés, en mettant en œuvre des services de sondage robustes, en appliquant une logique conditionnelle et un enrichissement dans le processus métier, et en externalisant la configuration via settings.py, nous avons montré comment Python peut évoluer d'un langage utilitaire à un pilier central d'une plateforme d'intégration d'entreprise haute performance.

Merci!

Discussion (0)0
Connectez-vous ou inscrivez-vous pour continuer