
Wire OutputRouter into streaming chat#259

Open
masonjames wants to merge 2 commits into raullenchai:main from masonjames:codex/engine-output-router-wiring

Conversation

@masonjames
Contributor

Summary

  • create a per-request OutputRouter.from_tokenizer() in BatchedEngine.stream_chat()
  • feed generated token IDs through the router and emit routed GenerationOutput.channel chunks
  • keep unsupported tokenizers on the legacy parser path and make the postprocessor gate explicit with output.channel is not None

Tests

  • uv run pytest tests/test_batched_engine_output_router.py tests/test_postprocessor.py tests/test_output_router.py
  • uv run ruff check vllm_mlx/engine/batched.py vllm_mlx/service/postprocessor.py tests/test_batched_engine_output_router.py tests/test_postprocessor.py
  • uv run ruff format --check vllm_mlx/engine/batched.py vllm_mlx/service/postprocessor.py tests/test_batched_engine_output_router.py tests/test_postprocessor.py

Addresses the engine-wiring follow-up from #63.

@raullenchai
Owner

SOP §0–§2 review. Requesting changes — 1 P0 + 4 P1s found in codex round 1.

§0 Necessity: ✅ advances the OutputRouter migration tracked in #63. Goal is correct.

§2 Codex round 1 — P0:

_create_output_router() calls OutputRouter.from_tokenizer(self.tokenizer) which (as of vllm_mlx/output_router.py:361-374) detects three formats:

  1. Gemma 4 (<|channel> + <|tool_call>) — already migrated
  2. GPT-OSS Harmony (<|channel|> + <|message|>) — already migrated
  3. Qwen3 / DeepSeek R1 (<think> + </think>) — NOT migrated yet (sub-issues #64 (Qwen3) and #65 (DeepSeek R1) of the #63 migration are still OPEN)

Since qwen3.5-4b is the default test model and the most common deployment target, this PR would silently switch every Qwen3 and DeepSeek R1 streaming chat from the battle-tested legacy regex parser to the new router. The PR description says "keep unsupported tokenizers on the legacy parser path" — but Qwen3/DeepSeek are technically supported by OutputRouter, just not validated through this engine path.

Fix: explicit allowlist gate. E.g., in _create_output_router():

```python
ROUTER_ALLOWLIST = {"gemma4", "harmony"}  # extend per #64, #65 as those land

def _create_output_router(self):
    try:
        router = OutputRouter.from_tokenizer(self.tokenizer)
        if router is None:
            return None
        if router.token_map.format_tag not in ROUTER_ALLOWLIST:
            return None
        return router
    except Exception as e:
        logger.debug("OutputRouter unavailable: %s", e)
        return None
```

Or invert it — pass an explicit allowed_formats argument to from_tokenizer.

§2 Codex round 1 — P1s (also need addressing):

  1. Token-by-token decoding loses byte-fallback fidelity. The router calls tokenizer.decode([token_id]) per event. Multi-byte UTF-8 / byte-fallback BPE pieces break this way — you get replacement characters (�) or missing characters. output.new_text already comes from the scheduler's incremental detokenizer; routing should preserve that text rather than re-decoding token-at-a-time.
  2. Exception-during-routing fallback yields original raw output mid-stream. If router.feed() raises after some routed chunks have been emitted, the legacy parser hasn't seen those bytes yet. The wrapper then continues using the same router on the next iteration — mixed mode. Either fail closed, or set router = None for the rest of the request after first failure.
  3. Logprobs not split. Each routed GenerationOutput gets tokens=[event.token_id] but logprobs=output.logprobs (the whole chunk's logprobs). For stream_interval > 1 flushes (multi-token chunks), this misaligns the logprobs against the routed tokens. Either split the logprob list alongside token_ids, or set logprobs=None on routed chunks until pairing is fixed.
  4. self.tokenizer access outside the fallback try. A partially-loaded tokenizer property raise would break the request instead of falling back to legacy. Move the access inside try.
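The fail-closed option from item 2 can be sketched as follows. FlakyRouter and the (channel, text) event shape here are hypothetical stand-ins, not the real vllm_mlx types; this only illustrates the "disable routing for the rest of the request after the first failure" behavior:

```python
# Hedged sketch: once the router raises, drop it for the rest of the
# stream so we never mix routed and unrouted output from a router in
# an unknown state. FlakyRouter is a stand-in, not a vllm_mlx class.

class FlakyRouter:
    """Stand-in router that raises on its second feed() call."""
    def __init__(self):
        self.calls = 0

    def feed(self, token_id):
        self.calls += 1
        if self.calls == 2:
            raise RuntimeError("router broke mid-stream")
        return [("content", f"tok{token_id}")]

def stream_with_failsafe(outputs, router):
    """Route tokens, but permanently disable routing on first failure."""
    for token_id, raw_text in outputs:
        if router is not None:
            try:
                for channel, text in router.feed(token_id):
                    yield (channel, text)
                continue
            except Exception:
                router = None  # fail closed: raw legacy chunks from here on
        yield (None, raw_text)  # channel=None marks an unrouted chunk

chunks = list(stream_with_failsafe([(1, "a"), (2, "b"), (3, "c")], FlakyRouter()))
# token 1 is routed; token 2 trips the failure and falls back to the raw
# text; token 3 stays raw because the router was dropped, never retried.
```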

Suggested next iteration: holding merge until the P0 is fixed. P1s strongly preferred, but I can take them as follow-up issues if you'd rather scope them separately. Thanks for moving the migration forward — this is the right direction, just needs the gate so it doesn't blast-radius onto Qwen3/DeepSeek users.

@raullenchai
Owner

Thanks for picking this up @masonjames. Reviewed against current main, ran the targeted suites, and did a second-opinion adversarial pass via DeepSeek. Local checks pass cleanly:

  • pytest tests/test_batched_engine_output_router.py tests/test_postprocessor.py tests/test_output_router.py -q → 114 passed
  • ruff check + ruff format --check on changed files → clean
  • All required CI checks green (lint, type-check, test-matrix 3.10/3.11/3.12, test-apple-silicon, tests).

The shape of the change is right (per-request router factory, channel-tagged GenerationOutput chunks, postprocessor change from truthy to is not None so channel="" would still hit the channel branch). A couple of substantive concerns before merge though:

P0 — blocker

1. Inherits #197 / #343 — accumulated router state silently dropped on stream end

OutputRouter has no finalize() / flush(). The relevant accumulator (vllm_mlx/output_router.py:122-247) holds:

  • _tool_tokens: list[int] while state == RouterState.TOOL_CALL (waiting for tool_call_end)
  • _pending_channel_style / _pending_message_channel while state in (AWAITING_CHANNEL_TYPE, AWAITING_MESSAGE)
  • the implicit THINKING accumulation that only re-emits on <|message|> switch

_stream_with_output_router correctly handles the empty-routed-outputs + finished case by yielding a sentinel channel=None output (batched.py:910-919), but it never drains the router. So:

  • A request that ends in finish_reason="length" mid-tool-call drops the entire tool body.
  • A request that ends mid-AWAITING_MESSAGE drops the trailing control tokens (the <|channel|>final sequence is consumed but if <|message|> never arrives, the user sees nothing).

This is the same data-loss class as #197 and #343. Suggest either:

  • Add OutputRouter.finalize() -> RouterEvent | None that, depending on state, returns a best-effort Channel.TOOL_CALL event with tokenizer.decode(self._tool_tokens) (and analogues for the other awaiting states), then call it when output.finished and any pending state remains; or
  • At the call site in _stream_with_output_router, check router.state != RouterState.CONTENT and router.state != RouterState.INIT after the loop and emit a fallback chunk re-decoded from the buffered tokens.

Either way please add a regression test for "stream ends in TOOL_CALL state" — that's the exact failure mode users will hit on max_tokens truncation during a function call.
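A minimal sketch of the suggested finalize() drain, under simplifying assumptions: the RouterState/Channel enums and the router internals below are stand-ins mirroring the description above, not the real vllm_mlx implementation.

```python
# Hedged sketch of a best-effort finalize() hook: if the stream ends
# while tool tokens are still buffered (e.g. finish_reason="length"
# mid-tool-call), emit them instead of silently dropping them.
from enum import Enum, auto

class RouterState(Enum):
    INIT = auto()
    CONTENT = auto()
    TOOL_CALL = auto()

class Channel(Enum):
    CONTENT = auto()
    TOOL_CALL = auto()

class MiniRouter:
    def __init__(self, decode):
        self.state = RouterState.INIT
        self._tool_tokens = []
        self._decode = decode  # stand-in for tokenizer.decode

    def feed_tool_token(self, token_id):
        self.state = RouterState.TOOL_CALL
        self._tool_tokens.append(token_id)

    def finalize(self):
        """Drain pending state at stream end; None if nothing buffered."""
        if self.state is RouterState.TOOL_CALL and self._tool_tokens:
            text = self._decode(self._tool_tokens)
            self._tool_tokens = []
            self.state = RouterState.INIT
            return (Channel.TOOL_CALL, text)
        return None

vocab = {7: '{"name":', 8: ' "get_weather"}'}
router = MiniRouter(lambda ids: "".join(vocab[i] for i in ids))
router.feed_tool_token(7)
router.feed_tool_token(8)
event = router.finalize()
# event carries the truncated tool body on the TOOL_CALL channel; a
# second finalize() returns None because the buffer was drained.
```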

P1 — should fix

2. Streaming granularity changes from chunk-level to token-level

Every token that triggers a RouterEvent now produces its own GenerationOutput with new_text=event.text, where event.text = tokenizer.decode([token_id]) (single-token decode). For Harmony streams that previously yielded ("ing", reasoning, False) as part of a multi-token chunk, downstream SSE consumers will now receive one frame per token. The test_stream_chat_routes_supported_tokenizer_channels test happens to assert this token-by-token shape, but it's worth being explicit that this is a contract change vs the legacy path.

If this is intentional (it gives the postprocessor cleaner per-token boundaries), document it in the docstring of _stream_with_output_router. If it's incidental, consider buffering same-channel adjacent events into a single GenerationOutput per upstream chunk to preserve the previous granularity.
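The buffering alternative can be sketched as a small coalescing pass; the (channel, text) event shape is an assumption, not the real RouterEvent type:

```python
# Hedged sketch: merge consecutive same-channel router events back into
# one chunk per upstream flush, preserving the legacy chunk granularity.

def coalesce_events(events):
    """Merge adjacent (channel, text) events that share a channel."""
    merged = []
    for channel, text in events:
        if merged and merged[-1][0] == channel:
            merged[-1] = (channel, merged[-1][1] + text)
        else:
            merged.append((channel, text))
    return merged

events = [
    ("reasoning", "think"), ("reasoning", "ing"),
    ("content", "Hel"), ("content", "lo"),
]
print(coalesce_events(events))
# [('reasoning', 'thinking'), ('content', 'Hello')]
```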

3. Test coverage gaps

The new test file covers the supported-tokenizer happy path (reasoning→content transition) and the unsupported-tokenizer passthrough. Missing:

  • tool_call channel — function calling is the highest-value channel and isn't exercised at all.
  • Mid-channel termination: finished=True arriving while the router is in THINKING or TOOL_CALL state (per item 1 above).
  • Router raises mid-stream — the except Exception fallback in batched.py:889-893 yields the raw output and continues, but the router instance is now in an unknown state for the next chunk. Either reset the router, drop it (router = None for the rest of the stream), or test the documented behavior.

4. Magic token IDs

HARMONY_VOCAB in the test uses real Harmony token IDs (200002, 200005, 35644, etc.) without comments tying them to the <|channel|> / analysis strings they encode. A one-line # Harmony vocab IDs (mirrors GPT-OSS tokenizer) at the top of the dict would make this much easier to maintain when token IDs drift.

P2 — nits

  • _channel_name(channel) returns channel.name.lower(). Works today (CONTENT/REASONING/TOOL_CALL → content/reasoning/tool_call matches what postprocessor._process_channel_routed checks), but a future enum rename would silently break the postprocessor branch. An explicit _CHANNEL_TO_STRING = {Channel.CONTENT: "content", ...} dict would catch that at refactor time.
  • routed_outputs[-1].finished = True mutates the GenerationOutput dataclass in place. Fine here because routed_outputs is freshly constructed in this method, but a dataclasses.replace(...) would be safer if anyone later passes those instances elsewhere.
  • The OutputRouter unavailable log is debug while the OutputRouter failed; falling back log is warning. The first is the common case for unsupported tokenizers; the second is an actual bug surface — the asymmetry is correct, but worth a brief comment so future readers don't "fix" it.

Verdict

I'd love to see this land — channel routing in the engine layer is the right architecture and the refactor is small. Asking for the finalize/drain fix (item 1) plus the tool_call + mid-stream-end tests (item 3) before merge. Items 2 and 4 are nice-to-haves; everything in P2 is reviewer preference.

Happy to pair on the finalize hook if useful — it's small enough to add as a follow-up commit on this PR.


Reviewed by @raullenchai (Rapid-MLX maintainer) with adversarial second-pass via DeepSeek.

@masonjames
Contributor Author

masonjames commented May 11, 2026

Thanks for the detailed review. I pushed a follow-up commit that keeps this PR scoped to the validated engine-router formats and addresses the stream-safety issues raised here.

What changed:

  • Added an explicit engine allowlist for gemma4 and harmony; Qwen3 / DeepSeek think-tag tokenizers now stay on the legacy parser path until #64 / #65 are validated through this path.
  • Added OutputRouter.finalize() and call it on finished routed streams so buffered state is drained instead of dropped, including finish_reason="length" mid-tool-call.
  • Added regression coverage for truncated tool-call finalization and pending Harmony state drain.
  • Changed routed chunks to set logprobs=None for now, avoiding incorrect multi-token logprob pairing.
  • Moved tokenizer access inside the fallback guard in _create_output_router().
  • On router exceptions mid-stream, disable routing for the rest of the request and continue with raw legacy chunks instead of mixing states.
  • Preserved scheduler output.new_text for single-token routed events so the incremental detokenizer remains the source of truth for byte-fallback / multibyte fidelity.
  • Documented the intentional routed streaming granularity: one GenerationOutput per routed token so downstream postprocessing gets clean channel boundaries.
  • Replaced enum-name-derived channel strings with an explicit channel mapping.
  • Replaced the final-output in-place mutation with dataclasses.replace(...).
  • Added comments explaining the debug-vs-warning logging distinction and the Harmony test vocab IDs.
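The truncated tool-call regression described above could look roughly like this; FakeRouter and the chunk/event shapes are hypothetical stand-ins for the real test fixtures, shown only to pin down the scenario:

```python
# Hedged sketch: a stream that hits max_tokens mid-tool-call must still
# surface the buffered tool body via finalize() instead of dropping it.

class FakeRouter:
    def __init__(self):
        self._tool_buf = []

    def feed(self, token_id, text):
        # Tool-call tokens buffer until an end marker that never arrives.
        self._tool_buf.append(text)
        return []  # nothing routed yet

    def finalize(self):
        if self._tool_buf:
            return ("tool_call", "".join(self._tool_buf))
        return None

def drain_on_finish(chunks, router):
    routed = []
    for text, finished in chunks:
        routed.extend(router.feed(0, text))
        if finished:
            tail = router.finalize()
            if tail is not None:
                routed.append(tail)
    return routed

# finish_reason="length" truncation: the tool_call_end marker is never seen.
routed = drain_on_finish([('{"name"', False), (': "f"}', True)], FakeRouter())
print(routed)
# [('tool_call', '{"name": "f"}')]
```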

Validation:

  • uv run pytest tests/test_batched_engine_output_router.py tests/test_output_router.py tests/test_postprocessor.py -q -> 121 passed
  • uv run ruff check vllm_mlx/output_router.py vllm_mlx/engine/batched.py tests/test_batched_engine_output_router.py tests/test_output_router.py tests/test_postprocessor.py
  • uv run ruff format --check vllm_mlx/output_router.py vllm_mlx/engine/batched.py tests/test_batched_engine_output_router.py tests/test_output_router.py tests/test_postprocessor.py
  • git diff --check

This should address the P0, the requested P1 coverage, and the P2 nits without needing a secondary PR.

Excited about Rapid-MLX and happy to help!
