fix(llm): add fallback to non-streaming mode when content extraction fails #2605

Open · wants to merge 2 commits into base: feat-emit-stream-tool
72 changes: 9 additions & 63 deletions src/crewai/llm.py
@@ -443,70 +443,16 @@ def _handle_streaming_response(
event=LLMStreamChunkEvent(chunk=chunk_content),
)

# --- 4) Fallback to non-streaming if no content received
Author

LLM Streaming Response Error Handling Improvement

Purpose

This PR addresses critical errors that occur when LLM streaming responses don't match the expected format, causing agent flows to crash. We've replaced complex multi-stage extraction logic with a simpler, more reliable fallback mechanism.

Issue Observed

When using certain LLM providers or models (specifically encountered with "openai/gpt-4o"), streaming responses sometimes don't contain extractable content in the expected format, resulting in errors like:

[Agent] ERROR:root:Error in streaming response: No content received from streaming response. Received empty chunks or failed to extract content.
[Agent] ╭───────────────────────────────── LLM Error ──────────────────────────────────╮
[Agent] │  ❌ LLM Call Failed                                                          │
[Agent] │  Error: No content received from streaming response. Received empty chunks   │
[Agent] │  or failed to extract content.                                               │
[Agent] ╰──────────────────────────────────────────────────────────────────────────────╯

This exception terminates the agent flow, causing a poor user experience:

[Agent] Exception: Failed to get streaming response: No content received from streaming response. Received empty chunks or failed to extract content.
[Agent] [Flow._execute_single_listener] Error in method chat: Failed to get streaming response: No content received from streaming response. Received empty chunks or failed to extract content.

Solution

This solution maintains the original content extraction logic but removes the multiple nested fallback attempts that were still failing in some cases. Instead, we:

  1. Keep the initial streaming request with usage metrics
  2. Try to extract content using the existing logic
  3. If extraction fails (when full_response is empty), fall back to non-streaming mode instead of raising an exception

This approach is more robust because it leverages the already-working non-streaming implementation whenever streaming content extraction fails (sketched below).
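
For reference, here is the core of the fallback, condensed from the diff below; full_response, chunk_count, params, callbacks, and available_functions are locals already in scope inside _handle_streaming_response:

# Inside _handle_streaming_response, after the streaming loop has finished:
if not full_response.strip():
    logging.warning(
        f"Received {chunk_count} chunks but unable to extract text content. "
        "Falling back to non-streaming call."
    )
    non_streaming_params = params.copy()
    non_streaming_params["stream"] = False
    # stream_options is only meaningful for streaming calls.
    non_streaming_params.pop("stream_options", None)
    return self._handle_non_streaming_response(
        non_streaming_params, callbacks, available_functions
    )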

Benefits

  • Prevents agent flows from crashing when streaming responses don't match expected formats
  • Maintains all existing functionality (including usage metrics collection)
  • Works reliably across various LLM providers without requiring provider-specific handling
  • Simplifies code maintenance by reducing complexity in error handling

This change significantly improves reliability when working with different models and streaming response formats.

Contributor

I have a few points:
1. I’d like to fully understand why the streaming extraction is failing. Could it be due to a tool misconfiguration?
2. It seems the capability to send an event for each tool_call was removed.

Overall, it looks like the stream feature with tools might have been dropped, and that’s what I’m concerned about.

Author

Thanks for the review! Let me address each point:

  1. Regarding why streaming extraction is failing:

    • The issue is not with tool misconfiguration, but with response format compatibility.
    • When using certain models (like gpt-4o via litellm), the streaming chunks returned don't match the expected structure that the extraction code is looking for.
    • Specifically, the code expects chunks to have a clear "delta" -> "content" path, but some models/providers return content in different formats (see the sketch after this reply).
    • This is a common issue when working with multiple LLM providers through a unified interface.
  2. About the event emission for tool calls:

    • Tool call detection and handling are still fully present. The main tool call handling happens in the _handle_tool_call method, which remains unchanged.
    • Event emission for tool calls is preserved in both of the remaining code sections:
      • Line ~550: self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
      • Line ~596: self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
    • What we removed was the redundant extraction attempts that were still resulting in errors.
  3. Regarding streaming with tools:

    • Streaming with tools is absolutely not dropped; it still works exactly as before when the response format matches expectations.
    • We've simply added a fallback to non-streaming mode when content extraction fails, rather than crashing the flow with an exception.
    • The non-streaming mode still fully supports tool calls, so functionality is preserved even in fallback mode.

Let's discuss in more detail on our call.
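
To make point 1 concrete, here is a minimal, illustrative sketch (not code from this PR) of probing a streaming chunk for text across the two layouts mentioned above: dict-shaped chunks carrying a "delta" -> "content" path, and object-shaped chunks exposing message.content (e.g. the aggregated last chunk some providers return):

from typing import Any, Optional


def probe_chunk_text(chunk: Any) -> Optional[str]:
    """Best-effort text extraction from one streaming chunk (illustrative only)."""
    # Chunks may be plain dicts or provider-specific objects; handle both.
    choices = chunk.get("choices") if isinstance(chunk, dict) else getattr(chunk, "choices", None)
    if not choices:
        return None
    choice = choices[0]

    # The path the extraction code expects: a streaming delta with "content".
    delta = choice.get("delta") if isinstance(choice, dict) else getattr(choice, "delta", None)
    if delta is not None:
        content = delta.get("content") if isinstance(delta, dict) else getattr(delta, "content", None)
        if content:
            return content

    # Some models/providers instead expose a full "message" carrying the text.
    message = choice.get("message") if isinstance(choice, dict) else getattr(choice, "message", None)
    if message is not None:
        return message.get("content") if isinstance(message, dict) else getattr(message, "content", None)

    # Tool-call-only or usage-only chunks carry no text.
    return None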

if not full_response.strip() and chunk_count == 0:
# --- 4) If no content received or extraction failed, fall back to non-streaming mode
if not full_response.strip():
logging.warning(
"No chunks received in streaming response, falling back to non-streaming"
f"Received {chunk_count} chunks but unable to extract text content. Falling back to non-streaming call."
)
non_streaming_params = params.copy()
non_streaming_params["stream"] = False
non_streaming_params.pop(
"stream_options", None
) # Remove stream_options for non-streaming call
return self._handle_non_streaming_response(
non_streaming_params, callbacks, available_functions
)

# --- 5) Handle empty response with chunks
if not full_response.strip() and chunk_count > 0:
logging.warning(
f"Received {chunk_count} chunks but no content was extracted"
)
if last_chunk is not None:
try:
# Try to extract content from the last chunk's message
choices = None
if isinstance(last_chunk, dict) and "choices" in last_chunk:
choices = last_chunk["choices"]
elif hasattr(last_chunk, "choices"):
if not isinstance(getattr(last_chunk, "choices"), type):
choices = getattr(last_chunk, "choices")

if choices and len(choices) > 0:
choice = choices[0]

# Try to get content from message
message = None
if isinstance(choice, dict) and "message" in choice:
message = choice["message"]
elif hasattr(choice, "message"):
message = getattr(choice, "message")

if message:
content = None
if isinstance(message, dict) and "content" in message:
content = message["content"]
elif hasattr(message, "content"):
content = getattr(message, "content")

if content:
full_response = content
logging.info(
f"Extracted content from last chunk message: {full_response}"
)
except Exception as e:
logging.debug(f"Error extracting content from last chunk: {e}")
logging.debug(
f"Last chunk format: {type(last_chunk)}, content: {last_chunk}"
)

# --- 6) If still empty, raise an error instead of using a default response
if not full_response.strip():
raise Exception(
"No content received from streaming response. Received empty chunks or failed to extract content."
)
return self._handle_non_streaming_response(non_streaming_params, callbacks, available_functions)

# --- 7) Check for tool calls in the final response
# --- 5) Check for tool calls in the final response
tool_calls = None
try:
if last_chunk:
@@ -534,23 +480,23 @@ def _handle_streaming_response(
except Exception as e:
logging.debug(f"Error checking for tool calls: {e}")

# --- 8) If no tool calls or no available functions, return the text response directly
# --- 6) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response

# --- 9) Handle tool calls if present
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result

# --- 10) Log token usage if available in streaming mode
# --- 8) Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)

# --- 11) Emit completion event and return response
# --- 9) Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response

1 change: 1 addition & 0 deletions src/crewai/utilities/events/llm_events.py
@@ -46,3 +46,4 @@ class LLMStreamChunkEvent(BaseEvent):

type: str = "llm_stream_chunk"
chunk: str
tool_call: Optional[dict] = None
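
With the new optional field, a stream chunk event can carry a tool-call fragment alongside (or instead of) text. A hypothetical construction, where the tool_call dict layout is illustrative rather than prescribed by this diff:

from crewai.utilities.events.llm_events import LLMStreamChunkEvent

# Plain text chunk, unchanged behaviour.
text_event = LLMStreamChunkEvent(chunk="partial answer text")

# Chunk carrying a streamed tool-call fragment; the dict layout below is
# illustrative and mirrors what a provider's tool_call delta might contain.
tool_event = LLMStreamChunkEvent(
    chunk="",
    tool_call={
        "id": "call_123",
        "function": {"name": "search_tool", "arguments": '{"query": "crewai"}'},
    },
)

Presumably this is what keeps per-tool-call information available to stream listeners, addressing the concern raised in the review discussion above.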