diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 92c7ede23..65cef580c 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -84,13 +84,14 @@ ) @click.option( "--device-memory-limit", - default="auto", - help="Bytes of memory per CUDA device that the worker can use. " - "This can be an integer (bytes), " - "float (fraction of total device memory), " - "string (like 5GB or 5000M), " - "'auto', or zero for no memory management " - "(i.e., allow full device memory usage).", + default="0.8", + help="Specifies the size of the CUDA device LRU cache, which " + "is used to determine when the worker starts spilling to host " + "memory. This can be a float (fraction of total device " + "memory), an integer (bytes), a string (like 5GB or 5000M), " + "and 'auto' or 0 to disable spilling to host (i.e., allow " + "full device memory usage). Default is 0.8, 80% of the " + "worker's total device memory.", ) @click.option( "--rmm-pool-size", diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 90cf99c17..39c464355 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -25,10 +25,10 @@ RMMSetup, cuda_visible_devices, get_cpu_affinity, - get_device_total_memory, get_n_gpus, get_ucx_config, get_ucx_net_devices, + parse_device_memory_limit, ) @@ -211,12 +211,9 @@ def del_pid_file(): data=( DeviceHostFile, { - "device_memory_limit": get_device_total_memory(index=i) - if ( - device_memory_limit == "auto" - or device_memory_limit == int(0) - ) - else parse_bytes(device_memory_limit), + "device_memory_limit": parse_device_memory_limit( + device_memory_limit, device_index=i + ), "memory_limit": memory_limit, "local_directory": local_directory, }, diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 60fbfb059..9acbc1035 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -14,10 +14,10 @@ RMMSetup, cuda_visible_devices, get_cpu_affinity, - get_device_total_memory, get_ucx_config, get_ucx_net_devices, parse_cuda_visible_device, + parse_device_memory_limit, ) @@ -38,18 +38,22 @@ class LocalCUDACluster(LocalCluster): Parameters ---------- - CUDA_VISIBLE_DEVICES: str - String like ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to - different GPUs + CUDA_VISIBLE_DEVICES: str or list + String or list ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to + different GPUs. + device_memory_limit: int, float or str + Specifies the size of the CUDA device LRU cache, which is used to + determine when the worker starts spilling to host memory. This can be + a float (fraction of total device memory), an integer (bytes), a string + (like 5GB or 5000M), and "auto", 0 or None to disable spilling to + host (i.e., allow full device memory usage). Default is 0.8, 80% of the + worker's total device memory. interface: str The external interface used to connect to the scheduler, usually an ethernet interface is used for connection, and not an InfiniBand interface (if one is available). threads_per_worker: int Number of threads to be used for each CUDA worker process. - CUDA_VISIBLE_DEVICES: str or list - String or list ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to - different GPUs. protocol: str Protocol to use for communication, e.g., "tcp" or "ucx". enable_tcp_over_ucx: bool @@ -117,7 +121,7 @@ def __init__( threads_per_worker=1, processes=True, memory_limit="auto", - device_memory_limit=None, + device_memory_limit=0.8, CUDA_VISIBLE_DEVICES=None, data=None, local_directory=None, @@ -147,7 +151,9 @@ def __init__( self.host_memory_limit = parse_memory_limit( memory_limit, threads_per_worker, n_workers ) - self.device_memory_limit = device_memory_limit + self.device_memory_limit = parse_device_memory_limit( + device_memory_limit, device_index=0 + ) self.rmm_pool_size = rmm_pool_size self.rmm_managed_memory = rmm_managed_memory @@ -176,11 +182,6 @@ def __init__( "Processes are necessary in order to use multiple GPUs with Dask" ) - if self.device_memory_limit is None: - self.device_memory_limit = get_device_total_memory(0) - elif isinstance(self.device_memory_limit, str): - self.device_memory_limit = parse_bytes(self.device_memory_limit) - if data is None: data = ( DeviceHostFile, @@ -265,9 +266,7 @@ def new_worker_spec(self): visible_devices = cuda_visible_devices(worker_count, self.cuda_visible_devices) spec["options"].update( { - "env": { - "CUDA_VISIBLE_DEVICES": visible_devices, - }, + "env": {"CUDA_VISIBLE_DEVICES": visible_devices,}, "plugins": { CPUAffinity(get_cpu_affinity(worker_count)), RMMSetup(self.rmm_pool_size, self.rmm_managed_memory), diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 0bef9e8d8..dea4b3d3d 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -85,23 +85,23 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov "params", [ { - "device_memory_limit": 200e6, - "memory_limit": 800e6, + "device_memory_limit": int(200e6), + "memory_limit": int(800e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": False, }, { - "device_memory_limit": 200e6, - "memory_limit": 200e6, + "device_memory_limit": int(200e6), + "memory_limit": int(200e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": True, }, { - "device_memory_limit": 200e6, + "device_memory_limit": int(200e6), "memory_limit": 0, "host_target": 0.0, "host_spill": 0.0, @@ -163,23 +163,23 @@ def test_device_spill(client, scheduler, worker): "params", [ { - "device_memory_limit": 200e6, - "memory_limit": 800e6, + "device_memory_limit": int(200e6), + "memory_limit": int(800e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": False, }, { - "device_memory_limit": 200e6, - "memory_limit": 200e6, + "device_memory_limit": int(200e6), + "memory_limit": int(200e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": True, }, { - "device_memory_limit": 200e6, + "device_memory_limit": int(200e6), "memory_limit": 0, "host_target": 0.0, "host_spill": 0.0, @@ -241,23 +241,23 @@ async def test_cupy_cluster_device_spill(params): "params", [ { - "device_memory_limit": 200e6, - "memory_limit": 800e6, + "device_memory_limit": int(200e6), + "memory_limit": int(800e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": False, }, { - "device_memory_limit": 200e6, - "memory_limit": 200e6, + "device_memory_limit": int(200e6), + "memory_limit": int(200e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": True, }, { - "device_memory_limit": 200e6, + "device_memory_limit": int(200e6), "memory_limit": 0, "host_target": 0.0, "host_spill": 0.0, @@ -330,23 +330,23 @@ def test_device_spill(client, scheduler, worker): "params", [ { - "device_memory_limit": 200e6, - "memory_limit": 800e6, + "device_memory_limit": int(200e6), + "memory_limit": int(800e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": False, }, { - "device_memory_limit": 200e6, - "memory_limit": 200e6, + "device_memory_limit": int(200e6), + "memory_limit": int(200e6), "host_target": 0.0, "host_spill": 0.0, "host_pause": None, "spills_to_disk": True, }, { - "device_memory_limit": 200e6, + "device_memory_limit": int(200e6), "memory_limit": 0, "host_target": 0.0, "host_spill": 0.0, diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index 3700bd127..b56e1b6c1 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -13,6 +13,7 @@ get_ucx_config, get_ucx_net_devices, parse_cuda_visible_device, + parse_device_memory_limit, unpack_bitmask, ) @@ -219,3 +220,15 @@ def test_parse_visible_devices(): with pytest.raises(TypeError): parse_cuda_visible_device(None) parse_cuda_visible_device([]) + + +def test_parse_device_memory_limit(): + total = get_device_total_memory(0) + + assert parse_device_memory_limit(None) == total + assert parse_device_memory_limit(0) == total + assert parse_device_memory_limit("auto") == total + + assert parse_device_memory_limit(0.8) == int(total * 0.8) + assert parse_device_memory_limit(1000000000) == 1000000000 + assert parse_device_memory_limit("1GB") == 1000000000 diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index b2c7ccf40..7cd1dae56 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -2,13 +2,15 @@ import os import time import warnings +from contextlib import suppress from multiprocessing import cpu_count import numpy as np import pynvml import toolz -from dask.distributed import wait +from distributed import wait +from distributed.utils import parse_bytes try: from nvtx import annotate as nvtx_annotate @@ -442,3 +444,42 @@ def cuda_visible_devices(i, visible=None): L = visible[i:] + visible[:i] return ",".join(map(str, L)) + + +def parse_device_memory_limit(device_memory_limit, device_index=0): + """Parse memory limit to be used by a CUDA device. + + + Parameters + ---------- + device_memory_limit: float, int, str or None + This can be a float (fraction of total device memory), an integer (bytes), + a string (like 5GB or 5000M), and "auto", 0 or None for the total device + size. + device_index: int + The index of device from which to obtain the total memory amount. + + Examples + -------- + >>> # On a 32GB CUDA device + >>> parse_device_memory_limit(None) + 34089730048 + >>> parse_device_memory_limit(0.8) + 27271784038 + >>> parse_device_memory_limit(1000000000) + 1000000000 + >>> parse_device_memory_limit("1GB") + 1000000000 + """ + if any(device_memory_limit == v for v in [0, "0", None, "auto"]): + return get_device_total_memory(device_index) + + with suppress(ValueError, TypeError): + device_memory_limit = float(device_memory_limit) + if isinstance(device_memory_limit, float) and device_memory_limit <= 1: + return int(get_device_total_memory(device_index) * device_memory_limit) + + if isinstance(device_memory_limit, str): + return parse_bytes(device_memory_limit) + else: + return int(device_memory_limit) diff --git a/docs/source/specializations.rst b/docs/source/specializations.rst index e044e312e..3d5bdac79 100644 --- a/docs/source/specializations.rst +++ b/docs/source/specializations.rst @@ -13,6 +13,8 @@ Spilling From Device For applications that do not fit in GPU memory, Dask-CUDA supports spilling from device memory to host memory when the GPU can't fit more data. The spilling mechanism is automatically triggered once the user-defined limit is reached, such limit can be set via the ``--device-memory-limit`` and ``device_memory_limit`` arguments for ``dask-cuda-worker`` and ``LocalCUDACluster``, respectively. +Previously, spilling was disabled by default, but since Dask-CUDA 0.17 the default has been changed to ``0.8`` -- spilling will begin when Dask-CUDA device memory utilization reaches 80% of the device's total memory. Behavior can configured with ``--device-memory-limit`` flag. Users can disable spilling by setting ``--device-memory-limit=0`` or ``device_memory_limit=0``. + CPU Affinity ------------