Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/nvidia_resiliency_ext/checkpointing/async_ckpt/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,10 @@ def _start_worker(self, rank: int) -> None:
rank (int): the rank of the current trainer process.
"""
ctx = mp.get_context('spawn')
logger.info(f"PersistentAsyncCaller: {rank}, Starting Async Caller")
if rank == 0:
logger.info(f"PersistentAsyncCaller: {rank}, Starting Async Caller")
else:
logger.debug(f"PersistentAsyncCaller: {rank}, Starting Async Caller")
if self.background_worker_is_daemon:
async_loop_target = PersistentAsyncCaller.async_loop_for_daemon_worker
else:
Expand Down Expand Up @@ -612,7 +615,7 @@ def close(self, abort=False):
abort (bool, optional): Default to False. Needs to be manually set to true when
the checkpoint async process needs to be aborted.
"""
logger.info(f"PersistentAsyncCaller: {self.rank}, Destroying Async Caller")
logger.debug(f"PersistentAsyncCaller: {self.rank}, Destroying Async Caller")
Comment thread
shurkat-nvidia marked this conversation as resolved.
Outdated
if self.process:
if abort:
logger.error(f"Persistent worker aborted in rank {self.rank}")
Expand Down Expand Up @@ -663,7 +666,7 @@ def cleanup_worker_data_cache(cls):
to properly release any IPC handles stored in the cache.
"""
if cls._worker_data_cache:
logger.info(f"Cleaning up worker data cache with {len(cls._worker_data_cache)} entries")
logger.debug(f"Cleaning up worker data cache with {len(cls._worker_data_cache)} entries")
# Clear all cached data structures which may contain IPC handles
cls._worker_data_cache.clear()
gc.collect()
Expand Down
Loading