ParallelMap

class acme.ParallelMap(func: Callable, *args: Any, n_inputs: Union[int, str] = 'auto', write_worker_results: bool = True, output_dir: Optional[str] = None, result_shape: Optional[tuple[Optional[int], ...]] = None, result_dtype: str = 'float', single_file: bool = False, write_pickle: bool = False, partition: str = 'auto', n_workers: Union[int, str] = 'auto', mem_per_worker: str = 'auto', setup_timeout: int = 60, setup_interactive: bool = True, stop_client: Union[bool, str] = 'auto', verbose: Optional[bool] = None, dryrun: bool = False, logfile: Optional[Union[bool, str]] = None, **kwargs: Optional[Any])[source]

Bases: object

Attributes Summary

argv

func

kwargv

n_inputs

objName

Methods Summary

cleanup()

Shortcut to corresponding cleanup-routine provided by ACMEdaemon

compute()

Shortcut to launch parallel computation via ACMEdaemon

prepare_input(func, n_inputs, *args, **kwargs)

User input parser

Attributes Documentation

argv = None
func = None
kwargv = None
n_inputs = None
objName = '<ParallelMap>'

Methods Documentation

cleanup() → None[source]

Shortcut to corresponding cleanup-routine provided by ACMEdaemon

compute() → None[source]

Shortcut to launch parallel computation via ACMEdaemon

prepare_input(func: Callable, n_inputs: Union[int, str], *args: Any, **kwargs: Optional[Any]) → None[source]

User input parser

Ensure func can actually process provided arguments. If n_inputs was not set, attempt to infer the number of required concurrent function calls from args and kwargs. In addition, ensure the size of each argument is “reasonable” for propagation across multiple workers.
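The inference step can be sketched in plain Python (a simplified illustration only; `infer_n_inputs` is a hypothetical helper, not part of ACME's API):

```python
def infer_n_inputs(*args, **kwargs):
    # Collect the lengths of all list/tuple arguments; scalar arguments
    # are broadcast to every call and thus do not constrain n_inputs
    lengths = {len(a) for a in list(args) + list(kwargs.values())
               if isinstance(a, (list, tuple))}
    if len(lengths) == 1:
        return lengths.pop()    # unambiguous: one common length
    if len(lengths) == 0:
        return 1                # only scalars: a single call
    raise ValueError("Ambiguous input sizes, please set n_inputs explicitly")

infer_n_inputs([2, 4, 6, 8], 4)    # -> 4
```

Lists of differing lengths are exactly the ambiguous case described above: no single call count can be read off the arguments, so a ValueError is raised.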

__init__(func: Callable, *args: Any, n_inputs: Union[int, str] = 'auto', write_worker_results: bool = True, output_dir: Optional[str] = None, result_shape: Optional[tuple[Optional[int], ...]] = None, result_dtype: str = 'float', single_file: bool = False, write_pickle: bool = False, partition: str = 'auto', n_workers: Union[int, str] = 'auto', mem_per_worker: str = 'auto', setup_timeout: int = 60, setup_interactive: bool = True, stop_client: Union[bool, str] = 'auto', verbose: Optional[bool] = None, dryrun: bool = False, logfile: Optional[Union[bool, str]] = None, **kwargs: Optional[Any]) → None[source]

Context manager that executes user-defined functions in parallel

Parameters:
  • func (callable) – User-defined function to be executed concurrently. Input arguments and return values should be “simple” (i.e., regular Python objects or NumPy arrays). See Examples and [1] for more information.

  • args (arguments) – Positional arguments of func. Should be regular Python objects (lists, tuples, scalars, strings etc.) or NumPy arrays. See Examples and [1] for more information.

  • kwargs (keyword arguments) – Keyword arguments of func (if any). Should be regular Python objects (lists, tuples, scalars, strings etc.) or NumPy arrays. See Examples and [1] for more information.

  • n_inputs (int or "auto") – Number of times func is supposed to be called in parallel. Usually, n_inputs does not have to be provided explicitly. If n_inputs is “auto” (default), this quantity is inferred from the provided args and kwargs. The estimation may fail due to ambiguous input arguments (e.g., args and/or kwargs contain lists of differing lengths), triggering a ValueError; only then does n_inputs have to be set manually. See Examples and [1] for more information.

  • write_worker_results (bool) – If True, the return value(s) of func is/are saved on disk. If False, the output of all parallel calls of func is collected in memory. See Examples as well as [1] and [2] for more information.

  • output_dir (str or None) – Only relevant if write_worker_results is True. If output_dir is None (default) and write_worker_results is True, all files auto-generated by ParallelMap are stored in a directory ‘ACME_YYYYMMDD-hhmmss-ffffff’ (encoding the current time as YearMonthDay-HourMinuteSecond-Microsecond). The path to a custom output directory can be specified via providing output_dir. See Examples and [1] for more information.

  • result_shape (tuple or None) – Only relevant if write_pickle is False. If provided, return values of func are slotted into a (virtual) dataset (if write_worker_results is True) or array (otherwise) of shape result_shape, where a single None entry designates the stacking dimension. For instance, result_shape = (None, 100) implies that func returns a 100-element array which is stacked along the first dimension for each concurrent call of func, resulting in a (n_inputs, 100) dataset or array. See Notes and Examples as well as [1] and [2] for more information.

  • result_dtype (str) – Only relevant if result_shape is not None. Determines the numerical datatype of the dataset laid out by result_shape. By default, results are stored in float64 format. See [2] for more details.

  • single_file (bool) – Only relevant if write_worker_results is True and write_pickle is False. If single_file is False (default), the results of each parallel call of func are stored in dedicated HDF5 files, such that the auto-generated HDF5 results container is a collection of symbolic links pointing to these files. Conversely, if single_file is True, all parallel workers write to the same results container (using a distributed file-locking mechanism). See [2] for more details.

  • write_pickle (bool) – Only relevant if write_worker_results is True. If True, the return value(s) of func is/are pickled to disk (one ‘.pickle’-file per parallel worker). See Examples as well as [1] and [2] for more information.

  • partition (str) – Name of the SLURM partition to use. If “auto” (default), the memory footprint of func is estimated using dry-run stubs based on randomly sampling the provided args and kwargs. The estimated memory usage dictates queue auto-selection under the assumption of short run-times (currently only supported on the ESI HPC cluster). For instance, a predicted memory footprint of 6 GB causes the “8GBXS” partition to be selected (minimal but sufficient memory and shortest runtime). To override auto-selection, provide the name of a SLURM queue explicitly. See, e.g., esi_cluster_setup() for details.

  • n_workers (int or "auto") – Number of SLURM workers (=jobs) to spawn. If “auto” (default), then n_workers = n_inputs, i.e., every SLURM worker performs a single call of func. If n_inputs is large and executing func is fast, setting n_workers = int(n_inputs / 2) might be beneficial. See Examples as well as [1] and [2] for more information.

  • mem_per_worker (str) – Memory booking for each SLURM worker. If “auto” (default), the standard value is inferred from the used partition (if possible). See slurm_cluster_setup() for details.

  • setup_timeout (int) – Timeout period (in seconds) for SLURM workers to come online. Refer to keyword timeout in slurm_cluster_setup() for details.

  • setup_interactive (bool) – If True (default), user input is queried in case not enough SLURM workers could be started within setup_timeout seconds. If no input is provided, the current number of spawned workers is used (even if smaller than the amount requested by n_workers). If False, no user choice is requested. Refer to keyword interactive in slurm_cluster_setup() for details.

  • stop_client (bool or "auto") – If “auto” (default), automatically started distributed computing clients are shut down at the end of computation, while user-provided clients are left untouched. If False, automatically started clients are left running after completion; user-provided clients are left untouched. If True, both auto-generated and user-provided clients are shut down at the end of the computation. See Examples as well as [1] and [2] for more information.

  • verbose (None or bool) – If None (default), general run-time information as well as warnings and errors are shown. If True, additionally debug information is shown. If False, only warnings and errors are propagated. See [2] for more details.

  • dryrun (bool) – If True, the user-provided function func is executed once using one of the input argument tuples prepared for the parallel workers (picked at random). If setup_interactive is True, a prompt asks whether the actual parallel execution of func should be launched after the dry-run. The dryrun keyword is intended to estimate memory consumption as well as runtime of worker jobs prior to the actual concurrent computation. See [1] and [2] for more information.

  • logfile (None or bool or str) – If None (default) and write_worker_results = True, a logfile is created alongside the auto-generated on-disk results. If None and write_worker_results = False, no logfile is created. To override this mechanism, either explicitly set logfile to True or False to enforce or suppress logfile creation. Alternatively, the name of a custom log-file can be provided. The verbosity of recorded runtime information can be controlled via setting verbose. See [2] for more details.
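The stacking semantics described for result_shape above can be illustrated with plain NumPy (an illustration of the resulting dataset layout only, not of ACME's internal HDF5 machinery):

```python
import numpy as np

n_inputs = 4
result_shape = (None, 100)   # None marks the stacking dimension

# Replace the single None entry by n_inputs to obtain the full shape
full_shape = tuple(n_inputs if dim is None else dim for dim in result_shape)

# Each parallel call of func returns a 100-element array ...
single_result = np.zeros(100)

# ... which is slotted into its row of the aggregate dataset
dataset = np.empty(full_shape, dtype="float64")
for call_idx in range(n_inputs):
    dataset[call_idx, :] = single_result

print(dataset.shape)   # -> (4, 100)
```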

Returns:

results – If write_worker_results is True, results is a list of HDF5 file-names containing computed results. If write_worker_results is False, results is a list comprising the actual return values of func.

Return type:

list
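With write_pickle set to True, each worker writes one ‘.pickle’ file, so collecting the results afterwards amounts to loading those files in order (a hedged sketch: the file naming and directory below are emulated with a temporary directory and do not reproduce ACME's actual naming scheme):

```python
import glob
import os
import pickle
import tempfile

# Emulate per-worker output files as produced with write_pickle=True
outdir = tempfile.mkdtemp()
for worker, value in enumerate([4, 6, 8, 10]):
    with open(os.path.join(outdir, f"payload_{worker}.pickle"), "wb") as fp:
        pickle.dump(value, fp)

# Collect the results in worker order
results = []
for fname in sorted(glob.glob(os.path.join(outdir, "*.pickle"))):
    with open(fname, "rb") as fp:
        results.append(pickle.load(fp))

print(results)   # -> [4, 6, 8, 10]
```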

Examples

Call f with four different values of x while setting y to 4 (here, f is any user-defined function accepting two arguments, e.g.):

from acme import ParallelMap

def f(x, y):
    # illustrative example; any picklable user-defined function works
    return x * y

with ParallelMap(f, [2, 4, 6, 8], 4) as pmap:
    results = pmap.compute()

Collect results in memory (this can be slow due to network traffic and may cause memory overflow in the parent process):

with ParallelMap(f, [2, 4, 6, 8], 4, write_worker_results=False) as pmap:
    results = pmap.compute()

Manually set n_inputs in case the argument distribution cannot be determined automatically:

with ParallelMap(f, [2, 4, 6, 8], y, n_inputs=4, write_worker_results=False) as pmap:
    results = pmap.compute()

More examples and tutorials are available in the ACME online documentation.

Notes

Please consult [1] for detailed usage information.

See also

esi_cluster_setup

spawn custom SLURM worker clients on the ESI HPC cluster

local_cluster_setup

start a local Dask multi-processing cluster on the host machine

ACMEdaemon

Manager class performing the actual concurrent processing

References