From 00b6e04106c438052d94f9ad3ad722423d9ce782 Mon Sep 17 00:00:00 2001
From: Lucas Gomide
Date: Thu, 10 Apr 2025 17:43:31 -0300
Subject: [PATCH 1/2] (wip) feat: properly emit tool call events when using LLM streaming mode

---
 src/crewai/llm.py                         | 72 ++++++++++++++++++++++-
 src/crewai/utilities/events/llm_events.py |  1 +
 2 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/src/crewai/llm.py b/src/crewai/llm.py
index 7415446620..dad82a71c4 100644
--- a/src/crewai/llm.py
+++ b/src/crewai/llm.py
@@ -4,7 +4,9 @@
 import sys
 import threading
 import warnings
+from collections import defaultdict
 from contextlib import contextmanager
+from types import SimpleNamespace
 from typing import (
     Any,
     Dict,
@@ -371,6 +373,15 @@ def _handle_streaming_response(
         last_chunk = None
         chunk_count = 0
         usage_info = None
+        tool_calls = None
+        accumulated_tool_args = defaultdict(
+            lambda: SimpleNamespace(
+                function=SimpleNamespace(
+                    name="",
+                    arguments="",
+                ),
+            ),
+        )
 
         # --- 2) Make sure stream is set to True and include usage metrics
         params["stream"] = True
@@ -428,6 +439,19 @@ def _handle_streaming_response(
                         if chunk_content is None and isinstance(delta, dict):
                             # Some models might send empty content chunks
                             chunk_content = ""
+
+                        if "tool_calls" in delta:
+                            tool_calls = delta["tool_calls"]
+
+                        if tool_calls:
+                            result = self._handle_streaming_tool_calls(
+                                tool_calls=tool_calls,
+                                accumulated_tool_args=accumulated_tool_args,
+                                available_functions=available_functions,
+                            )
+                            if result is not None:
+                                chunk_content = result
+
                 except Exception as e:
                     logging.debug(f"Error extracting content from chunk: {e}")
                     logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
@@ -501,7 +525,7 @@ def _handle_streaming_response(
             )
 
             # --- 6) If still empty, raise an error instead of using a default response
-            if not full_response.strip():
+            if not full_response.strip() and len(accumulated_tool_args) == 0:
                 raise Exception(
                     "No content received from streaming response. Received empty chunks or failed to extract content."
                )
@@ -568,6 +592,52 @@ def _handle_streaming_response(
             )
             raise Exception(f"Failed to get streaming response: {str(e)}")
 
+    def _handle_streaming_tool_calls(
+        self,
+        tool_calls: List[Any],
+        accumulated_tool_args: Dict[int, SimpleNamespace],
+        available_functions: Optional[Dict[str, Any]] = None,
+    ) -> None | str:
+
+        for tool_call in tool_calls:
+            if tool_call.function.name:
+                accumulated_tool_args[tool_call.index].function.name = (
+                    tool_call.function.name
+                )
+
+            if tool_call.function.arguments:
+                accumulated_tool_args[
+                    tool_call.index
+                ].function.arguments += tool_call.function.arguments
+
+            crewai_event_bus.emit(
+                self,
+                event=LLMStreamChunkEvent(
+                    tool_call=tool_call.to_dict(),
+                    chunk=tool_call.function.arguments,
+                ),
+            )
+
+            if (
+                accumulated_tool_args[tool_call.index].function.name
+                and accumulated_tool_args[tool_call.index].function.arguments
+                and available_functions
+            ):
+                try:
+                    # Try to parse the accumulated arguments
+                    json.loads(
+                        accumulated_tool_args[tool_call.index].function.arguments
+                    )
+
+                    # Execute the tool call
+                    return self._handle_tool_call(
+                        [accumulated_tool_args[tool_call.index]],
+                        available_functions,
+                    )
+                except json.JSONDecodeError:
+                    # If JSON is incomplete, continue accumulating
+                    continue
+
     def _handle_streaming_callbacks(
         self,
         callbacks: Optional[List[Any]],
diff --git a/src/crewai/utilities/events/llm_events.py b/src/crewai/utilities/events/llm_events.py
index 07a17a48b7..0d271f41b2 100644
--- a/src/crewai/utilities/events/llm_events.py
+++ b/src/crewai/utilities/events/llm_events.py
@@ -46,3 +46,4 @@ class LLMStreamChunkEvent(BaseEvent):
 
     type: str = "llm_stream_chunk"
     chunk: str
+    tool_call: Optional[dict] = None

From 0485b2ed43fed9bbe9cc15f35e14d491900d6877 Mon Sep 17 00:00:00 2001
From: Suhas Deshpande
Date: Mon, 14 Apr 2025 23:40:28 -0700
Subject: [PATCH 2/2] fix(llm): add fallback to non-streaming mode when content extraction fails

When streaming responses don't contain extractable content, the code now
falls back to non-streaming mode instead of raising an exception. This
creates a more resilient system that can handle a wider variety of
response formats from different LLM providers, preventing crashes in the
agent flow.
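As a rough illustration, the retry has this shape (a minimal sketch under
assumed names: `stream_completion` and `completion` are hypothetical
stand-ins, not crewai or litellm APIs):

    import logging
    from typing import Any, Dict, Iterator

    # Hypothetical provider stand-ins, used only to make the sketch runnable.
    def stream_completion(params: Dict[str, Any]) -> Iterator[str]:
        yield from ()  # simulate a stream that yields no extractable text

    def completion(params: Dict[str, Any]) -> str:
        return "text from a single non-streaming call"

    def call_with_fallback(params: Dict[str, Any]) -> str:
        # 1) Try streaming first and accumulate whatever text arrives.
        full_response = "".join(stream_completion({**params, "stream": True}))

        # 2) If nothing usable arrived, retry once without streaming
        #    instead of raising an exception.
        if not full_response.strip():
            logging.warning("No streamed content; falling back to non-streaming.")
            return completion({**params, "stream": False})

        return full_response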
The key changes:
- Add fallback to non-streaming call when content extraction fails
- Simplify error handling logic
- Keep stream_options for usage metrics collection
---
 src/crewai/llm.py | 142 +++-------------------------------------------
 1 file changed, 9 insertions(+), 133 deletions(-)

diff --git a/src/crewai/llm.py b/src/crewai/llm.py
index dad82a71c4..fcde91cdcb 100644
--- a/src/crewai/llm.py
+++ b/src/crewai/llm.py
@@ -4,9 +4,7 @@
 import sys
 import threading
 import warnings
-from collections import defaultdict
 from contextlib import contextmanager
-from types import SimpleNamespace
 from typing import (
     Any,
     Dict,
@@ -373,15 +371,6 @@ def _handle_streaming_response(
         last_chunk = None
         chunk_count = 0
         usage_info = None
-        tool_calls = None
-        accumulated_tool_args = defaultdict(
-            lambda: SimpleNamespace(
-                function=SimpleNamespace(
-                    name="",
-                    arguments="",
-                ),
-            ),
-        )
 
         # --- 2) Make sure stream is set to True and include usage metrics
         params["stream"] = True
@@ -439,19 +428,6 @@ def _handle_streaming_response(
                         if chunk_content is None and isinstance(delta, dict):
                             # Some models might send empty content chunks
                             chunk_content = ""
-
-                        if "tool_calls" in delta:
-                            tool_calls = delta["tool_calls"]
-
-                        if tool_calls:
-                            result = self._handle_streaming_tool_calls(
-                                tool_calls=tool_calls,
-                                accumulated_tool_args=accumulated_tool_args,
-                                available_functions=available_functions,
-                            )
-                            if result is not None:
-                                chunk_content = result
-
                 except Exception as e:
                     logging.debug(f"Error extracting content from chunk: {e}")
                     logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
@@ -467,70 +443,16 @@ def _handle_streaming_response(
                         event=LLMStreamChunkEvent(chunk=chunk_content),
                     )
 
-            # --- 4) Fallback to non-streaming if no content received
-            if not full_response.strip() and chunk_count == 0:
+            # --- 4) If no content received or extraction failed, fall back to non-streaming mode
+            if not full_response.strip():
                 logging.warning(
-                    "No chunks received in streaming response, falling back to non-streaming"
+                    f"Received {chunk_count} chunks but unable to extract text content. Falling back to non-streaming call."
                )
                 non_streaming_params = params.copy()
                 non_streaming_params["stream"] = False
-                non_streaming_params.pop(
-                    "stream_options", None
-                )  # Remove stream_options for non-streaming call
-                return self._handle_non_streaming_response(
-                    non_streaming_params, callbacks, available_functions
-                )
+                return self._handle_non_streaming_response(non_streaming_params, callbacks, available_functions)
 
-            # --- 5) Handle empty response with chunks
-            if not full_response.strip() and chunk_count > 0:
-                logging.warning(
-                    f"Received {chunk_count} chunks but no content was extracted"
-                )
-                if last_chunk is not None:
-                    try:
-                        # Try to extract content from the last chunk's message
-                        choices = None
-                        if isinstance(last_chunk, dict) and "choices" in last_chunk:
-                            choices = last_chunk["choices"]
-                        elif hasattr(last_chunk, "choices"):
-                            if not isinstance(getattr(last_chunk, "choices"), type):
-                                choices = getattr(last_chunk, "choices")
-
-                        if choices and len(choices) > 0:
-                            choice = choices[0]
-
-                            # Try to get content from message
-                            message = None
-                            if isinstance(choice, dict) and "message" in choice:
-                                message = choice["message"]
-                            elif hasattr(choice, "message"):
-                                message = getattr(choice, "message")
-
-                            if message:
-                                content = None
-                                if isinstance(message, dict) and "content" in message:
-                                    content = message["content"]
-                                elif hasattr(message, "content"):
-                                    content = getattr(message, "content")
-
-                                if content:
-                                    full_response = content
-                                    logging.info(
-                                        f"Extracted content from last chunk message: {full_response}"
-                                    )
-                    except Exception as e:
-                        logging.debug(f"Error extracting content from last chunk: {e}")
-                        logging.debug(
-                            f"Last chunk format: {type(last_chunk)}, content: {last_chunk}"
-                        )
-
-            # --- 6) If still empty, raise an error instead of using a default response
-            if not full_response.strip() and len(accumulated_tool_args) == 0:
-                raise Exception(
-                    "No content received from streaming response. Received empty chunks or failed to extract content."
-                )
 
-            # --- 7) Check for tool calls in the final response
+            # --- 5) Check for tool calls in the final response
             tool_calls = None
             try:
                 if last_chunk:
@@ -558,7 +480,7 @@ def _handle_streaming_response(
             except Exception as e:
                 logging.debug(f"Error checking for tool calls: {e}")
 
-            # --- 8) If no tool calls or no available functions, return the text response directly
+            # --- 6) If no tool calls or no available functions, return the text response directly
             if not tool_calls or not available_functions:
                 # Log token usage if available in streaming mode
                 self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
@@ -566,15 +488,15 @@ def _handle_streaming_response(
                 self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
                 return full_response
 
-            # --- 9) Handle tool calls if present
+            # --- 7) Handle tool calls if present
             tool_result = self._handle_tool_call(tool_calls, available_functions)
             if tool_result is not None:
                 return tool_result
 
-            # --- 10) Log token usage if available in streaming mode
+            # --- 8) Log token usage if available in streaming mode
             self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
 
-            # --- 11) Emit completion event and return response
+            # --- 9) Emit completion event and return response
             self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
             return full_response
 
@@ -592,52 +514,6 @@ def _handle_streaming_response(
             )
             raise Exception(f"Failed to get streaming response: {str(e)}")
 
-    def _handle_streaming_tool_calls(
-        self,
-        tool_calls: List[Any],
-        accumulated_tool_args: Dict[int, SimpleNamespace],
-        available_functions: Optional[Dict[str, Any]] = None,
-    ) -> None | str:
-
-        for tool_call in tool_calls:
-            if tool_call.function.name:
-                accumulated_tool_args[tool_call.index].function.name = (
-                    tool_call.function.name
-                )
-
-            if tool_call.function.arguments:
-                accumulated_tool_args[
-                    tool_call.index
-                ].function.arguments += tool_call.function.arguments
-
-            crewai_event_bus.emit(
-                self,
-                event=LLMStreamChunkEvent(
-                    tool_call=tool_call.to_dict(),
-                    chunk=tool_call.function.arguments,
-                ),
-            )
-
-            if (
-                accumulated_tool_args[tool_call.index].function.name
-                and accumulated_tool_args[tool_call.index].function.arguments
-                and available_functions
-            ):
-                try:
-                    # Try to parse the accumulated arguments
-                    json.loads(
-                        accumulated_tool_args[tool_call.index].function.arguments
-                    )
-
-                    # Execute the tool call
-                    return self._handle_tool_call(
-                        [accumulated_tool_args[tool_call.index]],
-                        available_functions,
-                    )
-                except json.JSONDecodeError:
-                    # If JSON is incomplete, continue accumulating
-                    continue
-
     def _handle_streaming_callbacks(
         self,
         callbacks: Optional[List[Any]],
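PATCH 1/2 added `tool_call: Optional[dict] = None` to `LLMStreamChunkEvent`,
and this patch leaves that field in place (it only touches
`src/crewai/llm.py`), so downstream code can tell text deltas apart from
tool-call argument deltas. A minimal listener sketch, assuming the event
bus's `on` decorator; the accumulate-by-index scheme is illustrative, not a
crewai API:

    from collections import defaultdict

    from crewai.utilities.events import crewai_event_bus
    from crewai.utilities.events.llm_events import LLMStreamChunkEvent

    # Accumulates streamed tool-call argument fragments, keyed by call index.
    args_by_index: defaultdict[int, str] = defaultdict(str)

    @crewai_event_bus.on(LLMStreamChunkEvent)
    def handle_chunk(source, event: LLMStreamChunkEvent) -> None:
        if event.tool_call is None:
            # Plain text delta from the model.
            print(event.chunk, end="", flush=True)
        else:
            # Tool-call delta: arguments arrive as partial JSON fragments,
            # so consumers accumulate until the JSON parses.
            index = event.tool_call.get("index", 0)
            args_by_index[index] += event.chunk or ""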