From 767ef4280e588de5b6f3e6096e60628f28d5632e Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Tue, 19 Jul 2022 16:11:30 +0200
Subject: [PATCH 01/35] Add basic scale-up tests

---
 tests/stability/test_adaptive_scaling.py | 41 ++++++++++++++++++++++++
 1 file changed, 41 insertions(+)
 create mode 100644 tests/stability/test_adaptive_scaling.py

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
new file mode 100644
index 0000000000..1a9c4c3120
--- /dev/null
+++ b/tests/stability/test_adaptive_scaling.py
@@ -0,0 +1,41 @@
+import uuid
+import pytest
+from coiled.v2 import Cluster
+from dask.distributed import Client
+from distributed import Event
+import time
+
+
+@pytest.mark.stability
+@pytest.mark.parametrize("minimum", (0, 1))
+@pytest.mark.parametrize("scatter", (False, pytest.param(True, marks=[pytest.mark.xfail("dask/distributed#6686")])))
+def test_scale_up_on_task_load(minimum, scatter):
+    with Cluster(
+        name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
+        n_workers=minimum,
+        worker_vm_types=["t3.medium"],
+    ) as cluster:
+        with Client(cluster) as client:
+            assert len(cluster.observed) == minimum
+            adapt = cluster.adapt(minimum=minimum, maximum=10, interval="5s", wait_count=3)
+            time.sleep(10)
+            assert len(adapt.log) == 0
+            ev_fan_out = Event(name="fan-out", client=client)
+
+            def clog(x: int, ev: Event) -> int:
+                ev.wait()
+                return x
+
+            numbers = range(100)
+            if scatter is True:
+                numbers = client.scatter(list(numbers))
+
+            futures = client.map(clog, numbers, ev=ev_fan_out)
+
+            # Scale up within 5 minutes
+            client.wait_for_workers(n_workers=10, timeout=300)
+            assert len(adapt.log) <= 2
+            assert adapt.log[-1][1] == {"status": "up", "n": 10}
+            ev_fan_out.set()
+            client.gather(futures)
+

From 601b72226a268ffe45c581251729b085a18e64e8 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 20 Jul 2022 14:40:22 +0200
Subject: [PATCH 02/35] Adaptive workload test

---
 tests/stability/test_adaptive_scaling.py | 84 ++++++++++++++++++++++--
 1 file changed, 78 insertions(+), 6 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 1a9c4c3120..b63df1ee99 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -1,10 +1,10 @@
-import uuid
 import pytest
-from coiled.v2 import Cluster
-from dask.distributed import Client
-from distributed import Event
 import time
+import uuid
 
+import dask.array as da
+from coiled.v2 import Cluster
+from dask.distributed import Client, Event, Semaphore
 
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
@@ -17,8 +17,8 @@ def test_scale_up_on_task_load(minimum, scatter):
     ) as cluster:
         with Client(cluster) as client:
             assert len(cluster.observed) == minimum
-            adapt = cluster.adapt(minimum=minimum, maximum=10, interval="5s", wait_count=3)
-            time.sleep(10)
+            adapt = cluster.adapt(minimum=minimum, maximum=10)
+            time.sleep(adapt.interval * 2.1)  # Ensure enough time for system to adapt
             assert len(adapt.log) == 0
             ev_fan_out = Event(name="fan-out", client=client)
 
@@ -39,3 +39,75 @@ def clog(x: int, ev: Event) -> int:
             ev_fan_out.set()
             client.gather(futures)
 
+
+@pytest.mark.stability
+@pytest.mark.parametrize("minimum", (0, 1))
+def test_adapt_to_changing_workload(minimum: int):
+    maximum = 20
+    fan_out_size = 100
+    with Cluster(
+        name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
+        n_workers=10,
+        worker_vm_types=["t3.medium"],
+    ) as cluster:
+        with Client(cluster) as client:
+            adapt = cluster.adapt(minimum=minimum, maximum=maximum)
+            assert len(adapt.log) == 0
+
+            def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
+                # Ensure that no recomputation happens by decrementing a countdown on a semaphore
+                acquired = sem.acquire(timeout=0.1)
+                assert acquired is True
+                ev.wait()
+                return x
+
+            sem_fan_out = Semaphore(name="fan-out", max_leases=fan_out_size)
+            ev_fan_out = Event(name="fan-out", client=client)
+
+            fan_out = client.map(clog, range(fan_out_size), ev=ev_fan_out, sem=sem_fan_out)
+
+            reduction = client.submit(sum, fan_out)
+            sem_barrier = Semaphore(name="barrier", max_leases=1)
+            ev_barrier = Event(name="barrier", client=client)
+            barrier = client.submit(clog, reduction, ev=ev_barrier, sem=sem_barrier)
+
+            sem_final_fan_out = Semaphore(name="final-fan-out", max_leases=fan_out_size)
+            ev_final_fan_out = Event(name="final-fan-out", client=client)
+            final_fan_out = client.map(clog, range(fan_out_size), ev=ev_final_fan_out, sem=sem_final_fan_out, barrier=barrier)
+
+            # Scale up to maximum
+            client.wait_for_workers(n_workers=maximum, timeout=300)
+            assert len(cluster.observed) == maximum
+            assert adapt.log[-1][1]["status"] == "up"
+
+            ev_fan_out.set()
+            # Scale down to a single worker
+            start = time.monotonic()
+            while len(cluster.observed) > 1:
+                time.sleep(0.1)
+            end = time.monotonic()
+            assert len(cluster.observed) == 1
+            assert adapt.log[-1][1]["status"] == "down"
+            # Do not take longer than 5 minutes to scale down
+            assert end - start < 300
+
+            ev_barrier.set()
+            # Scale up to maximum again
+            client.wait_for_workers(n_workers=maximum, timeout=300)
+            while len(cluster.observed) < maximum:
+                time.sleep(0.1)
+            assert len(cluster.observed) == 20
+            assert adapt.log[-1][1]["status"] == "up"
+
+            ev_final_fan_out.set()
+            client.gather(final_fan_out)
+
+            # Scale down to minimum
+            start = time.monotonic()
+            while len(cluster.observed) > minimum:
+                time.sleep(0.1)
+            end = time.monotonic()
+            assert len(cluster.observed) == minimum
+            assert adapt.log[-1][1]["status"] == "down"
+            # Do not take longer than 5 minutes to scale down
+            assert end - start < 300
\ No newline at end of file

From c873e4d049aad0647923cd41f0abaf487f5c5208 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 20 Jul 2022 14:55:37 +0200
Subject: [PATCH 03/35] Linting

---
 tests/stability/test_adaptive_scaling.py | 29 ++++++++++++++++--------
 1 file changed, 20 insertions(+), 9 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index b63df1ee99..e4a089133e 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -2,15 +2,18 @@
 import time
 import uuid
 
-import dask.array as da
 from coiled.v2 import Cluster
 from dask.distributed import Client, Event, Semaphore
 
+
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
-@pytest.mark.parametrize("scatter", (False, pytest.param(True, marks=[pytest.mark.xfail("dask/distributed#6686")])))
+@pytest.mark.parametrize(
+    "scatter",
+    (False, pytest.param(True, marks=[pytest.mark.xfail("dask/distributed#6686")])),
+)
 def test_scale_up_on_task_load(minimum, scatter):
-    with Cluster(
+    with Cluster(
         name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
         n_workers=minimum,
         worker_vm_types=["t3.medium"],
@@ -18,7 +21,7 @@ def test_scale_up_on_task_load(minimum, scatter):
         with Client(cluster) as client:
             assert len(cluster.observed) == minimum
             adapt = cluster.adapt(minimum=minimum, maximum=10)
-            time.sleep(adapt.interval * 2.1)  # Ensure enough time for
+            time.sleep(adapt.interval * 2.1)  # Ensure enough time for system to adapt
             assert len(adapt.log) == 0
             ev_fan_out = Event(name="fan-out", client=client)
 
@@ -64,16 +67,24 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             sem_fan_out = Semaphore(name="fan-out", max_leases=fan_out_size)
             ev_fan_out = Event(name="fan-out", client=client)
 
-            fan_out = client.map(clog, range(fan_out_size), ev=ev_fan_out, sem=sem_fan_out)
+            fan_out = client.map(
+                clog, range(fan_out_size), ev=ev_fan_out, sem=sem_fan_out
+            )
 
             reduction = client.submit(sum, fan_out)
             sem_barrier = Semaphore(name="barrier", max_leases=1)
             ev_barrier = Event(name="barrier", client=client)
             barrier = client.submit(clog, reduction, ev=ev_barrier, sem=sem_barrier)
-
+
             sem_final_fan_out = Semaphore(name="final-fan-out", max_leases=fan_out_size)
             ev_final_fan_out = Event(name="final-fan-out", client=client)
-            final_fan_out = client.map(clog, range(fan_out_size), ev=ev_final_fan_out, sem=sem_final_fan_out, barrier=barrier)
+            final_fan_out = client.map(
+                clog,
+                range(fan_out_size),
+                ev=ev_final_fan_out,
+                sem=sem_final_fan_out,
+                barrier=barrier,
+            )
 
             # Scale up to maximum
             client.wait_for_workers(n_workers=maximum, timeout=300)
@@ -90,7 +101,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             assert adapt.log[-1][1]["status"] == "down"
             # Do not take longer than 5 minutes to scale down
             assert end - start < 300
-
+
             ev_barrier.set()
             # Scale up to maximum again
             client.wait_for_workers(n_workers=maximum, timeout=300)
@@ -110,4 +121,4 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             assert len(cluster.observed) == minimum
             assert adapt.log[-1][1]["status"] == "down"
             # Do not take longer than 5 minutes to scale down
-            assert end - start < 300
\ No newline at end of file
+            assert end - start < 300

From 7d2b7977fb65c1836754edf544bf1a17056f9287 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Thu, 21 Jul 2022 10:09:09 +0200
Subject: [PATCH 04/35] Add memory-intensive changing workload

---
 tests/stability/test_adaptive_scaling.py | 108 +++++++++++++++++++++--
 1 file changed, 99 insertions(+), 9 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index e4a089133e..34b322429d 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -1,18 +1,24 @@
 import time
 import uuid
 
+import dask.array as da
 import pytest
 from coiled.v2 import Cluster
+from dask import delayed
+from dask.distributed import Client, Event, Semaphore, wait
 
 
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
 @pytest.mark.parametrize(
     "scatter",
-    (False, pytest.param(True, marks=[pytest.mark.xfail("dask/distributed#6686")])),
+    (
+        False,
+        pytest.param(True, marks=[pytest.mark.xfail(reason="dask/distributed#6686")]),
+    ),
 )
 def test_scale_up_on_task_load(minimum, scatter):
+    maximum = 10
     with Cluster(
         name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
         n_workers=minimum,
@@ -20,7 +26,7 @@ def test_scale_up_on_task_load(minimum, scatter):
     ) as cluster:
         with Client(cluster) as client:
             assert len(cluster.observed) == minimum
-            adapt = cluster.adapt(minimum=minimum, maximum=10)
+            adapt = cluster.adapt(minimum=minimum, maximum=maximum)
             time.sleep(adapt.interval * 2.1)  # Ensure enough time for system to adapt
             assert len(adapt.log) == 0
             ev_fan_out = Event(name="fan-out", client=client)
@@ -36,9 +42,9 @@ def clog(x: int, ev: Event) -> int:
             futures = client.map(clog, numbers, ev=ev_fan_out)
 
             # Scale up within 5 minutes
-            client.wait_for_workers(n_workers=10, timeout=300)
+            client.wait_for_workers(n_workers=maximum, timeout=300)
             assert len(adapt.log) <= 2
-            assert adapt.log[-1][1] == {"status": "up", "n": 10}
+            assert adapt.log[-1][1] == {"status": "up", "n": maximum}
             ev_fan_out.set()
             client.gather(futures)
 
@@ -46,11 +52,11 @@ def clog(x: int, ev: Event) -> int:
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
 def test_adapt_to_changing_workload(minimum: int):
-    maximum = 20
+    maximum = 10
     fan_out_size = 100
     with Cluster(
         name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
-        n_workers=10,
+        n_workers=5,
         worker_vm_types=["t3.medium"],
     ) as cluster:
         with Client(cluster) as client:
@@ -107,7 +113,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             client.wait_for_workers(n_workers=maximum, timeout=300)
             while len(cluster.observed) < maximum:
                 time.sleep(0.1)
-            assert len(cluster.observed) == 20
+            assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
             ev_final_fan_out.set()
@@ -122,3 +128,87 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             assert adapt.log[-1][1]["status"] == "down"
             # Do not take longer than 5 minutes to scale down
             assert end - start < 300
+
+
+@pytest.mark.stability
+@pytest.mark.parametrize("minimum", (0, 1))
+def test_adapt_to_memory_intensive_workload(minimum):
+    maximum = 10
+    with Cluster(
+        name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
+        n_workers=minimum,
+        worker_vm_types=["t3.medium"],
+    ) as cluster:
+        with Client(cluster) as client:
+            assert len(cluster.observed) == minimum
+            adapt = cluster.adapt(minimum=minimum, maximum=maximum)
+            assert len(adapt.log) == 0
+
+            def memory_intensive_preprocessing():
+                matrix = da.random.random((48000, 48000))
+                rechunked = matrix.rechunk((matrix.shape[0], 200)).rechunk(
+                    (200, matrix.shape[1])
+                )
+                reduction = rechunked.sum()
+                return reduction
+
+            @delayed
+            def clog(x, ev: Event):
+                ev.wait()
+                return x
+
+            def compute_intensive_barrier_task(data, ev: Event):
+                barrier = clog(data, ev)
+                return barrier
+
+            def memory_intensive_postprocessing(data):
+                matrix = da.random.random((48000, 48000))
+                matrix = matrix + da.from_delayed(data, shape=(1,), dtype="float")
+                rechunked = matrix.rechunk((matrix.shape[0], 200)).rechunk(
+                    (200, matrix.shape[1])
+                )
+                reduction = rechunked.sum()
+                return reduction
+
+            ev_barrier = Event(name="barrier", client=client)
+
+            fut = client.compute(
+                memory_intensive_postprocessing(
+                    compute_intensive_barrier_task(
+                        memory_intensive_preprocessing(), ev_barrier
+                    )
+                )
+            )
+
+            # Scale up to maximum on preprocessing
+            client.wait_for_workers(n_workers=maximum, timeout=360)
+            assert len(cluster.observed) == maximum
+            assert adapt.log[-1][1]["status"] == "up"
+
+            # Scale down to a single worker on barrier task
+            start = time.monotonic()
+            while len(cluster.observed) > 1:
+                time.sleep(0.1)
+            end = time.monotonic()
+            assert len(cluster.observed) == 1
+            assert adapt.log[-1][1]["status"] == "down"
+            assert end - start < 420
+
+            ev_barrier.set()
+
+            # Scale up to maximum on postprocessing
+            client.wait_for_workers(n_workers=maximum, timeout=360)
+            assert len(cluster.observed) == maximum
+            assert adapt.log[-1][1]["status"] == "up"
+
+            wait(fut)
+            del fut
+
+            # Scale down to minimum
+            start = time.monotonic()
+            while len(cluster.observed) > minimum:
+                time.sleep(0.1)
+            end = time.monotonic()
+            assert len(cluster.observed) == minimum
+            assert adapt.log[-1][1]["status"] == "down"
+            assert end - start < 420

From 053b150cfd7d93332e5a0fc5fec1b48fe2ac578f Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Thu, 21 Jul 2022 14:35:35 +0200
Subject: [PATCH 05/35] Adapt timeouts

---
 tests/stability/test_adaptive_scaling.py | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 34b322429d..4e9c619bf8 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -42,7 +42,7 @@ def clog(x: int, ev: Event) -> int:
             futures = client.map(clog, numbers, ev=ev_fan_out)
 
             # Scale up within 5 minutes
-            client.wait_for_workers(n_workers=maximum, timeout=300)
+            client.wait_for_workers(n_workers=maximum, timeout=360)
             assert len(adapt.log) <= 2
             assert adapt.log[-1][1] == {"status": "up", "n": maximum}
             ev_fan_out.set()
@@ -93,7 +93,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             )
 
             # Scale up to maximum
-            client.wait_for_workers(n_workers=maximum, timeout=300)
+            client.wait_for_workers(n_workers=maximum, timeout=420)
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -105,12 +105,11 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             end = time.monotonic()
             assert len(cluster.observed) == 1
             assert adapt.log[-1][1]["status"] == "down"
-            # Do not take longer than 5 minutes to scale down
-            assert end - start < 300
+            assert end - start < 420
 
             ev_barrier.set()
             # Scale up to maximum again
-            client.wait_for_workers(n_workers=maximum, timeout=300)
+            client.wait_for_workers(n_workers=maximum, timeout=420)
             while len(cluster.observed) < maximum:
                 time.sleep(0.1)
             assert len(cluster.observed) == maximum
@@ -126,8 +125,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             end = time.monotonic()
             assert len(cluster.observed) == minimum
             assert adapt.log[-1][1]["status"] == "down"
-            # Do not take longer than 5 minutes to scale down
-            assert end - start < 300
+            assert end - start < 420
 
 
 @pytest.mark.stability

From aef1e60a89e590f35b7040dd9048833270caccbe Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Thu, 21 Jul 2022 15:41:44 +0200
Subject: [PATCH 06/35] Adjust memory-based test

---
 tests/stability/test_adaptive_scaling.py | 27 ++++++++++++++----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 4e9c619bf8..8d31f789e2 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -143,37 +143,37 @@ def test_adapt_to_memory_intensive_workload(minimum):
             assert len(adapt.log) == 0
 
             def memory_intensive_preprocessing():
-                matrix = da.random.random((48000, 48000))
-                rechunked = matrix.rechunk((matrix.shape[0], 200)).rechunk(
-                    (200, matrix.shape[1])
-                )
+                matrix = da.random.random((50000, 50000), chunks=(50000, 100))
+                rechunked = matrix.rechunk((100, 50000))
                 reduction = rechunked.sum()
                 return reduction
 
             @delayed
-            def clog(x, ev: Event):
-                ev.wait()
+            def clog(x, ev_start: Event, ev_barrier: Event):
+                ev_start.set()
+                ev_barrier.wait()
                 return x
 
-            def compute_intensive_barrier_task(data, ev: Event):
-                barrier = clog(data, ev)
+            def compute_intensive_barrier_task(
+                data, ev_start: Event, ev_barrier: Event
+            ):
+                barrier = clog(data, ev_start, ev_barrier)
                 return barrier
 
             def memory_intensive_postprocessing(data):
-                matrix = da.random.random((48000, 48000))
+                matrix = da.random.random((50000, 50000), chunks=(50000, 100))
                 matrix = matrix + da.from_delayed(data, shape=(1,), dtype="float")
-                rechunked = matrix.rechunk((matrix.shape[0], 200)).rechunk(
-                    (200, matrix.shape[1])
-                )
+                rechunked = matrix.rechunk((100, 50000))
                 reduction = rechunked.sum()
                 return reduction
 
+            ev_scale_down = Event(name="scale_down", client=client)
             ev_barrier = Event(name="barrier", client=client)
 
             fut = client.compute(
                 memory_intensive_postprocessing(
                     compute_intensive_barrier_task(
-                        memory_intensive_preprocessing(), ev_barrier
+                        memory_intensive_preprocessing(), ev_scale_down, ev_barrier
                     )
                 )
             )
@@ -183,6 +183,7 @@ def memory_intensive_postprocessing(data):
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
+            ev_scale_down.wait()
             # Scale down to a single worker on barrier task
             start = time.monotonic()
             while len(cluster.observed) > 1:

From a3c9037bc536c247ee76066b38c53b94b3dbb148 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Tue, 26 Jul 2022 15:26:35 +0200
Subject: [PATCH 07/35] Adjust tests to return measurements if run standalone

---
 tests/stability/test_adaptive_scaling.py | 67 +++++++++++++++++++-----
 1 file changed, 55 insertions(+), 12 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 8d31f789e2..946504ea46 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -7,6 +7,7 @@
 from dask import delayed
 from dask.distributed import Client, Event, Semaphore, wait
 
+TIMEOUT_THRESHOLD = 600  # 10 minutes
 
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
@@ -41,12 +42,16 @@ def clog(x: int, ev: Event) -> int:
 
             futures = client.map(clog, numbers, ev=ev_fan_out)
 
-            # Scale up within 5 minutes
-            client.wait_for_workers(n_workers=maximum, timeout=360)
+            end = time.monotonic()
+            client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
+            start = time.monotonic()
+            duration = end - start
+            assert duration < 360
             assert len(adapt.log) <= 2
             assert adapt.log[-1][1] == {"status": "up", "n": maximum}
             ev_fan_out.set()
             client.gather(futures)
+            return duration
 
 
 @pytest.mark.stability
@@ -93,7 +98,11 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             )
 
             # Scale up to maximum
-            client.wait_for_workers(n_workers=maximum, timeout=420)
+            start = time.monotonic()
+            client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
+            end = time.monotonic()
+            duration_first_scale_up = end - start
+            assert duration_first_scale_up < 420
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -101,17 +110,22 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             # Scale down to a single worker
             start = time.monotonic()
             while len(cluster.observed) > 1:
+                if time.monotonic() - start >= TIMEOUT_THRESHOLD:
+                    raise TimeoutError()
                 time.sleep(0.1)
             end = time.monotonic()
+            duration_first_scale_down = end - start
+            assert duration_first_scale_down < 420
             assert len(cluster.observed) == 1
             assert adapt.log[-1][1]["status"] == "down"
-            assert end - start < 420
 
             ev_barrier.set()
             # Scale up to maximum again
-            client.wait_for_workers(n_workers=maximum, timeout=420)
-            while len(cluster.observed) < maximum:
-                time.sleep(0.1)
+            start = time.monotonic()
+            client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
+            end = time.monotonic()
+            duration_second_scale_up = end - start
+            assert duration_second_scale_up < 420
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -121,11 +135,20 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             # Scale down to minimum
             start = time.monotonic()
             while len(cluster.observed) > minimum:
+                if time.monotonic() - start >= TIMEOUT_THRESHOLD:
+                    raise TimeoutError()
                 time.sleep(0.1)
             end = time.monotonic()
+            duration_second_scale_down = end - start
+            assert duration_second_scale_down < 420
             assert len(cluster.observed) == minimum
             assert adapt.log[-1][1]["status"] == "down"
-            assert end - start < 420
+            return (
+                duration_first_scale_up,
+                duration_first_scale_down,
+                duration_second_scale_up,
+                duration_second_scale_down,
+            )
 
 
 @pytest.mark.stability
@@ -179,7 +202,11 @@ def memory_intensive_postprocessing(data):
             )
 
             # Scale up to maximum on preprocessing
-            client.wait_for_workers(n_workers=maximum, timeout=360)
+            start = time.monotonic()
+            client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
+            end = time.monotonic()
+            duration_first_scale_up = end - start
+            assert duration_first_scale_up < 420
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -187,16 +214,23 @@ def memory_intensive_postprocessing(data):
             # Scale down to a single worker on barrier task
             start = time.monotonic()
             while len(cluster.observed) > 1:
+                if time.monotonic() - start >= TIMEOUT_THRESHOLD:
+                    raise TimeoutError()
                 time.sleep(0.1)
             end = time.monotonic()
+            duration_first_scale_down = end - start
+            assert duration_first_scale_down < 420
             assert len(cluster.observed) == 1
             assert adapt.log[-1][1]["status"] == "down"
-            assert end - start < 420
 
             ev_barrier.set()
 
             # Scale up to maximum on postprocessing
-            client.wait_for_workers(n_workers=maximum, timeout=360)
+            start = time.monotonic()
+            client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
+            end = time.monotonic()
+            duration_second_scale_up = end - start
+            assert duration_second_scale_up < 420
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -206,8 +240,17 @@ def memory_intensive_postprocessing(data):
             # Scale down to minimum
             start = time.monotonic()
             while len(cluster.observed) > minimum:
+                if time.monotonic() - start >= TIMEOUT_THRESHOLD:
+                    raise TimeoutError()
                 time.sleep(0.1)
             end = time.monotonic()
+            duration_second_scale_down = end - start
+            assert duration_second_scale_down < 420
             assert len(cluster.observed) == minimum
             assert adapt.log[-1][1]["status"] == "down"
-            assert end - start < 420
+            return (
+                duration_first_scale_up,
+                duration_first_scale_down,
+                duration_second_scale_up,
+                duration_second_scale_down,
+            )

From 928134508acc1ee4c5493374bd973187c3eeb784 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Tue, 26 Jul 2022 15:54:48 +0200
Subject: [PATCH 08/35] Initially wait for workers to validate precondition

---
 tests/stability/test_adaptive_scaling.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 946504ea46..400a8b66d2 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -24,6 +24,7 @@ def test_scale_up_on_task_load(minimum, scatter):
         name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
         n_workers=minimum,
         worker_vm_types=["t3.medium"],
+        wait_for_workers=True,
     ) as cluster:
         with Client(cluster) as client:
             assert len(cluster.observed) == minimum
@@ -63,6 +64,7 @@ def test_adapt_to_changing_workload(minimum: int):
         name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
         n_workers=5,
         worker_vm_types=["t3.medium"],
+        wait_for_workers=True,
     ) as cluster:
         with Client(cluster) as client:
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
@@ -159,6 +161,7 @@ def test_adapt_to_memory_intensive_workload(minimum):
         name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
         n_workers=minimum,
         worker_vm_types=["t3.medium"],
+        wait_for_workers=True,
    ) as cluster:
         with Client(cluster) as client:
             assert len(cluster.observed) == minimum

From 4a2ce29aab2d02614e7cf5cfa3ed578828499b98 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Tue, 26 Jul 2022 16:33:12 +0200
Subject: [PATCH 09/35] Adjust memory-intensive workload size to avoid swapping
 on scaled-up cluster

---
 tests/stability/test_adaptive_scaling.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 400a8b66d2..ba4fc53f3b 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -169,8 +169,8 @@ def test_adapt_to_memory_intensive_workload(minimum):
             assert len(adapt.log) == 0
 
             def memory_intensive_preprocessing():
-                matrix = da.random.random((50000, 50000), chunks=(50000, 100))
-                rechunked = matrix.rechunk((100, 50000))
+                matrix = da.random.random((40000, 40000), chunks=(40000, 500))
+                rechunked = matrix.rechunk((500, 40000))
                 reduction = rechunked.sum()
                 return reduction
 
@@ -187,9 +187,9 @@ def compute_intensive_barrier_task(
                 return barrier
 
             def memory_intensive_postprocessing(data):
-                matrix = da.random.random((50000, 50000), chunks=(50000, 100))
+                matrix = da.random.random((40000, 40000), chunks=(40000, 500))
                 matrix = matrix + da.from_delayed(data, shape=(1,), dtype="float")
-                rechunked = matrix.rechunk((100, 50000))
+                rechunked = matrix.rechunk((500, 40000))
                 reduction = rechunked.sum()
                 return reduction
 

From 69d145481e0463db2b7c85a556a6f20575e1f2c8 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 27 Jul 2022 15:31:51 +0200
Subject: [PATCH 10/35] Adapt memory-intensive workload to withhold second
 batch of intensive tasks

---
 tests/stability/test_adaptive_scaling.py | 25 +++++++---------------
 1 file changed, 9 insertions(+), 16 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index ba4fc53f3b..4f4e253df4 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -168,7 +168,7 @@ def test_adapt_to_memory_intensive_workload(minimum):
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
             assert len(adapt.log) == 0
 
-            def memory_intensive_preprocessing():
+            def memory_intensive_processing():
                 matrix = da.random.random((40000, 40000), chunks=(40000, 500))
                 rechunked = matrix.rechunk((500, 40000))
                 reduction = rechunked.sum()
                 return reduction
@@ -186,21 +186,12 @@ def compute_intensive_barrier_task(
                 barrier = clog(data, ev_start, ev_barrier)
                 return barrier
 
-            def memory_intensive_postprocessing(data):
-                matrix = da.random.random((40000, 40000), chunks=(40000, 500))
-                matrix = matrix + da.from_delayed(data, shape=(1,), dtype="float")
-                rechunked = matrix.rechunk((500, 40000))
-                reduction = rechunked.sum()
-                return reduction
-
             ev_scale_down = Event(name="scale_down", client=client)
             ev_barrier = Event(name="barrier", client=client)
 
             fut = client.compute(
-                memory_intensive_postprocessing(
-                    compute_intensive_barrier_task(
-                        memory_intensive_processing(), ev_scale_down, ev_barrier
-                    )
+                compute_intensive_barrier_task(
+                    memory_intensive_processing(), ev_scale_down, ev_barrier
                 )
             )
 
@@ -200,7 +191,7 @@ def memory_intensive_postprocessing(data):
             start = time.monotonic()
             client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
             end = time.monotonic()
             duration_first_scale_up = end - start
-            assert duration_first_scale_up < 420
+            assert duration_first_scale_up < 420, duration_first_scale_up
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -213,18 +213,20 @@
                 time.sleep(0.1)
             end = time.monotonic()
             duration_first_scale_down = end - start
-            assert duration_first_scale_down < 420
+            assert duration_first_scale_down < 420, duration_first_scale_down
             assert len(cluster.observed) == 1
             assert adapt.log[-1][1]["status"] == "down"
 
             ev_barrier.set()
+            wait(fut)
+            fut = memory_intensive_processing()
 
             # Scale up to maximum on postprocessing
             start = time.monotonic()
             client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
             end = time.monotonic()
             duration_second_scale_up = end - start
-            assert duration_second_scale_up < 420
+            assert duration_second_scale_up < 420, duration_second_scale_up
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -248,7 +241,7 @@
                 time.sleep(0.1)
             end = time.monotonic()
             duration_second_scale_down = end - start
-            assert duration_second_scale_down < 420
+            assert duration_second_scale_down < 420, duration_second_scale_down
             assert len(cluster.observed) == minimum
             assert adapt.log[-1][1]["status"] == "down"
             return (

From 28135d16f8c1cd512525d4a1b1d8290074b65f43 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 27 Jul 2022 16:06:19 +0200
Subject: [PATCH 11/35] Skip memory-based test

---
 tests/stability/test_adaptive_scaling.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 4f4e253df4..9b7f55d818 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -7,7 +7,8 @@
 from dask import delayed
 from dask.distributed import Client, Event, Semaphore, wait
 
-TIMEOUT_THRESHOLD = 600  # 10 minutes
+TIMEOUT_THRESHOLD = 600  # 10 minutes
+
 
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
@@ -153,6 +154,9 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
     )
 
 
+@pytest.mark.skip(
+    reason="The test behavior is unreliable and may lead to very long runtime (see: coiled-runtime#211)"
+)
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
 def test_adapt_to_memory_intensive_workload(minimum):
@@ -219,7 +223,7 @@ def compute_intensive_barrier_task(
 
             ev_barrier.set()
             wait(fut)
-            fut = memory_intensive_processing()
+            fut = client.compute(memory_intensive_processing())
 
             # Scale up to maximum on postprocessing
             start = time.monotonic()

From 6fc69a212197740678355103b589de9d57e29219 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 27 Jul 2022 16:25:28 +0200
Subject: [PATCH 12/35] Minor

---
 tests/stability/test_adaptive_scaling.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 9b7f55d818..be0f9558bf 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -168,7 +168,6 @@ def test_adapt_to_memory_intensive_workload(minimum):
         wait_for_workers=True,
     ) as cluster:
         with Client(cluster) as client:
-            assert len(cluster.observed) == minimum
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
             assert len(adapt.log) == 0
 

From 010b5a83d66a544afc655f1d805cd1a287d6bfe0 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 27 Jul 2022 17:27:24 +0200
Subject: [PATCH 13/35] Remove out-of-sync assertion

---
 tests/stability/test_adaptive_scaling.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index be0f9558bf..cb125c9116 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -28,7 +28,6 @@ def test_scale_up_on_task_load(minimum, scatter):
         wait_for_workers=True,
     ) as cluster:
         with Client(cluster) as client:
-            assert len(cluster.observed) == minimum
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
             time.sleep(adapt.interval * 2.1)  # Ensure enough time for system to adapt
             assert len(adapt.log) == 0

From 5a844e45829fca4bc6d75026c4d9deb6195ee770 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Thu, 28 Jul 2022 14:03:16 +0200
Subject: [PATCH 14/35] Lose references to futures

---
 tests/stability/test_adaptive_scaling.py | 15 ++++++++------­-
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index cb125c9116..a70ce81008 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -7,7 +7,7 @@
 from dask import delayed
 from dask.distributed import Client, Event, Semaphore, wait
 
-TIMEOUT_THRESHOLD = 600  # 10 minutes
+TIMEOUT_THRESHOLD = 1800  # 10 minutes
 
 
 @pytest.mark.stability
@@ -80,23 +80,23 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             sem_fan_out = Semaphore(name="fan-out", max_leases=fan_out_size)
             ev_fan_out = Event(name="fan-out", client=client)
 
-            fan_out = client.map(
+            fut = client.map(
                 clog, range(fan_out_size), ev=ev_fan_out, sem=sem_fan_out
             )
 
-            reduction = client.submit(sum, fan_out)
+            fut = client.submit(sum, fut)
             sem_barrier = Semaphore(name="barrier", max_leases=1)
             ev_barrier = Event(name="barrier", client=client)
-            barrier = client.submit(clog, reduction, ev=ev_barrier, sem=sem_barrier)
+            fut = client.submit(clog, fut, ev=ev_barrier, sem=sem_barrier)
 
             sem_final_fan_out = Semaphore(name="final-fan-out", max_leases=fan_out_size)
             ev_final_fan_out = Event(name="final-fan-out", client=client)
-            final_fan_out = client.map(
+            fut = client.map(
                 clog,
                 range(fan_out_size),
                 ev=ev_final_fan_out,
                 sem=sem_final_fan_out,
-                barrier=barrier,
+                barrier=fut,
             )
 
             # Scale up to maximum
@@ -132,7 +132,8 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             assert adapt.log[-1][1]["status"] == "up"
 
             ev_final_fan_out.set()
-            client.gather(final_fan_out)
+            client.gather(fut)
+            del fut
 
             # Scale down to minimum
             start = time.monotonic()

From 03b1d11f6d1a48d0f966e5342d77755021985edd Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Thu, 28 Jul 2022 18:22:52 +0200
Subject: [PATCH 15/35] Clean up code and add docstrings

---
 tests/stability/test_adaptive_scaling.py | 74 ++++++++++++++++--------
 1 file changed, 51 insertions(+), 23 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index a70ce81008..d51bd0b77e 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -11,7 +11,7 @@
 
 
 @pytest.mark.stability
-@pytest.mark.parametrize("minimum", (0, 1))
+@pytest.mark.parametrize("minimum,threshold", [(0, 240), (1, 120)])
 @pytest.mark.parametrize(
     "scatter",
     (
@@ -19,7 +19,10 @@
         pytest.param(True, marks=[pytest.mark.xfail(reason="dask/distributed#6686")]),
     ),
 )
-def test_scale_up_on_task_load(minimum, scatter):
+def test_scale_up_on_task_load(minimum, threshold, scatter):
+    """Tests that adaptive scaling reacts in a reasonable amount of time to
+    an increased task load and scales up.
+    """
     maximum = 10
     with Cluster(
         name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
@@ -47,7 +50,7 @@ def clog(x: int, ev: Event) -> int:
             client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
             start = time.monotonic()
             duration = end - start
-            assert duration < 360
+            assert duration < threshold, duration
             assert len(adapt.log) <= 2
             assert adapt.log[-1][1] == {"status": "up", "n": maximum}
             ev_fan_out.set()
@@ -58,6 +61,10 @@ def clog(x: int, ev: Event) -> int:
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
 def test_adapt_to_changing_workload(minimum: int):
+    """Tests that adaptive scaling reacts within a reasonable amount of time to
+    a varying task load and scales up or down. This also asserts that no recomputation
+    is caused by the scaling.
+    """
     maximum = 10
     fan_out_size = 100
     with Cluster(
@@ -70,33 +77,50 @@ def test_adapt_to_changing_workload(minimum: int):
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
             assert len(adapt.log) == 0
 
+            @delayed
             def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
                 # Ensure that no recomputation happens by decrementing a countdown on a semaphore
-                acquired = sem.acquire(timeout=0.1)
-                assert acquired is True
+                acquired = sem.acquire(timeout=1)
+                assert acquired is True, "Could not acquire semaphore, likely recomputation happened."
                 ev.wait()
                 return x
 
+            def workload(
+                fan_out_size,
+                ev_fan_out,
+                sem_fan_out,
+                ev_barrier,
+                sem_barrier,
+                ev_final_fan_out,
+                sem_final_fan_out,
+            ):
+                fan_out = [
+                    clog(i, ev=ev_fan_out, sem=sem_fan_out) for i in range(fan_out_size)
+                ]
+                barrier = clog(delayed(sum)(fan_out), ev=ev_barrier, sem=sem_barrier)
+                final_fan_out = [
+                    clog(i, ev=ev_final_fan_out, sem=sem_final_fan_out, barrier=barrier)
+                    for i in range(fan_out_size)
+                ]
+                return final_fan_out
+
             sem_fan_out = Semaphore(name="fan-out", max_leases=fan_out_size)
             ev_fan_out = Event(name="fan-out", client=client)
-
-            fut = client.map(
-                clog, range(fan_out_size), ev=ev_fan_out, sem=sem_fan_out
-            )
-
-            fut = client.submit(sum, fut)
             sem_barrier = Semaphore(name="barrier", max_leases=1)
             ev_barrier = Event(name="barrier", client=client)
-            fut = client.submit(clog, fut, ev=ev_barrier, sem=sem_barrier)
-
             sem_final_fan_out = Semaphore(name="final-fan-out", max_leases=fan_out_size)
             ev_final_fan_out = Event(name="final-fan-out", client=client)
-            fut = client.map(
-                clog,
-                range(fan_out_size),
-                ev=ev_final_fan_out,
-                sem=sem_final_fan_out,
-                barrier=fut,
+
+            fut = client.compute(
+                workload(
+                    fan_out_size=fan_out_size,
+                    ev_fan_out=ev_fan_out,
+                    sem_fan_out=sem_fan_out,
+                    ev_barrier=ev_barrier,
+                    sem_barrier=sem_barrier,
+                    ev_final_fan_out=ev_final_fan_out,
+                    sem_final_fan_out=sem_final_fan_out,
+                )
             )
 
             # Scale up to maximum
@@ -104,7 +128,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
             end = time.monotonic()
             duration_first_scale_up = end - start
-            assert duration_first_scale_up < 420
+            assert duration_first_scale_up < 120
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -117,7 +141,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
                 time.sleep(0.1)
             end = time.monotonic()
             duration_first_scale_down = end - start
-            assert duration_first_scale_down < 420
+            assert duration_first_scale_down < 330
             assert len(cluster.observed) == 1
             assert adapt.log[-1][1]["status"] == "down"
 
@@ -127,7 +151,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
             client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
             end = time.monotonic()
             duration_second_scale_up = end - start
-            assert duration_second_scale_up < 420
+            assert duration_second_scale_up < 120
             assert len(cluster.observed) == maximum
             assert adapt.log[-1][1]["status"] == "up"
 
@@ -143,7 +167,7 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
                 time.sleep(0.1)
             end = time.monotonic()
             duration_second_scale_down = end - start
-            assert duration_second_scale_down < 420
+            assert duration_second_scale_down < 330
             assert len(cluster.observed) == minimum
             assert adapt.log[-1][1]["status"] == "down"
             return (
@@ -160,6 +184,10 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
 @pytest.mark.stability
 @pytest.mark.parametrize("minimum", (0, 1))
 def test_adapt_to_memory_intensive_workload(minimum):
+    """Tests that adaptive scaling reacts within a reasonable amount of time to a varying task and memory load.
+
+    Note: This tests currently results in spilling and very long runtimes.
""" maximum = 10 From 2c77cebf6a1edc19affeca474300b8ad04772882 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 4 Aug 2022 14:47:15 +0200 Subject: [PATCH 17/35] Replace semaphores with allowed-failures=0 --- tests/stability/test_adaptive_scaling.py | 208 +++++++++++------------ 1 file changed, 99 insertions(+), 109 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 1ec7e30668..e970bb9551 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -1,6 +1,7 @@ import time import uuid +import dask import dask.array as da import pytest from coiled.v2 import Cluster @@ -67,117 +68,106 @@ def test_adapt_to_changing_workload(minimum: int): """ maximum = 10 fan_out_size = 100 - with Cluster( - name=f"test_adaptive_scaling-{uuid.uuid4().hex}", - n_workers=5, - worker_vm_types=["t3.medium"], - wait_for_workers=True, - ) as cluster: - with Client(cluster) as client: - adapt = cluster.adapt(minimum=minimum, maximum=maximum) - assert len(adapt.log) == 0 - - @delayed - def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int: - # Ensure that no recomputation happens by decrementing a countdown on a semaphore - acquired = sem.acquire(timeout=1) - assert ( - acquired is True - ), "Could not acquire semaphore, likely recomputation happened." - ev.wait() - return x - - def workload( - fan_out_size, - ev_fan_out, - sem_fan_out, - ev_barrier, - sem_barrier, - ev_final_fan_out, - sem_final_fan_out, - ): - fan_out = [ - clog(i, ev=ev_fan_out, sem=sem_fan_out) for i in range(fan_out_size) - ] - barrier = clog(delayed(sum)(fan_out), ev=ev_barrier, sem=sem_barrier) - final_fan_out = [ - clog(i, ev=ev_final_fan_out, sem=sem_final_fan_out, barrier=barrier) - for i in range(fan_out_size) - ] - return final_fan_out - - sem_fan_out = Semaphore(name="fan-out", max_leases=fan_out_size) - ev_fan_out = Event(name="fan-out", client=client) - sem_barrier = Semaphore(name="barrier", max_leases=1) - ev_barrier = Event(name="barrier", client=client) - sem_final_fan_out = Semaphore(name="final-fan-out", max_leases=fan_out_size) - ev_final_fan_out = Event(name="final-fan-out", client=client) - - fut = client.compute( - workload( - fan_out_size=fan_out_size, - ev_fan_out=ev_fan_out, - sem_fan_out=sem_fan_out, - ev_barrier=ev_barrier, - sem_barrier=sem_barrier, - ev_final_fan_out=ev_final_fan_out, - sem_final_fan_out=sem_final_fan_out, + # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior + # during adative scaling but we receive a KilledWorker() instead. 
+ with dask.config.set({"distributed.scheduler.allowed-failures": 0}): + with Cluster( + name=f"test_adaptive_scaling-{uuid.uuid4().hex}", + n_workers=5, + worker_vm_types=["t3.medium"], + wait_for_workers=True, + ) as cluster: + with Client(cluster) as client: + adapt = cluster.adapt(minimum=minimum, maximum=maximum) + assert len(adapt.log) == 0 + + @delayed + def clog(x: int, ev: Event, **kwargs) -> int: + ev.wait() + return x + + def workload( + fan_out_size, + ev_fan_out, + ev_barrier, + ev_final_fan_out, + ): + fan_out = [ + clog(i, ev=ev_fan_out) for i in range(fan_out_size) + ] + barrier = clog(delayed(sum)(fan_out), ev=ev_barrier) + final_fan_out = [ + clog(i, ev=ev_final_fan_out, barrier=barrier) + for i in range(fan_out_size) + ] + return final_fan_out + + ev_fan_out = Event(name="fan-out", client=client) + ev_barrier = Event(name="barrier", client=client) + ev_final_fan_out = Event(name="final-fan-out", client=client) + + fut = client.compute( + workload( + fan_out_size=fan_out_size, + ev_fan_out=ev_fan_out, + ev_barrier=ev_barrier, + ev_final_fan_out=ev_final_fan_out, + ) ) - ) - - # Scale up to maximum - start = time.monotonic() - client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) - end = time.monotonic() - duration_first_scale_up = end - start - assert duration_first_scale_up < 120 - assert len(cluster.observed) == maximum - assert adapt.log[-1][1]["status"] == "up" - - ev_fan_out.set() - # Scale down to a single worker - start = time.monotonic() - while len(cluster.observed) > 1: - if time.monotonic() - start >= TIMEOUT_THRESHOLD: - raise TimeoutError() - time.sleep(0.1) - end = time.monotonic() - duration_first_scale_down = end - start - assert duration_first_scale_down < 330 - assert len(cluster.observed) == 1 - assert adapt.log[-1][1]["status"] == "down" - - ev_barrier.set() - # Scale up to maximum again - start = time.monotonic() - client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) - end = time.monotonic() - duration_second_scale_up = end - start - assert duration_second_scale_up < 120 - assert len(cluster.observed) == maximum - assert adapt.log[-1][1]["status"] == "up" - - ev_final_fan_out.set() - client.gather(fut) - del fut - # Scale down to minimum - start = time.monotonic() - while len(cluster.observed) > minimum: - if time.monotonic() - start >= TIMEOUT_THRESHOLD: - raise TimeoutError() - time.sleep(0.1) - end = time.monotonic() - duration_second_scale_down = end - start - assert duration_second_scale_down < 330 - assert len(cluster.observed) == minimum - assert adapt.log[-1][1]["status"] == "down" - return ( - duration_first_scale_up, - duration_first_scale_down, - duration_second_scale_up, - duration_second_scale_down, - ) + # Scale up to maximum + start = time.monotonic() + client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) + end = time.monotonic() + duration_first_scale_up = end - start + assert duration_first_scale_up < 120 + assert len(cluster.observed) == maximum + assert adapt.log[-1][1]["status"] == "up" + + ev_fan_out.set() + # Scale down to a single worker + start = time.monotonic() + while len(cluster.observed) > 1: + if time.monotonic() - start >= TIMEOUT_THRESHOLD: + raise TimeoutError() + time.sleep(0.1) + end = time.monotonic() + duration_first_scale_down = end - start + assert duration_first_scale_down < 330 + assert len(cluster.observed) == 1 + assert adapt.log[-1][1]["status"] == "down" + + ev_barrier.set() + # Scale up to maximum again + start = time.monotonic() + 
client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) + end = time.monotonic() + duration_second_scale_up = end - start + assert duration_second_scale_up < 120 + assert len(cluster.observed) == maximum + assert adapt.log[-1][1]["status"] == "up" + + ev_final_fan_out.set() + client.gather(fut) + del fut + + # Scale down to minimum + start = time.monotonic() + while len(cluster.observed) > minimum: + if time.monotonic() - start >= TIMEOUT_THRESHOLD: + raise TimeoutError() + time.sleep(0.1) + end = time.monotonic() + duration_second_scale_down = end - start + assert duration_second_scale_down < 330 + assert len(cluster.observed) == minimum + assert adapt.log[-1][1]["status"] == "down" + return ( + duration_first_scale_up, + duration_first_scale_down, + duration_second_scale_up, + duration_second_scale_down, + ) @pytest.mark.skip( From c07d673ced5c0e86c068001349f954b01376ee96 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 4 Aug 2022 14:48:14 +0200 Subject: [PATCH 18/35] Typo --- tests/stability/test_adaptive_scaling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index e970bb9551..501c1ddb58 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -69,7 +69,7 @@ def test_adapt_to_changing_workload(minimum: int): maximum = 10 fan_out_size = 100 # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior - # during adative scaling but we receive a KilledWorker() instead. + # during adaptive scaling but we receive a KilledWorker() instead. with dask.config.set({"distributed.scheduler.allowed-failures": 0}): with Cluster( name=f"test_adaptive_scaling-{uuid.uuid4().hex}", From cf70a3722dcb655c473c76684569169a5ea8d97b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 17:11:20 +0200 Subject: [PATCH 19/35] black --- tests/stability/test_adaptive_scaling.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 501c1ddb58..e64deb3690 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -92,9 +92,7 @@ def workload( ev_barrier, ev_final_fan_out, ): - fan_out = [ - clog(i, ev=ev_fan_out) for i in range(fan_out_size) - ] + fan_out = [clog(i, ev=ev_fan_out) for i in range(fan_out_size)] barrier = clog(delayed(sum)(fan_out), ev=ev_barrier) final_fan_out = [ clog(i, ev=ev_final_fan_out, barrier=barrier) From 2242bf01716df0c3b0cb7ff27cb8b7375befe50b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 17:13:27 +0200 Subject: [PATCH 20/35] flake8 --- tests/stability/test_adaptive_scaling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index e64deb3690..b0f75fa9c4 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -6,7 +6,7 @@ import pytest from coiled.v2 import Cluster from dask import delayed -from dask.distributed import Client, Event, Semaphore, wait +from dask.distributed import Client, Event, wait TIMEOUT_THRESHOLD = 1800 # 10 minutes From 8b192ad1f8c551cef96ecbbf61b42d7dea77aab8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 18:58:21 +0200 Subject: [PATCH 21/35] Increase scale-up threshold --- 
 tests/stability/test_adaptive_scaling.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index b0f75fa9c4..f348d38599 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -12,7 +12,7 @@
 
 
 @pytest.mark.stability
-@pytest.mark.parametrize("minimum,threshold", [(0, 240), (1, 120)])
+@pytest.mark.parametrize("minimum,threshold", [(0, 300), (1, 150)])
 @pytest.mark.parametrize(
     "scatter",
     (
@@ -118,7 +118,7 @@ def workload(
                 client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
                 end = time.monotonic()
                 duration_first_scale_up = end - start
-                assert duration_first_scale_up < 120
+                assert duration_first_scale_up < 150
                 assert len(cluster.observed) == maximum
                 assert adapt.log[-1][1]["status"] == "up"
 
@@ -141,7 +141,7 @@ def workload(
                 client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
                 end = time.monotonic()
                 duration_second_scale_up = end - start
-                assert duration_second_scale_up < 120
+                assert duration_second_scale_up < 150
                 assert len(cluster.observed) == maximum
                 assert adapt.log[-1][1]["status"] == "up"
 

From 53bdb405a190ccb23fffe1c8ccd341f5ea560b6b Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Mon, 15 Aug 2022 15:59:06 +0200
Subject: [PATCH 22/35] Set allowed-failures correctly

---
 tests/stability/test_adaptive_scaling.py | 198 ++++++++++++-----------
 1 file changed, 102 insertions(+), 96 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index f348d38599..dcaa9740e8 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -30,6 +30,9 @@ def test_scale_up_on_task_load(minimum, threshold, scatter):
         n_workers=minimum,
         worker_vm_types=["t3.medium"],
         wait_for_workers=True,
+        # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior
+        # during adaptive scaling but we receive a KilledWorker() instead.
+        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0},
     ) as cluster:
         with Client(cluster) as client:
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
@@ -68,104 +71,104 @@ def test_adapt_to_changing_workload(minimum: int):
     """
     maximum = 10
     fan_out_size = 100
-    # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior
-    # during adaptive scaling but we receive a KilledWorker() instead.
-    with dask.config.set({"distributed.scheduler.allowed-failures": 0}):
-        with Cluster(
-            name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
-            n_workers=5,
-            worker_vm_types=["t3.medium"],
-            wait_for_workers=True,
-        ) as cluster:
-            with Client(cluster) as client:
-                adapt = cluster.adapt(minimum=minimum, maximum=maximum)
-                assert len(adapt.log) == 0
-
-                @delayed
-                def clog(x: int, ev: Event, **kwargs) -> int:
-                    ev.wait()
-                    return x
-
-                def workload(
-                    fan_out_size,
-                    ev_fan_out,
-                    ev_barrier,
-                    ev_final_fan_out,
-                ):
-                    fan_out = [clog(i, ev=ev_fan_out) for i in range(fan_out_size)]
-                    barrier = clog(delayed(sum)(fan_out), ev=ev_barrier)
-                    final_fan_out = [
-                        clog(i, ev=ev_final_fan_out, barrier=barrier)
-                        for i in range(fan_out_size)
-                    ]
-                    return final_fan_out
-
-                ev_fan_out = Event(name="fan-out", client=client)
-                ev_barrier = Event(name="barrier", client=client)
-                ev_final_fan_out = Event(name="final-fan-out", client=client)
-
-                fut = client.compute(
-                    workload(
-                        fan_out_size=fan_out_size,
-                        ev_fan_out=ev_fan_out,
-                        ev_barrier=ev_barrier,
-                        ev_final_fan_out=ev_final_fan_out,
-                    )
+    with Cluster(
+        name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
+        n_workers=5,
+        worker_vm_types=["t3.medium"],
+        wait_for_workers=True,
+        # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior
+        # during adaptive scaling but we receive a KilledWorker() instead.
+        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0},
+    ) as cluster:
+        with Client(cluster) as client:
+            adapt = cluster.adapt(minimum=minimum, maximum=maximum)
+            assert len(adapt.log) == 0
+
+            @delayed
+            def clog(x: int, ev: Event, **kwargs) -> int:
+                ev.wait()
+                return x
+
+            def workload(
+                fan_out_size,
+                ev_fan_out,
+                ev_barrier,
+                ev_final_fan_out,
+            ):
+                fan_out = [clog(i, ev=ev_fan_out) for i in range(fan_out_size)]
+                barrier = clog(delayed(sum)(fan_out), ev=ev_barrier)
+                final_fan_out = [
+                    clog(i, ev=ev_final_fan_out, barrier=barrier)
+                    for i in range(fan_out_size)
+                ]
+                return final_fan_out
+
+            ev_fan_out = Event(name="fan-out", client=client)
+            ev_barrier = Event(name="barrier", client=client)
+            ev_final_fan_out = Event(name="final-fan-out", client=client)
+
+            fut = client.compute(
+                workload(
+                    fan_out_size=fan_out_size,
+                    ev_fan_out=ev_fan_out,
+                    ev_barrier=ev_barrier,
+                    ev_final_fan_out=ev_final_fan_out,
                 )
-
-                # Scale up to maximum
-                start = time.monotonic()
-                client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
-                end = time.monotonic()
-                duration_first_scale_up = end - start
-                assert duration_first_scale_up < 150
-                assert len(cluster.observed) == maximum
-                assert adapt.log[-1][1]["status"] == "up"
-
-                ev_fan_out.set()
-                # Scale down to a single worker
-                start = time.monotonic()
-                while len(cluster.observed) > 1:
-                    if time.monotonic() - start >= TIMEOUT_THRESHOLD:
-                        raise TimeoutError()
-                    time.sleep(0.1)
-                end = time.monotonic()
-                duration_first_scale_down = end - start
-                assert duration_first_scale_down < 330
-                assert len(cluster.observed) == 1
-                assert adapt.log[-1][1]["status"] == "down"
-
-                ev_barrier.set()
-                # Scale up to maximum again
-                start = time.monotonic()
-                client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
-                end = time.monotonic()
-                duration_second_scale_up = end - start
-                assert duration_second_scale_up < 150
-                assert len(cluster.observed) == maximum
-                assert adapt.log[-1][1]["status"] == "up"
-
-                ev_final_fan_out.set()
-                client.gather(fut)
-                del fut
-
-                # Scale down to minimum
-                start = time.monotonic()
-                while len(cluster.observed) > minimum:
-                    if time.monotonic() - start >= TIMEOUT_THRESHOLD:
-                        raise TimeoutError()
-                    time.sleep(0.1)
-                end = time.monotonic()
-                duration_second_scale_down = end - start
-                assert duration_second_scale_down < 330
-                assert len(cluster.observed) == minimum
-                assert adapt.log[-1][1]["status"] == "down"
-                return (
-                    duration_first_scale_up,
-                    duration_first_scale_down,
-                    duration_second_scale_up,
-                    duration_second_scale_down,
+            )
+
+            # Scale up to maximum
+            start = time.monotonic()
+            client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
+            end = time.monotonic()
+            duration_first_scale_up = end - start
+            assert duration_first_scale_up < 150
+            assert len(cluster.observed) == maximum
+            assert adapt.log[-1][1]["status"] == "up"
+
+            ev_fan_out.set()
+            # Scale down to a single worker
+            start = time.monotonic()
+            while len(cluster.observed) > 1:
+                if time.monotonic() - start >= TIMEOUT_THRESHOLD:
+                    raise TimeoutError()
+                time.sleep(0.1)
+            end = time.monotonic()
+            duration_first_scale_down = end - start
+            assert duration_first_scale_down < 330
+            assert len(cluster.observed) == 1
+            assert adapt.log[-1][1]["status"] == "down"
+
+            ev_barrier.set()
+            # Scale up to maximum again
+            start = time.monotonic()
+            client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
+            end = time.monotonic()
+            duration_second_scale_up = end - start
+            assert duration_second_scale_up < 150
+            assert len(cluster.observed) == maximum
+            assert adapt.log[-1][1]["status"] == "up"
+
+            ev_final_fan_out.set()
+            client.gather(fut)
+            del fut
+
+            # Scale down to minimum
+            start = time.monotonic()
+            while len(cluster.observed) > minimum:
+                if time.monotonic() - start >= TIMEOUT_THRESHOLD:
+                    raise TimeoutError()
+                time.sleep(0.1)
+            end = time.monotonic()
+            duration_second_scale_down = end - start
+            assert duration_second_scale_down < 330
+            assert len(cluster.observed) == minimum
+            assert adapt.log[-1][1]["status"] == "down"
+            return (
+                duration_first_scale_up,
+                duration_first_scale_down,
+                duration_second_scale_up,
+                duration_second_scale_down,
+            )
 
 
 @pytest.mark.skip(
@@ -184,6 +187,9 @@ def test_adapt_to_memory_intensive_workload(minimum):
         n_workers=minimum,
         worker_vm_types=["t3.medium"],
         wait_for_workers=True,
+        # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior
+        # during adaptive scaling but we receive a KilledWorker() instead.
+        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0},
     ) as cluster:
         with Client(cluster) as client:
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)

From 05b40c6d2c4208080697c7cf392693d1b9f1abb9 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Tue, 16 Aug 2022 12:48:24 +0200
Subject: [PATCH 23/35] Fix env vars

---
 tests/stability/test_adaptive_scaling.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index dcaa9740e8..151e731a57 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -32,7 +32,7 @@ def test_scale_up_on_task_load(minimum, threshold, scatter):
         wait_for_workers=True,
         # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior
         # during adaptive scaling but we receive a KilledWorker() instead.
-        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0},
+        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"},
     ) as cluster:
         with Client(cluster) as client:
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
@@ -78,7 +78,7 @@ def test_adapt_to_changing_workload(minimum: int):
         wait_for_workers=True,
         # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior
         # during adaptive scaling but we receive a KilledWorker() instead.
-        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0},
+        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"},
     ) as cluster:
         with Client(cluster) as client:
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)
@@ -189,7 +189,7 @@ def test_adapt_to_memory_intensive_workload(minimum):
         wait_for_workers=True,
         # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior
        # during adaptive scaling but we receive a KilledWorker() instead.
-        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0},
+        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"},
     ) as cluster:
         with Client(cluster) as client:
             adapt = cluster.adapt(minimum=minimum, maximum=maximum)

From a5c05faafe538359ecdedfd56cde0316aaf7c30b Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Tue, 16 Aug 2022 12:52:12 +0200
Subject: [PATCH 24/35] flake8

---
 tests/stability/test_adaptive_scaling.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index 151e731a57..d36cd6ddb8 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -1,7 +1,6 @@
 import time
 import uuid
 
-import dask
 import dask.array as da
 import pytest
 from coiled.v2 import Cluster
 from dask import delayed
 from dask.distributed import Client, Event, wait

From 4fa01bb0e1ac1e82235b4d78d1f93e57118af9af Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Fri, 19 Aug 2022 08:27:32 +0200
Subject: [PATCH 25/35] Simplify test_adapt_to_changing_workload

---
 tests/stability/test_adaptive_scaling.py | 21 +++--------------
 1 file changed, 7 insertions(+), 14 deletions(-)

diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py
index d36cd6ddb8..462835971f 100644
--- a/tests/stability/test_adaptive_scaling.py
+++ b/tests/stability/test_adaptive_scaling.py
@@ -62,17 +62,18 @@ def clog(x: int, ev: Event) -> int:
 
 
 @pytest.mark.stability
 @pytest.mark.stability
 def test_adapt_to_changing_workload():
     """Tests that adaptive scaling reacts within a reasonable amount of time to
     a varying task load and scales up or down. This also asserts that no recomputation
     is caused by the scaling.
""" - maximum = 10 + minimum=0 + maximum=10 fan_out_size = 100 with Cluster( name=f"test_adaptive_scaling-{uuid.uuid4().hex}", - n_workers=5, + n_workers=maximum, worker_vm_types=["t3.medium"], wait_for_workers=True, # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior @@ -80,8 +81,6 @@ def test_adapt_to_changing_workload(minimum: int): environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, ) as cluster: with Client(cluster) as client: - adapt = cluster.adapt(minimum=minimum, maximum=maximum) - assert len(adapt.log) == 0 @delayed def clog(x: int, ev: Event, **kwargs) -> int: @@ -115,14 +114,8 @@ def workload( ) ) - # Scale up to maximum - start = time.monotonic() - client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) - end = time.monotonic() - duration_first_scale_up = end - start - assert duration_first_scale_up < 150 - assert len(cluster.observed) == maximum - assert adapt.log[-1][1]["status"] == "up" + adapt = cluster.adapt(minimum=minimum, maximum=maximum) + wait(90) ev_fan_out.set() # Scale down to a single worker From 0824ffdcd44454535c1b04d87a4c79e949c7d106 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Aug 2022 08:42:58 +0200 Subject: [PATCH 26/35] Minor --- tests/stability/test_adaptive_scaling.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 462835971f..8016d850a4 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -7,7 +7,7 @@ from dask import delayed from dask.distributed import Client, Event, wait -TIMEOUT_THRESHOLD = 1800 # 10 minutes +TIMEOUT_THRESHOLD = 900 # 15 minutes @pytest.mark.stability @@ -49,9 +49,9 @@ def clog(x: int, ev: Event) -> int: futures = client.map(clog, numbers, ev=ev_fan_out) - end = time.monotonic() - client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) start = time.monotonic() + client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) + end = time.monotonic() duration = end - start assert duration < threshold, duration assert len(adapt.log) <= 2 From d9c08df77922d410ae5a9989bf1017e810b6a1d5 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Aug 2022 08:44:21 +0200 Subject: [PATCH 27/35] Simplifiy memory-based test --- tests/stability/test_adaptive_scaling.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 8016d850a4..87f25b7b05 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -176,7 +176,7 @@ def test_adapt_to_memory_intensive_workload(minimum): maximum = 10 with Cluster( name=f"test_adaptive_scaling-{uuid.uuid4().hex}", - n_workers=minimum, + n_workers=maximum, worker_vm_types=["t3.medium"], wait_for_workers=True, # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior @@ -184,9 +184,6 @@ def test_adapt_to_memory_intensive_workload(minimum): environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, ) as cluster: with Client(cluster) as client: - adapt = cluster.adapt(minimum=minimum, maximum=maximum) - assert len(adapt.log) == 0 - def memory_intensive_processing(): matrix = da.random.random((40000, 40000), chunks=(40000, 500)) rechunked = matrix.rechunk((500, 40000)) @@ -214,14 +211,7 @@ def compute_intensive_barrier_task( ) ) 
- # Scale up to maximum on preprocessing - start = time.monotonic() - client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) - end = time.monotonic() - duration_first_scale_up = end - start - assert duration_first_scale_up < 420, duration_first_scale_up - assert len(cluster.observed) == maximum - assert adapt.log[-1][1]["status"] == "up" + adapt = cluster.adapt(minimum=minimum, maximum=maximum) ev_scale_down.wait() # Scale down to a single worker on barrier task From 2f0a4c1ae071d971cd50a898e41b7e2d823f89c6 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Aug 2022 10:49:24 +0200 Subject: [PATCH 28/35] Add test_adaptive_rechunk_stress --- tests/stability/test_adaptive_scaling.py | 52 ++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 87f25b7b05..ab5824d311 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -163,6 +163,58 @@ def workload( ) +@pytest.mark.stability +def test_adaptive_rechunk_stress(): + """Tests adaptive scaling in a transfer-heavy workload that reduces its memory load + in a series of rechunking and dimensional reduction steps. + """ + with Cluster( + name=f"test_adaptive_scaling-{uuid.uuid4().hex}", + n_workers=32, + worker_vm_types=["t3.large"], + wait_for_workers=True, + # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior + # during adaptive scaling but we receive a KilledWorker() instead. + environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, + ) as cluster: + with Client(cluster) as client: + def workload(arr): + arr = arr.sum(axis=[3]) + arr = ( + arr.rechunk((128, 8 * 1024, 2)) + .rechunk((8 * 1024, 128, 2)) + .rechunk((128, 8 * 1024, 2)) + .sum(axis=[2]) + ) + arr = ( + arr.rechunk((64, 8 * 1024)) + .rechunk((8 * 1024, 64)) + .rechunk((64, 8 * 1024)) + .sum(axis=[1]) + ) + return arr.sum() + + # Initialize array on workers to avoid adaptive scale-down + arr = client.persist( + da.random.random( + (8 * 1024, 8 * 1024, 16, 16), chunks=(8 * 1024, 128, 2, 2) + ) + ) + wait(arr) + + cluster.adapt( + minimum=1, + maximum=32, + interval="1s", + target_duration="180s", + wait_count=1, + ) + fut = client.compute(workload(arr)) + del arr + wait(fut) + assert fut.result() + + @pytest.mark.skip( reason="The test behavior is unreliable and may lead to very long runtime (see: coiled-runtime#211)" ) From a6483f1a2b42e0e8b83d7ef324df83c52327f491 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Aug 2022 10:50:59 +0200 Subject: [PATCH 29/35] Remove parameters on test_adapt_to_changing_workload --- tests/stability/test_adaptive_scaling.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index ab5824d311..91087bf844 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -68,8 +68,8 @@ def test_adapt_to_changing_workload(): a varying task load and scales up or down. This also asserts that no recomputation is caused by the scaling. 
""" - minimum=0 - maximum=10 + minimum = 0 + maximum = 10 fan_out_size = 100 with Cluster( name=f"test_adaptive_scaling-{uuid.uuid4().hex}", From 2e78b7e6650616d853c33c8c21f950aff6710406 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Aug 2022 10:53:34 +0200 Subject: [PATCH 30/35] linting --- tests/stability/test_adaptive_scaling.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 91087bf844..ee705c1585 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -156,7 +156,6 @@ def workload( assert len(cluster.observed) == minimum assert adapt.log[-1][1]["status"] == "down" return ( - duration_first_scale_up, duration_first_scale_down, duration_second_scale_up, duration_second_scale_down, @@ -166,7 +165,7 @@ def workload( @pytest.mark.stability def test_adaptive_rechunk_stress(): """Tests adaptive scaling in a transfer-heavy workload that reduces its memory load - in a series of rechunking and dimensional reduction steps. + in a series of rechunking and dimensional reduction steps. """ with Cluster( name=f"test_adaptive_scaling-{uuid.uuid4().hex}", @@ -178,6 +177,7 @@ def test_adaptive_rechunk_stress(): environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, ) as cluster: with Client(cluster) as client: + def workload(arr): arr = arr.sum(axis=[3]) arr = ( @@ -201,7 +201,7 @@ def workload(arr): ) ) wait(arr) - + cluster.adapt( minimum=1, maximum=32, @@ -236,6 +236,7 @@ def test_adapt_to_memory_intensive_workload(minimum): environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, ) as cluster: with Client(cluster) as client: + def memory_intensive_processing(): matrix = da.random.random((40000, 40000), chunks=(40000, 500)) rechunked = matrix.rechunk((500, 40000)) @@ -306,7 +307,6 @@ def compute_intensive_barrier_task( assert len(cluster.observed) == minimum assert adapt.log[-1][1]["status"] == "down" return ( - duration_first_scale_up, duration_first_scale_down, duration_second_scale_up, duration_second_scale_down, From 3af2abef7c9cc6e5f9a284680a3d89d12dac82e4 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Aug 2022 11:28:55 +0200 Subject: [PATCH 31/35] Improve test speed --- tests/stability/test_adaptive_scaling.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index ee705c1585..8a48fe74b0 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -7,19 +7,12 @@ from dask import delayed from dask.distributed import Client, Event, wait -TIMEOUT_THRESHOLD = 900 # 15 minutes +TIMEOUT_THRESHOLD = 600 # 10 minutes @pytest.mark.stability @pytest.mark.parametrize("minimum,threshold", [(0, 300), (1, 150)]) -@pytest.mark.parametrize( - "scatter", - ( - False, - pytest.param(True, marks=[pytest.mark.xfail(reason="dask/distributed#6686")]), - ), -) -def test_scale_up_on_task_load(minimum, threshold, scatter): +def test_scale_up_on_task_load(minimum, threshold): """Tests that adaptive scaling reacts in a reasonable amount of time to an increased task load and scales up. 
""" @@ -43,11 +36,7 @@ def clog(x: int, ev: Event) -> int: ev.wait() return x - numbers = range(100) - if scatter is True: - numbers = client.scatter(list(numbers)) - - futures = client.map(clog, numbers, ev=ev_fan_out) + futures = client.map(clog, range(100), ev=ev_fan_out) start = time.monotonic() client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) From 4187adc9a37078be955ee5e67a46f84282a25011 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Aug 2022 16:27:39 +0200 Subject: [PATCH 32/35] Skip failing test --- tests/stability/test_adaptive_scaling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 8a48fe74b0..61428a26c6 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -50,7 +50,7 @@ def clog(x: int, ev: Event) -> int: return duration -@pytest.mark.stability +@pytest.mark.skip(reason="coiled-runtime#266") @pytest.mark.stability def test_adapt_to_changing_workload(): """Tests that adaptive scaling reacts within a reasonable amount of time to From a82fa29d60028e295727befb069aeb3c821c461c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 23 Aug 2022 18:38:35 +0200 Subject: [PATCH 33/35] Address review comments --- tests/stability/test_adaptive_scaling.py | 56 +++++++++++++++--------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 61428a26c6..842b5f777f 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -1,5 +1,4 @@ import time -import uuid import dask.array as da import pytest @@ -12,13 +11,13 @@ @pytest.mark.stability @pytest.mark.parametrize("minimum,threshold", [(0, 300), (1, 150)]) -def test_scale_up_on_task_load(minimum, threshold): +def test_scale_up_on_task_load(minimum, threshold, test_name_uuid): """Tests that adaptive scaling reacts in a reasonable amount of time to an increased task load and scales up. """ maximum = 10 with Cluster( - name=f"test_adaptive_scaling-{uuid.uuid4().hex}", + name=test_name_uuid, n_workers=minimum, worker_vm_types=["t3.medium"], wait_for_workers=True, @@ -52,7 +51,7 @@ def clog(x: int, ev: Event) -> int: @pytest.mark.skip(reason="coiled-runtime#266") @pytest.mark.stability -def test_adapt_to_changing_workload(): +def test_adapt_to_changing_workload(test_name_uuid): """Tests that adaptive scaling reacts within a reasonable amount of time to a varying task load and scales up or down. This also asserts that no recomputation is caused by the scaling. @@ -61,13 +60,16 @@ def test_adapt_to_changing_workload(): maximum = 10 fan_out_size = 100 with Cluster( - name=f"test_adaptive_scaling-{uuid.uuid4().hex}", + name=test_name_uuid, n_workers=maximum, worker_vm_types=["t3.medium"], wait_for_workers=True, # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior # during adaptive scaling but we receive a KilledWorker() instead. 
- environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, + environ={ + "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0", + "DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION": "500ms", + }, ) as cluster: with Client(cluster) as client: @@ -103,8 +105,15 @@ def workload( ) ) - adapt = cluster.adapt(minimum=minimum, maximum=maximum) - wait(90) + adapt = cluster.adapt( + minimum=minimum, + maximum=maximum, + target_duration="5s", + interval="1s", + wait_count=1, + ) + time.sleep(adapt.interval * 2.1) + assert len(adapt.log) == 0 ev_fan_out.set() # Scale down to a single worker @@ -115,7 +124,7 @@ def workload( time.sleep(0.1) end = time.monotonic() duration_first_scale_down = end - start - assert duration_first_scale_down < 330 + assert duration_first_scale_down < 10, duration_first_scale_down assert len(cluster.observed) == 1 assert adapt.log[-1][1]["status"] == "down" @@ -125,7 +134,7 @@ def workload( client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) end = time.monotonic() duration_second_scale_up = end - start - assert duration_second_scale_up < 150 + assert duration_second_scale_up < 150, duration_second_scale_up assert len(cluster.observed) == maximum assert adapt.log[-1][1]["status"] == "up" @@ -141,7 +150,7 @@ def workload( time.sleep(0.1) end = time.monotonic() duration_second_scale_down = end - start - assert duration_second_scale_down < 330 + assert duration_second_scale_down < 10, duration_second_scale_down assert len(cluster.observed) == minimum assert adapt.log[-1][1]["status"] == "down" return ( @@ -152,12 +161,12 @@ def workload( @pytest.mark.stability -def test_adaptive_rechunk_stress(): +def test_adaptive_rechunk_stress(test_name_uuid): """Tests adaptive scaling in a transfer-heavy workload that reduces its memory load in a series of rechunking and dimensional reduction steps. """ with Cluster( - name=f"test_adaptive_scaling-{uuid.uuid4().hex}", + name=test_name_uuid, n_workers=32, worker_vm_types=["t3.large"], wait_for_workers=True, @@ -209,20 +218,21 @@ def workload(arr): ) @pytest.mark.stability @pytest.mark.parametrize("minimum", (0, 1)) -def test_adapt_to_memory_intensive_workload(minimum): +def test_adapt_to_memory_intensive_workload(minimum, test_name_uuid): """Tests that adaptive scaling reacts within a reasonable amount of time to a varying task and memory load. Note: This tests currently results in spilling and very long runtimes. """ maximum = 10 with Cluster( - name=f"test_adaptive_scaling-{uuid.uuid4().hex}", + name=test_name_uuid, n_workers=maximum, worker_vm_types=["t3.medium"], wait_for_workers=True, # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior # during adaptive scaling but we receive a KilledWorker() instead. 
- environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, + environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0", + "DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION": "500ms",}, ) as cluster: with Client(cluster) as client: @@ -253,7 +263,13 @@ def compute_intensive_barrier_task( ) ) - adapt = cluster.adapt(minimum=minimum, maximum=maximum) + adapt = cluster.adapt( + minimum=minimum, + maximum=maximum, + target_duration="5s", + interval="1s", + wait_count=1, + ) ev_scale_down.wait() # Scale down to a single worker on barrier task @@ -264,7 +280,7 @@ def compute_intensive_barrier_task( time.sleep(0.1) end = time.monotonic() duration_first_scale_down = end - start - assert duration_first_scale_down < 420, duration_first_scale_down + assert duration_first_scale_down < 60, duration_first_scale_down assert len(cluster.observed) == 1 assert adapt.log[-1][1]["status"] == "down" @@ -277,7 +293,7 @@ def compute_intensive_barrier_task( client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD) end = time.monotonic() duration_second_scale_up = end - start - assert duration_second_scale_up < 420, duration_second_scale_up + assert duration_second_scale_up < 150, duration_second_scale_up assert len(cluster.observed) == maximum assert adapt.log[-1][1]["status"] == "up" @@ -292,7 +308,7 @@ def compute_intensive_barrier_task( time.sleep(0.1) end = time.monotonic() duration_second_scale_down = end - start - assert duration_second_scale_down < 420, duration_second_scale_down + assert duration_second_scale_down < 60, duration_second_scale_down assert len(cluster.observed) == minimum assert adapt.log[-1][1]["status"] == "down" return ( From c34f000eb375a780bfe9ce2c979a8974f4104c72 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 23 Aug 2022 18:49:58 +0200 Subject: [PATCH 34/35] lint --- tests/stability/test_adaptive_scaling.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/stability/test_adaptive_scaling.py b/tests/stability/test_adaptive_scaling.py index 842b5f777f..723260c5a9 100644 --- a/tests/stability/test_adaptive_scaling.py +++ b/tests/stability/test_adaptive_scaling.py @@ -231,8 +231,10 @@ def test_adapt_to_memory_intensive_workload(minimum, test_name_uuid): wait_for_workers=True, # Note: We set allowed-failures to ensure that no tasks are not retried upon ungraceful shutdown behavior # during adaptive scaling but we receive a KilledWorker() instead. - environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0", - "DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION": "500ms",}, + environ={ + "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0", + "DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION": "500ms", + }, ) as cluster: with Client(cluster) as client: From 9c556201aa5f69c129185502b412d3d5b060b2cb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 21:03:04 +0200 Subject: [PATCH 35/35] Empty commit to retrigger CI
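
On the `environ=` settings that several of these patches adjust: Dask reads `DASK_*` environment variables as strings and maps the double underscores onto nested configuration keys, which is why patch 23 has to turn the integer `0` into the string `"0"`. A minimal, illustrative sketch of the equivalent in-process configuration (not part of the patch series):

import dask

with dask.config.set(
    {
        # DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES="0": a task is never retried
        # after its worker dies, so ungraceful shutdowns surface as KilledWorker
        "distributed.scheduler.allowed-failures": 0,
        # DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION="500ms": assume 500 ms
        # for tasks without timing data, so adaptive scaling reacts quickly to new work
        "distributed.scheduler.unknown-task-duration": "500ms",
    }
):
    assert dask.config.get("distributed.scheduler.allowed-failures") == 0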
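
The adaptive-scaling knobs introduced in patch 33 (`target_duration`, `interval`, `wait_count`) are what make the tightened scale-down assertions realistic. A rough sketch of how they interact, using a `LocalCluster` purely as an assumed stand-in for `coiled.v2.Cluster`:

from distributed import Client, LocalCluster

with LocalCluster(n_workers=1, processes=False) as cluster, Client(cluster) as client:
    adapt = cluster.adapt(
        minimum=0,              # allowed to scale all the way down to zero workers
        maximum=4,              # never request more than four workers
        target_duration="5s",   # size the pool so queued work finishes in roughly 5 seconds
        interval="1s",          # re-evaluate the desired worker count every second
        wait_count=1,           # act on the first consecutive scale-down recommendation
    )
    futures = client.map(lambda x: x + 1, range(100))
    client.gather(futures)
    # adapt.log holds (timestamp, recommendation) entries such as
    # (..., {"status": "up", "n": 4}); the tests above assert on its last entry.
    print(list(adapt.log))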
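
The scale-down checks in these tests all follow the same inline polling pattern: `Client.wait_for_workers()` only helps when waiting for workers to arrive, so shrinkage is detected by polling `cluster.observed` until it reaches the expected size or a timeout expires. A hypothetical helper (not part of the patches) that captures that pattern:

import time

def wait_for_scale_down(cluster, expected: int, timeout: float = 300) -> float:
    """Poll until the cluster has at most `expected` observed workers; return the duration."""
    start = time.monotonic()
    while len(cluster.observed) > expected:
        if time.monotonic() - start >= timeout:
            raise TimeoutError(f"cluster did not shrink to {expected} workers")
        time.sleep(0.1)
    return time.monotonic() - start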