diff --git a/src/nvidia_resiliency_ext/checkpointing/async_ckpt/core.py b/src/nvidia_resiliency_ext/checkpointing/async_ckpt/core.py index 3680b173..b114ad94 100644 --- a/src/nvidia_resiliency_ext/checkpointing/async_ckpt/core.py +++ b/src/nvidia_resiliency_ext/checkpointing/async_ckpt/core.py @@ -486,7 +486,10 @@ def _start_worker(self, rank: int) -> None: rank (int): the rank of the current trainer process. """ ctx = mp.get_context('spawn') - logger.info(f"PersistentAsyncCaller: {rank}, Starting Async Caller") + if rank == 0: + logger.info(f"PersistentAsyncCaller: {rank}, Starting Async Caller") + else: + logger.debug(f"PersistentAsyncCaller: {rank}, Starting Async Caller") if self.background_worker_is_daemon: async_loop_target = PersistentAsyncCaller.async_loop_for_daemon_worker else: @@ -612,7 +615,10 @@ def close(self, abort=False): abort (bool, optional): Default to False. Needs to be manually set to true when the checkpoint async process needs to be aborted. """ - logger.info(f"PersistentAsyncCaller: {self.rank}, Destroying Async Caller") + if self.rank == 0: + logger.info(f"PersistentAsyncCaller: {self.rank}, Destroying Async Caller") + else: + logger.debug(f"PersistentAsyncCaller: {self.rank}, Destroying Async Caller") if self.process: if abort: logger.error(f"Persistent worker aborted in rank {self.rank}") @@ -663,7 +669,9 @@ def cleanup_worker_data_cache(cls): to properly release any IPC handles stored in the cache. """ if cls._worker_data_cache: - logger.info(f"Cleaning up worker data cache with {len(cls._worker_data_cache)} entries") + logger.debug( + f"Cleaning up worker data cache with {len(cls._worker_data_cache)} entries" + ) # Clear all cached data structures which may contain IPC handles cls._worker_data_cache.clear() gc.collect()