Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify device_memory_limit parsing and set default to 0.8 #439

Merged
15 changes: 8 additions & 7 deletions dask_cuda/cli/dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@
)
@click.option(
"--device-memory-limit",
default="auto",
help="Bytes of memory per CUDA device that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total device memory), "
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management "
"(i.e., allow full device memory usage).",
default="0.8",
help="Specifies the size of the CUDA device LRU cache, which "
"is used to determine when the worker starts spilling to host "
"memory. This can be a float (fraction of total device "
"memory), an integer (bytes), a string (like 5GB or 5000M), "
"and 'auto' or 0 to disable spilling to host (i.e., allow "
"full device memory usage). Default is 0.8, 80% of the "
"worker's total device memory.",
)
@click.option(
"--rmm-pool-size",
Expand Down
11 changes: 4 additions & 7 deletions dask_cuda/cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
RMMSetup,
cuda_visible_devices,
get_cpu_affinity,
get_device_total_memory,
get_n_gpus,
get_ucx_config,
get_ucx_net_devices,
parse_device_memory_limit,
)


Expand Down Expand Up @@ -211,12 +211,9 @@ def del_pid_file():
data=(
DeviceHostFile,
{
"device_memory_limit": get_device_total_memory(index=i)
if (
device_memory_limit == "auto"
or device_memory_limit == int(0)
)
else parse_bytes(device_memory_limit),
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
"memory_limit": memory_limit,
"local_directory": local_directory,
},
Expand Down
33 changes: 16 additions & 17 deletions dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
RMMSetup,
cuda_visible_devices,
get_cpu_affinity,
get_device_total_memory,
get_ucx_config,
get_ucx_net_devices,
parse_cuda_visible_device,
parse_device_memory_limit,
)


Expand All @@ -38,18 +38,22 @@ class LocalCUDACluster(LocalCluster):

Parameters
----------
CUDA_VISIBLE_DEVICES: str
String like ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to
different GPUs
CUDA_VISIBLE_DEVICES: str or list
String or list ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to
different GPUs.
device_memory_limit: int, float or str
Specifies the size of the CUDA device LRU cache, which is used to
determine when the worker starts spilling to host memory. This can be
a float (fraction of total device memory), an integer (bytes), a string
(like 5GB or 5000M), and "auto", 0 or None to disable spilling to
host (i.e., allow full device memory usage). Default is 0.8, 80% of the
worker's total device memory.
interface: str
The external interface used to connect to the scheduler, usually
an ethernet interface is used for connection, and not an InfiniBand
interface (if one is available).
threads_per_worker: int
Number of threads to be used for each CUDA worker process.
CUDA_VISIBLE_DEVICES: str or list
String or list ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to
different GPUs.
protocol: str
Protocol to use for communication, e.g., "tcp" or "ucx".
enable_tcp_over_ucx: bool
Expand Down Expand Up @@ -117,7 +121,7 @@ def __init__(
threads_per_worker=1,
processes=True,
memory_limit="auto",
device_memory_limit=None,
device_memory_limit=0.8,
CUDA_VISIBLE_DEVICES=None,
data=None,
local_directory=None,
Expand Down Expand Up @@ -147,7 +151,9 @@ def __init__(
self.host_memory_limit = parse_memory_limit(
memory_limit, threads_per_worker, n_workers
)
self.device_memory_limit = device_memory_limit
self.device_memory_limit = parse_device_memory_limit(
device_memory_limit, device_index=0
)

self.rmm_pool_size = rmm_pool_size
self.rmm_managed_memory = rmm_managed_memory
Expand Down Expand Up @@ -176,11 +182,6 @@ def __init__(
"Processes are necessary in order to use multiple GPUs with Dask"
)

if self.device_memory_limit is None:
self.device_memory_limit = get_device_total_memory(0)
elif isinstance(self.device_memory_limit, str):
self.device_memory_limit = parse_bytes(self.device_memory_limit)

if data is None:
data = (
DeviceHostFile,
Expand Down Expand Up @@ -265,9 +266,7 @@ def new_worker_spec(self):
visible_devices = cuda_visible_devices(worker_count, self.cuda_visible_devices)
spec["options"].update(
{
"env": {
"CUDA_VISIBLE_DEVICES": visible_devices,
},
"env": {"CUDA_VISIBLE_DEVICES": visible_devices,},
"plugins": {
CPUAffinity(get_cpu_affinity(worker_count)),
RMMSetup(self.rmm_pool_size, self.rmm_managed_memory),
Expand Down
40 changes: 20 additions & 20 deletions dask_cuda/tests/test_spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,23 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
Expand Down Expand Up @@ -163,23 +163,23 @@ def test_device_spill(client, scheduler, worker):
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
Expand Down Expand Up @@ -241,23 +241,23 @@ async def test_cupy_cluster_device_spill(params):
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
Expand Down Expand Up @@ -330,23 +330,23 @@ def test_device_spill(client, scheduler, worker):
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
Expand Down
13 changes: 13 additions & 0 deletions dask_cuda/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
get_ucx_config,
get_ucx_net_devices,
parse_cuda_visible_device,
parse_device_memory_limit,
unpack_bitmask,
)

Expand Down Expand Up @@ -219,3 +220,15 @@ def test_parse_visible_devices():
with pytest.raises(TypeError):
parse_cuda_visible_device(None)
parse_cuda_visible_device([])


def test_parse_device_memory_limit():
total = get_device_total_memory(0)

assert parse_device_memory_limit(None) == total
assert parse_device_memory_limit(0) == total
assert parse_device_memory_limit("auto") == total

assert parse_device_memory_limit(0.8) == int(total * 0.8)
assert parse_device_memory_limit(1000000000) == 1000000000
assert parse_device_memory_limit("1GB") == 1000000000
43 changes: 42 additions & 1 deletion dask_cuda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
import os
import time
import warnings
from contextlib import suppress
from multiprocessing import cpu_count

import numpy as np
import pynvml
import toolz

from dask.distributed import wait
from distributed import wait
from distributed.utils import parse_bytes

try:
from nvtx import annotate as nvtx_annotate
Expand Down Expand Up @@ -442,3 +444,42 @@ def cuda_visible_devices(i, visible=None):

L = visible[i:] + visible[:i]
return ",".join(map(str, L))


def parse_device_memory_limit(device_memory_limit, device_index=0):
"""Parse memory limit to be used by a CUDA device.


Parameters
----------
device_memory_limit: float, int, str or None
This can be a float (fraction of total device memory), an integer (bytes),
a string (like 5GB or 5000M), and "auto", 0 or None for the total device
size.
device_index: int
The index of device from which to obtain the total memory amount.

Examples
--------
>>> # On a 32GB CUDA device
>>> parse_device_memory_limit(None)
34089730048
>>> parse_device_memory_limit(0.8)
27271784038
>>> parse_device_memory_limit(1000000000)
1000000000
>>> parse_device_memory_limit("1GB")
1000000000
"""
if any(device_memory_limit == v for v in [0, "0", None, "auto"]):
return get_device_total_memory(device_index)

with suppress(ValueError, TypeError):
device_memory_limit = float(device_memory_limit)
if isinstance(device_memory_limit, float) and device_memory_limit <= 1:
return int(get_device_total_memory(device_index) * device_memory_limit)

if isinstance(device_memory_limit, str):
return parse_bytes(device_memory_limit)
else:
return int(device_memory_limit)
2 changes: 2 additions & 0 deletions docs/source/specializations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Spilling From Device

For applications that do not fit in GPU memory, Dask-CUDA supports spilling from device memory to host memory when the GPU can't fit more data. The spilling mechanism is automatically triggered once the user-defined limit is reached, such limit can be set via the ``--device-memory-limit`` and ``device_memory_limit`` arguments for ``dask-cuda-worker`` and ``LocalCUDACluster``, respectively.

Previously, spilling was disabled by default, but since Dask-CUDA 0.17 the default has been changed to ``0.8`` -- spilling will begin when Dask-CUDA device memory utilization reaches 80% of the device's total memory. The old behavior can still be set with ``--device-memory-limit=0`` or ``device_memory_limit=0``.
pentschev marked this conversation as resolved.
Show resolved Hide resolved

CPU Affinity
------------

Expand Down