Ages ago I implemented a project named BlocksExplorer, which can be used in various ways.

One of them is generating a picture of the database and visualizing its defragmentation.

I think I tried it once on a 2TB database, but that was a long time ago.

Anyway, this tool generates a picture in BMP format where each pixel represents one block in the database.

For a 2TB database that is around 268,435,456 blocks, which makes a 16384x16384 picture; in BMP at 3 bytes per pixel, that comes to around 768MB for the picture, quite a big one.
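
Just to sanity-check the arithmetic, here is a quick back-of-the-envelope calculation in Python (a sketch assuming the default 8KB IRIS block size):

import math

db_size = 2 * 1024**4           # 2TB database, in bytes
block_size = 8 * 1024           # default 8KB block size (assumption)
blocks = db_size // block_size  # 268435456 blocks
side = math.isqrt(blocks)       # 16384: one pixel per block in a square image
bmp_size = side * side * 3      # 3 bytes per pixel in a 24-bit BMP
print(blocks, side, bmp_size // 2**20)  # 268435456 16384 768 (MB)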

This project is fully open source; you can try to modify it a bit, and if you do it per global, you may be able to make a map.

USER>:py
Python 3.12.3 (main, Feb  4 2025, 14:48:35) [GCC 13.3.0] on linux
Type quit() or Ctrl-D to exit this shell.
>>> import sys
>>> sys.path
['/usr/irissys/mgr/python', '/usr/lib/python312.zip', '/usr/lib/python3.12', '/usr/lib/python3.12/lib-dynload', '/usr/local/lib/python3.12/dist-packages', '/usr/lib/python3/dist-packages', '/usr/irissys/mgr/python', '/usr/irissys/lib/python', '/usr/local/lib/python3.12/dist-packages', '/usr/lib/python3/dist-packages', '/usr/lib/python3.12/dist-packages']
>>>

Did you try /usr/irissys/mgr/python? That's the path expected to be used by IRIS packages:

pip install whatever --target /usr/irissys/mgr/python

Another place is /usr/irissys/lib/python; that's where InterSystems puts its Embedded Python support.

sys.path is the list of folders where Python will look for installed modules.
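
To verify which copy of a module Python actually picks up, you can ask importlib where it was found (a quick sketch; "requests" is just a hypothetical package name here):

import importlib.util

spec = importlib.util.find_spec("requests")  # any installed package name
print(spec.origin if spec else "not found")
# after pip install --target /usr/irissys/mgr/python,
# the printed path should start with /usr/irissys/mgr/python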

Did you try using the more appropriate %Stream package for it?

In your case, you would need to use %Stream.FileCharacter

 // export the query result to CSV via a file character stream
 set stream = ##class(%Stream.FileCharacter).%New()
 $$$ThrowOnError(stream.LinkToFile("c:\export.csv"))
 set rs = ##class(%SQL.Statement).%ExecDirect(, "SELECT a, b FROM table")
 if rs.%SQLCODE'=0 {
     throw ##class(%Exception.SQL).CreateFromSQLCODE(rs.%SQLCODE, rs.%Message)
 }
 while rs.%Next() {
    set line = $listbuild(rs.a, rs.b)
    do stream.WriteLine($listtostring(line, ","))
 }
 $$$ThrowOnError(stream.%Save())

Additionally, nowadays using Embedded SQL does not give much of an advantage unless it's a one-row result; it only makes the code a bit harder to read.

Implemented for Interoperability: it can check the status, including the items in it; restart, update, and recover; and check for queues and errors.
There is also SQL query execution.

https://www.youtube.com/embed/EVzZnkjIvoM

We moved our application to AWS, and we have some data that we need to keep for a while. With this feature, we can move that old data to cheaper storage.

I believe moving to cheaper storage is the main use case. Another option is when some table is too big and someone would like to split it across multiple databases, together with its indexes.

I don't see any difference

Time taken: 23.218561416957527 seconds to insert 100000 at  4306.899045302852  records per second.

Time taken: 23.179011167027056 seconds to insert 100000 at  4314.247889152987  records per second.

from sqlalchemy import create_engine
import numpy as np
import time
import iris

hostname = "localhost"
port = 1972
namespace = "USER"
username = "_SYSTEM"
password = "SYS"

# Create the SQLAlchemy engine
DATABASE_URL = (
    f"iris+intersystems://{username}:{password}@{hostname}:{port}/{namespace}"
)
engine = create_engine(DATABASE_URL, echo=True)
args = {
    "hostname": hostname,
    "port": port,
    "namespace": namespace,
    "username": username,
    "password": password,
}
connection = iris.connect(**args)
# connection = engine.raw_connection()

# Generate data for each row (50 fields)
columns_count = 50
drop_table_sql = f"DROP TABLE IF EXISTS test_table"
columns = [f"field_{i + 1}" for i in range(columns_count)]
create_table_sql = f"CREATE TABLE test_table ({', '.join([f'{col} DOUBLE' for col in columns])})"

num_records = 100000
# Define SQL insert statement
sql_insert = f"INSERT INTO SQLUser . test_table ({', '.join(columns)}) VALUES ({', '.join(['?'] * columns_count)})"
record_values = []
# Execute SQL insert
try:
    start_time = time.perf_counter()  # Capture start time
    batch = 0
    cursor = connection.cursor()
    cursor.execute(drop_table_sql)
    cursor.execute(create_table_sql)
    connection.commit()
    for _ in range(num_records):
        record_values = [np.random.rand() for _ in range(columns_count)]
        cursor.execute(sql_insert, record_values)
        batch = batch + 1
        if batch >= 10000:
            connection.commit()
            print("Batch inserted successfully!")
            batch = 0
    connection.commit()
    end_time = time.perf_counter()  # Capture end time
    elapsed_time = end_time - start_time
    print(
        f"Time taken: {elapsed_time} seconds to insert {num_records} at ",
        num_records / elapsed_time,
        " records per second.",
    )
except Exception as e:
    print("Error inserting record:", e)
finally:
    cursor.close()
    connection.close()
    engine.dispose()
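
As a side note, the script above inserts one row per execute() call. Here is a minimal sketch of batching the inner loop instead, assuming the driver's cursor implements the standard DB-API executemany() (sql_insert, columns_count, num_records, cursor, and connection are reused from the script above):

batch_size = 10000
for _ in range(num_records // batch_size):
    rows = [
        [np.random.rand() for _ in range(columns_count)]
        for _ in range(batch_size)
    ]
    # one executemany() call sends the whole batch of parameter rows
    cursor.executemany(sql_insert, rows)
    connection.commit()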

I think you're just confusing $listbuild with a list in the BPL context. In BPL, when you define it as a List Collection, it will use the %Collection.ListOfDT class (or %Collection.ArrayOfDT in the case of an Array Collection).

That means that you should use:

 if 'context.Facilities.Find(##class(Ens.Rule.FunctionSet).SubString(context.EpicDepartmentID,1,4)) {
   do context.Facilities.Insert(##class(Ens.Rule.FunctionSet).SubString(context.EpicDepartmentID,1,4))
 }