5
5
import logging
6
6
import os
7
7
import sys
8
- from enum import Enum
9
- from typing import NamedTuple , Optional
8
+ from typing import Optional
10
9
11
10
from vllm .config import KVTransferConfig
12
11
from vllm .distributed .kv_events import KVEventsConfig
31
30
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
32
31
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
33
32
33
+ VALID_CONNECTORS = {"nixl" , "lmcache" , "kvbm" , "null" , "none" }
34
+
34
35
# Global LMCache configuration - initialize once on module import
35
36
ENABLE_LMCACHE = os .getenv ("ENABLE_LMCACHE" , "0" ).lower () in ("1" , "true" , "yes" )
36
37
@@ -54,8 +55,8 @@ class Config:
54
55
# rest vLLM args
55
56
engine_args : AsyncEngineArgs
56
57
57
- # Decision for KV transfer configuration
58
- kv_transfer_decision : Optional ["KVTransferDecision" ] = None
58
+ # Connector list from CLI
59
+ connector_list : Optional [list ] = None
59
60
60
61
61
62
def parse_args () -> Config :
@@ -95,9 +96,11 @@ def parse_args() -> Config:
95
96
help = f"Maximum port number for Dynamo services (default: {DEFAULT_DYNAMO_PORT_MAX}). Must be in registered ports range (1024-49151)." ,
96
97
)
97
98
parser .add_argument (
98
- "--no-kv-transfer-config" ,
99
- action = "store_true" ,
100
- help = "Disable Dynamo's kv_transfer_config defaults. vLLM will use its own defaults instead." ,
99
+ "--connector" ,
100
+ nargs = "*" ,
101
+ default = ["nixl" ],
102
+ help = "List of connectors to use (e.g., --connector nixl lmcache). "
103
+ "Options: nixl, lmcache, kvbm, null, none. Default: nixl" ,
101
104
)
102
105
103
106
parser = AsyncEngineArgs .add_cli_args (parser )
@@ -149,10 +152,35 @@ def parse_args() -> Config:
149
152
min = args .dynamo_port_min , max = args .dynamo_port_max
150
153
)
151
154
152
- # Build KV decision based on CLI flag and engine args
153
- config .kv_decision = compute_kv_transfer_decision (
154
- args .no_kv_transfer_config , bool (args .kv_transfer_config ), config
155
+ # Check for conflicting flags
156
+ has_kv_transfer_config = (
157
+ hasattr (engine_args , "kv_transfer_config" )
158
+ and engine_args .kv_transfer_config is not None
155
159
)
160
+ has_connector_flag = args .connector is not None
161
+
162
+ if has_kv_transfer_config and has_connector_flag :
163
+ raise ValueError (
164
+ "Cannot specify both --kv-transfer-config and --connector flags"
165
+ )
166
+
167
+ if has_connector_flag :
168
+ normalized = [c .lower () for c in args .connector ]
169
+
170
+ invalid = [c for c in normalized if c not in VALID_CONNECTORS ]
171
+ if invalid :
172
+ raise ValueError (
173
+ f"Invalid connector(s): {', '.join(invalid)}. Valid options are: {', '.join(sorted(VALID_CONNECTORS))}"
174
+ )
175
+
176
+ if "none" in normalized or "null" in normalized :
177
+ if len (normalized ) > 1 :
178
+ raise ValueError (
179
+ "'none' and 'null' cannot be combined with other connectors"
180
+ )
181
+ config .connector_list = []
182
+ else :
183
+ config .connector_list = normalized
156
184
157
185
if config .engine_args .block_size is None :
158
186
config .engine_args .block_size = 16
@@ -182,7 +210,10 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
182
210
config .kv_port = kv_port
183
211
logger .info (f"Allocated ZMQ KV events port: {kv_port} (worker_id={worker_id})" )
184
212
185
- if config .kv_decision .is_nixl :
213
+ # Check if NIXL is needed based on connector list
214
+ needs_nixl = config .connector_list and "nixl" in config .connector_list
215
+
216
+ if needs_nixl :
186
217
# Allocate side channel ports
187
218
# https://github.com/vllm-project/vllm/blob/releases/v0.10.0/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py#L372
188
219
# NIXL calculates ports as: base_port + (dp_rank * tp_size) + tp_rank
@@ -223,84 +254,75 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
223
254
set_side_channel_host_and_port (base_side_channel_port )
224
255
225
256
226
- class KVDecisionReason (str , Enum ):
227
- DISABLED = "disabled"
228
- USER = "user"
229
- DEFAULT = "default"
230
-
257
+ def create_kv_transfer_config (config : Config ) -> Optional [KVTransferConfig ]:
258
+ """Create KVTransferConfig based on user config or connector list.
231
259
232
- class KVTransferDecision (NamedTuple ):
233
- kv_transfer_config : Optional [KVTransferConfig ]
234
- reason : KVDecisionReason
235
- kind : Optional [str ]
236
- is_nixl : bool = False
260
+ Handles logging and returns the appropriate config or None.
261
+ """
262
+ has_user_kv_config = (
263
+ hasattr (config .engine_args , "kv_transfer_config" )
264
+ and config .engine_args .kv_transfer_config is not None
265
+ )
237
266
267
+ if has_user_kv_config :
268
+ logger .info ("Using user-provided kv_transfer_config from --kv-transfer-config" )
269
+ return None # Let vLLM use the user's config
238
270
239
- def compute_kv_transfer_decision (
240
- no_kv_transfer_config : bool , usr_kv_transfer_config : bool , config : Config
241
- ) -> KVTransferDecision :
242
- """Decide how to set kv_transfer_config.
271
+ # No connector list or empty list means no config
272
+ if not config . connector_list :
273
+ logger . info ( "Using vLLM defaults for kv_transfer_config" )
274
+ return None
243
275
244
- - If disabled via flag, return (None, 'disabled', None)
245
- - If user supplied a config, return (None, 'user', None)
246
- - Else return (default_cfg, 'default', kind)
247
- """
248
- if no_kv_transfer_config :
249
- return KVTransferDecision (None , KVDecisionReason .DISABLED , None )
276
+ logger .info (f"Creating kv_transfer_config from --connector {config.connector_list}" )
250
277
251
- if usr_kv_transfer_config :
252
- return KVTransferDecision (None , KVDecisionReason .USER , None )
278
+ # Check which connectors are present
279
+ has_nixl = "nixl" in config .connector_list
280
+ has_lmcache = "lmcache" in config .connector_list
281
+ has_kvbm = "kvbm" in config .connector_list
253
282
254
- if ENABLE_LMCACHE :
255
- if config .is_prefill_worker :
256
- kv_transfer_config = KVTransferConfig (
257
- kv_connector = "MultiConnector" ,
283
+ # Single connector case
284
+ if len (config .connector_list ) == 1 :
285
+ if has_nixl :
286
+ return KVTransferConfig (kv_connector = "NixlConnector" , kv_role = "kv_both" )
287
+ elif has_lmcache :
288
+ return KVTransferConfig (
289
+ kv_connector = "LMCacheConnectorV1" , kv_role = "kv_both"
290
+ )
291
+ elif has_kvbm :
292
+ return KVTransferConfig (
293
+ kv_connector = "DynamoConnector" ,
294
+ kv_connector_module_path = "dynamo.llm.vllm_integration.connector" ,
258
295
kv_role = "kv_both" ,
259
- kv_connector_extra_config = {
260
- "connectors" : [
261
- {"kv_connector" : "LMCacheConnectorV1" , "kv_role" : "kv_both" },
262
- {"kv_connector" : "NixlConnector" , "kv_role" : "kv_both" },
263
- ]
264
- },
265
296
)
266
- return KVTransferDecision (
267
- kv_transfer_config ,
268
- KVDecisionReason .DEFAULT ,
269
- "MultiConnector (LMCacheConnectorV1 + NixlConnector)" ,
270
- True ,
297
+ # Multiple connectors - use MultiConnector
298
+ else :
299
+ multi_connectors = []
300
+ if has_lmcache :
301
+ multi_connectors .append (
302
+ {"kv_connector" : "LMCacheConnectorV1" , "kv_role" : "kv_both" }
271
303
)
272
- else :
273
- kv_transfer_config = KVTransferConfig (
274
- kv_connector = "LMCacheConnectorV1" , kv_role = "kv_both"
304
+ if has_nixl :
305
+ multi_connectors . append (
306
+ {"kv_connector" : "NixlConnector" , "kv_role" : "kv_both" }
275
307
)
276
- return KVTransferDecision (
277
- kv_transfer_config , KVDecisionReason .DEFAULT , "LMCacheConnectorV1"
308
+ if has_kvbm :
309
+ multi_connectors .append (
310
+ {
311
+ "kv_connector" : "DynamoConnector" ,
312
+ "kv_connector_module_path" : "dynamo.llm.vllm_integration.connector" ,
313
+ "kv_role" : "kv_both" ,
314
+ }
278
315
)
279
- else :
280
- kv_transfer_config = KVTransferConfig (
281
- kv_connector = "NixlConnector" , kv_role = "kv_both"
282
- )
283
- return KVTransferDecision (
284
- kv_transfer_config , KVDecisionReason .DEFAULT , "NixlConnector" , True
316
+
317
+ return KVTransferConfig (
318
+ kv_connector = "MultiConnector" ,
319
+ kv_role = "kv_both" ,
320
+ kv_connector_extra_config = {"connectors" : multi_connectors },
285
321
)
286
322
287
323
288
324
def overwrite_args (config ):
289
325
"""Set vLLM defaults for Dynamo."""
290
- dp_rank = config .engine_args .data_parallel_rank or 0
291
- decision = config .kv_decision
292
-
293
- if decision .reason == KVDecisionReason .DISABLED :
294
- logger .info (
295
- "kv_transfer_config disabled via --no-kv-transfer-config; vLLM will use its own defaults."
296
- )
297
- elif decision .reason == KVDecisionReason .USER :
298
- logger .info ("User-provided kv_transfer_config; skipping Dynamo defaults." )
299
- else :
300
- logger .info (
301
- "Applying Dynamo default kv_transfer_config: %s. To customize, pass --kv-transfer-config '<JSON>' or --no-kv-transfer-config." ,
302
- decision .kind ,
303
- )
304
326
defaults = {
305
327
"task" : "generate" ,
306
328
# As of vLLM >=0.10.0 the engine unconditionally calls
@@ -313,12 +335,12 @@ def overwrite_args(config):
313
335
"disable_log_stats" : False ,
314
336
}
315
337
316
- # Only set kv_transfer_config if we computed a default (non-None)
317
- if decision . kv_transfer_config is not None :
318
- defaults ["kv_transfer_config" ] = decision . kv_transfer_config
338
+ kv_config = create_kv_transfer_config ( config )
339
+ if kv_config :
340
+ defaults ["kv_transfer_config" ] = kv_config
319
341
320
342
if config .engine_args .enable_prefix_caching :
321
- # If caching, send events
343
+ dp_rank = config . engine_args . data_parallel_rank or 0
322
344
defaults |= {
323
345
# Always setting up kv events if enable prefix cache.
324
346
"kv_events_config" : KVEventsConfig (
0 commit comments