Advanced Usage and Customization
One of ACME’s main objectives is to be as application-agnostic as possible so that it can be a drop-in parallelization engine for a wide spectrum of use cases. Thus, ACME features a variety of options for custom-tailoring its functionality to specific problems. This section discusses settings details and associated technical intricacies.
User-Function Requirements
The user-provided function `func` has to meet some basic requirements to permit parallel execution with `ParallelMap`:

- input arguments of `func` should be regular Python objects (lists, tuples, scalars, strings etc.) or NumPy arrays. Custom user-defined classes may or may not work. In general, anything that can be serialized via cloudpickle should work out of the box.
- if automatic result saving is used (`write_worker_results` is `True`), the return value(s) of `func` have to be suitable for storage in HDF5 containers. Thus, anything returned by `func` should be either purely numeric (scalars or NumPy arrays) or purely lexical (strings). Hybrid text/numeric data types (e.g., Pandas dataframes), custom class instances, functions, generators or complex objects (like matplotlib figures) will not work.
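For instance, a function meeting both requirements could look as follows (a minimal sketch; the function name and arguments are illustrative, not part of ACME):

```python
import numpy as np

def good_func(x, scale=2.0):
    # regular Python/NumPy inputs and a purely numeric return value:
    # safe for cloudpickle serialization and HDF5 auto-saving
    return np.asarray(x, dtype=float) * scale

print(good_func([1, 2, 3]).tolist())  # [2.0, 4.0, 6.0]
```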
Auto-Generated HDF5-Files
Directory Layout
All HDF5 files auto-generated by `ParallelMap` are stored in a directory named `ACME_YYYYMMDD-hhmmss-ffffff` (encoding the current time as YearMonthDay-HourMinuteSecond-Microsecond) that is created in the user's home directory on `/cs` (if ACME is running on the ESI HPC cluster) or in the current working directory otherwise.
By default, ACME generates `n_inputs + 1` HDF5 files: one aggregate results container and `n_inputs` payload HDF5 files, one per computation. The aggregate "main file" only contains links to the payload files and is called `funcname.h5` based on the name of the user-provided function. The naming of the payload files follows the schema `funcname_{compid}.h5`, where `{compid}` encodes the number of the performed computation (between 0 and `n_inputs - 1`):
ACME_YYYYMMDD-hhmmss-ffffff/
└─ funcname.h5
└─ funcname_payload/
     └─ funcname_0.h5
     └─ funcname_1.h5
     └─ funcname_2.h5
     .
     .
     .
     └─ funcname_{n_inputs - 1}.h5
The main file `funcname.h5` is composed of relative links to the payload files in the `funcname_payload` subdirectory. This means `funcname.h5` is only functional alongside its "payload" sub-folder. Therefore, if `funcname.h5` is moved to another file-system location, its payload directory needs to be moved as well.
Note
The number of computations carried out (i.e., how many times a user-provided function `func` is called) does not have to be equal to the number of parallel workers used to perform these calculations. Particularly if `n_inputs` is large but evaluating `func` is computationally cheap, starting `n_inputs` SLURM workers (= jobs) might take longer than the actual concurrent calculation. In such a scenario it is usually better to use fewer workers (set `n_workers < n_inputs`) so that each worker performs multiple evaluations of `func`.
To create an HDF5 file with regular datasets from the auto-generated aggregate results container produced by ACME, the `h5py.Group.copy()` routine can be used:

import h5py
with h5py.File("new_file.h5", "w") as h5target:
    with h5py.File(pmap.results_container, "r") as h5source:
        for comp in h5source.keys():
            h5source.copy(comp, h5target, expand_external=True, name=comp)
Note that ACME itself can create an aggregate results container with "regular" datasets from the get-go by setting the keyword `single_file` to `True` in `ParallelMap`. The potential benefits and downsides are discussed in Single vs. Multiple Result Files (`single_file`) below.
Container Structure
The internal structure of all HDF5 files generated by ACME is kept as simple as possible: the aggregate main file is partitioned into `n_inputs` groups ('comp_0', 'comp_1', …), each of which points to the respective payload file actually holding the results of the corresponding computation. Within every payload file, each return value of the user-provided function `func` is saved in a separate dataset in the file's root group. For instance, processing the following user-provided function
def this_func(a, b, c):
    # ...some complicated calculations...
    return r0, r1, r2
for 50 different input triplets `(a, b, c)` generates one aggregate container `this_func.h5` and 50 payload HDF5 files `this_func_0.h5`, `this_func_1.h5`, …, `this_func_49.h5`. The aggregate results container `this_func.h5` is structured as follows:
this_func.h5
└─ comp_0
|    └─ result_0
|    └─ result_1
|    └─ result_2
└─ comp_1
|    └─ result_0
|    └─ result_1
|    └─ result_2
└─ comp_2
|    └─ result_0
|    └─ result_1
|    └─ result_2
.
.
.
└─ comp_49
     └─ result_0
     └─ result_1
     └─ result_2
Each payload file `this_func_0.h5`, `this_func_1.h5`, …, `this_func_49.h5` contains three datasets "result_0" (holding `r0`), "result_1" (holding `r1`) and "result_2" (holding `r2`) in its root group, e.g.,

this_func_33.h5
└─ result_0
└─ result_1
└─ result_2
User-provided functions with only a single return value correspondingly generate payload files that only contain one dataset (“result_0”) in their respective root group.
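The group/dataset hierarchy can be illustrated programmatically. The following sketch builds a miniature stand-in for an aggregate container (file name and contents are made up; a real ACME main file links to payload files rather than storing data directly):

```python
import h5py
import numpy as np

# build a miniature container mimicking ACME's layout: one group per
# computational run, one dataset per return value of `func`
with h5py.File("mini_container.h5", "w") as h5f:
    for comp in range(3):
        grp = h5f.create_group(f"comp_{comp}")
        for res in range(2):
            grp.create_dataset(f"result_{res}", data=np.full((2, 2), comp))

# results are accessed via the same comp_*/result_* hierarchy
with h5py.File("mini_container.h5", "r") as h5f:
    r0 = h5f["comp_1"]["result_0"][()]

print(r0.tolist())  # [[1, 1], [1, 1]]
```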
Single vs. Multiple Result Files (`single_file`)
By default, ACME generates a dedicated HDF5 file for every computational run performed by `ParallelMap`, leveraging the independent nature of embarrassingly parallel workloads. This strategy has the substantial advantage that parallel workers are independent when writing results to disk: every worker generates dedicated payload files corresponding to the computational runs it is processing. Not relying on a shared writing resource means saving does not require any synchronization: no worker has to wait for another worker to finish its write process and release a file-lock. Consequently, even tasks with perfectly distributed workloads (all computational runs finish at the same time) can jointly save their results without any wait time.

However, for some applications the creation of `n_inputs` payload files might actually deteriorate performance. Depending on the underlying filesystem, generating numerous very small HDF5 containers may be detrimental to I/O throughput. To remedy this, `ParallelMap` offers the option to write all results to a joint output file by setting `single_file` to `True`.
Consider the function
def randout(x, y=3):
    if x > 0:
        return x / y
    else:
        return x * y
Suppose `randout` needs to be evaluated for 5000 values of `x` randomly sampled from a standard normal distribution. To avoid the creation of 5000 payload files, use the `single_file` keyword in the invocation of `ParallelMap`:
import numpy as np
N = 5000
rng = np.random.default_rng()
x = rng.normal(size=N)
with ParallelMap(randout, x, n_workers=10, single_file=True) as pmap:
    results = pmap.compute()
Note that the output does not mention the creation of a payload directory and `results` is a single-element list that only contains `pmap.results_container`:

>>> results
['/my/current/workdir/ACME_20221007-100302-976973/randout.h5']
>>> pmap.results_container
'/my/current/workdir/ACME_20221007-100302-976973/randout.h5'
Since the output of `randout` is small (a scalar) and its execution time is identical within measurement accuracy across random inputs, all workers attempt to save their results at nearly the same time. Thus, on a filesystem optimized for parallel I/O, running the given example with `single_file = False` (the default) is most likely significantly faster than the approach shown above, since parallel workers do not have to wait for their turn to access the single results container.
Customized Stacking of Results (`result_shape` and `result_dtype`)
Most scientific data-processing functions do not return random unstructured objects but numerical data arranged in arrays. ACME offers options to slot incoming data into pre-defined (multi-dimensional) arrays for easier access. Consider the function
import numpy as np
def matconstruct(a, k):
    rng = np.random.default_rng(seed=k)
    i = rng.integers(low=0, high=a.shape[0], size=1)[0]
    arr = np.delete(np.corrcoef(a), i, axis=1)
    return arr
Calling `matconstruct` returns a 2d-array `arr` of shape `(M, N)`. Suppose `K = 200` of these arrays have to be arranged in a tensor of shape `(K, M, N)`. Instead of letting ACME create `K` HDF5 groups for each call of `matconstruct`, which then have to be accessed post-hoc to create the desired array, the keyword `result_shape` can be used to tell `ParallelMap` to slot results into a pre-allocated dataset.
import numpy as np
M = 10
N = M - 1
K = 200
a = np.random.default_rng().random((M, 2*M))
with ParallelMap(matconstruct, a, range(K), n_workers=50, result_shape=(None, M, N)) as pmap:
    results = pmap.compute()
A single `None` entry in `result_shape` indicates the dimension along which incoming results are to be stacked. Note that exactly one `None` entry must be specified in `result_shape`.
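The stacking semantics can be emulated in plain NumPy (a sketch of the effect of `result_shape=(None, M, N)`, not ACME's actual implementation):

```python
import numpy as np

M, N, K = 4, 3, 5
# per-run results, each of shape (M, N)
runs = [np.full((M, N), float(k)) for k in range(K)]

# the None axis becomes the run index, yielding a (K, M, N) tensor
tensor = np.empty((K, M, N))
for k, arr in enumerate(runs):
    tensor[k, ...] = arr

print(tensor.shape)  # (5, 4, 3)
```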
Using `result_shape` impacts the container structure generated by ACME: the results of each computational run do not need to be stored in dedicated HDF5 groups ('comp_0', 'comp_1', …) but are slotted into a Virtual Dataset. Thus, the aggregate results container only contains the single Virtual Dataset "result_0".
Note
By default, ACME uses Virtual HDF5 Datasets for slotting results of concurrent computational runs. The real datasets in the generated payload files are mapped together into a single virtual dataset via the a-priori definition of an `h5py.VirtualLayout`. The constructed Virtual Dataset can be sliced, viewed and loaded like a regular HDF5 dataset, with the Virtual Layout acting as an interface layer for fetching the requested data from the associated payload files. This strategy provides a simple single-dataset interface for accessing results while maintaining the benefit of independent file access for parallel workers. Note that ACME can also create a single regular dataset in a single results container by combining `result_shape` with `single_file = True`, which comes with all benefits and downsides discussed in Single vs. Multiple Result Files (`single_file`).
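The underlying mechanics can be sketched with h5py directly (file names and shapes are illustrative; ACME performs the equivalent bookkeeping internally):

```python
import h5py
import numpy as np

K, M, N = 4, 3, 2

# one payload file per "computational run", each holding a real dataset
layout = h5py.VirtualLayout(shape=(K, M, N), dtype="f8")
for k in range(K):
    with h5py.File(f"payload_{k}.h5", "w") as f:
        f.create_dataset("result_0", data=np.full((M, N), float(k)))
    # map each payload dataset into slot k of the virtual layout
    layout[k] = h5py.VirtualSource(f"payload_{k}.h5", "result_0", shape=(M, N))

# the aggregate file only holds the virtual dataset
with h5py.File("aggregate.h5", "w") as f:
    f.create_virtual_dataset("result_0", layout)

# the virtual dataset reads like a regular (K, M, N) dataset
with h5py.File("aggregate.h5", "r") as f:
    data = f["result_0"][()]

print(data.shape)  # (4, 3, 2)
```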
Now consider the case of `matconstruct` returning multiple quantities:
def matconstruct_multi(a, k):
    rng = np.random.default_rng(seed=k)
    i = rng.integers(low=0, high=a.shape[0], size=1)[0]
    arr = np.delete(np.corrcoef(a), i, axis=1)
    return arr, k, np.linalg.svd(arr, compute_uv=False)
In this case, using `result_shape` when calling `ParallelMap` only affects the first return value `arr`; the remaining two quantities (`k` and an array containing `arr`'s singular values) are filed under `K` HDF5 groups ("comp_0", …, "comp_{K-1}"), each containing two datasets corresponding to the non-slotted return quantities:

with ParallelMap(matconstruct_multi, a, range(K), n_workers=50, result_shape=(None, M, N)) as pmap_multi:
    results = pmap_multi.compute()
Then (focusing on “comp_56” as an exemplary group)
>>> h5f = h5py.File(pmap_multi.results_container, "r")
>>> h5f.keys()
<KeysViewHDF5 ['comp_0', 'comp_1', ... , 'comp_199', 'result_0']>
>>> h5f["result_0"]
<HDF5 dataset "result_0": shape (200, 10, 9), type "<f8">
>>> h5f["comp_56"].keys()
<KeysViewHDF5 ['result_1', 'result_2']>
>>> h5f["comp_56"]["result_1"][()]
56
>>> h5f["comp_56"]["result_2"][()]
array([2.21726934, 1.96445424, 1.35668273, 0.96739928, 0.94735141,
0.78221836, 0.49308408, 0.2719983 , 0.17343296])
By default, ACME assumes the virtual dataset to contain 64-bit floating-point numbers. A different numerical datatype can be specified via the `result_dtype` keyword:

with ParallelMap(matconstruct, a, range(K), n_workers=50, result_shape=(None, M, N), result_dtype="float16") as pmap16:
    results = pmap16.compute()
Then
>>> h64f = h5py.File(pmap.results_container, "r")
>>> h16f = h5py.File(pmap16.results_container, "r")
>>> h64f["result_0"].dtype.name
'float64'
>>> h16f["result_0"].dtype.name
'float16'
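The disk-space effect of `result_dtype` is easy to quantify: float16 needs a quarter of the bytes per element compared to float64. A plain NumPy illustration using the `(K, M, N)` shape from the example above:

```python
import numpy as np

K, M, N = 200, 10, 9
a64 = np.zeros((K, M, N), dtype="float64")  # 8 bytes per element
a16 = np.zeros((K, M, N), dtype="float16")  # 2 bytes per element

print(a64.nbytes)  # 144000
print(a16.nbytes)  # 36000
```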
Note that using lower-precision numerical data-types may substantially reduce the disk-space footprint of generated containers. Finally, both `result_shape` and `result_dtype` can be combined with `write_worker_results = False` to gather results of computational runs in local memory (not recommended). To tread lightly on client memory, the following example only performs `K = 5` concurrent evaluations of `matconstruct`:
with ParallelMap(matconstruct,
                 a,
                 range(5),
                 n_workers=50,
                 result_shape=(None, M, N),
                 result_dtype="float16",
                 write_worker_results=False) as pmap:
    results = pmap.compute()
which yields
>>> results
[array([[[ 1. , -0.0329 , 0.2554 , -0.2394 , 0.12286, -0.255 , -0.2352 , 0.2335 , 0.3445 ],
         [-0.0329 , 1. , -0.02238, 0.3845 , -0.1865 , -0.0376 , -0.02928, -0.2076 , -0.1846 ],
         [ 0.2554 , -0.02238, 1. , 0.0505 , -0.2776 , -0.2284 , -0.1227 , -0.2605 , -0.0252 ],
         [-0.2394 , 0.3845 , 0.0505 , 1. , -0.506 , 0.05316, 0.417 , 0.1661 , -0.2454 ],
         [ 0.12286, -0.1865 , -0.2776 , -0.506 , 1. , -0.05228, -0.519 , 0.2091 , -0.1207 ],
         [-0.255 , -0.0376 , -0.2284 , 0.05316, -0.05228, 1. , 0.209 , -0.233 , -0.2363 ],
         [-0.2352 , -0.02928, -0.1227 , 0.417 , -0.519 , 0.209 , 1. , -0.1864 , -0.07697],
         [ 0.2335 , -0.2076 , -0.2605 , 0.1661 , 0.2091 , -0.233 , -0.1864 , 1. , 0.1531 ],
         [ 0.06573, 0.01949, -0.3123 , -0.215 , 0.296 , 0.162 , -0.1965 , -0.0765 , 0.337 ],
         [ 0.3445 , -0.1846 , -0.0252 , -0.2454 , -0.1207 , -0.2363 , -0.07697, 0.1531 , 1. ]],

        [[ 1. , -0.0329 , 0.2554 , -0.2394 , -0.255 , -0.2352 , 0.2335 , 0.06573, 0.3445 ],
         [-0.0329 , 1. , -0.02238, 0.3845 , -0.0376 , -0.02928, -0.2076 , 0.01949, -0.1846 ],
         [ 0.2554 , -0.02238, 1. , 0.0505 , -0.2284 , -0.1227 , -0.2605 , -0.3123 , -0.0252 ],
         [-0.2394 , 0.3845 , 0.0505 , 1. , 0.05316, 0.417 , 0.1661 , -0.215 , -0.2454 ],
         [ 0.12286, -0.1865 , -0.2776 , -0.506 , -0.05228, -0.519 , 0.2091 , 0.296 , -0.1207 ],
         [-0.255 , -0.0376 , -0.2284 , 0.05316, 1. , 0.209 , -0.233 , 0.162 , -0.2363 ],
         [-0.2352 , -0.02928, -0.1227 , 0.417 , 0.209 , 1. , -0.1864 , -0.1965 , -0.07697],
         [ 0.2335 , -0.2076 , -0.2605 , 0.1661 , -0.233 , -0.1864 , 1. , -0.0765 , 0.1531 ],
         [ 0.06573, 0.01949, -0.3123 , -0.215 , 0.162 , -0.1965 , -0.0765 , 1. , 0.337 ],
         [ 0.3445 , -0.1846 , -0.0252 , -0.2454 , -0.2363 , -0.07697, 0.1531 , 0.337 , 1. ]],
        ...
        ...
Alternative Storage Format: Pickle (`write_pickle`)
In some cases it might be necessary to work with objects that are not HDF5-compatible, e.g., sparse matrices created by `scipy.sparse`. Consider

import numpy as np
from scipy.sparse import spdiags

def f(x, y):
    # any function operating on sparse matrices, e.g., element-wise sum
    return x + y

ndim = 4
x = spdiags(np.ones((ndim,)), 0, ndim, ndim)
y = spdiags(3 * np.ones((ndim,)), 0, ndim, ndim)
Then
>>> x
<4x4 sparse matrix of type '<class 'numpy.float64'>'
        with 4 stored elements (1 diagonals) in DIAgonal format>
>>> y
<4x4 sparse matrix of type '<class 'numpy.float64'>'
        with 4 stored elements (1 diagonals) in DIAgonal format>
>>> x.toarray()
array([[1., 0., 0., 0.],
       [0., 1., 0., 0.],
       [0., 0., 1., 0.],
       [0., 0., 0., 1.]])
>>> y.toarray()
array([[3., 0., 0., 0.],
       [0., 3., 0., 0.],
       [0., 0., 3., 0.],
       [0., 0., 0., 3.]])
>>> f(x, y)
<4x4 sparse matrix of type '<class 'numpy.float64'>'
        with 4 stored elements (1 diagonals) in DIAgonal format>
In this case, the default HDF5 storage format can be overridden using the keyword `write_pickle`:

with ParallelMap(f, [x, x, x, x], y, write_pickle=True) as pmap:
    results = pmap.compute()
which yields
>>> results
['/my/current/workdir/ACME_20221007-100302-976973/f_0.pickle',
'/my/current/workdir/ACME_20221007-100302-976973/f_1.pickle',
'/my/current/workdir/ACME_20221007-100302-976973/f_2.pickle',
'/my/current/workdir/ACME_20221007-100302-976973/f_3.pickle']
Note that `pmap.results_container` is `None` in this case, as no aggregate HDF5 container is generated.
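The files produced this way can be loaded with the standard library's `pickle` module. A minimal round-trip sketch (the object and file name are stand-ins; real ACME pickle files carry the return values of `func`):

```python
import pickle

# stand-in for a non-HDF5-compatible result object
obj = {"format": "dia", "diagonal": [4.0, 4.0, 4.0, 4.0]}

# write it the way write_pickle=True stores one result per run
with open("f_0.pickle", "wb") as fp:
    pickle.dump(obj, fp)

# load it back for post-processing
with open("f_0.pickle", "rb") as fp:
    loaded = pickle.load(fp)

print(loaded == obj)  # True
```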
Randomization and Concurrency (`taskID`)
ACME uses `distributed.Client` objects to perform concurrent function evaluations. Internally, `distributed.Client.submit()` is called to register a user-provided function `func` with dask's `distributed.Scheduler`. The object reference to the function at the time of submission is subsequently invoked by every parallel worker once the concurrent computation starts. In other words, every parallel worker uses the same identical version of `func`. As a consequence, random numbers generated inside `func` during concurrent execution via `ParallelMap` are all based on the same seed.
Consider
def rand1(x):
    rng = np.random.default_rng()
    return x * rng.random()
Executing `rand1` ten times sequentially produces ten randomized scalars:

>>> import numpy as np
>>> x = np.pi
>>> n_calls = 10
>>> for _ in range(n_calls):
...     print(rand1(x))
0.17134908691066583
2.418723132470787
1.7704368838632325
2.190969197942654
1.3759020289180253
1.2496653180656538
0.27625615910822265
2.158340321806345
1.239542094404893
3.050249434982493
However, performing ten concurrent calls of `rand1` using `ParallelMap` (`write_worker_results` is set to `False` for illustration purposes only)

with ParallelMap(rand1, x, n_inputs=n_calls, write_worker_results=False) as pmap:
    results = pmap.compute()
yields
>>> results
[1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794]
In order to use a different seed in every computational run, a unique identifier is required to differentiate runs. This can be achieved by a simple modification of `rand1`:

def rand2(x, counter):
    rng = np.random.default_rng(counter)
    return x * rng.random()

The introduced counter can be integrated in `ParallelMap` by using a simple `range`:

with ParallelMap(rand2, x, range(n_calls), write_worker_results=False) as pmap:
    results = pmap.compute()
which yields
>>> results
[2.0010741575072397,
1.6079350561067187,
0.8218787590475991,
0.2690747942844946,
2.9626981331891504,
2.528991271356791,
1.690693173008172,
1.9637953256775056,
1.0272137021115593,
2.7339685059847834]
Note that internally, ACME keeps track of computational runs by injecting the keyword `taskID` into user-provided functions.
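Assuming `taskID` behaves like the counter above (a unique per-run integer), a function can use it directly for reproducible per-run seeding. The sketch below passes `taskID` explicitly to stay self-contained; in an actual `ParallelMap` run, ACME would supply it:

```python
import numpy as np

def rand3(x, taskID=None):
    # taskID: per-run identifier (passed manually here to simulate ACME)
    rng = np.random.default_rng(taskID)
    return x * rng.random()

# unique per-run seeds yield distinct yet reproducible values
vals = [rand3(np.pi, taskID=k) for k in range(10)]
print(len(set(vals)))  # 10
```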
Messaging Verbosity and Logging (`verbose` and `logfile`)
Suppose some function `f` has to be called for 20000 different values of `z`. Under the assumption that this computation takes a while, any run-time messages are to be written to a file `my_log.txt`:

z = rng.integers(low=1, high=10, size=20000, endpoint=True)
with ParallelMap(f, x, y, z=z, logfile="my_log.txt") as pmap:
    results = pmap.compute()
To make ACME less “chatty” in its output, decrease the employed verbosity level:
z = rng.integers(low=1, high=10, size=20000, endpoint=True)
with ParallelMap(f, x, y, z=z, logfile="my_log.txt", verbose=False) as pmap:
    results = pmap.compute()
Conversely, setting `verbose` to `True` logs all internal debug messages alongside standard output.