Run ilastik in parallel using dask
Description
We will show how to use dask to analyze images in parallel using the ilastik API. Binary data are stored in a public S3 repository in the Zarr format.
Setup
We recommend using a Conda environment to install ilastik and the OMERO Python bindings. Please read Install ilastik and OMERO Python bindings first.
Step-by-Step
In this section, we go through the steps required to analyze the data.
The script used in this document is pixel_classification_zarr_parallel.py.
Connect to the server:
def connect(hostname, username, password):
    conn = BlitzGateway(username, password,
                        host=hostname, secure=True)
    conn.connect()
    conn.c.enableKeepAlive(60)
    return conn
Load the images:
def load_images(conn, dataset_id):
    return conn.getObjects('Image', opts={'dataset': dataset_id})
Define the analysis function:
def analyze(image_id, model):
    # configure ilastik for headless operation
    args = app.parse_args([])
    args.headless = True
    args.project = model
    args.readonly = True
    shell = app.main(args)
    input_data = load_from_s3(image_id)
    # run ilastik headless
    data = [{"Raw Data": PreloadedArrayDatasetInfo(
        preloaded_array=input_data,
        axistags=vigra.defaultAxistags("tzyxc"))}]
    return shell.workflow.batchProcessingApplet.run_export(
        data, export_to_array=True)
Helper function to load the binary as a numpy array from the Zarr storage format:
def load_from_s3(image_id, resolution='0'):
    endpoint_url = 'https://minio-dev.openmicroscopy.org/'
    root = 'idr/outreach/%s.zarr/' % image_id
    # data.shape is (t, c, z, y, x) by convention
    with ProgressBar():
        data = da.from_zarr(endpoint_url + root)
        values = data[:]
        # re-order tczyx -> tzyxc as expected by the ilastik project
        values = values.swapaxes(1, 2).swapaxes(2, 3).swapaxes(3, 4)
        return numpy.asarray(values)
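The three chained swapaxes calls move the channel axis from position 1 to the last position; the same re-order can be expressed as a single numpy.moveaxis call. A minimal sketch with a toy array (the shape is illustrative, not taken from the dataset):

```python
import numpy

# toy array in (t, c, z, y, x) order: 2 timepoints, 3 channels, 4 z, 5 y, 6 x
data = numpy.zeros((2, 3, 4, 5, 6))

# chain used in the guide: tczyx -> tzcyx -> tzycx -> tzyxc
swapped = data.swapaxes(1, 2).swapaxes(2, 3).swapaxes(3, 4)

# equivalent single call: move axis 1 (c) to the last position
moved = numpy.moveaxis(data, 1, -1)

print(swapped.shape)  # (2, 4, 5, 6, 3)
print(swapped.shape == moved.shape)  # True
```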
Start a local Dask cluster and a client:
cluster = LocalCluster()
client = Client(cluster)
Use the Dask Future API.
The work starts immediately as we submit work to the cluster:
def prepare(client, images, model):
    futures = [client.submit(analyze, i.getId(), model) for i in images]
    return futures
We wait until this work is done and gather the results to our local process:
def gather_results(client, futures):
    return client.gather(futures)
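The submit-then-gather pattern above has the same shape as Python's standard-library concurrent.futures API, which may make the control flow easier to follow. A sketch using a stand-in function (analyze_stub is hypothetical, replacing the real analyze which needs ilastik and OMERO):

```python
from concurrent.futures import ThreadPoolExecutor

def analyze_stub(image_id):
    # stand-in for analyze(image_id, model)
    return image_id * image_id

with ThreadPoolExecutor(max_workers=4) as executor:
    # submit returns immediately with a Future; work starts in the background
    futures = [executor.submit(analyze_stub, i) for i in [1, 2, 3]]
    # result() blocks until each task completes, like client.gather
    results = [f.result() for f in futures]

print(results)  # [1, 4, 9]
```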
When done, close the session:
def disconnect(conn):
    conn.close()
To use the methods implemented above in a proper standalone script, wrap it all up in main:
def main():
    # Collect user credentials
    conn = None
    try:
        host = input("Host [wss://outreach.openmicroscopy.org/omero-ws]: ") or 'wss://outreach.openmicroscopy.org/omero-ws'  # noqa
        username = input("Username [trainer-1]: ") or 'trainer-1'
        password = getpass("Password: ")
        dataset_id = input("Dataset ID [6161]: ") or '6161'
        # Connect to the server
        conn = connect(host, username, password)
        # path to the ilastik project
        ilastik_project = "../notebooks/pipelines/pixel-class-133.ilp"
        # Load the images in the dataset
        images = load_images(conn, dataset_id)
        # prepare ilastik
        os.environ["LAZYFLOW_THREADS"] = "2"
        os.environ["LAZYFLOW_TOTAL_RAM_MB"] = "2000"
        # Create-client
        cluster = LocalCluster()
        client = Client(cluster)
        # End-client
        futures = prepare(client, images, ilastik_project)
        start = time.time()
        results = gather_results(client, futures)
        done = time.time()
        elapsed = (done - start) // 60
        print("Compute time (in minutes): %s" % elapsed)
        save_results(results)
    finally:
        if conn is not None:
            disconnect(conn)
        print("done")


if __name__ == "__main__":
    main()
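Note that the timing arithmetic floors the elapsed seconds to whole minutes (floor division on floats returns a float), so a run that finishes in under a minute reports 0.0:

```python
start = 0.0
done = 150.0  # pretend the computation took 150 seconds
elapsed = (done - start) // 60
print("Compute time (in minutes): %s" % elapsed)  # Compute time (in minutes): 2.0
```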