From 8ea84172458f0239be92d72e18f1a4b0a8f3d5ed Mon Sep 17 00:00:00 2001 From: misko Date: Thu, 23 Oct 2025 02:20:18 +0000 Subject: [PATCH 01/13] add local head mode to ParallelMLIPPredictUnitRay --- .../core/launchers/cluster/ray_cluster.py | 2 + src/fairchem/core/units/mlip_unit/predict.py | 129 +++++++++++------- 2 files changed, 84 insertions(+), 47 deletions(-) diff --git a/src/fairchem/core/launchers/cluster/ray_cluster.py b/src/fairchem/core/launchers/cluster/ray_cluster.py index 99a78b11b0..ffe9ecc441 100644 --- a/src/fairchem/core/launchers/cluster/ray_cluster.py +++ b/src/fairchem/core/launchers/cluster/ray_cluster.py @@ -205,6 +205,8 @@ def _ray_head_script( "--num-gpus", f"{num_gpus}", "--dashboard-host=0.0.0.0", + "--resources", + '{"head": 1}', ], env=head_env, stdout=subprocess.PIPE, diff --git a/src/fairchem/core/units/mlip_unit/predict.py b/src/fairchem/core/units/mlip_unit/predict.py index 56377d4d2a..dfdda24de3 100644 --- a/src/fairchem/core/units/mlip_unit/predict.py +++ b/src/fairchem/core/units/mlip_unit/predict.py @@ -12,6 +12,7 @@ import math import os import random +import sys from collections import defaultdict from contextlib import nullcontext from functools import wraps @@ -340,8 +341,7 @@ def move_tensors_to_cpu(data): return data -@remote -class MLIPWorker: +class MLIPWorkerLocal: def __init__( self, worker_id: int, @@ -361,54 +361,58 @@ def __init__( ) self.master_port = get_free_port() if master_port is None else master_port self.is_setup = False + self.last_received_atomic_data = None def get_master_address_and_port(self): return (self.master_address, self.master_port) def _distributed_setup( self, - worker_id: int, - master_port: int, - world_size: int, - predictor_config: dict, - master_address: str, ): # initialize distributed environment # TODO, this wont work for multi-node, need to fix master addr - logging.info(f"Initializing worker {worker_id}...") - setup_env_local_multi_gpu(worker_id, master_port, master_address) - # local_rank = int(os.environ["LOCAL_RANK"]) - device = predictor_config.get("device", "cpu") + logging.info(f"Initializing worker {self.worker_id}...") + setup_env_local_multi_gpu(self.worker_id, self.master_port, self.master_address) + + device = self.predictor_config.get("device", "cpu") assign_device_for_local_rank(device == "cpu", 0) backend = "gloo" if device == "cpu" else "nccl" dist.init_process_group( backend=backend, - rank=worker_id, - world_size=world_size, + rank=self.worker_id, + world_size=self.world_size, ) - gp_utils.setup_graph_parallel_groups(world_size, backend) - self.predict_unit = hydra.utils.instantiate(predictor_config) + gp_utils.setup_graph_parallel_groups(self.world_size, backend) + self.predict_unit = hydra.utils.instantiate(self.predictor_config) + self.device = get_device_for_local_rank() logging.info( - f"Worker {worker_id}, gpu_id: {ray.get_gpu_ids()}, loaded predict unit: {self.predict_unit}, " - f"on port {self.master_port}, with device: {get_device_for_local_rank()}, config: {self.predictor_config}" + f"Worker {self.worker_id}, gpu_id: {ray.get_gpu_ids()}, loaded predict unit: {self.predict_unit}, " + f"on port {self.master_port}, with device: {self.device}, config: {self.predictor_config}" ) + self.is_setup = True - def predict(self, data: AtomicData) -> dict[str, torch.tensor] | None: + def predict( + self, data: AtomicData, md: bool = False + ) -> dict[str, torch.tensor] | None: if not self.is_setup: - self._distributed_setup( - self.worker_id, - self.master_port, - self.world_size, - 
self.predictor_config, - self.master_address, - ) - self.is_setup = True + self._distributed_setup() + out = self.predict_unit.predict(data) - out = move_tensors_to_cpu(out) if self.worker_id == 0: - return out - else: - return None + return move_tensors_to_cpu(out) + + if self.worker_id != 0 and md: + self.last_received_atomic_data = data.to(self.device) + while True: + torch.distributed.broadcast(self.last_received_atomic_data.pos, src=0) + self.predict_unit.predict(self.last_received_atomic_data) + + return None + + +@remote +class MLIPWorker(MLIPWorkerLocal): + pass @requires(ray_installed, message="Requires `ray` to be installed") @@ -446,6 +450,15 @@ def __init__( "atom_refs": atom_refs, "assert_on_nans": assert_on_nans, } + logging.basicConfig( + level=logging.INFO, + force=True, + stream=sys.stdout, + format="%(asctime)s %(levelname)s [%(processName)s] %(name)s: %(message)s", + ) + # Optional: keep Ray/uvicorn chatty logs in check + logging.getLogger("ray").setLevel(logging.INFO) + logging.getLogger("uvicorn").setLevel(logging.INFO) if not ray.is_initialized(): ray.init( logging_level=logging.INFO, @@ -454,6 +467,9 @@ def __init__( # }, ) + self.local = True + self.last_sent_atomic_data = None + num_nodes = math.ceil(num_workers / num_workers_per_node) num_workers_on_node_array = [num_workers_per_node] * num_nodes if num_workers % num_workers_per_node > 0: @@ -465,10 +481,12 @@ def __init__( # first create one placement group for each node num_gpu_per_worker = 1 if device == "cuda" else 0 placement_groups = [] - for workers in num_workers_on_node_array: + for node_idx, workers in enumerate(num_workers_on_node_array): bundle = {"CPU": workers} if device == "cuda": bundle["GPU"] = workers + if self.local and node_idx == 0: + bundle["head"] = 0.1 pg = ray.util.placement_group([bundle], strategy="STRICT_PACK") placement_groups.append(pg) ray.get(pg.ready()) # Wait for each placement group to be scheduled @@ -482,11 +500,21 @@ def __init__( placement_group_capture_child_tasks=True, # Ensure child tasks also run in this PG ), ).remote(0, num_workers, predict_unit_config) - master_addr, master_port = ray.get( - rank0_worker.get_master_address_and_port.remote() - ) + + self.workers = [] + if self.local: + self.local_rank0 = MLIPWorkerLocal( + worker_id=0, + world_size=num_workers, + predictor_config=predict_unit_config, + ) + master_addr, master_port = self.local_rank0.get_master_address_and_port() + else: + master_addr, master_port = ray.get( + rank0_worker.get_master_address_and_port.remote() + ) + self.workers.append(rank0_worker) logging.info(f"Started rank0 on {master_addr}:{master_port}") - self.workers = [rank0_worker] # next place all ranks in order and pack them on placement groups # ie: rank0-7 -> placement group 0, 8->15 -> placement group 1 etc. 
@@ -520,20 +548,27 @@ def __init__( self.workers.append(actor) worker_id += 1 - def predict( - self, data: AtomicData, undo_element_references: bool = True - ) -> dict[str, torch.tensor]: + def predict(self, data: AtomicData, md: bool = False) -> dict[str, torch.tensor]: # put the reference in the object store only once # this data transfer should be made more efficienct by using a shared memory transfer + nccl broadcast - data_ref = ray.put(data) - futures = [w.predict.remote(data_ref) for w in self.workers] - # just get the first result that is ready since they are identical - # the rest of the futures should go out of scope and memory garbage collected - # ready_ids, _ = ray.wait(futures, num_returns=1) - # result = ray.get(ready_ids[0]) - # result = ray.get(futures) - # return result[0] - return ray.get(futures[0]) + + if not md: + data_ref = ray.put(data) + futures = [w.predict.remote(data_ref) for w in self.workers] + if self.local: + return self.local_rank0.predict(data) + return ray.get(futures[0]) + + assert self.local, "MD only supported in local mode currently" + # MD is enabled + if self.last_sent_atomic_data is None: + data_ref = ray.put(data) + futures = [w.predict.remote(data_ref, md=md) for w in self.workers] + self.last_sent_atomic_data = data + else: + data = data.to(self.local_rank0.device) + torch.distributed.broadcast(data.pos, src=0) + return self.local_rank0.predict(data) @property def dataset_to_tasks(self) -> dict[str, list]: From 163f0ae09f79ccfe460e31b376ad67e91e50b793 Mon Sep 17 00:00:00 2001 From: misko Date: Fri, 24 Oct 2025 17:37:30 +0000 Subject: [PATCH 02/13] detect gpus on head --- src/fairchem/core/units/mlip_unit/predict.py | 43 ++++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/src/fairchem/core/units/mlip_unit/predict.py b/src/fairchem/core/units/mlip_unit/predict.py index beb694e65c..061b6ef0d3 100644 --- a/src/fairchem/core/units/mlip_unit/predict.py +++ b/src/fairchem/core/units/mlip_unit/predict.py @@ -438,6 +438,7 @@ def __init__( seed=seed, atom_refs=atom_refs, ) + self.inference_settings = inference_settings self._dataset_to_tasks = copy.deepcopy(_mlip_pred_unit.dataset_to_tasks) predict_unit_config = { @@ -450,6 +451,7 @@ def __init__( "atom_refs": atom_refs, "assert_on_nans": assert_on_nans, } + logging.basicConfig( level=logging.INFO, force=True, @@ -467,9 +469,14 @@ def __init__( # }, ) - self.local = True self.last_sent_atomic_data = None + # check if we have a GPU locally + head = next( + n for n in ray.nodes() if n["Alive"] and n["Resources"].get("head", 0) > 0 + ) + self.head_node_has_gpu = head["Resources"].get("GPU", 0) > 0 + num_nodes = math.ceil(num_workers / num_workers_per_node) num_workers_on_node_array = [num_workers_per_node] * num_nodes if num_workers % num_workers_per_node > 0: @@ -485,7 +492,7 @@ def __init__( bundle = {"CPU": workers} if device == "cuda": bundle["GPU"] = workers - if self.local and node_idx == 0: + if self.head_node_has_gpu and node_idx == 0: bundle["head"] = 0.1 pg = ray.util.placement_group([bundle], strategy="STRICT_PACK") placement_groups.append(pg) @@ -502,7 +509,7 @@ def __init__( ).remote(0, num_workers, predict_unit_config) self.workers = [] - if self.local: + if self.head_node_has_gpu: self.local_rank0 = MLIPWorkerLocal( worker_id=0, world_size=num_workers, @@ -548,28 +555,30 @@ def __init__( self.workers.append(actor) worker_id += 1 - def predict(self, data: AtomicData, md: bool = False) -> dict[str, torch.tensor]: + def predict(self, data: AtomicData) -> dict[str, 
torch.tensor]: # put the reference in the object store only once # this data transfer should be made more efficienct by using a shared memory transfer + nccl broadcast - if not md: + if self.head_node_has_gpu and self.inference_settings.merge_mole: + if self.last_sent_atomic_data is None: + data_ref = ray.put(data) + # this will put the ray works into an infinite loop listening for broadcasts + futures = [ + w.predict.remote(data_ref, md=self.inference_settings.merge_mole) + for w in self.workers + ] + self.last_sent_atomic_data = data + else: + data = data.to(self.local_rank0.device) + torch.distributed.broadcast(data.pos, src=0) + return self.local_rank0.predict(data) + else: data_ref = ray.put(data) futures = [w.predict.remote(data_ref) for w in self.workers] - if self.local: + if self.head_node_has_gpu: return self.local_rank0.predict(data) return ray.get(futures[0]) - assert self.local, "MD only supported in local mode currently" - # MD is enabled - if self.last_sent_atomic_data is None: - data_ref = ray.put(data) - futures = [w.predict.remote(data_ref, md=md) for w in self.workers] - self.last_sent_atomic_data = data - else: - data = data.to(self.local_rank0.device) - torch.distributed.broadcast(data.pos, src=0) - return self.local_rank0.predict(data) - @property def dataset_to_tasks(self) -> dict[str, list]: return self._dataset_to_tasks From 79263cd1725779e5ecdc58267423e94f4e0a25bb Mon Sep 17 00:00:00 2001 From: misko Date: Fri, 24 Oct 2025 19:38:32 +0000 Subject: [PATCH 03/13] rename md param --- src/fairchem/core/units/mlip_unit/predict.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fairchem/core/units/mlip_unit/predict.py b/src/fairchem/core/units/mlip_unit/predict.py index 061b6ef0d3..1cf2e28f97 100644 --- a/src/fairchem/core/units/mlip_unit/predict.py +++ b/src/fairchem/core/units/mlip_unit/predict.py @@ -392,7 +392,7 @@ def _distributed_setup( self.is_setup = True def predict( - self, data: AtomicData, md: bool = False + self, data: AtomicData, use_nccl: bool = False ) -> dict[str, torch.tensor] | None: if not self.is_setup: self._distributed_setup() @@ -401,7 +401,7 @@ def predict( if self.worker_id == 0: return move_tensors_to_cpu(out) - if self.worker_id != 0 and md: + if self.worker_id != 0 and use_nccl: self.last_received_atomic_data = data.to(self.device) while True: torch.distributed.broadcast(self.last_received_atomic_data.pos, src=0) From fe61759f886ea92c28d7dd35f4c445ee3bcbd35f Mon Sep 17 00:00:00 2001 From: misko Date: Fri, 24 Oct 2025 22:51:51 +0000 Subject: [PATCH 04/13] fix case with no head resource --- src/fairchem/core/units/mlip_unit/predict.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/fairchem/core/units/mlip_unit/predict.py b/src/fairchem/core/units/mlip_unit/predict.py index 1cf2e28f97..e83f54d098 100644 --- a/src/fairchem/core/units/mlip_unit/predict.py +++ b/src/fairchem/core/units/mlip_unit/predict.py @@ -473,9 +473,16 @@ def __init__( # check if we have a GPU locally head = next( - n for n in ray.nodes() if n["Alive"] and n["Resources"].get("head", 0) > 0 + ( + n + for n in ray.nodes() + if n["Alive"] and n["Resources"].get("head", 0) > 0 + ), + None, + ) + self.head_node_has_gpu = ( + head["Resources"].get("GPU", 0) > 0 if head is not None else False ) - self.head_node_has_gpu = head["Resources"].get("GPU", 0) > 0 num_nodes = math.ceil(num_workers / num_workers_per_node) num_workers_on_node_array = [num_workers_per_node] * num_nodes From 11c36c666c4b143a4989c385e3aaeeb0e5b1441d Mon 
Sep 17 00:00:00 2001 From: misko Date: Wed, 29 Oct 2025 22:46:59 +0000 Subject: [PATCH 05/13] cleanup --- src/fairchem/core/units/mlip_unit/predict.py | 60 +++++++------------- 1 file changed, 22 insertions(+), 38 deletions(-) diff --git a/src/fairchem/core/units/mlip_unit/predict.py b/src/fairchem/core/units/mlip_unit/predict.py index e83f54d098..804d139194 100644 --- a/src/fairchem/core/units/mlip_unit/predict.py +++ b/src/fairchem/core/units/mlip_unit/predict.py @@ -480,9 +480,7 @@ def __init__( ), None, ) - self.head_node_has_gpu = ( - head["Resources"].get("GPU", 0) > 0 if head is not None else False - ) + assert head is not None, "Could not find head node in Ray cluster" num_nodes = math.ceil(num_workers / num_workers_per_node) num_workers_on_node_array = [num_workers_per_node] * num_nodes @@ -499,14 +497,14 @@ def __init__( bundle = {"CPU": workers} if device == "cuda": bundle["GPU"] = workers - if self.head_node_has_gpu and node_idx == 0: + if node_idx == 0: bundle["head"] = 0.1 pg = ray.util.placement_group([bundle], strategy="STRICT_PACK") placement_groups.append(pg) ray.get(pg.ready()) # Wait for each placement group to be scheduled - # place rank 0 on placement group 0 - rank0_worker = MLIPWorker.options( + # Need to still place worker to occupy space, otherwise ray double books this GPU + _ = MLIPWorker.options( num_gpus=num_gpu_per_worker, scheduling_strategy=PlacementGroupSchedulingStrategy( placement_group=placement_groups[0], @@ -516,18 +514,12 @@ def __init__( ).remote(0, num_workers, predict_unit_config) self.workers = [] - if self.head_node_has_gpu: - self.local_rank0 = MLIPWorkerLocal( - worker_id=0, - world_size=num_workers, - predictor_config=predict_unit_config, - ) - master_addr, master_port = self.local_rank0.get_master_address_and_port() - else: - master_addr, master_port = ray.get( - rank0_worker.get_master_address_and_port.remote() - ) - self.workers.append(rank0_worker) + self.local_rank0 = MLIPWorkerLocal( + worker_id=0, + world_size=num_workers, + predictor_config=predict_unit_config, + ) + master_addr, master_port = self.local_rank0.get_master_address_and_port() logging.info(f"Started rank0 on {master_addr}:{master_port}") # next place all ranks in order and pack them on placement groups @@ -564,27 +556,19 @@ def __init__( def predict(self, data: AtomicData) -> dict[str, torch.tensor]: # put the reference in the object store only once - # this data transfer should be made more efficienct by using a shared memory transfer + nccl broadcast - - if self.head_node_has_gpu and self.inference_settings.merge_mole: - if self.last_sent_atomic_data is None: - data_ref = ray.put(data) - # this will put the ray works into an infinite loop listening for broadcasts - futures = [ - w.predict.remote(data_ref, md=self.inference_settings.merge_mole) - for w in self.workers - ] - self.last_sent_atomic_data = data - else: - data = data.to(self.local_rank0.device) - torch.distributed.broadcast(data.pos, src=0) - return self.local_rank0.predict(data) - else: + if not self.inference_settings.merge_mole or self.last_sent_atomic_data is None: data_ref = ray.put(data) - futures = [w.predict.remote(data_ref) for w in self.workers] - if self.head_node_has_gpu: - return self.local_rank0.predict(data) - return ray.get(futures[0]) + # this will put the ray works into an infinite loop listening for broadcasts + _futures = [ + w.predict.remote(data_ref, use_nccl=self.inference_settings.merge_mole) + for w in self.workers + ] + self.last_sent_atomic_data = data + else: + data = 
data.to(self.local_rank0.device) + torch.distributed.broadcast(data.pos, src=0) + + return self.local_rank0.predict(data) @property def dataset_to_tasks(self) -> dict[str, list]: From 3a3753d817e8c9d339662f662ec24128bba0a08c Mon Sep 17 00:00:00 2001 From: misko Date: Thu, 30 Oct 2025 00:12:25 +0000 Subject: [PATCH 06/13] remove need for head resource --- src/fairchem/core/units/mlip_unit/predict.py | 23 +++++++------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/fairchem/core/units/mlip_unit/predict.py b/src/fairchem/core/units/mlip_unit/predict.py index 804d139194..252a25bbf4 100644 --- a/src/fairchem/core/units/mlip_unit/predict.py +++ b/src/fairchem/core/units/mlip_unit/predict.py @@ -366,6 +366,9 @@ def __init__( def get_master_address_and_port(self): return (self.master_address, self.master_port) + def get_device_for_local_rank(self): + return get_device_for_local_rank() + def _distributed_setup( self, ): @@ -471,17 +474,6 @@ def __init__( self.last_sent_atomic_data = None - # check if we have a GPU locally - head = next( - ( - n - for n in ray.nodes() - if n["Alive"] and n["Resources"].get("head", 0) > 0 - ), - None, - ) - assert head is not None, "Could not find head node in Ray cluster" - num_nodes = math.ceil(num_workers / num_workers_per_node) num_workers_on_node_array = [num_workers_per_node] * num_nodes if num_workers % num_workers_per_node > 0: @@ -493,18 +485,16 @@ def __init__( # first create one placement group for each node num_gpu_per_worker = 1 if device == "cuda" else 0 placement_groups = [] - for node_idx, workers in enumerate(num_workers_on_node_array): + for workers in num_workers_on_node_array: bundle = {"CPU": workers} if device == "cuda": bundle["GPU"] = workers - if node_idx == 0: - bundle["head"] = 0.1 pg = ray.util.placement_group([bundle], strategy="STRICT_PACK") placement_groups.append(pg) ray.get(pg.ready()) # Wait for each placement group to be scheduled # Need to still place worker to occupy space, otherwise ray double books this GPU - _ = MLIPWorker.options( + rank0_worker = MLIPWorker.options( num_gpus=num_gpu_per_worker, scheduling_strategy=PlacementGroupSchedulingStrategy( placement_group=placement_groups[0], @@ -513,6 +503,9 @@ def __init__( ), ).remote(0, num_workers, predict_unit_config) + local_gpu_or_cpu = ray.get(rank0_worker.get_device_for_local_rank.remote()) + os.environ[CURRENT_DEVICE_TYPE_STR] = local_gpu_or_cpu + self.workers = [] self.local_rank0 = MLIPWorkerLocal( worker_id=0, From 5d3c6a0d8dfb4960ab0ff48da03cf64b100ab26b Mon Sep 17 00:00:00 2001 From: misko Date: Thu, 30 Oct 2025 00:14:32 +0000 Subject: [PATCH 07/13] remove head resource --- src/fairchem/core/launchers/cluster/ray_cluster.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/fairchem/core/launchers/cluster/ray_cluster.py b/src/fairchem/core/launchers/cluster/ray_cluster.py index ffe9ecc441..99a78b11b0 100644 --- a/src/fairchem/core/launchers/cluster/ray_cluster.py +++ b/src/fairchem/core/launchers/cluster/ray_cluster.py @@ -205,8 +205,6 @@ def _ray_head_script( "--num-gpus", f"{num_gpus}", "--dashboard-host=0.0.0.0", - "--resources", - '{"head": 1}', ], env=head_env, stdout=subprocess.PIPE, From 88ea2cc67d9649c8b91c5c9bb65d8723f4239c80 Mon Sep 17 00:00:00 2001 From: misko Date: Thu, 30 Oct 2025 00:15:51 +0000 Subject: [PATCH 08/13] remove embeddings return when in GP mode --- src/fairchem/core/models/uma/escn_md.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/fairchem/core/models/uma/escn_md.py 
b/src/fairchem/core/models/uma/escn_md.py index 0555d6daee..27bc1ea2ff 100644 --- a/src/fairchem/core/models/uma/escn_md.py +++ b/src/fairchem/core/models/uma/escn_md.py @@ -647,13 +647,11 @@ def forward( outputs[energy_key] = {"energy": energy} if self.wrap_property else energy - embeddings = emb["node_embedding"].detach() - if gp_utils.initialized(): - embeddings = gp_utils.gather_from_model_parallel_region(embeddings, dim=0) - - outputs["embeddings"] = ( - {"embeddings": embeddings} if self.wrap_property else embeddings - ) + if not gp_utils.initialized(): + embeddings = emb["node_embedding"].detach() + outputs["embeddings"] = ( + {"embeddings": embeddings} if self.wrap_property else embeddings + ) if self.regress_stress: grads = torch.autograd.grad( From 3f711c5a17046342ea280982f4b3f606b3c25101 Mon Sep 17 00:00:00 2001 From: misko Date: Thu, 30 Oct 2025 23:17:55 +0000 Subject: [PATCH 09/13] cleanup before launch --- tests/conftest.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index d293f54626..700581c076 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,8 @@ import pytest import torch +import fairchem.core.common.gp_utils as gp_utils +from fairchem.core.common import distutils @pytest.fixture() def command_line_inference_checkpoint(request): @@ -134,3 +136,10 @@ def water_xyz_file(tmp_path_factory): fpath = d / "water.xyz" fpath.write_text(contents) return str(fpath) + + +@pytest.fixture(autouse=True) +def setup_before_each_test(): + if gp_utils.initialized(): + gp_utils.cleanup_gp() + distutils.cleanup() \ No newline at end of file From eaf99ce23b54378bbc5b05383296d2f70120b341 Mon Sep 17 00:00:00 2001 From: misko Date: Fri, 31 Oct 2025 00:01:45 +0000 Subject: [PATCH 10/13] increase atol --- tests/core/units/mlip_unit/test_predict.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/core/units/mlip_unit/test_predict.py b/tests/core/units/mlip_unit/test_predict.py index ea9f83cb4a..aba872c372 100644 --- a/tests/core/units/mlip_unit/test_predict.py +++ b/tests/core/units/mlip_unit/test_predict.py @@ -14,7 +14,7 @@ from tests.conftest import seed_everywhere FORCE_TOL = 1e-4 -ATOL = 1e-5 +ATOL = 2e-5 def get_fcc_carbon_xtal( From 314f9e5d45971a4e62a59a8ab947905327408956 Mon Sep 17 00:00:00 2001 From: misko Date: Wed, 5 Nov 2025 23:19:42 +0000 Subject: [PATCH 11/13] cleanup gp utils after ppunit --- tests/core/units/mlip_unit/test_predict.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/core/units/mlip_unit/test_predict.py b/tests/core/units/mlip_unit/test_predict.py index c00ddab67c..e5ea0e90b6 100644 --- a/tests/core/units/mlip_unit/test_predict.py +++ b/tests/core/units/mlip_unit/test_predict.py @@ -11,6 +11,8 @@ from fairchem.core.units.mlip_unit.api.inference import InferenceSettings from fairchem.core.units.mlip_unit.predict import ParallelMLIPPredictUnit from tests.conftest import seed_everywhere +import fairchem.core.common.gp_utils as gp_utils +from fairchem.core.common import distutils FORCE_TOL = 1e-4 ATOL = 5e-4 @@ -154,6 +156,10 @@ def test_parallel_predict_unit(workers, device): for _ in range(runs): pp_results = ppunit.predict(atomic_data) + if gp_utils.initialized(): + gp_utils.cleanup_gp() + distutils.cleanup() + seed_everywhere(seed) normal_predict_unit = pretrained_mlip.get_predict_unit( "uma-s-1p1", device=device, inference_settings=ifsets From 9b41c5045bd756ebc047a6f17d509b5406ce837b Mon Sep 17 00:00:00 2001 From: misko Date: Thu, 6 Nov 2025 00:19:52 +0000 
Subject: [PATCH 12/13] fix tests --- src/fairchem/core/units/mlip_unit/predict.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/fairchem/core/units/mlip_unit/predict.py b/src/fairchem/core/units/mlip_unit/predict.py index 252a25bbf4..8d0d8c4731 100644 --- a/src/fairchem/core/units/mlip_unit/predict.py +++ b/src/fairchem/core/units/mlip_unit/predict.py @@ -472,7 +472,7 @@ def __init__( # }, ) - self.last_sent_atomic_data = None + self.atomic_data_on_device = None num_nodes = math.ceil(num_workers / num_workers_per_node) num_workers_on_node_array = [num_workers_per_node] * num_nodes @@ -549,19 +549,19 @@ def __init__( def predict(self, data: AtomicData) -> dict[str, torch.tensor]: # put the reference in the object store only once - if not self.inference_settings.merge_mole or self.last_sent_atomic_data is None: + if not self.inference_settings.merge_mole or self.atomic_data_on_device is None: data_ref = ray.put(data) # this will put the ray works into an infinite loop listening for broadcasts _futures = [ w.predict.remote(data_ref, use_nccl=self.inference_settings.merge_mole) for w in self.workers ] - self.last_sent_atomic_data = data + self.atomic_data_on_device = data.clone() else: - data = data.to(self.local_rank0.device) - torch.distributed.broadcast(data.pos, src=0) + self.atomic_data_on_device.pos = data.pos.to(self.local_rank0.device) + torch.distributed.broadcast(self.atomic_data_on_device.pos, src=0) - return self.local_rank0.predict(data) + return self.local_rank0.predict(self.atomic_data_on_device) @property def dataset_to_tasks(self) -> dict[str, list]: From 59f85478482a1894259cd81c6d7449725ec87eb6 Mon Sep 17 00:00:00 2001 From: misko Date: Thu, 6 Nov 2025 02:11:27 +0000 Subject: [PATCH 13/13] add cleanup after batch test --- tests/core/units/mlip_unit/test_predict.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/core/units/mlip_unit/test_predict.py b/tests/core/units/mlip_unit/test_predict.py index e5ea0e90b6..4f1227527a 100644 --- a/tests/core/units/mlip_unit/test_predict.py +++ b/tests/core/units/mlip_unit/test_predict.py @@ -233,6 +233,10 @@ def test_parallel_predict_unit_batch(workers, device): for _ in range(runs): pp_results = ppunit.predict(atomic_data) + if gp_utils.initialized(): + gp_utils.cleanup_gp() + distutils.cleanup() + seed_everywhere(seed) normal_predict_unit = pretrained_mlip.get_predict_unit( "uma-s-1p1", device=device, inference_settings=ifsets
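
The local-head path added in this series comes down to one communication pattern: rank 0 keeps the authoritative AtomicData on its device and broadcasts only the updated positions each step, while every other rank sits in a loop on torch.distributed.broadcast and reruns the forward pass on the positions it receives, so with merge_mole enabled the Ray object store is only touched on the very first call. The sketch below shows that pattern in isolation on the gloo backend; it is not the fairchem API, the names (fake_energy, N_STEPS) are placeholders, and unlike the workers in the patches above it runs a fixed number of steps instead of looping forever.

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

N_STEPS = 3  # placeholder step count; the real non-rank-0 workers loop indefinitely


def fake_energy(pos: torch.Tensor) -> torch.Tensor:
    # stand-in for the model forward pass run by each worker
    return (pos**2).sum()


def worker(rank: int, world_size: int) -> None:
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29511")
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    pos = torch.zeros(4, 3)  # every rank holds an identically shaped position buffer
    for step in range(N_STEPS):
        if rank == 0:
            pos += 0.01  # rank 0 produces the new positions (e.g. an MD integrator step)
        # all ranks block here until rank 0 broadcasts the updated positions
        dist.broadcast(pos, src=0)
        print(f"rank {rank} step {step} energy {fake_energy(pos).item():.4f}")

    dist.destroy_process_group()


if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)

The same shape shows up in the final predict(): the first call ships the full structure to the workers through ray.put, every later call only moves data.pos onto the rank-0 device and broadcasts it, which is why the tests above clean up the graph-parallel groups and the process group between predict units.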