Advanced Usage and Customization
One of ACME’s main objectives is to be as application-agnostic as possible so that it can be a drop-in parallelization engine for a wide spectrum of use cases. Thus, ACME features a variety of options for custom-tailoring its functionality to specific problems. This section discusses settings details and associated technical intricacies.
User-Function Requirements
The user-provided function func has to meet some basic requirements to
permit parallel execution with ParallelMap:
input arguments of `func` should be regular Python objects (lists, tuples, scalars, strings etc.) or NumPy arrays. Custom user-defined classes may or may not work. In general, anything that can be serialized via cloudpickle should work out of the box.
if automatic result saving is used (
write_worker_resultsisTrue), the return value(s) of `func` have to be suitable for storage in HDF5 containers. Thus, anything returned byfuncshould be either purely numeric (scalars or NumPy arrays) or purely lexical (strings). Hybrid text/numeric data-types (e.g., Python dicts or Pandas dataframes), custom class instances, functions, generators or complex objects (like matplotlib figures) will not work.
Auto-Generated HDF5-Files
Directory Layout
All HDF5 files auto-generated by ParallelMap are stored in a directory
named ACME_YYYYMMDD-hhmmss-ffffff (encoding the current time as
YearMonthDay-HourMinuteSecond-Microsecond) that is created in the current
working directory (or the user’s home directory on /mnt/hpc if ACME is
running on the ESI or CoBIC HPC cluster).
By default, ACME generates n_inputs + 1 HDF5 files: one aggregate results
container and n_inputs payload HDF5 files for each computation. The aggregate
“main file” only contains links to the payload files and is called
funcname.h5 based on the name of the user-provided function.
The naming of the payload files follows the schema funcname_{compid}.h5,
where {compid} encodes the number of the performed computation (between 0
and n_inputs - 1):
ACME_YYYYMMDD-hhmmss-ffffff/
└─ funcname.h5
└─ funcname_payload/
└─ funcname_0.h5
└─ funcname_1.h5
└─ funcname_2.h5
.
.
.
└─ funcname_{n_inputs - 1}.h5
The main file funcname.h5 is composed of relative links to the payload files in
the funcname_payload subdirectory. This means, funcname.h5 is only
functional alongside its “payload” sub-folder. Therefore, if funcname.h5
is moved to another file-system location, its payload directory needs to be
moved as well.
Note
The number of computations carried out (i.e., how many times a user-provided
function func is called) does not have to be equals to the number of parallel
workers used to perform these calculations. Particularly if n_inputs
is large but evaluating func is computationally cheap, starting
n_inputs SLURM workers (=jobs) might take longer than the actual concurrent
calculation. In such a scenario it is usually better to use fewer workers
(set n_workers < n_inputs) that each perform multiple evaluations of func.
To make a HDF5 file with regular datasets based on the auto-generated
aggregate results container created by ACME, the h5py.Group.copy()
routine can be used:
import h5py
with h5py.File("new_file.h5", "w") as h5target:
with h5py.File(pmap.results_container, "r") as h5source:
for comp in h5source.keys():
h5source.copy(comp, h5target, expand_external=True, name=comp)
Note that ACME itself can create an aggregate results container with “regular” datasets
from the get-go by setting the keyword single_file to True in ParallelMap.
The potential benefits and downsides are discussed in Single vs. Multiple Result Files (single_file)
below.
Container Structure
The internal structure of all HDF5 files generated by ACME is kept as simple
as possible: the aggregate main file is partitioned into n_inputs groups
(‘comp_0’, ‘comp_1’, …) that each point to the respective payload file
actually holding the results of the corresponding computation. Within every
payload file each return value of the user-provided function func is saved in a
separate dataset in the file’s root group. For instance, processing
the following user-provided function
def this_func(a, b, c):
# ...some complicated calculations...
return r0, r1, r2
for 50 different input triplets (a, b, c) generates one aggregate container
this_func.h5 and a payload of 50 HDF5 files this_func_0.h5,
this_func_1.h5, …, this_func_49.h5. The aggregate results container
this_func.h5 is structured as follows:
this_func.h5
└─ comp_0
| └─ result_0
| └─ result_1
| └─ result_2
└─ comp_1
| └─ result_0
| └─ result_1
| └─ result_2
└─ comp_2
| └─ result_0
| └─ result_1
| └─ result_2
.
.
.
└─ comp_49
└─ result_0
└─ result_1
└─ result_2
Each payload file this_func_0.h5, this_func_1.h5, …, this_func_49.h5
contains three datasets “result_0” (holding r0), “result_1” (holding r1)
and “result_2” (holding r2) in its root group, e.g.,
this_func_33.h5
└─ result_0
└─ result_1
└─ result_2
User-provided functions with only a single return value correspondingly generate payload files that only contain one dataset (“result_0”) in their respective root group.
Single vs. Multiple Result Files (single_file)
By default, ACME generates a dedicated HDF5 file for every computational run
performed by ParallelMap leveraging the independent nature
of embarassingly parallel workloads. This strategy has the substantial advantage,
that parallel workers are independent when writing results to disk: every
worker generates dedicated payload files corresponding to the computational
runs it is processing. Not relying on a shared writing resource means saving
does not require any synchronization: no worker has to wait for another
worker to finish its write process and release a file-lock. Consequently,
even tasks with perfectly distributed workloads (all computational runs finish
at the same time) can jointly save their results without any wait time.
However, for some applications the creation of n_inputs payload files
might actually deteriorate performance. Depending on the underlying filesystem
generating numerous very small HDF containers may be detrimental to I/O throughput.
To remedy this, ParallelMap offers the option to write results
together in a joint output file by setting single_file to True.
Consider the function
def randout(x, y=3):
if x > 0:
return x / y
else:
return x * y
Suppose randout needs to be evaluated for 5000 values of x randomly
sampled from a standard normal distribution. To avoid the creation of 5000
payload files, use the single_file keyword in the invocation of
ParallelMap
import numpy as np
N = 5000
rng = np.random.default_rng()
x = rng.normal(size=N)
with ParallelMap(randout, x, n_workers=10, single_file=True) as pmap:
results = pmap.compute()
Note that the output does not mention the creation of a payload directory and
results is a single-element list that only contains pmap.results_container:
>>> results
['/my/current/workdir/ACME_20221007-100302-976973/randout.h5']
>>> pmap.results_container
'/my/current/workdir/ACME_20221007-100302-976973/randout.h5'
While the output of randout is small (a scalar), its execution time
for random independent input values is identical within measurement accuracy.
Thus, on a filesystem optimized for parallel I/O, running the given example
with single_file = False (default) is most likely significantly faster
than the approach shown above since parallel workers do not have to wait for
their turn to access the single results container.
Customized Stacking of Results (result_shape and result_dtype)
Most scientific data-processing functions do not return random unstructured objects but numerical data arranged in arrays. ACME offers options to slot incoming data into pre-defined (multi-dimensional) arrays for easier access. Consider the function
import numpy as np
def matconstruct(a, k):
rng = np.random.default_rng(seed=k)
i = rng.integers(low=0, high=a.shape[0], size=1)[0]
arr = np.delete(np.corrcoef(a), i, axis=1)
return arr
Calling matconstruct returns a 2d-array arr of shape (M, N).
Suppose, K = 200 of these arrays have to be arranged in a tensor of
shape (K, M, N). Instead of letting ACME create K HDF5 groups for
each call of matconstruct which then have to be accessed post-hoc to
create the desired array, the keyword result_shape can be used to tell
ParallelMap to slot results into a pre-allocated dataset.
import numpy as np
M = 10
N = M - 1
K = 200
a = np.random.default_rng().random((M, 2*M))
with ParallelMap(matconstruct, a, range(K), n_workers=50, result_shape=(None, M, N)) as pmap:
results = pmap.compute()
A single None entry in result_shape indicates the dimension along which
incoming results are to be stacked. Note that exactly one None entry
must be specified in result_shape.
Using result_shape impacts the container structure generated by ACME:
the results of each computational run do not need to be stored in dedicated
HDF5 groups (‘comp_0’, ‘comp_1’, …) but are slotted into a
Virtual Dataset. Thus, the
aggregate results container only contains the single Virtual Dataset “result_0”.
Note
By default, ACME uses
Virtual HDF5 Datasets
for slotting results of
concurrent computational runs. The real datasets in the generated payload
files are mapped together into a single virtual dataset via the a-priori
definition of a h5py.VirtualLayout. The constructed Virtual Dataset can be sliced,
viewed and loaded like a regular HDF5 dataset with the Virtual Layout acting
as interface layer for fetching the requested data from the associated payload
files. This strategy provides a simple single-dataset interface to access
results while maintaining the benefit of independent file access for parallel
workers. Note that ACME can also create a single regular dataset in a single
results container by combining result_shape with single_file = True
which comes with all benefits and downsides discussed in Single vs. Multiple Result Files (single_file).
Now consider the case of matconstruct returning multiple quantities:
def matconstruct_multi(a, k):
rng = np.random.default_rng(seed=k)
i = rng.integers(low=0, high=a.shape[0], size=1)[0]
arr = np.delete(np.corrcoef(a), i, axis=1)
return arr, k, np.linalg.svd(arr, compute_uv=False)
In this case, using result_shape when calling ParallelMap
only affects the first return variable arr, the remaining two quantities
(k and an array containing arr’s singular values) are filed under K
HDF5 groups (“comp_0”, …, “comp_{K}”) each containing two datasets
corresponding to the non-slotted return quantities:
with ParallelMap(matconstruct_multi, a, range(K), n_workers=50, result_shape=(None, M, N)) as pmap_multi:
results = pmap_multi.compute()
Then (focusing on “comp_56” as an exemplary group)
>>> h5f = h5py.File(pmap_multi.results_container, "r")
>>> h5f.keys()
<KeysViewHDF5 ['comp_0', 'comp_1', ... , 'comp_199', 'result_0']>
>>> h5f["result_0"]
<HDF5 dataset "result_0": shape (200, 10, 9), type "<f8">
>>> h5f["comp_56"].keys()
<KeysViewHDF5 ['result_1', 'result_2']>
>>> h5f["comp_56"]["result_1"][()]
56
>>> h5f["comp_56"]["result_2"][()]
array([2.21726934, 1.96445424, 1.35668273, 0.96739928, 0.94735141,
0.78221836, 0.49308408, 0.2719983 , 0.17343296])
By default, ACME assumes the virtual dataset to contain 64-bit floating point
numbers. A different numerical datatype can be specified via the result_dtype
keyword:
with ParallelMap(matconstruct, a, range(K), n_workers=50, result_shape=(None, M, N), result_dtype="float16") as pmap16:
results = pmap16.compute()
Then
>>> h64f = h5py.File(pmap.results_container, "r")
>>> h16f = h5py.File(pmap16.results_container, "r")
>>> h64f["result_0"].dtype.name
'float64'
>>> h16f["result_0"].dtype.name
'float16'
Note that using lower-precision numerical data-types may substantially reduce
the disk-space footprint of generated containers. Finally, both result_shape
and result_dtype can be combined with write_worker_results = False
to gather results of computational runs in local memory (not recommended).
To tread lightly on client memory the following example only performs K = 5
concurrent evaluations of matconstruct
with ParallelMap(matconstruct,
a,
range(5),
n_workers=50,
result_shape=(None, M, N),
result_dtype="float16",
write_worker_results=False) as pmap:
results = pmap.compute()
which yields
>>> results
[array([[[ 1. , -0.0329 , 0.2554 , -0.2394 , 0.12286, -0.255 , -0.2352 , 0.2335 , 0.3445 ],
[-0.0329 , 1. , -0.02238, 0.3845 , -0.1865 , -0.0376 , -0.02928, -0.2076 , -0.1846 ],
[ 0.2554 , -0.02238, 1. , 0.0505 , -0.2776 , -0.2284 , -0.1227 , -0.2605 , -0.0252 ],
[-0.2394 , 0.3845 , 0.0505 , 1. , -0.506 , 0.05316, 0.417 , 0.1661 , -0.2454 ],
[ 0.12286, -0.1865 , -0.2776 , -0.506 , 1. , -0.05228, -0.519 , 0.2091 , -0.1207 ],
[-0.255 , -0.0376 , -0.2284 , 0.05316, -0.05228, 1. , 0.209 , -0.233 , -0.2363 ],
[-0.2352 , -0.02928, -0.1227 , 0.417 , -0.519 , 0.209 , 1. , -0.1864 , -0.07697],
[ 0.2335 , -0.2076 , -0.2605 , 0.1661 , 0.2091 , -0.233 , -0.1864 , 1. , 0.1531 ],
[ 0.06573, 0.01949, -0.3123 , -0.215 , 0.296 , 0.162 , -0.1965 , -0.0765 , 0.337 ],
[ 0.3445 , -0.1846 , -0.0252 , -0.2454 , -0.1207 , -0.2363 , -0.07697, 0.1531 , 1. ]],
[[ 1. , -0.0329 , 0.2554 , -0.2394 , -0.255 , -0.2352 , 0.2335 , 0.06573, 0.3445 ],
[-0.0329 , 1. , -0.02238, 0.3845 , -0.0376 , -0.02928, -0.2076 , 0.01949, -0.1846 ],
[ 0.2554 , -0.02238, 1. , 0.0505 , -0.2284 , -0.1227 , -0.2605 , -0.3123 , -0.0252 ],
[-0.2394 , 0.3845 , 0.0505 , 1. , 0.05316, 0.417 , 0.1661 , -0.215 , -0.2454 ],
[ 0.12286, -0.1865 , -0.2776 , -0.506 , -0.05228, -0.519 , 0.2091 , 0.296 , -0.1207 ],
[-0.255 , -0.0376 , -0.2284 , 0.05316, 1. , 0.209 , -0.233 , 0.162 , -0.2363 ],
[-0.2352 , -0.02928, -0.1227 , 0.417 , 0.209 , 1. , -0.1864 , -0.1965 , -0.07697],
[ 0.2335 , -0.2076 , -0.2605 , 0.1661 , -0.233 , -0.1864 , 1. , -0.0765 , 0.1531 ],
[ 0.06573, 0.01949, -0.3123 , -0.215 , 0.162 , -0.1965 , -0.0765 , 1. , 0.337 ],
[ 0.3445 , -0.1846 , -0.0252 , -0.2454 , -0.2363 , -0.07697, 0.1531 , 0.337 , 1. ]],
...
...
Unlimited Dataset Dimensions (np.inf in result_shape)
Sometimes the final shape of an array is not straight-forward to predict
upfront. Assume sensor data acquired by 200 probes needs to be smoothed
and stored in a single array for further downstream processing. Each sensor
emits a single data-point per time step, i.e., a 1D time-series, start and
stop of sensor recordings have been synchronized by hardware triggers.
Thus, a set of 200 1D-timeseries of the same length needs to be smoothed
and stored in a 200 x nSamples array, where nSamples is not known
a-priori. Instead of manually inspecting the sensor data to garner the exact
value of nSamples, ACME can allocate HDF5 datasets of variable dimensions
by using np.inf in result_shape:
import numpy as np
from scipy.ndimage import uniform_filter1d
from acme import ParallelMap
# Construct 200 artificial time-series
# (sine wave + 10% additive Gaussian noise)
# mocking sensor data
nChannels = 200
sine_wave = np.sin(np.linspace(0, 2*np.pi, 15000))
sensor_data = [sine_wave + 0.1 * np.random.randn(sine_wave.size) for _ in range(nChannels)]
# Apply a moving average filter to all channels in parallel
with ParallelMap(uniform_filter1d,
sensor_data,
size=10,
mode="nearest",
n_workers=10,
result_shape=(None, np.inf)) as pmap:
pmap.compute()
Inspecting the generated HDF5 container shows that ACME created a dataset with 200 rows and an “unlimited” number of columns (up to the HDF5 per-axis limit of 2^64).
>>> import h5py
>>> h5f = h5py.File(pmap.results_container, "r")
>>> h5f["result_0"].maxshape # no limit on no. of columns
(200, None)
>>> h5f["result_0"].shape # actual shape correctly inferred from data
(200, 15000)
>>> h5f["result_0"].is_virtual # still a virtual dataset
True
Note
Unlimited dimensions are supported for on-disk results in HDF5 containers,
for both virtual and regular datasets (single_file = True or single_file = False)
However, in-memory NumPy arrays (write_worker_results = False) do not support
unlimited dimensions (pickle files cannot process shape specifications at all).
Alternative Storage Format: Pickle (write_pickle)
In some cases it might be necessary to work with objects that are not
HDF5 compatible, e.g., sparse matrices created by scipy.sparse. Consider
from scipy.sparse import spdiags
ndim = 4
x = spdiags(np.ones((ndim,)), 0, ndim, ndim)
y = spdiags(3 * np.ones((ndim,)), 0, ndim, ndim)
Then
>>> x
<4x4 sparse matrix of type '<class 'numpy.float64'>'
with 4 stored elements (1 diagonals) in DIAgonal format>
>>> y
<4x4 sparse matrix of type '<class 'numpy.float64'>'
with 4 stored elements (1 diagonals) in DIAgonal format>
>>> x.toarray()
array([[1., 0., 0., 0.],
[0., 1., 0., 0.],
[0., 0., 1., 0.],
[0., 0., 0., 1.]])
>>> y.toarray()
array([[3., 0., 0., 0.],
[0., 3., 0., 0.],
[0., 0., 3., 0.],
[0., 0., 0., 3.]])
>>> f(x, y)
<4x4 sparse matrix of type '<class 'numpy.float64'>'
with 4 stored elements (1 diagonals) in DIAgonal format>
In this case, the default HDF5 storage format can be overridden using the
keyword write_pickle
with ParallelMap(f, [x, x, x, x], y, write_pickle=True) as pmap:
results = pmap.compute()
which yields
>>> results
['/my/current/workdir/ACME_20221007-100302-976973/f_0.pickle',
'/my/current/workdir/ACME_20221007-100302-976973/f_1.pickle',
'/my/current/workdir/ACME_20221007-100302-976973/f_2.pickle',
'/my/current/workdir/ACME_20221007-100302-976973/f_3.pickle']
Note that pmap.results_container is None in this case, as no aggregate
HDF5 container is generated.
Randomization and Concurrency (taskID)
ACME uses distributed.Client objects to perform concurrent function
evaluations. Internally, distributed.Client.submit() is called to register
a user-provided function func with dask’s distributed.Scheduler.
The object reference to the function at time of submission is subsequently
invoked by every parallel worker once the concurrent computation starts.
In other words, every parallel worker uses the same identical version of func.
As a consequence, random numbers generated inside of func during concurrent
execution via ParallelMap are all based on the same seed.
Consider
def rand1(x):
rng = np.random.default_rng()
return x * rng.random()
Executing rand1 ten times sequentially produces ten randomized scalars:
>>> import numpy as np
>>> x = np.pi
>>> n_calls = 10
>>> for _ in range(n_calls):
print(rand1(x)))
0.17134908691066583
2.418723132470787
1.7704368838632325
2.190969197942654
1.3759020289180253
1.2496653180656538
0.27625615910822265
2.158340321806345
1.239542094404893
3.050249434982493
However, performing ten concurrent calls of rand using ParallelMap
(write_worker_results is set to False for illustration purposes only)
with ParallelMap(rand1, x, n_inputs=n_calls, write_worker_results=False) as pmap:
results = pmap.compute()
yields
>>> results
[1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794,
1.7776464046015794]
In order to use a different seed in every computational run, a unique identifier
is required to differentiate runs. This can be achieved by a simple modification
of rand1
def rand2(x, counter):
rng = np.random.default_rng(counter)
return x * rng.random()
The introduced counter can be integrated in ParallelMap by
using a simple range
with ParallelMap(rand2, x, range(n_calls), write_worker_results=False) as pmap:
results = pmap.compute()
which yields
>>> results
[2.0010741575072397,
1.6079350561067187,
0.8218787590475991,
0.2690747942844946,
2.9626981331891504,
2.528991271356791,
1.690693173008172,
1.9637953256775056,
1.0272137021115593,
2.7339685059847834]
Note that internally ACME keeps track of computational runs by injecting the
keyword taskID into user-provided functions.
Messaging verbosity and logging (verbose and logfile)
Suppose some function f has to be called for 20000 different values of z.
Under the assumption that this computation takes a while, any run-time
messages are to be written to a file my_log.txt
z = rng.integers(low=1, high=10, size=20000, endpoint=True)
with ParallelMap(f, x, y, z=z, logfile="my_log.txt") as pmap:
results = pmap.compute()
To make ACME less “chatty” in its output, decrease the employed verbosity level:
z = rng.integers(low=1, high=10, size=20000, endpoint=True)
with ParallelMap(f, x, y, z=z, logfile="my_log.txt", verbose=False) as pmap:
results = pmap.compute()
Conversely, by setting verbose to True all internal debug messages
are logged alongside standard output.