#
# Computational scaffolding for user-interface
#
# Copyright © 2020-2025 Ernst Strüngmann Institute (ESI) for Neuroscience
# in Cooperation with Max Planck Society
# Copyright © 2026 Ernst Strüngmann Institute (ESI) of the Max Planck Society
#
# SPDX-License-Identifier: BSD-3-Clause
#
# Builtin/3rd party package imports
import os
import sys
import logging
import dask.distributed as dd
from typing import TYPE_CHECKING, Optional, Any, Union, List
# Local imports
from .logger import prepare_log
from .validators import validate_pmap
from .config import ACMEConfig
from .memory_profiler import MemoryProfiler
from .argument_processor import ArgumentProcessor
from .results.output_setup import OutputDirectoryManager, HDF5ContainerFactory
from .results.result_handler import ResultStorageManager
from .results.post_processor import ResultPostProcessor
from .client_orchestrator import ClientOrchestrator
# Flag whether Syncopy has already been imported into this interpreter;
# `ACMEdaemon.pre_process` consults it to skip the "results collected in
# memory" warning when results are not written to disk
isSpyModule = False
if "syncopy" in sys.modules:  # pragma: no cover
    isSpyModule = True

# `ParallelMap` is only needed for type annotations. Import it relative to this
# package (like all other local imports above) and only for static type checkers,
# presumably to avoid a circular import with the frontend at run time
if TYPE_CHECKING:  # pragma: no cover
    from .frontend import ParallelMap

# Public API of this module
__all__: List[str] = ["ACMEdaemon"]

# Fetch logger
log = logging.getLogger("ACME")
# Main manager for parallel execution of user-defined functions
class ACMEdaemon(object):
    """
    Manager for concurrent calls of a user-provided function

    Builds a validated run configuration from a :class:`~acme.ParallelMap`
    instance, prepares output storage (HDF5/pickle) or in-memory collection,
    and delegates client handling and computation to helper classes
    (`ArgumentProcessor`, `MemoryProfiler`, `ClientOrchestrator`).
    """

    # Restrict valid instance attributes (instances carry no `__dict__`)
    __slots__ = ("results_container", "config", "processor", "profiler", "orchestrator")

    def __init__(
        self,
        pmap: "ParallelMap",
        n_workers: Union[int, str] = "auto",
        write_worker_results: bool = True,
        output_dir: Optional[str] = None,
        result_shape: Optional[tuple[Optional[int], ...]] = None,
        result_dtype: str = "float",
        single_file: bool = False,
        write_pickle: bool = False,
        dryrun: bool = False,
        partition: str = "auto",
        mem_per_worker: str = "auto",
        setup_timeout: int = 60,
        setup_interactive: bool = True,
        stop_client: Union[bool, str] = "auto",
        verbose: Optional[bool] = None,
        logfile: Optional[Union[bool, str]] = None,
        cleanup_threshold_days: Optional[int] = None,
    ) -> None:
        """
        Manager class for performing concurrent user function calls

        Parameters
        ----------
        pmap : :class:`~acme.ParallelMap` context manager
            By default, :class:`ACMEdaemon` assumes that the provided
            :class:`~acme.ParallelMap` instance has already been properly set
            up to process `func` (all input arguments parsed and properly
            formatted). The user function and its positional/keyword arguments
            (`func`, `argv`, `kwargv`, `n_inputs`) are taken from `pmap`; all
            remaining settings are given by the other arguments below.
        n_workers : int or "auto"
            Number of SLURM workers (=jobs) to spawn. See :class:`~acme.ParallelMap`
            for details.
        write_worker_results : bool
            If `True`, the return value(s) of `func` is/are saved on disk. See
            :class:`~acme.ParallelMap` for details.
        output_dir : str or None
            If provided, auto-generated results are stored in the given path. See
            :class:`~acme.ParallelMap` for details.
        result_shape : tuple or None
            If provided, results are slotted into a dataset/array with layout
            `result_shape`. See :class:`~acme.ParallelMap` for details.
        result_dtype : str
            Determines numerical datatype of dataset laid out by `result_shape`.
            See :class:`~acme.ParallelMap` for details.
        single_file : bool
            If `True`, parallel workers write to the same results container. See
            :class:`~acme.ParallelMap` for details.
        write_pickle : bool
            If `True`, the return value(s) of `func` is/are pickled to disk. See
            :class:`~acme.ParallelMap` for details.
        dryrun : bool
            If `True`, a dry-run of calling `func` is performed using a single
            `args`, `kwargs` tuple. See :class:`~acme.ParallelMap` for details.
        partition : str
            Name of SLURM partition to use. See :class:`~acme.ParallelMap` for details.
        mem_per_worker : str
            Memory booking for each SLURM worker. See :class:`~acme.ParallelMap`
            for details.
        setup_timeout : int
            Timeout period (in seconds) for SLURM workers to come online. See
            :class:`~acme.ParallelMap` for details.
        setup_interactive : bool
            If `True`, user input is queried in case not enough SLURM workers could
            be started within `setup_timeout` seconds. See :class:`~acme.ParallelMap`
            for details.
        stop_client : bool or "auto"
            If `"auto"`, automatically started distributed computing clients
            are shut down at the end of computation, while user-provided clients
            are left untouched. See :class:`~acme.ParallelMap` for details.
        verbose : None or bool
            If `None` (default), general run-time information as well as warnings
            and errors are shown. See :class:`~acme.ParallelMap` for details.
        logfile : None or bool or str
            If `None` (default) or `True`, and `write_worker_results` is
            `True`, all run-time information as well as errors and
            warnings are tracked in a log-file. See :class:`~acme.ParallelMap`
            for details.
        cleanup_threshold_days : None or int
            Forwarded unchanged to the run configuration (`ACMEConfig`).
            Presumably an age threshold (in days) for cleaning up old output;
            see :class:`~acme.ParallelMap` for details.

        Notes
        -----
        The constructor itself returns `None`; results are produced by
        :meth:`compute`:

        * If `write_worker_results` is `True`, the results are a list of
          HDF5 file-names containing computed results.
        * If `write_worker_results` is `False`, the results are a list
          comprising the actual return values of `func`.
        * If :class:`ACMEdaemon` was instantiated by
          :class:`~acme.ParallelMap`, the results are propagated back to
          :class:`~acme.ParallelMap`.

        See also
        --------
        ParallelMap : Context manager and main user interface
        """
        # `results_container` is a slot: give it a defined value up-front so that
        # reading it never raises AttributeError (e.g., after an aborted dry-run
        # or if `compute` received no futures and skipped `post_process`)
        self.results_container = None

        # First and foremost: ensure we got something useful to work with
        validate_pmap(pmap)

        # Create configuration
        self.config = ACMEConfig(
            func=pmap.func,
            argv=pmap.argv,
            kwargv=pmap.kwargv,
            n_calls=pmap.n_inputs,
            n_workers=n_workers,
            write_worker_results=write_worker_results,
            output_dir=output_dir,
            result_shape=result_shape,
            result_dtype=result_dtype,
            single_file=single_file,
            write_pickle=write_pickle,
            dryrun=dryrun,
            partition=partition,
            mem_per_worker=mem_per_worker,
            setup_timeout=setup_timeout,
            setup_interactive=setup_interactive,
            stop_client=stop_client,
            verbose=verbose,
            logfile=logfile,
            cleanup_threshold_days=cleanup_threshold_days,
        )
        self.config.validate()

        # Set up output handler (must run before the helpers below: it decides
        # which function `config.acme_func` refers to)
        self.pre_process()

        # Set up argument processing helper class
        self.processor = ArgumentProcessor(
            self.config.argv, self.config.kwargv, self.config.n_calls
        )

        # Set up memory profiling helper class
        self.profiler = MemoryProfiler(
            self.processor,
            self.config.acme_func,
            self.config.func.__name__,
            self.config.tqdmFormat,
        )

        # Set up client orchestration helper class
        self.orchestrator = ClientOrchestrator(
            self.config, self.processor, self.profiler
        )

        # If requested, perform single-worker dry-run (and quit if desired)
        if dryrun:
            goOn = self.profiler.perform_dryrun(
                output_dir=self.config.output_dir,
                setup_interactive=self.config.setup_interactive,
            )
            if not goOn:
                log.debug("Quitting after dryrun")
                return
            log.debug("Continuing after dryrun")

        # Either use existing dask client or start a fresh instance
        self.prepare_client()

    def pre_process(self) -> None:
        """
        Decide how results of `func` are handled and set up logging

        If `write_worker_results` is `True`, output directories/containers are
        prepared via :meth:`setup_output` and `func` is wrapped by
        :meth:`func_wrapper`. Otherwise `func` is invoked directly, and results
        are collected in memory unless `kwargv` already provides a `taskID`
        entry (then output handling is left to `func` itself). Finally, the
        "ACME" logger is (re-)configured via `prepare_log`.
        """
        # If automatic saving of results is requested, make necessary preparations
        if self.config.write_worker_results:
            self.setup_output()
        else:
            # If `taskID` is not an explicit kw-arg of `func` and `func` does not
            # accept "anonymous" `**kwargs`, don't save anything but return stuff
            log.debug("Automatic output processing disabled.")
            if self.config.kwargv.get("taskID") is None:
                # Warning is suppressed when Syncopy is loaded (see `isSpyModule`)
                if not isSpyModule:
                    msg = (
                        "`write_worker_results` is `False` and `taskID` is not a keyword argument of %s. "
                        + "Results will be collected in memory by caller - this might be slow and can lead "
                        + "to excessive memory consumption. "
                    )
                    log.warning(msg, self.config.func.__name__)
                self.config.collect_results = True  # type: ignore
            else:
                self.config.kwargv["taskID"] = self.config.task_ids
                self.config.collect_results = False  # type: ignore
                msg = (
                    "Not collecting results in memory, leaving output "
                    + "processing to user-provided function"
                )
                log.debug(msg)

            # The "raw" user-provided function is used in the computation
            self.config.acme_func = self.config.func
            log.debug("Not wrapping user-provided function but invoking it directly")

        # If progress tracking in a log-file was requested, set it up now
        # (after `setup_output`, which runs first in the branch above)
        prepare_log(
            logname="ACME", logfile=self.config.logfile, verbose=self.config.verbose
        )
        log.debug("Set up logfile=%s", str(self.config.logfile))

    def setup_output(self) -> None:
        """
        Create output directories and prepare result containers

        Populates the per-call keyword arguments consumed by
        :meth:`func_wrapper` (`taskID`, `outFile`, `userFunc` and, in
        single-file mode, `singleFile`) and points `config.acme_func` at
        :meth:`func_wrapper`.
        """
        # Use output setup manager for directory creation
        output_manager = OutputDirectoryManager()

        # Create output directory structure
        outputDir = output_manager.create_output_directory(
            self.config.output_dir,
            self.config.single_file,
            self.config.write_pickle,
            self.config.func.__name__,
        )

        # Re-define or allocate key "taskID" to track concurrent processing results
        self.config.kwargv["taskID"] = self.config.task_ids
        self.config.collect_results = False

        # Set up correct file-extension for output files; in case of HDF5
        # containers, prepare "main" file for collecting/symlinking worker results
        if self.config.write_pickle:
            fExt = "pickle"
            log.debug("Pickling was requested")
        else:
            fExt = "h5"
            # NOTE(review): this uses `config.output_dir`, not `outputDir`
            # returned above; assumes `config.output_dir` is already resolved
            # (non-`None`) at this point — confirm in `ACMEConfig.validate`
            self.config.results_container = os.path.join(self.config.output_dir, f"{self.config.func.__name__}.h5")  # type: ignore
            log.debug("Using HDF5 storage %s", self.config.results_container)

        # Use HDF5 container factory for container creation
        container_factory = HDF5ContainerFactory()

        # By default, `results_container` is a collection of links that point to
        # worker-generated HDF5 containers; if `single_file` is `True`, then
        # `results_container` is a "real" container with actual dataset(s)
        if self.config.single_file:
            self.config.kwargv["singleFile"] = [True]
            self.config.kwargv["outFile"] = [self.config.results_container]
            log.debug("Saving results in single HDF5 container")

            # Without an output shape (`None`) the factory prepares groups for
            # storing datasets; otherwise it allocates a single dataset with the
            # specified dimensions. Both cases share one call.
            container_factory.create_single_file_container(
                self.config.results_container,
                self.config.task_ids,
                self.config.result_shape,
                self.config.result_dtype,
            )
        else:
            self.config.kwargv["outFile"] = [
                os.path.join(outputDir, f"{self.config.func.__name__}_{taskID}.{fExt}")
                for taskID in self.config.task_ids
            ]
            if not self.config.write_pickle:
                # If no output shape provided, generate links to external datasets
                # (stacking along dim 0 by default); otherwise allocate a virtual
                # dataset w/specified dimension and configured stacking dimension
                if self.config.result_shape is None:
                    container_factory.create_virtual_dataset_container(
                        self.config.results_container,
                        self.config.task_ids,
                        self.config.kwargv["outFile"],
                        None,
                        0,  # default stacking dim
                        self.config.result_dtype,
                        outputDir,
                    )
                else:
                    container_factory.create_virtual_dataset_container(
                        self.config.results_container,
                        self.config.task_ids,
                        self.config.kwargv["outFile"],
                        self.config.result_shape,
                        self.config.stacking_dim,  # type: ignore
                        self.config.result_dtype,
                        outputDir,
                    )

        # Wrap the user-provided func and distribute it across workers
        self.config.kwargv["userFunc"] = [self.config.func]
        self.config.acme_func = self.func_wrapper  # type: ignore
        log.debug("Wrapping user-provided function inside func_wrapper")

    def prepare_client(self) -> None:
        """
        Setup or fetch dask distributed processing client.

        Delegates to ClientOrchestrator for all client lifecycle management.
        """
        self.orchestrator.prepare_client()

    def compute(self, debug: bool = False) -> Union[List, None]:
        """
        Perform the actual parallel execution of `func`

        If `debug` is `True`, use a single-threaded dask scheduler that does
        not actually process anything concurrently but uses the dask framework
        in a sequential setup.

        Returns the post-processed results (see :meth:`post_process`), or
        `None` if the orchestrator produced no futures.
        """
        # Delegate to orchestrator for computation execution
        futures = self.orchestrator.execute_computation(debug=debug)

        # Postprocessing of results (skipped when nothing was computed)
        values = self.post_process(futures) if futures is not None else None

        # Either return collected by-worker results or the filepaths of results
        return values

    def post_process(self, futures: "dd.Future") -> Union[List, None]:
        """
        Local helper to post-process results on disk/in-memory

        The returned value is either

        * `None`: if neither in-memory results collection nor auto-writing
          was requested
        * a list of file-names: if `write_worker_results` is `True`
        * a list of objects: if in-memory results collection was requested

        Also sets `self.results_container` from the post-processor.
        """
        # Use result post-processor for handling results
        # NOTE(review): `results_container` is passed both as the 2nd and the
        # 7th positional argument; verify against `ResultPostProcessor.__init__`
        # that the 7th parameter is really meant to receive the same value
        post_processor = ResultPostProcessor(
            self.config.client,
            self.config.results_container,
            self.config.collect_results,
            self.config.write_worker_results,
            self.config.write_pickle,
            self.config.single_file,
            self.config.results_container,
        )

        # Process futures using the post-processor
        result = post_processor.process_futures(
            futures,
            self.config.result_shape,
            self.config.stacking_dim,
            self.config.result_dtype,
            self.config.acme_func,
            self.config.func,
            self.config.kwargv,
        )

        # Finally, establish shortcut to `results_container` (if present) for easier access
        self.results_container = post_processor.results_container
        return result

    def cleanup(self) -> None:
        """
        Shut down any ad-hoc distributed computing clients created by `prepare_client`

        Delegates to ClientOrchestrator for all client lifecycle management.
        """
        self.orchestrator.cleanup()

    @staticmethod
    def func_wrapper(*args: Any, **kwargs: Any) -> None:  # pragma: no cover
        """
        Call the user function and write its output to disk

        Used in place of `func` when results are saved to disk. The keyword
        arguments injected by :class:`ACMEdaemon` (`userFunc`, `taskID`,
        `outFile`, and optionally `singleFile`, `stackingDim`, `memEstRun`) are
        removed before `userFunc` is called with the remaining arguments. For
        memory-estimation runs (`memEstRun=True`) nothing is written. Otherwise
        the result is handed to a `ResultStorageManager` handler, which writes
        pickle if `outFile` ends in ".pickle" and HDF5 otherwise.

        NOTE(review): an "emergency pickling" fallback when writing HDF5 fails
        is expected to live inside `ResultStorageManager` — confirm there.
        """
        # Extract everything from `kwargs` appended by `ACMEdaemon`
        func = kwargs.pop("userFunc")
        taskID = kwargs.pop("taskID")
        fname = kwargs.pop("outFile")
        singleFile = kwargs.pop("singleFile", False)
        stackingDim = kwargs.pop("stackingDim", None)
        memEstRun = kwargs.pop("memEstRun", False)

        # Call user-provided function
        result = func(*args, **kwargs)  # type: ignore

        # For memory estimation runs, don't start saving stuff
        if memEstRun:
            return

        # Use result storage manager for writing HDF5/pickle
        result_handler = ResultStorageManager.create_handler(
            fname=fname,
            result=result,
            task_id=taskID,
            write_pickle=fname.endswith(".pickle"),
            single_file=singleFile,
            stacking_dim=stackingDim,
        )
        result_handler.write_result()