Hi,

1) The buffer size is the number of bytes to read/write at a time; it is not related to the record size. A record of 2000 bytes is still written as its own line in the stream. The buffer size is only there to avoid memory issues when reading/writing large files.
2) There is no limitation on the string size for the to_json method. As long as the instance has enough memory to hold the data, you should be good to go. You can handle files up to 1GB in size without any issues.

TL;DR: IRIS strings are limited in size (usually around 3MB), so you need to use streams to handle large data.
In Python, string size is only limited by the memory available on the machine.
To pass large data between Python and IRIS, you need to use streams, and the buffer size is there to avoid memory issues when copying large data between a string and a stream.
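
For example, here is a minimal sketch (plain embedded Python, illustrative values only) showing that the record size and the buffer size are independent:

import iris
import json

# each record is written as its own line, whatever its size
stream = iris.cls('%Stream.GlobalCharacter')._New()
for i in range(5000):
    record = json.dumps({"id": i, "payload": "x" * 2000})  # ~2 KB record
    stream.WriteLine(record)

# the buffer only controls how many characters come back per Read call
stream.Rewind()
buffer = 1000000  # 1 MB chunks, unrelated to the record size
data = ""
while not stream.AtEnd:
    data += stream.Read(buffer)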

Hi, you are on the right track.

Here is how I would do it:

Input file:

{"id": 1, "name": "Alice", "city": "New York", "age": 28}
{"id": 2, "name": "Bob", "city": "Chicago", "age": 35}
{"id": 1, "name": "Alice", "city": "New York", "age": 28}
{"id": 3, "name": "Charlie", "city": "Boston", "age": 42}
{"id": 4, "name": "David", "city": "Seattle", "age": 31}
{"id": 2, "name": "Bob", "city": "Chicago", "age": 35}
{"id": 5, "name": "Eve", "city": "Miami", "age": 29}
{"id": 3, "name": "Charlie", "city": "Boston", "age": 42}
{"id": 6, "name": "Frank", "city": "Denver", "age": 38}
{"id": 1, "name": "Alice", "city": "New York", "age": 28}

The Python code:

import pandas as pd
import iris

def string_to_stream(string: str, buffer=1000000):
    # write the string into a %Stream.GlobalCharacter in buffer-sized chunks
    stream = iris.cls('%Stream.GlobalCharacter')._New()
    n = buffer
    chunks = [string[i:i+n] for i in range(0, len(string), n)]
    for chunk in chunks:
        stream.Write(chunk)
    return stream

def stream_to_string(stream, buffer=1000000) -> str:
    # read the stream back into a Python string, buffer characters at a time
    string = ""
    stream.Rewind()
    while not stream.AtEnd:
        string += stream.Read(buffer)
    return string

def sort_remove_count(file_name: str):   
    # read the ndjson file
    data = pd.read_json(file_name, lines=True)

    # sort the data by id
    data = data.sort_values(by='id')

    # remove the duplicates based on name, city and age
    data = data.drop_duplicates(subset=['name', 'city', 'age'])

    # count the number of unique ids
    unique_ids = data['id'].nunique()

    print(unique_ids)

    # save the data to a new ndjson string (not a file)
    buffer = data.to_json(orient='records', lines=True)

    # convert it as an iris stream
    stream = string_to_stream(buffer)
    return stream

if __name__ == '__main__':
    stream = sort_remove_count('demo/input.ndjson')
    print(stream_to_string(stream))

Result:

6
{"id":1,"name":"Alice","city":"New York","age":28}
{"id":2,"name":"Bob","city":"Chicago","age":35}
{"id":3,"name":"Charlie","city":"Boston","age":42}
{"id":4,"name":"David","city":"Seattle","age":31}
{"id":5,"name":"Eve","city":"Miami","age":29}
{"id":6,"name":"Frank","city":"Denver","age":38}

Now, to industrialize this code, you can use IoP (Interoperability on Python):

from iop import BusinessOperation
import pandas as pd
import iris

class SortRemoveCount(BusinessOperation):
    def string_to_stream(self, string:str,buffer=1000000):
        stream = iris.cls('%Stream.GlobalCharacter')._New()
        n = buffer
        chunks = [string[i:i+n] for i in range(0, len(string), n)]
        for chunk in chunks:
            stream.Write(chunk)
        return stream

    def stream_to_string(self, stream,buffer=1000000)-> str:
        string = ""
        stream.Rewind()
        while not stream.AtEnd:
            string += stream.Read(buffer)
        return string

    def sort_remove_count(self, file_name: str):   
        # read the ndjson file
        data = pd.read_json(file_name, lines=True)

        # sort the data by id
        data = data.sort_values(by='id')

        # remove the duplicates based on name, city and age
        data = data.drop_duplicates(subset=['name', 'city', 'age'])

        # count the number of unique ids
        unique_ids = data['id'].nunique()

        print(unique_ids)

        # save the data to a new ndjson string (not a file)
        buffer = data.to_json(orient='records', lines=True)

        # convert it as an iris stream
        stream = self.string_to_stream(buffer)
        return stream

    def iris_message(self, request: 'iris.Ens.StringRequest') -> 'iris.Ens.StreamContainer':
        stream = self.sort_remove_count(request.StringValue)
        return iris.cls('Ens.StreamContainer')._New(stream)
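
To plug this operation into a production, IoP components are registered from a settings.py file. Here is a minimal sketch, assuming the class above is saved in a module called bo.py (both the module name and the Python.SortRemoveCount package name are just examples):

# settings.py -- maps an IRIS component name to the Python class above
from bo import SortRemoveCount

CLASSES = {
    "Python.SortRemoveCount": SortRemoveCount,
}

Then register it with the iop CLI (from memory, something like iop --migrate /path/to/settings.py) and add the Python.SortRemoveCount component to your production.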

Hope this helps.

Thanks for this valuable feedback.

A few years ago (2020-ish), I had to do a project based on DocDB.
We encountered the same issues:

  • API first, not code first
    • Workaround: We relied heavily on scripts to generate the "Databases", "Properties", "Indexes" and so on.
  • When you create a property, it's not automatically indexed
    • Workaround: We created a wrapper around the SDK to ensure that every property was indexed (see the sketch after these lists).
  • No way to enforce a schema
    • Workaround: No workaround; we didn't really care about that at the time.

What you didn't mention but that we also encountered:

  • Composite indexes are not supported
    • Workaround: We solved this with the "wrapper" we created.
  • No support for nested objects
    • Workaround: We didn't solve this; we had to flatten all the objects :(
  • Some operators were not supported or not working as expected
    • Workaround: We opened some WRC tickets, and most of them were fixed :) or built our own SQL statements based on the indexes.

What's great is that we never got blocked by those issues; we always found a workaround.
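
For reference, here is roughly what the indexing wrapper looked like, rewritten as a small embedded Python sketch. The helper name is made up, and the %DocDB.Database method signatures are from memory, so treat them as an assumption:

import iris

def add_indexed_property(db_name: str, name: str, path: str, prop_type: str = "%String"):
    # hypothetical helper: every property we declared went through this,
    # so it was always created (and therefore indexed) on the DocDB database
    db = iris.cls('%DocDB.Database')._GetDatabase(db_name)
    # %CreateProperty creates the computed property and its index on the
    # underlying class, which is the guarantee the wrapper enforced
    db._CreateProperty(name, prop_type, path, 0)
    return db

# example: add_indexed_property('MyDocDB', 'name', '$.name')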

I'm glad to see that DocDB is still alive and the team is working on it.

It's a step in the right direction to support "JSON" databases. I can't wait to see the next steps: maybe a client-side library, support for nested objects, composite indexes, a great SQL function to support JSON objects, etc.

Can you give https://github.com/grongierisc/iris-embedded-python-wrapper a try?

Follow the README; it will give you the instructions to work with a venv and a chosen version of Python and bind it to IRIS.

Behind the scenes, this module helps set up PythonPath, PythonRuntimeLibrary and PythonRuntimeLibraryVersion.
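
Once the binding is done, a quick way to check which interpreter IRIS actually picked up (a small sketch, run from the embedded Python shell, e.g. irispython or do ##class(%SYS.Python).Shell()):

import sys

# the version should match the Python you bound to IRIS, and sys.path
# should include your venv's site-packages if PythonPath was set
print(sys.version)
print(sys.path)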

Let me know if you find any issues.

By the way, it will not solve the Python 3.13 issue; you need to upgrade to 2025.1 to support it.

I just built it; I had no issue on my side.

In your case, it seems that you don't have permission to access /tmp while building the image.

It's weird because /tmp is a public directory, so you should have access to it.

Make sure you haven't mounted a volume on /tmp.

Otherwise, you can try to modify the Dockerfile to use another directory, like /home/your_user/tmp.

# run iris and run the initialization script
RUN iris start IRIS \
    && iris session IRIS < /opt/irisapp/iris.script \
    && iris stop IRIS quietly

import iris

GLOBAL_QUEUE = iris.gref("Ens.Queue")

def get_list_host_queue() -> dict:
    """Return the current queue count for each business host from ^Ens.Queue."""
    dict_queue = {}
    for composed_key, v in GLOBAL_QUEUE.query():
        host = composed_key[0]  # first subscript of the global is the host name
        if host not in dict_queue:
            try:
                # the 'count' node holds the number of queued messages
                dict_queue[host] = v if composed_key[2] == 'count' else None
            except IndexError:
                dict_queue[host] = None
    return dict_queue

if __name__ == "__main__":
    print(get_list_host_queue())
    
# {'Ens.Actor': 0, 'Ens.Alarm': 0, 'Ens.ScheduleHandler': 0, 'EnsLib.Testing.Process': 0, 'Python.MyAsyncNGBO': 10, 'Python.MyAsyncNGBP': 0, 'SystemSignal:29404': 0, '_SyncCall:29420': 0}

Try something like that.

As Robert said, it's because of the $ListBuild ($lb) serialization.

You can give a try to:

https://pypi.org/project/iris-dollar-list/

which is a $ListBuild parser in Python:

from iris_dollar_list import DollarList

dollar_list_str = b'\x1B\x01SERVERA.FOO.BAR.ORG/STAGE\x1A\x01SERVERA.foo.bar.org|2188\t\x01Primary\x08\x01Active\x13\x01172.31.33.69|1972\x1A\x01SERVERA.foo.bar.org|1972'
dollar_list = DollarList.from_bytes(dollar_list_str)
print(dollar_list)

## $lb("SERVERA.FOO.BAR.ORG/STAGE","SERVERA.foo.bar.org|2188","Primary","Active","172.31.33.69|1972","SERVERA.foo.bar.org|1972")

Thanks, but I can't find the Python Adapter. Do you mean EnsLib.PEX.BusinessOperation or IOP.BusinessOperation?

Next, if Embedded Python runs natively on IRIS, why do I have to declare a connection as you mention in your example?

Does this work better?

from iop import BusinessOperation

class HelloWorld(BusinessOperation):

    def on_message(self, request):
        self.log_info("Hello World")