From fc90523f8f78cff41aecbd787864889373a54c92 Mon Sep 17 00:00:00 2001 From: Arsenal591 Date: Tue, 25 Oct 2022 22:49:51 +0800 Subject: [PATCH 1/2] Support specifying multiple dashboard addresses. --- distributed/cli/dask_worker.py | 50 ++++++++++++++++++++++++++-------- distributed/utils.py | 1 + 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index e95953de4a6..f97084aeb07 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -20,14 +20,14 @@ from distributed import Nanny from distributed._signals import wait_for_signals -from distributed.comm import get_address_host_port +from distributed.comm import get_address_host_port, unparse_host_port from distributed.deploy.utils import nprocesses_nthreads from distributed.preloading import validate_preload_argv from distributed.proctitle import ( enable_proctitle_on_children, enable_proctitle_on_current, ) -from distributed.utils import import_term, parse_ports +from distributed.utils import clean_dashboard_address, import_term, parse_ports logger = logging.getLogger("distributed.dask_worker") @@ -77,7 +77,11 @@ "--dashboard-address", type=str, default=":0", - help="Address on which to listen for diagnostics dashboard", + help="Address on which to listen for diagnostics dashboard. " + "When creating multiple workers with --nworkers, dashboard addresses " + "can be specified by using commas to separate multiple ip addresses. " + "For example, --dashboard-address=3000,3001,3002 will use ports " + "3000, 3001, 3002.", ) @click.option( "--dashboard/--no-dashboard", @@ -367,7 +371,9 @@ def del_pid_file(): worker_class = import_term(worker_class) - port_kwargs = _apportion_ports(worker_port, nanny_port, n_workers, nanny) + port_kwargs = _apportion_ports( + worker_port, nanny_port, dashboard_address, n_workers, nanny + ) assert len(port_kwargs) == n_workers if nanny: @@ -404,7 +410,6 @@ async def run(): contact_address=contact_address, host=host, dashboard=dashboard, - dashboard_address=dashboard_address, name=name if n_workers == 1 or name is None or name == "" else str(name) + "-" + str(i), @@ -455,18 +460,22 @@ async def wait_for_signals_and_close(): def _apportion_ports( - worker_port: str | None, nanny_port: str | None, n_workers: int, nanny: bool + worker_port: str | None, + nanny_port: str | None, + dashboard_address: str | None, + n_workers: int, + nanny: bool, ) -> list[dict[str, Any]]: """Spread out evenly --worker-port and/or --nanny-port ranges to the workers and nannies, avoiding overlap. Returns ======= - List of kwargs to pass to the Worker or Nanny construtors + List of kwargs to pass to the Worker or Nanny constructors """ seen = set() - def parse_unique(s: str | None) -> Iterator[int | None]: + def parse_port_unique(s: str | None) -> Iterator[int | None]: ports = parse_ports(s) if ports in ([0], [None]): for _ in range(n_workers): @@ -477,8 +486,17 @@ def parse_unique(s: str | None) -> Iterator[int | None]: seen.add(port) yield port - worker_ports_iter = parse_unique(worker_port) - nanny_ports_iter = parse_unique(nanny_port) + def parse_address_unique(s: str | None) -> Iterator[str | None]: + if s is None: + for _ in range(n_workers): + yield None + else: + addresses = clean_dashboard_address(s) + for item in addresses: + yield unparse_host_port(item["address"], item["port"]) + + worker_ports_iter = parse_port_unique(worker_port) + nanny_ports_iter = parse_port_unique(nanny_port) # [(worker ports, nanny ports), ...] ports: list[tuple[set[int | None], set[int | None]]] = [ @@ -506,6 +524,7 @@ def parse_unique(s: str | None) -> Iterator[int | None]: more_nps = False kwargs = [] + dashboard_addresses_iter = parse_address_unique(dashboard_address) for worker_ports_i, nanny_ports_i in ports: if not worker_ports_i or not nanny_ports_i: if nanny: @@ -518,6 +537,13 @@ def parse_unique(s: str | None) -> Iterator[int | None]: f"Not enough ports in range --worker_port {worker_port} " f"for {n_workers} workers" ) + try: + address = next(dashboard_addresses_iter) + except StopIteration: + raise ValueError( + f"Not enough ports in range --dashboard_address {dashboard_address} " + f"for {n_workers} workers" + ) # None and int can't be sorted together, # but None and 0 are guaranteed to be alone @@ -528,9 +554,9 @@ def parse_unique(s: str | None) -> Iterator[int | None]: np: Any = sorted(nanny_ports_i) if len(np) == 1: np = np[0] - kwargs_i = {"port": np, "worker_port": wp} + kwargs_i = {"port": np, "worker_port": wp, "dashboard_address": address} else: - kwargs_i = {"port": wp} + kwargs_i = {"port": wp, "dashboard_address": address} kwargs.append(kwargs_i) diff --git a/distributed/utils.py b/distributed/utils.py index b63965a8561..2f8ce2d935e 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -162,6 +162,7 @@ def _get_ip(host, port, family): ip = sock.getsockname()[0] return ip except OSError as e: + print("s", host, port) warnings.warn( "Couldn't detect a suitable IP address for " "reaching %r, defaulting to hostname: %s" % (host, e), From acc4ddc564801cb468f7d6bc6e4fb4f111735775 Mon Sep 17 00:00:00 2001 From: Arsenal591 Date: Wed, 26 Oct 2022 00:49:45 +0800 Subject: [PATCH 2/2] Add some unit tests. --- distributed/cli/dask_worker.py | 12 +- distributed/cli/tests/test_dask_worker.py | 153 ++++++++++++++-------- 2 files changed, 105 insertions(+), 60 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index f97084aeb07..5cb0f51c9c5 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -462,7 +462,7 @@ async def wait_for_signals_and_close(): def _apportion_ports( worker_port: str | None, nanny_port: str | None, - dashboard_address: str | None, + dashboard_address: str, n_workers: int, nanny: bool, ) -> list[dict[str, Any]]: @@ -486,12 +486,12 @@ def parse_port_unique(s: str | None) -> Iterator[int | None]: seen.add(port) yield port - def parse_address_unique(s: str | None) -> Iterator[str | None]: - if s is None: + def parse_address_unique(s: str) -> Iterator[str]: + addresses = clean_dashboard_address(s) + if len(addresses) == 1 and addresses[0]["port"] in (0, None): for _ in range(n_workers): - yield None + yield unparse_host_port(addresses[0]["address"], addresses[0]["port"]) else: - addresses = clean_dashboard_address(s) for item in addresses: yield unparse_host_port(item["address"], item["port"]) @@ -541,7 +541,7 @@ def parse_address_unique(s: str | None) -> Iterator[str | None]: address = next(dashboard_addresses_iter) except StopIteration: raise ValueError( - f"Not enough ports in range --dashboard_address {dashboard_address} " + f"Not enough addresses in --dashboard_address {dashboard_address} " f"for {n_workers} workers" ) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 3750b24e0bb..3323f7c258e 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -35,112 +35,155 @@ [ # Single worker ( - (None, None, 1, False), - [{"port": None}], + (None, None, ":0", 1, False), + [{"port": None, "dashboard_address": ":0"}], ), ( - (None, None, 1, True), - [{"port": None, "worker_port": None}], + (None, None, ":0", 1, True), + [{"port": None, "worker_port": None, "dashboard_address": ":0"}], ), - (("123", None, 1, False), [{"port": 123}]), + (("123", None, ":0", 1, False), [{"port": 123, "dashboard_address": ":0"}]), ( - ("123", None, 1, True), - [{"port": None, "worker_port": 123}], + ("123", None, ":0", 1, True), + [{"port": None, "worker_port": 123, "dashboard_address": ":0"}], ), ( - (None, "456", 1, True), - [{"port": 456, "worker_port": None}], + (None, "456", ":0", 1, True), + [{"port": 456, "worker_port": None, "dashboard_address": ":0"}], ), ( - ("123", "456", 1, True), - [{"port": 456, "worker_port": 123}], + ("123", "456", ":0", 1, True), + [{"port": 456, "worker_port": 123, "dashboard_address": ":0"}], + ), + ( + ("123", "456", "789", 1, True), + [{"port": 456, "worker_port": 123, "dashboard_address": ":789"}], + ), + ( + ("123", "456", ":789", 1, True), + [{"port": 456, "worker_port": 123, "dashboard_address": ":789"}], ), # port=None or 0 and multiple workers ( - (None, None, 2, False), + (None, None, ":0", 2, False), [ - {"port": None}, - {"port": None}, + {"port": None, "dashboard_address": ":0"}, + {"port": None, "dashboard_address": ":0"}, ], ), ( - (None, None, 2, True), + (None, None, ":0", 2, True), [ - {"port": None, "worker_port": None}, - {"port": None, "worker_port": None}, + {"port": None, "worker_port": None, "dashboard_address": ":0"}, + {"port": None, "worker_port": None, "dashboard_address": ":0"}, ], ), ( - (0, "0", 2, True), + (0, "0", ":0", 2, True), [ - {"port": 0, "worker_port": 0}, - {"port": 0, "worker_port": 0}, + {"port": 0, "worker_port": 0, "dashboard_address": ":0"}, + {"port": 0, "worker_port": 0, "dashboard_address": ":0"}, ], ), ( - ("0", None, 2, True), + ("0", None, ":0", 2, True), [ - {"port": None, "worker_port": 0}, - {"port": None, "worker_port": 0}, + {"port": None, "worker_port": 0, "dashboard_address": ":0"}, + {"port": None, "worker_port": 0, "dashboard_address": ":0"}, ], ), # port ranges ( - ("100:103", None, 1, False), - [{"port": [100, 101, 102, 103]}], + ("100:103", None, ":0", 1, False), + [{"port": [100, 101, 102, 103], "dashboard_address": ":0"}], + ), + ( + ("100:103", None, ":0", 2, False), + [ + { + "port": [100, 102], + "dashboard_address": ":0", + }, # Round robin apportion + {"port": [101, 103], "dashboard_address": ":0"}, + ], + ), + # multiple dashboard addresses + ( + (None, None, "123,456", 1, False), + [{"port": None, "dashboard_address": ":123"}], ), ( - ("100:103", None, 2, False), + (None, None, "123,456", 2, False), [ - {"port": [100, 102]}, # Round robin apportion - {"port": [101, 103]}, + {"port": None, "dashboard_address": ":123"}, + {"port": None, "dashboard_address": ":456"}, ], ), # port range is not an exact multiple of n_workers ( - ("100:107", None, 3, False), + ("100:107", None, ":0", 3, False), [ - {"port": [100, 103, 106]}, - {"port": [101, 104, 107]}, - {"port": [102, 105]}, + {"port": [100, 103, 106], "dashboard_address": ":0"}, + {"port": [101, 104, 107], "dashboard_address": ":0"}, + {"port": [102, 105], "dashboard_address": ":0"}, ], ), ( - ("100:103", None, 2, True), + ("100:103", None, ":0", 2, True), [ - {"port": None, "worker_port": [100, 102]}, - {"port": None, "worker_port": [101, 103]}, + {"port": None, "worker_port": [100, 102], "dashboard_address": ":0"}, + {"port": None, "worker_port": [101, 103], "dashboard_address": ":0"}, ], ), ( - (None, "110:113", 2, True), + (None, "110:113", ":0", 2, True), [ - {"port": [110, 112], "worker_port": None}, - {"port": [111, 113], "worker_port": None}, + {"port": [110, 112], "worker_port": None, "dashboard_address": ":0"}, + {"port": [111, 113], "worker_port": None, "dashboard_address": ":0"}, ], ), # port ranges have different length between nannies and workers ( - ("100:103", "110:114", 2, True), + ("100:103", "110:114", ":0", 2, True), + [ + { + "port": [110, 112, 114], + "worker_port": [100, 102], + "dashboard_address": ":0", + }, + { + "port": [111, 113], + "worker_port": [101, 103], + "dashboard_address": ":0", + }, + ], + ), + # dashboard addresses have different length from workers + ( + ("100:101", None, "123,456,789", 2, False), [ - {"port": [110, 112, 114], "worker_port": [100, 102]}, - {"port": [111, 113], "worker_port": [101, 103]}, + {"port": 100, "dashboard_address": ":123"}, + {"port": 101, "dashboard_address": ":456"}, ], ), # identical port ranges ( - ("100:103", "100:103", 2, True), + ("100:103", "100:103", ":0", 2, True), [ - {"port": 101, "worker_port": 100}, - {"port": 103, "worker_port": 102}, + {"port": 101, "worker_port": 100, "dashboard_address": ":0"}, + {"port": 103, "worker_port": 102, "dashboard_address": ":0"}, ], ), # overlapping port ranges ( - ("100:105", "104:106", 2, True), + ("100:105", "104:106", ":0", 2, True), [ - {"port": [104, 106], "worker_port": [100, 102]}, - {"port": 105, "worker_port": [101, 103]}, + { + "port": [104, 106], + "worker_port": [100, 102], + "dashboard_address": ":0", + }, + {"port": 105, "worker_port": [101, 103], "dashboard_address": ":0"}, ], ), ], @@ -151,19 +194,21 @@ def test_apportion_ports(args, expect): def test_apportion_ports_bad(): with pytest.raises(ValueError, match="Not enough ports in range"): - _apportion_ports("100:102", None, 4, False) + _apportion_ports("100:102", None, ":0", 4, False) with pytest.raises(ValueError, match="Not enough ports in range"): - _apportion_ports(None, "100:102", 4, False) + _apportion_ports(None, "100:102", ":0", 4, False) with pytest.raises(ValueError, match="Not enough ports in range"): - _apportion_ports("100:102", "100:102", 3, True) + _apportion_ports("100:102", "100:102", ":0", 3, True) with pytest.raises(ValueError, match="Not enough ports in range"): - _apportion_ports("100:102", "102:104", 3, True) + _apportion_ports("100:102", "102:104", ":0", 3, True) with pytest.raises(ValueError, match="port_stop must be greater than port_start"): - _apportion_ports("102:100", None, 4, False) + _apportion_ports("102:100", None, ":0", 4, False) with pytest.raises(ValueError, match="invalid literal for int"): - _apportion_ports("foo", None, 1, False) + _apportion_ports("foo", None, ":0", 1, False) with pytest.raises(ValueError, match="too many values to unpack"): - _apportion_ports("100:101:102", None, 1, False) + _apportion_ports("100:101:102", None, ":0", 1, False) + with pytest.raises(ValueError, match="Not enough addresses in"): + _apportion_ports(None, None, "123,456", 3, False) @pytest.mark.slow