15
15
import asyncio
16
16
import logging
17
17
from datetime import datetime , timedelta
18
- from typing import Any , Dict , cast
18
+ from typing import Any , Dict , Set , cast
19
19
20
20
from quart import Blueprint , current_app , request
21
21
from werkzeug .utils import secure_filename
33
33
get_adapter ,
34
34
get_adapters ,
35
35
get_tests ,
36
+ get_testsuites ,
36
37
remove_adapter ,
37
38
)
38
39
@@ -63,6 +64,7 @@ async def run() -> None:
63
64
64
65
current_app .add_background_task (run )
65
66
return
67
+
66
68
logger .debug (
67
69
"Not enough client_types to run any test(have %s)" ,
68
70
[str (item ) for item in available_adapters ],
@@ -115,21 +117,71 @@ async def cleanup_unresponsive_adapters() -> None:
115
117
)
116
118
117
119
120
+ sleeping_tasks : Set [asyncio .Future [None ]] = set ()
121
+
122
+
123
+ async def interrupt_tasks () -> None :
124
+ logger .info ("Waking up background tasks" )
125
+ for task in sleeping_tasks :
126
+ task .cancel ()
127
+
128
+
129
+ def should_finish_tests () -> bool :
130
+ for testsuite in get_testsuites ():
131
+ for testcase in testsuite .test_cases :
132
+ if testcase .state not in ("failed" , "error" , "success" ):
133
+ logger .info (f"Not exiting because of { testcase } " )
134
+ return False
135
+ return True
136
+
137
+
138
+ async def loop_check_all_tests_done () -> None :
139
+ while not stop_background_tasks :
140
+ logging .debug ("Running check for test completion" )
141
+ if should_finish_tests ():
142
+ # do not await because shutdown() awaits all background tasks (inc this one) to shut down first.
143
+ asyncio .create_task (current_app .shutdown ())
144
+
145
+ sleep_task : asyncio .Future [None ] = asyncio .ensure_future (asyncio .sleep (30 ))
146
+ try :
147
+ sleeping_tasks .add (sleep_task )
148
+ await sleep_task
149
+ except asyncio .CancelledError :
150
+ pass # we don't mind this task being cancelled.
151
+ finally :
152
+ sleeping_tasks .remove (sleep_task )
153
+ logging .info ("Termination task shutting down" )
154
+
155
+
118
156
async def loop_cleanup_unresponsive_adapters () -> None :
119
157
while not stop_background_tasks :
120
- logging .info ("Running sweep for idle adapters" )
158
+ logging .debug ("Running sweep for idle adapters" )
121
159
await cleanup_unresponsive_adapters ()
122
- await asyncio .sleep (30 )
123
160
124
- logging .info ("Finished sweep task" )
161
+ sleep_task : asyncio .Future [None ] = asyncio .ensure_future (asyncio .sleep (30 ))
162
+ try :
163
+ sleeping_tasks .add (sleep_task )
164
+ await sleep_task
165
+ except asyncio .CancelledError :
166
+ pass # we don't mind this task being cancelled.
167
+ finally :
168
+ sleeping_tasks .remove (sleep_task )
169
+ logging .info ("Sweep task shutting down" )
125
170
126
171
127
172
async def loop_check_for_new_tests () -> None :
128
173
while not stop_background_tasks :
129
- logging .info ("Running sweep for new tests" )
174
+ logging .debug ("Running sweep for new tests" )
130
175
await check_for_new_tests ()
131
- await asyncio .sleep (30 )
132
- logging .info ("Finished new test task" )
176
+ sleep_task : asyncio .Future [None ] = asyncio .ensure_future (asyncio .sleep (30 ))
177
+ try :
178
+ sleeping_tasks .add (sleep_task )
179
+ await sleep_task
180
+ except asyncio .CancelledError :
181
+ pass # we don't mind this task being cancelled.
182
+ finally :
183
+ sleeping_tasks .remove (sleep_task )
184
+ logging .info ("New test task shutting down" )
133
185
134
186
135
187
async def adapter_shutdown () -> None :
@@ -153,6 +205,7 @@ async def register(adapter_uuid: str): # type: ignore
153
205
return {}
154
206
adapter = Adapter (adapter_uuid , registration )
155
207
add_adapter (adapter )
208
+ await interrupt_tasks ()
156
209
return {}
157
210
158
211
0 commit comments