diff --git a/examples/by_feature/automatic_gradient_accumulation.py b/examples/by_feature/automatic_gradient_accumulation.py
index 826aa4d9401..c610f7704c3 100644
--- a/examples/by_feature/automatic_gradient_accumulation.py
+++ b/examples/by_feature/automatic_gradient_accumulation.py
@@ -217,6 +217,7 @@ def inner_training_loop(batch_size):
     # And call it at the end with no arguments
     # Note: You could also refactor this outside of your training loop function
     inner_training_loop()
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/checkpointing.py b/examples/by_feature/checkpointing.py
index 3b78d94a5a2..ff83f5f1e8a 100644
--- a/examples/by_feature/checkpointing.py
+++ b/examples/by_feature/checkpointing.py
@@ -276,6 +276,7 @@ def training_function(config, args):
             if args.output_dir is not None:
                 output_dir = os.path.join(args.output_dir, output_dir)
             accelerator.save_state(output_dir)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/cross_validation.py b/examples/by_feature/cross_validation.py
index f2f011e9849..f1caa2672ee 100644
--- a/examples/by_feature/cross_validation.py
+++ b/examples/by_feature/cross_validation.py
@@ -255,6 +255,7 @@ def training_function(config, args):
         preds = torch.stack(test_predictions, dim=0).sum(dim=0).div(int(args.num_folds)).argmax(dim=-1)
         test_metric = metric.compute(predictions=preds, references=test_references)
         accelerator.print("Average test metrics from all folds:", test_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/ddp_comm_hook.py b/examples/by_feature/ddp_comm_hook.py
index 5f0c3672e3c..d7028b59793 100644
--- a/examples/by_feature/ddp_comm_hook.py
+++ b/examples/by_feature/ddp_comm_hook.py
@@ -192,6 +192,7 @@ def training_function(config, args):
         eval_metric = metric.compute()
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/deepspeed_with_config_support.py b/examples/by_feature/deepspeed_with_config_support.py
index eaab89f233b..ff7535761af 100755
--- a/examples/by_feature/deepspeed_with_config_support.py
+++ b/examples/by_feature/deepspeed_with_config_support.py
@@ -716,6 +716,7 @@ def group_texts(examples):
         with open(os.path.join(args.output_dir, "all_results.json"), "w") as f:
             json.dump({"perplexity": perplexity, "eval_loss": eval_loss.item()}, f)
 
+    accelerator.end_training()
 
 
 if __name__ == "__main__":
diff --git a/examples/by_feature/early_stopping.py b/examples/by_feature/early_stopping.py
index a3525f12ead..12e087d891f 100644
--- a/examples/by_feature/early_stopping.py
+++ b/examples/by_feature/early_stopping.py
@@ -222,6 +222,7 @@ def training_function(config, args):
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/fsdp_with_peak_mem_tracking.py b/examples/by_feature/fsdp_with_peak_mem_tracking.py
index 7c677d4b7a5..0cd3fec55d5 100644
--- a/examples/by_feature/fsdp_with_peak_mem_tracking.py
+++ b/examples/by_feature/fsdp_with_peak_mem_tracking.py
@@ -399,8 +399,7 @@ def collate_fn(examples):
                 step=epoch,
             )
 
-    if args.with_tracking:
-        accelerator.end_training()
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/gradient_accumulation.py b/examples/by_feature/gradient_accumulation.py
index 90046e5f1fc..d277bd7d8bb 100644
--- a/examples/by_feature/gradient_accumulation.py
+++ b/examples/by_feature/gradient_accumulation.py
@@ -197,6 +197,7 @@ def training_function(config, args):
         eval_metric = metric.compute()
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/local_sgd.py b/examples/by_feature/local_sgd.py
index 39e1faac486..cdcf950af19 100644
--- a/examples/by_feature/local_sgd.py
+++ b/examples/by_feature/local_sgd.py
@@ -202,6 +202,7 @@ def training_function(config, args):
         eval_metric = metric.compute()
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/megatron_lm_gpt_pretraining.py b/examples/by_feature/megatron_lm_gpt_pretraining.py
index ec3df408ba6..18488ec41e2 100644
--- a/examples/by_feature/megatron_lm_gpt_pretraining.py
+++ b/examples/by_feature/megatron_lm_gpt_pretraining.py
@@ -703,6 +703,7 @@ def group_texts(examples):
         with open(os.path.join(args.output_dir, "all_results.json"), "w") as f:
             json.dump({"perplexity": perplexity}, f)
 
+    accelerator.end_training()
 
 
 if __name__ == "__main__":
diff --git a/examples/by_feature/memory.py b/examples/by_feature/memory.py
index 22a707331a7..847135e426e 100644
--- a/examples/by_feature/memory.py
+++ b/examples/by_feature/memory.py
@@ -210,6 +210,7 @@ def inner_training_loop(batch_size):
     # And call it at the end with no arguments
     # Note: You could also refactor this outside of your training loop function
     inner_training_loop()
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/multi_process_metrics.py b/examples/by_feature/multi_process_metrics.py
index 6a65ea1f314..626781ac811 100644
--- a/examples/by_feature/multi_process_metrics.py
+++ b/examples/by_feature/multi_process_metrics.py
@@ -214,6 +214,7 @@ def training_function(config, args):
         eval_metric = metric.compute()
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/profiler.py b/examples/by_feature/profiler.py
index e3efc4d324d..cc1843acdd3 100644
--- a/examples/by_feature/profiler.py
+++ b/examples/by_feature/profiler.py
@@ -203,6 +203,7 @@ def training_function(config, args):
         eval_metric = metric.compute()
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/schedule_free.py b/examples/by_feature/schedule_free.py
index e334d6cd417..e9cf11702c8 100644
--- a/examples/by_feature/schedule_free.py
+++ b/examples/by_feature/schedule_free.py
@@ -202,6 +202,7 @@ def training_function(config, args):
         eval_metric = metric.compute()
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/by_feature/tracking.py b/examples/by_feature/tracking.py
index 8032bfc05e3..c97c5517b35 100644
--- a/examples/by_feature/tracking.py
+++ b/examples/by_feature/tracking.py
@@ -236,11 +236,7 @@ def training_function(config, args):
                 step=epoch,
             )
 
-    # New Code #
-    # When a run is finished, you should call `accelerator.end_training()`
-    # to close all of the open trackers
-    if args.with_tracking:
-        accelerator.end_training()
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/complete_cv_example.py b/examples/complete_cv_example.py
index f3a58bf0872..fcc899608be 100644
--- a/examples/complete_cv_example.py
+++ b/examples/complete_cv_example.py
@@ -262,8 +262,7 @@ def training_function(config, args):
                 output_dir = os.path.join(args.output_dir, output_dir)
             accelerator.save_state(output_dir)
 
-    if args.with_tracking:
-        accelerator.end_training()
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/complete_nlp_example.py b/examples/complete_nlp_example.py
index f537086fdec..a31a5d4faef 100644
--- a/examples/complete_nlp_example.py
+++ b/examples/complete_nlp_example.py
@@ -256,8 +256,7 @@ def collate_fn(examples):
                 output_dir = os.path.join(args.output_dir, output_dir)
             accelerator.save_state(output_dir)
 
-    if args.with_tracking:
-        accelerator.end_training()
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/cv_example.py b/examples/cv_example.py
index 87823d5638a..87d1575e08e 100644
--- a/examples/cv_example.py
+++ b/examples/cv_example.py
@@ -180,6 +180,7 @@ def training_function(config, args):
         eval_metric = accurate.item() / num_elems
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}: {100 * eval_metric:.2f}")
+    accelerator.end_training()
 
 
 def main():
diff --git a/examples/inference/pippy/bert.py b/examples/inference/pippy/bert.py
index 1bff27f4dd8..bed3562337b 100644
--- a/examples/inference/pippy/bert.py
+++ b/examples/inference/pippy/bert.py
@@ -76,3 +76,4 @@ output = torch.stack(tuple(output[0]))
 
 print(f"Time of first pass: {first_batch}")
 print(f"Average time per batch: {(end_time - start_time) / 5}")
+PartialState().destroy_process_group()
diff --git a/examples/inference/pippy/gpt2.py b/examples/inference/pippy/gpt2.py
index 6d6200edfd4..994327f3c0d 100644
--- a/examples/inference/pippy/gpt2.py
+++ b/examples/inference/pippy/gpt2.py
@@ -75,3 +75,4 @@ output = torch.stack(tuple(output[0]))
 
 print(f"Time of first pass: {first_batch}")
 print(f"Average time per batch: {(end_time - start_time) / 5}")
+PartialState().destroy_process_group()
diff --git a/examples/inference/pippy/llama.py b/examples/inference/pippy/llama.py
index ab7fdb97d46..a1b2e12bb8f 100644
--- a/examples/inference/pippy/llama.py
+++ b/examples/inference/pippy/llama.py
@@ -52,3 +52,4 @@ next_token_logits = output[0][:, -1, :]
 next_token = torch.argmax(next_token_logits, dim=-1)
 print(tokenizer.batch_decode(next_token))
+PartialState().destroy_process_group()
diff --git a/examples/inference/pippy/t5.py b/examples/inference/pippy/t5.py
index 98b6bb42aa3..2f9218aef14 100644
--- a/examples/inference/pippy/t5.py
+++ b/examples/inference/pippy/t5.py
@@ -87,3 +87,4 @@ output = torch.stack(tuple(output[0]))
 
 print(f"Time of first pass: {first_batch}")
 print(f"Average time per batch: {(end_time - start_time) / 5}")
+PartialState().destroy_process_group()
diff --git a/examples/nlp_example.py b/examples/nlp_example.py
index b26972f9e19..3d2012f097d 100644
--- a/examples/nlp_example.py
+++ b/examples/nlp_example.py
@@ -185,6 +185,7 @@ def training_function(config, args):
         eval_metric = metric.compute()
         # Use accelerator.print to print only on the main process.
         accelerator.print(f"epoch {epoch}:", eval_metric)
+    accelerator.end_training()
 
 
 def main():
diff --git a/setup.py b/setup.py
index 9310cd551ce..27d609cfa11 100644
--- a/setup.py
+++ b/setup.py
@@ -70,7 +70,7 @@
     },
     python_requires=">=3.8.0",
    install_requires=[
-        "numpy>=1.17,<2.0.0",
+        "numpy>=1.17,<3.0.0",
         "packaging>=20.0",
         "psutil",
         "pyyaml",
diff --git a/src/accelerate/accelerator.py b/src/accelerate/accelerator.py
index 56fb754b9f1..3f5f1279132 100755
--- a/src/accelerate/accelerator.py
+++ b/src/accelerate/accelerator.py
@@ -2727,9 +2727,7 @@ def end_training(self):
         for tracker in self.trackers:
             tracker.finish()
 
-        if torch.distributed.is_initialized():
-            # needed when using torch.distributed.init_process_group
-            torch.distributed.destroy_process_group()
+        self.state.destroy_process_group()
 
     def save(self, obj, f, safe_serialization=False):
         """
diff --git a/src/accelerate/state.py b/src/accelerate/state.py
index 5ebad46c1be..37c96babb9e 100644
--- a/src/accelerate/state.py
+++ b/src/accelerate/state.py
@@ -789,6 +789,16 @@ def set_device(self):
         self.device = torch.device(device, device_index)
         device_module.set_device(self.device)
 
+    def destroy_process_group(self, group=None):
+        """
+        Destroys the process group. If one is not specified, the default process group is destroyed.
+ """ + if self.fork_launched and group is None: + return + # needed when using torch.distributed.init_process_group + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group(group) + def __getattr__(self, name: str): # By this point we know that no attributes of `self` contain `name`, # so we just modify the error message @@ -983,6 +993,18 @@ def _reset_state(reset_partial_state: bool = False): if reset_partial_state: PartialState._reset_state() + def destroy_process_group(self, group=None): + """ + Destroys the process group. If one is not specified, the default process group is destroyed. + + If `self.fork_lauched` is `True` and `group` is `None`, nothing happens. + """ + PartialState().destroy_process_group(group) + + @property + def fork_launched(self): + return PartialState().fork_launched + @property def use_distributed(self): """ diff --git a/src/accelerate/test_utils/scripts/external_deps/test_checkpointing.py b/src/accelerate/test_utils/scripts/external_deps/test_checkpointing.py index 41c77c7ec5e..7b86a40b207 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_checkpointing.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_checkpointing.py @@ -223,6 +223,7 @@ def training_function(config, args): if accelerator.is_main_process: with open(os.path.join(args.output_dir, f"state_{epoch}.json"), "w") as f: json.dump(state, f) + accelerator.end_training() def main(): diff --git a/src/accelerate/test_utils/scripts/external_deps/test_metrics.py b/src/accelerate/test_utils/scripts/external_deps/test_metrics.py index 9ac13aba626..dfbf5c9f90c 100755 --- a/src/accelerate/test_utils/scripts/external_deps/test_metrics.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_metrics.py @@ -294,6 +294,7 @@ def main(): if accelerator.is_local_main_process: print("**Test that `drop_last` is taken into account**") test_gather_for_metrics_drop_last() + accelerator.end_training() accelerator.state._reset_state() diff --git a/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py b/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py index 92dfd9f1da4..93e3a34ffb1 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py @@ -240,6 +240,7 @@ def training_function(config, args): if accelerator.is_main_process: with open(os.path.join(args.output_dir, "peak_memory_utilization.json"), "w") as f: json.dump(train_total_peak_memory, f) + accelerator.end_training() def main(): diff --git a/src/accelerate/test_utils/scripts/external_deps/test_performance.py b/src/accelerate/test_utils/scripts/external_deps/test_performance.py index 7051859aa74..f1f7ddd579f 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_performance.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_performance.py @@ -205,6 +205,7 @@ def training_function(config, args): if accelerator.is_main_process: with open(os.path.join(args.output_dir, "all_results.json"), "w") as f: json.dump(performance_metric, f) + accelerator.end_training() def main(): diff --git a/src/accelerate/test_utils/scripts/external_deps/test_pippy.py b/src/accelerate/test_utils/scripts/external_deps/test_pippy.py index f589365649d..86dd4139551 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_pippy.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_pippy.py @@ -125,5 +125,6 @@ def test_resnet(batch_size: int = 2): 
state.print("Testing CV model...") test_resnet() test_resnet(3) + state.destroy_process_group() else: print("Less than two GPUs found, not running tests!") diff --git a/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py b/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py index cc7fdaa88d2..01c939db15b 100644 --- a/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py +++ b/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py @@ -13,7 +13,7 @@ # limitations under the License. import torch -from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs +from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs, PartialState class MockModel(torch.nn.Module): @@ -71,6 +71,7 @@ def main(): ]: print(f"Test DDP comm hook: {comm_hook}, comm wrapper: {comm_wrapper}") test_ddp_comm_hook(comm_hook, comm_wrapper, comm_state_option) + PartialState().destroy_process_group() if __name__ == "__main__": diff --git a/src/accelerate/test_utils/scripts/test_distributed_data_loop.py b/src/accelerate/test_utils/scripts/test_distributed_data_loop.py index 0bd127e832d..2e9d3ee1de7 100644 --- a/src/accelerate/test_utils/scripts/test_distributed_data_loop.py +++ b/src/accelerate/test_utils/scripts/test_distributed_data_loop.py @@ -307,6 +307,8 @@ def main(): loader = DataLoader(dataset, sampler=sampler, batch_size=None, collate_fn=default_collate, num_workers=NUM_WORKERS) test_data_loader(loader, accelerator) + accelerator.end_training() + if __name__ == "__main__": main() diff --git a/src/accelerate/test_utils/scripts/test_merge_weights.py b/src/accelerate/test_utils/scripts/test_merge_weights.py index c847ec2ebfb..a1390864047 100644 --- a/src/accelerate/test_utils/scripts/test_merge_weights.py +++ b/src/accelerate/test_utils/scripts/test_merge_weights.py @@ -158,3 +158,4 @@ def test_merge_weights_command_pytorch(model, path): if accelerator.is_main_process: shutil.rmtree(out_path) accelerator.wait_for_everyone() + accelerator.end_training() diff --git a/src/accelerate/test_utils/scripts/test_notebook.py b/src/accelerate/test_utils/scripts/test_notebook.py index 5e1e3931d39..267c11b50b2 100644 --- a/src/accelerate/test_utils/scripts/test_notebook.py +++ b/src/accelerate/test_utils/scripts/test_notebook.py @@ -110,6 +110,8 @@ def main(): if is_bnb_available(): print("Test problematic imports (bnb)") test_problematic_imports() + if NUM_PROCESSES > 1: + PartialState().destroy_process_group() if __name__ == "__main__": diff --git a/src/accelerate/test_utils/scripts/test_ops.py b/src/accelerate/test_utils/scripts/test_ops.py index 1b18780fa70..e70d0fe504f 100644 --- a/src/accelerate/test_utils/scripts/test_ops.py +++ b/src/accelerate/test_utils/scripts/test_ops.py @@ -173,6 +173,7 @@ def main(): test_op_checker(state) state.print("testing sending tensors across devices") test_copy_tensor_to_devices(state) + state.destroy_process_group() if __name__ == "__main__": diff --git a/src/accelerate/test_utils/scripts/test_script.py b/src/accelerate/test_utils/scripts/test_script.py index 25bdfcb71b1..1d54d098a63 100644 --- a/src/accelerate/test_utils/scripts/test_script.py +++ b/src/accelerate/test_utils/scripts/test_script.py @@ -822,6 +822,8 @@ def main(): print("\n**Test reinstantiated state**") test_reinstantiated_state() + state.destroy_process_group() + if __name__ == "__main__": main() diff --git a/src/accelerate/test_utils/scripts/test_sync.py b/src/accelerate/test_utils/scripts/test_sync.py index bbfec6783d0..1029f475f6c 100644 --- 
a/src/accelerate/test_utils/scripts/test_sync.py +++ b/src/accelerate/test_utils/scripts/test_sync.py @@ -20,7 +20,7 @@ from torch.optim.lr_scheduler import LambdaLR from torch.utils.data import DataLoader -from accelerate.accelerator import Accelerator, GradientAccumulationPlugin +from accelerate.accelerator import Accelerator, DataLoaderConfiguration, GradientAccumulationPlugin from accelerate.state import GradientState from accelerate.test_utils import RegressionDataset, RegressionModel from accelerate.utils import DistributedType, set_seed @@ -249,9 +249,9 @@ def test_gradient_accumulation_with_opt_and_scheduler( split_batches=False, dispatch_batches=False, sync_each_batch=False ): gradient_accumulation_plugin = GradientAccumulationPlugin(num_steps=2, sync_each_batch=sync_each_batch) + dataloader_config = DataLoaderConfiguration(split_batches=split_batches, dispatch_batches=dispatch_batches) accelerator = Accelerator( - split_batches=split_batches, - dispatch_batches=dispatch_batches, + dataloader_config=dataloader_config, gradient_accumulation_plugin=gradient_accumulation_plugin, ) # Test that context manager behaves properly @@ -392,6 +392,7 @@ def main(): f"`split_batches={split_batch}` and `dispatch_batches={dispatch_batches}` and `sync_each_batch={sync_each_batch}`**", ) test_gradient_accumulation_with_opt_and_scheduler(split_batch, dispatch_batches, sync_each_batch) + state.destroy_process_group() def _mp_fn(index): diff --git a/tests/test_metrics.py b/tests/test_metrics.py index fcdacf64d32..d953e2edee7 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -14,6 +14,9 @@ import unittest +import numpy as np +from packaging import version + from accelerate import debug_launcher from accelerate.test_utils import ( DEFAULT_LAUNCH_COMMAND, @@ -29,6 +32,7 @@ @require_huggingface_suite +@unittest.skipIf(version.parse(np.__version__) >= version.parse("2.0"), "Test requires numpy version < 2.0") class MetricTester(unittest.TestCase): def setUp(self): self.test_file_path = path_in_accelerate_package("test_utils", "scripts", "external_deps", "test_metrics.py") diff --git a/tests/test_tracking.py b/tests/test_tracking.py index 730bcbf25c4..99a147f55c1 100644 --- a/tests/test_tracking.py +++ b/tests/test_tracking.py @@ -27,6 +27,7 @@ import numpy as np import torch +from packaging import version # We use TF to parse the logs from accelerate import Accelerator @@ -68,6 +69,7 @@ @require_tensorboard class TensorBoardTrackingTest(unittest.TestCase): + @unittest.skipIf(version.parse(np.__version__) >= version.parse("2.0"), "TB doesn't support numpy 2.0") def test_init_trackers(self): project_name = "test_project_with_config" with tempfile.TemporaryDirectory() as dirpath:
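
For reference, the convention the example scripts converge on after this patch is to call `accelerator.end_training()` unconditionally at the end of the training function; it now finishes any open trackers and also tears down the default process group via `AcceleratorState.destroy_process_group()`. A minimal illustrative sketch (not code from this patch; the model, data, and step count are placeholders):

import torch
from accelerate import Accelerator


def training_function():
    accelerator = Accelerator()
    model = torch.nn.Linear(4, 2)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    model, optimizer = accelerator.prepare(model, optimizer)

    for _ in range(3):
        optimizer.zero_grad()
        loss = model(torch.randn(8, 4, device=accelerator.device)).sum()
        accelerator.backward(loss)
        optimizer.step()

    # Called unconditionally (no `if args.with_tracking` guard): closes any open
    # trackers and destroys the default process group; the destroy step is a no-op
    # when the state was fork-launched (e.g. via notebook_launcher).
    accelerator.end_training()


if __name__ == "__main__":
    training_function()

Inference-only scripts that work with `PartialState` directly (as in the PiPPy examples above) call `PartialState().destroy_process_group()` instead.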