|
7 | 7 | from dask import delayed
|
8 | 8 | from dask.distributed import Client, Event, Semaphore, wait
|
9 | 9 |
|
10 |
| -TIMEOUT_THRESHOLD = 600 # 10 minutes |
| 10 | +TIMEOUT_THRESHOLD = 1800 # 10 minutes |
11 | 11 |
|
12 | 12 |
|
13 | 13 | @pytest.mark.stability
|
@@ -80,23 +80,23 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
|
80 | 80 | sem_fan_out = Semaphore(name="fan-out", max_leases=fan_out_size)
|
81 | 81 | ev_fan_out = Event(name="fan-out", client=client)
|
82 | 82 |
|
83 |
| - fan_out = client.map( |
| 83 | + fut = client.map( |
84 | 84 | clog, range(fan_out_size), ev=ev_fan_out, sem=sem_fan_out
|
85 | 85 | )
|
86 | 86 |
|
87 |
| - reduction = client.submit(sum, fan_out) |
| 87 | + fut = client.submit(sum, fut) |
88 | 88 | sem_barrier = Semaphore(name="barrier", max_leases=1)
|
89 | 89 | ev_barrier = Event(name="barrier", client=client)
|
90 |
| - barrier = client.submit(clog, reduction, ev=ev_barrier, sem=sem_barrier) |
| 90 | + fut = client.submit(clog, fut, ev=ev_barrier, sem=sem_barrier) |
91 | 91 |
|
92 | 92 | sem_final_fan_out = Semaphore(name="final-fan-out", max_leases=fan_out_size)
|
93 | 93 | ev_final_fan_out = Event(name="final-fan-out", client=client)
|
94 |
| - final_fan_out = client.map( |
| 94 | + fut = client.map( |
95 | 95 | clog,
|
96 | 96 | range(fan_out_size),
|
97 | 97 | ev=ev_final_fan_out,
|
98 | 98 | sem=sem_final_fan_out,
|
99 |
| - barrier=barrier, |
| 99 | + barrier=fut, |
100 | 100 | )
|
101 | 101 |
|
102 | 102 | # Scale up to maximum
|
@@ -132,7 +132,8 @@ def clog(x: int, ev: Event, sem: Semaphore, **kwargs) -> int:
|
132 | 132 | assert adapt.log[-1][1]["status"] == "up"
|
133 | 133 |
|
134 | 134 | ev_final_fan_out.set()
|
135 |
| - client.gather(final_fan_out) |
| 135 | + client.gather(fut) |
| 136 | + del fut |
136 | 137 |
|
137 | 138 | # Scale down to minimum
|
138 | 139 | start = time.monotonic()
|
|
0 commit comments