Analyze data stored in a public S3 repository in parallel

Description

This guide shows how to use dask to analyze an IDR image stored in a public S3 repository.

We will show:

  • How to connect to IDR to retrieve the image metadata.

  • How to load the Zarr binary stored in a public repository.

  • How to run a segmentation on each plane in parallel.

Setup

We recommend using a Conda environment to install the OMERO Python bindings. Please first read Install omero-py.

Step-by-Step

In this section, we go through the steps required to analyze the data. The script used in this document is public_s3_segmentation_parallel.py.

Load the image and create a dask array from the Zarr storage format:

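import dask.array as da


def load_binary_from_s3(image_id, resolution='4'):
    # Build the URL of the requested resolution level in the public Zarr store
    endpoint_url = 'https://uk1s3.embassy.ebi.ac.uk/'
    root = 'idr/zarr/v0.1/%s.zarr/%s/' % (image_id, resolution)
    return da.from_zarr(endpoint_url + root)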
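
The array returned by da.from_zarr is lazy: indexing it only builds a task graph, and pixel data is read when compute() is called. The sketch below illustrates this offline with a synthetic array standing in for the remote image. The shape, the dtype, and the one-chunk-per-plane layout are assumptions chosen for the illustration and are not taken from the IDR store.

```python
import dask.array as da

# Synthetic stand-in for the remote image, using the (t, c, z, y, x) axis
# order that the analysis code indexes. Shape, dtype and chunking are
# assumptions for this illustration only.
data = da.zeros((8, 2, 16, 64, 64), chunks=(1, 1, 1, 64, 64), dtype="uint16")

# Indexing is lazy: this only records which chunk backs the plane.
plane = data[4, 0, 8, :, :]
print(plane.shape)  # (64, 64)

# compute() materializes the selected plane as a NumPy array.
pixels = plane.compute()
print(pixels.shape, pixels.dtype)
```

With the real store, the same indexing pattern should only fetch the chunks that back the requested plane.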


Define the analysis function:

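import dask.array as da
import dask_image.ndfilters
import dask_image.ndmeasure


def analyze(t, c, z):
    # data is the module-level dask array set in main()
    plane = data[t, c, z, :, :]
    # Smooth the plane, then keep pixels brighter than 33% of the maximum
    smoothed_image = dask_image.ndfilters.gaussian_filter(plane, sigma=[1, 1])
    threshold_value = 0.33 * da.max(smoothed_image).compute()
    threshold_image = smoothed_image > threshold_value
    # Label the connected regions of the thresholded plane
    label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
    name = 't:%s, c: %s, z:%s' % (t, c, z)
    print("Plane coordinates: %s" % name)
    ref = 't_%s_c_%s_z_%s' % (t, c, z)
    return label_image, ref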


Make our function lazy using dask.delayed. It records each call we want to compute as a task in a graph that we will run later in parallel:

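import dask


def prepare_call(dimensions):
    # dimensions is the array shape, ordered (t, c, z, y, x).
    # Analyze 4 timepoints and 4 z-sections around the middle of the stack
    # (assumes each of these two dimensions has at least 4 entries).
    middle_z = dimensions[2] // 2
    middle_t = dimensions[0] // 2
    range_t = 2
    range_z = 2
    number_c = dimensions[1]
    lazy_results = []
    for t in range(middle_t - range_t, middle_t + range_t):
        for z in range(middle_z - range_z, middle_z + range_z):
            for c in range(number_c):
                # Record the call as a task; nothing is executed yet
                lazy_result = dask.delayed(analyze)(t, c, z)
                lazy_results.append(lazy_result)
    return lazy_results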
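
The window around the middle plane assumes that the image has enough timepoints and z-sections. For a smaller stack, middle_t - range_t can become negative, and a negative index either wraps around to the end of the axis or raises an IndexError. A minimal sketch of a clamped variant follows. The helper name plane_indices is hypothetical and is not part of the original script.

```python
def plane_indices(dimensions, range_t=2, range_z=2):
    """Return (t, c, z) tuples around the middle of the stack.

    Hypothetical helper: same window as prepare_call, but clamped so that
    every index stays inside the array bounds.
    """
    size_t, size_c, size_z = dimensions[:3]
    middle_t = size_t // 2
    middle_z = size_z // 2
    t_values = range(max(0, middle_t - range_t), min(size_t, middle_t + range_t))
    z_values = range(max(0, middle_z - range_z), min(size_z, middle_z + range_z))
    return [(t, c, z) for t in t_values for z in z_values for c in range(size_c)]


# A stack with a single timepoint: only t=0 is selected.
indices = plane_indices((1, 2, 10, 512, 512))
print(len(indices))  # 8
print(indices[:3])   # [(0, 0, 3), (0, 1, 3), (0, 0, 4)]
```

Each tuple could then be passed to dask.delayed(analyze) in the same way as in the loop above.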


We are now ready to run the tasks in parallel using the default number of workers (see Configure dask.compute):

def compute(lazy_results):
    return dask.compute(*lazy_results)
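
To change the number of workers, dask's single-machine schedulers accept a num_workers keyword passed through dask.compute. The sketch below uses a toy function named square as a stand-in for analyze. The choice of the threaded scheduler and of two workers is an assumption for illustration, not a recommendation from the original script.

```python
import dask


def square(x):
    # Toy stand-in for analyze(); any plain Python function can be delayed.
    return x * x


# Build the task graph: no call to square() happens here.
lazy_results = [dask.delayed(square)(i) for i in range(4)]

# Run the graph on the threaded scheduler with two workers.
results = dask.compute(*lazy_results, scheduler="threads", num_workers=2)
print(results)  # (0, 1, 4, 9)
```

dask.compute returns a tuple with one entry per delayed object, in the order in which they were passed.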


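import matplotlib.pyplot as plt
import numpy


# Save the first 5 results on disk
def save_results(results):
    print("Saving locally the first 5 results as png")
    for r, name in results[:5]:
        # Convert the lazy label image to an in-memory array before saving
        array = numpy.asarray(r)
        value = "image_%s.png" % name
        plt.imsave(value, array)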


To use the functions implemented above in a standalone script, wrap them all up in main:

import time


def main():
    # Collect image ID
    image_id = "4007801"

    # analyze() reads the array from this module-level variable
    global data
    data = load_binary_from_s3(image_id)
    print("Dask array: %s" % data)

    lazy_results = prepare_call(data.shape)

    start = time.time()
    results = compute(lazy_results)
    elapsed = time.time() - start
    print('Compute time (in seconds): %s' % elapsed)
    save_results(results)
    print('done')


if __name__ == "__main__":
    main()