Object mapping between Pydantic and %RegisteredObject
I'm exploring this right now: given a bunch of types defined as Pydantic models, how can I come up with an equivalent %RegisteredObject/%SerialObject and convert to/from (e.g., to support persistence and match validation as much as possible)?
People who know Python better than I do (e.g., your average undergraduate from this decade): is this a stupid idea or a cool idea? Has anyone else done this before?
Is this the kind of behavior you want?
The above is achievable using the following. You can also implement %JSONExport, %Validate, etc.
Neat use of Dynamic Dispatch! I was thinking something more like (note - this is very quick and dirty/WIP):
/// Generate a set of ObjectScript classes corresponding to Pydantic models defined in a given Python module. /// /// Args: /// sourceModule: Path to the Python module containing Pydantic models. /// targetPackage: Target package for generated ObjectScript classes. /// baseClass: Base class for generated ObjectScript classes. /// /// Significant contributions by Windsurf / Claude 3.7 Sonnet (Thinking) /// That is to say, if it doesn't work, it's the AI's fault. (Plus mine for being bad at Python.) ClassMethod Generate(sourceModule = "mcp", targetPackage = "pkg.isc.mcp.types.test", baseClass = "pkg.isc.mcp.types.BaseModel") [ Language = python ] { import importlib import inspect import traceback import sys from pydantic import BaseModel import iris import datetime from typing import Union, Literal from types import NoneType, UnionType from logging import getLogger # Map complex type expressions to ObjectScript types complex_type_map = { 'dict[str, typing.Any]': '%DynamicObject', 'list[typing.Any]': '%DynamicArray' } # Other complex expressions that should be flagged as required properties complex_required_type_map = { } # Map Pydantic field types to ObjectScript types type_map = { 'str': '%String', 'int': '%Integer', 'float': '%Float', 'bool': '%Boolean', 'datetime.datetime': '%TimeStamp', 'datetime.date': '%Date', 'dict': '%DynamicObject', 'list': '%DynamicArray' } def get_all_models(module_name): models = [] processed_models = set() # Keep track of models we've seen to avoid duplicates def find_models(module_name): module = importlib.import_module(module_name) discovered = [] # Find all top-level models in this module for name, obj in inspect.getmembers(module): if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj != BaseModel: if obj.__name__ not in processed_models: discovered.append(obj) processed_models.add(obj.__name__) return discovered # First find all top-level models in the specified module module = importlib.import_module(module_name) top_models = [] for name, obj in inspect.getmembers(module): if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj != BaseModel: top_models.append(obj) processed_models.add(obj.__name__) models.extend(top_models) # Now recursively find all referenced models i = 0 while i < len(models): current_model = models[i] i += 1 # Check each field for model references for field_name, field_info in current_model.__fields__.items(): annotation = field_info.annotation referenced_models = find_referenced_models(annotation) for model in referenced_models: if model.__name__ not in processed_models: models.append(model) processed_models.add(model.__name__) print(f"Added referenced model: {model.__name__}") return models def process_model(targetPackage, model): # Format class name with package prefix class_name = f"{targetPackage}.{model.__name__}" # Check if class already exists cls_def = iris.cls('%Dictionary.ClassDefinition')._OpenId(class_name) if cls_def != "": print(f"Updating existing class: {class_name}") else: # Create new class definition cls_def = iris.cls('%Dictionary.ClassDefinition')._New() cls_def.Name = class_name print(f"Creating new class: {class_name}") cls_def.Super = baseClass cls_def.ProcedureBlock = 1 # Add parameter to indicate this is an auto-generated class cls_def.Parameters.Clear() auto_gen_param = iris.cls('%Dictionary.ParameterDefinition')._New() auto_gen_param.Name = "AUTOGENERATED" auto_gen_param.Default = "1" auto_gen_param.parent = cls_def # Clear existing properties - always start from a clean slate cls_def.Properties.Clear() # Process model fields to create properties for field_name, field_info in model.__fields__.items(): # Skip fields that start with underscore if field_name.startswith('_'): continue # Simplify property checking - create it fresh # The _Save() call will handle merging if it's already defined prop = iris.cls('%Dictionary.PropertyDefinition')._New() prop.Name = field_name prop.parent = cls_def print(f"Processing field: {field_name}: {field_info.annotation}") annotation = field_info.annotation (os_type, collection_type, required) = process_annotation(annotation) print(f"\tType: {os_type}, Collection type: {collection_type}") prop.Type = os_type prop.Collection = collection_type prop.Required = 1 if required else 0 # Save the class definition sc = cls_def._Save() if not iris.cls('%SYSTEM.Status').IsOK(sc): print(f"Error saving class {class_name}: {iris.cls('%SYSTEM.Status').GetErrorText(sc)}") def process_annotation(annotation, topLevel = True): # Set up logger once logger = getLogger("Generator") logger.setLevel("DEBUG") os_type = '' collection_type = '' required = True logger.debug(f"Processing annotation: {annotation}") if complex_type_map.__contains__(str(annotation)): os_type = complex_type_map[str(annotation)] return (os_type, collection_type, False) if complex_required_type_map.__contains__(str(annotation)): os_type = complex_required_type_map[str(annotation)] return (os_type, collection_type, True) # Check if it's a Union type (Python 3.10+ pipe syntax) if isinstance(annotation, UnionType): union_types = annotation.__args__ logger.debug(f"Native union type with args: {union_types}") # Check if it's an Optional (Union with NoneType) if (type(None) in union_types) or (NoneType in union_types): # Get the actual type (filter out None) actual_type = next(arg for arg in union_types if arg is not type(None) and arg is not NoneType) logger.debug(f"Optional type detected: {actual_type}") (os_type, collection_type, required) = process_annotation(actual_type) required = False else: # For regular union types, use a strategy that picks the most flexible type logger.debug(f"Processing union with multiple types") # Default to using the last type in the union for type_arg in union_types: (os_type, collection_type, required) = process_annotation(type_arg, False) # Handle typing.Union elif hasattr(annotation, "__origin__") and annotation.__origin__ is Union: union_types = annotation.__args__ logger.debug(f"typing.Union with args: {union_types}") # Check if it's an Optional (Union with NoneType) if (type(None) in union_types) or (NoneType in union_types): # Get the actual type (filter out None) actual_type = next(arg for arg in union_types if arg is not type(None) and arg is not NoneType) logger.debug(f"Optional type detected: {actual_type}") (os_type, collection_type, required) = process_annotation(actual_type) required = False else: # For regular union types, use the same strategy as above logger.debug(f"Processing union with multiple types") for type_arg in union_types: (os_type, collection_type, required) = process_annotation(type_arg, False) # Handle container types (List, Dict, etc.) elif hasattr(annotation, "__origin__"): container_type = annotation.__origin__ # Handle Literal separately if container_type is Literal: logger.debug(f"Literal type: {annotation}") os_type = '%String' elif topLevel == False: # For nested complex types, just fall back to %DynamicArray/%DynamicObject os_type = type_map.get(annotation.__name__, '%DynamicObject') else: type_args = annotation.__args__ logger.debug(f"Container type: {container_type} with args: {type_args}") # For List[str], type_args would be (str,) # For Dict[str, int], type_args would be (str, int) if len(type_args) == 1: # For a single type, it's a collection (os_type, collection_type, required) = process_annotation(type_args[0], False) collection_type = "list" logger.debug(f"List type with element type: {os_type}") elif len(type_args) == 2: # For a key-value pair, it's a dictionary (os_type, collection_type, required) = process_annotation(type_args[1], False) collection_type = "array" logger.debug(f"Dictionary type with value type: {os_type}") # Handle types with a __name__ attribute (basic types) elif hasattr(annotation, "__name__"): type_name = annotation.__name__ os_type = type_map.get(type_name, '%String') logger.debug(f"Named type: {type_name} -> {os_type}") # Handle any other types else: os_type = type_map.get(str(annotation), '%String') logger.debug(f"Other type: {annotation} -> {os_type}") logger.debug(f"Final mapping: {os_type}, collection: {collection_type}, required: {required}") return (os_type, collection_type, required) def find_referenced_models(annotation): """Find all Pydantic models referenced in this type annotation.""" result = [] # Direct model reference if inspect.isclass(annotation) and issubclass(annotation, BaseModel) and annotation != BaseModel: result.append(annotation) # Check for container types (Union, List, etc.) elif hasattr(annotation, "__origin__"): # For Union types, check each argument if annotation.__origin__ is Union: for arg in annotation.__args__: result.extend(find_referenced_models(arg)) # For container types like List, Dict elif hasattr(annotation, "__args__"): for arg in annotation.__args__: result.extend(find_referenced_models(arg)) return result try: # Find all Pydantic models in the module models = get_all_models(sourceModule) # Add referenced classes to type_map for model in models: # Format class name with package prefix class_name = f"{targetPackage}.{model.__name__}" type_map[model.__name__] = class_name print(models); # Process each model for model in models: print(f"\r\n") process_model(targetPackage, model) # Compile the whole package status = iris.cls('%SYSTEM.OBJ').CompilePackage(targetPackage, 'ck') if not iris.cls('%SYSTEM.Status').IsOK(status): print(f"Error compiling package {targetPackage}: {iris.cls('%SYSTEM.Status').GetErrorText(status)}") # Return success return 1 except Exception as e: exc_type, exc_value, exc_traceback = sys.exc_info() lines = traceback.format_exception(exc_type, exc_value, exc_traceback) print("Exception caught in Generator.Generate:") print(''.join(lines)) print(f"Error details: {str(e)}") return 0 }
There's still a TON of nuances to deal with here, but it's a start at least...
I was thinking about something like that too but wasn't sure how to dynamically construct an objectscript class.
Hummm, interesting idea, but I think there is some missing context here.
First about objects by themselves. We have Embedded Python that already bridge/bind Python objects to ObjectScript objects. So try to cast a Python object to a %RegisteredObject may not be the optimal way to go. The Embedded Python is already doing that for you.
Second, about Pydantic/ORM. The end goal of this idea is to persist the Pydantic model to a database, right?
There are many ways to do that, I would prefer to stick to the 'pythonic' way of doing things. So, if you want to persist a Pydantic model, I would suggest using SQLAlchemy or SQLModel. They are both great libraries for ORM in Python and have a lot of features that make it easy to work with databases.
Now, if your second goal is to be able to leverage DTL for Python objects, then I would suggest to use an Vdoc approach. You can find a POC here :
https://grongierisc.github.io/interoperability-embedded-python/dtl/
In a nutshell, don't try to bind python way of doing things to ObjectScript. Use the best of both worlds. Use Python for what it is good at and use ObjectScript for what it is good at.
Thank you! This is a really helpful perspective.
Ultimately I'm looking at both persistence and DTL.