# Source code for acme.backend

#
# Computational scaffolding for user-interface
#
# Copyright © 2020-2025 Ernst Strüngmann Institute (ESI) for Neuroscience
# in Cooperation with Max Planck Society
# Copyright © 2026 Ernst Strüngmann Institute (ESI) of the Max Planck Society
#
# SPDX-License-Identifier: BSD-3-Clause
#

# Builtin/3rd party package imports
import os
import sys
import logging
import dask.distributed as dd
from typing import TYPE_CHECKING, Optional, Any, Union, List

# Local imports
from .logger import prepare_log
from .validators import validate_pmap
from .config import ACMEConfig
from .memory_profiler import MemoryProfiler
from .argument_processor import ArgumentProcessor
from .results.output_setup import OutputDirectoryManager, HDF5ContainerFactory
from .results.result_handler import ResultStorageManager
from .results.post_processor import ResultPostProcessor
from .client_orchestrator import ClientOrchestrator

# Checked once at import time: if SyNCoPy has already been imported, the
# warning about in-memory result collection in `ACMEdaemon.pre_process` is
# suppressed
isSpyModule = False
if "syncopy" in sys.modules:  # pragma: no cover
    isSpyModule = True

# Needed for annotations only; guarded so nothing is imported at runtime
# (presumably to avoid a frontend <-> backend import cycle). Use a
# package-relative import, consistent with the other local imports above.
if TYPE_CHECKING:  # pragma: no cover
    from .frontend import ParallelMap

__all__: List[str] = ["ACMEdaemon"]

# Fetch logger
log = logging.getLogger("ACME")


# Main manager for parallel execution of user-defined functions
class ACMEdaemon(object):
    """
    Main manager for parallel execution of user-defined functions

    Combines the run configuration (`ACMEConfig`), argument processing
    (`ArgumentProcessor`), memory profiling (`MemoryProfiler`) and
    distributed-client handling (`ClientOrchestrator`), and prepares the
    on-disk output containers for the results of concurrent calls of a
    user-provided function.
    """

    # Restrict valid class attributes
    __slots__ = ("results_container", "config", "processor", "profiler", "orchestrator")

    def __init__(
        self,
        pmap: "ParallelMap",
        n_workers: Union[int, str] = "auto",
        write_worker_results: bool = True,
        output_dir: Optional[str] = None,
        result_shape: Optional[tuple[Optional[int], ...]] = None,
        result_dtype: str = "float",
        single_file: bool = False,
        write_pickle: bool = False,
        dryrun: bool = False,
        partition: str = "auto",
        mem_per_worker: str = "auto",
        setup_timeout: int = 60,
        setup_interactive: bool = True,
        stop_client: Union[bool, str] = "auto",
        verbose: Optional[bool] = None,
        logfile: Optional[Union[bool, str]] = None,
        cleanup_threshold_days: Optional[int] = None,
    ) -> None:
        """
        Manager class for performing concurrent user function calls

        Parameters
        ----------
        pmap : :class:`~acme.ParallelMap` context manager
            :class:`~acme.ACMEdaemon` assumes that the provided
            :class:`~acme.ParallelMap` instance has already been properly set
            up to process `func` (all input arguments parsed and properly
            formatted). The user function (`func`), its positional and keyword
            arguments (`argv`, `kwargv`) and the number of calls (`n_inputs`)
            are taken from `pmap`; all remaining settings are passed
            explicitly via the arguments below.
        n_workers : int or "auto"
            Number of SLURM workers (=jobs) to spawn.
            See :class:`~acme.ParallelMap` for details.
        write_worker_results : bool
            If `True`, the return value(s) of `func` is/are saved on disk.
            See :class:`~acme.ParallelMap` for details.
        output_dir : str or None
            If provided, auto-generated results are stored in the given path.
            See :class:`~acme.ParallelMap` for details.
        result_shape : tuple or None
            If provided, results are slotted into a dataset/array with layout
            `result_shape`. See :class:`~acme.ParallelMap` for details.
        result_dtype : str
            Determines numerical datatype of dataset laid out by `result_shape`.
            See :class:`~acme.ParallelMap` for details.
        single_file : bool
            If `True`, parallel workers write to the same results container.
            See :class:`~acme.ParallelMap` for details.
        write_pickle : bool
            If `True`, the return value(s) of `func` is/are pickled to disk.
            See :class:`~acme.ParallelMap` for details.
        dryrun : bool
            If `True`, a dry-run of calling `func` is performed using a single
            `args`, `kwargs` tuple. See :class:`~acme.ParallelMap` for details.
        partition : str
            Name of SLURM partition to use.
            See :class:`~acme.ParallelMap` for details.
        mem_per_worker : str
            Memory booking for each SLURM worker.
            See :class:`~acme.ParallelMap` for details.
        setup_timeout : int
            Timeout period (in seconds) for SLURM workers to come online.
            See :class:`~acme.ParallelMap` for details.
        setup_interactive : bool
            If `True`, user input is queried in case not enough SLURM workers
            could be started within `setup_timeout` seconds.
            See :class:`~acme.ParallelMap` for details.
        stop_client : bool or "auto"
            If `"auto"`, automatically started distributed computing clients
            are shut down at the end of computation, while user-provided
            clients are left untouched. See :class:`~acme.ParallelMap` for
            details.
        verbose : None or bool
            If `None` (default), general run-time information as well as
            warnings and errors are shown.
            See :class:`~acme.ParallelMap` for details.
        logfile : None or bool or str
            If `None` (default) or `True`, and `write_worker_results` is `True`,
            all run-time information as well as errors and warnings are tracked
            in a log-file. See :class:`~acme.ParallelMap` for details.
        cleanup_threshold_days : None or int
            Forwarded unchanged to the run configuration (`ACMEConfig`).
            See :class:`~acme.ParallelMap` for details.

        Notes
        -----
        Results are obtained by calling :meth:`compute`. If
        `write_worker_results` is `True`, it returns a list of HDF5 file-names
        containing computed results. If `write_worker_results` is `False`, it
        returns a list comprising the actual return values of `func`. If
        :class:`~acme.ACMEdaemon` was instantiated by
        :class:`~acme.ParallelMap`, results are propagated back to
        :class:`~acme.ParallelMap`.

        See also
        --------
        ParallelMap : Context manager and main user interface
        """
        # With `__slots__`, reading an unassigned slot raises `AttributeError`;
        # `results_container` is otherwise only set in `post_process`, which is
        # skipped after an aborted dry-run or when no futures are returned
        self.results_container = None

        # First and foremost: ensure we got something useful to work with
        validate_pmap(pmap)

        # Create configuration
        self.config = ACMEConfig(
            func=pmap.func,
            argv=pmap.argv,
            kwargv=pmap.kwargv,
            n_calls=pmap.n_inputs,
            n_workers=n_workers,
            write_worker_results=write_worker_results,
            output_dir=output_dir,
            result_shape=result_shape,
            result_dtype=result_dtype,
            single_file=single_file,
            write_pickle=write_pickle,
            dryrun=dryrun,
            partition=partition,
            mem_per_worker=mem_per_worker,
            setup_timeout=setup_timeout,
            setup_interactive=setup_interactive,
            stop_client=stop_client,
            verbose=verbose,
            logfile=logfile,
            cleanup_threshold_days=cleanup_threshold_days,
        )
        self.config.validate()

        # Set up output handler (must run before argument processing, since it
        # may add entries such as "taskID"/"outFile" to `kwargv`)
        self.pre_process()

        # Set up argument processing helper class
        self.processor = ArgumentProcessor(
            self.config.argv, self.config.kwargv, self.config.n_calls
        )

        # Set up memory profiling helper class
        self.profiler = MemoryProfiler(
            self.processor,
            self.config.acme_func,
            self.config.func.__name__,
            self.config.tqdmFormat,
        )

        # Set up client orchestration helper class
        self.orchestrator = ClientOrchestrator(
            self.config, self.processor, self.profiler
        )

        # If requested, perform single-worker dry-run (and quit if desired)
        if dryrun:
            goOn = self.profiler.perform_dryrun(
                output_dir=self.config.output_dir,
                setup_interactive=self.config.setup_interactive,
            )
            if not goOn:
                log.debug("Quitting after dryrun")
                return
            log.debug("Continuing after dryrun")

        # Either use existing dask client or start a fresh instance
        self.prepare_client()

    def pre_process(self) -> None:
        """
        Prepare result handling before any computation is launched

        If `write_worker_results` is `True`, output directories and containers
        are set up by :meth:`setup_output`. Otherwise the user function is
        invoked directly: if `taskID` is missing from (or `None` in) the
        keyword arguments, results are collected in memory by the caller (a
        warning is logged unless SyNCoPy is loaded); if it is present, it is
        filled with the task IDs and output handling is left to the user
        function. Finally, logging is configured via `prepare_log`.
        """
        # If automatic saving of results is requested, make necessary preparations
        if self.config.write_worker_results:
            self.setup_output()
        else:
            # If `taskID` is not an explicit kw-arg of `func` and `func` does not
            # accept "anonymous" `**kwargs`, don't save anything but return stuff
            log.debug("Automatic output processing disabled.")
            if self.config.kwargv.get("taskID") is None:
                if not isSpyModule:
                    msg = (
                        "`write_worker_results` is `False` and `taskID` is not a keyword argument of %s. "
                        + "Results will be collected in memory by caller - this might be slow and can lead "
                        + "to excessive memory consumption. "
                    )
                    log.warning(msg, self.config.func.__name__)
                self.config.collect_results = True  # type: ignore
            else:
                self.config.kwargv["taskID"] = self.config.task_ids
                self.config.collect_results = False  # type: ignore
                msg = (
                    "Not collecting results in memory, leaving output "
                    + "processing to user-provided function"
                )
                log.debug(msg)

            # The "raw" user-provided function is used in the computation
            self.config.acme_func = self.config.func
            log.debug("Not wrapping user-provided function but invoking it directly")

        # If progress tracking in a log-file was requested, set it up now
        prepare_log(
            logname="ACME", logfile=self.config.logfile, verbose=self.config.verbose
        )
        log.debug("Set up logfile=%s", str(self.config.logfile))

    def setup_output(self) -> None:
        """
        Create output directories and prepare result containers

        Populates `kwargv` with the entries consumed by :meth:`func_wrapper`
        (`taskID`, `outFile`, `userFunc` and, for a single shared container,
        `singleFile`) and makes :meth:`func_wrapper` the function that is
        actually executed. For HDF5 output, a "main" container
        ``<output_dir>/<func name>.h5`` is prepared: either a "real" container
        holding the actual dataset(s) (`single_file=True`) or a collection of
        links pointing to worker-generated HDF5 containers.
        """
        funcName = self.config.func.__name__

        # Use output setup manager for directory creation
        output_manager = OutputDirectoryManager()
        outputDir = output_manager.create_output_directory(
            self.config.output_dir,
            self.config.single_file,
            self.config.write_pickle,
            funcName,
        )

        # Re-define or allocate key "taskID" to track concurrent processing results
        self.config.kwargv["taskID"] = self.config.task_ids
        self.config.collect_results = False

        # Set up correct file-extension for output files; in case of HDF5
        # containers, prepare "main" file for collecting/symlinking worker results
        if self.config.write_pickle:
            fExt = "pickle"
            log.debug("Pickling was requested")
        else:
            fExt = "h5"
            self.config.results_container = os.path.join(self.config.output_dir, f"{funcName}.h5")  # type: ignore
            log.debug("Using HDF5 storage %s", self.config.results_container)

        # Use HDF5 container factory for container creation
        container_factory = HDF5ContainerFactory()

        # By default, `results_container` is a collection of links that point to
        # worker-generated HDF5 containers; if `single_file` is `True`, then
        # `results_container` is a "real" container with actual dataset(s)
        if self.config.single_file:
            self.config.kwargv["singleFile"] = [True]
            self.config.kwargv["outFile"] = [self.config.results_container]
            log.debug("Saving results in single HDF5 container")

            # A `result_shape` of `None` prepares groups for storing datasets;
            # otherwise a single dataset w/specified dimension is allocated
            container_factory.create_single_file_container(
                self.config.results_container,
                self.config.task_ids,
                self.config.result_shape,
                self.config.result_dtype,
            )
        else:
            self.config.kwargv["outFile"] = [
                os.path.join(outputDir, f"{funcName}_{taskID}.{fExt}")
                for taskID in self.config.task_ids
            ]
            if not self.config.write_pickle:
                # Without an output shape, links to external datasets are
                # generated (default stacking dim 0); otherwise a virtual
                # dataset w/specified dimension is allocated
                if self.config.result_shape is None:
                    stackingDim = 0
                else:
                    stackingDim = self.config.stacking_dim  # type: ignore
                container_factory.create_virtual_dataset_container(
                    self.config.results_container,
                    self.config.task_ids,
                    self.config.kwargv["outFile"],
                    self.config.result_shape,
                    stackingDim,
                    self.config.result_dtype,
                    outputDir,
                )

        # Wrap the user-provided func and distribute it across workers
        self.config.kwargv["userFunc"] = [self.config.func]
        self.config.acme_func = self.func_wrapper  # type: ignore
        log.debug("Wrapping user-provided function inside func_wrapper")

    def prepare_client(self) -> None:
        """
        Setup or fetch dask distributed processing client

        Delegates to `ClientOrchestrator` for all client lifecycle management.
        """
        self.orchestrator.prepare_client()

    def compute(self, debug: bool = False) -> Union[List, None]:
        """
        Perform the actual parallel execution of `func`

        Parameters
        ----------
        debug : bool
            If `True`, use a single-threaded dask scheduler that does not
            actually process anything concurrently but uses the dask framework
            in a sequential setup.

        Returns
        -------
        values : list or None
            Output of :meth:`post_process`, or `None` if the orchestrator did
            not return any futures.
        """
        # Delegate to orchestrator for computation execution
        futures = self.orchestrator.execute_computation(debug=debug)

        # Either return collected by-worker results or the filepaths of results
        if futures is None:
            return None
        return self.post_process(futures)

    def post_process(self, futures: "dd.Future") -> Union[List, None]:
        """
        Local helper to post-process results on disk/in-memory

        Returns
        -------
        values : list or None
            `None` if neither in-memory results collection nor auto-writing
            was requested; a list of file-names if `write_worker_results` is
            `True`; a list of objects if in-memory results collection was
            requested.
        """
        # Use result post-processor for handling results
        # NOTE(review): `results_container` is passed as both the 2nd and the
        # 7th positional argument - confirm against `ResultPostProcessor`'s
        # signature that the last slot is not meant to be a different value.
        post_processor = ResultPostProcessor(
            self.config.client,
            self.config.results_container,
            self.config.collect_results,
            self.config.write_worker_results,
            self.config.write_pickle,
            self.config.single_file,
            self.config.results_container,
        )

        # Process futures using the post-processor
        result = post_processor.process_futures(
            futures,
            self.config.result_shape,
            self.config.stacking_dim,
            self.config.result_dtype,
            self.config.acme_func,
            self.config.func,
            self.config.kwargv,
        )

        # Finally, establish shortcut to `results_container` (if present) for easier access
        self.results_container = post_processor.results_container

        return result

    def cleanup(self) -> None:
        """
        Shut down any ad-hoc distributed computing clients created by `prepare_client`

        Delegates to `ClientOrchestrator` for all client lifecycle management.
        """
        self.orchestrator.cleanup()

    @staticmethod
    def func_wrapper(*args: Any, **kwargs: Optional[Any]) -> None:  # pragma: no cover
        """
        Call the user function and write its output to disk

        Used in place of `func` when its output is saved to disk. The
        ACME-internal keyword arguments (`userFunc`, `taskID`, `outFile` and
        optionally `singleFile`, `stackingDim`, `memEstRun`) are removed
        before the user function is called with the remaining arguments.
        For memory-estimation runs nothing is written. Otherwise, writing is
        delegated to the handler returned by `ResultStorageManager.create_handler`
        (pickle if `outFile` ends in ".pickle", HDF5 otherwise); per the original
        design, that handler is expected to fall back to "emergency-pickling"
        if writing to HDF5 fails.
        """
        # Extract everything from `kwargs` appended by `ACMEdaemon`
        func = kwargs.pop("userFunc")
        taskID = kwargs.pop("taskID")
        fname = kwargs.pop("outFile")
        singleFile = kwargs.pop("singleFile", False)
        stackingDim = kwargs.pop("stackingDim", None)
        memEstRun = kwargs.pop("memEstRun", False)

        # Call user-provided function
        result = func(*args, **kwargs)  # type: ignore

        # For memory estimation runs, don't start saving stuff
        if memEstRun:
            return

        # Use result storage manager for writing HDF5/pickle
        result_handler = ResultStorageManager.create_handler(
            fname=fname,
            result=result,
            task_id=taskID,
            write_pickle=fname.endswith(".pickle"),
            single_file=singleFile,
            stacking_dim=stackingDim,
        )
        result_handler.write_result()