5
5
import logging
6
6
import os
7
7
import sys
8
- from enum import Enum
9
- from typing import NamedTuple , Optional
8
+ from typing import Optional
10
9
11
10
from vllm .config import KVTransferConfig
12
11
from vllm .distributed .kv_events import KVEventsConfig
31
30
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
32
31
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
33
32
33
+ VALID_CONNECTORS = {"nixl" , "lmcache" , "kvbm" , "null" , "none" }
34
+
34
35
# Global LMCache configuration - initialize once on module import
35
36
ENABLE_LMCACHE = os .getenv ("ENABLE_LMCACHE" , "0" ).lower () in ("1" , "true" , "yes" )
36
37
@@ -54,8 +55,8 @@ class Config:
54
55
# rest vLLM args
55
56
engine_args : AsyncEngineArgs
56
57
57
- # Decision for KV transfer configuration
58
- kv_transfer_decision : Optional ["KVTransferDecision" ] = None
58
+ # Connector list from CLI
59
+ connector_list : Optional [list ] = None
59
60
60
61
61
62
def parse_args () -> Config :
@@ -95,9 +96,11 @@ def parse_args() -> Config:
95
96
help = f"Maximum port number for Dynamo services (default: { DEFAULT_DYNAMO_PORT_MAX } ). Must be in registered ports range (1024-49151)." ,
96
97
)
97
98
parser .add_argument (
98
- "--no-kv-transfer-config" ,
99
- action = "store_true" ,
100
- help = "Disable Dynamo's kv_transfer_config defaults. vLLM will use its own defaults instead." ,
99
+ "--connector" ,
100
+ nargs = "*" ,
101
+ default = ["nixl" ],
102
+ help = "List of connectors to use (e.g., --connector nixl lmcache). "
103
+ "Options: nixl, lmcache, kvbm, null, none. Default: nixl" ,
101
104
)
102
105
103
106
parser = AsyncEngineArgs .add_cli_args (parser )
@@ -149,10 +152,35 @@ def parse_args() -> Config:
149
152
min = args .dynamo_port_min , max = args .dynamo_port_max
150
153
)
151
154
152
- # Build KV decision based on CLI flag and engine args
153
- config .kv_decision = compute_kv_transfer_decision (
154
- args .no_kv_transfer_config , bool (args .kv_transfer_config ), config
155
+ # Check for conflicting flags
156
+ has_kv_transfer_config = (
157
+ hasattr (engine_args , "kv_transfer_config" )
158
+ and engine_args .kv_transfer_config is not None
155
159
)
160
+ has_connector_flag = args .connector is not None
161
+
162
+ if has_kv_transfer_config and has_connector_flag :
163
+ raise ValueError (
164
+ "Cannot specify both --kv-transfer-config and --connector flags"
165
+ )
166
+
167
+ if has_connector_flag :
168
+ normalized = [c .lower () for c in args .connector ]
169
+
170
+ invalid = [c for c in normalized if c not in VALID_CONNECTORS ]
171
+ if invalid :
172
+ raise ValueError (
173
+ f"Invalid connector(s): { ', ' .join (invalid )} . Valid options are: { ', ' .join (sorted (VALID_CONNECTORS ))} "
174
+ )
175
+
176
+ if "none" in normalized or "null" in normalized :
177
+ if len (normalized ) > 1 :
178
+ raise ValueError (
179
+ "'none' and 'null' cannot be combined with other connectors"
180
+ )
181
+ config .connector_list = []
182
+ else :
183
+ config .connector_list = normalized
156
184
157
185
if config .engine_args .block_size is None :
158
186
config .engine_args .block_size = 16
@@ -182,11 +210,15 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
182
210
config .kv_port = kv_port
183
211
logger .info (f"Allocated ZMQ KV events port: { kv_port } (worker_id={ worker_id } )" )
184
212
185
- # Allocate side channel ports
186
- # https://github.com/vllm-project/vllm/blob/releases/v0.10.1/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py#L443
187
- # NIXL calculates ports as: base_port + (dp_rank * tp_size) + tp_rank
188
- # For dp_rank, we need to reserve tp_size consecutive ports
189
- tp_size = config .engine_args .tensor_parallel_size or 1
213
+ # Check if NIXL is needed based on connector list
214
+ needs_nixl = config .connector_list and "nixl" in config .connector_list
215
+
216
+ if needs_nixl :
217
+ # Allocate side channel ports
218
+ # https://github.com/vllm-project/vllm/blob/releases/v0.10.0/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py#L372
219
+ # NIXL calculates ports as: base_port + (dp_rank * tp_size) + tp_rank
220
+ # For dp_rank, we need to reserve tp_size consecutive ports
221
+ tp_size = config .engine_args .tensor_parallel_size or 1
190
222
191
223
# The first port for this dp_rank will be at: base_port + (dp_rank * tp_size)
192
224
# We need to allocate tp_size consecutive ports starting from there
@@ -222,84 +254,75 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
222
254
set_side_channel_host_and_port (base_side_channel_port )
223
255
224
256
225
- class KVDecisionReason (str , Enum ):
226
- DISABLED = "disabled"
227
- USER = "user"
228
- DEFAULT = "default"
229
-
257
# Per-connector KVTransferConfig settings, listed in the order the connectors
# are placed inside a MultiConnector (LMCache first, then NIXL, then KVBM).
# Keeping one table avoids duplicating each spec for the single- and
# multi-connector paths.
_CONNECTOR_SPECS = (
    ("lmcache", {"kv_connector": "LMCacheConnectorV1", "kv_role": "kv_both"}),
    ("nixl", {"kv_connector": "NixlConnector", "kv_role": "kv_both"}),
    (
        "kvbm",
        {
            "kv_connector": "DynamoConnector",
            "kv_connector_module_path": "dynamo.llm.vllm_integration.connector",
            "kv_role": "kv_both",
        },
    ),
)


def create_kv_transfer_config(config: Config) -> Optional[KVTransferConfig]:
    """Create KVTransferConfig based on user config or connector list.

    Handles logging and returns the appropriate config or None.

    Args:
        config: Parsed configuration. Reads ``config.engine_args`` (for a
            user-supplied ``kv_transfer_config``) and ``config.connector_list``.

    Returns:
        ``None`` when the user already supplied ``kv_transfer_config`` on the
        engine args, when ``connector_list`` is empty/``None``, or when it
        contains no recognised connector name. Otherwise a ``KVTransferConfig``
        for the single requested connector, or a ``MultiConnector`` config
        wrapping every requested connector in the fixed order
        LMCache -> NIXL -> KVBM.
    """
    if getattr(config.engine_args, "kv_transfer_config", None) is not None:
        logger.info("Using user-provided kv_transfer_config from --kv-transfer-config")
        return None  # Let vLLM use the user's config

    # No connector list or empty list means no config
    if not config.connector_list:
        logger.info("Using vLLM defaults for kv_transfer_config")
        return None

    logger.info(f"Creating kv_transfer_config from --connector {config.connector_list}")

    # Decide on the set of *distinct* recognised connectors rather than on
    # len(connector_list): a repeated name (e.g. ["nixl", "nixl"]) must not
    # produce a MultiConnector wrapping a single child.
    requested = set(config.connector_list)
    # Copy each spec so callers can never mutate the shared module-level table.
    selected = [dict(spec) for name, spec in _CONNECTOR_SPECS if name in requested]

    if not selected:
        # Nothing recognised: be explicit instead of silently falling off the
        # end of the function or building an empty MultiConnector.
        logger.warning(
            "No recognised connector in %s; using vLLM defaults for kv_transfer_config",
            config.connector_list,
        )
        return None

    # Single connector case
    if len(selected) == 1:
        return KVTransferConfig(**selected[0])

    # Multiple connectors - use MultiConnector
    return KVTransferConfig(
        kv_connector="MultiConnector",
        kv_role="kv_both",
        kv_connector_extra_config={"connectors": selected},
    )
285
322
286
323
287
324
def overwrite_args (config ):
288
325
"""Set vLLM defaults for Dynamo."""
289
- dp_rank = config .engine_args .data_parallel_rank or 0
290
- decision = config .kv_decision
291
-
292
- if decision .reason == KVDecisionReason .DISABLED :
293
- logger .info (
294
- "kv_transfer_config disabled via --no-kv-transfer-config; vLLM will use its own defaults."
295
- )
296
- elif decision .reason == KVDecisionReason .USER :
297
- logger .info ("User-provided kv_transfer_config; skipping Dynamo defaults." )
298
- else :
299
- logger .info (
300
- "Applying Dynamo default kv_transfer_config: %s. To customize, pass --kv-transfer-config '<JSON>' or --no-kv-transfer-config." ,
301
- decision .kind ,
302
- )
303
326
defaults = {
304
327
"task" : "generate" ,
305
328
# As of vLLM >=0.10.0 the engine unconditionally calls
@@ -312,12 +335,12 @@ def overwrite_args(config):
312
335
"disable_log_stats" : False ,
313
336
}
314
337
315
- # Only set kv_transfer_config if we computed a default (non-None )
316
- if decision . kv_transfer_config is not None :
317
- defaults ["kv_transfer_config" ] = decision . kv_transfer_config
338
+ kv_config = create_kv_transfer_config ( config )
339
+ if kv_config :
340
+ defaults ["kv_transfer_config" ] = kv_config
318
341
319
342
if config .engine_args .enable_prefix_caching :
320
- # If caching, send events
343
+ dp_rank = config . engine_args . data_parallel_rank or 0
321
344
defaults |= {
322
345
# Always setting up kv events if enable prefix cache.
323
346
"kv_events_config" : KVEventsConfig (
0 commit comments