[Serve][LLM] Support concurrent streaming in SGLang completions() endpoint #64094
Truc54 wants to merge 3 commits into
…point

Refactors SGLang completions() endpoint under request.stream=True to run prompt generations concurrently instead of sequentially:
- Uses asyncio.create_task to run _stream_generate in parallel for each prompt in the request.
- Uses an asyncio.Queue to collect streaming chunks from tasks and yields them as they arrive.
- Implements proper error propagation and cancellation of all active producer tasks inside a finally block to prevent resource leaks.

Closes ray-project#63901.

Signed-off-by: Truc54 <trungtruc5405@gmail.com>
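For readers who want to see the shape of the producer/consumer pattern this commit describes, here is a minimal, self-contained sketch. It is not the PR's code: `fake_stream_generate`, `stream_concurrently`, and the `_DONE` sentinel are hypothetical names, and the stand-in generator only yields labelled strings. It assumes Python 3.8+.

```python
import asyncio
from typing import AsyncIterator, List

_DONE = object()  # hypothetical per-producer completion sentinel


async def fake_stream_generate(prompt: str, n: int) -> AsyncIterator[str]:
    """Stand-in for a per-prompt streaming generator; yields n chunks."""
    for i in range(n):
        await asyncio.sleep(0)  # yield control so other producers can run
        yield f"{prompt}:{i}"


async def stream_concurrently(prompts: List[str], n: int = 2) -> AsyncIterator[str]:
    queue: asyncio.Queue = asyncio.Queue()

    async def produce(prompt: str) -> None:
        try:
            async for chunk in fake_stream_generate(prompt, n):
                await queue.put(chunk)
            await queue.put(_DONE)  # only signal completion on success
        except Exception as exc:
            await queue.put(exc)  # forward the failure to the consumer

    # One background task per prompt, all writing to the same queue.
    tasks = [asyncio.create_task(produce(p)) for p in prompts]
    finished = 0
    try:
        while finished < len(tasks):
            item = await queue.get()
            if isinstance(item, Exception):
                raise item
            if item is _DONE:
                finished += 1
                continue
            yield item
    finally:
        # Cancel whatever is still running, then wait for it to unwind.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def collect(prompts: List[str]) -> List[str]:
    return [chunk async for chunk in stream_concurrently(prompts)]


chunks = asyncio.run(collect(["a", "b"]))
```

Chunks from different prompts may arrive in any relative order, but chunks from a single prompt keep their order because each producer writes sequentially to a FIFO queue.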
Code Review
This pull request refactors the SGLang engine's streaming completions to process multiple prompts concurrently using an asyncio queue and background producer tasks, rather than sequentially. It also adds comprehensive unit tests to verify concurrent streaming and exception handling. The review feedback suggests awaiting the cancelled background tasks in the finally block to prevent potential resource leaks or unhandled task exception warnings.
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
When cancelling background tasks in the finally block, it is important to await them (e.g., using asyncio.gather with return_exceptions=True). Otherwise, the tasks might continue running their cleanup/finally blocks in the background after the generator has returned, which can lead to resource leaks, race conditions, or "Task exception was never retrieved" warnings if any task failed.
Suggested change:

    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
…s() finally block

Awaits cancelled background producer tasks using asyncio.gather with return_exceptions=True in the finally block of the SGLang completions() streaming generator. This avoids potential resource leaks, race conditions, and "Task exception was never retrieved" warnings if any background task failed.

Signed-off-by: Truc54 <trungtruc5405@gmail.com>
Force-pushed from 8edf7b0 to 5c625d8.
Force-pushed from 5c625d8 to c0a12ac.
Fix flakiness in test_concurrent_streaming_completions by replacing time-based asyncio.sleep delays with explicit asyncio.Event synchronization objects.

CI environments with high CPU load can suffer scheduling delays, causing asynchronous generators to yield out-of-order and fail text assertions. Using events guarantees deterministic interleaving of chunks.

Signed-off-by: Truc54 <trungtruc5405@gmail.com>
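To illustrate why events remove the timing dependence, here is a small sketch of two coroutines whose interleaving is fixed by asyncio.Event handshakes rather than sleeps. The names `stream_a`, `stream_b`, `a_first`, and `b_first` are hypothetical and are not taken from the test file.

```python
import asyncio


async def main():
    out = []
    a_first = asyncio.Event()  # set once stream A has emitted its first chunk
    b_first = asyncio.Event()  # set once stream B has emitted its first chunk

    async def stream_a():
        out.append("A0")
        a_first.set()
        await b_first.wait()  # A does not continue until B has emitted
        out.append("A1")

    async def stream_b():
        await a_first.wait()  # B starts only after A's first chunk
        out.append("B0")
        b_first.set()

    # The start order passed to gather does not matter; the events decide.
    await asyncio.gather(stream_b(), stream_a())
    return out


order = asyncio.run(main())
```

Because each step waits on an explicit signal from the other stream, the resulting order does not depend on how long the scheduler takes to run either coroutine.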
Force-pushed from c0a12ac to 5ac4f52.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 5ac4f52.
    if isinstance(item, Exception):
        raise item
    elif item is None:
        completed_tasks += 1
None sentinel masks producer failures
Medium Severity
Each streaming producer always enqueues None in finally after the try/except, but only Exception subclasses are forwarded on the queue. Failures such as asyncio.CancelledError or KeyboardInterrupt bypass except Exception, yet still emit the completion sentinel, so the main loop can finish as if every prompt completed when a worker actually aborted.
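The behaviour described above can be reproduced in isolation. The sketch below is not the PR's code: `flawed_producer`, `stricter_producer`, `never_finishes`, and `run_and_cancel` are hypothetical names. It assumes Python 3.8+, where asyncio.CancelledError derives from BaseException, and it shows one possible alternative (forwarding BaseException and emitting the sentinel only on success), not necessarily the fix the author should adopt.

```python
import asyncio


async def never_finishes():
    await asyncio.sleep(3600)


async def flawed_producer(queue):
    try:
        await never_finishes()
    except Exception as exc:  # CancelledError is NOT caught here
        queue.put_nowait(exc)
    finally:
        queue.put_nowait(None)  # sentinel is emitted even on cancellation


async def stricter_producer(queue):
    try:
        await never_finishes()
    except BaseException as exc:  # also sees CancelledError
        queue.put_nowait(exc)
        raise  # keep cancellation semantics intact
    else:
        queue.put_nowait(None)  # sentinel only after a clean finish


async def run_and_cancel(producer):
    queue = asyncio.Queue()
    task = asyncio.create_task(producer(queue))
    await asyncio.sleep(0)  # let the producer start and block
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return [queue.get_nowait() for _ in range(queue.qsize())]


flawed = asyncio.run(run_and_cancel(flawed_producer))
strict = asyncio.run(run_and_cancel(stricter_producer))
```

With the flawed producer the queue ends up holding only the None sentinel, so a consumer would count the prompt as completed. With the stricter variant the queue holds the CancelledError instead; a consumer using this approach would also need to widen its isinstance check from Exception to BaseException.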


Description
This PR addresses issue #63901 by refactoring the streaming path in SGLangServer.completions() to process concurrent generation prompts in parallel instead of sequentially.

Why is this needed?
In the current implementation of the SGLang server engine, completing multiple prompts with stream=True runs a sequential for loop over the prompts. This means prompt i must stream to completion before prompt i+1 starts generating. This sequential bottleneck limits throughput and increases request latency, because later prompts wait for every earlier prompt to finish.

Solution:
- Refactors the request.stream branch in SGLangServer.completions() to execute _stream_generate concurrently for all prompts using asyncio.create_task().
- Collects streaming chunks from the producer tasks through an asyncio.Queue and yields them as they arrive.
- Uses a finally block to cancel outstanding tasks if the generator is closed or an error occurs, preventing task leaks.

Related issues
Closes #63901
Additional information
Implementation details:
- asyncio.Queue: Parallelized completion streams using an asynchronous producer-consumer queue pattern.
- Tests: added unit tests in python/ray/llm/tests/serve/cpu/deployments/test_sglang_server.py, which uses a meta-path importer hook to bypass native compilation dependencies.

Both tests pass successfully:
- test_concurrent_streaming_completions (verifies chunks are properly interleaved and delta-decoded).
- test_concurrent_streaming_completions_exception_handling (verifies exception propagation and task cancellation).
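The test file's actual import hook is not shown in this conversation. As a generic illustration of the technique, the sketch below registers a meta-path finder that serves an empty stub module in place of a package that would otherwise require native compilation. The module name `hypothetical_native_ext`, the `StubFinder` class, and the `is_stub` attribute are all made up for this example.

```python
import importlib
import importlib.abc
import importlib.machinery
import sys
import types

# Illustrative module name only; not taken from the PR.
STUBBED = {"hypothetical_native_ext"}


class StubFinder(importlib.abc.MetaPathFinder, importlib.abc.Loader):
    """Finder and loader in one: returns an empty stub for listed modules."""

    def find_spec(self, fullname, path=None, target=None):
        if fullname in STUBBED:
            return importlib.machinery.ModuleSpec(fullname, self)
        return None  # defer to the normal import machinery

    def create_module(self, spec):
        return types.ModuleType(spec.name)

    def exec_module(self, module):
        module.is_stub = True  # marker so tests can tell it is a stub


# Insert first so the stub wins over any real installation.
sys.meta_path.insert(0, StubFinder())
stub = importlib.import_module("hypothetical_native_ext")
```

A real test would typically populate the stub with the few attributes the code under test touches, and remove the finder from sys.meta_path during teardown.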