Skip to content

Commit b5c36b5

Browse files
Added worker resources from config (#4456)
1 parent 3a94ebc commit b5c36b5

File tree

5 files changed

+73
-0
lines changed

5 files changed

+73
-0
lines changed

distributed/distributed-schema.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,14 @@ properties:
305305
Whether or not to run consistency checks during execution.
306306
This is typically only used for debugging.
307307
308+
resources:
309+
type: object
310+
description: |
311+
A dictionary specifying resources for workers.
312+
313+
See https://distributed.dask.org/en/latest/resources.html for more information.
314+
properties: {}
315+
308316
lifetime:
309317
type: object
310318
description: |

distributed/distributed.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ distributed:
7070
preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html
7171
daemon: True
7272
validate: False # Check worker state at every step for debugging
73+
resources: {} # Key: value pairs specifying worker resources.
7374
lifetime:
7475
duration: null # Time after which to gracefully shutdown the worker
7576
stagger: 0 seconds # Random amount by which to stagger lifetimes

distributed/tests/test_resources.py

+18
Original file line numberDiff line numberDiff line change
@@ -392,3 +392,21 @@ def g(dask_worker):
392392
logs = client.run(g)
393393
assert logs[a["address"]]
394394
assert not logs[b["address"]]
395+
396+
397+
@gen_cluster(config={"distributed.worker.resources.my_resources": 1}, client=True)
async def test_resources_from_config(c, s, a, b):
    """Workers pick up resources from ``distributed.worker.resources`` config.

    No ``resources`` kwarg is passed to the workers, so both should report
    the resource dict taken from the configuration.
    """
    scheduler_state = c.scheduler_info()
    for w in (a, b):
        worker_entry = scheduler_state["workers"][w.address]
        assert worker_entry["resources"] == {"my_resources": 1}
402+
403+
404+
@gen_cluster(
    worker_kwargs=dict(resources={"my_resources": 10}),
    config={"distributed.worker.resources.my_resources": 1},
    client=True,
)
async def test_resources_from_python_override_config(c, s, a, b):
    """An explicit ``resources`` kwarg takes precedence over the config value.

    The config sets ``my_resources`` to 1, but the workers are constructed
    with ``resources={"my_resources": 10}``; the kwarg must win.
    """
    scheduler_state = c.scheduler_info()
    for w in (a, b):
        worker_entry = scheduler_state["workers"][w.address]
        assert worker_entry["resources"] == {"my_resources": 10}

distributed/worker.py

+3
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,9 @@ def __init__(
519519
nthreads = ncores
520520

521521
self.nthreads = nthreads or CPU_COUNT
522+
if resources is None:
523+
resources = dask.config.get("distributed.worker.resources", None)
524+
522525
self.total_resources = resources or {}
523526
self.available_resources = (resources or {}).copy()
524527
self.death_timeout = parse_timedelta(death_timeout)

docs/source/resources.rst

+43
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,49 @@ When we submit tasks to the cluster we specify constraints per task
4747
processed = [client.submit(process, d, resources={'GPU': 1}) for d in data]
4848
final = client.submit(aggregate, processed, resources={'MEMORY': 70e9})
4949
50+
Specifying Resources
51+
--------------------
52+
53+
Resources can be specified in several ways. The easiest option will depend on exactly
54+
how your cluster is being created.
55+
56+
**From the command line**
57+
58+
Resources can be provided when starting the worker process, as shown above:
59+
60+
.. code-block:: console
61+
62+
dask-worker scheduler:8786 --resources "GPU=2"
63+
64+
The keys are used as the resource name and the values are parsed into a numeric value.
65+
66+
**From Dask's configuration system**
67+
68+
Alternatively, resources can be specified using Dask's
69+
`configuration system <https://docs.dask.org/en/latest/configuration.html>`_.
70+
71+
.. code-block:: python
72+
73+
import dask
   from distributed import LocalCluster
74+
75+
with dask.config.set({"distributed.worker.resources.GPU": 2}):
76+
cluster = LocalCluster()
77+
78+
The configuration will need to be set in the process that's spawning the actual worker.
79+
This might be easiest to achieve by specifying resources as an environment variable
80+
(shown in the next section).
81+
82+
**From environment variables**
83+
84+
Like any other Dask config value, resources can be specified as environment variables
85+
before starting the process. Using Bash syntax:
86+
87+
.. code-block:: console
88+
89+
$ DASK_DISTRIBUTED__WORKER__RESOURCES__GPU=2 dask-worker
90+
...
91+
92+
This might be the easiest solution if you aren't able to pass options to the :class:`distributed.Worker` class.
5093

5194
Resources are applied separately to each worker process
5295
-------------------------------------------------------

0 commit comments

Comments
 (0)