#
# Helper routines for working with Dask
#
# Copyright © 2023 Ernst Strüngmann Institute (ESI) for Neuroscience
# in Cooperation with Max Planck Society
#
# SPDX-License-Identifier: BSD-3-Clause
#
# Builtin/3rd party package imports
import os
import sys
import socket
import platform
import subprocess
import getpass
import time
import inspect
import textwrap
import numpy as np
from tqdm import tqdm
if sys.platform == "win32": # pragma: no cover
# tqdm breaks term colors on Windows - fix that (tqdm issue #446)
import colorama
colorama.deinit()
colorama.init(strip=False)
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, get_client, LocalCluster
from datetime import datetime, timedelta
from typing import List, Optional, Any, Union
# Local imports
from acme import __deprecated__, __deprecation_wrng__
from .shared import user_input, user_yesno, is_jupyter
from .spy_interface import scalar_parser, log
__all__: List[str] = ["esi_cluster_setup", "local_cluster_setup", "cluster_cleanup", "slurm_cluster_setup"]
# Setup SLURM workers on the ESI HPC cluster
def esi_cluster_setup(
partition: str,
n_workers: int = 2,
mem_per_worker: str = "auto",
cores_per_worker: Optional[int] = None,
n_workers_startup: int = 1,
timeout: int = 60,
interactive: bool = True,
interactive_wait: int = 120,
start_client: bool = True,
job_extra: List = [],
**kwargs: Optional[Any]) -> Union[Client, SLURMCluster, LocalCluster]:
"""
Start a Dask distributed SLURM worker cluster on the ESI HPC infrastructure
(or local multi-processing)

Parameters
----------
partition : str
Name of SLURM partition/queue to start workers in. Use the command `sinfo`
in the terminal to see a list of available SLURM partitions on the ESI HPC
cluster.
n_workers : int
Number of SLURM workers (=jobs) to spawn
mem_per_worker : str
Memory booking for each worker. Can be specified either in megabytes
(e.g., ``mem_per_worker = "1500MB"``) or gigabytes (e.g., ``mem_per_worker = "2GB"``).
If `mem_per_worker` is `"auto"`, an attempt is made to infer a sane default value
from the chosen partition, e.g., for ``partition = "8GBS"`` `mem_per_worker` is
automatically set to the allowed maximum of `'8GB'`. On the IBM POWER
partition "E880", `mem_per_worker` is set to 16 GB if not provided.
Note that even in queues with guaranteed memory bookings, it is possible to
allocate less memory than the allowed maximum per worker in order to spawn
numerous low-memory workers. See Examples for details.
cores_per_worker : None or int
Number of CPU cores allocated for each worker. If `None`, core-count
is set based on partition settings (`DefMemPerCPU`) with respect to
CPU architecture (minimum 1 on x86_64, and 4 on IBM POWER).
n_workers_startup : int
Number of spawned workers to wait for. If `n_workers_startup` is `1` (default),
the code does not proceed until either 1 SLURM job is running or the
`timeout` interval has been exceeded.
timeout : int
Number of seconds to wait for requested workers to start (see `n_workers_startup`).
interactive : bool
If `True`, user input is queried in case not enough workers (set by `n_workers_startup`)
could be started in the provided waiting period (determined by `timeout`).
The code waits `interactive_wait` seconds for a user choice - if none is
provided, it continues with the current number of running workers (if greater
than zero). If `interactive` is `False` and no worker could be started
within `timeout` seconds, a `TimeoutError` is raised.
interactive_wait : int
Countdown interval (seconds) to wait for a user response in case fewer than
`n_workers_startup` workers could be started. If no choice is provided within
the given time, the code automatically proceeds with the current number of
active dask workers.
start_client : bool
If `True`, a distributed computing client is launched and attached to
the dask worker cluster. If `start_client` is `False`, only a distributed
computing cluster is started to which compute-clients can connect.
job_extra : list
Extra sbatch parameters to pass to SLURMCluster.
**kwargs : dict
Additional keyword arguments can be used to control job-submission details.

Returns
-------
proc : object
A distributed computing client (if ``start_client = True``) or
a distributed computing cluster (otherwise).

Examples
--------
The following command launches 10 SLURM workers with 2 gigabytes of memory
each in the `8GBS` partition

>>> client = esi_cluster_setup(n_workers=10, partition="8GBS", mem_per_worker="2GB")

Use default settings to start 2 SLURM workers in the IBM POWER E880 partition
(allocating 4 cores and 16 GB of memory per worker)

>>> client = esi_cluster_setup(partition="E880")

The underlying distributed computing cluster can be accessed using

>>> client.cluster
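
Extra `sbatch` options can be passed via `job_extra`; a sketch (the email
address below is a placeholder)

>>> client = esi_cluster_setup(partition="8GBS", job_extra=["--mail-type=END", "--mail-user=user@example.com"])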

Notes
-----
The employed parallel computing engine relies on the concurrent processing library
`Dask <https://docs.dask.org/en/latest/>`_. Thus, the distributed computing
clients generated here are in fact instances of :class:`distributed.Client`.
This function specifically acts as a wrapper for :class:`dask_jobqueue.SLURMCluster`.
Users familiar with Dask in general and its distributed scheduler and cluster
objects in particular, may leverage Dask's entire API to fine-tune parallel
processing jobs to their liking (if wanted).
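
For instance, the underlying cluster can be re-scaled after setup via Dask's
own API (here to a total of 20 workers)

>>> client.cluster.scale(20)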

See also
--------
dask_jobqueue.SLURMCluster : launch a dask cluster of SLURM workers
slurm_cluster_setup : start a distributed Dask cluster of parallel processing workers using SLURM
local_cluster_setup : start a local Dask multi-processing cluster on the host machine
cluster_cleanup : remove dangling parallel processing worker-clusters
"""
# For later reference: dynamically fetch name of current function
funcName = "<{}>".format(inspect.currentframe().f_code.co_name) # type: ignore
# Backwards compatibility: legacy keywords are converted to new nomenclature
if any(kw in kwargs for kw in __deprecated__):
log.warning(__deprecation_wrng__)
n_workers = kwargs.pop("n_jobs", n_workers) # type: ignore
mem_per_worker = kwargs.pop("mem_per_job", mem_per_worker)
n_workers_startup = kwargs.pop("n_jobs_startup", n_workers_startup) # type: ignore
log.debug("Set `n_workers = n_jobs`, `mem_per_worker = mem_per_job`\
and `n_workers_startup = n_jobs_startup`")
# Don't start a new cluster on top of an existing one
try:
client = get_client()
log.debug("Found existing client")
if count_online_workers(client.cluster) == 0:
log.debug("No active workers detected in %s", str(client))
cluster_cleanup(client)
else:
log.info("Found existing parallel computing client %s. \
Not starting new cluster.", str(client))
if start_client:
return client
return client.cluster
except ValueError:
log.debug("No existing clients detected")
# Check if SLURM's `sinfo` can be accessed
log.debug("Test if `sinfo` is available")
proc = subprocess.Popen("sinfo",
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, shell=True)
_, err = proc.communicate()
# Any non-zero return-code means SLURM is not ready to use
if proc.returncode != 0:
# SLURM is not installed: either allocate `LocalCluster` or just leave
if proc.returncode > 0:
if interactive:
msg = "{name:s} SLURM does not seem to be installed on this machine " +\
"({host:s}). Do you want to start a local multi-processing " +\
"computing client instead? "
startLocal = user_yesno(msg.format(name=funcName, host=socket.gethostname()),
default="no")
else:
startLocal = True
if startLocal:
return local_cluster_setup(interactive=interactive, start_client=start_client)
# SLURM is installed, but something's wrong
msg = "Cannot access SLURM queuing system from node %s: %s "
log.error(msg, socket.gethostname(), err)
raise IOError("%s %s"%(funcName, msg%(socket.gethostname(), err)))
# Use default by-worker process count or extract it from anonymous keyword args (if provided)
processes_per_worker = kwargs.pop("processes_per_worker", 1)
log.debug("Found `sinfo`, set `processes_per_worker` to %d", processes_per_worker)
# Get micro-architecture of submitting host
mArch = platform.machine()
# If partition is "auto" use `mem_per_worker` to pick pseudo-optimal partition
# Note: the `np.where` gymnastic below is necessary since `argmin` refuses
# to return multiple matches; if `mem_per_worker` is 12, then ``memDiff = [4, 4, ...]``,
# however, 8GB won't fit a 12GB worker, so we have to pick the second match 16GB
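# Illustration (hypothetical queue layout): for gbQueues = [8, 16, 32] and a
# 12 GB estimate, memDiff = [4, 4, 20]; ``np.where(memDiff == memDiff.min())[0][-1]``
# returns index 1, i.e., the 16GB partition, whereas ``argmin`` would stop at index 0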
if isinstance(partition, str):
if partition == "auto":
if not isinstance(mem_per_worker, str) or mem_per_worker.find("estimate_memuse:") < 0:
msg = "Cannot auto-select partition without first invoking memory estimation in `ParallelMap`. "
log.error(msg)
raise IOError("%s %s"%(funcName, msg))
memEstimate = int(mem_per_worker.replace("estimate_memuse:", ""))
mem_per_worker = "auto"
log.info("Automatically selecting SLURM partition...")
availPartitions = _get_slurm_partitions()
if mArch == "x86_64":
gbQueues = np.unique([int(queue.split("GB")[0]) for queue in availPartitions if queue[0].isdigit()])
memDiff = np.abs(gbQueues - memEstimate)
queueIdx = np.where(memDiff == memDiff.min())[0][-1]
partition = "{}GBXS".format(gbQueues[queueIdx])
else:
partition = "E880"
mem_per_worker = f"{memEstimate} GB"
msg = "Picked partition %s based on estimated memory consumption of %d GB"
log.info(msg, partition, memEstimate)
else:
if (partition == "E880" and mArch == "x86_64") or \
(mArch == "ppc64le" and partition != "E880"):
otherArch = list(set(["x86_64", "ppc64le"]).difference([mArch]))[0]
msg = "Cannot start SLURM workers in partition %s with " +\
"architecture %s from submitting host with architecture %s. " +\
"Start x86_64 workers from esi-svhpc{1,2,3} and POWER workers from the hub."
raise ValueError(msg%(partition, otherArch, mArch))
# Convert "auto" memory selection query to `None` for easier handling below
if isinstance(mem_per_worker, str) and mem_per_worker == "auto":
mem_per_worker = None # type: ignore
log.debug("Using auto-memory selection")
# If not explicitly provided, extract by-worker CPU core count from
# partition via `DefMemPerCPU` and `mem_per_worker`
if cores_per_worker is None:
try:
log.debug("Using `scontrol` to get partition info")
pc = subprocess.run("scontrol -o show partition {}".format(partition),
capture_output=True, check=True, shell=True, text=True)
defMem = int(pc.stdout.strip().partition("DefMemPerCPU=")[-1].split()[0])
log.debug("Found DefMemPerCPU=%d", defMem)
except Exception as exc:
msg = "Cannot fetch available memory per CPU in SLURM: %s"
log.error(msg, str(exc))
raise IOError("%s %s"%(funcName, msg%(str(exc))))
# Use ESI-specific x86_64 partition layout (8GB(S/X/L), 16GB(S/X/L), ... )
# to infer memory and core count; use "sane" (fat quotes)
# defaults on IBM POWER (if nothing was provided, use 4 cores/16 GB per worker)
if mArch == "x86_64":
memPerCore = 8000
else:
if mem_per_worker is not None:
try:
memVal = float(mem_per_worker.replace("MB", "").replace("GB", ""))
except ValueError:
raise ValueError("Invalid value of `mem_per_worker`: %s"%mem_per_worker)
if memVal <= 0:
raise ValueError("Value of `mem_per_worker` has to be > 0!")
if "MB" in mem_per_worker:
defMem = int(memVal)
else:
defMem = int(round(memVal * 1000))
log.debug("Found `mem_per_worker` set to %d MB", defMem)
else:
defMem = 16000 # default to 4 cores per worker if no mem req was specified
mem_per_worker = f"{defMem} MB"
log.debug("No `mem_per_worker` specified, using default of %d MB", defMem)
memPerCore = 4000
# Set core-count per worker (applies to both x86_64 and ppc64le)
cores_per_worker = max(1, int(defMem / memPerCore))
log.debug("Derived core-count from partition: `cores_per_worker=%d`", cores_per_worker)
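# Example (hypothetical values): defMem = 16000 MB on ppc64le with
# memPerCore = 4000 gives cores_per_worker = max(1, 16000 // 4000) = 4;
# on x86_64, defMem = 8000 with memPerCore = 8000 yields a single core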
# Determine if `job_extra` is a list (this is also checked in `slurm_cluster_setup`,
# but we may need to extend it, so ensure that's possible)
if not isinstance(job_extra, list):
msg = "`job_extra` has to be a list, not %s"
log.error(msg, str(type(job_extra)))
raise TypeError("%s %s"%(funcName, msg%(str(type(job_extra)))))
# If '--output' was not provided, append default output folder to `job_extra`
if not any(option.startswith("--output") or option.startswith("-o") for option in job_extra):
log.debug("Auto-populating `--output` setting for sbatch")
usr = getpass.getuser()
slurm_wdir = "/cs/slurm/{usr:s}/{usr:s}_{date:s}"
slurm_wdir = slurm_wdir.format(usr=usr,
date=datetime.now().strftime('%Y%m%d-%H%M%S'))
os.makedirs(slurm_wdir, exist_ok=True)
log.debug("Using %s for slurm logs", slurm_wdir)
out_files = os.path.join(slurm_wdir, "slurm-%j.out")
# Use list concatenation (not `append`) to avoid mutating the default `job_extra` across calls
job_extra = job_extra + ["--output={}".format(out_files)]
log.debug("Setting `--output=%s`", out_files)
# Let the SLURM-specific setup function do the rest (returns client or cluster)
return slurm_cluster_setup(partition, cores_per_worker, n_workers, # type: ignore
processes_per_worker, mem_per_worker,
n_workers_startup, timeout, interactive,
interactive_wait, start_client, job_extra,
invalid_partitions=["DEV", "ESI"], **kwargs)
# Setup SLURM cluster
def slurm_cluster_setup(
partition: str = "partition_name",
n_cores: int = 1,
n_workers: int = 1,
processes_per_worker: int = 1,
mem_per_worker: str = "1GB",
n_workers_startup: int = 1,
timeout: int = 60,
interactive: bool = True,
interactive_wait: int = 10,
start_client: bool = True,
job_extra: List = [],
invalid_partitions: List = [],
**kwargs: Optional[Any]) -> Union[Client, SLURMCluster, None]:
"""
Start a distributed Dask cluster of parallel processing workers using SLURM

**NOTE** If you are working on the ESI HPC cluster, please use
:func:`~acme.esi_cluster_setup` instead!

Parameters
----------
partition : str
Name of SLURM partition/queue to use
n_cores : int
Number of CPU cores per SLURM worker
n_workers : int
Number of SLURM workers (=jobs) to spawn
processes_per_worker : int
Number of processes to use per SLURM job (=worker). Should be greater
than one only if the chosen partition contains nodes that expose multiple
cores per job.
mem_per_worker : str
Memory allocation for each worker
n_workers_startup : int
Number of spawned SLURM workers to wait for. The code does not return until either
`n_workers_startup` SLURM jobs are running or the `timeout` interval (see
below) has been exceeded.
timeout : int
Number of seconds to wait for requested workers to start (see `n_workers_startup`).
interactive : bool
If `True`, user input is queried in case not enough workers (set by `n_workers_startup`)
could be started in the provided waiting period (determined by `timeout`).
The code waits `interactive_wait` seconds for a user choice - if none is
provided, it continues with the current number of running workers (if greater
than zero). If `interactive` is `False` and no worker could be started
within `timeout` seconds, a `TimeoutError` is raised.
interactive_wait : int
Countdown interval (seconds) to wait for a user response in case fewer than
`n_workers_startup` workers could be started. If no choice is provided within
the given time, the code automatically proceeds with the current number of
active dask workers.
start_client : bool
If `True`, a distributed computing client is launched and attached to
the dask worker cluster. If `start_client` is `False`, only a distributed
computing cluster is started to which compute-clients can connect.
job_extra : list
Extra sbatch parameters to pass to SLURMCluster.
invalid_partitions : list
List of partition names (strings) that are not available for launching
dask workers.

Returns
-------
proc : object or None
A distributed computing client (if ``start_client = True``) or
a distributed computing cluster (otherwise). If no SLURM workers
can be started within the given timeout interval, `proc` is set
to `None`.

See also
--------
dask_jobqueue.SLURMCluster : launch a dask cluster of SLURM workers
esi_cluster_setup : start a SLURM worker cluster on the ESI HPC infrastructure
local_cluster_setup : start a local Dask multi-processing cluster on the host machine
cluster_cleanup : remove dangling parallel processing worker-clusters
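
Examples
--------
A minimal sketch; the partition name "normal" below is a placeholder,
substitute one reported by `sinfo` on your cluster

>>> client = slurm_cluster_setup(partition="normal", n_cores=2, n_workers=4, mem_per_worker="8GB")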
"""
# For later reference: dynamically fetch name of current function
funcName = "<{}>".format(inspect.currentframe().f_code.co_name) # type: ignore
# Backwards compatibility: legacy keywords are converted to new nomenclature
if any(kw in kwargs for kw in __deprecated__):
log.warning(__deprecation_wrng__)
n_workers = kwargs.pop("n_jobs", n_workers) # type: ignore
processes_per_worker = kwargs.pop("workers_per_job", processes_per_worker) # type: ignore
mem_per_worker = kwargs.pop("mem_per_job", mem_per_worker) # type: ignore
n_workers_startup = kwargs.pop("n_jobs_startup", n_workers_startup) # type: ignore
log.debug("Set `n_workers = n_jobs`, `processes_per_worker = workers_per_job`, \
`mem_per_worker = mem_per_job` \
and `n_workers_startup = n_jobs_startup`")
# Retrieve all partitions currently available in SLURM
availPartitions = _get_slurm_partitions()
# Make sure we're in a valid partition
if partition not in availPartitions:
valid = list(set(availPartitions).difference(invalid_partitions))
lgl = "'" + "or '".join(opt + "' " for opt in valid)
msg = "Invalid partition selection %s, available SLURM partitions are %s"
log.error(msg, str(partition), lgl)
raise ValueError("%s %s"%(funcName, msg%(str(partition), lgl)))
log.debug("Found `partition = %s`", partition)
# Parse worker count
try:
scalar_parser(n_workers, varname="n_workers", ntype="int_like", lims=[1, np.inf])
except Exception as exc:
log.error("Error parsing `n_workers`")
raise exc
log.debug("Using `n_workers = %d`", n_workers)
# Get requested memory per worker
if isinstance(mem_per_worker, str):
if mem_per_worker == "auto":
mem_per_worker = None # type: ignore
log.debug("Using auto-memory selection")
if mem_per_worker is not None:
msg = "`mem_per_worker` has to be a valid memory specifier (e.g., '8GB', '12000MB'), not %s"
if not isinstance(mem_per_worker, str):
log.error(msg, str(type(mem_per_worker)))
raise TypeError("%s %s"%(funcName, msg%(str(type(mem_per_worker)))))
if not any(szstr in mem_per_worker for szstr in ["MB", "GB"]):
log.error(msg, mem_per_worker)
raise ValueError("%s %s"%(funcName, msg%(mem_per_worker)))
memNumeric = mem_per_worker.replace("MB", "").replace("GB", "")
log.debug("Found `mem_per_worker = %s` in input args", mem_per_worker)
try:
memVal = float(memNumeric)
except ValueError:
log.error(msg, mem_per_worker)
raise ValueError("%s %s"%(funcName, msg%(mem_per_worker)))
if memVal <= 0:
log.error(msg, mem_per_worker)
raise ValueError("%s %s"%(funcName, msg%(mem_per_worker)))
# Parse worker-waiter count
try:
scalar_parser(n_workers_startup, varname="n_workers_startup", ntype="int_like", lims=[0, np.inf])
except Exception as exc:
log.error("Error parsing `n_workers_startup`")
raise exc
log.debug("Using `n_workers_startup = %d`", n_workers_startup)
# Get memory limit (*in MB*) of chosen partition (guaranteed to exist, cf. above)
log.debug("Use `scontrol` to fetch partition's memory limit")
pc = subprocess.run("scontrol -o show partition {}".format(partition),
capture_output=True, check=True, shell=True, text=True)
try:
mem_lim = int(pc.stdout.strip().partition("MaxMemPerCPU=")[-1].split()[0])
except (IndexError, ValueError):
mem_lim = np.inf # type: ignore
log.debug("Found a limit of %s MB", str(mem_lim))
# Consolidate requested memory with chosen partition (or assign default memory)
if mem_per_worker is None:
if np.isinf(mem_lim):
try:
mem_per_worker = pc.stdout.strip().partition("DefMemPerCPU=")[-1].split()[0] + "MB"
except IndexError:
raise ValueError("Cannot infer any default memory setting from partition %s"%partition)
else:
mem_per_worker = str(mem_lim) + "MB"
log.debug("Using partition limit of %s MB", str(mem_lim))
else:
if "MB" in mem_per_worker:
mem_req = int(memVal)
else:
mem_req = int(round(memVal * 1000))
if mem_req > mem_lim:
msg = "`mem_per_worker` exceeds limit of %d MB for partition %s. " +\
"Capping memory at partition limit. "
log.warning(msg, mem_lim, partition)
mem_per_worker = str(int(mem_lim)) + "MB"
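# Example (hypothetical values): `mem_per_worker = "12GB"` in a partition
# with MaxMemPerCPU=8000 yields mem_req = 12000 > mem_lim = 8000, so the
# booking is capped at "8000MB"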
# Parse requested timeout period
try:
scalar_parser(timeout, varname="timeout", ntype="int_like", lims=[1, np.inf])
except Exception as exc:
log.error("Error parsing `timeout`")
raise exc
log.debug("Using `timeout = %d`", timeout)
# Parse requested interactive waiting period
try:
scalar_parser(interactive_wait, varname="interactive_wait", ntype="int_like",
lims=[0, np.inf])
except Exception as exc:
log.error("Error parsing `interactive_wait`")
raise exc
log.debug("Using `interactive_wait = %d`", interactive_wait)
# Determine if cluster allocation is happening interactively
if not isinstance(interactive, bool):
msg = "`interactive` has to be Boolean, not %s"
log.error(msg, str(type(interactive)))
raise TypeError("%s %s"%(funcName, msg%(str(type(interactive)))))
log.debug("Using `interactive = %s`", str(interactive))
# Determine if a dask client was requested
if not isinstance(start_client, bool):
msg = "`start_client` has to be Boolean, not %s"
log.error(msg, str(type(start_client)))
raise TypeError("%s %s"%(funcName, msg%(str(type(start_client)))))
log.debug("Using `start_client = %s`", str(start_client))
# Determine if job_extra is a list
if not isinstance(job_extra, list):
msg = "`job_extra` has to be a list, not %s"
log.error(msg, str(type(job_extra)))
raise TypeError("%s %s"%(funcName, msg%(str(type(job_extra)))))
# Determine if job_extra options are valid
for option in job_extra:
msg = "`job_extra` has to be a valid sbatch option, not %s"
if not isinstance(option, str):
log.error(msg, str(type(option)))
raise TypeError("%s %s"%(funcName, msg%(str(type(option)))))
if not option[0] == "-":
msg = "`job_extra` options must be flagged with - or --"
log.error(msg)
raise ValueError("%s %s"%(funcName, msg))
log.debug("Using `job_extra = %s`", str(job_extra))
# Ensure validity of requested worker processes
try:
scalar_parser(processes_per_worker, varname="processes_per_worker",
ntype="int_like", lims=[1, 16])
except Exception as exc:
log.error("Error parsing `processes_per_worker`")
raise exc
log.debug("Using `processes_per_worker = %d`", processes_per_worker)
# Check for sanity of requested core count
try:
scalar_parser(n_cores, varname="n_cores",
ntype="int_like", lims=[1, np.inf])
except Exception as exc:
log.error("Error parsing `n_cores`")
raise exc
log.debug("Using `n_cores = %d`", n_cores)
# Check validity of '--output' option if provided
userOutSpec = [option.startswith("--output") or option.startswith("-o") for option in job_extra]
if any(userOutSpec):
userOut = job_extra[userOutSpec.index(True)]
outSpec = userOut.split("=")
if len(outSpec) != 2:
msg = "SLURM output directory must be specified using -o/--output=/path/to/file, not %s"
log.error(msg, userOut)
raise ValueError("%s %s"%(funcName, msg%(userOut)))
slurm_wdir = os.path.split(outSpec[1])[0]
if len(slurm_wdir) > 0 and not os.path.isdir(os.path.expanduser(slurm_wdir)):
msg = "SLURM output location has to be an existing directory, not %s"
log.error(msg, slurm_wdir)
raise ValueError("%s %s"%(funcName, msg%(slurm_wdir)))
else:
slurm_wdir = None
log.debug("Using `local_directory = %s`", slurm_wdir)
# Create `SLURMCluster` object using provided parameters
log.debug("Instantiating `SLURMCluster` object")
cluster = SLURMCluster(cores=n_cores,
memory=mem_per_worker,
processes=processes_per_worker,
local_directory=slurm_wdir,
queue=partition,
python=sys.executable,
job_directives_skip=["-t", "--mem"],
job_extra_directives=job_extra)
# interface="asdf", # interface is set via `psutil.net_if_addrs()`
# Compute total no. of workers and up-scale cluster accordingly
if n_workers_startup < n_workers:
msg = "Requested worker-count %d exceeds `n_workers_startup = %d`, " +\
"waiting for %d workers to come online"
log.debug(msg, n_workers, n_workers_startup, n_workers_startup)
cluster.scale(n_workers)
# Fire up waiting routine to avoid returning an undercooked cluster
if _cluster_waiter(cluster, funcName, n_workers_startup, timeout, interactive, interactive_wait):
return None
# Kill a zombie cluster in non-interactive mode
if not interactive and count_online_workers(cluster) == 0:
cluster_cleanup(Client(cluster))
msg = "SLURM workers could not be started within given time-out " +\
"interval of %d seconds"
log.error(msg, timeout)
raise TimeoutError("%s %s"%(funcName, msg%(timeout)))
# Highlight how to connect to dask performance monitor
msg = "Parallel computing client ready, dashboard accessible at %s"
log.info(msg, cluster.dashboard_link)
# If client was requested, return that instead of the created cluster
if start_client:
return Client(cluster)
return cluster
def _get_slurm_partitions() -> List[str]:
"""
Local helper to fetch all partitions defined in SLURM
"""
# For later reference: dynamically fetch name of current function
funcName = "<{}>".format(inspect.currentframe().f_code.co_name) # type: ignore
# Retrieve all partitions currently available in SLURM
log.debug("Use `sinfo` to fetch available partitions")
proc = subprocess.Popen("sinfo -h -o %P",
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, shell=True)
out, err = proc.communicate()
# Any non-zero return-code means SLURM is not ready to use
if proc.returncode != 0:
msg = "Error fetching SLURM partition setup from node %s: %s"
log.error(msg, socket.gethostname(), err)
raise IOError("%s %s"%(funcName, msg%(socket.gethostname(), err)))
# Return formatted subprocess shell output
log.debug("Found partitions: %s", out)
return out.split()
def _cluster_waiter(
cluster: SLURMCluster,
funcName: str,
total_workers: int,
timeout: int,
interactive: bool,
interactive_wait: int) -> bool:
"""
Local helper that can be called recursively
"""
# Wait until all workers have been started successfully or we run out of time
wrkrs = count_online_workers(cluster)
to = str(timedelta(seconds=timeout))[2:]
fmt = "{desc}: {n}/{total} \t[elapsed time {elapsed} | timeout at " + to + "]"
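# Note: for timeout = 90, str(timedelta(seconds=90)) gives "0:01:30"; slicing
# off the leading "0:" above leaves "01:30" for the progress-bar timestamp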
ani = tqdm(desc="{} SLURM workers ready".format(funcName), total=total_workers,
leave=True, bar_format=fmt, initial=wrkrs, position=0)
counter = 0
while count_online_workers(cluster) < total_workers and counter < timeout:
time.sleep(1)
counter += 1
ani.update(max(0, count_online_workers(cluster) - wrkrs))
wrkrs = count_online_workers(cluster)
ani.refresh() # force refresh to display elapsed time every second
ani.close()
# If we ran out of time before all workers could be started, ask what to do
if counter == timeout and interactive: # pragma: no cover
msg = "SLURM workers could not be started within given time-out " +\
"interval of %d seconds"
log.info(msg, timeout)
query = "{name:s} Do you want to [k]eep waiting for 60s, [a]bort or " +\
"[c]ontinue with {wrk:d} workers?"
choice = user_input(query.format(name=funcName, wrk=wrkrs),
valid=["k", "a", "c"], default="c", timeout=interactive_wait)
if choice == "k":
return _cluster_waiter(cluster, funcName, total_workers, 60, True, 60)
elif choice == "a":
log.info("Closing cluster...")
cluster_cleanup(Client(cluster))
return True
else:
if wrkrs == 0:
query = "{} Cannot continue with 0 workers. Do you want to " +\
"[k]eep waiting for 60s or [a]bort?"
choice = user_input(query.format(funcName), valid=["k", "a"],
default="a", timeout=60)
if choice == "k":
return _cluster_waiter(cluster, funcName, total_workers, 60, True, 60)
else:
log.info("Closing cluster...")
cluster_cleanup(Client(cluster))
return True
return False
def local_cluster_setup(
n_workers: Optional[int] = None,
mem_per_worker: Optional[str] = None,
interactive: bool = True,
start_client: bool = True) -> Union[Client, LocalCluster, None]:
"""
Start a local distributed Dask multi-processing cluster

Parameters
----------
n_workers : int
Number of local workers to start (this should align with the locally
available hardware, see :class:`distributed.LocalCluster` for details)
mem_per_worker : str
Memory cap for each local worker (corresponds to the `memory_limit`
keyword of a :class:`distributed.worker.Worker`)
interactive : bool
If `True`, a confirmation dialog is displayed to ensure proper encapsulation
of calls to `local_cluster_setup` inside a script's main module block.
See Notes for details. If `interactive` is `False`, the dialog is not shown.
start_client : bool
DEPRECATED. Will be removed in the next release.
If `True`, a distributed computing client is launched and attached to
the workers. If `start_client` is `False`, only a distributed
computing cluster is started to which compute-clients can connect.

Returns
-------
proc : object
A distributed computing client (if ``start_client = True``) or
a distributed computing cluster (otherwise). If a client cannot
be started, `proc` is set to `None`.

Notes
-----
The way Python spawns new processes requires an explicit separation of initialization
code (i.e., code blocks that should only be executed once) from the actual
program code. Specifically, everything that is supposed to be invoked only
once by the parent spawner must be encapsulated in a script's main block.
Otherwise, initialization code is not only run once by the parent process but
executed by every child process at import time.
This means that starting a local multi-processing cluster *has* to be wrapped
inside a script's main module block; otherwise, every child process created by
the multi-processing cluster starts a multi-processing cluster itself, and so
on, escalating to an infinite recursion. Thus, if `local_cluster_setup` is
called inside a script, it has to be encapsulated in the script's main module
block, i.e.,

.. code-block:: python

    if __name__ == "__main__":
        ...
        local_cluster_setup()
        ...

Note that this encapsulation is **only** required inside Python scripts. Launching
`local_cluster_setup` in the (I)Python shell or inside a Jupyter notebook
does not suffer from this problem.
A more in-depth technical discussion of this limitation can be found in
`Dask's GitHub issue tracker <https://github.com/dask/distributed/issues/2520>`_.

Examples
--------
The following command launches a local distributed computing cluster using
all CPU cores available on the host machine

>>> client = local_cluster_setup()

The underlying distributed computing cluster can be accessed using

>>> client.cluster

See also
--------
distributed.LocalCluster : create local worker cluster
esi_cluster_setup : Start a distributed Dask cluster using SLURM
cluster_cleanup : remove dangling parallel processing worker-clusters
"""
# For later reference: dynamically fetch name of current function
funcName = "<{}>".format(inspect.currentframe().f_code.co_name) # type: ignore
# Determine if cluster allocation is happening interactively
if not isinstance(interactive, bool):
msg = "`interactive` has to be Boolean, not %s"
log.error(msg, str(type(interactive)))
raise TypeError("%s %s"%(funcName, msg%(str(type(interactive)))))
log.debug("Using `interactive = %s`", str(interactive))
# Determine if a dask client was requested
if not isinstance(start_client, bool) or start_client is False:
log.warning("The keyword `start_client` in `local_cluster_setup` is DEPRECATED and will be ignored.")
start_client = True
# If not running inside Jupyter, log an informational message about the main-module idiom
if not is_jupyter():
msg = """\
If you use a script to start a local parallel computing client, please ensure
the call to `local_cluster_setup` is wrapped inside a main module block, i.e.,
if __name__ == "__main__":
    ...
    local_cluster_setup()
    ...
Otherwise, a RuntimeError is raised due to an infinite recursion triggered by
new processes being started before the calling process can finish its bootstrapping
phase.
"""
msg = textwrap.dedent(msg)
log.debug(msg)
# Additional safe-guard: if a script is executed, double-check with the user
# for proper main idiom usage
if interactive: # pragma: no cover
msg = "{name:s} If launched from a script, did you wrap your code " +\
"inside a __main__ module block?"
if not user_yesno(msg.format(name=funcName), default="no"):
return None
# Start the actual distributed client
if n_workers is not None or mem_per_worker is not None:
msg = "Starting `LocalCluster` with `n_workers = %s` and `memory_limit = %s`"
log.debug(msg, str(n_workers), str(mem_per_worker))
cluster = LocalCluster(n_workers=n_workers, memory_limit=mem_per_worker)
client = Client(cluster)
else:
client = Client()
msg = "Local parallel computing client ready, dashboard accessible at %s"
log.info(msg, client.cluster.dashboard_link)
if start_client:
return client
return client.cluster
def cluster_cleanup(client: Optional[Client] = None) -> None:
"""
Stop and close dangling parallel processing workers

Parameters
----------
client : dask distributed computing client or None
Either a :class:`distributed.Client` or `None`. If `None`, a
global client is queried for and shut down if found (without confirmation!).

Returns
-------
Nothing : None

See also
--------
esi_cluster_setup : Launch SLURM workers on the ESI compute cluster
slurm_cluster_setup : start a distributed Dask cluster of parallel processing workers using SLURM
local_cluster_setup : start a local Dask multi-processing cluster on the host machine
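
Examples
--------
A sketch: shut down a client started earlier in the same session

>>> client = local_cluster_setup(interactive=False)
>>> cluster_cleanup(client)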
"""
# For later reference: dynamically fetch name of current function
funcName = "<{}>".format(inspect.currentframe().f_code.co_name) # type: ignore
# Attempt to establish connection to dask client
if client is None:
try:
client = get_client()
except ValueError:
log.warning("No dangling clients or clusters found.")
return
except Exception as exc: # pragma: no cover
log.error("Error looking for dask client")
raise exc
else:
if not isinstance(client, Client):
msg = "`client` has to be a dask client object, not %s"
log.error(msg, str(type(client)))
raise TypeError("%s %s"%(funcName, msg%(str(type(client)))))
log.debug("Found client %s", str(client))
# Prepare message for prompt
if client.cluster.__class__.__name__ == "LocalCluster":
userClust = "LocalCluster hosted on {}".format(client.scheduler_info()["address"])
else:
userName = getpass.getuser()
outDir = client.cluster.job_header.partition("--output=")[-1]
jobID = outDir.partition("{}_".format(userName))[-1].split(os.sep)[0]
userClust = "cluster {0}_{1}".format(userName, jobID)
nWorkers = count_online_workers(client.cluster)
# First gracefully shut down all workers, then close client
client.retire_workers(list(client.scheduler_info()['workers']), close_workers=True)
client.close()
try:
client.cluster.close()
except Exception as exc:
log.warning("Could not gracefully shut down cluster: %s", str(exc))
# Communicate what just happened and get outta here
msg = "Successfully shut down %s containing %d workers"
log.info(msg, userClust, nWorkers)
return
def count_online_workers(cluster: Union[SLURMCluster, LocalCluster]) -> int:
"""
Local replacement for the removed `._count_active_workers` class method
"""
return len(cluster.scheduler_info["workers"])