From a8ad3a7f4a312dc2c31c1d0dbae61516577ad60a Mon Sep 17 00:00:00 2001 From: lukiod Date: Sun, 14 Sep 2025 14:30:43 +0000 Subject: [PATCH 1/7] workflow added --- BATCH_PROCESSING_README.md | 178 ++++++++++++ benchmark_batch_processing.py | 241 ++++++++++++++++ example_batch_processing.py | 171 +++++++++++ nodes/tensor_utils/load_batch_tensor.py | 89 ++++++ nodes/tensor_utils/performance_timer.py | 150 ++++++++++ src/comfystream/tensor_cache.py | 3 +- test_batch_processing.py | 250 ++++++++++++++++ .../comfystream/sd15-tensorrt-batch2-api.json | 269 ++++++++++++++++++ .../sd15-tensorrt-batch2-performance-api.json | 249 ++++++++++++++++ .../sd15-tensorrt-batch2-tensor-api.json | 227 +++++++++++++++ 10 files changed, 1826 insertions(+), 1 deletion(-) create mode 100644 BATCH_PROCESSING_README.md create mode 100644 benchmark_batch_processing.py create mode 100644 example_batch_processing.py create mode 100644 nodes/tensor_utils/load_batch_tensor.py create mode 100644 nodes/tensor_utils/performance_timer.py create mode 100644 test_batch_processing.py create mode 100644 workflows/comfystream/sd15-tensorrt-batch2-api.json create mode 100644 workflows/comfystream/sd15-tensorrt-batch2-performance-api.json create mode 100644 workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json diff --git a/BATCH_PROCESSING_README.md b/BATCH_PROCESSING_README.md new file mode 100644 index 000000000..1af1a8a37 --- /dev/null +++ b/BATCH_PROCESSING_README.md @@ -0,0 +1,178 @@ +# ComfyStream Batch Processing Implementation + +This implementation adds batch processing capabilities to ComfyStream, allowing you to process 2 images at a time using the DreamShaper 8 SD 1.5 TensorRT engine optimized for batch size 2. + +## Overview + +The batch processing implementation includes: +- Modified tensor cache to support batch inputs (queue size increased from 1 to 2) +- Custom batch tensor loading and saving nodes +- Performance measurement utilities +- ComfyUI workflows optimized for batch processing +- Benchmarking tools to measure FPS gains + +## Files Modified/Created + +### Core Changes +- `src/comfystream/tensor_cache.py` - Increased `image_inputs` queue size from 1 to 2 +- `nodes/tensor_utils/load_batch_tensor.py` - New batch tensor loading nodes +- `nodes/tensor_utils/performance_timer.py` - Performance measurement utilities + +### Workflows +- `workflows/comfystream/sd15-tensorrt-batch2-api.json` - Basic batch processing workflow +- `workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json` - Tensor-based batch workflow +- `workflows/comfystream/sd15-tensorrt-batch2-performance-api.json` - Performance-measured batch workflow + +### Examples and Tools +- `example_batch_processing.py` - Example usage of batch processing +- `benchmark_batch_processing.py` - Comprehensive performance benchmarking +- `BATCH_PROCESSING_README.md` - This documentation + +## Key Features + +### 1. Batch Tensor Processing +The `LoadBatchTensor` node collects up to `batch_size` images from the tensor cache and stacks them into a single batch tensor. This allows downstream nodes to process multiple images simultaneously. + +### 2. Performance Measurement +The `PerformanceTimerNode` and `StartPerformanceTimerNode` provide built-in performance tracking to measure: +- FPS (Frames Per Second) +- Processing time per image +- Batch efficiency +- Performance improvements + +### 3. 
Optimized Workflows
+The batch workflows are designed to:
+- Use TensorRT engines optimized for batch size 2
+- Process Depth Anything and KSampler nodes in batch mode
+- Handle non-batchable nodes appropriately
+- Measure and report performance metrics
+
+## Usage
+
+### Basic Batch Processing
+
+```python
+import asyncio
+import json
+import torch
+from comfystream.client import ComfyStreamClient
+
+async def process_batch():
+    client = ComfyStreamClient()
+
+    # Load batch workflow
+    with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f:
+        workflow = json.load(f)
+
+    # Generate test images
+    images = [
+        torch.randn(1, 512, 512, 3, dtype=torch.float32),
+        torch.randn(1, 512, 512, 3, dtype=torch.float32),
+    ]
+
+    # Add images to queue
+    for image in images:
+        client.put_video_input(image)
+
+    # Process batch
+    await client.set_prompts([workflow])
+
+    # Collect outputs
+    outputs = []
+    for _ in range(len(images)):
+        output = await client.get_video_output()
+        outputs.append(output)
+
+    await client.cleanup()
+    return outputs
+```
+
+### Performance Benchmarking
+
+```python
+from benchmark_batch_processing import BatchProcessingBenchmark
+
+async def run_benchmark():
+    benchmark = BatchProcessingBenchmark()
+    await benchmark.load_workflows()
+    results = await benchmark.run_benchmark(num_test_images=10, batch_size=2)
+    benchmark.save_results()
+```
+
+## Workflow Structure
+
+### Batch Processing Workflow Components
+
+1. **LoadBatchTensor** - Collects images from the tensor cache into batches
+2. **StartPerformanceTimerNode** - Starts performance measurement
+3. **DepthAnythingTensorrt** - Processes depth maps in batch mode
+4. **TensorRTLoader** - Loads a TensorRT engine optimized for batch size 2
+5. **KSampler** - Performs diffusion sampling in batch mode
+6. **ControlNetApplyAdvanced** - Applies the ControlNet in batch mode
+7. **VAEDecode** - Decodes latents to images in batch mode
+8. **SaveBatchTensor** - Saves batch outputs to the tensor cache
+9. **PerformanceTimerNode** - Ends performance measurement and reports metrics
+
+## Performance Considerations
+
+### Expected Benefits
+- **Up to ~2x throughput (FPS)** when two images are processed per engine invocation
+- **Amortized per-image overhead**, since model weights and kernel launches are shared across the batch
+- **Better GPU utilization** with larger batch sizes
+
+Note that batching trades latency for throughput: an individual frame may wait slightly longer, as reflected in the Limitations below.
+
+### Requirements
+- TensorRT engine compiled for batch size 2
+- Sufficient GPU memory for batch processing
+- ComfyUI nodes that support batch processing
+
+### Limitations
+- Some nodes may not support batching (composite overlays, etc.)
+- Memory usage increases with batch size
+- Latency may increase if the batch is not full
+
+## TensorRT Engine Setup
+
+To use this implementation, you need a TensorRT engine compiled for batch size 2:
+
+1. Use the `Dynamic Model TensorRT Conversion` node in ComfyUI
+2. Set batch size parameters:
+   - `batch_size_min`: 2
+   - `batch_size_max`: 2
+   - `batch_size_opt`: 2
+3. Specify resolution: 512x512
+4. Provide filename prefix: `tensorrt/dreamshaper8_batch2`
+5. Update the workflow to use the correct engine filename (a scripted sketch of this step follows below)
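+
+The following is a minimal sketch of step 5. It assumes the conversion wrote an
+engine whose name starts with the `tensorrt/dreamshaper8_batch2` prefix (the
+exact `ENGINE_NAME` below is illustrative; check your ComfyUI output directory
+for the real filename) and rewrites the `unet_name` input of every
+`TensorRTLoader` node in a workflow file:
+
+```python
+import json
+
+WORKFLOW_PATH = "./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json"
+# Illustrative filename; use the actual engine produced by the conversion node.
+ENGINE_NAME = "tensorrt/dreamshaper8_batch2_$stat-b-2-h-512-w-512_00001_.engine"
+
+with open(WORKFLOW_PATH, "r") as f:
+    workflow = json.load(f)
+
+# TensorRTLoader nodes reference their engine file via the "unet_name" input.
+for node in workflow.values():
+    if node.get("class_type") == "TensorRTLoader":
+        node["inputs"]["unet_name"] = ENGINE_NAME
+
+with open(WORKFLOW_PATH, "w") as f:
+    json.dump(workflow, f, indent=2)
+```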
+## Troubleshooting
+
+### Common Issues
+
+1. **Queue Full Error**: Ensure the tensor cache queue size is set to 2 or higher
+2. **Memory Errors**: Reduce the batch size or image resolution
+3. **TensorRT Engine Not Found**: Verify the engine filename in the workflow
+4. **Performance Not Improved**: Check that every node in the workflow actually supports batching
+
+### Debug Mode
+
+Enable debug logging to troubleshoot issues:
+
+```python
+import logging
+logging.basicConfig(level=logging.DEBUG)
+```
+
+## Future Improvements
+
+- Support for dynamic batch sizes
+- Automatic batch size optimization
+- Memory usage monitoring
+- More sophisticated performance metrics
+- Support for larger batch sizes (4, 8, etc.)
+
+## Contributing
+
+When adding new batch processing features:
+1. Ensure compatibility with the existing tensor cache system
+2. Add performance measurement capabilities
+3. Update documentation and examples
+4. Test with various batch sizes and image resolutions
diff --git a/benchmark_batch_processing.py b/benchmark_batch_processing.py
new file mode 100644
index 000000000..5ae687343
--- /dev/null
+++ b/benchmark_batch_processing.py
@@ -0,0 +1,241 @@
+#!/usr/bin/env python3
+"""
+Benchmark script to compare single vs batch processing performance in ComfyStream.
+This script measures FPS gains when processing images in batches of 2.
+"""
+
+import asyncio
+import time
+import torch
+import json
+import numpy as np
+from typing import List, Dict, Any
+import logging
+
+from comfystream.client import ComfyStreamClient
+# performance_timer lives in the custom node package, not in tensor_cache
+from nodes.tensor_utils.performance_timer import performance_timer
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class BatchProcessingBenchmark:
+    def __init__(self, cwd: str = None):
+        self.client = ComfyStreamClient(cwd=cwd)
+        self.results = {
+            "single_processing": {},
+            "batch_processing": {},
+            "performance_gains": {}
+        }
+
+    async def load_workflows(self):
+        """Load the single and batch processing workflows."""
+        try:
+            # Load single processing workflow
+            with open("./workflows/comfystream/sd15-tensorrt-api.json", "r") as f:
+                self.single_workflow = json.load(f)
+
+            # Load batch processing workflow
+            with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f:
+                self.batch_workflow = json.load(f)
+
+            logger.info("Workflows loaded successfully")
+        except FileNotFoundError as e:
+            logger.error(f"Workflow file not found: {e}")
+            raise
+
+    def generate_test_images(self, num_images: int, height: int = 512, width: int = 512) -> List[torch.Tensor]:
+        """Generate test images for benchmarking."""
+        images = []
+        for i in range(num_images):
+            # Generate random test image
+            image = torch.randn(1, height, width, 3, dtype=torch.float32)
+            images.append(image)
+        return images
+
+    async def benchmark_single_processing(self, test_images: List[torch.Tensor]) -> Dict[str, Any]:
+        """Benchmark single image processing."""
+        logger.info("Starting single processing benchmark...")
+
+        # Reset performance timer
+        performance_timer.reset()
+
+        start_time = time.time()
+
+        # Process each image individually
+        for i, image in enumerate(test_images):
+            logger.info(f"Processing single image {i+1}/{len(test_images)}")
+
+            # Put image in queue
+            self.client.put_video_input(image)
+
+            # Set workflow and process
+            await self.client.set_prompts([self.single_workflow])
+
+            # Wait for output
+            output = await self.client.get_video_output()
+            logger.info(f"Single image {i+1} processed, output shape: {output.shape}")
+
+        end_time = time.time()
+        total_time = end_time - start_time
+
+        results = {
+            "total_time": total_time,
+            "num_images": len(test_images),
+            "fps": len(test_images) / total_time,
+            "avg_time_per_image": total_time / len(test_images)
+        }
+
+        logger.info(f"Single processing completed: 
{results['fps']:.2f} FPS") + return results + + async def benchmark_batch_processing(self, test_images: List[torch.Tensor], batch_size: int = 2) -> Dict[str, Any]: + """Benchmark batch image processing.""" + logger.info("Starting batch processing benchmark...") + + # Reset performance timer + performance_timer.reset() + + start_time = time.time() + + # Process images in batches + for i in range(0, len(test_images), batch_size): + batch_images = test_images[i:i+batch_size] + logger.info(f"Processing batch {i//batch_size + 1}/{(len(test_images) + batch_size - 1)//batch_size}") + + # Put batch images in queue + for image in batch_images: + self.client.put_video_input(image) + + # Set batch workflow and process + await self.client.set_prompts([self.batch_workflow]) + + # Wait for batch output + for _ in range(len(batch_images)): + output = await self.client.get_video_output() + logger.info(f"Batch image processed, output shape: {output.shape}") + + end_time = time.time() + total_time = end_time - start_time + + results = { + "total_time": total_time, + "num_images": len(test_images), + "batch_size": batch_size, + "fps": len(test_images) / total_time, + "avg_time_per_image": total_time / len(test_images), + "avg_time_per_batch": total_time / ((len(test_images) + batch_size - 1) // batch_size) + } + + logger.info(f"Batch processing completed: {results['fps']:.2f} FPS") + return results + + def calculate_performance_gains(self, single_results: Dict[str, Any], batch_results: Dict[str, Any]) -> Dict[str, Any]: + """Calculate performance gains from batch processing.""" + gains = { + "fps_improvement": (batch_results["fps"] - single_results["fps"]) / single_results["fps"] * 100, + "time_reduction": (single_results["total_time"] - batch_results["total_time"]) / single_results["total_time"] * 100, + "efficiency_ratio": batch_results["fps"] / single_results["fps"] + } + return gains + + async def run_benchmark(self, num_test_images: int = 10, batch_size: int = 2): + """Run the complete benchmark.""" + logger.info(f"Starting benchmark with {num_test_images} test images, batch size {batch_size}") + + # Generate test images + test_images = self.generate_test_images(num_test_images) + logger.info(f"Generated {len(test_images)} test images") + + try: + # Benchmark single processing + single_results = await self.benchmark_single_processing(test_images) + self.results["single_processing"] = single_results + + # Wait a bit between tests + await asyncio.sleep(2) + + # Benchmark batch processing + batch_results = await self.benchmark_batch_processing(test_images, batch_size) + self.results["batch_processing"] = batch_results + + # Calculate performance gains + performance_gains = self.calculate_performance_gains(single_results, batch_results) + self.results["performance_gains"] = performance_gains + + # Print results + self.print_results() + + return self.results + + except Exception as e: + logger.error(f"Benchmark failed: {e}") + raise + finally: + # Cleanup + await self.client.cleanup() + + def print_results(self): + """Print benchmark results in a formatted way.""" + print("\n" + "="*60) + print("COMFYSTREAM BATCH PROCESSING BENCHMARK RESULTS") + print("="*60) + + single = self.results["single_processing"] + batch = self.results["batch_processing"] + gains = self.results["performance_gains"] + + print(f"\nTest Configuration:") + print(f" Total Images: {single['num_images']}") + print(f" Batch Size: {batch['batch_size']}") + + print(f"\nSingle Processing:") + print(f" Total Time: {single['total_time']:.2f} 
seconds") + print(f" FPS: {single['fps']:.2f}") + print(f" Avg Time per Image: {single['avg_time_per_image']:.4f} seconds") + + print(f"\nBatch Processing:") + print(f" Total Time: {batch['total_time']:.2f} seconds") + print(f" FPS: {batch['fps']:.2f}") + print(f" Avg Time per Image: {batch['avg_time_per_image']:.4f} seconds") + print(f" Avg Time per Batch: {batch['avg_time_per_batch']:.4f} seconds") + + print(f"\nPerformance Gains:") + print(f" FPS Improvement: {gains['fps_improvement']:.1f}%") + print(f" Time Reduction: {gains['time_reduction']:.1f}%") + print(f" Efficiency Ratio: {gains['efficiency_ratio']:.2f}x") + + if gains['fps_improvement'] > 0: + print(f"\n✅ Batch processing is {gains['efficiency_ratio']:.2f}x faster!") + else: + print(f"\n❌ Single processing is faster (possible overhead)") + + print("="*60) + + def save_results(self, filename: str = "batch_processing_benchmark_results.json"): + """Save benchmark results to a JSON file.""" + with open(filename, 'w') as f: + json.dump(self.results, f, indent=2) + logger.info(f"Results saved to {filename}") + + +async def main(): + """Main benchmark execution.""" + benchmark = BatchProcessingBenchmark() + + try: + await benchmark.load_workflows() + results = await benchmark.run_benchmark(num_test_images=10, batch_size=2) + benchmark.save_results() + + except Exception as e: + logger.error(f"Benchmark execution failed: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + exit(exit_code) diff --git a/example_batch_processing.py b/example_batch_processing.py new file mode 100644 index 000000000..80981e1d5 --- /dev/null +++ b/example_batch_processing.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +""" +Example script demonstrating batch processing with ComfyStream. +This script shows how to process 2 images at a time using the batch workflow. +""" + +import asyncio +import torch +import json +import logging +from typing import List + +from comfystream.client import ComfyStreamClient + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def batch_processing_example(): + """Example of batch processing with ComfyStream.""" + + # Initialize ComfyStream client + client = ComfyStreamClient() + + try: + # Load the batch processing workflow + with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f: + workflow = json.load(f) + + logger.info("Loaded batch processing workflow") + + # Generate test images (2 images for batch processing) + test_images = [ + torch.randn(1, 512, 512, 3, dtype=torch.float32), # First image + torch.randn(1, 512, 512, 3, dtype=torch.float32), # Second image + ] + + logger.info(f"Generated {len(test_images)} test images") + + # Put both images in the input queue + for i, image in enumerate(test_images): + client.put_video_input(image) + logger.info(f"Added image {i+1} to input queue") + + # Set the batch processing workflow + await client.set_prompts([workflow]) + logger.info("Started batch processing workflow") + + # Process the batch and collect outputs + outputs = [] + for i in range(len(test_images)): + output = await client.get_video_output() + outputs.append(output) + logger.info(f"Received output {i+1}, shape: {output.shape}") + + logger.info(f"Batch processing completed! 
Processed {len(outputs)} images") + + # Print performance summary if available + # Note: The performance timer results would be available in the workflow output + # This is a simplified example focusing on the batch processing flow + + return outputs + + except Exception as e: + logger.error(f"Batch processing failed: {e}") + raise + finally: + # Cleanup + await client.cleanup() + + +async def single_vs_batch_comparison(): + """Compare single vs batch processing performance.""" + + client = ComfyStreamClient() + + try: + # Load workflows + with open("./workflows/comfystream/sd15-tensorrt-api.json", "r") as f: + single_workflow = json.load(f) + + with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f: + batch_workflow = json.load(f) + + # Generate test images + test_images = [ + torch.randn(1, 512, 512, 3, dtype=torch.float32), + torch.randn(1, 512, 512, 3, dtype=torch.float32), + ] + + logger.info("=== SINGLE PROCESSING TEST ===") + single_start = asyncio.get_event_loop().time() + + # Process images one by one + for i, image in enumerate(test_images): + client.put_video_input(image) + await client.set_prompts([single_workflow]) + output = await client.get_video_output() + logger.info(f"Single processed image {i+1}, shape: {output.shape}") + + single_end = asyncio.get_event_loop().time() + single_time = single_end - single_start + + logger.info("=== BATCH PROCESSING TEST ===") + batch_start = asyncio.get_event_loop().time() + + # Process images in batch + for image in test_images: + client.put_video_input(image) + + await client.set_prompts([batch_workflow]) + + for i in range(len(test_images)): + output = await client.get_video_output() + logger.info(f"Batch processed image {i+1}, shape: {output.shape}") + + batch_end = asyncio.get_event_loop().time() + batch_time = batch_end - batch_start + + # Calculate performance metrics + single_fps = len(test_images) / single_time + batch_fps = len(test_images) / batch_time + improvement = (batch_fps - single_fps) / single_fps * 100 + + logger.info("=== PERFORMANCE COMPARISON ===") + logger.info(f"Single processing time: {single_time:.2f} seconds") + logger.info(f"Batch processing time: {batch_time:.2f} seconds") + logger.info(f"Single processing FPS: {single_fps:.2f}") + logger.info(f"Batch processing FPS: {batch_fps:.2f}") + logger.info(f"Performance improvement: {improvement:.1f}%") + + if improvement > 0: + logger.info(f"✅ Batch processing is {batch_fps/single_fps:.2f}x faster!") + else: + logger.info("❌ Single processing is faster (possible overhead)") + + except Exception as e: + logger.error(f"Comparison failed: {e}") + raise + finally: + await client.cleanup() + + +async def main(): + """Main example execution.""" + logger.info("ComfyStream Batch Processing Example") + logger.info("=" * 50) + + try: + # Run batch processing example + logger.info("Running batch processing example...") + outputs = await batch_processing_example() + logger.info(f"Successfully processed {len(outputs)} images in batch") + + # Run performance comparison + logger.info("\nRunning performance comparison...") + await single_vs_batch_comparison() + + logger.info("\nExample completed successfully!") + + except Exception as e: + logger.error(f"Example failed: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + exit(exit_code) diff --git a/nodes/tensor_utils/load_batch_tensor.py b/nodes/tensor_utils/load_batch_tensor.py new file mode 100644 index 000000000..7ced0243d --- /dev/null +++ 
b/nodes/tensor_utils/load_batch_tensor.py
@@ -0,0 +1,89 @@
+import torch
+
+from comfystream import tensor_cache
+
+
+class LoadBatchTensor:
+    CATEGORY = "tensor_utils"
+    RETURN_TYPES = ("IMAGE",)
+    FUNCTION = "execute"
+
+    @classmethod
+    def INPUT_TYPES(s):
+        return {
+            "required": {
+                "batch_size": ("INT", {"default": 2, "min": 1, "max": 8, "step": 1}),
+            }
+        }
+
+    @classmethod
+    def IS_CHANGED(s, **kwargs):
+        return float("nan")
+
+    def execute(self, batch_size: int):
+        """
+        Load a batch of images from the tensor cache.
+        Collects up to batch_size images from the queue; if the queue runs dry,
+        the batch is padded with the last available frame (or a zero tensor).
+        """
+        batch_images = []
+
+        # Collect images up to batch_size
+        for i in range(batch_size):
+            if not tensor_cache.image_inputs.empty():
+                frame = tensor_cache.image_inputs.get(block=False)
+                frame.side_data.skipped = False
+                batch_images.append(frame.side_data.input)
+            else:
+                # If we don't have enough images, pad with the last available image
+                if batch_images:
+                    batch_images.append(batch_images[-1])
+                else:
+                    # If no images are available at all, fall back to a dummy tensor
+                    dummy_tensor = torch.zeros(1, 512, 512, 3, dtype=torch.float32)
+                    batch_images.append(dummy_tensor)
+
+        # Stack images into a batch
+        if len(batch_images) > 1:
+            batch_tensor = torch.cat(batch_images, dim=0)
+        else:
+            batch_tensor = batch_images[0]
+
+        return (batch_tensor,)
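+
+
+# Shape note (illustrative, assuming ComfyUI's BHWC IMAGE layout): two frames of
+# shape (1, 512, 512, 3) concatenated along dim 0 form a (2, 512, 512, 3) batch.
+# That is what a batch-size-2 engine consumes downstream, and it is the shape
+# SaveBatchTensor below splits back into single frames.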
+class SaveBatchTensor:
+    CATEGORY = "tensor_utils"
+    RETURN_TYPES = ()
+    FUNCTION = "execute"
+    OUTPUT_NODE = True
+
+    @classmethod
+    def INPUT_TYPES(s):
+        return {
+            "required": {
+                "images": ("IMAGE",),
+            }
+        }
+
+    @classmethod
+    def IS_CHANGED(s, **kwargs):
+        return float("nan")
+
+    def execute(self, images: torch.Tensor):
+        """
+        Save a batch of images to the tensor cache.
+        Splits the batch and puts each image into the output queue individually.
+        """
+        # Split batch into individual images
+        if images.dim() == 4 and images.shape[0] > 1:
+            # Batch of images
+            for i in range(images.shape[0]):
+                single_image = images[i:i+1]  # Keep batch dimension
+                tensor_cache.image_outputs.put_nowait(single_image)
+        else:
+            # Single image
+            tensor_cache.image_outputs.put_nowait(images)
+
+        return images
diff --git a/nodes/tensor_utils/performance_timer.py b/nodes/tensor_utils/performance_timer.py
new file mode 100644
index 000000000..fc0d64799
--- /dev/null
+++ b/nodes/tensor_utils/performance_timer.py
@@ -0,0 +1,150 @@
+import time
+from typing import Dict, List
+from contextlib import contextmanager
+
+
+class PerformanceTimer:
+    """Utility class for measuring performance metrics in ComfyStream workflows."""
+
+    def __init__(self):
+        self.timings: Dict[str, List[float]] = {}
+        self.current_timings: Dict[str, float] = {}
+        self.batch_sizes: List[int] = []
+        self.total_images_processed = 0
+
+    def start_timing(self, operation: str):
+        """Start timing an operation."""
+        self.current_timings[operation] = time.time()
+
+    def end_timing(self, operation: str):
+        """End timing an operation and record the duration."""
+        if operation in self.current_timings:
+            duration = time.time() - self.current_timings[operation]
+            if operation not in self.timings:
+                self.timings[operation] = []
+            self.timings[operation].append(duration)
+            del self.current_timings[operation]
+            return duration
+        return 0.0
+
+    def record_batch_processing(self, batch_size: int, num_images: int):
+        """Record a batch processing event."""
+        self.batch_sizes.append(batch_size)
+        self.total_images_processed += num_images
+
+    def get_fps(self, operation: str = "total") -> float:
+        """Calculate FPS for a specific operation, based on the global image count."""
+        if operation not in self.timings or not self.timings[operation]:
+            return 0.0
+
+        total_time = sum(self.timings[operation])
+        if total_time == 0:
+            return 0.0
+
+        return self.total_images_processed / total_time
+
+    def get_average_time(self, operation: str) -> float:
+        """Get average time for an operation."""
+        if operation not in self.timings or not self.timings[operation]:
+            return 0.0
+
+        return sum(self.timings[operation]) / len(self.timings[operation])
+
+    def get_performance_summary(self) -> Dict[str, float]:
+        """Get a comprehensive performance summary."""
+        summary = {
+            "total_images_processed": self.total_images_processed,
+            "total_fps": self.get_fps("total"),
+            "average_batch_size": sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0,
+        }
+
+        for operation in self.timings:
+            summary[f"{operation}_fps"] = self.get_fps(operation)
+            summary[f"{operation}_avg_time"] = self.get_average_time(operation)
+
+        return summary
+
+    def reset(self):
+        """Reset all performance data."""
+        self.timings.clear()
+        self.current_timings.clear()
+        self.batch_sizes.clear()
+        self.total_images_processed = 0
+
+    @contextmanager
+    def time_operation(self, operation: str):
+        """Context manager for timing operations."""
+        self.start_timing(operation)
+        try:
+            yield
+        finally:
+            self.end_timing(operation)
+
+
+# Global performance timer instance
+performance_timer = PerformanceTimer()
+
+
+class PerformanceTimerNode:
+    CATEGORY = "tensor_utils"
+    RETURN_TYPES = ("STRING",)
+    RETURN_NAMES = ("performance_summary",)
+    FUNCTION = "execute"
+
+    @classmethod
+    def INPUT_TYPES(s):
+        return {
+            "required": {
+                "operation": ("STRING", {"default": "workflow_execution"}),
+                "batch_size": ("INT", {"default": 1, "min": 1, "max": 8, "step": 1}),
+                
"num_images": ("INT", {"default": 1, "min": 1, "max": 100, "step": 1}), + } + } + + @classmethod + def IS_CHANGED(s): + return float("nan") + + def execute(self, operation: str, batch_size: int, num_images: int): + """Record performance metrics and return summary.""" + performance_timer.record_batch_processing(batch_size, num_images) + performance_timer.end_timing(operation) + + summary = performance_timer.get_performance_summary() + + # Format summary as readable string + summary_str = f"Performance Summary:\n" + summary_str += f"Total Images Processed: {summary['total_images_processed']}\n" + summary_str += f"Total FPS: {summary['total_fps']:.2f}\n" + summary_str += f"Average Batch Size: {summary['average_batch_size']:.2f}\n" + + for key, value in summary.items(): + if key not in ["total_images_processed", "total_fps", "average_batch_size"]: + summary_str += f"{key}: {value:.4f}\n" + + return (summary_str,) + + +class StartPerformanceTimerNode: + CATEGORY = "tensor_utils" + RETURN_TYPES = ("STRING",) + RETURN_NAMES = ("timer_started",) + FUNCTION = "execute" + + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "operation": ("STRING", {"default": "workflow_execution"}), + } + } + + @classmethod + def IS_CHANGED(s): + return float("nan") + + def execute(self, operation: str): + """Start timing an operation.""" + performance_timer.start_timing(operation) + return (f"Started timing: {operation}",) diff --git a/src/comfystream/tensor_cache.py b/src/comfystream/tensor_cache.py index 5cd54332e..6af18d561 100644 --- a/src/comfystream/tensor_cache.py +++ b/src/comfystream/tensor_cache.py @@ -7,7 +7,8 @@ from typing import Union # TODO: improve eviction policy fifo might not be the best, skip alternate frames instead -image_inputs: Queue[Union[torch.Tensor, np.ndarray]] = Queue(maxsize=1) +# Increased queue size to support batch processing (2 images at a time) +image_inputs: Queue[Union[torch.Tensor, np.ndarray]] = Queue(maxsize=2) image_outputs: AsyncQueue[Union[torch.Tensor, np.ndarray]] = AsyncQueue() audio_inputs: Queue[Union[torch.Tensor, np.ndarray]] = Queue() diff --git a/test_batch_processing.py b/test_batch_processing.py new file mode 100644 index 000000000..ce86c47f4 --- /dev/null +++ b/test_batch_processing.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +""" +Test script to verify batch processing functionality in ComfyStream. +This script tests the basic batch processing workflow without requiring a full ComfyUI setup. 
+""" + +import asyncio +import torch +import json +import logging +from typing import List + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def test_tensor_cache_modifications(): + """Test that tensor cache modifications work correctly.""" + logger.info("Testing tensor cache modifications...") + + try: + from comfystream.tensor_cache import image_inputs + + # Test that queue size is now 2 + assert image_inputs.maxsize == 2, f"Expected maxsize 2, got {image_inputs.maxsize}" + + # Test putting multiple items + test_tensor1 = torch.randn(1, 512, 512, 3) + test_tensor2 = torch.randn(1, 512, 512, 3) + + # Should be able to put 2 items without blocking + image_inputs.put(test_tensor1, block=False) + image_inputs.put(test_tensor2, block=False) + + # Should be full now + assert image_inputs.full(), "Queue should be full after putting 2 items" + + # Clean up + image_inputs.get() + image_inputs.get() + + logger.info("✅ Tensor cache modifications working correctly") + return True + + except Exception as e: + logger.error(f"❌ Tensor cache test failed: {e}") + return False + + +def test_batch_tensor_nodes(): + """Test the batch tensor loading and saving nodes.""" + logger.info("Testing batch tensor nodes...") + + try: + from nodes.tensor_utils.load_batch_tensor import LoadBatchTensor, SaveBatchTensor + + # Test LoadBatchTensor + load_node = LoadBatchTensor() + + # Mock tensor cache with test data + from comfystream import tensor_cache + test_tensor1 = torch.randn(1, 512, 512, 3) + test_tensor2 = torch.randn(1, 512, 512, 3) + + # Add test data to cache + tensor_cache.image_inputs.put(test_tensor1) + tensor_cache.image_inputs.put(test_tensor2) + + # Test loading batch + batch_result = load_node.execute(batch_size=2) + assert len(batch_result) == 1, "Should return tuple with one element" + batch_tensor = batch_result[0] + assert batch_tensor.shape[0] == 2, f"Expected batch size 2, got {batch_tensor.shape[0]}" + + # Test SaveBatchTensor + save_node = SaveBatchTensor() + + # Mock the output queue + original_put = tensor_cache.image_outputs.put_nowait + saved_tensors = [] + + def mock_put(tensor): + saved_tensors.append(tensor) + + tensor_cache.image_outputs.put_nowait = mock_put + + # Test saving batch + save_node.execute(batch_tensor) + assert len(saved_tensors) == 2, f"Expected 2 saved tensors, got {len(saved_tensors)}" + + # Restore original function + tensor_cache.image_outputs.put_nowait = original_put + + # Clean up + while not tensor_cache.image_inputs.empty(): + tensor_cache.image_inputs.get() + + logger.info("✅ Batch tensor nodes working correctly") + return True + + except Exception as e: + logger.error(f"❌ Batch tensor nodes test failed: {e}") + return False + + +def test_performance_timer(): + """Test the performance timer functionality.""" + logger.info("Testing performance timer...") + + try: + from nodes.tensor_utils.performance_timer import PerformanceTimer, PerformanceTimerNode, StartPerformanceTimerNode + + # Test PerformanceTimer class + timer = PerformanceTimer() + + # Test timing operations + timer.start_timing("test_operation") + import time + time.sleep(0.01) # Small delay + timer.end_timing("test_operation") + + timer.record_batch_processing(batch_size=2, num_images=2) + + # Test getting performance summary + summary = timer.get_performance_summary() + assert "total_images_processed" in summary + assert "total_fps" in summary + assert summary["total_images_processed"] == 2 + + # Test nodes + start_node = 
StartPerformanceTimerNode() + timer_node = PerformanceTimerNode() + + # Test start timer node + result = start_node.execute("test_workflow") + assert "Started timing" in result[0] + + # Test timer node + result = timer_node.execute("test_workflow", 2, 2) + assert "Performance Summary" in result[0] + + logger.info("✅ Performance timer working correctly") + return True + + except Exception as e: + logger.error(f"❌ Performance timer test failed: {e}") + return False + + +def test_workflow_validation(): + """Test that the workflow JSON files are valid.""" + logger.info("Testing workflow validation...") + + workflow_files = [ + "workflows/comfystream/sd15-tensorrt-batch2-api.json", + "workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json", + "workflows/comfystream/sd15-tensorrt-batch2-performance-api.json" + ] + + try: + for workflow_file in workflow_files: + with open(workflow_file, 'r') as f: + workflow = json.load(f) + + # Basic validation + assert isinstance(workflow, dict), f"Workflow {workflow_file} should be a dictionary" + assert len(workflow) > 0, f"Workflow {workflow_file} should not be empty" + + # Check for required nodes + node_ids = list(workflow.keys()) + assert len(node_ids) > 0, f"Workflow {workflow_file} should have nodes" + + logger.info(f"✅ Workflow {workflow_file} is valid") + + logger.info("✅ All workflows are valid") + return True + + except Exception as e: + logger.error(f"❌ Workflow validation failed: {e}") + return False + + +def test_benchmark_script(): + """Test that the benchmark script can be imported and basic functions work.""" + logger.info("Testing benchmark script...") + + try: + from benchmark_batch_processing import BatchProcessingBenchmark + + # Test creating benchmark instance + benchmark = BatchProcessingBenchmark() + assert benchmark.client is not None + assert benchmark.results is not None + + # Test generating test images + test_images = benchmark.generate_test_images(2) + assert len(test_images) == 2 + assert all(isinstance(img, torch.Tensor) for img in test_images) + assert all(img.shape == (1, 512, 512, 3) for img in test_images) + + logger.info("✅ Benchmark script working correctly") + return True + + except Exception as e: + logger.error(f"❌ Benchmark script test failed: {e}") + return False + + +def run_all_tests(): + """Run all tests and report results.""" + logger.info("Starting ComfyStream Batch Processing Tests") + logger.info("=" * 50) + + tests = [ + ("Tensor Cache Modifications", test_tensor_cache_modifications), + ("Batch Tensor Nodes", test_batch_tensor_nodes), + ("Performance Timer", test_performance_timer), + ("Workflow Validation", test_workflow_validation), + ("Benchmark Script", test_benchmark_script), + ] + + passed = 0 + total = len(tests) + + for test_name, test_func in tests: + logger.info(f"\nRunning {test_name} test...") + try: + if test_func(): + passed += 1 + logger.info(f"✅ {test_name} test PASSED") + else: + logger.error(f"❌ {test_name} test FAILED") + except Exception as e: + logger.error(f"❌ {test_name} test FAILED with exception: {e}") + + logger.info("\n" + "=" * 50) + logger.info(f"Test Results: {passed}/{total} tests passed") + + if passed == total: + logger.info("🎉 All tests passed! Batch processing implementation is working correctly.") + return True + else: + logger.error(f"❌ {total - passed} tests failed. 
Please check the implementation.") + return False + + +if __name__ == "__main__": + success = run_all_tests() + exit(0 if success else 1) diff --git a/workflows/comfystream/sd15-tensorrt-batch2-api.json b/workflows/comfystream/sd15-tensorrt-batch2-api.json new file mode 100644 index 000000000..e9e68d8a2 --- /dev/null +++ b/workflows/comfystream/sd15-tensorrt-batch2-api.json @@ -0,0 +1,269 @@ +{ + "1": { + "inputs": { + "image1": "example1.png", + "image2": "example2.png", + "upload": "image" + }, + "class_type": "ImageBatch", + "_meta": { + "title": "Image Batch Input" + } + }, + "2": { + "inputs": { + "engine": "depth_anything_vitl14-fp16.engine", + "images": [ + "1", + 0 + ] + }, + "class_type": "DepthAnythingTensorrt", + "_meta": { + "title": "Depth Anything Tensorrt (Batch)" + } + }, + "3": { + "inputs": { + "unet_name": "static-dreamshaper8_SD15_$stat-b-2-h-512-w-512_00001_.engine", + "model_type": "SD15" + }, + "class_type": "TensorRTLoader", + "_meta": { + "title": "TensorRT Loader (Batch Size 2)" + } + }, + "5": { + "inputs": { + "text": "the hulk", + "clip": [ + "23", + 0 + ] + }, + "class_type": "CLIPTextEncode", + "_meta": { + "title": "CLIP Text Encode (Prompt)" + } + }, + "6": { + "inputs": { + "text": "", + "clip": [ + "23", + 0 + ] + }, + "class_type": "CLIPTextEncode", + "_meta": { + "title": "CLIP Text Encode (Negative Prompt)" + } + }, + "7": { + "inputs": { + "seed": 905056445574169, + "steps": 1, + "cfg": 1, + "sampler_name": "lcm", + "scheduler": "normal", + "denoise": 1, + "model": [ + "3", + 0 + ], + "positive": [ + "9", + 0 + ], + "negative": [ + "9", + 1 + ], + "latent_image": [ + "16", + 0 + ] + }, + "class_type": "KSampler", + "_meta": { + "title": "KSampler (Batch Processing)" + } + }, + "8": { + "inputs": { + "control_net_name": "control_v11f1p_sd15_depth_fp16.safetensors" + }, + "class_type": "ControlNetLoader", + "_meta": { + "title": "Load ControlNet Model" + } + }, + "9": { + "inputs": { + "strength": 1, + "start_percent": 0, + "end_percent": 1, + "positive": [ + "5", + 0 + ], + "negative": [ + "6", + 0 + ], + "control_net": [ + "10", + 0 + ], + "image": [ + "2", + 0 + ] + }, + "class_type": "ControlNetApplyAdvanced", + "_meta": { + "title": "Apply ControlNet (Batch)" + } + }, + "10": { + "inputs": { + "backend": "inductor", + "fullgraph": false, + "mode": "reduce-overhead", + "controlnet": [ + "8", + 0 + ] + }, + "class_type": "TorchCompileLoadControlNet", + "_meta": { + "title": "TorchCompileLoadControlNet" + } + }, + "11": { + "inputs": { + "vae_name": "taesd" + }, + "class_type": "VAELoader", + "_meta": { + "title": "Load VAE" + } + }, + "13": { + "inputs": { + "backend": "inductor", + "fullgraph": true, + "mode": "reduce-overhead", + "compile_encoder": true, + "compile_decoder": true, + "vae": [ + "11", + 0 + ] + }, + "class_type": "TorchCompileLoadVAE", + "_meta": { + "title": "TorchCompileLoadVAE" + } + }, + "14": { + "inputs": { + "samples": [ + "7", + 0 + ], + "vae": [ + "13", + 0 + ] + }, + "class_type": "VAEDecode", + "_meta": { + "title": "VAE Decode (Batch)" + } + }, + "15": { + "inputs": { + "images": [ + "14", + 0 + ] + }, + "class_type": "PreviewImage", + "_meta": { + "title": "Preview Image (Batch)" + } + }, + "16": { + "inputs": { + "width": 512, + "height": 512, + "batch_size": 2 + }, + "class_type": "EmptyLatentImage", + "_meta": { + "title": "Empty Latent Image (Batch Size 2)" + } + }, + "17": { + "inputs": { + "images": [ + "14", + 0 + ], + "index": 0 + }, + "class_type": "ImageFromBatch", + "_meta": { + "title": "Image From Batch 
(First Image)" + } + }, + "18": { + "inputs": { + "images": [ + "14", + 0 + ], + "index": 1 + }, + "class_type": "ImageFromBatch", + "_meta": { + "title": "Image From Batch (Second Image)" + } + }, + "19": { + "inputs": { + "images": [ + "17", + 0 + ] + }, + "class_type": "SaveTensor", + "_meta": { + "title": "Save First Image Tensor" + } + }, + "20": { + "inputs": { + "images": [ + "18", + 0 + ] + }, + "class_type": "SaveTensor", + "_meta": { + "title": "Save Second Image Tensor" + } + }, + "23": { + "inputs": { + "clip_name": "CLIPText/model.fp16.safetensors", + "type": "stable_diffusion", + "device": "default" + }, + "class_type": "CLIPLoader", + "_meta": { + "title": "Load CLIP" + } + } +} diff --git a/workflows/comfystream/sd15-tensorrt-batch2-performance-api.json b/workflows/comfystream/sd15-tensorrt-batch2-performance-api.json new file mode 100644 index 000000000..3b8098e93 --- /dev/null +++ b/workflows/comfystream/sd15-tensorrt-batch2-performance-api.json @@ -0,0 +1,249 @@ +{ + "1": { + "inputs": { + "batch_size": 2 + }, + "class_type": "LoadBatchTensor", + "_meta": { + "title": "Load Batch Tensor (Size 2)" + } + }, + "2": { + "inputs": { + "operation": "workflow_execution" + }, + "class_type": "StartPerformanceTimerNode", + "_meta": { + "title": "Start Performance Timer" + } + }, + "3": { + "inputs": { + "engine": "depth_anything_vitl14-fp16.engine", + "images": [ + "1", + 0 + ] + }, + "class_type": "DepthAnythingTensorrt", + "_meta": { + "title": "Depth Anything Tensorrt (Batch)" + } + }, + "4": { + "inputs": { + "unet_name": "static-dreamshaper8_SD15_$stat-b-2-h-512-w-512_00001_.engine", + "model_type": "SD15" + }, + "class_type": "TensorRTLoader", + "_meta": { + "title": "TensorRT Loader (Batch Size 2)" + } + }, + "5": { + "inputs": { + "text": "the hulk", + "clip": [ + "23", + 0 + ] + }, + "class_type": "CLIPTextEncode", + "_meta": { + "title": "CLIP Text Encode (Prompt)" + } + }, + "6": { + "inputs": { + "text": "", + "clip": [ + "23", + 0 + ] + }, + "class_type": "CLIPTextEncode", + "_meta": { + "title": "CLIP Text Encode (Negative Prompt)" + } + }, + "7": { + "inputs": { + "seed": 905056445574169, + "steps": 1, + "cfg": 1, + "sampler_name": "lcm", + "scheduler": "normal", + "denoise": 1, + "model": [ + "4", + 0 + ], + "positive": [ + "9", + 0 + ], + "negative": [ + "9", + 1 + ], + "latent_image": [ + "16", + 0 + ] + }, + "class_type": "KSampler", + "_meta": { + "title": "KSampler (Batch Processing)" + } + }, + "8": { + "inputs": { + "control_net_name": "control_v11f1p_sd15_depth_fp16.safetensors" + }, + "class_type": "ControlNetLoader", + "_meta": { + "title": "Load ControlNet Model" + } + }, + "9": { + "inputs": { + "strength": 1, + "start_percent": 0, + "end_percent": 1, + "positive": [ + "5", + 0 + ], + "negative": [ + "6", + 0 + ], + "control_net": [ + "10", + 0 + ], + "image": [ + "3", + 0 + ] + }, + "class_type": "ControlNetApplyAdvanced", + "_meta": { + "title": "Apply ControlNet (Batch)" + } + }, + "10": { + "inputs": { + "backend": "inductor", + "fullgraph": false, + "mode": "reduce-overhead", + "controlnet": [ + "8", + 0 + ] + }, + "class_type": "TorchCompileLoadControlNet", + "_meta": { + "title": "TorchCompileLoadControlNet" + } + }, + "11": { + "inputs": { + "vae_name": "taesd" + }, + "class_type": "VAELoader", + "_meta": { + "title": "Load VAE" + } + }, + "13": { + "inputs": { + "backend": "inductor", + "fullgraph": true, + "mode": "reduce-overhead", + "compile_encoder": true, + "compile_decoder": true, + "vae": [ + "11", + 0 + ] + }, + "class_type": 
"TorchCompileLoadVAE", + "_meta": { + "title": "TorchCompileLoadVAE" + } + }, + "14": { + "inputs": { + "samples": [ + "7", + 0 + ], + "vae": [ + "13", + 0 + ] + }, + "class_type": "VAEDecode", + "_meta": { + "title": "VAE Decode (Batch)" + } + }, + "15": { + "inputs": { + "images": [ + "14", + 0 + ] + }, + "class_type": "PreviewImage", + "_meta": { + "title": "Preview Image (Batch)" + } + }, + "16": { + "inputs": { + "width": 512, + "height": 512, + "batch_size": 2 + }, + "class_type": "EmptyLatentImage", + "_meta": { + "title": "Empty Latent Image (Batch Size 2)" + } + }, + "17": { + "inputs": { + "images": [ + "14", + 0 + ] + }, + "class_type": "SaveBatchTensor", + "_meta": { + "title": "Save Batch Tensor Output" + } + }, + "18": { + "inputs": { + "operation": "workflow_execution", + "batch_size": 2, + "num_images": 2 + }, + "class_type": "PerformanceTimerNode", + "_meta": { + "title": "End Performance Timer" + } + }, + "23": { + "inputs": { + "clip_name": "CLIPText/model.fp16.safetensors", + "type": "stable_diffusion", + "device": "default" + }, + "class_type": "CLIPLoader", + "_meta": { + "title": "Load CLIP" + } + } +} diff --git a/workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json b/workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json new file mode 100644 index 000000000..7d8311d14 --- /dev/null +++ b/workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json @@ -0,0 +1,227 @@ +{ + "1": { + "inputs": {}, + "class_type": "LoadTensor", + "_meta": { + "title": "Load Tensor (Batch Input)" + } + }, + "2": { + "inputs": { + "engine": "depth_anything_vitl14-fp16.engine", + "images": [ + "1", + 0 + ] + }, + "class_type": "DepthAnythingTensorrt", + "_meta": { + "title": "Depth Anything Tensorrt (Batch)" + } + }, + "3": { + "inputs": { + "unet_name": "static-dreamshaper8_SD15_$stat-b-2-h-512-w-512_00001_.engine", + "model_type": "SD15" + }, + "class_type": "TensorRTLoader", + "_meta": { + "title": "TensorRT Loader (Batch Size 2)" + } + }, + "5": { + "inputs": { + "text": "the hulk", + "clip": [ + "23", + 0 + ] + }, + "class_type": "CLIPTextEncode", + "_meta": { + "title": "CLIP Text Encode (Prompt)" + } + }, + "6": { + "inputs": { + "text": "", + "clip": [ + "23", + 0 + ] + }, + "class_type": "CLIPTextEncode", + "_meta": { + "title": "CLIP Text Encode (Negative Prompt)" + } + }, + "7": { + "inputs": { + "seed": 905056445574169, + "steps": 1, + "cfg": 1, + "sampler_name": "lcm", + "scheduler": "normal", + "denoise": 1, + "model": [ + "3", + 0 + ], + "positive": [ + "9", + 0 + ], + "negative": [ + "9", + 1 + ], + "latent_image": [ + "16", + 0 + ] + }, + "class_type": "KSampler", + "_meta": { + "title": "KSampler (Batch Processing)" + } + }, + "8": { + "inputs": { + "control_net_name": "control_v11f1p_sd15_depth_fp16.safetensors" + }, + "class_type": "ControlNetLoader", + "_meta": { + "title": "Load ControlNet Model" + } + }, + "9": { + "inputs": { + "strength": 1, + "start_percent": 0, + "end_percent": 1, + "positive": [ + "5", + 0 + ], + "negative": [ + "6", + 0 + ], + "control_net": [ + "10", + 0 + ], + "image": [ + "2", + 0 + ] + }, + "class_type": "ControlNetApplyAdvanced", + "_meta": { + "title": "Apply ControlNet (Batch)" + } + }, + "10": { + "inputs": { + "backend": "inductor", + "fullgraph": false, + "mode": "reduce-overhead", + "controlnet": [ + "8", + 0 + ] + }, + "class_type": "TorchCompileLoadControlNet", + "_meta": { + "title": "TorchCompileLoadControlNet" + } + }, + "11": { + "inputs": { + "vae_name": "taesd" + }, + "class_type": "VAELoader", + "_meta": { + "title": 
"Load VAE" + } + }, + "13": { + "inputs": { + "backend": "inductor", + "fullgraph": true, + "mode": "reduce-overhead", + "compile_encoder": true, + "compile_decoder": true, + "vae": [ + "11", + 0 + ] + }, + "class_type": "TorchCompileLoadVAE", + "_meta": { + "title": "TorchCompileLoadVAE" + } + }, + "14": { + "inputs": { + "samples": [ + "7", + 0 + ], + "vae": [ + "13", + 0 + ] + }, + "class_type": "VAEDecode", + "_meta": { + "title": "VAE Decode (Batch)" + } + }, + "15": { + "inputs": { + "images": [ + "14", + 0 + ] + }, + "class_type": "PreviewImage", + "_meta": { + "title": "Preview Image (Batch)" + } + }, + "16": { + "inputs": { + "width": 512, + "height": 512, + "batch_size": 2 + }, + "class_type": "EmptyLatentImage", + "_meta": { + "title": "Empty Latent Image (Batch Size 2)" + } + }, + "17": { + "inputs": { + "images": [ + "14", + 0 + ] + }, + "class_type": "SaveTensor", + "_meta": { + "title": "Save Batch Tensor Output" + } + }, + "23": { + "inputs": { + "clip_name": "CLIPText/model.fp16.safetensors", + "type": "stable_diffusion", + "device": "default" + }, + "class_type": "CLIPLoader", + "_meta": { + "title": "Load CLIP" + } + } +} From f13bd811ba6b6d032ad5adea6aa2e927fd613e2a Mon Sep 17 00:00:00 2001 From: lukiod Date: Sun, 14 Sep 2025 14:59:52 +0000 Subject: [PATCH 2/7] done --- -files | 258 ++++++++++++++++++ BATCH_PROCESSING_README.md | 178 ------------ benchmark_batch_processing.py | 241 ---------------- example_batch_processing.py | 171 ------------ nodes/tensor_utils/load_batch_tensor.py | 89 ------ nodes/tensor_utils/load_tensor.py | 47 +++- nodes/tensor_utils/performance_nodes.py | 70 +++++ nodes/tensor_utils/performance_timer.py | 150 ---------- nodes/tensor_utils/save_tensor.py | 19 +- src/comfystream/utils.py | 86 +++++- test_batch_processing.py | 250 ----------------- .../sd15-tensorrt-batch2-performance-api.json | 11 +- .../sd15-tensorrt-batch2-tensor-api.json | 11 +- 13 files changed, 485 insertions(+), 1096 deletions(-) create mode 100644 -files delete mode 100644 BATCH_PROCESSING_README.md delete mode 100644 benchmark_batch_processing.py delete mode 100644 example_batch_processing.py delete mode 100644 nodes/tensor_utils/load_batch_tensor.py create mode 100644 nodes/tensor_utils/performance_nodes.py delete mode 100644 nodes/tensor_utils/performance_timer.py delete mode 100644 test_batch_processing.py diff --git a/-files b/-files new file mode 100644 index 000000000..333a0b576 --- /dev/null +++ b/-files @@ -0,0 +1,258 @@ + + SSUUMMMMAARRYY OOFF LLEESSSS CCOOMMMMAANNDDSS + + Commands marked with * may be preceded by a number, _N. + Notes in parentheses indicate the behavior if _N is given. + A key preceded by a caret indicates the Ctrl key; thus ^K is ctrl-K. + + h H Display this help. + q :q Q :Q ZZ Exit. + --------------------------------------------------------------------------- + + MMOOVVIINNGG + + e ^E j ^N CR * Forward one line (or _N lines). + y ^Y k ^K ^P * Backward one line (or _N lines). + f ^F ^V SPACE * Forward one window (or _N lines). + b ^B ESC-v * Backward one window (or _N lines). + z * Forward one window (and set window to _N). + w * Backward one window (and set window to _N). + ESC-SPACE * Forward one window, but don't stop at end-of-file. + d ^D * Forward one half-window (and set half-window to _N). + u ^U * Backward one half-window (and set half-window to _N). + ESC-) RightArrow * Right one half screen width (or _N positions). + ESC-( LeftArrow * Left one half screen width (or _N positions). + ESC-} ^RightArrow Right to last column displayed. 
+ ESC-{ ^LeftArrow Left to first column. + F Forward forever; like "tail -f". + ESC-F Like F but stop when search pattern is found. + r ^R ^L Repaint screen. + R Repaint screen, discarding buffered input. + --------------------------------------------------- + Default "window" is the screen height. + Default "half-window" is half of the screen height. + --------------------------------------------------------------------------- + + SSEEAARRCCHHIINNGG + + /_p_a_t_t_e_r_n * Search forward for (_N-th) matching line. + ?_p_a_t_t_e_r_n * Search backward for (_N-th) matching line. + n * Repeat previous search (for _N-th occurrence). + N * Repeat previous search in reverse direction. + ESC-n * Repeat previous search, spanning files. + ESC-N * Repeat previous search, reverse dir. & spanning files. + ESC-u Undo (toggle) search highlighting. + ESC-U Clear search highlighting. + &_p_a_t_t_e_r_n * Display only matching lines. + --------------------------------------------------- + A search pattern may begin with one or more of: + ^N or ! Search for NON-matching lines. + ^E or * Search multiple files (pass thru END OF FILE). + ^F or @ Start search at FIRST file (for /) or last file (for ?). + ^K Highlight matches, but don't move (KEEP position). + ^R Don't use REGULAR EXPRESSIONS. + ^W WRAP search if no match found. + --------------------------------------------------------------------------- + + JJUUMMPPIINNGG + + g < ESC-< * Go to first line in file (or line _N). + G > ESC-> * Go to last line in file (or line _N). + p % * Go to beginning of file (or _N percent into file). + t * Go to the (_N-th) next tag. + T * Go to the (_N-th) previous tag. + { ( [ * Find close bracket } ) ]. + } ) ] * Find open bracket { ( [. + ESC-^F _<_c_1_> _<_c_2_> * Find close bracket _<_c_2_>. + ESC-^B _<_c_1_> _<_c_2_> * Find open bracket _<_c_1_>. + --------------------------------------------------- + Each "find close bracket" command goes forward to the close bracket + matching the (_N-th) open bracket in the top line. + Each "find open bracket" command goes backward to the open bracket + matching the (_N-th) close bracket in the bottom line. + + m_<_l_e_t_t_e_r_> Mark the current top line with . + M_<_l_e_t_t_e_r_> Mark the current bottom line with . + '_<_l_e_t_t_e_r_> Go to a previously marked position. + '' Go to the previous position. + ^X^X Same as '. + ESC-M_<_l_e_t_t_e_r_> Clear a mark. + --------------------------------------------------- + A mark is any upper-case or lower-case letter. + Certain marks are predefined: + ^ means beginning of the file + $ means end of the file + --------------------------------------------------------------------------- + + CCHHAANNGGIINNGG FFIILLEESS + + :e [_f_i_l_e] Examine a new file. + ^X^V Same as :e. + :n * Examine the (_N-th) next file from the command line. + :p * Examine the (_N-th) previous file from the command line. + :x * Examine the first (or _N-th) file from the command line. + :d Delete the current file from the command line list. + = ^G :f Print current file name. + --------------------------------------------------------------------------- + + MMIISSCCEELLLLAANNEEOOUUSS CCOOMMMMAANNDDSS + + -_<_f_l_a_g_> Toggle a command line option [see OPTIONS below]. + --_<_n_a_m_e_> Toggle a command line option, by name. + __<_f_l_a_g_> Display the setting of a command line option. + ___<_n_a_m_e_> Display the setting of an option, by name. + +_c_m_d Execute the less cmd each time a new file is examined. + + !_c_o_m_m_a_n_d Execute the shell command with $SHELL. 
+ |XX_c_o_m_m_a_n_d Pipe file between current pos & mark XX to shell command. + s _f_i_l_e Save input to a file. + v Edit the current file with $VISUAL or $EDITOR. + V Print version number of "less". + --------------------------------------------------------------------------- + + OOPPTTIIOONNSS + + Most options may be changed either on the command line, + or from within less by using the - or -- command. + Options may be given in one of two forms: either a single + character preceded by a -, or a name preceded by --. + + -? ........ --help + Display help (from command line). + -a ........ --search-skip-screen + Search skips current screen. + -A ........ --SEARCH-SKIP-SCREEN + Search starts just after target line. + -b [_N] .... --buffers=[_N] + Number of buffers. + -B ........ --auto-buffers + Don't automatically allocate buffers for pipes. + -c ........ --clear-screen + Repaint by clearing rather than scrolling. + -d ........ --dumb + Dumb terminal. + -D xx_c_o_l_o_r . --color=xx_c_o_l_o_r + Set screen colors. + -e -E .... --quit-at-eof --QUIT-AT-EOF + Quit at end of file. + -f ........ --force + Force open non-regular files. + -F ........ --quit-if-one-screen + Quit if entire file fits on first screen. + -g ........ --hilite-search + Highlight only last match for searches. + -G ........ --HILITE-SEARCH + Don't highlight any matches for searches. + -h [_N] .... --max-back-scroll=[_N] + Backward scroll limit. + -i ........ --ignore-case + Ignore case in searches that do not contain uppercase. + -I ........ --IGNORE-CASE + Ignore case in all searches. + -j [_N] .... --jump-target=[_N] + Screen position of target lines. + -J ........ --status-column + Display a status column at left edge of screen. + -k [_f_i_l_e] . --lesskey-file=[_f_i_l_e] + Use a lesskey file. + -K ........ --quit-on-intr + Exit less in response to ctrl-C. + -L ........ --no-lessopen + Ignore the LESSOPEN environment variable. + -m -M .... --long-prompt --LONG-PROMPT + Set prompt style. + -n -N .... --line-numbers --LINE-NUMBERS + Don't use line numbers. + -o [_f_i_l_e] . --log-file=[_f_i_l_e] + Copy to log file (standard input only). + -O [_f_i_l_e] . --LOG-FILE=[_f_i_l_e] + Copy to log file (unconditionally overwrite). + -p [_p_a_t_t_e_r_n] --pattern=[_p_a_t_t_e_r_n] + Start at pattern (from command line). + -P [_p_r_o_m_p_t] --prompt=[_p_r_o_m_p_t] + Define new prompt. + -q -Q .... --quiet --QUIET --silent --SILENT + Quiet the terminal bell. + -r -R .... --raw-control-chars --RAW-CONTROL-CHARS + Output "raw" control characters. + -s ........ --squeeze-blank-lines + Squeeze multiple blank lines. + -S ........ --chop-long-lines + Chop (truncate) long lines rather than wrapping. + -t [_t_a_g] .. --tag=[_t_a_g] + Find a tag. + -T [_t_a_g_s_f_i_l_e] --tag-file=[_t_a_g_s_f_i_l_e] + Use an alternate tags file. + -u -U .... --underline-special --UNDERLINE-SPECIAL + Change handling of backspaces. + -V ........ --version + Display the version number of "less". + -w ........ --hilite-unread + Highlight first new line after forward-screen. + -W ........ --HILITE-UNREAD + Highlight first new line after any forward movement. + -x [_N[,...]] --tabs=[_N[,...]] + Set tab stops. + -X ........ --no-init + Don't use termcap init/deinit strings. + -y [_N] .... --max-forw-scroll=[_N] + Forward scroll limit. + -z [_N] .... --window=[_N] + Set size of window. + -" [_c[_c]] . --quotes=[_c[_c]] + Set shell quote characters. + -~ ........ --tilde + Don't display tildes after end of file. + -# [_N] .... 
--shift=[_N] + Set horizontal scroll amount (0 = one half screen width). + --file-size + Automatically determine the size of the input file. + --follow-name + The F command changes files if the input file is renamed. + --incsearch + Search file as each pattern character is typed in. + --line-num-width=N + Set the width of the -N line number field to N characters. + --mouse + Enable mouse input. + --no-keypad + Don't send termcap keypad init/deinit strings. + --no-histdups + Remove duplicates from command history. + --rscroll=C + Set the character used to mark truncated lines. + --save-marks + Retain marks across invocations of less. + --status-col-width=N + Set the width of the -J status column to N characters. + --use-backslash + Subsequent options use backslash as escape char. + --use-color + Enables colored text. + --wheel-lines=N + Each click of the mouse wheel moves N lines. + + + --------------------------------------------------------------------------- + + LLIINNEE EEDDIITTIINNGG + + These keys can be used to edit text being entered + on the "command line" at the bottom of the screen. + + RightArrow ..................... ESC-l ... Move cursor right one character. + LeftArrow ...................... ESC-h ... Move cursor left one character. + ctrl-RightArrow ESC-RightArrow ESC-w ... Move cursor right one word. + ctrl-LeftArrow ESC-LeftArrow ESC-b ... Move cursor left one word. + HOME ........................... ESC-0 ... Move cursor to start of line. + END ............................ ESC-$ ... Move cursor to end of line. + BACKSPACE ................................ Delete char to left of cursor. + DELETE ......................... ESC-x ... Delete char under cursor. + ctrl-BACKSPACE ESC-BACKSPACE ........... Delete word to left of cursor. + ctrl-DELETE .... ESC-DELETE .... ESC-X ... Delete word under cursor. + ctrl-U ......... ESC (MS-DOS only) ....... Delete entire line. + UpArrow ........................ ESC-k ... Retrieve previous command line. + DownArrow ...................... ESC-j ... Retrieve next command line. + TAB ...................................... Complete filename & cycle. + SHIFT-TAB ...................... ESC-TAB Complete filename & reverse cycle. + ctrl-L ................................... Complete filename, list all. diff --git a/BATCH_PROCESSING_README.md b/BATCH_PROCESSING_README.md deleted file mode 100644 index 1af1a8a37..000000000 --- a/BATCH_PROCESSING_README.md +++ /dev/null @@ -1,178 +0,0 @@ -# ComfyStream Batch Processing Implementation - -This implementation adds batch processing capabilities to ComfyStream, allowing you to process 2 images at a time using the DreamShaper 8 SD 1.5 TensorRT engine optimized for batch size 2. 
- -## Overview - -The batch processing implementation includes: -- Modified tensor cache to support batch inputs (queue size increased from 1 to 2) -- Custom batch tensor loading and saving nodes -- Performance measurement utilities -- ComfyUI workflows optimized for batch processing -- Benchmarking tools to measure FPS gains - -## Files Modified/Created - -### Core Changes -- `src/comfystream/tensor_cache.py` - Increased `image_inputs` queue size from 1 to 2 -- `nodes/tensor_utils/load_batch_tensor.py` - New batch tensor loading nodes -- `nodes/tensor_utils/performance_timer.py` - Performance measurement utilities - -### Workflows -- `workflows/comfystream/sd15-tensorrt-batch2-api.json` - Basic batch processing workflow -- `workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json` - Tensor-based batch workflow -- `workflows/comfystream/sd15-tensorrt-batch2-performance-api.json` - Performance-measured batch workflow - -### Examples and Tools -- `example_batch_processing.py` - Example usage of batch processing -- `benchmark_batch_processing.py` - Comprehensive performance benchmarking -- `BATCH_PROCESSING_README.md` - This documentation - -## Key Features - -### 1. Batch Tensor Processing -The `LoadBatchTensor` node collects up to `batch_size` images from the tensor cache and stacks them into a single batch tensor. This allows downstream nodes to process multiple images simultaneously. - -### 2. Performance Measurement -The `PerformanceTimerNode` and `StartPerformanceTimerNode` provide built-in performance tracking to measure: -- FPS (Frames Per Second) -- Processing time per image -- Batch efficiency -- Performance improvements - -### 3. Optimized Workflows -The batch workflows are designed to: -- Use TensorRT engines optimized for batch size 2 -- Process Depth Anything and KSampler nodes in batch mode -- Handle non-batchable nodes appropriately -- Measure and report performance metrics - -## Usage - -### Basic Batch Processing - -```python -import asyncio -import torch -from comfystream.client import ComfyStreamClient - -async def process_batch(): - client = ComfyStreamClient() - - # Load batch workflow - with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f: - workflow = json.load(f) - - # Generate test images - images = [ - torch.randn(1, 512, 512, 3, dtype=torch.float32), - torch.randn(1, 512, 512, 3, dtype=torch.float32), - ] - - # Add images to queue - for image in images: - client.put_video_input(image) - - # Process batch - await client.set_prompts([workflow]) - - # Collect outputs - outputs = [] - for _ in range(len(images)): - output = await client.get_video_output() - outputs.append(output) - - await client.cleanup() - return outputs -``` - -### Performance Benchmarking - -```python -from benchmark_batch_processing import BatchProcessingBenchmark - -async def run_benchmark(): - benchmark = BatchProcessingBenchmark() - await benchmark.load_workflows() - results = await benchmark.run_benchmark(num_test_images=10, batch_size=2) - benchmark.save_results() -``` - -## Workflow Structure - -### Batch Processing Workflow Components - -1. **LoadBatchTensor** - Collects images from tensor cache into batches -2. **StartPerformanceTimerNode** - Starts performance measurement -3. **DepthAnythingTensorrt** - Processes depth maps in batch mode -4. **TensorRTLoader** - Loads TensorRT engine optimized for batch size 2 -5. **KSampler** - Performs diffusion sampling in batch mode -6. **ControlNetApplyAdvanced** - Applies control net in batch mode -7. 
**VAEDecode** - Decodes latents to images in batch mode -8. **SaveBatchTensor** - Saves batch outputs to tensor cache -9. **PerformanceTimerNode** - Ends performance measurement and reports metrics - -## Performance Considerations - -### Expected Benefits -- **2x FPS improvement** when processing 2 images simultaneously -- **Reduced memory overhead** per image due to batch processing -- **Better GPU utilization** with larger batch sizes -- **Lower latency** per image in batch mode - -### Requirements -- TensorRT engine compiled for batch size 2 -- Sufficient GPU memory for batch processing -- ComfyUI nodes that support batch processing - -### Limitations -- Some nodes may not support batching (composite overlays, etc.) -- Memory usage increases with batch size -- Latency may increase if batch is not full - -## TensorRT Engine Setup - -To use this implementation, you need a TensorRT engine compiled for batch size 2: - -1. Use the `Dynamic Model TensorRT Conversion` node in ComfyUI -2. Set batch size parameters: - - `batch_size_min`: 2 - - `batch_size_max`: 2 - - `batch_size_opt`: 2 -3. Specify resolution: 512x512 -4. Provide filename prefix: `tensorrt/dreamshaper8_batch2` -5. Update the workflow to use the correct engine filename - -## Troubleshooting - -### Common Issues - -1. **Queue Full Error**: Ensure the tensor cache queue size is set to 2 or higher -2. **Memory Errors**: Reduce batch size or image resolution -3. **TensorRT Engine Not Found**: Verify the engine filename in the workflow -4. **Performance Not Improved**: Check that all nodes support batching - -### Debug Mode - -Enable debug logging to troubleshoot issues: - -```python -import logging -logging.basicConfig(level=logging.DEBUG) -``` - -## Future Improvements - -- Support for dynamic batch sizes -- Automatic batch size optimization -- Memory usage monitoring -- More sophisticated performance metrics -- Support for larger batch sizes (4, 8, etc.) - -## Contributing - -When adding new batch processing features: -1. Ensure compatibility with existing tensor cache system -2. Add performance measurement capabilities -3. Update documentation and examples -4. Test with various batch sizes and image resolutions diff --git a/benchmark_batch_processing.py b/benchmark_batch_processing.py deleted file mode 100644 index 5ae687343..000000000 --- a/benchmark_batch_processing.py +++ /dev/null @@ -1,241 +0,0 @@ -#!/usr/bin/env python3 -""" -Benchmark script to compare single vs batch processing performance in ComfyStream. -This script measures FPS gains when processing images in batches of 2. 
-""" - -import asyncio -import time -import torch -import json -import numpy as np -from typing import List, Dict, Any -import logging - -from comfystream.client import ComfyStreamClient -from comfystream.tensor_cache import performance_timer - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class BatchProcessingBenchmark: - def __init__(self, cwd: str = None): - self.client = ComfyStreamClient(cwd=cwd) - self.results = { - "single_processing": {}, - "batch_processing": {}, - "performance_gains": {} - } - - async def load_workflows(self): - """Load the single and batch processing workflows.""" - try: - # Load single processing workflow - with open("./workflows/comfystream/sd15-tensorrt-api.json", "r") as f: - self.single_workflow = json.load(f) - - # Load batch processing workflow - with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f: - self.batch_workflow = json.load(f) - - logger.info("Workflows loaded successfully") - except FileNotFoundError as e: - logger.error(f"Workflow file not found: {e}") - raise - - def generate_test_images(self, num_images: int, height: int = 512, width: int = 512) -> List[torch.Tensor]: - """Generate test images for benchmarking.""" - images = [] - for i in range(num_images): - # Generate random test image - image = torch.randn(1, height, width, 3, dtype=torch.float32) - images.append(image) - return images - - async def benchmark_single_processing(self, test_images: List[torch.Tensor]) -> Dict[str, Any]: - """Benchmark single image processing.""" - logger.info("Starting single processing benchmark...") - - # Reset performance timer - performance_timer.reset() - - start_time = time.time() - - # Process each image individually - for i, image in enumerate(test_images): - logger.info(f"Processing single image {i+1}/{len(test_images)}") - - # Put image in queue - self.client.put_video_input(image) - - # Set workflow and process - await self.client.set_prompts([self.single_workflow]) - - # Wait for output - output = await self.client.get_video_output() - logger.info(f"Single image {i+1} processed, output shape: {output.shape}") - - end_time = time.time() - total_time = end_time - start_time - - results = { - "total_time": total_time, - "num_images": len(test_images), - "fps": len(test_images) / total_time, - "avg_time_per_image": total_time / len(test_images) - } - - logger.info(f"Single processing completed: {results['fps']:.2f} FPS") - return results - - async def benchmark_batch_processing(self, test_images: List[torch.Tensor], batch_size: int = 2) -> Dict[str, Any]: - """Benchmark batch image processing.""" - logger.info("Starting batch processing benchmark...") - - # Reset performance timer - performance_timer.reset() - - start_time = time.time() - - # Process images in batches - for i in range(0, len(test_images), batch_size): - batch_images = test_images[i:i+batch_size] - logger.info(f"Processing batch {i//batch_size + 1}/{(len(test_images) + batch_size - 1)//batch_size}") - - # Put batch images in queue - for image in batch_images: - self.client.put_video_input(image) - - # Set batch workflow and process - await self.client.set_prompts([self.batch_workflow]) - - # Wait for batch output - for _ in range(len(batch_images)): - output = await self.client.get_video_output() - logger.info(f"Batch image processed, output shape: {output.shape}") - - end_time = time.time() - total_time = end_time - start_time - - results = { - "total_time": total_time, - "num_images": 
len(test_images), - "batch_size": batch_size, - "fps": len(test_images) / total_time, - "avg_time_per_image": total_time / len(test_images), - "avg_time_per_batch": total_time / ((len(test_images) + batch_size - 1) // batch_size) - } - - logger.info(f"Batch processing completed: {results['fps']:.2f} FPS") - return results - - def calculate_performance_gains(self, single_results: Dict[str, Any], batch_results: Dict[str, Any]) -> Dict[str, Any]: - """Calculate performance gains from batch processing.""" - gains = { - "fps_improvement": (batch_results["fps"] - single_results["fps"]) / single_results["fps"] * 100, - "time_reduction": (single_results["total_time"] - batch_results["total_time"]) / single_results["total_time"] * 100, - "efficiency_ratio": batch_results["fps"] / single_results["fps"] - } - return gains - - async def run_benchmark(self, num_test_images: int = 10, batch_size: int = 2): - """Run the complete benchmark.""" - logger.info(f"Starting benchmark with {num_test_images} test images, batch size {batch_size}") - - # Generate test images - test_images = self.generate_test_images(num_test_images) - logger.info(f"Generated {len(test_images)} test images") - - try: - # Benchmark single processing - single_results = await self.benchmark_single_processing(test_images) - self.results["single_processing"] = single_results - - # Wait a bit between tests - await asyncio.sleep(2) - - # Benchmark batch processing - batch_results = await self.benchmark_batch_processing(test_images, batch_size) - self.results["batch_processing"] = batch_results - - # Calculate performance gains - performance_gains = self.calculate_performance_gains(single_results, batch_results) - self.results["performance_gains"] = performance_gains - - # Print results - self.print_results() - - return self.results - - except Exception as e: - logger.error(f"Benchmark failed: {e}") - raise - finally: - # Cleanup - await self.client.cleanup() - - def print_results(self): - """Print benchmark results in a formatted way.""" - print("\n" + "="*60) - print("COMFYSTREAM BATCH PROCESSING BENCHMARK RESULTS") - print("="*60) - - single = self.results["single_processing"] - batch = self.results["batch_processing"] - gains = self.results["performance_gains"] - - print(f"\nTest Configuration:") - print(f" Total Images: {single['num_images']}") - print(f" Batch Size: {batch['batch_size']}") - - print(f"\nSingle Processing:") - print(f" Total Time: {single['total_time']:.2f} seconds") - print(f" FPS: {single['fps']:.2f}") - print(f" Avg Time per Image: {single['avg_time_per_image']:.4f} seconds") - - print(f"\nBatch Processing:") - print(f" Total Time: {batch['total_time']:.2f} seconds") - print(f" FPS: {batch['fps']:.2f}") - print(f" Avg Time per Image: {batch['avg_time_per_image']:.4f} seconds") - print(f" Avg Time per Batch: {batch['avg_time_per_batch']:.4f} seconds") - - print(f"\nPerformance Gains:") - print(f" FPS Improvement: {gains['fps_improvement']:.1f}%") - print(f" Time Reduction: {gains['time_reduction']:.1f}%") - print(f" Efficiency Ratio: {gains['efficiency_ratio']:.2f}x") - - if gains['fps_improvement'] > 0: - print(f"\n✅ Batch processing is {gains['efficiency_ratio']:.2f}x faster!") - else: - print(f"\n❌ Single processing is faster (possible overhead)") - - print("="*60) - - def save_results(self, filename: str = "batch_processing_benchmark_results.json"): - """Save benchmark results to a JSON file.""" - with open(filename, 'w') as f: - json.dump(self.results, f, indent=2) - logger.info(f"Results saved to {filename}") - 
- -async def main(): - """Main benchmark execution.""" - benchmark = BatchProcessingBenchmark() - - try: - await benchmark.load_workflows() - results = await benchmark.run_benchmark(num_test_images=10, batch_size=2) - benchmark.save_results() - - except Exception as e: - logger.error(f"Benchmark execution failed: {e}") - return 1 - - return 0 - - -if __name__ == "__main__": - exit_code = asyncio.run(main()) - exit(exit_code) diff --git a/example_batch_processing.py b/example_batch_processing.py deleted file mode 100644 index 80981e1d5..000000000 --- a/example_batch_processing.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python3 -""" -Example script demonstrating batch processing with ComfyStream. -This script shows how to process 2 images at a time using the batch workflow. -""" - -import asyncio -import torch -import json -import logging -from typing import List - -from comfystream.client import ComfyStreamClient - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -async def batch_processing_example(): - """Example of batch processing with ComfyStream.""" - - # Initialize ComfyStream client - client = ComfyStreamClient() - - try: - # Load the batch processing workflow - with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f: - workflow = json.load(f) - - logger.info("Loaded batch processing workflow") - - # Generate test images (2 images for batch processing) - test_images = [ - torch.randn(1, 512, 512, 3, dtype=torch.float32), # First image - torch.randn(1, 512, 512, 3, dtype=torch.float32), # Second image - ] - - logger.info(f"Generated {len(test_images)} test images") - - # Put both images in the input queue - for i, image in enumerate(test_images): - client.put_video_input(image) - logger.info(f"Added image {i+1} to input queue") - - # Set the batch processing workflow - await client.set_prompts([workflow]) - logger.info("Started batch processing workflow") - - # Process the batch and collect outputs - outputs = [] - for i in range(len(test_images)): - output = await client.get_video_output() - outputs.append(output) - logger.info(f"Received output {i+1}, shape: {output.shape}") - - logger.info(f"Batch processing completed! 
Processed {len(outputs)} images") - - # Print performance summary if available - # Note: The performance timer results would be available in the workflow output - # This is a simplified example focusing on the batch processing flow - - return outputs - - except Exception as e: - logger.error(f"Batch processing failed: {e}") - raise - finally: - # Cleanup - await client.cleanup() - - -async def single_vs_batch_comparison(): - """Compare single vs batch processing performance.""" - - client = ComfyStreamClient() - - try: - # Load workflows - with open("./workflows/comfystream/sd15-tensorrt-api.json", "r") as f: - single_workflow = json.load(f) - - with open("./workflows/comfystream/sd15-tensorrt-batch2-performance-api.json", "r") as f: - batch_workflow = json.load(f) - - # Generate test images - test_images = [ - torch.randn(1, 512, 512, 3, dtype=torch.float32), - torch.randn(1, 512, 512, 3, dtype=torch.float32), - ] - - logger.info("=== SINGLE PROCESSING TEST ===") - single_start = asyncio.get_event_loop().time() - - # Process images one by one - for i, image in enumerate(test_images): - client.put_video_input(image) - await client.set_prompts([single_workflow]) - output = await client.get_video_output() - logger.info(f"Single processed image {i+1}, shape: {output.shape}") - - single_end = asyncio.get_event_loop().time() - single_time = single_end - single_start - - logger.info("=== BATCH PROCESSING TEST ===") - batch_start = asyncio.get_event_loop().time() - - # Process images in batch - for image in test_images: - client.put_video_input(image) - - await client.set_prompts([batch_workflow]) - - for i in range(len(test_images)): - output = await client.get_video_output() - logger.info(f"Batch processed image {i+1}, shape: {output.shape}") - - batch_end = asyncio.get_event_loop().time() - batch_time = batch_end - batch_start - - # Calculate performance metrics - single_fps = len(test_images) / single_time - batch_fps = len(test_images) / batch_time - improvement = (batch_fps - single_fps) / single_fps * 100 - - logger.info("=== PERFORMANCE COMPARISON ===") - logger.info(f"Single processing time: {single_time:.2f} seconds") - logger.info(f"Batch processing time: {batch_time:.2f} seconds") - logger.info(f"Single processing FPS: {single_fps:.2f}") - logger.info(f"Batch processing FPS: {batch_fps:.2f}") - logger.info(f"Performance improvement: {improvement:.1f}%") - - if improvement > 0: - logger.info(f"✅ Batch processing is {batch_fps/single_fps:.2f}x faster!") - else: - logger.info("❌ Single processing is faster (possible overhead)") - - except Exception as e: - logger.error(f"Comparison failed: {e}") - raise - finally: - await client.cleanup() - - -async def main(): - """Main example execution.""" - logger.info("ComfyStream Batch Processing Example") - logger.info("=" * 50) - - try: - # Run batch processing example - logger.info("Running batch processing example...") - outputs = await batch_processing_example() - logger.info(f"Successfully processed {len(outputs)} images in batch") - - # Run performance comparison - logger.info("\nRunning performance comparison...") - await single_vs_batch_comparison() - - logger.info("\nExample completed successfully!") - - except Exception as e: - logger.error(f"Example failed: {e}") - return 1 - - return 0 - - -if __name__ == "__main__": - exit_code = asyncio.run(main()) - exit(exit_code) diff --git a/nodes/tensor_utils/load_batch_tensor.py b/nodes/tensor_utils/load_batch_tensor.py deleted file mode 100644 index 7ced0243d..000000000 --- 
a/nodes/tensor_utils/load_batch_tensor.py +++ /dev/null @@ -1,89 +0,0 @@ -import torch -import numpy as np -from typing import List, Union - -from comfystream import tensor_cache - - -class LoadBatchTensor: - CATEGORY = "tensor_utils" - RETURN_TYPES = ("IMAGE",) - FUNCTION = "execute" - - @classmethod - def INPUT_TYPES(s): - return { - "required": { - "batch_size": ("INT", {"default": 2, "min": 1, "max": 8, "step": 1}), - } - } - - @classmethod - def IS_CHANGED(s): - return float("nan") - - def execute(self, batch_size: int): - """ - Load a batch of images from the tensor cache. - Collects up to batch_size images from the queue. - """ - batch_images = [] - - # Collect images up to batch_size - for i in range(batch_size): - if not tensor_cache.image_inputs.empty(): - frame = tensor_cache.image_inputs.get(block=False) - frame.side_data.skipped = False - batch_images.append(frame.side_data.input) - else: - # If we don't have enough images, pad with the last available image - if batch_images: - batch_images.append(batch_images[-1]) - else: - # If no images available, create a dummy tensor - dummy_tensor = torch.zeros(1, 512, 512, 3, dtype=torch.float32) - batch_images.append(dummy_tensor) - - # Stack images into a batch - if len(batch_images) > 1: - batch_tensor = torch.cat(batch_images, dim=0) - else: - batch_tensor = batch_images[0] - - return (batch_tensor,) - - -class SaveBatchTensor: - CATEGORY = "tensor_utils" - RETURN_TYPES = () - FUNCTION = "execute" - OUTPUT_NODE = True - - @classmethod - def INPUT_TYPES(s): - return { - "required": { - "images": ("IMAGE",), - } - } - - @classmethod - def IS_CHANGED(s): - return float("nan") - - def execute(self, images: torch.Tensor): - """ - Save a batch of images to the tensor cache. - Splits the batch and puts each image individually. - """ - # Split batch into individual images - if images.dim() == 4 and images.shape[0] > 1: - # Batch of images - for i in range(images.shape[0]): - single_image = images[i:i+1] # Keep batch dimension - tensor_cache.image_outputs.put_nowait(single_image) - else: - # Single image - tensor_cache.image_outputs.put_nowait(images) - - return images diff --git a/nodes/tensor_utils/load_tensor.py b/nodes/tensor_utils/load_tensor.py index c39fe8a1d..d6a580a68 100644 --- a/nodes/tensor_utils/load_tensor.py +++ b/nodes/tensor_utils/load_tensor.py @@ -1,3 +1,4 @@ +import torch from comfystream import tensor_cache @@ -8,13 +9,49 @@ class LoadTensor: @classmethod def INPUT_TYPES(s): - return {} + return { + "required": { + "batch_size": ("INT", {"default": 1, "min": 1, "max": 8, "step": 1}), + } + } @classmethod def IS_CHANGED(): return float("nan") - def execute(self): - frame = tensor_cache.image_inputs.get(block=True) - frame.side_data.skipped = False - return (frame.side_data.input,) + def execute(self, batch_size: int = 1): + """ + Load tensor(s) from the tensor cache. + If batch_size > 1, loads multiple tensors and stacks them into a batch. 
+ """ + if batch_size == 1: + # Single tensor loading (original behavior) + frame = tensor_cache.image_inputs.get(block=True) + frame.side_data.skipped = False + return (frame.side_data.input,) + else: + # Batch tensor loading + batch_images = [] + + # Collect images up to batch_size + for i in range(batch_size): + if not tensor_cache.image_inputs.empty(): + frame = tensor_cache.image_inputs.get(block=False) + frame.side_data.skipped = False + batch_images.append(frame.side_data.input) + else: + # If we don't have enough images, pad with the last available image + if batch_images: + batch_images.append(batch_images[-1]) + else: + # If no images available, create a dummy tensor + dummy_tensor = torch.zeros(1, 512, 512, 3, dtype=torch.float32) + batch_images.append(dummy_tensor) + + # Stack images into a batch + if len(batch_images) > 1: + batch_tensor = torch.cat(batch_images, dim=0) + else: + batch_tensor = batch_images[0] + + return (batch_tensor,) diff --git a/nodes/tensor_utils/performance_nodes.py b/nodes/tensor_utils/performance_nodes.py new file mode 100644 index 000000000..74842f903 --- /dev/null +++ b/nodes/tensor_utils/performance_nodes.py @@ -0,0 +1,70 @@ +""" +Performance measurement nodes for ComfyStream batch processing. +These nodes integrate with the existing tensor_utils structure. +""" + +from comfystream.utils import performance_timer + + +class PerformanceTimerNode: + CATEGORY = "tensor_utils" + RETURN_TYPES = ("STRING",) + RETURN_NAMES = ("performance_summary",) + FUNCTION = "execute" + + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "operation": ("STRING", {"default": "workflow_execution"}), + "batch_size": ("INT", {"default": 1, "min": 1, "max": 8, "step": 1}), + "num_images": ("INT", {"default": 1, "min": 1, "max": 100, "step": 1}), + } + } + + @classmethod + def IS_CHANGED(s): + return float("nan") + + def execute(self, operation: str, batch_size: int, num_images: int): + """Record performance metrics and return summary.""" + performance_timer.record_batch_processing(batch_size, num_images) + performance_timer.end_timing(operation) + + summary = performance_timer.get_performance_summary() + + # Format summary as readable string + summary_str = f"Performance Summary:\n" + summary_str += f"Total Images Processed: {summary['total_images_processed']}\n" + summary_str += f"Total FPS: {summary['total_fps']:.2f}\n" + summary_str += f"Average Batch Size: {summary['average_batch_size']:.2f}\n" + + for key, value in summary.items(): + if key not in ["total_images_processed", "total_fps", "average_batch_size"]: + summary_str += f"{key}: {value:.4f}\n" + + return (summary_str,) + + +class StartPerformanceTimerNode: + CATEGORY = "tensor_utils" + RETURN_TYPES = ("STRING",) + RETURN_NAMES = ("timer_started",) + FUNCTION = "execute" + + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "operation": ("STRING", {"default": "workflow_execution"}), + } + } + + @classmethod + def IS_CHANGED(s): + return float("nan") + + def execute(self, operation: str): + """Start timing an operation.""" + performance_timer.start_timing(operation) + return (f"Started timing: {operation}",) diff --git a/nodes/tensor_utils/performance_timer.py b/nodes/tensor_utils/performance_timer.py deleted file mode 100644 index fc0d64799..000000000 --- a/nodes/tensor_utils/performance_timer.py +++ /dev/null @@ -1,150 +0,0 @@ -import time -import torch -from typing import Dict, List, Optional -from contextlib import contextmanager - - -class PerformanceTimer: - """Utility class for 
measuring performance metrics in ComfyStream workflows.""" - - def __init__(self): - self.timings: Dict[str, List[float]] = {} - self.current_timings: Dict[str, float] = {} - self.batch_sizes: List[int] = [] - self.total_images_processed = 0 - - def start_timing(self, operation: str): - """Start timing an operation.""" - self.current_timings[operation] = time.time() - - def end_timing(self, operation: str): - """End timing an operation and record the duration.""" - if operation in self.current_timings: - duration = time.time() - self.current_timings[operation] - if operation not in self.timings: - self.timings[operation] = [] - self.timings[operation].append(duration) - del self.current_timings[operation] - return duration - return 0.0 - - def record_batch_processing(self, batch_size: int, num_images: int): - """Record a batch processing event.""" - self.batch_sizes.append(batch_size) - self.total_images_processed += num_images - - def get_fps(self, operation: str = "total") -> float: - """Calculate FPS for a specific operation.""" - if operation not in self.timings or not self.timings[operation]: - return 0.0 - - total_time = sum(self.timings[operation]) - if total_time == 0: - return 0.0 - - return self.total_images_processed / total_time - - def get_average_time(self, operation: str) -> float: - """Get average time for an operation.""" - if operation not in self.timings or not self.timings[operation]: - return 0.0 - - return sum(self.timings[operation]) / len(self.timings[operation]) - - def get_performance_summary(self) -> Dict[str, float]: - """Get a comprehensive performance summary.""" - summary = { - "total_images_processed": self.total_images_processed, - "total_fps": self.get_fps("total"), - "average_batch_size": sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0, - } - - for operation in self.timings: - summary[f"{operation}_fps"] = self.get_fps(operation) - summary[f"{operation}_avg_time"] = self.get_average_time(operation) - - return summary - - def reset(self): - """Reset all performance data.""" - self.timings.clear() - self.current_timings.clear() - self.batch_sizes.clear() - self.total_images_processed = 0 - - @contextmanager - def time_operation(self, operation: str): - """Context manager for timing operations.""" - self.start_timing(operation) - try: - yield - finally: - self.end_timing(operation) - - -# Global performance timer instance -performance_timer = PerformanceTimer() - - -class PerformanceTimerNode: - CATEGORY = "tensor_utils" - RETURN_TYPES = ("STRING",) - RETURN_NAMES = ("performance_summary",) - FUNCTION = "execute" - - @classmethod - def INPUT_TYPES(s): - return { - "required": { - "operation": ("STRING", {"default": "workflow_execution"}), - "batch_size": ("INT", {"default": 1, "min": 1, "max": 8, "step": 1}), - "num_images": ("INT", {"default": 1, "min": 1, "max": 100, "step": 1}), - } - } - - @classmethod - def IS_CHANGED(s): - return float("nan") - - def execute(self, operation: str, batch_size: int, num_images: int): - """Record performance metrics and return summary.""" - performance_timer.record_batch_processing(batch_size, num_images) - performance_timer.end_timing(operation) - - summary = performance_timer.get_performance_summary() - - # Format summary as readable string - summary_str = f"Performance Summary:\n" - summary_str += f"Total Images Processed: {summary['total_images_processed']}\n" - summary_str += f"Total FPS: {summary['total_fps']:.2f}\n" - summary_str += f"Average Batch Size: 
{summary['average_batch_size']:.2f}\n" - - for key, value in summary.items(): - if key not in ["total_images_processed", "total_fps", "average_batch_size"]: - summary_str += f"{key}: {value:.4f}\n" - - return (summary_str,) - - -class StartPerformanceTimerNode: - CATEGORY = "tensor_utils" - RETURN_TYPES = ("STRING",) - RETURN_NAMES = ("timer_started",) - FUNCTION = "execute" - - @classmethod - def INPUT_TYPES(s): - return { - "required": { - "operation": ("STRING", {"default": "workflow_execution"}), - } - } - - @classmethod - def IS_CHANGED(s): - return float("nan") - - def execute(self, operation: str): - """Start timing an operation.""" - performance_timer.start_timing(operation) - return (f"Started timing: {operation}",) diff --git a/nodes/tensor_utils/save_tensor.py b/nodes/tensor_utils/save_tensor.py index 3a021aa5c..39a49b1f4 100644 --- a/nodes/tensor_utils/save_tensor.py +++ b/nodes/tensor_utils/save_tensor.py @@ -14,6 +14,9 @@ def INPUT_TYPES(s): return { "required": { "images": ("IMAGE",), + }, + "optional": { + "split_batch": ("BOOLEAN", {"default": False}), } } @@ -21,6 +24,18 @@ def INPUT_TYPES(s): def IS_CHANGED(s): return float("nan") - def execute(self, images: torch.Tensor): - tensor_cache.image_outputs.put_nowait(images) + def execute(self, images: torch.Tensor, split_batch: bool = False): + """ + Save tensor(s) to the tensor cache. + If split_batch is True and images is a batch, splits it into individual images. + """ + if split_batch and images.dim() == 4 and images.shape[0] > 1: + # Split batch into individual images + for i in range(images.shape[0]): + single_image = images[i:i+1] # Keep batch dimension + tensor_cache.image_outputs.put_nowait(single_image) + else: + # Save as single tensor (original behavior) + tensor_cache.image_outputs.put_nowait(images) + return images diff --git a/src/comfystream/utils.py b/src/comfystream/utils.py index e26b963d0..246214175 100644 --- a/src/comfystream/utils.py +++ b/src/comfystream/utils.py @@ -1,6 +1,8 @@ import copy import importlib -from typing import Dict, Any +import time +from typing import Dict, Any, List, Optional +from contextlib import contextmanager from comfy.api.components.schema.prompt import Prompt, PromptDictInput from .modalities import ( get_node_counts_by_type, @@ -73,3 +75,85 @@ def convert_prompt(prompt: PromptDictInput) -> Prompt: # Validate the processed prompt prompt = Prompt.validate(prompt) return prompt + + +class PerformanceTimer: + """Utility class for measuring performance metrics in ComfyStream workflows.""" + + def __init__(self): + self.timings: Dict[str, List[float]] = {} + self.current_timings: Dict[str, float] = {} + self.batch_sizes: List[int] = [] + self.total_images_processed = 0 + + def start_timing(self, operation: str): + """Start timing an operation.""" + self.current_timings[operation] = time.time() + + def end_timing(self, operation: str): + """End timing an operation and record the duration.""" + if operation in self.current_timings: + duration = time.time() - self.current_timings[operation] + if operation not in self.timings: + self.timings[operation] = [] + self.timings[operation].append(duration) + del self.current_timings[operation] + return duration + return 0.0 + + def record_batch_processing(self, batch_size: int, num_images: int): + """Record a batch processing event.""" + self.batch_sizes.append(batch_size) + self.total_images_processed += num_images + + def get_fps(self, operation: str = "total") -> float: + """Calculate FPS for a specific operation.""" + if operation not 
in self.timings or not self.timings[operation]: + return 0.0 + + total_time = sum(self.timings[operation]) + if total_time == 0: + return 0.0 + + return self.total_images_processed / total_time + + def get_average_time(self, operation: str) -> float: + """Get average time for an operation.""" + if operation not in self.timings or not self.timings[operation]: + return 0.0 + + return sum(self.timings[operation]) / len(self.timings[operation]) + + def get_performance_summary(self) -> Dict[str, float]: + """Get a comprehensive performance summary.""" + summary = { + "total_images_processed": self.total_images_processed, + "total_fps": self.get_fps("total"), + "average_batch_size": sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0, + } + + for operation in self.timings: + summary[f"{operation}_fps"] = self.get_fps(operation) + summary[f"{operation}_avg_time"] = self.get_average_time(operation) + + return summary + + def reset(self): + """Reset all performance data.""" + self.timings.clear() + self.current_timings.clear() + self.batch_sizes.clear() + self.total_images_processed = 0 + + @contextmanager + def time_operation(self, operation: str): + """Context manager for timing operations.""" + self.start_timing(operation) + try: + yield + finally: + self.end_timing(operation) + + +# Global performance timer instance +performance_timer = PerformanceTimer() diff --git a/test_batch_processing.py b/test_batch_processing.py deleted file mode 100644 index ce86c47f4..000000000 --- a/test_batch_processing.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify batch processing functionality in ComfyStream. -This script tests the basic batch processing workflow without requiring a full ComfyUI setup. -""" - -import asyncio -import torch -import json -import logging -from typing import List - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -def test_tensor_cache_modifications(): - """Test that tensor cache modifications work correctly.""" - logger.info("Testing tensor cache modifications...") - - try: - from comfystream.tensor_cache import image_inputs - - # Test that queue size is now 2 - assert image_inputs.maxsize == 2, f"Expected maxsize 2, got {image_inputs.maxsize}" - - # Test putting multiple items - test_tensor1 = torch.randn(1, 512, 512, 3) - test_tensor2 = torch.randn(1, 512, 512, 3) - - # Should be able to put 2 items without blocking - image_inputs.put(test_tensor1, block=False) - image_inputs.put(test_tensor2, block=False) - - # Should be full now - assert image_inputs.full(), "Queue should be full after putting 2 items" - - # Clean up - image_inputs.get() - image_inputs.get() - - logger.info("✅ Tensor cache modifications working correctly") - return True - - except Exception as e: - logger.error(f"❌ Tensor cache test failed: {e}") - return False - - -def test_batch_tensor_nodes(): - """Test the batch tensor loading and saving nodes.""" - logger.info("Testing batch tensor nodes...") - - try: - from nodes.tensor_utils.load_batch_tensor import LoadBatchTensor, SaveBatchTensor - - # Test LoadBatchTensor - load_node = LoadBatchTensor() - - # Mock tensor cache with test data - from comfystream import tensor_cache - test_tensor1 = torch.randn(1, 512, 512, 3) - test_tensor2 = torch.randn(1, 512, 512, 3) - - # Add test data to cache - tensor_cache.image_inputs.put(test_tensor1) - tensor_cache.image_inputs.put(test_tensor2) - - # Test loading batch - batch_result = 
load_node.execute(batch_size=2) - assert len(batch_result) == 1, "Should return tuple with one element" - batch_tensor = batch_result[0] - assert batch_tensor.shape[0] == 2, f"Expected batch size 2, got {batch_tensor.shape[0]}" - - # Test SaveBatchTensor - save_node = SaveBatchTensor() - - # Mock the output queue - original_put = tensor_cache.image_outputs.put_nowait - saved_tensors = [] - - def mock_put(tensor): - saved_tensors.append(tensor) - - tensor_cache.image_outputs.put_nowait = mock_put - - # Test saving batch - save_node.execute(batch_tensor) - assert len(saved_tensors) == 2, f"Expected 2 saved tensors, got {len(saved_tensors)}" - - # Restore original function - tensor_cache.image_outputs.put_nowait = original_put - - # Clean up - while not tensor_cache.image_inputs.empty(): - tensor_cache.image_inputs.get() - - logger.info("✅ Batch tensor nodes working correctly") - return True - - except Exception as e: - logger.error(f"❌ Batch tensor nodes test failed: {e}") - return False - - -def test_performance_timer(): - """Test the performance timer functionality.""" - logger.info("Testing performance timer...") - - try: - from nodes.tensor_utils.performance_timer import PerformanceTimer, PerformanceTimerNode, StartPerformanceTimerNode - - # Test PerformanceTimer class - timer = PerformanceTimer() - - # Test timing operations - timer.start_timing("test_operation") - import time - time.sleep(0.01) # Small delay - timer.end_timing("test_operation") - - timer.record_batch_processing(batch_size=2, num_images=2) - - # Test getting performance summary - summary = timer.get_performance_summary() - assert "total_images_processed" in summary - assert "total_fps" in summary - assert summary["total_images_processed"] == 2 - - # Test nodes - start_node = StartPerformanceTimerNode() - timer_node = PerformanceTimerNode() - - # Test start timer node - result = start_node.execute("test_workflow") - assert "Started timing" in result[0] - - # Test timer node - result = timer_node.execute("test_workflow", 2, 2) - assert "Performance Summary" in result[0] - - logger.info("✅ Performance timer working correctly") - return True - - except Exception as e: - logger.error(f"❌ Performance timer test failed: {e}") - return False - - -def test_workflow_validation(): - """Test that the workflow JSON files are valid.""" - logger.info("Testing workflow validation...") - - workflow_files = [ - "workflows/comfystream/sd15-tensorrt-batch2-api.json", - "workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json", - "workflows/comfystream/sd15-tensorrt-batch2-performance-api.json" - ] - - try: - for workflow_file in workflow_files: - with open(workflow_file, 'r') as f: - workflow = json.load(f) - - # Basic validation - assert isinstance(workflow, dict), f"Workflow {workflow_file} should be a dictionary" - assert len(workflow) > 0, f"Workflow {workflow_file} should not be empty" - - # Check for required nodes - node_ids = list(workflow.keys()) - assert len(node_ids) > 0, f"Workflow {workflow_file} should have nodes" - - logger.info(f"✅ Workflow {workflow_file} is valid") - - logger.info("✅ All workflows are valid") - return True - - except Exception as e: - logger.error(f"❌ Workflow validation failed: {e}") - return False - - -def test_benchmark_script(): - """Test that the benchmark script can be imported and basic functions work.""" - logger.info("Testing benchmark script...") - - try: - from benchmark_batch_processing import BatchProcessingBenchmark - - # Test creating benchmark instance - benchmark = 
BatchProcessingBenchmark() - assert benchmark.client is not None - assert benchmark.results is not None - - # Test generating test images - test_images = benchmark.generate_test_images(2) - assert len(test_images) == 2 - assert all(isinstance(img, torch.Tensor) for img in test_images) - assert all(img.shape == (1, 512, 512, 3) for img in test_images) - - logger.info("✅ Benchmark script working correctly") - return True - - except Exception as e: - logger.error(f"❌ Benchmark script test failed: {e}") - return False - - -def run_all_tests(): - """Run all tests and report results.""" - logger.info("Starting ComfyStream Batch Processing Tests") - logger.info("=" * 50) - - tests = [ - ("Tensor Cache Modifications", test_tensor_cache_modifications), - ("Batch Tensor Nodes", test_batch_tensor_nodes), - ("Performance Timer", test_performance_timer), - ("Workflow Validation", test_workflow_validation), - ("Benchmark Script", test_benchmark_script), - ] - - passed = 0 - total = len(tests) - - for test_name, test_func in tests: - logger.info(f"\nRunning {test_name} test...") - try: - if test_func(): - passed += 1 - logger.info(f"✅ {test_name} test PASSED") - else: - logger.error(f"❌ {test_name} test FAILED") - except Exception as e: - logger.error(f"❌ {test_name} test FAILED with exception: {e}") - - logger.info("\n" + "=" * 50) - logger.info(f"Test Results: {passed}/{total} tests passed") - - if passed == total: - logger.info("🎉 All tests passed! Batch processing implementation is working correctly.") - return True - else: - logger.error(f"❌ {total - passed} tests failed. Please check the implementation.") - return False - - -if __name__ == "__main__": - success = run_all_tests() - exit(0 if success else 1) diff --git a/workflows/comfystream/sd15-tensorrt-batch2-performance-api.json b/workflows/comfystream/sd15-tensorrt-batch2-performance-api.json index 3b8098e93..d2b9c5faf 100644 --- a/workflows/comfystream/sd15-tensorrt-batch2-performance-api.json +++ b/workflows/comfystream/sd15-tensorrt-batch2-performance-api.json @@ -3,9 +3,9 @@ "inputs": { "batch_size": 2 }, - "class_type": "LoadBatchTensor", + "class_type": "LoadTensor", "_meta": { - "title": "Load Batch Tensor (Size 2)" + "title": "Load Tensor (Batch Size 2)" } }, "2": { @@ -217,11 +217,12 @@ "images": [ "14", 0 - ] + ], + "split_batch": true }, - "class_type": "SaveBatchTensor", + "class_type": "SaveTensor", "_meta": { - "title": "Save Batch Tensor Output" + "title": "Save Batch Tensor Output (Split)" } }, "18": { diff --git a/workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json b/workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json index 7d8311d14..955bcb18b 100644 --- a/workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json +++ b/workflows/comfystream/sd15-tensorrt-batch2-tensor-api.json @@ -1,9 +1,11 @@ { "1": { - "inputs": {}, + "inputs": { + "batch_size": 2 + }, "class_type": "LoadTensor", "_meta": { - "title": "Load Tensor (Batch Input)" + "title": "Load Tensor (Batch Size 2)" } }, "2": { @@ -206,11 +208,12 @@ "images": [ "14", 0 - ] + ], + "split_batch": true }, "class_type": "SaveTensor", "_meta": { - "title": "Save Batch Tensor Output" + "title": "Save Batch Tensor Output (Split)" } }, "23": { From 58655719fb7927232f20939be1567ef0d7ab6dc8 Mon Sep 17 00:00:00 2001 From: lukiod Date: Sat, 27 Sep 2025 02:20:33 +0000 Subject: [PATCH 3/7] Remove accidentally committed -files (less help file) --- -files | 258 --------------------------------------------------------- 1 file changed, 258 deletions(-) delete mode 100644 
-files
diff --git a/-files b/-files
deleted file mode 100644
index 333a0b576..000000000
--- a/-files
+++ /dev/null
@@ -1,258 +0,0 @@
-[... 258 lines of less(1) help output (the accidentally committed file) elided ...]
From 6b9d215d6b9fb537a1e662f1796e9be1d4ba0c84 Mon Sep 17 00:00:00 2001 From: lukiod Date: Sat, 27 Sep 2025 03:33:28 +0000 Subject: [PATCH 4/7] tested and fixed merge conflict --- src/comfystream/tensor_cache.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/comfystream/tensor_cache.py b/src/comfystream/tensor_cache.py index 8902f4f02..5cd54332e 100644 --- a/src/comfystream/tensor_cache.py +++ b/src/comfystream/tensor_cache.py @@ -7,7 +7,6 @@ from typing import Union # TODO: improve eviction policy fifo might not be the best, skip alternate frames instead -# Increased queue size to support batch processing (up to 8 images at a time) image_inputs: Queue[Union[torch.Tensor, np.ndarray]] = Queue(maxsize=1) image_outputs: AsyncQueue[Union[torch.Tensor, np.ndarray]] = AsyncQueue() From 61354e6ca426578796794aa5b0af6ca57da67f8f Mon Sep 17 00:00:00 2001 From: lukiod Date: Sat, 27 Sep 2025 03:37:57 +0000 Subject: [PATCH 5/7] test everything minor bug fix --- nodes/tensor_utils/load_tensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodes/tensor_utils/load_tensor.py b/nodes/tensor_utils/load_tensor.py index 6e7b95d01..9d049ff1e 100644 --- a/nodes/tensor_utils/load_tensor.py +++ b/nodes/tensor_utils/load_tensor.py @@ -52,7 +52,7 @@ def execute(self, batch_size: int = 1, timeout_seconds: float = 1.0): for i in range(batch_size): if not tensor_cache.image_inputs.empty(): try: - frame = tensor_cache.image_inputs.get(block=False, timeout=timeout_seconds) + frame = tensor_cache.image_inputs.get(block=True, timeout=timeout_seconds) frame.side_data.skipped = False batch_images.append(frame.side_data.input) except queue.Empty: From bfabb77fa1e366b18b184e39a147cbecaecf000e Mon Sep 17 00:00:00 2001 From: lukiod Date: Sat, 27 Sep 2025 03:49:50 +0000 Subject: [PATCH 6/7] add the changes as requested by elite + registered performance node --- docker/entrypoint.sh | 6 +++++- nodes/tensor_utils/__init__.py | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index ef55c77e2..d286f4818 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -99,7 +99,10 @@ if [ "$1" = "--build-engines" ]; then # Build Static Engine for Dreamshaper - Landscape (704x384) python src/comfystream/scripts/build_trt.py --model /workspace/ComfyUI/models/unet/dreamshaper-8-dmd-1kstep.safetensors --out-engine /workspace/ComfyUI/output/tensorrt/static-dreamshaper8_SD15_\$stat-b-1-h-384-w-704_00001_.engine --width 704 --height 384 - + + # Build Static Engine for Dreamshaper - Square (512x512) - Batch Size 2 + python src/comfystream/scripts/build_trt.py --model /workspace/ComfyUI/models/unet/dreamshaper-8-dmd-1kstep.safetensors --out-engine /workspace/ComfyUI/output/tensorrt/static-dreamshaper8_SD15_\$stat-b-2-h-512-w-512_00001_.engine --width 512 --height 512 --batch-size 2 + # Build Dynamic Engine for Dreamshaper python src/comfystream/scripts/build_trt.py \ --model /workspace/ComfyUI/models/unet/dreamshaper-8-dmd-1kstep.safetensors \ @@ -110,6 +113,7 @@ if [ "$1" = "--build-engines" ]; then --min-height 512 \ --max-width 448 \ --max-height 704 + # Build Engine for Depth Anything V2 if [ ! 
-f "$DEPTH_ANYTHING_DIR/$DEPTH_ANYTHING_ENGINE" ]; then diff --git a/nodes/tensor_utils/__init__.py b/nodes/tensor_utils/__init__.py index eadb5b843..41c05d55b 100644 --- a/nodes/tensor_utils/__init__.py +++ b/nodes/tensor_utils/__init__.py @@ -3,11 +3,14 @@ from .load_tensor import LoadTensor from .save_tensor import SaveTensor from .save_text_tensor import SaveTextTensor +from .performance_nodes import PerformanceTimerNode, StartPerformanceTimerNode NODE_CLASS_MAPPINGS = { "LoadTensor": LoadTensor, "SaveTensor": SaveTensor, "SaveTextTensor": SaveTextTensor, + "PerformanceTimerNode": PerformanceTimerNode, + "StartPerformanceTimerNode": StartPerformanceTimerNode, } NODE_DISPLAY_NAME_MAPPINGS = {} From 65af26c4eb530e97112e7494d69ad2c274692f9e Mon Sep 17 00:00:00 2001 From: lukiod Date: Sun, 28 Sep 2025 03:37:39 +0000 Subject: [PATCH 7/7] feat: add batch-size aware queue sizing to pipeline Dynamically resize queues based on LoadTensor batch_size to optimize memory. --- src/comfystream/modalities.py | 17 ++++++++++-- src/comfystream/pipeline.py | 51 +++++++++++++++++++++++++++++++++-- src/comfystream/utils.py | 9 +++++-- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/src/comfystream/modalities.py b/src/comfystream/modalities.py index fccfabad5..1d782ff5e 100644 --- a/src/comfystream/modalities.py +++ b/src/comfystream/modalities.py @@ -11,6 +11,8 @@ class WorkflowModality(TypedDict): video: ModalityIO audio: ModalityIO text: ModalityIO + # Batch processing information + max_batch_size: int # Centralized node type definitions NODE_TYPES = { @@ -88,13 +90,18 @@ def create_empty_workflow_modality() -> WorkflowModality: "video": {"input": False, "output": False}, "audio": {"input": False, "output": False}, "text": {"input": False, "output": False}, + "max_batch_size": 1, } def _merge_workflow_modalities(base: WorkflowModality, other: WorkflowModality) -> WorkflowModality: """Merge two WorkflowModality objects using logical OR for all capabilities.""" for modality in base: - for direction in base[modality]: - base[modality][direction] = base[modality][direction] or other[modality][direction] + if modality == "max_batch_size": + # For batch size, take the maximum + base[modality] = max(base[modality], other[modality]) + elif isinstance(base[modality], dict): + for direction in base[modality]: + base[modality][direction] = base[modality][direction] or other[modality][direction] return base def detect_io_points(prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]) -> WorkflowModality: @@ -118,6 +125,12 @@ def detect_io_points(prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]) -> Wo for node in prompts.values(): class_type = node.get("class_type", "") + # Parse batch_size from LoadTensor nodes + if class_type == "LoadTensor": + inputs = node.get("inputs", {}) + batch_size = inputs.get("batch_size", 1) + result["max_batch_size"] = max(result["max_batch_size"], batch_size) + for modality, directions in MODALITY_MAPPINGS.items(): if class_type in directions["input"]: result[modality]["input"] = True diff --git a/src/comfystream/pipeline.py b/src/comfystream/pipeline.py index 3e4febaf1..3391e4c4c 100644 --- a/src/comfystream/pipeline.py +++ b/src/comfystream/pipeline.py @@ -38,8 +38,10 @@ def __init__(self, width: int = 512, height: int = 512, self.width = width self.height = height - self.video_incoming_frames = asyncio.Queue() - self.audio_incoming_frames = asyncio.Queue() + # Initialize queues with default size (will be updated based on workflow analysis) + self._default_queue_size = 
diff --git a/src/comfystream/pipeline.py b/src/comfystream/pipeline.py
index 3e4febaf1..3391e4c4c 100644
--- a/src/comfystream/pipeline.py
+++ b/src/comfystream/pipeline.py
@@ -38,8 +38,10 @@ def __init__(self, width: int = 512, height: int = 512,
         self.width = width
         self.height = height

-        self.video_incoming_frames = asyncio.Queue()
-        self.audio_incoming_frames = asyncio.Queue()
+        # Initialize queues with default size (will be updated based on workflow analysis)
+        self._default_queue_size = 10  # Default queue size
+        self.video_incoming_frames = asyncio.Queue(maxsize=self._default_queue_size)
+        self.audio_incoming_frames = asyncio.Queue(maxsize=self._default_queue_size)

         self.processed_audio_buffer = np.array([], dtype=np.int16)
@@ -47,6 +49,45 @@
         self._cached_modalities: Optional[Set[str]] = None
         self._cached_io_capabilities: Optional[WorkflowModality] = None

+    def _recreate_queues(self, new_queue_size: int):
+        """Recreate queues with new size limits to optimize for batch processing.
+
+        Args:
+            new_queue_size: Maximum number of frames to store in each queue
+        """
+        # Calculate optimal queue size: batch_size * 2 + some buffer for frame skipping
+        optimal_size = max(new_queue_size * 2, self._default_queue_size)
+
+        logger.info(f"Recreating queues with size {optimal_size} (batch_size: {new_queue_size})")
+
+        # Create new queues with the calculated size
+        self.video_incoming_frames = asyncio.Queue(maxsize=optimal_size)
+        self.audio_incoming_frames = asyncio.Queue(maxsize=optimal_size)
+
+    def _update_queue_sizes_for_batch_processing(self):
+        """Update queue sizes based on detected batch requirements from workflow analysis."""
+        if not hasattr(self.client, 'current_prompts') or not self.client.current_prompts:
+            logger.debug("No prompts available for batch size analysis")
+            return
+
+        try:
+            # Get workflow I/O capabilities (which now includes batch_size)
+            io_capabilities = self.get_workflow_io_capabilities()
+            detected_batch_size = io_capabilities.get("max_batch_size", 1)
+
+            # Only recreate queues if batch size has changed significantly
+            current_queue_size = self.video_incoming_frames.maxsize
+            optimal_size = max(detected_batch_size * 2, self._default_queue_size)
+
+            if optimal_size != current_queue_size:
+                logger.info(f"Detected batch_size {detected_batch_size}, updating queue size from {current_queue_size} to {optimal_size}")
+                self._recreate_queues(detected_batch_size)
+            else:
+                logger.debug(f"Queue size already optimal for batch_size {detected_batch_size}")
+
+        except Exception as e:
+            logger.warning(f"Failed to update queue sizes based on batch analysis: {e}")
+
     async def warm_video(self):
         """Warm up the video processing pipeline with dummy frames."""
         # Only warm if workflow accepts video input
@@ -107,6 +148,9 @@ async def set_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]
         # Clear cached modalities and I/O capabilities when prompts change
         self._cached_modalities = None
         self._cached_io_capabilities = None
+
+        # Update queue sizes based on detected batch requirements
+        self._update_queue_sizes_for_batch_processing()

     async def update_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]):
         """Update the existing processing prompts.
@@ -122,6 +166,9 @@ async def update_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any
         # Clear cached modalities and I/O capabilities when prompts change
         self._cached_modalities = None
         self._cached_io_capabilities = None
+
+        # Update queue sizes based on detected batch requirements
+        self._update_queue_sizes_for_batch_processing()

     async def put_video_frame(self, frame: av.VideoFrame):
         """Queue a video frame for processing.
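The effective sizing rule in `_recreate_queues` and `_update_queue_sizes_for_batch_processing` is simply `max(batch_size * 2, default)`: with the default of 10, a workflow must declare a batch size above 5 before the queues actually grow. A standalone sketch of the policy:

```python
DEFAULT_QUEUE_SIZE = 10  # mirrors self._default_queue_size above

def optimal_queue_size(batch_size: int, default: int = DEFAULT_QUEUE_SIZE) -> int:
    """Queue sizing policy from the patch: twice the batch, never below the default."""
    return max(batch_size * 2, default)

assert optimal_queue_size(1) == 10  # single-frame workflows keep the default
assert optimal_queue_size(2) == 10  # the batch-2 case still fits in the default
assert optimal_queue_size(8) == 16  # only batch sizes above 5 enlarge the queues
```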
diff --git a/src/comfystream/utils.py b/src/comfystream/utils.py
index 53e483936..c2b6cd012 100644
--- a/src/comfystream/utils.py
+++ b/src/comfystream/utils.py
@@ -13,9 +13,14 @@
     get_convertible_node_keys,
 )

-def create_load_tensor_node():
+def create_load_tensor_node(batch_size: int = 1):
+    """Create a LoadTensor node with specified batch size.
+
+    Args:
+        batch_size: Number of frames to process in batch (default: 1)
+    """
     return {
-        "inputs": {},
+        "inputs": {"batch_size": batch_size},
         "class_type": "LoadTensor",
         "_meta": {"title": "LoadTensor"},
     }
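For reference, a usage sketch of the updated helper; the expected dictionary is taken directly from the diff above, and the call assumes only this patch applied:

```python
node = create_load_tensor_node(batch_size=2)
assert node == {
    "inputs": {"batch_size": 2},
    "class_type": "LoadTensor",
    "_meta": {"title": "LoadTensor"},
}
```

Downstream, `detect_io_points` picks this `batch_size` up as `max_batch_size`, which the pipeline then uses to size its incoming-frame queues.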