Analyze data stored in a public S3 repository in parallel
Description
We will show how to use dask to analyze an IDR image stored in a public S3 repository.
We will show:
- How to connect to IDR to retrieve the image metadata.
- How to load the Zarr binary stored in a public repository.
- How to run a segmentation on each plane in parallel.
Setup
We recommend using a Conda environment to install the OMERO Python bindings. Please first read Install omero-py.
Step-by-Step
In this section, we go through the steps required to analyze the data.
The script used in this document is public_s3_segmentation_parallel.py.
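The snippets below rely on a handful of imports from the top of the script. The following is a minimal set covering only what is used here (the full script may import more):

import time

import dask
import dask.array as da
import dask_image.ndfilters
import dask_image.ndmeasure
import matplotlib.pyplot as plt
import numpy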
Load the image and create a dask array from the Zarr storage format:
def load_binary_from_s3(id, resolution='4'):
    # Build the S3 path to the requested pyramid resolution and open it lazily
    endpoint_url = 'https://uk1s3.embassy.ebi.ac.uk/'
    root = 'idr/zarr/v0.1/%s.zarr/%s/' % (id, resolution)
    return da.from_zarr(endpoint_url + root)
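For example, loading the image used later in this walkthrough (the ID comes from the main method below) returns a lazy 5D array ordered (t, c, z, y, x); no pixel data is downloaded yet:

data = load_binary_from_s3("4007801")
print(data.shape)   # dimensions in (t, c, z, y, x) order
print(data.chunks)  # chunking as stored in the Zarr repository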
Define the analysis function:
def analyze(t, c, z):
    # Extract a single 2D plane from the global 5D (t, c, z, y, x) array
    plane = data[t, c, z, :, :]
    # Smooth the plane, threshold at 33% of the maximum intensity,
    # then label the connected components
    smoothed_image = dask_image.ndfilters.gaussian_filter(plane, sigma=[1, 1])
    threshold_value = 0.33 * da.max(smoothed_image).compute()
    threshold_image = smoothed_image > threshold_value
    label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
    name = 't:%s, c: %s, z:%s' % (t, c, z)
    print("Plane coordinates: %s" % name)
    ref = 't_%s_c_%s_z_%s' % (t, c, z)
    return label_image, ref
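To sanity-check the function on a single plane before building the graph, it can be called eagerly. A minimal sketch, assuming data has already been loaded with load_binary_from_s3:

label_image, ref = analyze(0, 0, 0)
print(ref)  # t_0_c_0_z_0
# numpy.asarray triggers the computation of the lazy label image;
# labels run from 1 to N, so the maximum is the object count
print(numpy.asarray(label_image).max())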
Make our function lazy using dask.delayed. It records what we want to compute as a task in a graph that we will run later in parallel:
def prepare_call(dimensions):
    # Analyze a subset of planes around the middle of the stack:
    # 4 timepoints and 4 z-sections across all channels
    middle_z = dimensions[2] // 2
    middle_t = dimensions[0] // 2
    range_t = 2
    range_z = 2
    number_c = dimensions[1]
    lazy_results = []
    for t in range(middle_t - range_t, middle_t + range_t):
        for z in range(middle_z - range_z, middle_z + range_z):
            for c in range(number_c):
                # Nothing is computed here; a task is added to the graph
                lazy_result = dask.delayed(analyze)(t, c, z)
                lazy_results.append(lazy_result)
    return lazy_results
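To see what dask.delayed does in isolation, consider this toy example (not part of the script): the wrapped call returns immediately with a Delayed object, and no work happens until compute is called:

@dask.delayed
def add(a, b):
    return a + b

total = add(1, 2)       # a Delayed object; nothing has run yet
print(total.compute())  # 3 -- the task graph executes only now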
We are now ready to run in parallel using the default number of workers; see Configure dask.compute:
def compute(lazy_results):
    # Execute all the recorded tasks in parallel
    return dask.compute(*lazy_results)
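dask.compute also accepts keyword arguments to control how the graph is executed, so the default scheduler can be overridden if needed. For example (the values here are illustrative, not a recommendation):

results = dask.compute(*lazy_results, scheduler='threads', num_workers=4)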
# Save the first 5 results on disk
def save_results(results):
    print("Saving locally the first 5 results as png")
    for r, name in results[:5]:
        # Materialize the (still lazy) label image as a numpy array
        array = numpy.asarray(r)
        value = "image_%s.png" % name
        plt.imsave(value, array)
In order to use the methods implemented above in a proper standalone script, wrap it all up in a main method:
def main():
    # Collect image ID
    image_id = "4007801"

    global data
    data = load_binary_from_s3(image_id)
    print("Dask array: %s" % data)

    lazy_results = prepare_call(data.shape)

    start = time.time()
    results = compute(lazy_results)
    elapsed = time.time() - start
    print('Compute time (in seconds): %s' % elapsed)

    save_results(results)
    print('done')


if __name__ == "__main__":
    main()
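The script can then be run from the command line, e.g. python public_s3_segmentation_parallel.py.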