-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Python: Adding FunctionCallContent and FunctionResultContent while streaming messages #10274
Comments
Well, I kind of managed to solve it, though this looks quite hacky. Not sure if there's a "cleaner" way. Basically my concern is that I am assuming that once I get a FunctionResultContent item, there will be a full FunctionCallContent right before. Then when there's a FunctionResultContent, only then I add both to the chat history with add_assistant_message_list and add_tool_message_list. I had some weird exceptions sometimes, but it turned out the problem was caused by parellel function calls enabled by default, so I disabled it. Now it seems to work well.
|
If what you have works, then that's great. Another approach, that is slightly different could be to keep local buffers for each type of content and finalize (add to the chat history) only when we detect that a chunk is fully received (when we get the FunctionResultContent we know the FunctionCallContent has ended). This method also avoids having to repeatedly call async def stream_and_accumulate(answer_stream, user_input, chat_history, msg):
"""An example state-based approach to handle function calls/results as they stream."""
chat_history.add_user_message(user_input)
# Temporary lists where we accumulate function call chunks and function result chunks
func_call_chunks = []
func_result_chunks = []
assistant_chunks = []
async for message in answer_stream:
chat_message = message[0]
for item in chat_message.items:
match item:
case FunctionCallContent():
func_call_chunks.append(item)
case FunctionResultContent():
func_result_chunks.append(item)
# As soon as we see a function result, that typically means
# the previous function call is "done." So finalize them:
if func_call_chunks:
# Turn the accumulated function-call chunks into a single message
all_func_call = _combine_chunks_into_chat_message(
func_call_chunks,
role=AuthorRole.ASSISTANT
)
chat_history.add_assistant_message_list(all_func_call.items)
func_call_chunks.clear()
# Now also finalize the function-result chunks themselves
if func_result_chunks:
all_func_result = _combine_chunks_into_chat_message(
func_result_chunks,
role=AuthorRole.TOOL
)
chat_history.add_tool_message_list(all_func_result.items)
func_result_chunks.clear()
case StreamingChatMessageContent() if chat_message.role == AuthorRole.ASSISTANT:
assistant_chunks.append(item)
# In parallel, stream tokens to the user interface (if needed)
token = str(chat_message)
if token:
await msg.stream_token(token)
# Once the streaming completes, if there's leftover "assistant" text,
# combine it into a single message and add it to the chat.
if assistant_chunks:
combined_assistant_msg = _combine_chunks_into_chat_message(
assistant_chunks,
role=AuthorRole.ASSISTANT
)
chat_history.add_assistant_message(str(combined_assistant_msg))
assistant_chunks.clear()
def _combine_chunks_into_chat_message(chunks, role):
# Example approach: just merge the text pieces from these chunks.
# or do the reduce(...) logic here to keep the full messages
combined_text = "".join(str(chunk) for chunk in chunks)
return ChatMessageContent(role=role, content=combined_text) |
Your solution is good, and I provided another. Please ping on the issue if you need further help. |
Hi!
This is a followup of the issue #9408, which has been addressed with a fix. Now the kernel yields both FunctionCallContent and FunctionResultContent. It's possible to combine the streaming message chunks, to get these, but only AFTER the streaming is completed:
But my problem now is that I can't seem to find a way to do this while the streaming is still in progress. The reason why I need this is that my agent is handling user requests that would call multiple functions after each other, the output of some functions is the input of followup function calls. This works pretty well if I emit the function call result in the chat history using a function_invocation filter. But there I am just putting the function call output as an ASSISTANT message, as I couldn't figure out the way to "convert" the context to KernelContent (I suppose I need that to add a TOOL_CALL message to the history).
Anyway, my issue is how to "detect" if a FunctionCallContent and FunctionResultContent stream is "finished", so that I could add them right away to the message history.
Thanks!
The text was updated successfully, but these errors were encountered: