Article
· Apr 23 6m read

OMOP Odyssey - AWS HealthLake ( Strait of Messina )

Nearline FHIR® Ingestion to InterSystems OMOP from AWS HealthLake

This part of the OMOP Journey we reflect before attempting to challenge Scylla on how fortunate we are that InterSystems OMOP transform is built on the Bulk FHIR Export as the source payload.  This opens up hands off interoperability with the InterSystems OMOP transform across several FHIR® vendors, including Amazon Web Services HealthLake.

HealthLake Bulk FHIR Export
 

Healthlake supports bulk fhir import/export from the cli or api, the premise is simple and the docs are over exhaustive, we'll save a model the trouble of training on it again and link it if interested.  The more valuable thing to understand of the heading of this paragraph is the implementation of the bulk fhir export standard itself.


Nearline?

Yeah, only "Nearline" ingestion, as the HealthLake export is the whole data store, and does not have a feature to be incremental. Additionally it does not support a resource based trigger, so it has to be invoked at an interval or via some other means yet to be apparent to me at the resource activity level.  Still a great number of ways to poke the export throughout AWS, and without incremental exports you only want it to be triggered inside a tolerable processing window anyway for the whole datastore.

The Whole Datastore?

Yes, the job exports all the resources into a flat structure.  Though it may not be the cleanest process to import the same data to catch the incremental data, the InterSystems OMOP transform should handle it.
 

Walkthrough

Trying to make this short and to the point, the illustration below really encapsulates what a that a scheduled lambda can glue these two solutions together and automate your OMOP ingestion.



Step One, AWS: Create Bucket

Create a bucket with a few of keys, one is shared with InterSystems OMOP for ingesting into the FHIR Transformation, the others will support the automated ingestion.


Explanations of the keys:

  • export - landing area for the raw resource ndjson from the job
  • from-healthlake-to-intersystems-omop - landing area for the create .zip and integtration point with InterSystems OMOP
  • output - job output

Step Two, InterSystems OMOP

Create the Deployment providing the arn of the bucket and the keys from above, ie: `from-healthlake-to-intersystems-omop` key.

Snag the example policy from the post configuration step as indicated and apply it to the bucket in AWS.  There are some exhaustive examples of this in a previous post OMOP Odyssey - InterSystems OMOP Cloud Service (Troy).

Step Three, Schedule a HealthLake Export to Expected InterSystems OMOP format 💫

The explanation of the flow of things is in the code itself as well, but I will also put it in the explanation in the form of a prompt so maybe you can land in the same spot with your own changes.

In python, show me how to start a HealthLake export job, export it to a target location, and poll the status of the job until it is complete, then read all of the ndjson files it creates and into a zip them without the relative path included in the zip and upload it to another location in the same bucket, once the upload is complete, remove the exported files from the export job.

The resulting function and code are the following:

import json
import boto3
import uuid
import boto3
import zipfile
import io
import os
import time


def lambda_handler(event, context):
    # Botos
    s3 = boto3.client('s3')
    client = boto3.client('healthlake')

    # Vars
    small_guid = uuid.uuid4().hex[:8]
    bucket_name = 'intersystems-omop-fhir-bucket'
    prefix = 'export/'  # Make sure it ends with '/'
    output_zip_key = 'from-healthlake-to-intersystems-omop/healthlake_ndjson_' + small_guid + '.zip'
    datastore_id = '9ee0e51d987e#ai#8ca487e8e95b1d'
    response = client.start_fhir_export_job(
        JobName='FHIR2OMOPJob',
        OutputDataConfig={
            'S3Configuration': {
                'S3Uri': 's3://intersystems-omop-fhir-bucket/export/',
                'KmsKeyId': 'arn:aws:kms:us-east-2:12345:key/54918bec-#ai#-4710-9c18-1a65d0d4590b'
            }
        },
        DatastoreId=datastore_id,
        DataAccessRoleArn='arn:aws:iam::12345:role/service-role/AWSHealthLake-Export-2-OMOP',
        ClientToken=small_guid
    )

    job_id = response['JobId']
    print(f"Export job started: {job_id}")

    # Step 2: Poll until the job completes
    while True:
        status_response = client.describe_fhir_export_job(
            DatastoreId=datastore_id,
            JobId=job_id
        )

        status = status_response['ExportJobProperties']['JobStatus']
        print(f"Job status: {status}")

        if status in ['COMPLETED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(10)  # wait before polling again
    # Step 3: Final result
    if status == 'COMPLETED':
        output_uri = status_response['ExportJobProperties']['OutputDataConfig']['S3Configuration']['S3Uri']
        print(f"Export completed. Data available at: {output_uri}")

    # Get list of all objects with .ndjson extension under the prefix
    ndjson_keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.ndjson'):
                ndjson_keys.append(key)

    # Create ZIP in memory
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for key in ndjson_keys:
            obj = s3.get_object(Bucket=bucket_name, Key=key)
            file_data = obj['Body'].read()
            arcname = os.path.basename(key)
            zf.writestr(arcname, file_data)

    zip_buffer.seek(0)

    # Upload ZIP back to S3
    s3.put_object(
        Bucket=bucket_name,
        Key=output_zip_key,
        Body=zip_buffer.getvalue()
    )
    print(f"Created ZIP with {len(ndjson_keys)} files at s3://{bucket_name}/{output_zip_key}")
    # Clean up
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    for page in pages:
        if 'Contents' in page:
            # Exclude the folder marker itself if it exists
            delete_keys = [
                {'Key': obj['Key']}
                for obj in page['Contents']
                if obj['Key'] != prefix  # protect the folder key (e.g., 'folder1/')
            ]

            if delete_keys:
                s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
                print(f"Deleted {len(delete_keys)} objects under {prefix}")
        else:
            print(f"No objects found under {prefix}")
    else:
        print(f"Export job did not complete successfully. Status: {status}")
    
    return {
        'statusCode': 200,
        'body': json.dumps(response)
    }


This function fires at an interval of about every 10 minutes via an EventBridge schedule, this will have to be adjusted to meet your workload characteristics.
 

Step Four, Validate Ingestion ✔

LGTM! we can see the zips in the ingestion location are successfully getting picked up by the transform in InterSystems OMOP.

Step Five, Smoke Data ✔

LGTM! FHIR Organization Resource = OMOPCDM54 care_site.

Discussion (0)1
Log in or sign up to continue