Question
· Feb 6

Sorting, removing duplicates and counting records in a JSON file

My use case is sorting, removing duplicates and getting a count from a file that has JSON messages as individual rows.

I am currently planning to use pandas for this purpose as it's really fast. Below are the steps I am following:

1) call a Python function (the called function) from an IRIS classmethod (the calling function)

2) the called Python function reads the JSON file into a dataframe

3) perform sorting, duplicate removal and counting in the dataframe

4) convert the dataframe into an IRIS stream

5) return the stream to the calling IRIS class method

When I try to write the stream to the terminal, it comes back as a %SYS.Python object rather than an IRIS stream object.

My questions are:

1) Why is the return value a %SYS.Python object rather than an IRIS stream object?

2) Is there a better way to implement sorting, duplicate removal and record counting for a file within IRIS?

Thanks!

Discussion (6)

Hi, you are on the right track.

Here is how I would do it:

Input file:

{"id": 1, "name": "Alice", "city": "New York", "age": 28}
{"id": 2, "name": "Bob", "city": "Chicago", "age": 35}
{"id": 1, "name": "Alice", "city": "New York", "age": 28}
{"id": 3, "name": "Charlie", "city": "Boston", "age": 42}
{"id": 4, "name": "David", "city": "Seattle", "age": 31}
{"id": 2, "name": "Bob", "city": "Chicago", "age": 35}
{"id": 5, "name": "Eve", "city": "Miami", "age": 29}
{"id": 3, "name": "Charlie", "city": "Boston", "age": 42}
{"id": 6, "name": "Frank", "city": "Denver", "age": 38}
{"id": 1, "name": "Alice", "city": "New York", "age": 28}

The python code:

import pandas as pd
import iris

def string_to_stream(string: str, buffer=1000000):
    # write the string to an IRIS stream in chunks of at most `buffer` characters
    stream = iris.cls('%Stream.GlobalCharacter')._New()
    n = buffer
    chunks = [string[i:i+n] for i in range(0, len(string), n)]
    for chunk in chunks:
        stream.Write(chunk)
    return stream

def stream_to_string(stream, buffer=1000000) -> str:
    # read the whole IRIS stream back into a Python string, `buffer` characters at a time
    string = ""
    stream.Rewind()
    while not stream.AtEnd:
        string += stream.Read(buffer)
    return string

def sort_remove_count(file_name: str):   
    # read the ndjson file
    data = pd.read_json(file_name, lines=True)

    # sort the data by id
    data = data.sort_values(by='id')

    # remove the duplicates based on name, city and age
    data = data.drop_duplicates(subset=['name', 'city', 'age'])

    # count the number of unique ids
    unique_ids = data['id'].nunique()

    print(unique_ids)

    # save the data to a new ndjson string (not a file)
    buffer = data.to_json(orient='records', lines=True)

    # convert it as an iris stream
    stream = string_to_stream(buffer)
    return stream

if __name__ == '__main__':
    stream = sort_remove_count('demo/input.ndjson')
    print(stream_to_string(stream))

Result :

6
{"id":1,"name":"Alice","city":"New York","age":28}
{"id":2,"name":"Bob","city":"Chicago","age":35}
{"id":3,"name":"Charlie","city":"Boston","age":42}
{"id":4,"name":"David","city":"Seattle","age":31}
{"id":5,"name":"Eve","city":"Miami","age":29}
{"id":6,"name":"Frank","city":"Denver","age":38}
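
For comparison, the same three steps can be sketched with only the Python standard library, which may be handy where pandas is not installed. This is a minimal sketch and not the pandas code above: the function name sort_dedupe_count is hypothetical, it takes the NDJSON text as a string rather than a file name, and it assumes every record carries the id, name, city and age keys from the sample input.

```python
import json

def sort_dedupe_count(ndjson_text: str):
    """Hypothetical helper: sort by id, drop duplicates on (name, city, age), count ids."""
    # parse one JSON object per non-empty line
    records = [json.loads(line) for line in ndjson_text.splitlines() if line.strip()]

    # sort by id; list.sort() is stable, so equal ids keep their file order
    records.sort(key=lambda r: r["id"])

    # keep only the first record for each (name, city, age) combination
    seen = set()
    unique = []
    for rec in records:
        key = (rec["name"], rec["city"], rec["age"])
        if key not in seen:
            seen.add(key)
            unique.append(rec)

    # count distinct ids and serialize back to compact NDJSON
    count = len({rec["id"] for rec in unique})
    out = "\n".join(json.dumps(r, separators=(",", ":")) for r in unique)
    return count, out

sample = "\n".join([
    '{"id": 1, "name": "Alice", "city": "New York", "age": 28}',
    '{"id": 2, "name": "Bob", "city": "Chicago", "age": 35}',
    '{"id": 1, "name": "Alice", "city": "New York", "age": 28}',
    '{"id": 3, "name": "Charlie", "city": "Boston", "age": 42}',
])
count, ndjson = sort_dedupe_count(sample)
print(count)
print(ndjson)
```

The returned string could then be passed to string_to_stream in the same way as the pandas output.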

Now, to industrialize this code, you can use IoP:

from iop import BusinessOperation
import pandas as pd
import iris

class SortRemoveCount(BusinessOperation):
    def string_to_stream(self, string: str, buffer=1000000):
        # write the string to an IRIS stream in chunks of at most `buffer` characters
        stream = iris.cls('%Stream.GlobalCharacter')._New()
        n = buffer
        chunks = [string[i:i+n] for i in range(0, len(string), n)]
        for chunk in chunks:
            stream.Write(chunk)
        return stream

    def stream_to_string(self, stream, buffer=1000000) -> str:
        # read the whole IRIS stream back into a Python string, `buffer` characters at a time
        string = ""
        stream.Rewind()
        while not stream.AtEnd:
            string += stream.Read(buffer)
        return string

    def sort_remove_count(self, file_name: str):   
        # read the ndjson file
        data = pd.read_json(file_name, lines=True)

        # sort the data by id
        data = data.sort_values(by='id')

        # remove the duplicates based on name, city and age
        data = data.drop_duplicates(subset=['name', 'city', 'age'])

        # count the number of unique ids
        unique_ids = data['id'].nunique()

        print(unique_ids)

        # save the data to a new ndjson string (not a file)
        buffer = data.to_json(orient='records', lines=True)

        # convert it as an iris stream
        stream = self.string_to_stream(buffer)
        return stream

    def iris_message(self, request: 'iris.Ens.StringRequest') -> 'iris.Ens.StreamContainer':
        stream = self.sort_remove_count(request.StringValue)
        return iris.cls('Ens.StreamContainer')._New(stream)

Hope this helps.

If your file contains lines NO LONGER than 500 characters, then you can drop duplicated lines and sort them in one go:

ClassMethod RemoveAndSort(inpFN = "json.txt")
{
    kill ^||tmp
    set inpStr=##class(%Stream.FileCharacter).%New()
    set outStr=##class(%Stream.TmpCharacter).%New()
    do inpStr.LinkToFile(inpFN)
    
    do inpStr.Rewind()
    while 'inpStr.AtEnd {             // put the JSON lines into a tmp global
        set line=inpStr.ReadLine()    // duplicates are overwritten, all lines are sorted
        if $match(line,"[\{\[].+[\}\]]") { set ^||tmp(line)="" } // ignore non-JSON lines
    }
    
    set line=""
    for {set line=$o(^||tmp(line)) quit:line=""  do outStr.WriteLine(line) }
    
    quit outStr // save the new stream or just use it
}

A similar (short) solution exists for removing duplicated lines with line lengths up to MAXSTRING.

You are right, using the hash value of a line solves the problem of long lines (hence the hint in my answer: 'similar (short) solution'), but it does NOT solve the second requirement, the sorting.
Hash values do not preserve the sort order of the original values!

set a="Sara",b="John" write $system.Encryption.SHA1Hash(a)]$system.Encryption.SHA1Hash(b),!,a]b
1
1
set a="SSSS",b="JJJJ" write $system.Encryption.SHA1Hash(a)]$system.Encryption.SHA1Hash(b),!,a]b
0
1
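
The same idea can be illustrated in Python. The sketch below is only an illustration under stated assumptions, not the ObjectScript method mentioned in this thread: it uses a SHA-1 digest as a short key for spotting duplicate lines and then sorts by the original text, since ordering by digest would not match the ordering of the lines. The function name dedupe_by_hash_then_sort is hypothetical, and Python's default string ordering (by code point) may differ from the collation IRIS applies to global subscripts.

```python
import hashlib

def dedupe_by_hash_then_sort(lines):
    """Hypothetical helper: drop duplicate lines via their SHA-1 digest, then sort by content."""
    unique = {}
    for line in lines:
        digest = hashlib.sha1(line.encode("utf-8")).digest()
        # the first occurrence wins; later duplicates map to the same digest
        unique.setdefault(digest, line)
    # sort on the original text, not on the digest
    return sorted(unique.values())

lines = ["Sara", "John", "Sara", "SSSS", "JJJJ", "John"]
result = dedupe_by_hash_then_sort(lines)
print(result)
```

Note that this keeps every unique line in memory, so it only moves the length limit from the key to the available memory.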

If I remember correctly, I developed a method for sorting long lines a few years ago. I'll look into it this weekend.

Thanks a lot @Guillaume Rongier and @Julius Kavay for the solution!!

@Guillaume Rongier
I have a couple of follow-up questions:

1) What is the buffer size in the string-to-stream operation? Is it at record level or a number of bytes? And if my JSON records are, say, 2000 bytes each, would each row be written as a separate line in the stream?

2) Are there any limitations on the string size for the call below? My files can be up to 1GB. So effectively, as long as the instance supports handling this much data in memory, it should be fine, right?

buffer = data.to_json(orient='records', lines=True)


Hi,

1) The buffer size is the number of characters to read/write at a time. It is not related to the record size. A record of 2000 bytes still ends up on its own line in the stream, because to_json(lines=True) puts a newline after each record regardless of how the string is chunked. The buffer size is used to avoid memory issues when reading/writing large data.
2) The to_json method itself imposes no limit on the string size. As long as the instance can hold the data in memory, you should be good to go, so files of up to 1GB should be manageable.

TL;DR: IRIS strings are limited in size (usually around 3MB), so you need to use streams to handle large data.
In Python, string size is limited only by the memory available on the machine.
To pass large data between Python and IRIS, you need to use a stream; the buffer size is used to avoid memory issues when reading/writing large data between a string and a stream.
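
To make the chunking behaviour concrete, here is a small sketch that runs without IRIS. FakeStream is a hypothetical stand-in for %Stream.GlobalCharacter that only records what is written to it, and string_to_fake_stream is a hypothetical copy of the slicing logic used in string_to_stream. It suggests that the buffer only decides how many characters go into each Write call, while line breaks come from the newlines already present in the string.

```python
class FakeStream:
    """Hypothetical stand-in for an IRIS stream; it only collects written chunks."""
    def __init__(self):
        self.chunks = []

    def Write(self, chunk: str):
        self.chunks.append(chunk)

def string_to_fake_stream(string: str, buffer: int = 1000000) -> FakeStream:
    stream = FakeStream()
    # slice the string into pieces of at most `buffer` characters
    for i in range(0, len(string), buffer):
        stream.Write(string[i:i + buffer])
    return stream

# three short records, deliberately chunked with a tiny buffer
ndjson = '{"id":1}\n{"id":2}\n{"id":3}\n'
stream = string_to_fake_stream(ndjson, buffer=10)

# chunk boundaries fall in the middle of records, yet the content is unchanged
print(len(stream.chunks))
print("".join(stream.chunks) == ndjson)
```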