Integration tests for adaptive scaling #211

Open · hendrikmakait wants to merge 35 commits into main
Commits (35)
767ef42 Add basic scale-up tests (hendrikmakait, Jul 19, 2022)
601b722 Adaptive workload test (hendrikmakait, Jul 20, 2022)
c873e4d Linting (hendrikmakait, Jul 20, 2022)
7d2b797 Add memory-intensive changing workload (hendrikmakait, Jul 21, 2022)
053b150 Adapt timeouts (hendrikmakait, Jul 21, 2022)
aef1e60 Adjust memory-based test (hendrikmakait, Jul 21, 2022)
a3c9037 Adjust tests to return measurements if run standalone (hendrikmakait, Jul 26, 2022)
9281345 Initially wait for workers to validate precondition (hendrikmakait, Jul 26, 2022)
4a2ce29 Adjust memory-intensive workload size to avoid swapping on scaled-up … (hendrikmakait, Jul 26, 2022)
69d1454 Adapt memory-intensive workload to withhold second batch of intensive… (hendrikmakait, Jul 27, 2022)
28135d1 Skip memory-based test (hendrikmakait, Jul 27, 2022)
6fc69a2 Minor (hendrikmakait, Jul 27, 2022)
010b5a8 Remove out-of-sync assertion (hendrikmakait, Jul 27, 2022)
5a844e4 Lose references to futures (hendrikmakait, Jul 28, 2022)
03b1d11 Clean up code and add docstrings (hendrikmakait, Jul 28, 2022)
615727c black (hendrikmakait, Jul 28, 2022)
2c77ceb Replace semaphores with allowed-failures=0 (hendrikmakait, Aug 4, 2022)
c07d673 Typo (hendrikmakait, Aug 4, 2022)
cf70a37 black (hendrikmakait, Aug 5, 2022)
2242bf0 flake8 (hendrikmakait, Aug 5, 2022)
8b192ad Increase scale-up threshold (hendrikmakait, Aug 5, 2022)
53bdb40 Set allowed-failures correctly (hendrikmakait, Aug 15, 2022)
05b40c6 Fix env vars (hendrikmakait, Aug 16, 2022)
a5c05fa flake8 (hendrikmakait, Aug 16, 2022)
4fa01bb Simplify test_adapt_to_changing_workload (hendrikmakait, Aug 19, 2022)
0824ffd Minor (hendrikmakait, Aug 19, 2022)
d9c08df Simplifiy memory-based test (hendrikmakait, Aug 19, 2022)
2f0a4c1 Add test_adaptive_rechunk_stress (hendrikmakait, Aug 19, 2022)
a6483f1 Remove parameters on test_adapt_to_changing_workload (hendrikmakait, Aug 19, 2022)
2e78b7e linting (hendrikmakait, Aug 19, 2022)
3af2abe Improve test speed (hendrikmakait, Aug 19, 2022)
4187adc Skip failing test (hendrikmakait, Aug 19, 2022)
a82fa29 Address review comments (hendrikmakait, Aug 23, 2022)
c34f000 lint (hendrikmakait, Aug 23, 2022)
9c55620 Empty commit to retrigger CI (hendrikmakait, Aug 24, 2022)
320 changes: 320 additions & 0 deletions tests/stability/test_adaptive_scaling.py
@@ -0,0 +1,320 @@
import time

import dask.array as da
import pytest
from coiled.v2 import Cluster
from dask import delayed
from dask.distributed import Client, Event, wait

TIMEOUT_THRESHOLD = 600 # 10 minutes


@pytest.mark.stability
@pytest.mark.parametrize("minimum,threshold", [(0, 300), (1, 150)])
def test_scale_up_on_task_load(minimum, threshold, test_name_uuid):
"""Tests that adaptive scaling reacts in a reasonable amount of time to
an increased task load and scales up.
"""
maximum = 10
with Cluster(
name=test_name_uuid,
n_workers=minimum,
worker_vm_types=["t3.medium"],
wait_for_workers=True,
# Note: We set allowed-failures to ensure that tasks are not retried upon ungraceful shutdown
# during adaptive scaling; instead, we receive a KilledWorker().
environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"},
) as cluster:
with Client(cluster) as client:
adapt = cluster.adapt(minimum=minimum, maximum=maximum)
time.sleep(adapt.interval * 2.1) # Ensure enough time for system to adapt
assert len(adapt.log) == 0
ev_fan_out = Event(name="fan-out", client=client)

def clog(x: int, ev: Event) -> int:
ev.wait()
return x

futures = client.map(clog, range(100), ev=ev_fan_out)

start = time.monotonic()
client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
end = time.monotonic()
duration = end - start
assert duration < threshold, duration
assert len(adapt.log) <= 2
assert adapt.log[-1][1] == {"status": "up", "n": maximum}
ev_fan_out.set()
client.gather(futures)
return duration


@pytest.mark.skip(reason="coiled-runtime#266")
@pytest.mark.stability
def test_adapt_to_changing_workload(test_name_uuid):
"""Tests that adaptive scaling reacts within a reasonable amount of time to
a varying task load and scales up or down. This also asserts that no recomputation
is caused by the scaling.
"""
minimum = 0
maximum = 10
fan_out_size = 100
with Cluster(
Contributor

Would it be possible to reuse the same cluster via a module-level fixture here, or is it important that both tests use standalone clusters?

Member Author

I think I'd prefer a clean cluster here to make sure that I test its scaling behavior in isolation. In addition, I'd at the very least have to spin up workers again to start out with the maximum number of workers, which might diminish the benefit of reusing a cluster since that takes quite some time.
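
For reference, a module-scoped fixture along the lines suggested above might look roughly like the sketch below (illustrative only; the fixture name and reuse pattern are not part of this PR). As the reply notes, each test would still have to rescale the shared cluster back to its expected starting size, which erodes much of the saving.

import pytest
from coiled.v2 import Cluster

@pytest.fixture(scope="module")
def shared_adaptive_cluster():
    # Hypothetical shared cluster reused by all tests in this module.
    with Cluster(
        name="stability-adaptive-scaling",
        n_workers=10,
        worker_vm_types=["t3.medium"],
        wait_for_workers=True,
    ) as cluster:
        yield cluster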

name=test_name_uuid,
n_workers=maximum,
worker_vm_types=["t3.medium"],
wait_for_workers=True,
# Note: We set allowed-failures to ensure that tasks are not retried upon ungraceful shutdown
# during adaptive scaling; instead, we receive a KilledWorker().
environ={
"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0",
"DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION": "500ms",
},
) as cluster:
with Client(cluster) as client:

@delayed
def clog(x: int, ev: Event, **kwargs) -> int:
ev.wait()
return x

def workload(
fan_out_size,
ev_fan_out,
ev_barrier,
ev_final_fan_out,
):
fan_out = [clog(i, ev=ev_fan_out) for i in range(fan_out_size)]
barrier = clog(delayed(sum)(fan_out), ev=ev_barrier)
final_fan_out = [
clog(i, ev=ev_final_fan_out, barrier=barrier)
Contributor

What's the barrier used for here?

Member Author

The idea is to start the final_fan_out tasks only once the barrier task (and with it the scale-down phase) is done. Would replacing barrier with reduction make this clearer?
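
To illustrate the pattern under discussion: the extra barrier argument is never used inside clog; it only adds a dependency edge so that the final fan-out tasks cannot start before the barrier task has finished. A minimal standalone sketch of this ordering trick (hypothetical names, independent of this PR):

from dask import delayed

@delayed
def step(x, *dependencies):
    # *dependencies is ignored; it only forces this task to wait
    # until the passed-in delayed objects have completed.
    return x + 1

gate = delayed(sum)([step(i) for i in range(10)])  # plays the role of the barrier task
after = [step(i, gate) for i in range(10)]  # cannot run before gate is done

Computing dask.compute(*after) then schedules gate (and everything it depends on) before any of the after tasks.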

for i in range(fan_out_size)
]
return final_fan_out

ev_fan_out = Event(name="fan-out", client=client)
ev_barrier = Event(name="barrier", client=client)
ev_final_fan_out = Event(name="final-fan-out", client=client)

fut = client.compute(
workload(
fan_out_size=fan_out_size,
ev_fan_out=ev_fan_out,
ev_barrier=ev_barrier,
ev_final_fan_out=ev_final_fan_out,
)
)

adapt = cluster.adapt(
minimum=minimum,
maximum=maximum,
target_duration="5s",
interval="1s",
wait_count=1,
)
time.sleep(adapt.interval * 2.1)
assert len(adapt.log) == 0

ev_fan_out.set()
# Scale down to a single worker
start = time.monotonic()
while len(cluster.observed) > 1:
if time.monotonic() - start >= TIMEOUT_THRESHOLD:
raise TimeoutError()
time.sleep(0.1)
end = time.monotonic()
duration_first_scale_down = end - start
assert duration_first_scale_down < 10, duration_first_scale_down
assert len(cluster.observed) == 1
assert adapt.log[-1][1]["status"] == "down"

ev_barrier.set()
# Scale up to maximum again
start = time.monotonic()
client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
end = time.monotonic()
duration_second_scale_up = end - start
assert duration_second_scale_up < 150, duration_second_scale_up
assert len(cluster.observed) == maximum
assert adapt.log[-1][1]["status"] == "up"

ev_final_fan_out.set()
client.gather(fut)
del fut

# Scale down to minimum
start = time.monotonic()
while len(cluster.observed) > minimum:
if time.monotonic() - start >= TIMEOUT_THRESHOLD:
raise TimeoutError()
time.sleep(0.1)
end = time.monotonic()
duration_second_scale_down = end - start
assert duration_second_scale_down < 10, duration_second_scale_down
assert len(cluster.observed) == minimum
assert adapt.log[-1][1]["status"] == "down"
return (
duration_first_scale_down,
duration_second_scale_up,
duration_second_scale_down,
)


@pytest.mark.stability
def test_adaptive_rechunk_stress(test_name_uuid):
"""Tests adaptive scaling in a transfer-heavy workload that reduces its memory load
in a series of rechunking and dimensional reduction steps.
"""
with Cluster(
name=test_name_uuid,
n_workers=32,
worker_vm_types=["t3.large"],
wait_for_workers=True,
# Note: We set allowed-failures to ensure that tasks are not retried upon ungraceful shutdown
# during adaptive scaling; instead, we receive a KilledWorker().
environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"},
) as cluster:
with Client(cluster) as client:

def workload(arr):
arr = arr.sum(axis=[3])
arr = (
arr.rechunk((128, 8 * 1024, 2))
.rechunk((8 * 1024, 128, 2))
.rechunk((128, 8 * 1024, 2))
.sum(axis=[2])
)
arr = (
arr.rechunk((64, 8 * 1024))
.rechunk((8 * 1024, 64))
.rechunk((64, 8 * 1024))
.sum(axis=[1])
)
return arr.sum()

# Initialize array on workers to avoid adaptive scale-down
arr = client.persist(
da.random.random(
(8 * 1024, 8 * 1024, 16, 16), chunks=(8 * 1024, 128, 2, 2)
Contributor

Not at all necessary, but maybe it would be possible to write this in terms of scaled_array_shape?
https://github.com/coiled/coiled-runtime/blob/78a8614b203971494520d273619383c08b002118/tests/utils_test.py#L14-L36
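
For context on the sizing being discussed: the hard-coded shape works out to 128 GiB of float64 data in 32 MiB chunks. A quick check (plain arithmetic, not part of the PR):

import numpy as np

shape = (8 * 1024, 8 * 1024, 16, 16)
chunks = (8 * 1024, 128, 2, 2)
itemsize = np.dtype("float64").itemsize  # da.random.random produces float64

total_bytes = np.prod(shape) * itemsize  # 137_438_953_472 bytes == 128 GiB
chunk_bytes = np.prod(chunks) * itemsize  # 33_554_432 bytes == 32 MiB
print(total_bytes / 2**30, chunk_bytes / 2**20)  # 128.0 32.0

A helper such as scaled_array_shape could derive these dimensions from a target size instead of hard-coding them; the snippet above only verifies the numbers used here.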

)
)
wait(arr)

cluster.adapt(
minimum=1,
maximum=32,
interval="1s",
target_duration="180s",
wait_count=1,
)
fut = client.compute(workload(arr))
del arr
wait(fut)
assert fut.result()


@pytest.mark.skip(
reason="The test behavior is unreliable and may lead to very long runtime (see: coiled-runtime#211)"
)
@pytest.mark.stability
@pytest.mark.parametrize("minimum", (0, 1))
def test_adapt_to_memory_intensive_workload(minimum, test_name_uuid):
"""Tests that adaptive scaling reacts within a reasonable amount of time to a varying task and memory load.

Note: This test currently results in spilling and very long runtimes.
"""
maximum = 10
with Cluster(
name=test_name_uuid,
n_workers=maximum,
worker_vm_types=["t3.medium"],
wait_for_workers=True,
# Note: We set allowed-failures to ensure that tasks are not retried upon ungraceful shutdown
# during adaptive scaling; instead, we receive a KilledWorker().
environ={
"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0",
"DASK_DISTRIBUTED__SCHEDULER__UNKNOWN_TASK_DURATION": "500ms",
},
) as cluster:
with Client(cluster) as client:

def memory_intensive_processing():
matrix = da.random.random((40000, 40000), chunks=(40000, 500))
rechunked = matrix.rechunk((500, 40000))
reduction = rechunked.sum()
return reduction

@delayed
def clog(x, ev_start: Event, ev_barrier: Event):
ev_start.set()
ev_barrier.wait()
return x

def compute_intensive_barrier_task(
data, ev_start: Event, ev_barrier: Event
):
barrier = clog(data, ev_start, ev_barrier)
return barrier

ev_scale_down = Event(name="scale_down", client=client)
ev_barrier = Event(name="barrier", client=client)

fut = client.compute(
compute_intensive_barrier_task(
memory_intensive_processing(), ev_scale_down, ev_barrier
)
)

adapt = cluster.adapt(
minimum=minimum,
maximum=maximum,
target_duration="5s",
interval="1s",
wait_count=1,
)

ev_scale_down.wait()
# Scale down to a single worker on barrier task
start = time.monotonic()
while len(cluster.observed) > 1:
if time.monotonic() - start >= TIMEOUT_THRESHOLD:
raise TimeoutError()
time.sleep(0.1)
end = time.monotonic()
duration_first_scale_down = end - start
assert duration_first_scale_down < 60, duration_first_scale_down
assert len(cluster.observed) == 1
assert adapt.log[-1][1]["status"] == "down"

ev_barrier.set()
wait(fut)
fut = client.compute(memory_intensive_processing())

# Scale up to maximum on postprocessing
start = time.monotonic()
client.wait_for_workers(n_workers=maximum, timeout=TIMEOUT_THRESHOLD)
end = time.monotonic()
duration_second_scale_up = end - start
assert duration_second_scale_up < 150, duration_second_scale_up
assert len(cluster.observed) == maximum
assert adapt.log[-1][1]["status"] == "up"

wait(fut)
del fut

# Scale down to minimum
start = time.monotonic()
while len(cluster.observed) > minimum:
if time.monotonic() - start >= TIMEOUT_THRESHOLD:
raise TimeoutError()
time.sleep(0.1)
end = time.monotonic()
duration_second_scale_down = end - start
assert duration_second_scale_down < 60, duration_second_scale_down
assert len(cluster.observed) == minimum
assert adapt.log[-1][1]["status"] == "down"
return (
duration_first_scale_down,
duration_second_scale_up,
duration_second_scale_down,
)