diff --git a/memori/llm/_invoke.py b/memori/llm/_invoke.py
index ce8f304a..cddb808c 100644
--- a/memori/llm/_invoke.py
+++ b/memori/llm/_invoke.py
@@ -71,9 +71,26 @@ async def invoke(self, **kwargs):
             )
 
         raw_response = await self._method(**kwargs)
+
+        # Check if streaming is enabled - if so, return a wrapped async generator
+        # that handles post-processing after the stream is consumed.
+        # This fixes the "cannot pickle '_thread.RLock' object" error when using
+        # OpenAI with Agno in streaming mode (Issue #214)
+        if kwargs.get("stream", False):
+            return self._wrap_stream(raw_response, kwargs, start)
+
         self.handle_post_response(kwargs, start, raw_response)
         return raw_response
 
+    async def _wrap_stream(self, stream, kwargs, start):
+        """Wrap an async stream to handle post-processing after consumption."""
+        raw_response = {}
+        async for chunk in stream:
+            raw_response = merge_chunk(raw_response, chunk.__dict__)
+            yield chunk
+
+        self.handle_post_response(kwargs, start, raw_response)
+
 
 class InvokeAsyncIterator(BaseInvoke):
     async def invoke(self, **kwargs):
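
For reference, below is a minimal, self-contained sketch of the wrap-stream pattern the hunk above introduces. All names here (merge_chunk, post_process, fake_stream) are illustrative stand-ins rather than memori's actual API; in particular, merge_chunk is assumed to shallow-merge each chunk dict into an accumulator.

import asyncio


def merge_chunk(accumulated: dict, chunk: dict) -> dict:
    """Shallow-merge a chunk dict into the accumulated response (assumption)."""
    accumulated.update(chunk)
    return accumulated


async def fake_stream():
    """Stand-in for a provider's async streaming response."""
    for i in range(3):
        yield {"index": i, "delta": f"token-{i}"}


async def wrap_stream(stream, post_process):
    """Re-yield each chunk, then run post-processing once the stream ends."""
    raw_response: dict = {}
    async for chunk in stream:
        raw_response = merge_chunk(raw_response, chunk)
        yield chunk
    # Runs only after the caller has fully consumed the stream.
    post_process(raw_response)


async def main():
    async for chunk in wrap_stream(fake_stream(), print):
        pass  # the caller consumes chunks as it normally would


asyncio.run(main())

The key property is that the wrapper is itself an async generator: post-processing is deferred until the caller has exhausted the stream, instead of running at call time while the provider's live stream object (and any locks it holds) is still in play.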