Hi,

1) The buffer size is the amount of data read or written per call; it is not related to the record size. If you have a record of 2000 bytes, it is still written as a single line in the stream; the buffer only controls how the data is moved, so that reading/writing large files does not blow up memory.
2) There is no limit on the string size for the to_json method itself. As long as the instance can hold the data in memory, you should be good to go; files up to 1GB can be handled without any issues.

TL;DR: IRIS strings are limited in size (usually around 3MB), so you need to use streams to handle large data.
In Python, string size is only limited by the memory available on the machine.
To pass large data between Python and IRIS you therefore need a stream, and the buffer size is there to avoid memory issues when converting between string and stream.
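
To make point 1 concrete, here is a minimal sketch (embedded Python, same iris module as in the code below): the buffer only changes how many characters each Read call moves, the 2000-byte record itself stays one line.

import iris

# one record of roughly 2000 bytes
record = '{"id": 1, "payload": "' + 'x' * 2000 + '"}'

stream = iris.cls('%Stream.GlobalCharacter')._New()
stream.WriteLine(record)          # still written as a single line

stream.Rewind()
rebuilt = ""
while not stream.AtEnd:
    rebuilt += stream.Read(512)   # the buffer size only controls the chunk per call

assert rebuilt.rstrip('\r\n') == record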

Hi, you are on the right track.

Here is how I would do it:

Input file:

{"id": 1, "name": "Alice", "city": "New York", "age": 28}
{"id": 2, "name": "Bob", "city": "Chicago", "age": 35}
{"id": 1, "name": "Alice", "city": "New York", "age": 28}
{"id": 3, "name": "Charlie", "city": "Boston", "age": 42}
{"id": 4, "name": "David", "city": "Seattle", "age": 31}
{"id": 2, "name": "Bob", "city": "Chicago", "age": 35}
{"id": 5, "name": "Eve", "city": "Miami", "age": 29}
{"id": 3, "name": "Charlie", "city": "Boston", "age": 42}
{"id": 6, "name": "Frank", "city": "Denver", "age": 38}
{"id": 1, "name": "Alice", "city": "New York", "age": 28}

The Python code:

import pandas as pd
import iris

def string_to_stream(string: str, buffer: int = 1000000):
    # write the Python string into a %Stream.GlobalCharacter, one buffer-sized chunk at a time
    stream = iris.cls('%Stream.GlobalCharacter')._New()
    n = buffer
    chunks = [string[i:i+n] for i in range(0, len(string), n)]
    for chunk in chunks:
        stream.Write(chunk)
    return stream

def stream_to_string(stream, buffer: int = 1000000) -> str:
    # read the stream back into a Python string, one buffer-sized chunk at a time
    string = ""
    stream.Rewind()
    while not stream.AtEnd:
        string += stream.Read(buffer)
    return string

def sort_remove_count(file_name: str):   
    # read the ndjson file
    data = pd.read_json(file_name, lines=True)

    # sort the data by id
    data = data.sort_values(by='id')

    # remove the duplicates based on name, city and age
    data = data.drop_duplicates(subset=['name', 'city', 'age'])

    # count the number of unique ids
    unique_ids = data['id'].nunique()

    print(unique_ids)

    # save the data to a new ndjson string (not a file)
    buffer = data.to_json(orient='records', lines=True)

    # convert it as an iris stream
    stream = string_to_stream(buffer)
    return stream

if __name__ == '__main__':
    stream = sort_remove_count('demo/input.ndjson')
    print(stream_to_string(stream))

Result:

6
{"id":1,"name":"Alice","city":"New York","age":28}
{"id":2,"name":"Bob","city":"Chicago","age":35}
{"id":3,"name":"Charlie","city":"Boston","age":42}
{"id":4,"name":"David","city":"Seattle","age":31}
{"id":5,"name":"Eve","city":"Miami","age":29}
{"id":6,"name":"Frank","city":"Denver","age":38}

Now, to industrialize this code, you can use IoP (Interoperability On Python):

from iop import BusinessOperation
import pandas as pd
import iris

class SortRemoveCount(BusinessOperation):
    def string_to_stream(self, string: str, buffer: int = 1000000):
        stream = iris.cls('%Stream.GlobalCharacter')._New()
        n = buffer
        chunks = [string[i:i+n] for i in range(0, len(string), n)]
        for chunk in chunks:
            stream.Write(chunk)
        return stream

    def stream_to_string(self, stream, buffer: int = 1000000) -> str:
        string = ""
        stream.Rewind()
        while not stream.AtEnd:
            string += stream.Read(buffer)
        return string

    def sort_remove_count(self, file_name: str):   
        # read the ndjson file
        data = pd.read_json(file_name, lines=True)

        # sort the data by id
        data = data.sort_values(by='id')

        # remove the duplicates based on name, city and age
        data = data.drop_duplicates(subset=['name', 'city', 'age'])

        # count the number of unique ids
        unique_ids = data['id'].nunique()

        print(unique_ids)

        # save the data to a new ndjson string (not a file)
        buffer = data.to_json(orient='records', lines=True)

        # convert it as an iris stream
        stream = self.string_to_stream(buffer)
        return stream

    def iris_message(self, request: 'iris.Ens.StringRequest') -> 'iris.Ens.StreamContainer':
        stream = self.sort_remove_count(request.StringValue)
        return iris.cls('Ens.StreamContainer')._New(stream)
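
Here iris_message receives an Ens.StringRequest whose StringValue holds the path of the ndjson file, and it answers with an Ens.StreamContainer wrapping the stream, so the large payload travels through the production as a stream rather than a plain string.

To plug the operation into a production, declare it in the IoP settings.py. A minimal sketch, assuming the class above is saved in a module named sort_remove_count.py and that the alias Python.SortRemoveCount is just an example name (see the iop documentation for how to register and start the component):

# settings.py
# Assumption: SortRemoveCount lives in sort_remove_count.py next to this file
from sort_remove_count import SortRemoveCount

CLASSES = {
    # alias used in the production : Python class implementing it
    "Python.SortRemoveCount": SortRemoveCount,
}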

Hope this helps.
