Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 22 additions & 32 deletions libs/agno/agno/agent/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,11 @@ def _run(

return run_response
except (InputCheckError, OutputCheckError) as e:
# Handle exceptions during streaming
# Update status and log the failure, then re-raise so callers can
# handle the exception (e.g. `except InputCheckError as e: ...`).
# Previously the exception was silently converted to a RunOutput
# with status=error, making it impossible to catch from user code.
run_response.status = RunStatus.error
# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

Expand All @@ -656,7 +658,7 @@ def _run(
user_id=user_id,
)

return run_response
raise
except KeyboardInterrupt:
run_response = cast(RunOutput, run_response)
run_response.status = RunStatus.cancelled
Expand Down Expand Up @@ -1131,9 +1133,11 @@ def _run_stream(
)
break
except (InputCheckError, OutputCheckError) as e:
# Handle exceptions during streaming
# Update status, emit the error event for streaming consumers, then
# re-raise so callers can catch `InputCheckError`/`OutputCheckError`.
# Previously the exception was swallowed (break), making it impossible
# to catch from user code.
run_response.status = RunStatus.error
# Add error event to list of events
run_error = create_run_error_event(
run_response,
error=str(e),
Expand All @@ -1143,7 +1147,6 @@ def _run_stream(
)
run_response.events = add_error_event(error=run_error, events=run_response.events)

# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

Expand All @@ -1158,7 +1161,7 @@ def _run_stream(
user_id=user_id,
)
yield run_error
break
raise
except KeyboardInterrupt:
run_response = cast(RunOutput, run_response)
yield handle_event( # type: ignore
Expand Down Expand Up @@ -1734,9 +1737,8 @@ async def _arun(

return run_response
except (InputCheckError, OutputCheckError) as e:
# Handle exceptions during streaming
# Update status and log, then re-raise so callers can catch it.
run_response.status = RunStatus.error
# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

Expand All @@ -1751,7 +1753,7 @@ async def _arun(
user_id=user_id,
)

return run_response
raise

except KeyboardInterrupt:
run_response = cast(RunOutput, run_response)
Expand Down Expand Up @@ -2483,9 +2485,8 @@ async def _arun_stream(
break

except (InputCheckError, OutputCheckError) as e:
# Handle exceptions during async streaming
# Update status, emit the error event, then re-raise so callers can catch it.
run_response.status = RunStatus.error
# Add error event to list of events
run_error = create_run_error_event(
run_response,
error=str(e),
Expand All @@ -2495,13 +2496,11 @@ async def _arun_stream(
)
run_response.events = add_error_event(error=run_error, events=run_response.events)

# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

log_error(f"Validation failed: {str(e)} | Check trigger: {e.check_trigger}")

# Cleanup and store the run response and session
if agent_session is not None:
await acleanup_and_store(
agent,
Expand All @@ -2511,9 +2510,8 @@ async def _arun_stream(
user_id=user_id,
)

# Yield the error event
yield run_error
break
raise

except KeyboardInterrupt:
run_response = cast(RunOutput, run_response)
Expand Down Expand Up @@ -3216,9 +3214,8 @@ def _continue_run(
return run_response
except (InputCheckError, OutputCheckError) as e:
run_response = cast(RunOutput, run_response)
# Handle exceptions during streaming
# Update status and log, then re-raise so callers can catch it.
run_response.status = RunStatus.error
# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

Expand All @@ -3228,7 +3225,7 @@ def _continue_run(
agent, run_response=run_response, session=session, run_context=run_context, user_id=user_id
)

return run_response
raise
except KeyboardInterrupt:
run_response = cast(RunOutput, run_response)
run_response.status = RunStatus.cancelled
Expand Down Expand Up @@ -3488,9 +3485,8 @@ def _continue_run_stream(
break
except (InputCheckError, OutputCheckError) as e:
run_response = cast(RunOutput, run_response)
# Handle exceptions during streaming
# Update status, emit the error event, then re-raise so callers can catch it.
run_response.status = RunStatus.error
# Add error event to list of events
run_error = create_run_error_event(
run_response,
error=str(e),
Expand All @@ -3500,7 +3496,6 @@ def _continue_run_stream(
)
run_response.events = add_error_event(error=run_error, events=run_response.events)

# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

Expand All @@ -3510,7 +3505,7 @@ def _continue_run_stream(
agent, run_response=run_response, session=session, run_context=run_context, user_id=user_id
)
yield run_error
break
raise
except KeyboardInterrupt:
run_response = cast(RunOutput, run_response)
yield handle_event( # type: ignore
Expand Down Expand Up @@ -4175,9 +4170,8 @@ async def _acontinue_run(
return run_response
except (InputCheckError, OutputCheckError) as e:
run_response = cast(RunOutput, run_response)
# Handle exceptions during streaming
# Update status and log, then re-raise so callers can catch it.
run_response.status = RunStatus.error
# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

Expand All @@ -4192,7 +4186,7 @@ async def _acontinue_run(
user_id=user_id,
)

return run_response
raise

except KeyboardInterrupt:
run_response = cast(RunOutput, run_response)
Expand Down Expand Up @@ -4642,9 +4636,8 @@ async def _acontinue_run_stream(
if run_response is None:
run_response = RunOutput(run_id=run_id)
run_response = cast(RunOutput, run_response)
# Handle exceptions during async streaming
# Update status, emit the error event, then re-raise so callers can catch it.
run_response.status = RunStatus.error
# Add error event to list of events
run_error = create_run_error_event(
run_response,
error=str(e),
Expand All @@ -4654,13 +4647,11 @@ async def _acontinue_run_stream(
)
run_response.events = add_error_event(error=run_error, events=run_response.events)

# If the content is None, set it to the error message
if run_response.content is None:
run_response.content = str(e)

log_error(f"Validation failed: {str(e)} | Check trigger: {e.check_trigger}")

# Cleanup and store the run response and session
if agent_session is not None:
await acleanup_and_store(
agent,
Expand All @@ -4670,9 +4661,8 @@ async def _acontinue_run_stream(
user_id=user_id,
)

# Yield the error event
yield run_error
break
raise
except KeyboardInterrupt:
if run_response is None:
run_response = RunOutput(run_id=run_id)
Expand Down
Loading