#
# Helper routines for working w/dask
#
# Copyright © 2025 Ernst Strüngmann Institute (ESI) for Neuroscience
# in Cooperation with Max Planck Society
#
# SPDX-License-Identifier: BSD-3-Clause
#
# Builtin/3rd party package imports
import os
import sys
import socket
import platform
import subprocess
import getpass
import time
import inspect
import textwrap
import psutil
import numpy as np
from tqdm import tqdm
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, get_client, LocalCluster
from datetime import datetime, timedelta
from typing import List, Optional, Any, Union, Tuple, Dict
# Local imports
from .shared import user_input, user_yesno, is_jupyter, get_interface, get_free_port
from .spy_interface import scalar_parser, log
__all__: List["str"] = ["esi_cluster_setup", "bic_cluster_setup", "local_cluster_setup", "cluster_cleanup", "slurm_cluster_setup"]
# Setup SLURM workers on the ESI HPC cluster
[docs]
def esi_cluster_setup(
partition: str,
n_workers: int = 2,
mem_per_worker: str = "auto",
cores_per_worker: Optional[int] = None,
n_workers_startup: int = 1,
timeout: int = 60,
interactive: bool = True,
interactive_wait: int = 120,
start_client: bool = True,
job_extra: List = [],
mem_cushion : int = 100,
**kwargs: Optional[Any]) -> Union[None, Client, SLURMCluster, LocalCluster]:
"""
Start a Dask distributed SLURM worker cluster on the ESI HPC infrastructure
(or local multi-processing)
Parameters
----------
partition : str
Name of SLURM partition/queue to start workers in. Use the command `sinfo`
in the terminal to see a list of available SLURM partitions on the ESI HPC
cluster.
n_workers : int
Number of SLURM workers (=jobs) to spawn
mem_per_worker : str
Memory booking for each worker. Can be specified either in megabytes
(e.g., ``mem_per_worker = 1500MB``) or gigabytes (e.g., ``mem_per_worker = "2GB"``).
If `mem_per_worker` is `"auto"` it is attempted to infer a sane default value
from the chosen partition, e.g., for ``partition = "8GBS"`` `mem_per_worker` is
automatically set to the allowed maximum of `'8GB'`. On the IBM POWER
partition "E880", `mem_per_worker` is set to 16 GB if not provided.
Note, even in queues with guaranteed memory bookings, it is possible to allocate less
memory than the allowed maximum per worker to spawn numerous low-memory
workers. See Examples for details.
cores_per_worker : None or int
Number of CPU cores allocated for each worker. If `None`, core-count
is set based on partition settings (`DefMemPerCPU` and QoS) with respect to
CPU architecture (minimum 1 on x86_64, and 4 on IBM POWER).
n_workers_startup : int
Number of spawned workers to wait for. If `n_workers_startup` is `1` (default),
the code does not proceed until either 1 SLURM job is running or the
`timeout` interval has been exceeded.
timeout : int
Number of seconds to wait for requested workers to start (see `n_workers_startup`).
interactive : bool
If `True`, user input is queried in case not enough workers (set by `n_workers_startup`)
could be started in the provided waiting period (determined by `timeout`).
The code waits `interactive_wait` seconds for a user choice - if none is
provided, it continues with the current number of running workers (if greater
than zero). If `interactive` is `False` and no worker could not be started
within `timeout` seconds, a `TimeoutError` is raised.
interactive_wait : int
Countdown interval (seconds) to wait for a user response in case fewer than
`n_workers_startup` workers could be started. If no choice is provided within
the given time, the code automatically proceeds with the current number of
active dask workers.
start_client : bool
If `True`, a distributed computing client is launched and attached to
the dask worker cluster. If `start_client` is `False`, only a distributed
computing cluster is started to which compute-clients can connect.
job_extra : list
Extra sbatch parameters to pass to SLURMCluster.
mem_cushion : int
Amount of memory to "withhold" from `mem_per_worker` to stay clear of
partition limits (either imposed via QoS or `MaxMemPerCPU`)
**kwargs : dict
Additional keyword arguments can be used to control job-submission details.
Returns
-------
proc : object
A distributed computing client (if ``start_client = True``) or
a distributed computing cluster (otherwise).
Examples
--------
The following command launches 10 SLURM workers with 2 gigabytes memory each
in the `8GBS` partition
>>> client = esi_cluster_setup(n_workers=10, partition="8GBS", mem_per_worker="2GB")
Use default settings to start 2 SLURM workers in the IBM POWER E880 partition
(allocating 4 cores and 16 GB memory per worker)
>>> client = esi_cluster_setup(partition="E880")
The underlying distributed computing cluster can be accessed using
>>> client.cluster
Notes
-----
The employed parallel computing engine relies on the concurrent processing library
`Dask <https://docs.dask.org/en/latest/>`_. Thus, the distributed computing
clients generated here are in fact instances of :class:`distributed.Client`.
This function specifically acts as a wrapper for :class:`dask_jobqueue.SLURMCluster`.
Users familiar with Dask in general and its distributed scheduler and cluster
objects in particular, may leverage Dask's entire API to fine-tune parallel
processing jobs to their liking (if wanted).
See also
--------
dask_jobqueue.SLURMCluster : launch a dask cluster of SLURM workers
slurm_cluster_setup : start a distributed Dask cluster of parallel processing workers using SLURM
local_cluster_setup : start a local Dask multi-processing cluster on the host machine
cluster_cleanup : remove dangling parallel processing worker-clusters
"""
# For later reference: dynamically fetch name of current function
funcName = f"<{inspect.currentframe().f_code.co_name}>" # type: ignore
# Don't start a new cluster on top of an existing one
active_client = _probe_existing_client(start_client)
if active_client:
return active_client
# Check if SLURM's `sinfo` can be accessed
start_local = _probe_sinfo_or_start_local(interactive)
if start_local: # pragma: no cover
return local_cluster_setup(interactive=interactive)
# Use default by-worker process count or extract it from anonymous keyword args (if provided)
processes_per_worker = kwargs.pop("processes_per_worker", 1)
log.debug("Found `sinfo`, set `processes_per_worker` to %d", processes_per_worker)
# Get micro-architecture of submitting host
mArch = platform.machine()
# Fetch available and define invalid partitions and probe for auto-selection
avail_partitions = _get_slurm_partitions()
invalid_partitions = ["PREPO", "ESI"]
auto_partition, auto_memory = _probe_auto_partition(partition, avail_partitions, invalid_partitions, mem_per_worker)
if auto_partition is not None:
if mArch == "x86_64":
partition = auto_partition
mem_per_worker = None # type: ignore
else: # pragma: no cover
partition = "E880"
mem_per_worker = auto_memory # type: ignore
msg = "Picked partition %s based on estimated memory consumption of %s"
log.info(msg, partition, auto_memory)
if (partition == "E880" and mArch == "x86_64") or \
(mArch == "ppc64le" and partition != "E880"):
otherArch = list(set(["x86_64", "ppc64le"]).difference([mArch]))[0]
msg = "Cannot start SLURM workers in partition %s with " +\
"architecture %s from submitting host with architecture %s. " +\
"Start x86_64 workers from esi-svhpc{1,2,3} and POWER workers from the hub."
raise ValueError(msg%(partition, otherArch, mArch))
# Convert memory selections to MB, "auto" is converted to `None`
mem_per_worker = _probe_mem_spec(mem_per_worker)
# If either core-count or mem-spec is undefined, go and ask partition for
# mem specs; set "sane" (fat quotes) defaults on IBM POWER (if nothing was
# provided, run with 4 cores/16 GB per worker -> set `partMem` accordingly)
if cores_per_worker is None or mem_per_worker is None:
defMem, partMem = _probe_scontrol(partition)
if mArch == "ppc64le":
partMem = 16000
# If not explicitly provided, extract by-worker CPU core count from
# partition via `DefMeMPerCPU` and `mem_per_worker` (if defined)
if cores_per_worker is None:
# Set core-count per worker (applies to both x86_64 and ppc64le)
if mem_per_worker is not None:
partMem = int(mem_per_worker.replace("MB", ""))
cores_per_worker = max(1, int(partMem / defMem))
log.debug("Derived core-count from partition: `cores_per_worker=%d`", cores_per_worker)
# If `mem_per_worker` is still unassigned, use extracted partition limit
if mem_per_worker is None:
mem_per_worker = f"{partMem}MB"
log.debug("No `mem_per_worker` specified, using default of %s", mem_per_worker)
# Determine if `job_extra`` is a list (this is also checked in `slurm_cluster_setup`,
# but we may need to append to it, so ensure that's possible)
_probe_job_extra(job_extra)
# If '--output' was not provided, append default output folder to `job_extra`
if not any(option.startswith("--output") or option.startswith("-o") for option in job_extra):
log.debug("Auto-populating `--output` setting for sbatch")
usr = getpass.getuser()
slurm_wdir = f"/cs/slurm/{usr}/{usr}_{datetime.now().strftime('%Y%m%d-%H%M%S')}"
os.makedirs(slurm_wdir, exist_ok=True)
log.debug("Using %s for slurm logs", slurm_wdir)
out_files = os.path.join(slurm_wdir, "slurm-%j.out")
job_extra.append(f"--output={out_files}")
log.debug("Setting `--output=%s`", out_files)
# Let the SLURM-specific setup function do the rest (returns client or cluster)
return slurm_cluster_setup(partition, cores_per_worker, n_workers,
processes_per_worker, mem_per_worker, # type: ignore
n_workers_startup, timeout, interactive,
interactive_wait, start_client, job_extra,
avail_partitions=avail_partitions,
invalid_partitions=invalid_partitions,
mem_cushion=mem_cushion, **kwargs)
# Setup SLURM workers on the CoBIC HPC cluster
[docs]
def bic_cluster_setup( # pragma: no cover
partition: str,
n_workers: int = 2,
mem_per_worker: str = "auto",
cores_per_worker: Optional[int] = None,
n_workers_startup: int = 1,
timeout: int = 120,
interactive: bool = True,
interactive_wait: int = 120,
start_client: bool = True,
job_extra: List = [],
mem_cushion : int = 500,
**kwargs: Optional[Any]) -> Union[None, Client, SLURMCluster, LocalCluster]:
"""
Start a Dask distributed SLURM worker cluster on the CoBIC HPC infrastructure
Parameters
----------
partition : str
Name of SLURM partition/queue to start workers in. Use the command `sinfo`
in the terminal to see a list of available SLURM partitions on the CoBIC HPC
cluster.
n_workers : int
Number of SLURM workers (=jobs) to spawn
mem_per_worker : str
Memory booking for each worker. Can be specified either in megabytes
(e.g., ``mem_per_worker = 1500MB``) or gigabytes (e.g., ``mem_per_worker = "2GB"``).
If `mem_per_worker` is `"auto"` it is attempted to infer a sane default value
from the chosen partition, e.g., for ``partition = "8GBSppc"`` `mem_per_worker` is
automatically set to the allowed maximum of `'8GB'`.
Note, even in partitions with guaranteed memory bookings, it is possible to allocate less
memory than the allowed maximum per worker to spawn numerous low-memory
workers. See Examples for details.
cores_per_worker : None or int
Number of CPU cores allocated for each worker. If `None`, core-count
is set based on partition settings (`DefMemPerCPU`).
n_workers_startup : int
Number of spawned workers to wait for. If `n_workers_startup` is `1` (default),
the code does not proceed until either 1 SLURM job is running or the
`timeout` interval has been exceeded.
timeout : int
Number of seconds to wait for requested workers to start (see `n_workers_startup`).
interactive : bool
If `True`, user input is queried in case not enough workers (set by `n_workers_startup`)
could be started in the provided waiting period (determined by `timeout`).
The code waits `interactive_wait` seconds for a user choice - if none is
provided, it continues with the current number of running workers (if greater
than zero). If `interactive` is `False` and no worker could not be started
within `timeout` seconds, a `TimeoutError` is raised.
interactive_wait : int
Countdown interval (seconds) to wait for a user response in case fewer than
`n_workers_startup` workers could be started. If no choice is provided within
the given time, the code automatically proceeds with the current number of
active dask workers.
start_client : bool
If `True`, a distributed computing client is launched and attached to
the dask worker cluster. If `start_client` is `False`, only a distributed
computing cluster is started to which compute-clients can connect.
job_extra : list
Extra sbatch parameters to pass to SLURMCluster.
mem_cushion : int
Amount of memory to "withhold" from `mem_per_worker` to stay clear of
partition limits (either imposed via QoS or `MaxMemPerCPU`)
**kwargs : dict
Additional keyword arguments can be used to control job-submission details.
Returns
-------
proc : object
A distributed computing client (if ``start_client = True``) or
a distributed computing cluster (otherwise).
Examples
--------
The following command launches 10 SLURM workers with 2 gigabytes memory each
in the `8GBSppc` partition
>>> client = bic_cluster_setup(n_workers=10, partition="8GBSppc", mem_per_worker="2GB")
Use default settings to start 2 SLURM workers in the 16GBSppc partition
(allocating 2 cores and 16 GB memory per worker)
>>> client = bic_cluster_setup(partition="16GBSppc")
The underlying distributed computing cluster can be accessed using
>>> client.cluster
Notes
-----
The employed parallel computing engine relies on the concurrent processing library
`Dask <https://docs.dask.org/en/latest/>`_. Thus, the distributed computing
clients generated here are in fact instances of :class:`distributed.Client`.
This function specifically acts as a wrapper for :class:`dask_jobqueue.SLURMCluster`.
Users familiar with Dask in general and its distributed scheduler and cluster
objects in particular, may leverage Dask's entire API to fine-tune parallel
processing jobs to their liking (if wanted).
See also
--------
dask_jobqueue.SLURMCluster : launch a dask cluster of SLURM workers
slurm_cluster_setup : start a distributed Dask cluster of parallel processing workers using SLURM
local_cluster_setup : start a local Dask multi-processing cluster on the host machine
cluster_cleanup : remove dangling parallel processing worker-clusters
"""
# For later reference: dynamically fetch name of current function
funcName = f"<{inspect.currentframe().f_code.co_name}>" # type: ignore
# Don't start a new cluster on top of an existing one
active_client = _probe_existing_client(start_client)
if active_client:
return active_client
# Check if SLURM's `sinfo` can be accessed
start_local = _probe_sinfo_or_start_local(interactive)
if start_local:
return local_cluster_setup(interactive=interactive)
# Use default by-worker process count or extract it from anonymous keyword args (if provided)
processes_per_worker = kwargs.pop("processes_per_worker", 1)
log.debug("Found `sinfo`, set `processes_per_worker` to %d", processes_per_worker)
# Get micro-architecture of submitting host
mArch = platform.machine()
# Fetch available and define invalid partitions and probe for auto-selection
avail_partitions = _get_slurm_partitions()
invalid_partitions = ["VISppc", "VISx86"]
auto_partition, auto_memory = _probe_auto_partition(partition, avail_partitions, invalid_partitions, mem_per_worker)
if auto_partition is not None:
if mArch == "x86_64":
partition = f"{auto_partition}x86"
else:
partition = f"{auto_partition}ppc"
mem_per_worker = None # type: ignore
msg = "Picked partition %s based on estimated memory consumption of %s GB"
log.info(msg, partition, auto_memory)
# Prevent cross-architecture client startups
if (mArch == "ppc64le" and "x86" in partition) or \
(mArch == "x86_64" and "ppc" in partition):
otherArch = list(set(["x86_64", "ppc64le"]).difference([mArch]))[0]
msg = "Cannot start SLURM workers in partition %s with " +\
"architecture %s from submitting host with architecture %s. " +\
"Please start x86_64 workers from bic-svhpcx86[01-06] and POWER workers from the hub(s)."
raise ValueError(msg%(partition, otherArch, mArch))
# Convert memory selections to MB, "auto" is converted to `None`
mem_per_worker = _probe_mem_spec(mem_per_worker)
# If either core-count or mem-spec is undefined, go and ask partition for `DefMeMPerCPU`
if cores_per_worker is None or mem_per_worker is None:
defMem, partMem = _probe_scontrol(partition)
# If not explicitly provided, extract by-worker CPU core count from
# partition via `DefMeMPerCPU` and `mem_per_worker` (if defined)
if cores_per_worker is None:
if mem_per_worker is not None:
partMem = int(mem_per_worker.replace("MB", ""))
cores_per_worker = max(1, round(partMem / defMem))
log.debug("Derived core-count from partition: `cores_per_worker=%d`", cores_per_worker)
# If `mem_per_worker` is still unassigned, use partition limit
if mem_per_worker is None:
mem_per_worker = f"{partMem}MB"
log.debug("No `mem_per_worker` specified, using default of %s", mem_per_worker)
# Determine if `job_extra`` is a list (this is also checked in `slurm_cluster_setup`,
# but we may need to append to it, so ensure that's possible)
_probe_job_extra(job_extra)
# If '--output' was not provided, append default output folder to `job_extra`
if not any(option.startswith("--output") or option.startswith("-o") for option in job_extra):
log.debug("Auto-populating `--output` setting for sbatch")
usr = getpass.getuser()
slurm_wdir = f"/mnt/hpc/home/{usr}/slurm/{usr}_{datetime.now().strftime('%Y%m%d-%H%M%S')}"
os.makedirs(slurm_wdir, exist_ok=True)
log.debug("Using %s for slurm logs", slurm_wdir)
out_files = os.path.join(slurm_wdir, "slurm-%j.out")
job_extra.append(f"--output={out_files}")
log.debug("Setting `--output=%s`", out_files)
# CoBIC-specific: only specific ports are available within the HPC network
if os.path.isfile("/usr/local/bin/squeue_summary"):
ifname = get_interface("172.18.90")
schedPort = get_free_port(60001, 63000)
scheduler_options = {"port": schedPort, "interface" : ifname}
worker_extra_args = ["--worker-port=60001:63000", "--nanny-port=60001:63000"]
else:
scheduler_options = None
# Let the SLURM-specific setup function do the rest (returns client or cluster)
daskobj = slurm_cluster_setup(partition, cores_per_worker, n_workers,
processes_per_worker, mem_per_worker, # type: ignore
n_workers_startup, timeout, interactive,
interactive_wait, start_client, job_extra,
scheduler_options=scheduler_options,
worker_extra_args=worker_extra_args,
avail_partitions=avail_partitions,
invalid_partitions=invalid_partitions,
mem_cushion=mem_cushion, **kwargs)
# Emit short explainer how to connect to Dashboard
if isinstance(daskobj, Client):
dblink = daskobj.cluster.dashboard_link
elif isinstance(daskobj, SLURMCluster):
dblink = daskobj.dashboard_link
else:
return None
ip, port = dblink[dblink.find("http://") + len("http://"):dblink.rfind("/status")].split(":")
username = getpass.getuser()
if socket.gethostname().startswith("bic-svhub0"):
ifname = get_interface("192.168.161")
hubip = psutil.net_if_addrs()[ifname][0].address
sshcmd = f"ssh -L {port}:localhost:{port}"
else:
hubip = "192.168.161.221"
sshcmd = f"ssh -L {port}:{ip}:{port}"
msg = "Connect to dashboard by starting a new ssh tunnel via %s %s@%s"
log.info(msg, sshcmd, username, hubip)
msg = "Open your browser and go to http://localhost:%s"
log.info(msg, port)
return daskobj
# Setup SLURM cluster
[docs]
def slurm_cluster_setup(
partition: str = "partition_name",
n_cores: int = 1,
n_workers: int = 1,
processes_per_worker: int = 1,
mem_per_worker: Optional[str] = "1GB",
n_workers_startup: int = 1,
timeout: int = 60,
interactive: bool = True,
interactive_wait: int = 10,
start_client: bool = True,
job_extra: List = [],
worker_extra_args: Optional[List[str]] = None,
scheduler_options: Optional[Dict] = None,
avail_partitions: List = [],
invalid_partitions: List = [],
mem_cushion: int = 100,
**kwargs: Optional[Any]) -> Union[Client, SLURMCluster, None]:
"""
Start a distributed Dask cluster of parallel processing workers using SLURM
**NOTE** If you are working on the ESI or CoBIC HPC cluster, please use
:func:`~acme.esi_cluster_setup` or :func:`~acme.bic_cluster_setup` instead!
Parameters
----------
partition : str
Name of SLURM partition/queue to use
n_cores : int
Number of CPU cores per SLURM worker
n_workers : int
Number of SLURM workers (=jobs) to spawn
processes_per_worker : int
Number of processes to use per SLURM job (=worker). Should be greater
than one only if the chosen partition contains nodes that expose multiple
cores per job.
mem_per_worker : str or None
Memory allocation for each worker. If `None`, partition's `DefMemPerCPU` is queried.
n_workers_startup : int
Number of spawned SLURM workers to wait for. The code does not return until either
`n_workers_startup` SLURM jobs are running or the `timeout` interval (see
below) has been exceeded.
timeout : int
Number of seconds to wait for requested workers to start (see `n_workers_startup`).
interactive : bool
If `True`, user input is queried in case not enough workers (set by `n_workers_startup`)
could be started in the provided waiting period (determined by `timeout`).
The code waits `interactive_wait` seconds for a user choice - if none is
provided, it continues with the current number of running workers (if greater
than zero). If `interactive` is `False` and no worker could be started
within `timeout` seconds, a `TimeoutError` is raised.
interactive_wait : int
Countdown interval (seconds) to wait for a user response in case fewer than
`n_workers_startup` workers could be started. If no choice is provided within
the given time, the code automatically proceeds with the current number of
active dask workers.
start_client : bool
If `True`, a distributed computing client is launched and attached to
the dask worker cluster. If `start_client` is `False`, only a distributed
computing cluster is started to which compute-clients can connect.
job_extra : list
Extra sbatch parameters to pass to SLURMCluster.
worker_extra_args : list or None
Additional arguments to be passed to :class:`distributed.Worker`
scheduler_options : dict or None
Additional arguments to be passed to :class:`distributed.Scheduler`
avail_partition : list
List of valid partition names (strings) that are available for launching
dask workers. If not provided, partitions are fetched at runtime using `sinfo`
invalid_partition : list
List of partition names (strings) that are not available for launching
dask workers.
mem_cushion : int
Amount of memory to "withhold" from `mem_per_worker` to stay clear of
partition limits (either imposed via QoS or `MaxMemPerCPU`)
Returns
-------
proc : object or None
A distributed computing client (if ``start_client = True``) or
a distributed computing cluster (otherwise). If no SLURM workers
can be started within the given timeout interval, `proc` is set
to `None`.
See also
--------
dask_jobqueue.SLURMCluster : launch a dask cluster of SLURM workers
esi_cluster_setup : start a SLURM worker cluster on the ESI HPC infrastructure
bic_cluster_setup : start a SLURM worker cluster on the CoBIC HPC infrastructure
local_cluster_setup : start a local Dask multi-processing cluster on the host machine
cluster_cleanup : remove dangling parallel processing worker-clusters
"""
# For later reference: dynamically fetch name of current function
funcName = f"<{inspect.currentframe().f_code.co_name}>" # type: ignore
# If not provided, retrieve all partitions currently available in SLURM
if len(avail_partitions) == 0:
avail_partitions = _get_slurm_partitions()
# Make sure we're in a valid partition
_parse_partition(partition, avail_partitions, invalid_partitions)
# Parse worker count
try:
scalar_parser(n_workers, varname="n_workers", ntype="int_like", lims=[1, np.inf])
except Exception as exc:
log.error("Error parsing `n_workers`")
raise exc
log.debug("Using `n_workers = %d`", n_workers)
# Convert memory selections to MB, "auto" is converted to `None`
mem_per_worker = _probe_mem_spec(mem_per_worker)
# Check for sanity of requested core count
try:
scalar_parser(n_cores, varname="n_cores",
ntype="int_like", lims=[1, np.inf])
except Exception as exc:
log.error("Error parsing `n_cores`")
raise exc
log.debug("Using `n_cores = %d`", n_cores)
# Parse worker-waiter count
try:
scalar_parser(n_workers_startup, varname="n_workers_startup", ntype="int_like", lims=[0, np.inf])
except Exception as exc:
log.error("Error parsing `n_workers_startup`")
raise exc
log.debug("Using `n_workers_startup = %d`", n_workers_startup)
# Parse memory cushion to withhold from max
try:
scalar_parser(mem_cushion, varname="mem_cushion", ntype="int_like", lims=[0, np.inf])
except Exception as exc:
log.error("Error parsing `mem_cushion`")
raise exc
log.debug("Using `mem_cushion = %d`", mem_cushion)
# Try to infer memory limit (*in MB*) of chosen partition from QoS
defMem, partMem = _probe_scontrol(partition)
# If that didn't work, try to infer memory limit from `MaxMemPerCPU`
if partMem < 0:
log.debug("Use `scontrol` to fetch MaxMemPerCPU")
pc = subprocess.run(f"scontrol -o show partition {partition}",
capture_output=True, check=True, shell=True, text=True)
try:
mem_lim = n_cores * (int(pc.stdout.strip().partition("MaxMemPerCPU=")[-1].split()[0]))
except IndexError: # pragma: no cover
mem_lim = np.inf # type: ignore
log.debug("Found a limit of %s MB", str(mem_lim))
else:
mem_lim = partMem
# Lower upper bound on worker-memory to not accidentally trigger TRES/QoS violations
if not np.isinf(mem_lim):
mem_lim -= mem_cushion
# Consolidate requested memory with chosen partition (or assign default memory)
if mem_per_worker is None:
if np.isinf(mem_lim):
mem_per_worker = f"{(n_cores * defMem) - mem_cushion}MB"
else:
mem_per_worker = str(mem_lim) + "MB"
log.debug("Using partition limit of %s MB", str(mem_lim))
else:
if int(mem_per_worker.replace("MB", "")) > mem_lim:
msg = "`mem_per_worker` exceeds limit of %d MB for partition %s. " +\
"Capping memory at partition limit. "
log.warning(msg, mem_lim, partition)
mem_per_worker = str(int(mem_lim)) + "MB"
# Parse requested timeout period
try:
scalar_parser(timeout, varname="timeout", ntype="int_like", lims=[1, np.inf])
except Exception as exc:
log.error("Error parsing `timeout`")
raise exc
log.debug("Using `timeout = %d`", timeout)
# Parse requested interactive waiting period
try:
scalar_parser(interactive_wait, varname="interactive_wait", ntype="int_like",
lims=[0, np.inf])
except Exception as exc:
log.error("Error parsing `interactive_wait`")
raise exc
log.debug("Using `interactive_wait = %d`", interactive_wait)
# Determine if cluster allocation is happening interactively
if not isinstance(interactive, bool):
msg = "`interactive` has to be Boolean, not %s"
log.error(msg, str(type(interactive)))
raise TypeError("%s %s"%(funcName, msg%(str(type(interactive)))))
log.debug("Using `interactive = %s`", str(interactive))
# Determine if a dask client was requested
if not isinstance(start_client, bool):
msg = "`start_client` has to be Boolean, not %s"
log.error(msg, str(type(start_client)))
raise TypeError("%s %s"%(funcName, msg%(str(type(start_client)))))
log.debug("Using `start_client = %s`", str(start_client))
# Determine if `job_extra` is a list
_probe_job_extra(job_extra)
# Determine if job_extra options are valid
for option in job_extra:
msg = "`job_extra` has to be a valid sbatch option, not %s"
if not isinstance(option, str):
log.error(msg, str(type(option)))
raise TypeError("%s %s"%(funcName, msg%(str(type(option)))))
if not option[0] == "-":
msg = "`job_extra` options must be flagged with - or --"
log.error(msg)
raise ValueError("%s %s"%(funcName, msg))
log.debug("Using `job_extra = %s`", str(job_extra))
# Ensure validity of requested worker processes
try:
scalar_parser(processes_per_worker, varname="processes_per_worker",
ntype="int_like", lims=[1, np.inf])
except Exception as exc:
log.error("Error parsing `processes_per_worker`")
raise exc
log.debug("Using `processes_per_worker = %d`", processes_per_worker)
# Check validity of '--output' option if provided
userOutSpec = [option.startswith("--output") or option.startswith("-o") for option in job_extra]
if any(userOutSpec):
userOut = job_extra[userOutSpec.index(True)]
outSpec = userOut.split("=")
if len(outSpec) != 2:
msg = "SLURM output directory must be specified using -o/--output=/path/to/file, not %s"
log.error(msg, userOut)
raise ValueError("%s %s"%(funcName, msg%(userOut)))
slurm_wdir = os.path.split(outSpec[1])[0]
if len(slurm_wdir) > 0 and not os.path.isdir(os.path.expanduser(slurm_wdir)):
msg = "SLURM output location has to be an existing directory, not %s"
log.error(msg, slurm_wdir)
raise ValueError("%s %s"%(funcName, msg%(slurm_wdir)))
else:
slurm_wdir = None
log.debug("Using `local_directory = %s`", slurm_wdir)
# Pick up any additional scheduler/worker args to be passed to SLURMCluster
extra_args = {}
if worker_extra_args: # pragma: no cover
extra_args["worker_extra_args"] = worker_extra_args
if scheduler_options: # pragma: no cover
extra_args["scheduler_options"] = scheduler_options # type: ignore
# Create `SLURMCluster` object using provided parameters
log.debug("Instantiating `SLURMCluster` object")
cluster = SLURMCluster(cores=n_cores,
job_cpu=n_cores,
memory=mem_per_worker,
processes=processes_per_worker,
local_directory=slurm_wdir,
queue=partition,
python=sys.executable,
job_directives_skip=["-t 00:30:00"],
job_extra_directives=job_extra,
**extra_args) # type: ignore
# Compute total no. of workers and up-scale cluster accordingly
if n_workers_startup < n_workers:
msg = "Requested worker-count %d exceeds `n_workers_startup = %d`, " +\
"waiting for %d workers to come online"
log.debug(msg, n_workers, n_workers_startup, n_workers_startup)
cluster.scale(n_workers)
# Fire up waiting routine to avoid returning an undercooked cluster
if _cluster_waiter(cluster, funcName, n_workers, timeout, interactive, interactive_wait): # pragma: no cover
return None
# Kill a zombie cluster in non-interactive mode
if not interactive and count_online_workers(cluster) == 0:
cluster_cleanup(Client(cluster))
msg = "SLURM workers could not be started within given time-out " +\
"interval of %d seconds"
log.error(msg, timeout)
raise TimeoutError("%s %s"%(funcName, msg%(timeout)))
# Highlight how to connect to dask performance monitor
msg = "Parallel computing client ready, dashboard accessible at %s"
log.info(msg, cluster.dashboard_link)
# If client was requested, return that instead of the created cluster
if start_client:
return Client(cluster)
return cluster
def _get_slurm_partitions() -> List:
"""
Local helper to fetch all partitions defined in SLURM
"""
# For later reference: dynamically fetch name of current function
funcName = f"<{inspect.currentframe().f_code.co_name}>" # type: ignore
# Retrieve all partitions currently available in SLURM
log.debug("Use `sinfo` to fetch available partitions")
proc = subprocess.Popen("sinfo -h -o %P",
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, shell=True)
out, err = proc.communicate()
# Any non-zero return-code means SLURM is not ready to use
if proc.returncode != 0: # pragma: no cover
msg = "Error fetching SLURM partition setup from node %s: %s"
log.error(msg, socket.gethostname(), err)
raise IOError("%s %s"%(funcName, msg%(socket.gethostname(), err)))
# Remove asterisk appended to any default partitions
out = out.replace("*", "")
# Return formatted subprocess shell output
log.debug("Found partitions: %s", out)
return out.split()
def _cluster_waiter(
cluster: SLURMCluster,
funcName: str,
total_workers: int,
timeout: int,
interactive: bool,
interactive_wait: int) -> bool:
"""
Local helper that can be called recursively
"""
# Wait until all workers have been started successfully or we run out of time
wrkrs = count_online_workers(cluster)
to = str(timedelta(seconds=timeout))[2:]
fmt = "{desc}: {n}/{total} \t[elapsed time {elapsed} | timeout at " + to + "]"
ani = tqdm(desc=f"{funcName} SLURM workers ready", total=total_workers,
leave=True, bar_format=fmt, initial=wrkrs, position=0)
counter = 0
while count_online_workers(cluster) < total_workers and counter < timeout:
time.sleep(1)
counter += 1
ani.update(max(0, count_online_workers(cluster) - wrkrs))
wrkrs = count_online_workers(cluster)
ani.refresh() # force refresh to display elapsed time every second
ani.close()
# If we ran out of time before all workers could be started, ask what to do
if counter == timeout and interactive: # pragma: no cover
msg = "SLURM workers could not be started within given time-out " +\
"interval of %d seconds"
log.info(msg, timeout)
query = f"{funcName} Do you want to [k]eep waiting for 60s, [a]bort or " +\
f"[c]ontinue with {wrkrs} workers?"
choice = user_input(query, valid=["k", "a", "c"], default="c", timeout=interactive_wait)
if choice == "k":
return _cluster_waiter(cluster, funcName, total_workers, 60, True, 60)
elif choice == "a":
log.info("Closing cluster...")
cluster_cleanup(Client(cluster))
return True
else:
if wrkrs == 0:
query = f"{funcName} Cannot continue with 0 workers. Do you want to " +\
"[k]eep waiting for 60s or [a]bort?"
choice = user_input(query, valid=["k", "a"],
default="a", timeout=60)
if choice == "k":
_cluster_waiter(cluster, funcName, total_workers, 60, True, 60)
else:
log.info("Closing cluster...")
cluster_cleanup(Client(cluster))
return True
return False
[docs]
def local_cluster_setup(
n_workers: Optional[int] = None,
mem_per_worker: Optional[str] = None,
interactive: bool = True) -> Union[Client, None]:
"""
Start a local distributed Dask multi-processing cluster
Parameters
----------
n_workers : int
Number of local workers to start (this should align with the locally
available hardware, see :class:`distributed.LocalCluster` for details)
mem_per_worker : str
Memory cap for each local worker (corresponds to the `memory_limit`
keyword of a :class:`distributed.worker.Worker`)
interactive : bool
If `True`, a confirmation dialog is displayed to ensure proper encapsulation
of calls to `local_cluster_setup` inside a script's main module block.
See Notes for details. If `interactive` is `False`, the dialog is not shown.
Returns
-------
client : distributed.Client or None
A distributed computing client. If a client cannot be started,
`proc` is set to `None`.
Notes
-----
The way Python spawns new processes requires an explicit separation of initialization
code (i.e., code blocks that should only be executed once) from the actual
program code. Specifically, everything that is supposed to be invoked only
once by the parent spawner must be encapsulated in a script's main block.
Otherwise, initialization code is not only run once by the parent process but
executed by every child process at import time.
This means that starting a local multi-processing cluster *has* to be wrapped
inside a script's main module block, otherwise, every child process created by
the multi-processing cluster starts a multi-processing cluster itself and so
on escalating to an infinite recursion. Thus, if `local_cluster_setup` is
called inside a script, it has to be encapsulated in the script's main module
block, i.e.,
.. code-block:: python
if __name__ == "__main__":
...
local_cluster_setup()
...
Note that this capsulation is **only** required inside Python scripts. Launching
`local_cluster_setup` in the (i)Python shell or inside a Jupyter notebook
does not suffer from this problem.
A more in-depth technical discussion of this limitation can be found in
`Dask's GitHub issue tracker <https://github.com/dask/distributed/issues/2520>`_.
Examples
--------
The following command launches a local distributed computing cluster using
all CPU cores available on the host machine
>>> client = local_cluster_setup()
The underlying distributed computing cluster can be accessed using
>>> client.cluster
See also
--------
distributed.LocalCluster : create local worker cluster
esi_cluster_setup : start a SLURM worker cluster on the ESI HPC infrastructure
bic_cluster_setup : start a SLURM worker cluster on the CoBIC HPC infrastructure
cluster_cleanup : remove dangling parallel processing worker-clusters
"""
# For later reference: dynamically fetch name of current function
funcName = f"<{inspect.currentframe().f_code.co_name}>" # type: ignore
# Determine if cluster allocation is happening interactively
if not isinstance(interactive, bool):
msg = "`interactive` has to be Boolean, not %s"
log.error(msg, str(type(interactive)))
raise TypeError("%s %s"%(funcName, msg%(str(type(interactive)))))
log.debug("Using `interactive = %s`", str(interactive))
if not is_jupyter():
msg = """\
If you use a script to start a local parallel computing client, please ensure
the call to `local_cluster_setup` is wrapped inside a main module block, i.e.,
if __name__ == "__main__":
...
local_cluster_setup()
...
Otherwise, a RuntimeError is raised due to an infinite recursion triggered by
new processes being started before the calling process can finish its bootstrapping
phase.
"""
msg = textwrap.dedent(msg)
log.debug(msg)
# Additional safe-guard: if a script is executed, double-check with the user
# for proper main idiom usage
if interactive: # pragma: no cover
msg = f"{funcName} If launched from a script, did you wrap your code " +\
"inside a __main__ module block?"
if not user_yesno(msg, default="no"):
return None
# Start the actual distributed client
if n_workers is not None or mem_per_worker is not None:
msg = "Starting `LocalCluster` with `n_workers = %s` and `memory_limit = %s`"
log.debug(msg, str(n_workers), str(mem_per_worker))
cluster = LocalCluster(n_workers=n_workers, memory_limit=mem_per_worker)
client = Client(cluster)
else:
client = Client()
msg = "Local parallel computing client ready, dashboard accessible at %s"
log.info(msg, client.cluster.dashboard_link)
return client
[docs]
def cluster_cleanup(client: Optional[Client] = None) -> None:
"""
Stop and close dangling parallel processing workers
Parameters
----------
client : dask distributed computing client or None
Either a :class:`distributed.Client` or `None`. If `None`, a
global client is queried for and shut-down if found (without confirmation!).
Returns
-------
Nothing : None
See also
--------
esi_cluster_setup : Launch SLURM workers on the ESI compute cluster
bic_cluster_setup : Launch SLURM workers on the CoBIC compute cluster
slurm_cluster_setup : start a distributed Dask cluster of parallel processing workers using SLURM
local_cluster_setup : start a local Dask multi-processing cluster on the host machine
"""
# For later reference: dynamically fetch name of current function
funcName = f"<{inspect.currentframe().f_code.co_name}>" # type: ignore
# Attempt to establish connection to dask client
if client is None:
try:
client = get_client()
except ValueError:
log.warning("No dangling clients or clusters found.")
return
except Exception as exc: # pragma: no cover
log.error("Error looking for dask client")
raise exc
else:
if not isinstance(client, Client):
msg = "`client` has to be a dask client object, not %s"
log.error(msg, str(type(client)))
raise TypeError("%s %s"%(funcName, msg%(str(type(client)))))
log.debug("Found client %s", str(client))
# Prepare message for prompt
if client.cluster.__class__.__name__ == "LocalCluster":
userClust = f"LocalCluster hosted on {client.scheduler_info()['address']}"
else:
userName = getpass.getuser()
outDir = client.cluster.job_header.partition("--output=")[-1]
jobID = outDir.partition(f"{userName}_")[-1].split(os.sep)[0]
userClust = f"cluster {userName}_{jobID}"
nWorkers = count_online_workers(client.cluster)
# First gracefully shut down all workers, then close client
client.retire_workers(list(client.scheduler_info()['workers']), close_workers=True)
client.close()
try:
client.cluster.close()
except Exception as exc: # pragma: no cover
log.warning("Could not gracefully shut down cluster: %s", str(exc))
# Communicate what just happened and get outta here
msg = "Successfully shut down %s containing %d workers"
log.info(msg, userClust, nWorkers)
return
def count_online_workers(cluster: SLURMCluster) -> int:
"""
Local replacement for the late `._count_active_workers` class method
"""
return len([w["memory_limit"] for w in cluster.scheduler_info["workers"].values()])
def _probe_existing_client(start_client : bool) -> Union[Client, SLURMCluster, LocalCluster, None]:
"""
Don't start a new cluster on top of an existing one
"""
try:
client = get_client()
log.debug("Found existing client")
if count_online_workers(client.cluster) == 0:
log.debug("No active workers detected in %s", str(client))
cluster_cleanup(client)
else:
log.info("Found existing parallel computing client %s. \
Not starting new cluster.", str(client))
if start_client:
return client
return client.cluster
except ValueError:
log.debug("No existing clients detected")
return None
def _probe_sinfo_or_start_local(interactive : bool) -> bool:
"""
Check if SLURM's `sinfo` can be accessed
"""
log.debug("Test if `sinfo` is available")
proc = subprocess.Popen("sinfo",
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, shell=True)
_, err = proc.communicate()
# Any non-zero return-code means SLURM is not ready to use
startLocal = False
if proc.returncode != 0:
# SLURM is not installed: either allocate `LocalCluster` or just leave
if proc.returncode > 0: # pragma: no cover
if interactive:
msg = f"SLURM does not seem to be installed on this machine " +\
f"({socket.gethostname()}). Do you want to start a local multi-processing " +\
"computing client instead? "
startLocal = user_yesno(msg, default="no")
else:
startLocal = True
if not startLocal:
msg = "Cannot access SLURM queuing system from node %s: %s "
log.error(msg, socket.gethostname(), err)
raise IOError("%s"%(msg%(socket.gethostname(), err)))
return startLocal
def _probe_auto_partition(
partition : str,
avail_partitions : List,
invalid_partitions : List,
mem_per_worker: str) -> Tuple[Union[str, None], Union[str, None]]:
"""
If partition is "auto" use `mem_per_worker` to pick pseudo-optimal partition
"""
# Note: the `np.where` gymnastic below is necessary since `argmin` refuses
# to return multiple matches; if `mem_per_worker` is 12, then ``memDiff = [4, 4, ...]``,
# however, 8GB won't fit a 12GB worker, so we have to pick the second match 16GB
if partition == "auto":
if not isinstance(mem_per_worker, str) or mem_per_worker.find("estimate_memuse:") < 0:
msg = "Cannot auto-select partition without first invoking memory estimation in `ParallelMap`. "
log.error(msg)
raise IOError(msg)
memEstimate = int(mem_per_worker.replace("estimate_memuse:", ""))
mem_per_worker = "auto"
log.info("Automatically selecting SLURM partition...")
gbQueues = np.unique([int(queue.split("GB")[0]) for queue in avail_partitions if queue[0].isdigit()])
memDiff = np.abs(gbQueues - memEstimate)
queueIdx = np.where(memDiff == memDiff.min())[0][-1]
auto_partition = f"{gbQueues[queueIdx]}GBS"
auto_memory = f"{memEstimate} GB"
else:
_parse_partition(partition, avail_partitions, invalid_partitions)
auto_partition = auto_memory = None # type: ignore
return auto_partition, auto_memory
def _parse_partition(
partition : str,
avail_partitions : List,
invalid_partitions: List = []) -> None:
"""
Ensure validity of partition
"""
if partition not in avail_partitions:
valid = list(set(avail_partitions).difference(invalid_partitions))
lgl = "'" + "or '".join(opt + "' " for opt in valid)
msg = "Invalid partition selection %s, available SLURM partitions are %s"
log.error(msg, str(partition), lgl)
raise ValueError(msg%(str(partition), lgl))
log.debug("Found `partition = %s`", partition)
return
def _probe_mem_spec(mem_per_worker : Union[str, None]) -> Union[str, None]:
"""
Returned `mem_per_worker` is either in MB or None
"""
if isinstance(mem_per_worker, str):
if mem_per_worker == "auto":
mem_per_worker = None # type: ignore
log.debug("Using auto-memory selection")
if mem_per_worker is not None:
msg = "`mem_per_worker` has to be a valid memory specifier (e.g., '8GB', '12000MB'), not %s"
if not isinstance(mem_per_worker, str):
log.error(msg, str(type(mem_per_worker)))
raise TypeError(msg%(str(type(mem_per_worker))))
if not any(szstr in mem_per_worker for szstr in ["MB", "GB"]):
log.error(msg, mem_per_worker)
raise ValueError(msg%(mem_per_worker))
memNumeric = mem_per_worker.replace("MB", "").replace("GB", "")
log.debug("Found `mem_per_worker = %s` in input args", mem_per_worker)
try:
memVal = float(memNumeric)
except:
log.error(msg, mem_per_worker)
raise ValueError(msg%(mem_per_worker))
if memVal <= 0:
log.error(msg, mem_per_worker)
raise ValueError(msg%(mem_per_worker))
if "MB" in mem_per_worker:
mbMem = int(memVal)
else:
mbMem = int(round(memVal * 1000))
mem_per_worker = f"{mbMem}MB"
log.debug("Using `mem_per_worker` = %d MB", mbMem)
return mem_per_worker
def _probe_job_extra(job_extra : List) -> None:
"""
Ensure job_extra is a list
"""
if not isinstance(job_extra, list):
msg = "`job_extra` has to be a list, not %s"
log.error(msg, str(type(job_extra)))
raise TypeError(msg%(str(type(job_extra))))
return
def _probe_scontrol(partition : str) -> int:
"""
(Attempt to) Infer default mem-to-cpu setting from partition
"""
try:
log.debug("Using `scontrol` to get partition info")
pc = subprocess.run(f"scontrol -o show partition {partition}",
capture_output=True, check=True, shell=True, text=True)
defMem = int(pc.stdout.strip().partition("DefMemPerCPU=")[-1].split()[0])
log.debug("Found DefMemPerCPU=%d", defMem)
qos = pc.stdout.strip().partition("QoS=")[-1].split()[0]
log.debug("Found QoS=%s", qos)
pc = subprocess.run(f"sacctmgr show qos name={qos} format=MaxTRES -P",
capture_output=True, check=True, shell=True, text=True)
partMem = pc.stdout.strip().partition("mem=")[-1]
log.debug("Found MaxTRES memory limit=%s", partMem)
except Exception as exc: # pragma: no cover
msg = "Cannot fetch available memory per CPU in SLURM: %s"
log.error(msg, str(exc))
raise IOError(msg%(str(exc)))
# Convert `partMem` to MB
if len(partMem) == 0:
partMem = -1
elif partMem.endswith("M"):
partMem = int(partMem.replace("M", ""))
elif partMem.endswith("G"):
partMem = int(float(partMem.replace("G", "")) * 1024)
elif partMem.endswith("T"):
partMem = int(float(partMem.replace("G", "")) * 1048576)
else:
msg = "Unrecognized QoS memory specification %s"
log.error(msg, partMem)
raise ValueError(msg%(partMem))
return max(1000, defMem), partMem