Run ilastik in parallel using dask

Description

We will show how to use Dask to analyze images in parallel using the ilastik API. The binary data are stored in a public S3 repository in the Zarr format.

Setup

We recommend using a Conda environment to install ilastik and the OMERO Python bindings. Please read Install ilastik and OMERO Python bindings first.

Step-by-Step

In this section, we go through the steps required to analyze the data. The script used in this document is pixel_classification_zarr_parallel.py.

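The snippets below assume imports along these lines; this is a sketch matching the functions used in this document, and the exact list in the script may differ slightly:

import os
import time
from getpass import getpass

import numpy
import vigra
import dask.array as da
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster

from ilastik import app
from ilastik.applets.dataSelection.opDataSelection import PreloadedArrayDatasetInfo  # noqa

from omero.gateway import BlitzGateway
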
Connect to the server:

def connect(hostname, username, password):
    """Connect to an OMERO server and return the gateway."""
    conn = BlitzGateway(username, password,
                        host=hostname, secure=True)
    conn.connect()
    # Ping the server every 60 seconds so the session
    # does not time out during long-running computations
    conn.c.enableKeepAlive(60)
    return conn


Load the images:

def load_images(conn, dataset_id):
    """Return a generator over the images in the dataset."""
    return conn.getObjects('Image', opts={'dataset': dataset_id})

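Note that getObjects returns a lazy generator. If you want to know upfront how many images will be analyzed, you can materialize it first, for example:

images = list(load_images(conn, dataset_id))
print("Analyzing %s images" % len(images))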

Define the analysis function:

def analyze(image_id, model):
    # Start ilastik headless, in read-only mode, on the given project
    args = app.parse_args([])
    args.headless = True
    args.project = model
    args.readonly = True
    shell = app.main(args)
    input_data = load_from_s3(image_id)
    # Wrap the numpy array so ilastik can consume it,
    # declaring the axis order expected by the project
    data = [{"Raw Data": PreloadedArrayDatasetInfo(
        preloaded_array=input_data,
        axistags=vigra.defaultAxistags("tzyxc"))}]
    # Run ilastik headless and return the results as arrays
    return shell.workflow.batchProcessingApplet.run_export(
        data, export_to_array=True)

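Each task thus starts its own headless ilastik shell and returns the predictions as arrays. To sanity-check the function on a single image before parallelizing, you can call it directly, for example:

result = analyze(image_id, "../notebooks/pipelines/pixel-class-133.ilp")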

A helper function loads the binary data as a numpy array from the Zarr storage format:

def load_from_s3(image_id, resolution='0'):
    endpoint_url = 'https://minio-dev.openmicroscopy.org/'
    root = 'idr/outreach/%s.zarr/' % image_id
    # data.shape is (t, c, z, y, x) by convention
    with ProgressBar():
        # read the requested resolution level of the multiscale Zarr
        data = da.from_zarr(endpoint_url + root + resolution)
        values = data[:]
        # re-order tczyx -> tzyxc as expected by the ilastik project
        values = values.swapaxes(1, 2).swapaxes(2, 3).swapaxes(3, 4)
        return numpy.asarray(values)

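Note that numpy.asarray materializes the whole image in the memory of the worker running the task. After the re-ordering, the shape should follow the tzyxc axis order, which you can verify, for example:

data = load_from_s3(image_id)
print(data.shape)  # e.g. (t, z, y, x, c)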

Start the Dask client and a local cluster:

cluster = LocalCluster()
client = Client(cluster)

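By default, LocalCluster starts one worker per core. Since each headless ilastik instance uses several threads and a significant amount of RAM (see the LAZYFLOW settings below), it can be worth sizing the cluster explicitly, for example:

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
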
Use the Dask Futures API. The work starts immediately as we submit tasks to the cluster:

def prepare(client, images, model):
    # Submit one analysis task per image; each call returns a Future
    futures = [client.submit(analyze, i.getId(), model) for i in images]
    return futures

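Each call to client.submit returns immediately with a Future. The status of a Future can be inspected while the cluster is working, for example:

print(futures[0].status)  # 'pending' while running, then 'finished' or 'error'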

We wait until this work is done and gather the results to our local process:

def gather_results(client, futures):
    return client.gather(futures)

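Alternatively, if you want to process each result as soon as it is ready instead of waiting for all of them, you can iterate with as_completed, for example:

from dask.distributed import as_completed

for future in as_completed(futures):
    result = future.result()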

When done, close the session:

def disconnect(conn):
    conn.close()

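The main method below also calls save_results, which is not shown above. A minimal, hypothetical sketch of such a helper, assuming we simply write each prediction to disk as a .npy file (the actual script may handle the results differently, e.g. attach them back to OMERO):

def save_results(results):
    # results contains one entry per analyzed image (hypothetical handling)
    for i, result in enumerate(results):
        numpy.save("ilastik_result_%s.npy" % i, numpy.asarray(result))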

In order to use the methods implemented above in a proper standalone script, wrap it all up in a main method:

def main():
    # Collect user credentials
    conn = None
    try:
        host = input("Host [wss://outreach.openmicroscopy.org/omero-ws]: ") or 'wss://outreach.openmicroscopy.org/omero-ws'  # noqa
        username = input("Username [trainer-1]: ") or 'trainer-1'
        password = getpass("Password: ")
        dataset_id = input("Dataset ID [6161]: ") or '6161'
        # Connect to the server
        conn = connect(host, username, password)

        # path to the ilastik project
        ilastik_project = "../notebooks/pipelines/pixel-class-133.ilp"

        # Load the images in the dataset
        images = load_images(conn, dataset_id)

        # limit the resources used by each headless ilastik instance
        os.environ["LAZYFLOW_THREADS"] = "2"
        os.environ["LAZYFLOW_TOTAL_RAM_MB"] = "2000"

        # Create-client
        cluster = LocalCluster()
        client = Client(cluster)
        # End-client

        futures = prepare(client, images, ilastik_project)

        start = time.time()
        results = gather_results(client, futures)
        done = time.time()
        elapsed = (done - start) // 60
        print("Compute time (in minutes): %s" % elapsed)
        save_results(results)
    finally:
        if conn is not None:
            disconnect(conn)

    print("done")


if __name__ == "__main__":
    main()