Analyze data stored in a public S3 repository in parallel

Description

We will show how to use dask to analyze an IDR image stored in a public S3 repository.

We will show:

  • How to connect to IDR to retrieve the image metadata.
  • How to load the Zarr binary stored in a public repository.
  • How to run a segmentation on each plane in parallel.

Setup

We recommend using a Conda environment to install the OMERO Python bindings. Please read Install omero-py first.

Step-by-Step

In this section, we go through the steps required to analyze the data. The script used in this document is public_s3_segmentation_parallel.py.

Connect to the server:

def connect(hostname, username, password):
    conn = BlitzGateway(username, password,
                        host=hostname, secure=True)
    print("Connected: %s" % conn.connect())
    conn.c.enableKeepAlive(60)  # ping the server every 60 seconds so the session stays alive
    return conn
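
The function relies on BlitzGateway from omero.gateway, imported at the top of the script. A minimal usage sketch with the public IDR credentials used later in main:

from omero.gateway import BlitzGateway  # required by connect()

conn = connect("ws://idr.openmicroscopy.org/omero-ws", "public", "public")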


Load the image:

def load_image(conn, image_id):
    return conn.getObject('Image', image_id)
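
The returned object is a BlitzGateway image wrapper that exposes the metadata without loading any pixel data; a quick sketch (getName and the getSize* accessors are standard wrapper methods):

image = load_image(conn, "4007801")
print(image.getName())
print(image.getSizeZ(), image.getSizeC(), image.getSizeT())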


Create a dask array from the Zarr storage format:

def load_binary_from_s3(id, resolution='4'):
    endpoint_url = 'https://s3.embassy.ebi.ac.uk/'
    root = 'idr/zarr/v0.1/%s.zarr/%s/' % (id, resolution)
    return da.from_zarr(endpoint_url + root)
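
The returned object is a lazy dask array backed by the remote Zarr store: only the metadata is read at this point, and pixel chunks are fetched when a computation is triggered. A quick sketch inspecting it (shape, dtype and chunksize are standard dask.array attributes):

data = load_binary_from_s3("4007801")
print(data.shape, data.dtype)
print(data.chunksize)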


Define the analysis function:

def analyze(t, c, z):
    plane = data[t, c, z, :, :]
    smoothed_image = dask_image.ndfilters.gaussian_filter(plane, sigma=[1, 1])
    threshold_value = 0.75 * da.max(smoothed_image).compute()
    threshold_image = smoothed_image > threshold_value
    label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
    name = 't:%s, c: %s, z:%s' % (t, c, z)
    print("Plane coordinates: %s" % name)
    ref = 't_%s_c_%s_z_%s' % (t, c, z)
    return label_image, ref
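
The function relies on the global data array created in main below, so it can be tried on a single plane before building the full task graph; a quick sketch (the plane indices are assumed to be valid for the image):

label_image, ref = analyze(0, 0, 0)
print(ref, label_image.shape)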


Make our analysis function lazy using dask.delayed. Each call records the work we want to do as a task in a graph that we will run later in parallel:

def prepare_call(image):
    middle_z = image.getSizeZ() // 2
    middle_t = image.getSizeT() // 2
    range_t = 5
    range_z = 5
    number_c = image.getSizeC()
    lazy_results = []
    for t in range(middle_t - range_t, middle_t + range_t):
        for z in range(middle_z - range_z, middle_z + range_z):
            for c in range(number_c):
                lazy_result = dask.delayed(analyze)(t, c, z)
                lazy_results.append(lazy_result)
    return lazy_results
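
Nothing is computed yet: each dask.delayed(analyze)(t, c, z) call returns a Delayed object, so the list is only a description of the task graph. With the ranges above it holds 10 x 10 x number_c tasks, which can be checked with a quick sketch:

lazy_results = prepare_call(image)
print("Number of tasks: %s" % len(lazy_results))
print(lazy_results[0])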


We are now ready to run in parallel using the default number of workers (see Configure dask.compute):

def compute(lazy_results):
    return dask.compute(*lazy_results)
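
By default dask.delayed tasks run on the threaded scheduler with one worker per core. Both the scheduler and the number of workers can be overridden with keyword arguments to dask.compute, for example:

results = dask.compute(*lazy_results, scheduler="threads", num_workers=4)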


When done, close the session:

def disconnect(conn):
    conn.close()


Save the first five results locally as PNG images:

def save_results(results):
    print("Saving locally the first 5 results as png")
    for r, name in results[:5]:
        array = numpy.asarray(r)
        value = "image_%s.png" % name
        plt.imsave(value, array)
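
Each entry in results is the (label_image, ref) pair returned by analyze; numpy.asarray converts the label image to a plain NumPy array (computing it if it is still lazy) before it is written out. A quick sketch of inspecting a single result:

label_image, ref = results[0]
print(ref, numpy.asarray(label_image).max())  # highest label value = number of objects found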


In order to use the methods implemented above in a standalone script, wrap them all up in a main function:

def main():
    # Collect user credentials
    host = "ws://idr.openmicroscopy.org/omero-ws"
    username = "public"
    password = "public"
    image_id = "4007801"

    # Connect to the server first so that the finally block
    # can always close the session
    conn = connect(host, username, password)
    try:

        # Load the image
        image = load_image(conn, image_id)

        global data
        data = load_binary_from_s3(image_id)
        print("Dask array: %s" % data)

        lazy_results = prepare_call(image)

        start = time.time()
        results = compute(lazy_results)
        elapsed = time.time() - start
        print('Compute time (in seconds): %s' % elapsed)
        save_results(results)

    finally:
        disconnect(conn)
    print('done')


if __name__ == "__main__":
    main()
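

For reference, the imports a standalone script needs follow directly from the calls used above (modules provided by dask, dask-image, matplotlib, NumPy and omero-py):

import time

import dask
import dask.array as da
import dask_image.ndfilters
import dask_image.ndmeasure
import matplotlib.pyplot as plt
import numpy

from omero.gateway import BlitzGateway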