From a8fb0a9e108d3b6e7e06fa75e524fdddf91ce6cc Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Wed, 3 Jun 2026 14:39:20 +0800 Subject: [PATCH] fix: localize RTensor trajectories before reading on controller v2 inference service's data_proxy remotizes the exported trajectory dict (areal/experimental/inference_service/data_proxy/app.py:745), so the controller side receives dict-of-RTensor. Existing consumers read tensor values directly and crash with AttributeError / TypeError because RTensor exposes neither .flatten/.shape nor __len__/__getitem__. Key changes: - workflow_executor._dump_trajectory: call RTensor.localize(traj) at entry so versions/input_ids/attention_mask/etc. become real tensors; no-op on v1 dict-of-Tensor. - InferenceServiceWorkflow._run_online: call to_local() on traj["rewards"] when it is an RTensor, before len()/indexing in the tensor branch; interactions branch untouched (already plain Python data). Lazy fetch for the engine training path is preserved: only the local copy used by dump/reward-extraction is materialized; the outer trajectory dict still carries RTensors through to the training worker. 
Co-Authored-By: Claude Opus 4.7 --- .../inference_service/controller/workflow.py | 9 +++++++-- areal/infra/workflow_executor.py | 3 +++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/areal/experimental/inference_service/controller/workflow.py b/areal/experimental/inference_service/controller/workflow.py index c823e3419c..98be7bb1b8 100644 --- a/areal/experimental/inference_service/controller/workflow.py +++ b/areal/experimental/inference_service/controller/workflow.py @@ -11,6 +11,7 @@ from areal.api.workflow_api import RolloutWorkflow from areal.infra import workflow_context +from areal.infra.rpc.rtensor import RTensor from areal.infra.rpc.serialization import deserialize_value from areal.infra.utils.http import async_http_retry from areal.utils import logging, stats_tracker @@ -245,8 +246,12 @@ async def _run_online( if not traj: return None - if "rewards" in traj and len(traj["rewards"]) > 0: - last_reward = float(traj["rewards"][-1]) + rewards_tensor = traj.get("rewards") + if isinstance(rewards_tensor, RTensor): + rewards_tensor = rewards_tensor.to_local() + + if rewards_tensor is not None and len(rewards_tensor) > 0: + last_reward = float(rewards_tensor[-1]) elif ( "interactions" in traj and traj["interactions"] diff --git a/areal/infra/workflow_executor.py b/areal/infra/workflow_executor.py index 949d2af0b2..dcf1eabd12 100644 --- a/areal/infra/workflow_executor.py +++ b/areal/infra/workflow_executor.py @@ -22,6 +22,7 @@ from areal.api.cli_args import InferenceEngineConfig from areal.api import RolloutWorkflow +from areal.infra.rpc.rtensor import RTensor from .async_task_runner import ( AsyncTaskRunner, TaskQueueFullError, @@ -840,6 +841,8 @@ async def _dump_trajectory( if traj is None: return False, "trajectory is None" + traj = RTensor.localize(traj) + dump_dir = self._get_dump_dir(is_eval) if dump_dir is None: return False, "dump dir is empty"