Integration tests for adaptive scaling #211
Conversation
Notably, the worker logs contain this:
@ntabris: Does this mean something to you? It feels like upgrades shouldn't cause an issue with workflows within 20 minutes of firing up a cluster. See https://gitlab.com/coiled/cloud/-/issues/5060
Another issue I've run into is that [...]. In case it helps, these are the related logs:
From what I can tell, the cluster determined that two of the processes exited for "some" reason, even though I only requested that one worker be stopped. This might explain the ominous 9.
For some reason [...]
It looks like the [...]
CI failures can be grouped as follows:
(force-pushed from 23b5a85 to 8a0d566)
Do you have any cluster IDs for this? I'd like to see what the AWS shutdown reason was.
I also think the unattended-upgrades thing is a red herring; I suspect this shutdown was either requested by adaptive scaling or AWS killed your instance for other reasons.
@shughes-uk looks like this might be the cluster in question: https://cloud.coiled.io/dask-engineering/clusters/44038/details. Where do we capture the AWS shutdown reason?
It would be in the instance stop reason; it looks like these were shut down intentionally by adaptive scaling.
@shughes-uk could you say in more detail what's telling you this? Maybe I should already know, but this isn't clear to me. Looking at the logs, I see this on the scheduler:
and this on one of the first workers to go down ([...]):
On the control-plane side we don't see the dask worker process (and then the instance) stopping until 10:01:44 UTC. None of this looks to me like a clean shutdown of a worker by adaptive scaling, but it's possible the logs are just very misleading / I don't know what to look for.
(force-pushed from feadd86 to a5c05fa)
This one has been solved; I missed an [...].
Sorry for the late reply; this appears to be the cluster with the ominous [...]. From a first look into the logs, it seems like it did pretty much the same as the cluster @ntabris found.
An overall question: the intervals we're waiting for adaptive scaling to happen (even scaling down) are pretty slow, on the order of 420 seconds. Would it be possible to set some scaling parameters differently so these tests would run faster? I also get that the slow runtime is broadly part of why a lot of these will be skipped (and maybe, even more broadly, why adaptive scaling isn't very useful to actually use right now).
minimum = 0
maximum = 10
fan_out_size = 100
with Cluster(
Would it be possible to reuse the same cluster via a module-level fixture here, or is it important that both tests use standalone clusters?
I think I'd prefer a clean cluster here to make sure that I test its scaling behavior in isolation. In addition to that, I'd at the very least have to spin up workers again to end up with maximum workers in the beginning, which might diminish the benefit we gain from reusing a cluster since that will take up quite some time.
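For reference, a minimal sketch of what a module-scoped cluster fixture could look like, assuming coiled.Cluster and dask.distributed.Client as used elsewhere in these tests (fixture names here are hypothetical, not from this PR):

```python
import uuid

import pytest
from coiled import Cluster
from dask.distributed import Client


@pytest.fixture(scope="module")
def shared_cluster():
    # One cluster shared by all tests in this module; each test would still
    # need to reset the worker count itself, which is the drawback noted above.
    with Cluster(
        name=f"test_adaptive_scaling-{uuid.uuid4().hex}",
        n_workers=10,
    ) as cluster:
        yield cluster


@pytest.fixture
def shared_client(shared_cluster):
    with Client(shared_cluster) as client:
        yield client
```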
""" | ||
maximum = 10 | ||
with Cluster( | ||
name=f"test_adaptive_scaling-{uuid.uuid4().hex}", |
Thanks, didn't know you had added this in the meantime!
@pytest.mark.stability
@pytest.mark.parametrize("minimum,threshold", [(0, 300), (1, 150)])
def test_scale_up_on_task_load(minimum, threshold):
Do we want to benchmark these as well? Not sure if they're appropriate to benchmark or not.
I discussed this with @fjetter and for now the idea seems to be focusing on stability, but eventually we should benchmark these, yes. This test in particular might also be a candidate that could be moved to the benchmarks as is (maybe referencing it in this file with a module-level docstring).
fan_out = [clog(i, ev=ev_fan_out) for i in range(fan_out_size)]
barrier = clog(delayed(sum)(fan_out), ev=ev_barrier)
final_fan_out = [
    clog(i, ev=ev_final_fan_out, barrier=barrier)
What's the barrier used for here?
The idea is to start the final_fan_out tasks once the barrier task, i.e., scaling down, is done. Would replacing barrier with reduction make this clearer?
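Since clog is not shown in this hunk, here is a hypothetical sketch of the pattern being described: a delayed task that blocks on a distributed Event and accepts an otherwise unused barrier argument purely to create a dependency edge.

```python
from dask import delayed


@delayed
def clog(x, ev, barrier=None):
    # `barrier` is never read; passing the reduction result in simply makes
    # this task depend on it, so the final fan-out only becomes runnable
    # once the reduction (and the scale-down phase it drives) has finished.
    ev.wait()  # block until the test sets the distributed.Event
    return x
```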
# Initialize array on workers to avoid adaptive scale-down
arr = client.persist(
    da.random.random(
        (8 * 1024, 8 * 1024, 16, 16), chunks=(8 * 1024, 128, 2, 2)
Not at all necessary, but maybe it would be possible to write this in terms of scaled_array_shape? https://github.com/coiled/coiled-runtime/blob/78a8614b203971494520d273619383c08b002118/tests/utils_test.py#L14-L36
Fair point. With the latest shift of perspective on these tests as focusing on stability only, I'll give setting parameters to speed things up another pass. This will likely only benefit us in the case of scaling down, though.
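As a rough illustration (not the code in this PR), assuming Coiled's Cluster.adapt forwards keyword arguments to distributed's Adaptive, tightening those parameters could look something like the following; the values are purely illustrative.

```python
import uuid

from coiled import Cluster

with Cluster(name=f"test_adaptive_scaling-{uuid.uuid4().hex}", n_workers=1) as cluster:
    # Re-evaluate scaling decisions every second instead of every 5 seconds,
    # and require fewer consecutive "scale down" votes before retiring a
    # worker, so scale-down happens in seconds rather than minutes.
    cluster.adapt(
        minimum=0,
        maximum=10,
        interval="1s",
        wait_count=6,
    )
```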
xref: dask/distributed#6962
CI failure for Python 3.8, Runtime 0.1.0 (https://github.com/coiled/coiled-runtime/runs/8195319805?check_suite_focus=true#step:6:102) appears to be caused by #306:
distributed=2022.6.0, used by v0.1.0, contains regression #306.
PR Contents
This PR adds a suite of integration tests for adaptive scaling. Generally, these tests assert that the cluster scales up/down as desired and does so quickly enough. For now, quickly is quite loosely defined as scaling within a few minutes. We may want to iterate on this, but using the Coiled default settings, this appears to be the closest bound we should use if we want the tests to pass reliably.
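As a rough sketch of the kind of check these tests perform (helper name and timings here are illustrative, not the exact code in this PR):

```python
import time

from dask.distributed import Client


def wait_for_worker_count(client: Client, expected: int, timeout: float = 420.0) -> float:
    """Block until the scheduler reports `expected` workers; return the elapsed seconds."""
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        if len(client.scheduler_info()["workers"]) == expected:
            return time.monotonic() - start
        time.sleep(1)
    raise TimeoutError(f"Cluster did not reach {expected} workers within {timeout}s")
```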
Test Statistics
General
Measure: Latency of scaling (up|down) from the point where the runnable tasks indicate that the cluster should scale.
Sample size: 10
test_scale_up_on_task_load
[minimum=0, scatter=True]
Fails with TimeoutError: No valid workers found due to dask/distributed#6686.
Note:
Scaling up from an empty cluster takes ~2x as long as from a non-empty cluster. This is due to the fact that the cluster first spins up a single worker and only scales further once that is done.
test_adapt_to_changing_workload
Note:
Scaling down takes about 5 minutes (due to interval='5s' and wait_count=60, i.e., 60 consecutive 5-second checks, roughly 300 s, before a worker is retired). IMO, while this is not snappy in terms of reaction time to changing workload patterns, it makes sense given the long startup time of containers. Essentially, we wait for about 3x the startup time before scaling down to smooth out the pattern and avoid constantly firing up new containers if we over-reacted.
test_adapt_to_memory_intensive_workload
TL;DR: The behavior of this test is very unreliable due to the long duration of scaling up, which leads to significant spilling and either the workload slowing down tremendously or spilling workers getting OOM-killed.
Note:
Due to the long time it takes for this test to scale up, the first worker generates too much data in the beginning, leading to swapping and slowing down the processing.
When running the memory-intensive task in isolation, it takes ~30 s on a cluster that consists of 10 workers. On an adaptive cluster that starts out with a single worker and can scale up to 10 workers, this becomes highly variable. If it finishes within 10 minutes, it appears to be due to an OOM error killing the worker that had spilled to disk. As a consequence, the data was reshuffled and workers were able to progress much faster.
Due to this, I would currently avoid testing that nothing gets recomputed. As things stand, recomputing tasks might be necessary to drive workloads over the finish line where workers spilled too much.