Question
· Mar 31

Object mapping between Pydantic and %RegisteredObject

I'm exploring this right now: given a bunch of types defined as Pydantic models, how can I come up with an equivalent %RegisteredObject/%SerialObject and convert to/from (e.g., to support persistence and match validation as much as possible)?

People who know Python better than I do (e.g., your average undergraduate from this decade): is this a stupid idea or a cool idea? Has anyone else done this before?

Is this the kind of behavior you want?

Set pyType = ##class(%IPM.Utils.PydanticModelAdaptor).ExamplePydanticModel() // a convenience method to return a pydantic type object, with 3 fields defined: id, name, and email
Set object = ##class(%IPM.Utils.PydanticModelAdaptor).%New(pyType)
Set object.id = "1234", object.name = "John Doe", object.email = "john.doe@intersystems.com"
Zwrite object.id, object.name, object.email
// output
// 1234
// "John Doe"
// "john.doe@intersystems.com"

The above is achievable using the following. You can also implement %JSONExport, %Validate, etc.

Class %IPM.Utils.PydanticModelAdaptor Extends %RegisteredObject
{

/// Probably worthwhile to change the `PyModel` to something less likely to conflict with Pydantic field names
/// E.g. PyModel can be the type (aka class) object defined as 
/// class UserModel(BaseModel):
///     id: int
///     name: str = Field(..., min_length=3)
///     email: Optional[str] = None
Property PyModel As %SYS.Python;

/// E.g. 
/// PyFieldArray("id") = 1234
/// PyFieldArray("name") = "John Doe"
/// PyFieldArray("email") = "john.doe@example.com" or unset
Property PyFieldArray As %String [ MultiDimensional ];

Method %OnNew(pyType As %SYS.Python) As %Status [ Private, ServerOnly = 1 ]
{
    Set ..PyModel = pyType
    Quit $$$OK
}

Method %DispatchGetProperty(Property As %String) [ ServerOnly = 1 ]
{
    Set userFields = ..PyModel."model_fields"
    Set fieldInfo = userFields.get(Property)
    If '$IsObject(fieldInfo) {
        $$$ThrowStatus($$$ERROR($$$GeneralError, "Unknown field name: "_Property))
    }
    Set fieldType = fieldInfo.annotation
    If $Data(..PyFieldArray(Property), output) # 2 {
        // TODO check the `fieldType`, raise error if not matching or violates min_length, max_length, etc.
        Return output
    }

    $$$ThrowStatus($$$ERROR($$$GeneralError, "Field not set: "_Property))
}

Method %DispatchSetProperty(Property As %String, Val) [ ServerOnly = 1 ]
{
    Set userFields = ..PyModel."model_fields"
    Set fieldInfo = userFields.get(Property)
    If '$IsObject(fieldInfo) {
        $$$ThrowStatus($$$ERROR($$$GeneralError, "Unknown field name: "_Property))
    }
    Set fieldType = fieldInfo.annotation
    // TODO check the `fieldType`, raise error if not matching or violates min_length, max_length, etc.

    Set ..PyFieldArray(Property) = Val
}

ClassMethod ExamplePydanticModel() [ Language = python ]
{
    from pydantic import BaseModel, Field
    from typing import Optional

    class UserModel(BaseModel):
        id: int
        name: str = Field(..., min_length=3)
        email: Optional[str] = None

    return UserModel
}

}

Neat use of Dynamic Dispatch! I was thinking something more like (note - this is very quick and dirty/WIP):

/// Generate a set of ObjectScript classes corresponding to Pydantic models defined in a given Python module.
/// 
/// Args:
///     sourceModule: Path to the Python module containing Pydantic models.
///     targetPackage: Target package for generated ObjectScript classes.
///     baseClass: Base class for generated ObjectScript classes.
///
/// Significant contributions by Windsurf / Claude 3.7 Sonnet (Thinking)
/// That is to say, if it doesn't work, it's the AI's fault. (Plus mine for being bad at Python.)
ClassMethod Generate(sourceModule = "mcp", targetPackage = "pkg.isc.mcp.types.test", baseClass = "pkg.isc.mcp.types.BaseModel") [ Language = python ]
{
    import importlib
    import inspect
    import traceback
    import sys
    from pydantic import BaseModel
    import iris
    import datetime
    from typing import Union, Literal
    from types import NoneType, UnionType
    from logging import getLogger

    # Map complex type expressions to ObjectScript types
    complex_type_map = {
        'dict[str, typing.Any]': '%DynamicObject',
        'list[typing.Any]': '%DynamicArray'
    }

    # Other complex expressions that should be flagged as required properties
    complex_required_type_map = {
    }
    
    # Map Pydantic field types to ObjectScript types
    type_map = {
        'str': '%String',
        'int': '%Integer',
        'float': '%Float',
        'bool': '%Boolean',
        'datetime.datetime': '%TimeStamp',
        'datetime.date': '%Date',
        'dict': '%DynamicObject',
        'list': '%DynamicArray'
    }

    def get_all_models(module_name):
        models = []
        processed_models = set()  # Keep track of models we've seen to avoid duplicates
        
        # Find all top-level models in the specified module
        module = importlib.import_module(module_name)
        for name, obj in inspect.getmembers(module):
            if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj != BaseModel:
                models.append(obj)
                processed_models.add(obj.__name__)
        
        # Now recursively find all referenced models
        i = 0
        while i < len(models):
            current_model = models[i]
            i += 1
            
            # Check each field for model references
            for field_name, field_info in current_model.model_fields.items():
                annotation = field_info.annotation
                referenced_models = find_referenced_models(annotation)
                
                for model in referenced_models:
                    if model.__name__ not in processed_models:
                        models.append(model)
                        processed_models.add(model.__name__)
                        print(f"Added referenced model: {model.__name__}")
        
        return models

    def process_model(targetPackage, model):
        # Format class name with package prefix
        class_name = f"{targetPackage}.{model.__name__}"
        
        # Check if class already exists
        cls_def = iris.cls('%Dictionary.ClassDefinition')._OpenId(class_name)
        if cls_def != "":
            print(f"Updating existing class: {class_name}")
        else:
            # Create new class definition
            cls_def = iris.cls('%Dictionary.ClassDefinition')._New()
            cls_def.Name = class_name
            print(f"Creating new class: {class_name}")
        
        cls_def.Super = baseClass
        cls_def.ProcedureBlock = 1
        
        # Add parameter to indicate this is an auto-generated class
        cls_def.Parameters.Clear()
        auto_gen_param = iris.cls('%Dictionary.ParameterDefinition')._New()
        auto_gen_param.Name = "AUTOGENERATED"
        auto_gen_param.Default = "1"
        auto_gen_param.parent = cls_def

        # Clear existing properties - always start from a clean slate
        cls_def.Properties.Clear()
        
        # Process model fields to create properties
        for field_name, field_info in model.model_fields.items():
            # Skip fields that start with underscore
            if field_name.startswith('_'):
                continue
            
            # Simplify property checking - create it fresh
            # The _Save() call will handle merging if it's already defined
            prop = iris.cls('%Dictionary.PropertyDefinition')._New()
            prop.Name = field_name
            prop.parent = cls_def

            print(f"Processing field: {field_name}: {field_info.annotation}")
            
            annotation = field_info.annotation
            (os_type, collection_type, required) = process_annotation(annotation)
            print(f"\tType: {os_type}, Collection type: {collection_type}")
            prop.Type = os_type
            prop.Collection = collection_type
            prop.Required = 1 if required else 0
        
        # Save the class definition
        sc = cls_def._Save()
        if not iris.cls('%SYSTEM.Status').IsOK(sc):
            print(f"Error saving class {class_name}: {iris.cls('%SYSTEM.Status').GetErrorText(sc)}")

    def process_annotation(annotation, topLevel = True):
        # Set up logger once
        logger = getLogger("Generator")
        logger.setLevel("DEBUG")
        
        os_type = ''
        collection_type = ''
        required = True
        
        logger.debug(f"Processing annotation: {annotation}")

        if str(annotation) in complex_type_map:
            os_type = complex_type_map[str(annotation)]
            return (os_type, collection_type, False)

        if str(annotation) in complex_required_type_map:
            os_type = complex_required_type_map[str(annotation)]
            return (os_type, collection_type, True)
        
        # Check if it's a Union type (Python 3.10+ pipe syntax)
        if isinstance(annotation, UnionType):
            union_types = annotation.__args__
            logger.debug(f"Native union type with args: {union_types}")
            
            # Check if it's an Optional (Union with NoneType)
            if (type(None) in union_types) or (NoneType in union_types):
                # Get the actual type (filter out None)
                actual_type = next(arg for arg in union_types if arg is not type(None) and arg is not NoneType)
                logger.debug(f"Optional type detected: {actual_type}")
                (os_type, collection_type, required) = process_annotation(actual_type)
                required = False
            else:
                # For regular union types there is no single ObjectScript equivalent;
                # as a placeholder, each member is processed in turn and the last one wins
                logger.debug(f"Processing union with multiple types")
                for type_arg in union_types:
                    (os_type, collection_type, required) = process_annotation(type_arg, False)
        
        # Handle typing.Union
        elif hasattr(annotation, "__origin__") and annotation.__origin__ is Union:
            union_types = annotation.__args__
            logger.debug(f"typing.Union with args: {union_types}")
            
            # Check if it's an Optional (Union with NoneType)
            if (type(None) in union_types) or (NoneType in union_types):
                # Get the actual type (filter out None)
                actual_type = next(arg for arg in union_types if arg is not type(None) and arg is not NoneType)
                logger.debug(f"Optional type detected: {actual_type}")
                (os_type, collection_type, required) = process_annotation(actual_type)
                required = False
            else:
                # For regular union types, use the same strategy as above
                logger.debug(f"Processing union with multiple types")
                for type_arg in union_types:
                    (os_type, collection_type, required) = process_annotation(type_arg, False)
        
        # Handle container types (List, Dict, etc.)
        elif hasattr(annotation, "__origin__"):
            container_type = annotation.__origin__
            
            # Handle Literal separately
            if container_type is Literal:
                logger.debug(f"Literal type: {annotation}")
                os_type = '%String'
            elif topLevel == False:
                # For nested complex types, just fall back to %DynamicArray/%DynamicObject
                os_type = type_map.get(annotation.__name__, '%DynamicObject')
            else:
                type_args = annotation.__args__
                logger.debug(f"Container type: {container_type} with args: {type_args}")
                
                # For List[str], type_args would be (str,)
                # For Dict[str, int], type_args would be (str, int)
                if len(type_args) == 1:
                    # For a single type, it's a collection
                    (os_type, collection_type, required) = process_annotation(type_args[0], False)
                    collection_type = "list"
                    logger.debug(f"List type with element type: {os_type}")
                elif len(type_args) == 2:
                    # For a key-value pair, it's a dictionary
                    (os_type, collection_type, required) = process_annotation(type_args[1], False)
                    collection_type = "array"
                    logger.debug(f"Dictionary type with value type: {os_type}")
        
        # Handle types with a __name__ attribute (basic types)
        elif hasattr(annotation, "__name__"):
            type_name = annotation.__name__
            os_type = type_map.get(type_name, '%String')
            logger.debug(f"Named type: {type_name} -> {os_type}")
        
        # Handle any other types
        else:
            os_type = type_map.get(str(annotation), '%String')
            logger.debug(f"Other type: {annotation} -> {os_type}")
        
        logger.debug(f"Final mapping: {os_type}, collection: {collection_type}, required: {required}")
        return (os_type, collection_type, required)

    def find_referenced_models(annotation):
        """Find all Pydantic models referenced in this type annotation."""
        result = []
        
        # Direct model reference
        if inspect.isclass(annotation) and issubclass(annotation, BaseModel) and annotation != BaseModel:
            result.append(annotation)
        
        # Container and union types (Union, X | Y, List, Dict, etc.): check each type argument.
        # Pipe-syntax unions (types.UnionType) may not expose __origin__, so test for __args__ instead.
        elif hasattr(annotation, "__args__"):
            for arg in annotation.__args__:
                result.extend(find_referenced_models(arg))
        
        return result

    try:
        # Find all Pydantic models in the module
        models = get_all_models(sourceModule)
        
        # Add referenced classes to type_map
        for model in models:
            # Format class name with package prefix
            class_name = f"{targetPackage}.{model.__name__}"
            type_map[model.__name__] = class_name

        print(models)

        # Process each model
        for model in models:
            print(f"\r\n")
            process_model(targetPackage, model)
        
        # Compile the whole package
        status = iris.cls('%SYSTEM.OBJ').CompilePackage(targetPackage, 'ck')
        if not iris.cls('%SYSTEM.Status').IsOK(status):
            print(f"Error compiling package {targetPackage}: {iris.cls('%SYSTEM.Status').GetErrorText(status)}")
        
        # Return success
        return 1
        
    except Exception as e:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
        print("Exception caught in Generator.Generate:")
        print(''.join(lines))
        print(f"Error details: {str(e)}")
        return 0
}

There are still a TON of nuances to deal with here, but it's a start at least...

Hummm, interesting idea, but I think there is some missing context here.

First, about the objects themselves. Embedded Python already bridges Python objects and ObjectScript objects, so trying to cast a Python object to a %RegisteredObject may not be the best way to go. Embedded Python is already doing that for you.

Second, about Pydantic/ORM. The end goal of this idea is to persist the Pydantic model to a database, right?
There are many ways to do that, but I would prefer to stick to the 'pythonic' way of doing things. So, if you want to persist a Pydantic model, I would suggest using SQLAlchemy or SQLModel. Both are great Python ORM libraries with a lot of features that make it easy to work with databases.

Now, if your second goal is to be able to leverage DTL for Python objects, then I would suggest using a VDoc approach. You can find a POC here:
https://grongierisc.github.io/interoperability-embedded-python/dtl/

In a nutshell, don't try to bind the Python way of doing things to ObjectScript. Use the best of both worlds: Python for what it is good at, and ObjectScript for what it is good at.