Skip to content

Commit 703fcaf

Browse files
committed
Restart client to free memory
1 parent 6edcece commit 703fcaf

File tree

3 files changed

+15
-9
lines changed

3 files changed

+15
-9
lines changed

azimuth/app.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,6 @@ def run_validation_module(pipeline_index=None):
349349
for pipeline_index in range(len(config.pipelines)):
350350
run_validation_module(pipeline_index)
351351
task_manager.clear_worker_cache()
352-
task_manager.restart()
353352

354353

355354
def run_startup_tasks(azimuth_config: AzimuthConfig, cluster: SpecCluster):
@@ -368,6 +367,7 @@ def run_startup_tasks(azimuth_config: AzimuthConfig, cluster: SpecCluster):
368367
run_validation(DatasetSplitName.train, task_manager, azimuth_config)
369368
if _dataset_split_managers.get(DatasetSplitName.eval):
370369
run_validation(DatasetSplitName.eval, task_manager, azimuth_config)
370+
task_manager.restart()
371371

372372
azimuth_config.save() # Save only after the validation modules ran successfully
373373

azimuth/startup.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,16 @@ def startup_tasks(
251251

252252
mods = start_tasks_for_dms(config, dataset_split_managers, task_manager, start_up_tasks)
253253

254-
# Start a thread to monitor the status.
255-
th = threading.Thread(
256-
target=wait_for_startup, args=(mods, task_manager), name=START_UP_THREAD_NAME
257-
)
258-
th.setDaemon(True)
259-
th.start()
254+
startup_ready = all(m.done() for m in mods.values())
255+
if startup_ready:
256+
log.info("Loading the application from cache. It should be accessible now.")
257+
else:
258+
# Start a thread to monitor the status.
259+
th = threading.Thread(
260+
target=wait_for_startup, args=(mods, task_manager), name=START_UP_THREAD_NAME
261+
)
262+
th.setDaemon(True)
263+
th.start()
260264

261265
return mods
262266

@@ -346,4 +350,3 @@ def wait_for_startup(startup_mods: Dict[str, DaskModule], task_manager: TaskMana
346350
task_manager.restart()
347351
# After restarting, it is safe to unlock the task manager.
348352
task_manager.unlock()
349-
log.info("Cluster restarted to free memory.")

azimuth/task_manager.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright ServiceNow, Inc. 2021 – 2022
22
# This source code is licensed under the Apache 2.0 license found in the LICENSE file
33
# in the root directory of this source tree.
4+
import time
45
from typing import Any, Callable, Dict, List, Optional, Tuple
56

67
import structlog
@@ -217,6 +218,8 @@ def clear_worker_cache(self):
217218
self.client.run(ArtifactManager.clear_cache)
218219

219220
def restart(self):
220-
# Clear futures to free memory.
221+
log.info("Cluster restarted to free memory.")
221222
for task_name, module in self.current_tasks.items():
222223
module.future = None
224+
self.client.restart()
225+
time.sleep(2) # Without that, the test routers fail because some tasks can't get scheduled.

0 commit comments

Comments
 (0)