Skip to content

Commit a98786e

Browse files
authored
[feat] Support lazy init when calling TQ API (Ascend#33)
Previously, when integrating TransferQueue (TQ) with VERL, `tq.init()` was eagerly called to create the global `_TRANSFER_QUEUE_CLIENT` variable so that subsequent operations like `tq.async_kv_batch_get()` could easily access the TQ client instance. However, this eager initialization caused Ray to be started and initialized before VERL launched the training cluster with its runtime environment. As a result, runtime environment configurations specified by the VERL PPO trainer were ignored during Ray cluster initialization. To avoid this issue, we now support lazy initialization: the TQ client is instantiated on demand upon the first invocation of a TQ API, ensuring Ray is initialized only when VERL is fully prepared to configure the cluster environment correctly. --------- Signed-off-by: MissLittleFish <yhuang@smail.nju.edu.cn>
1 parent ea8f7cf commit a98786e

1 file changed

Lines changed: 26 additions & 13 deletions

File tree

transfer_queue/interface.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ def _maybe_create_transferqueue_client(
4747
global _TRANSFER_QUEUE_CLIENT
4848
if _TRANSFER_QUEUE_CLIENT is None:
4949
if conf is None:
50-
raise ValueError("Missing config for initializing TransferQueueClient!")
50+
_init_from_existing()
51+
assert _TRANSFER_QUEUE_CLIENT is not None, (
52+
"TransferQueueController has not been initialized yet. Please call init() first."
53+
)
54+
return _TRANSFER_QUEUE_CLIENT
55+
5156
pid = os.getpid()
5257
_TRANSFER_QUEUE_CLIENT = TransferQueueClient(
5358
client_id=f"TransferQueueClient_{pid}", controller_info=conf.controller.zmq_info
@@ -88,23 +93,34 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
8893
return conf
8994

9095

91-
def _init_from_existing() -> None:
92-
"""Initialize the TransferQueueClient from existing controller."""
96+
def _init_from_existing() -> bool:
97+
"""Initialize the TransferQueueClient from existing controller.
98+
99+
Returns:
100+
True if successfully initialized from existing controller, False otherwise.
101+
"""
102+
103+
try:
104+
controller = ray.get_actor("TransferQueueController")
105+
except ValueError:
106+
logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.")
107+
return False
93108

94-
controller = ray.get_actor("TransferQueueController")
95109
logger.info("Found existing TransferQueueController instance. Connecting...")
96110

97111
conf = None
98112
while conf is None:
99-
remote_conf = ray.get(controller.get_config.remote())
100-
if remote_conf is not None:
101-
_maybe_create_transferqueue_client(remote_conf)
113+
conf = ray.get(controller.get_config.remote())
114+
if conf is not None:
115+
_maybe_create_transferqueue_client(conf)
102116
logger.info("TransferQueueClient initialized.")
103-
return
117+
return True
104118

105119
logger.debug("Waiting for controller to initialize... Retrying in 1s")
106120
time.sleep(1)
107121

122+
return False
123+
108124

109125
# ==================== Initialization API ====================
110126
def init(conf: Optional[DictConfig] = None) -> None:
@@ -138,14 +154,11 @@ def init(conf: Optional[DictConfig] = None) -> None:
138154
>>> metadata = tq.get_meta(...)
139155
>>> data = tq.get_data(metadata)
140156
"""
141-
try:
142-
_init_from_existing()
143-
except ValueError:
144-
logger.info("No TransferQueueController found. Starting first-time initialization...")
145-
else:
157+
if _init_from_existing():
146158
return
147159

148160
# First-time initialize TransferQueue
161+
logger.info("No TransferQueueController found. Starting first-time initialization...")
149162

150163
# create config
151164
final_conf = OmegaConf.create({}, flags={"allow_objects": True})

0 commit comments

Comments
 (0)