Neat use of Dynamic Dispatch! I was thinking something more like (note - this is very quick and dirty/WIP):

/// Generate a set of ObjectScript classes corresponding to Pydantic models defined in a given Python module.
/// 
/// Args:
///     sourceModule: Path to the Python module containing Pydantic models.
///     targetPackage: Target package for generated ObjectScript classes.
///     baseClass: Base class for generated ObjectScript classes.
///
/// Significant contributions by Windsurf / Claude 3.7 Sonnet (Thinking)
/// That is to say, if it doesn't work, it's the AI's fault. (Plus mine for being bad at Python.)
ClassMethod Generate(sourceModule = "mcp", targetPackage = "pkg.isc.mcp.types.test", baseClass = "pkg.isc.mcp.types.BaseModel") [ Language = python ]
{
    import importlib
    import inspect
    import traceback
    import sys
    from pydantic import BaseModel
    import iris
    import datetime
    from typing import Union, Literal
    from types import NoneType, UnionType
    from logging import getLogger

    # Map complex type expressions to ObjectScript types
    complex_type_map = {
        'dict[str, typing.Any]': '%DynamicObject',
        'list[typing.Any]': '%DynamicArray'
    }

    # Other complex expressions that should be flagged as required properties
    complex_required_type_map = {
    }
    
    # Map Pydantic field types to ObjectScript types
    type_map = {
        'str': '%String',
        'int': '%Integer',
        'float': '%Float',
        'bool': '%Boolean',
        'datetime.datetime': '%TimeStamp',
        'datetime.date': '%Date',
        'dict': '%DynamicObject',
        'list': '%DynamicArray'
    }

    def get_all_models(module_name):
        models = []
        processed_models = set()  # Keep track of models we've seen to avoid duplicates
        
        # First find all top-level models in the specified module
        module = importlib.import_module(module_name)
        for name, obj in inspect.getmembers(module):
            if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj is not BaseModel:
                models.append(obj)
                processed_models.add(obj.__name__)
        
        # Now recursively find all referenced models
        i = 0
        while i < len(models):
            current_model = models[i]
            i += 1
            
            # Check each field for model references
            for field_name, field_info in current_model.__fields__.items():
                annotation = field_info.annotation
                referenced_models = find_referenced_models(annotation)
                
                for model in referenced_models:
                    if model.__name__ not in processed_models:
                        models.append(model)
                        processed_models.add(model.__name__)
                        print(f"Added referenced model: {model.__name__}")
        
        return models

    def process_model(targetPackage, model):
        # Format class name with package prefix
        class_name = f"{targetPackage}.{model.__name__}"
        
        # Check if class already exists
        cls_def = iris.cls('%Dictionary.ClassDefinition')._OpenId(class_name)
        if cls_def != "":
            print(f"Updating existing class: {class_name}")
        else:
            # Create new class definition
            cls_def = iris.cls('%Dictionary.ClassDefinition')._New()
            cls_def.Name = class_name
            print(f"Creating new class: {class_name}")
        
        cls_def.Super = baseClass
        cls_def.ProcedureBlock = 1
        
        # Add parameter to indicate this is an auto-generated class
        cls_def.Parameters.Clear()
        auto_gen_param = iris.cls('%Dictionary.ParameterDefinition')._New()
        auto_gen_param.Name = "AUTOGENERATED"
        auto_gen_param.Default = "1"
        auto_gen_param.parent = cls_def

        # Clear existing properties - always start from a clean slate
        cls_def.Properties.Clear()
        
        # Process model fields to create properties
        for field_name, field_info in model.__fields__.items():
            # Skip fields that start with underscore
            if field_name.startswith('_'):
                continue
            
            # Simplify property checking - create it fresh
            # The _Save() call will handle merging if it's already defined
            prop = iris.cls('%Dictionary.PropertyDefinition')._New()
            prop.Name = field_name
            prop.parent = cls_def

            print(f"Processing field: {field_name}: {field_info.annotation}")
            
            annotation = field_info.annotation
            (os_type, collection_type, required) = process_annotation(annotation)
            print(f"\tType: {os_type}, Collection type: {collection_type}")
            prop.Type = os_type
            prop.Collection = collection_type
            prop.Required = 1 if required else 0
        
        # Save the class definition
        sc = cls_def._Save()
        if not iris.cls('%SYSTEM.Status').IsOK(sc):
            print(f"Error saving class {class_name}: {iris.cls('%SYSTEM.Status').GetErrorText(sc)}")

    def process_annotation(annotation, topLevel=True):
        # getLogger returns the same logger instance on every call, so this is cheap
        logger = getLogger("Generator")
        logger.setLevel("DEBUG")
        
        os_type = ''
        collection_type = ''
        required = True
        
        logger.debug(f"Processing annotation: {annotation}")

        if str(annotation) in complex_type_map:
            os_type = complex_type_map[str(annotation)]
            return (os_type, collection_type, False)

        if str(annotation) in complex_required_type_map:
            os_type = complex_required_type_map[str(annotation)]
            return (os_type, collection_type, True)
        
        # Check if it's a Union type (Python 3.10+ pipe syntax)
        if isinstance(annotation, UnionType):
            union_types = annotation.__args__
            logger.debug(f"Native union type with args: {union_types}")
            
            # Check if it's an Optional (Union with NoneType)
            if NoneType in union_types:
                # Get the actual type (filter out None)
                actual_type = next(arg for arg in union_types if arg is not NoneType)
                logger.debug(f"Optional type detected: {actual_type}")
                (os_type, collection_type, required) = process_annotation(actual_type)
                required = False
            else:
                # No single right answer for a multi-type union yet; as a crude
                # fallback, keep whatever the last type in the union maps to
                logger.debug("Processing union with multiple types")
                for type_arg in union_types:
                    (os_type, collection_type, required) = process_annotation(type_arg, False)
        
        # Handle typing.Union
        elif hasattr(annotation, "__origin__") and annotation.__origin__ is Union:
            union_types = annotation.__args__
            logger.debug(f"typing.Union with args: {union_types}")
            
            # Check if it's an Optional (Union with NoneType)
            if NoneType in union_types:
                # Get the actual type (filter out None)
                actual_type = next(arg for arg in union_types if arg is not NoneType)
                logger.debug(f"Optional type detected: {actual_type}")
                (os_type, collection_type, required) = process_annotation(actual_type)
                required = False
            else:
                # Same crude fallback as above: the last type in the union wins
                logger.debug("Processing union with multiple types")
                for type_arg in union_types:
                    (os_type, collection_type, required) = process_annotation(type_arg, False)
        
        # Handle container types (List, Dict, etc.)
        elif hasattr(annotation, "__origin__"):
            container_type = annotation.__origin__
            
            # Handle Literal separately
            if container_type is Literal:
                logger.debug(f"Literal type: {annotation}")
                os_type = '%String'
            elif not topLevel:
                # For nested complex types, just fall back to %DynamicArray/%DynamicObject
                os_type = type_map.get(getattr(annotation, '__name__', ''), '%DynamicObject')
            else:
                type_args = annotation.__args__
                logger.debug(f"Container type: {container_type} with args: {type_args}")
                
                # For List[str], type_args would be (str,)
                # For Dict[str, int], type_args would be (str, int)
                if len(type_args) == 1:
                    # For a single type, it's a collection
                    (os_type, collection_type, required) = process_annotation(type_args[0], False)
                    collection_type = "list"
                    logger.debug(f"List type with element type: {os_type}")
                elif len(type_args) == 2:
                    # For a key-value pair, it's a dictionary
                    (os_type, collection_type, required) = process_annotation(type_args[1], False)
                    collection_type = "array"
                    logger.debug(f"Dictionary type with value type: {os_type}")
        
        # Handle types with a __name__ attribute (basic types)
        elif hasattr(annotation, "__name__"):
            type_name = annotation.__name__
            os_type = type_map.get(type_name, '%String')
            logger.debug(f"Named type: {type_name} -> {os_type}")
        
        # Handle any other types
        else:
            os_type = type_map.get(str(annotation), '%String')
            logger.debug(f"Other type: {annotation} -> {os_type}")
        
        logger.debug(f"Final mapping: {os_type}, collection: {collection_type}, required: {required}")
        return (os_type, collection_type, required)

    def find_referenced_models(annotation):
        """Find all Pydantic models referenced in this type annotation."""
        result = []
        
        # Direct model reference
        if inspect.isclass(annotation) and issubclass(annotation, BaseModel) and annotation != BaseModel:
            result.append(annotation)
        
        # Check for parameterized types: typing.Union, native X | Y unions, and
        # containers like List/Dict all expose __args__ (native unions notably
        # lack __origin__, so testing for that alone would miss them)
        elif hasattr(annotation, "__args__"):
            for arg in annotation.__args__:
                result.extend(find_referenced_models(arg))
        
        return result

    try:
        # Find all Pydantic models in the module
        models = get_all_models(sourceModule)
        
        # Add referenced classes to type_map
        for model in models:
            # Format class name with package prefix
            class_name = f"{targetPackage}.{model.__name__}"
            type_map[model.__name__] = class_name

        print(models)

        # Process each model
        for model in models:
            print()
            process_model(targetPackage, model)
        
        # Compile the whole package
        status = iris.cls('%SYSTEM.OBJ').CompilePackage(targetPackage, 'ck')
        if not iris.cls('%SYSTEM.Status').IsOK(status):
            print(f"Error compiling package {targetPackage}: {iris.cls('%SYSTEM.Status').GetErrorText(status)}")
        
        # Return success
        return 1
        
    except Exception as e:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
        print("Exception caught in Generator.Generate:")
        print(''.join(lines))
        print(f"Error details: {str(e)}")
        return 0
}

There are still a TON of nuances to deal with here, but it's a start at least...
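
For a concrete sense of the mapping, here's a hypothetical input module (every name below is made up) and the property types the generator above should assign:

# demo_models.py -- hypothetical example input for Generate("demo_models")
from typing import Any, Literal, Optional
from pydantic import BaseModel

class Address(BaseModel):
    street: str                       # -> %String, Required
    city: str                         # -> %String, Required

class Person(BaseModel):
    name: str                         # -> %String, Required
    age: Optional[int] = None         # -> %Integer, not required
    role: Literal["admin", "user"] = "user"  # -> %String
    tags: list[str] = []              # -> %String with Collection = "list"
    address: Address | None = None    # -> <targetPackage>.Address, not required
    extra: dict[str, Any] = {}        # -> %DynamicObject (via complex_type_map)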

Short version: MCP provides a standard means to integrate GenAI with the data it needs to be helpful in the user's context (resources) and the things it can do (tools). Plus a few other things.
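
For anyone who hasn't tried it yet, here's roughly what a server looks like with the official Python SDK's FastMCP helper (the server name and the tool/resource themselves are made-up examples):

# Minimal MCP server sketch using the official Python SDK (pip install "mcp")
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo-server")

@mcp.tool()
def add(a: int, b: int) -> int:
    """A 'thing it can do': add two numbers."""
    return a + b

@mcp.resource("config://app")
def get_config() -> str:
    """Data it needs to be helpful in the user's context."""
    return "App configuration goes here"

if __name__ == "__main__":
    mcp.run()  # stdio by default; mcp.run(transport="sse") for SSE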

Especially now that OpenAI has jumped on the MCP bandwagon, it seems like it's here to stay.

Bottom line, I think there's a fantastic role for IRIS Interoperability to play with MCP. I'm hacking around with it a little bit personally - though as an InterSystems employee I won't be submitting my hacks to the contest, of course.

There are three major use cases I see:

IRIS as an MCP client - orchestrating activities across MCP servers in a centralized, traceable way (which could be of value in an enterprise setting: one chatbot backed by IRIS interop that has access to all the right resources, with proper access controls). This is where I've been playing around. In case anyone else is too: I've had more luck with SSE than stdio due to some Embedded Python oddities I haven't had time to fully explore, but SSE is probably better architecturally anyway - put the MCP server in its own container that you connect to rather than worrying about having IRIS call out to Python.
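
For the SSE flavor, the client side is only a few lines with the official Python SDK (the URL is a placeholder and error handling is omitted); something like this could be driven from a plain Python process that IRIS interop talks to, or carefully from Embedded Python:

# Sketch: calling an MCP server over SSE with the official Python SDK
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client

async def main():
    async with sse_client("http://mcp-server:8000/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([t.name for t in tools.tools])
            result = await session.call_tool("add", {"a": 1, "b": 2})
            print(result.content)

asyncio.run(main())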

IRIS as an MCP server - expose actions and data in IRIS over MCP (I've caught wind of one awesome project along these lines already...)
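
To sketch that shape (definitely not that awesome project!): a standalone MCP server exposing an IRIS query as a tool. The connection details, the Demo.Patient table, and the intersystems-irispython DB-API usage are all assumptions here:

# Sketch: exposing IRIS data as an MCP tool
import iris  # the intersystems-irispython driver (assumed), not Embedded Python's module
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("iris-data")

@mcp.tool()
def patient_names(limit: int = 10) -> list[str]:
    """Run a read-only query against IRIS and return the matching names."""
    conn = iris.connect("iris-host", 1972, "USER", "_SYSTEM", "SYS")  # placeholders
    try:
        cur = conn.cursor()
        cur.execute(f"SELECT TOP {int(limit)} Name FROM Demo.Patient")  # int() guards the interpolation
        return [row[0] for row in cur.fetchall()]
    finally:
        conn.close()

if __name__ == "__main__":
    mcp.run(transport="sse")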

IRIS as an MCP proxy - why not both? To support use cases like Claude Desktop where you want to work against local files and such but also don't want each person in the company setting up and updating their own MCP servers, re-expose all of the appropriate tools/resources/etc. (with proper access controls) as a single MCP server everyone can connect to.
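
That proxying is fairly mechanical with the SDK's low-level Server: forward list_tools/call_tool to the downstream server. A rough sketch, assuming the current python-sdk low-level API (the downstream URL is a placeholder and the access-control layer is deliberately elided):

# Sketch: an MCP proxy that re-exposes a downstream server's tools as its own
import asyncio
from contextlib import asynccontextmanager

import mcp.types as types
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.server.lowlevel import Server
from mcp.server.stdio import stdio_server

DOWNSTREAM = "http://downstream-server:8000/sse"  # placeholder
app = Server("mcp-proxy")

@asynccontextmanager
async def downstream():
    # One short-lived session per request; a real proxy would pool these
    async with sse_client(DOWNSTREAM) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            yield session

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    # This is where per-user filtering (access controls) would go
    async with downstream() as session:
        return (await session.list_tools()).tools

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    async with downstream() as session:
        return (await session.call_tool(name, arguments)).content

async def main():
    async with stdio_server() as (read, write):
        await app.run(read, write, app.create_initialization_options())

asyncio.run(main())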
