Advanced Usage and Customization

One of ACME’s main objectives is to be as application-agnostic as possible so that it can serve as a drop-in parallelization engine for a wide spectrum of use cases. Thus, ACME features a variety of options for custom-tailoring its functionality to specific problems. This section discusses these settings in detail along with the associated technical intricacies.

User-Function Requirements

The user-provided function func has to meet some basic requirements to permit parallel execution with ParallelMap:

  • input arguments of `func` should be regular Python objects (lists, tuples, scalars, strings etc.) or NumPy arrays. Custom user-defined classes may or may not work. In general, anything that can be serialized via cloudpickle should work out of the box.

  • if automatic result saving is used (write_worker_results is True), the return value(s) of `func` have to be suitable for storage in HDF5 containers. Thus, anything returned by func should be either purely numeric (scalars or NumPy arrays) or purely lexical (strings). Hybrid text/numeric data-types (e.g., Pandas dataframes), custom class instances, functions, generators or complex objects (like matplotlib figures) will not work.
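
For illustration, a minimal sketch of a function that satisfies both requirements (the function name and computation are purely illustrative):

import numpy as np

def safe_func(x, scale=2.0):
    # inputs are plain Python/NumPy objects; the return values are purely
    # numeric (a NumPy array and a float), so they can be written to HDF5
    arr = np.asarray(x, dtype=float) * scale
    return arr, float(arr.sum())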

Auto-Generated HDF5-Files

Directory Layout

All HDF5 files auto-generated by ParallelMap are stored in a directory named ACME_YYYYMMDD-hhmmss-ffffff (encoding the current time as YearMonthDay-HourMinuteSecond-Microsecond) that is created in the user’s home directory on /cs (if ACME is running on the ESI HPC cluster) or the current working directory.

By default, ACME generates n_inputs + 1 HDF5 files: one aggregate results container and n_inputs payload HDF5 files, one per computation. The aggregate “main file” only contains links to the payload files and is named funcname.h5 after the user-provided function. The naming of the payload files follows the scheme funcname_{compid}.h5, where {compid} encodes the number of the performed computation (between 0 and n_inputs - 1):

ACME_YYYYMMDD-hhmmss-ffffff/
  └─ funcname.h5
  └─ funcname_payload/
       └─ funcname_0.h5
       └─ funcname_1.h5
       └─ funcname_2.h5
       .
       .
       .
       └─ funcname_{n_inputs - 1}.h5

The main file funcname.h5 is composed of relative links to the payload files in the funcname_payload subdirectory. This means funcname.h5 is only functional alongside its payload sub-folder: if funcname.h5 is moved to another file-system location, its payload directory needs to be moved as well.
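
As a minimal sketch (paths purely illustrative), moving the aggregate file together with its payload folder keeps the relative links functional:

import shutil

# move the aggregate container and its payload sub-folder jointly so that
# the relative links inside funcname.h5 keep resolving
shutil.move("ACME_20221007-100302-976973/funcname.h5", "/new/location/funcname.h5")
shutil.move("ACME_20221007-100302-976973/funcname_payload", "/new/location/funcname_payload")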

Note

The number of computations carried out (i.e., how many times a user-provided function func is called) does not have to be equal to the number of parallel workers used to perform these calculations. Particularly if n_inputs is large but evaluating func is computationally cheap, starting n_inputs SLURM workers (= jobs) might take longer than the actual concurrent calculation. In such a scenario it is usually better to use fewer workers (set n_workers < n_inputs) that each perform multiple evaluations of func.
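
A sketch of such a setup (func and all values are merely illustrative):

# 1000 evaluations of func shared among 50 SLURM workers, so that each
# worker performs multiple calls of func
with ParallelMap(func, range(1000), n_workers=50) as pmap:
    results = pmap.compute()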

To create an HDF5 file with regular datasets from ACME’s auto-generated aggregate results container, the h5py.Group.copy() routine can be used:

import h5py

with h5py.File("new_file.h5", "w") as h5target:
    with h5py.File(pmap.results_container, "r") as h5source:
        for comp in h5source.keys():
            h5source.copy(comp, h5target, expand_external=True, name=comp)

Note that ACME itself can create an aggregate results container with “regular” datasets from the get-go by setting the keyword single_file to True in ParallelMap. The potential benefits and downsides are discussed in Single vs. Multiple Result Files (single_file) below.

Container Structure

The internal structure of all HDF5 files generated by ACME is kept as simple as possible: the aggregate main file is partitioned into n_inputs groups (‘comp_0’, ‘comp_1’, …), each of which points to the payload file actually holding the results of the corresponding computation. Within every payload file, each return value of the user-provided function func is saved in a separate dataset in the file’s root group. For instance, processing the following user-provided function

def this_func(a, b, c):
    # ...some complicated calculations...
    return r0, r1, r2

for 50 different input triplets (a, b, c) generates one aggregate container this_func.h5 and a payload of 50 HDF5 files this_func_0.h5, this_func_1.h5, …, this_func_49.h5. The aggregate results container this_func.h5 is structured as follows:

this_func.h5
    └─ comp_0
    |    └─ result_0
    |    └─ result_1
    |    └─ result_2
    └─ comp_1
    |    └─ result_0
    |    └─ result_1
    |    └─ result_2
    └─ comp_2
    |    └─ result_0
    |    └─ result_1
    |    └─ result_2
    .
    .
    .
    └─ comp_49
         └─ result_0
         └─ result_1
         └─ result_2

Each payload file this_func_0.h5, this_func_1.h5, …, this_func_49.h5 contains three datasets “result_0” (holding r0), “result_1” (holding r1) and “result_2” (holding r2) in its root group, e.g.,

this_func_33.h5
    └─ result_0
    └─ result_1
    └─ result_2

User-provided functions with only a single return value correspondingly generate payload files that only contain one dataset (“result_0”) in their respective root group.
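
For example, a short sketch of reading a single result back from the aggregate container (the file path is shortened for brevity):

import h5py

# fetch the second return value (r1) of computation no. 7
with h5py.File("this_func.h5", "r") as h5f:
    r1_of_comp7 = h5f["comp_7"]["result_1"][()]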

Single vs. Multiple Result Files (single_file)

By default, ACME generates a dedicated HDF5 file for every computational run performed by ParallelMap, leveraging the independent nature of embarrassingly parallel workloads. This strategy has the substantial advantage that parallel workers are independent when writing results to disk: every worker generates dedicated payload files corresponding to the computational runs it is processing. Not relying on a shared writing resource means saving does not require any synchronization: no worker has to wait for another worker to finish its write process and release a file lock. Consequently, even tasks with perfectly balanced workloads (all computational runs finish at the same time) can jointly save their results without any wait time.

However, for some applications the creation of n_inputs payload files might actually deteriorate performance. Depending on the underlying filesystem, generating numerous very small HDF5 containers may be detrimental to I/O throughput. To remedy this, ParallelMap offers the option to write all results to a joint output file by setting single_file to True. Consider the function

def randout(x, y=3):
    if x > 0:
        return x / y
    else:
        return x * y

Suppose randout needs to be evaluated for 5000 values of x randomly sampled from a standard normal distribution. To avoid the creation of 5000 payload files, use the single_file keyword in the invocation of ParallelMap

import numpy as np

N = 5000
rng = np.random.default_rng()
x = rng.normal(size=N)
with ParallelMap(randout, x, n_workers=10, single_file=True) as pmap:
    results = pmap.compute()

Note that the output does not mention the creation of a payload directory and results is a single-element list that only contains pmap.results_container:

>>> results
['/my/current/workdir/ACME_20221007-100302-976973/randout.h5']
>>> pmap.results_container
'/my/current/workdir/ACME_20221007-100302-976973/randout.h5'

The output of randout is small (a scalar) and its execution time is identical across random independent input values within measurement accuracy, i.e., the workload is close to perfectly balanced. Thus, on a filesystem optimized for parallel I/O, running the given example with single_file = False (the default) is most likely significantly faster than the approach shown above, since parallel workers do not have to wait for their turn to access the single results container.

Customized Stacking of Results (result_shape and result_dtype)

Most scientific data-processing functions do not return random unstructured objects but numerical data arranged in arrays. ACME offers options to slot incoming data into pre-defined (multi-dimensional) arrays for easier access. Consider the function

import numpy as np

def matconstruct(a, k):
    rng = np.random.default_rng(seed=k)
    i = rng.integers(low=0, high=a.shape[0], size=1)[0]
    arr = np.delete(np.corrcoef(a), i, axis=1)
    return arr

Calling matconstruct returns a 2d array arr of shape (M, N). Suppose K = 200 of these arrays have to be arranged in a tensor of shape (K, M, N). Instead of letting ACME create K HDF5 groups, one for each call of matconstruct, which then have to be accessed post hoc to assemble the desired array, the keyword result_shape can be used to tell ParallelMap to slot results into a pre-allocated dataset.

import numpy as np

M = 10
N = M - 1
K = 200
a = np.random.default_rng().random((M, 2*M))
with ParallelMap(matconstruct, a, range(K), n_workers=50, result_shape=(None, M, N)) as pmap:
    results = pmap.compute()

A single None entry in result_shape indicates the dimension along which incoming results are to be stacked. Note that exactly one None entry must be specified in result_shape.

Using result_shape impacts the container structure generated by ACME: the results of each computational run do not need to be stored in dedicated HDF5 groups (‘comp_0’, ‘comp_1’, …) but are slotted into a Virtual Dataset. Thus, the aggregate results container only contains the single Virtual Dataset “result_0”.
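
For instance, a sketch of accessing the slotted results (variable names illustrative):

import h5py

# with result_shape=(None, M, N) all results live in one dataset "result_0"
with h5py.File(pmap.results_container, "r") as h5f:
    tensor = h5f["result_0"][()]       # full (K, M, N) array
    run_17 = h5f["result_0"][17, ...]  # result of computation no. 17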

Note

By default, ACME uses Virtual HDF5 Datasets for slotting results of concurrent computational runs. The real datasets in the generated payload files are mapped together into a single virtual dataset via the a priori definition of an h5py.VirtualLayout. The constructed Virtual Dataset can be sliced, viewed and loaded like a regular HDF5 dataset, with the Virtual Layout acting as an interface layer for fetching the requested data from the associated payload files. This strategy provides a simple single-dataset interface to access results while maintaining the benefit of independent file access for parallel workers. Note that ACME can also create a single regular dataset in a single results container by combining result_shape with single_file = True, which comes with all benefits and downsides discussed in Single vs. Multiple Result Files (single_file).
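
A sketch of this combination (the variable name pmap_single is illustrative):

with ParallelMap(matconstruct, a, range(K), n_workers=50,
                 result_shape=(None, M, N), single_file=True) as pmap_single:
    results = pmap_single.compute()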

Now consider the case of matconstruct returning multiple quantities:

def matconstruct_multi(a, k):
    rng = np.random.default_rng(seed=k)
    i = rng.integers(low=0, high=a.shape[0], size=1)[0]
    arr = np.delete(np.corrcoef(a), i, axis=1)
    return arr, k, np.linalg.svd(arr, compute_uv=False)

In this case, using result_shape when calling ParallelMap only affects the first return value arr; the remaining two quantities (k and an array containing arr’s singular values) are filed under K HDF5 groups (“comp_0”, …, “comp_{K - 1}”), each containing two datasets corresponding to the non-slotted return values:

with ParallelMap(matconstruct_multi, a, range(K), n_workers=50, result_shape=(None, M, N)) as pmap_multi:
    results = pmap_multi.compute()

Then (focusing on “comp_56” as an exemplary group)

>>> h5f = h5py.File(pmap_multi.results_container, "r")
>>> h5f.keys()
<KeysViewHDF5 ['comp_0', 'comp_1', ... , 'comp_199', 'result_0']>
>>> h5f["result_0"]
<HDF5 dataset "result_0": shape (200, 10, 9), type "<f8">
>>> h5f["comp_56"].keys()
<KeysViewHDF5 ['result_1', 'result_2']>
>>> h5f["comp_56"]["result_1"][()]
56
>>> h5f["comp_56"]["result_2"][()]
array([2.21726934, 1.96445424, 1.35668273, 0.96739928, 0.94735141,
       0.78221836, 0.49308408, 0.2719983 , 0.17343296])

By default, ACME assumes the virtual dataset to contain 64-bit floating point numbers. A different numerical datatype can be specified via the result_dtype keyword:

with ParallelMap(matconstruct, a, range(K), n_workers=50, result_shape=(None, M, N), result_dtype="float16") as pmap16:
    results = pmap16.compute()

Then

>>> h64f = h5py.File(pmap.results_container, "r")
>>> h16f = h5py.File(pmap16.results_container, "r")
>>> h64f["result_0"].dtype.name
'float64'
>>> h16f["result_0"].dtype.name
'float16'

Note that using lower-precision numerical data-types may substantially reduce the disk-space footprint of generated containers. Finally, both result_shape and result_dtype can be combined with write_worker_results = False to gather results of computational runs in local memory (not recommended). To tread lightly on client memory, the following example only performs K = 5 concurrent evaluations of matconstruct

with ParallelMap(matconstruct,
                 a,
                 range(5),
                 n_workers=50,
                 result_shape=(None, M, N),
                 result_dtype="float16",
                 write_worker_results=False) as pmap:
    results = pmap.compute()

which yields

>>> results
[array([[[ 1.     , -0.0329 ,  0.2554 , -0.2394 ,  0.12286, -0.255  , -0.2352 ,  0.2335 ,  0.3445 ],
         [-0.0329 ,  1.     , -0.02238,  0.3845 , -0.1865 , -0.0376 , -0.02928, -0.2076 , -0.1846 ],
         [ 0.2554 , -0.02238,  1.     ,  0.0505 , -0.2776 , -0.2284 , -0.1227 , -0.2605 , -0.0252 ],
         [-0.2394 ,  0.3845 ,  0.0505 ,  1.     , -0.506  ,  0.05316,  0.417  ,  0.1661 , -0.2454 ],
         [ 0.12286, -0.1865 , -0.2776 , -0.506  ,  1.     , -0.05228, -0.519  ,  0.2091 , -0.1207 ],
         [-0.255  , -0.0376 , -0.2284 ,  0.05316, -0.05228,  1.     ,  0.209  , -0.233  , -0.2363 ],
         [-0.2352 , -0.02928, -0.1227 ,  0.417  , -0.519  ,  0.209  ,  1.     , -0.1864 , -0.07697],
         [ 0.2335 , -0.2076 , -0.2605 ,  0.1661 ,  0.2091 , -0.233  , -0.1864 ,  1.     ,  0.1531 ],
         [ 0.06573,  0.01949, -0.3123 , -0.215  ,  0.296  ,  0.162  , -0.1965 , -0.0765 ,  0.337  ],
         [ 0.3445 , -0.1846 , -0.0252 , -0.2454 , -0.1207 , -0.2363 , -0.07697,  0.1531 ,  1.     ]],
        [[ 1.     , -0.0329 ,  0.2554 , -0.2394 , -0.255  , -0.2352 ,  0.2335 ,  0.06573,  0.3445 ],
         [-0.0329 ,  1.     , -0.02238,  0.3845 , -0.0376 , -0.02928, -0.2076 ,  0.01949, -0.1846 ],
         [ 0.2554 , -0.02238,  1.     ,  0.0505 , -0.2284 , -0.1227 , -0.2605 , -0.3123 , -0.0252 ],
         [-0.2394 ,  0.3845 ,  0.0505 ,  1.     ,  0.05316,  0.417  ,  0.1661 , -0.215  , -0.2454 ],
         [ 0.12286, -0.1865 , -0.2776 , -0.506  , -0.05228, -0.519  ,  0.2091 ,  0.296  , -0.1207 ],
         [-0.255  , -0.0376 , -0.2284 ,  0.05316,  1.     ,  0.209  , -0.233  ,  0.162  , -0.2363 ],
         [-0.2352 , -0.02928, -0.1227 ,  0.417  ,  0.209  ,  1.     , -0.1864 , -0.1965 , -0.07697],
         [ 0.2335 , -0.2076 , -0.2605 ,  0.1661 , -0.233  , -0.1864 ,  1.     , -0.0765 ,  0.1531 ],
         [ 0.06573,  0.01949, -0.3123 , -0.215  ,  0.162  , -0.1965 , -0.0765 ,  1.     ,  0.337  ],
         [ 0.3445 , -0.1846 , -0.0252 , -0.2454 , -0.2363 , -0.07697,  0.1531 ,  0.337  ,  1.     ]],
        ...
        ...

Alternative Storage Format: Pickle (write_pickle)

In some cases it might be necessary to work with objects that are not HDF5 compatible, e.g., sparse matrices created by scipy.sparse. Consider

import numpy as np
from scipy.sparse import spdiags

ndim = 4
x = spdiags(np.ones((ndim,)), 0, ndim, ndim)
y = spdiags(3 * np.ones((ndim,)), 0, ndim, ndim)

Then

>>> x
<4x4 sparse matrix of type '<class 'numpy.float64'>'
    with 4 stored elements (1 diagonals) in DIAgonal format>
>>> y
<4x4 sparse matrix of type '<class 'numpy.float64'>'
    with 4 stored elements (1 diagonals) in DIAgonal format>
>>> x.toarray()
array([[1., 0., 0., 0.],
       [0., 1., 0., 0.],
       [0., 0., 1., 0.],
       [0., 0., 0., 1.]])
>>> y.toarray()
array([[3., 0., 0., 0.],
       [0., 3., 0., 0.],
       [0., 0., 3., 0.],
       [0., 0., 0., 3.]])
>>> f(x, y)
<4x4 sparse matrix of type '<class 'numpy.float64'>'
    with 4 stored elements (1 diagonals) in DIAgonal format>

In this case, the default HDF5 storage format can be overridden using the keyword write_pickle

with ParallelMap(f, [x, x, x, x], y, write_pickle=True) as pmap:
    results = pmap.compute()

which yields

>>> results
['/my/current/workdir/ACME_20221007-100302-976973/f_0.pickle',
 '/my/current/workdir/ACME_20221007-100302-976973/f_1.pickle',
 '/my/current/workdir/ACME_20221007-100302-976973/f_2.pickle',
 '/my/current/workdir/ACME_20221007-100302-976973/f_3.pickle']

Note that pmap.results_container is None in this case, as no aggregate HDF5 container is generated.
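
A short sketch of loading such a pickled result back into memory (assuming each .pickle file holds the return value of the corresponding call of f):

import pickle

# load the first pickled result; `results` is the list of file paths shown above
with open(results[0], "rb") as fp:
    res0 = pickle.load(fp)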

Randomization and Concurrency (taskID)

ACME uses distributed.Client objects to perform concurrent function evaluations. Internally, distributed.Client.submit() is called to register a user-provided function func with dask’s distributed.Scheduler. The object reference to the function at the time of submission is subsequently invoked by every parallel worker once the concurrent computation starts. In other words, every parallel worker uses the identical version of func. As a consequence, random numbers generated inside func during concurrent execution via ParallelMap are all based on the same seed. Consider

def rand1(x):
    rng = np.random.default_rng()
    return x * rng.random()

Executing rand1 ten times sequentially produces ten randomized scalars:

>>> import numpy as np
>>> x = np.pi
>>> n_calls = 10
>>> for _ in range(n_calls):
...     print(rand1(x))
0.17134908691066583
2.418723132470787
1.7704368838632325
2.190969197942654
1.3759020289180253
1.2496653180656538
0.27625615910822265
2.158340321806345
1.239542094404893
3.050249434982493

However, performing ten concurrent calls of rand1 using ParallelMap (write_worker_results is set to False for illustration purposes only)

with ParallelMap(rand1, x, n_inputs=n_calls, write_worker_results=False) as pmap:
    results = pmap.compute()

yields

>>> results
[1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794,
 1.7776464046015794]

In order to use a different seed in every computational run, a unique identifier is required to differentiate runs. This can be achieved by a simple modification of rand1

def rand2(x, counter):
    rng = np.random.default_rng(counter)
    return x * rng.random()

The introduced counter can be incorporated into the ParallelMap call using a simple range

with ParallelMap(rand2, x, range(n_calls), write_worker_results=False) as pmap:
    results = pmap.compute()

which yields

>>> results
[2.0010741575072397,
 1.6079350561067187,
 0.8218787590475991,
 0.2690747942844946,
 2.9626981331891504,
 2.528991271356791,
 1.690693173008172,
 1.9637953256775056,
 1.0272137021115593,
 2.7339685059847834]

Note that internally ACME keeps track of computational runs by injecting the keyword taskID into user-provided functions.
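
Building on this, a minimal sketch of seeding directly via such a run identifier (this assumes ACME fills a taskID keyword argument if func declares one; rand3 is illustrative):

def rand3(x, taskID=None):
    # assumption: taskID is filled by ACME with the number of the current
    # computational run, yielding a unique seed per evaluation
    rng = np.random.default_rng(taskID)
    return x * rng.random()

with ParallelMap(rand3, x, n_inputs=n_calls, write_worker_results=False) as pmap:
    results = pmap.compute()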

Messaging verbosity and logging (verbose and logfile)

Suppose some function f has to be called for 20000 different values of z. Assuming this computation takes a while, it may be convenient to write any run-time messages to a file my_log.txt

z = rng.integers(low=1, high=10, size=20000, endpoint=True)
with ParallelMap(f, x, y, z=z, logfile="my_log.txt") as pmap:
    results = pmap.compute()

To make ACME less “chatty” in its output, decrease the verbosity level by setting verbose to False:

z = rng.integers(low=1, high=10, size=20000, endpoint=True)
with ParallelMap(f, x, y, z=z, logfile="my_log.txt", verbose=False) as pmap:
    results = pmap.compute()

Conversely, setting verbose to True logs all internal debug messages alongside the standard output.
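
For instance, a sketch of the corresponding fully verbose invocation (same illustrative f, x, y and z as above):

z = rng.integers(low=1, high=10, size=20000, endpoint=True)
with ParallelMap(f, x, y, z=z, logfile="my_log.txt", verbose=True) as pmap:
    results = pmap.compute()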