Run ilastik in parallel using Dask


We will show how to use Dask to analyze images in parallel using the ilastik API. The binary data are stored in a public S3 repository in the Zarr format.


We recommend using a Conda environment to install ilastik and the OMERO Python bindings. Please first read Install ilastik and OMERO Python bindings.


In this section, we go through the steps required to analyze the data. The script used in this document is

Connect to the server:

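from omero.gateway import BlitzGateway

def connect(hostname, username, password):
    conn = BlitzGateway(username, password,
                        host=hostname, secure=True)
    # Creating the gateway does not open a session; connect() does
    conn.connect()
    return conn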

Load the images:

def load_images(conn, dataset_id):
    return conn.getObjects('Image', opts={'dataset': dataset_id})

Define the analysis function:

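import vigra
from ilastik import app
from ilastik.applets.dataSelection.opDataSelection import PreloadedArrayDatasetInfo  # noqa


def analyze(image_id, model):
    # Configure ilastik to run headless on the given project, read-only
    args = app.parse_args([])
    args.headless = True
    args.project = model
    args.readonly = True
    shell = app.main(args)
    # Load the pixel data, ordered tzyxc as expected by the ilastik project
    input_data = load_from_s3(image_id)
    dataset = PreloadedArrayDatasetInfo(
        preloaded_array=input_data, axistags=vigra.defaultAxistags("tzyxc"))
    data = [{"Raw Data": dataset}]
    # Run the batch export, asking for the results as arrays
    return shell.workflow.batchProcessingApplet.run_export(
        data, export_to_array=True)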
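
The first lines of analyze follow a common pattern: parse an empty argument list to obtain a namespace holding only default values, then override individual fields in code instead of on the command line. The sketch below illustrates that pattern with the standard library's argparse. It assumes that app.parse_args returns an argparse-style namespace; build_parser and its three options are hypothetical stand-ins written for this illustration, not ilastik's actual parser.

```python
import argparse


def build_parser():
    # Hypothetical stand-in parser, not ilastik's real one
    parser = argparse.ArgumentParser()
    parser.add_argument("--headless", action="store_true")
    parser.add_argument("--project", default=None)
    parser.add_argument("--readonly", action="store_true")
    return parser


# Passing an empty list ignores sys.argv, so only the defaults are returned
args = build_parser().parse_args([])
defaults = vars(args).copy()

# Override the fields programmatically, as analyze does
args.headless = True
args.project = "pixel-class-133.ilp"
args.readonly = True

print(defaults)
print(vars(args))
```

Passing an explicit empty list matters when the function runs inside a worker process, where the command-line arguments of that process are unrelated to the analysis.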

Helper function to load the binary data as a NumPy array from the Zarr storage format:

import dask.array as da
import numpy
from dask.diagnostics import ProgressBar


def load_from_s3(image_id, resolution='0'):
    endpoint_url = ''
    root = 'idr/outreach/%s.zarr/' % image_id
    # data.shape is (t, c, z, y, x) by convention
    with ProgressBar():
        data = da.from_zarr(endpoint_url + root)
        values = data[:]
        # re-order tczyx -> tzyxc as expected by the ilastik project
        values = values.swapaxes(1, 2).swapaxes(2, 3).swapaxes(3, 4)
        return numpy.asarray(values)
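
To see why three successive swaps turn tczyx into tzyxc, it can help to trace the axis labels on their own. The sketch below applies the same three swaps to a string of labels rather than to an array; apply_swaps is a hypothetical helper introduced only for this illustration.

```python
def apply_swaps(labels, swaps):
    """Return the axis labels after applying each (i, j) swap in order."""
    labels = list(labels)
    for i, j in swaps:
        labels[i], labels[j] = labels[j], labels[i]
    return "".join(labels)


# The same three swaps as in load_from_s3
swaps = [(1, 2), (2, 3), (3, 4)]
order = apply_swaps("tczyx", swaps)
print(order)  # tzyxc
```

Each swap pushes the channel axis one position to the right until it is last. Moving axis 1 to the end in a single step, for example with numpy.moveaxis(values, 1, -1), should produce the same ordering.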

Start the Dask client and a local cluster:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

Use the Dask Futures API. The work starts as soon as we submit it to the cluster:

def prepare(client, images, model):
    futures = [client.submit(analyze, i.getId(), model) for i in images]
    return futures

We wait until this work is done and gather the results to our local process:

def gather_results(client, futures):
    return client.gather(futures)
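
The submit-then-gather pattern can be tried without a cluster, ilastik or OMERO. The sketch below swaps in the standard library's concurrent.futures, on which Dask's futures interface is modelled, in place of the Dask client. fake_analyze is a hypothetical stand-in for analyze, and the image IDs and model name are made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_analyze(image_id, model):
    # Hypothetical stand-in for analyze(): returns a label, not a prediction
    return "%s:%s" % (model, image_id)


def prepare(executor, image_ids, model):
    # Work is scheduled as soon as submit() is called
    return [executor.submit(fake_analyze, i, model) for i in image_ids]


def gather_results(futures):
    # result() blocks until each future is done;
    # the output order matches the submission order
    return [f.result() for f in futures]


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = prepare(executor, [1, 2, 3], "model.ilp")
    results = gather_results(futures)

print(results)  # ['model.ilp:1', 'model.ilp:2', 'model.ilp:3']
```

As with the Dask version, submitting returns immediately with a list of futures, and only the gathering step blocks.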

When done, close the session:

def disconnect(conn):
    conn.close()

To use the functions implemented above in a standalone script, wrap them all up in main:

import os
import time
from getpass import getpass


def main():
    conn = None
    try:
        # Collect user credentials
        host = input("Host [wss://]: ") or 'wss://'  # noqa
        username = input("Username [trainer-1]: ") or 'trainer-1'
        password = getpass("Password: ")
        dataset_id = input("Dataset ID [6161]: ") or '6161'
        # Connect to the server
        conn = connect(host, username, password)

        # path to the ilastik project
        ilastik_project = "../notebooks/pipelines/pixel-class-133.ilp"

        # Load the images in the dataset
        images = load_images(conn, dataset_id)

        # prepare ilastik
        os.environ["LAZYFLOW_THREADS"] = "2"
        os.environ["LAZYFLOW_TOTAL_RAM_MB"] = "2000"

        # Start the Dask client and a local cluster
        cluster = LocalCluster()
        client = Client(cluster)

        # Submit the work; it starts immediately
        futures = prepare(client, images, ilastik_project)

        # Wait for the results and report the elapsed time
        start = time.time()
        results = gather_results(client, futures)
        done = time.time()
        elapsed = (done - start) // 60
        print("Compute time (in minutes): %s" % elapsed)
    finally:
        # Close the session even if the analysis fails
        if conn is not None:
            disconnect(conn)


if __name__ == "__main__":
    main()
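
A note on the timing arithmetic in main: floor division by 60 reports whole minutes only, so a run shorter than one minute is printed as 0.0. The sketch below contrasts that with divmod, which keeps the leftover seconds. Both helper names are hypothetical and exist only for this illustration.

```python
def elapsed_minutes(start, done):
    # Floor division by 60 discards the leftover seconds
    return (done - start) // 60


def elapsed_min_sec(start, done):
    # divmod keeps the remainder, so short runs do not report 0 minutes only
    minutes, seconds = divmod(done - start, 60)
    return int(minutes), seconds


print(elapsed_minutes(0.0, 125.0))  # 2.0
print(elapsed_min_sec(0.0, 125.0))  # (2, 5.0)
print(elapsed_minutes(0.0, 45.0))   # 0.0
```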