Skip to content

Conversation

@BuffMcBigHuge
Copy link

@BuffMcBigHuge BuffMcBigHuge commented Jun 25, 2025

Summary

This pull request refactors the core stream processing architecture of Comfystream to enable true parallelization using a pool of worker processes managed by comfy.distributed.process_pool_executor.ProcessPoolExecutor ( HiddenSwitch ). This transition from a single-process, multi-threaded model to a multi-process architecture allows for significant performance gains, greater stability, and the ability to run multiple, independent ComfyUI workflows concurrently.

The key benefit of this update:

  • Increased Throughput: Multiple worker processes can execute ComfyUI workflows in parallel, dramatically increasing the number of frames that can be processed per second.

Detailed Changes by Component

1. ComfyStreamClient (client.py)

The ComfyStreamClient has been fundamentally redesigned to manage the process pool.

  • Process-Based Parallelism: The client now leverages ProcessPoolExecutor to spawn and manage a configurable number of worker processes. This replaces the previous model where the EmbeddedComfyClient likely used threads within a single process.
  • Inter-Process Communication (IPC): Communication between the main server process and the worker pool is now handled by multiprocessing.Manager queues (image_inputs, image_outputs). This is essential for safely passing tensor data and control messages across process boundaries.
  • Worker Management:
    • A new distribute_frames task acts as a manager, creating a worker_loop task for each worker in the pool.
    • The worker_loop is a persistent task that continuously requests work from the EmbeddedComfyClient, which in turn pulls frames from the shared input queue. This architecture allows workers to independently process frames from one or more workflows.
  • Simplified Prompt Updates: The update_prompts logic is now much simpler. It updates a shared list of prompts, and the worker loops automatically pick up the changes on their next iteration without requiring a restart or complex locking.
  • Robust Cleanup: The cleanup method is significantly more robust. It follows a strict sequence:
    1. Sets a shutting_down flag to stop workers gracefully.
    2. Cancels all worker and manager tasks.
    3. Shuts down the EmbeddedComfyClient.
    4. Explicitly shuts down the ProcessPoolExecutor, terminating any stubborn worker processes to prevent zombies.

2. Pipeline (pipeline.py)

The Pipeline has been adapted to integrate with the new multi-process client and to improve real-time stream stability.

  • Decoupled Frame Processing:
    • The Pipeline now features an output_buffer (asyncio.Queue). A background _collect_frames_simple task continuously polls the client for completed frames and places them in this buffer.
    • get_processed_video_frame now pulls from this buffer instead of directly from the client. This decouples frame delivery from frame processing. If the buffer is empty (i.e., processing is lagging), it now returns the original unprocessed frame to avoid stalling the video stream, maintaining a constant frame rate for the client.
  • Frame Tracking: A unique frame_id is now assigned to each incoming video frame. This is critical for tracking frames as they are passed between processes, although the current "simple" collector does not enforce order.
  • Configuration: The Pipeline constructor now accepts a max_workers argument, which is passed to the ComfyStreamClient to configure the size of the process pool.

3. Tensor Cache (tensor_cache.py)

The tensor_cache module has been repurposed to serve as the bridge between the EmbeddedComfyClient (running in a worker process) and the multiprocessing queues managed by the main process.

  • Worker-Specific Initialization: The previous tensor_cache used simple in-memory queues. The new implementation features an init_tensor_cache function, which is called by ProcessPoolExecutor when each worker process is spawned.
  • Queue Wrapping: This function replaces the module's global queue objects with wrapper classes (MultiProcessInputQueue, MultiProcessOutputQueue) that interface directly with the multiprocessing.Queue objects created in the main process. This allows the LoadTensor and SaveTensor custom nodes (which use tensor_cache) to function correctly within the multi-process environment without modification.
  • CPU Tensor Transfer: Tensors are explicitly moved to the CPU (.cpu()) before being placed in an output queue, which is a requirement for sending tensor data across process boundaries.

4. Server Application (app.py)

The main server application has been enhanced for stability and configuration.

  • Worker Configuration: A --workers command-line argument has been added to allow users to specify the number of worker processes to spawn.
  • Graceful Shutdown:
    • The application now listens for SIGINT (Ctrl+C) and SIGTERM signals.
    • Upon receiving a signal, a graceful shutdown sequence is initiated. The on_shutdown handler now correctly cleans up the pipeline (and its worker processes) before closing network connections.
    • A force_cleanup_and_exit function is included to terminate the executor's processes if they fail to shut down gracefully.
  • Improved Logging: Logging messages have been standardized with a prefix for clearer debugging.

Note

The ProcessPoolExecutor has been shown to increase performance at the cost of latency. As worker count increases, latency also increases. This can be improved with changes to the frame buffer management. There is also potential for new CPU bottlenecking caused by the increase throughput, which may result in frame-timing oscillations.

BuffMcBigHuge and others added 30 commits March 18, 2025 16:08
…ced uncessary base64 input frame operations, prep for multi-instance, cleanup.
…ame size handling, commented out some logging.
Co-authored-by: John | Elite Encoder <[email protected]>
…ediate step, moved prompt execution strategy to `execution_start` event, moved buffer to self variable to avoid reinitalization.
…to improve frame buffer, modified comfy arg handling.
…cations to spawning instances, better handling of misconfigured workspace.
…g, attempt to fix tensor_rt directory retrieval in comfyui.
…oved extreanous executor type param, commented out some logging.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ensure audio and video streams can be processed at the the same time

1 participant