Conversation

@Elbehery Elbehery commented Sep 8, 2025

What does this PR do?

This PR migrates MilvusClient to AsyncMilvusClient.

The commit makes the following changes.

  • Import statements updated: MilvusClient → AsyncMilvusClient
  • Removed asyncio.to_thread() wrappers: All Milvus operations now use native async/await
  • Test compatibility: Mock objects and fixtures updated to work with AsyncMilvusClient

Closes #2684

@meta-cla meta-cla bot added the CLA Signed This label is managed by the Meta Open Source bot. label Sep 8, 2025
@Elbehery Elbehery force-pushed the 20250908_migrate_milvusClient_to_asyncMilvusClient branch 5 times, most recently from bd99ac2 to ba8c755 Compare September 8, 2025 16:17
@@ -106,7 +107,13 @@ async def query_vector(self, embedding: NDArray, k: int, score_threshold: float)
return QueryChunksResponse(chunks=chunks, scores=scores)

async def delete(self):
await maybe_await(self.client.delete_collection(self.collection.name))
try:
Contributor Author

@Elbehery Elbehery Sep 8, 2025

These changes are in a separate commit.

The test was failing because it accessed a collection after it had been deleted.

The issue is that the ChromaIndex.initialize() method was always empty (doing nothing), but the test pattern of calling delete() followed by initialize() only worked before because:

  1. The collection object was created once in the test fixture and reused
  2. The delete() method removes the collection from Chroma
  3. The empty initialize() method doesn't recreate the collection
  4. When add_chunks() is called, it tries to use a deleted collection

After the migration the test failed consistently; the fix was to re-create the deleted collection.

Please let me know if there is a better approach
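For context, a minimal sketch of the delete()-then-initialize() pattern described above, and why re-creating the collection in initialize() fixes it. FakeChromaClient and ChromaIndexSketch are hypothetical stand-ins for illustration, not the real chromadb or llama-stack classes:

```python
import asyncio


class FakeChromaClient:
    """Hypothetical stand-in for a Chroma client; only models the calls used here."""

    def __init__(self):
        self.collections = {}

    def get_or_create_collection(self, name):
        # Returns the existing collection or creates a new empty one,
        # which is what makes delete()-then-initialize() safe.
        return self.collections.setdefault(name, {"name": name, "chunks": []})

    def delete_collection(self, name):
        self.collections.pop(name, None)


class ChromaIndexSketch:
    def __init__(self, client, collection_name):
        self.client = client
        self.collection_name = collection_name
        self.collection = None

    async def initialize(self):
        # Previously a no-op; re-creating the collection here means a
        # delete() followed by initialize() yields a usable index again.
        self.collection = self.client.get_or_create_collection(self.collection_name)

    async def delete(self):
        self.client.delete_collection(self.collection_name)


async def demo():
    index = ChromaIndexSketch(FakeChromaClient(), "test")
    await index.initialize()
    await index.delete()
    await index.initialize()  # collection is re-created, not stale
    return index.collection is not None


print(asyncio.run(demo()))
```

With the old empty initialize(), the final add/use step would hit a deleted collection; with the re-creation, the index is usable again.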

Collaborator

@Elbehery was that commit squashed? That link is not working for me.

Collaborator

@Elbehery

Are you referring to the unit/integration test configuration or something else?

Contributor Author

sorry I meant this commit

Collaborator

is there a reason you don't use list_collections() or get_collection()?

Contributor Author

I just learnt from previous PRs and reviews to minimize the changes as much as possible.

So I used has_collection() as it was used in the original code.

Let me try to use list_collections() and see if the tests pass without more changes 👍🏽

Collaborator

@franciscojavierarceo franciscojavierarceo Sep 9, 2025

has_collection() is used in milvus, which i don't believe is available in Chroma (I tested the client locally and don't see it listed here: https://docs.trychroma.com/docs/collections/manage-collections).

Contributor Author

has_collection() is used in milvus

yes, that is what I meant. Therefore I used the same method, and not list_collections(), to minimize code changes.

which i don't believe is available in Chroma

the Chroma test that I cited above started failing after the migration to AsyncMilvusClient

Thus I made these changes to fix the test in a separate commit, so that it is easier for the reviewer.

Now I am trying to use list_collections() in the Milvus client and testing whether the Chroma test still passes

Contributor Author

  • reverted commit
  • used list_collections() commit
  • ran all tests locally and all passed, waiting for CI now 👍🏽

thanks for your prompt response 🙏🏽

@Elbehery Elbehery force-pushed the 20250908_migrate_milvusClient_to_asyncMilvusClient branch 5 times, most recently from 7a4fc9b to 19f1374 Compare September 9, 2025 00:26
self.kvstore = kvstore

async def initialize(self):
pass
self.collection = await maybe_await(self.client.get_or_create_collection(self.collection_name))
Collaborator

nice thank you.

Contributor Author

Is there a better way to avoid this code change?


async def initialize(self):
# MilvusIndex does not require explicit initialization
# TODO: could move collection creation into initialization but it is not really necessary
pass

async def delete(self):
if await asyncio.to_thread(self.client.has_collection, self.collection_name):
await asyncio.to_thread(self.client.drop_collection, collection_name=self.collection_name)
try:
Collaborator

can you use two try and except blocks here?

squashing the list_collections and drop_collection into one isn't ideal.
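The suggestion above, sketched with two separate try/except blocks so the existence check and the drop fail independently. StubAsyncClient is a hypothetical stand-in for AsyncMilvusClient, and the delete() signature here is an assumption for illustration:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class StubAsyncClient:
    """Hypothetical stand-in for AsyncMilvusClient; only the two calls used below."""

    def __init__(self, collections):
        self.collections = set(collections)

    async def list_collections(self):
        return list(self.collections)

    async def drop_collection(self, collection_name):
        self.collections.discard(collection_name)


async def delete(client, collection_name):
    # First try/except: the existence check can fail on its own
    # (e.g. connection or event-loop errors) and should not be
    # squashed together with a failure in the drop itself.
    try:
        exists = collection_name in await client.list_collections()
    except Exception as e:
        logger.warning(f"Failed to list collections: {e}")
        return False

    if not exists:
        return False

    # Second try/except: the drop gets its own handler, so the two
    # failure modes are distinguishable in the logs.
    try:
        await client.drop_collection(collection_name=collection_name)
        return True
    except Exception as e:
        logger.warning(f"Failed to drop collection {collection_name}: {e}")
        return False


client = StubAsyncClient({"docs"})
print(asyncio.run(delete(client, "docs")))
```

Separating the blocks means a list failure logs "Failed to list collections" while a drop failure logs the collection name, instead of one catch-all message.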


async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), (
f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
)

if not await asyncio.to_thread(self.client.has_collection, self.collection_name):
try:
collections = await self.client.list_collections()
Collaborator

given the usage here again, it may make sense to just add a helper _has_collection() that has the try and except block and checks the collection_name

if "attached to a different loop" in str(e):
logger.warning("Recreating client due to event loop issue")

if hasattr(self, "_parent_adapter"):
Collaborator

why do we need this _parent_adapter? did you have issues with the event loop?

Contributor Author

yes, I had this issue

 Problem: AsyncMilvusClient connections were established in one event loop but used in different event loops during test execution.

  # BROKEN - Client created in setup loop, used in test loop
  RuntimeError: Task <Task pending> got Future <Future pending> attached to a different loop

  Symptoms:
  - RuntimeError: attached to a different loop errors
  - Tests failing during both execution and teardown
  - Milvus operations hanging or crashing

the cause was that MilvusIndex, which references the MilvusClient, was being initialized before the client itself

Hence the failure; the fix is to swap the init order, which is handled separately in this commit

Collaborator

where is that log from? Is that from Claude or some other AI tool?

Contributor Author

yes, Claude

collection_name: str,
consistency_level="Strong",
kvstore: KVStore | None = None,
parent_adapter=None,
Collaborator

concerned about this.

Contributor Author

it is part of the fix of the init order in commit

please let me know if there is a better approach

index_params=index_params,
consistency_level=self.consistency_level,
)
try:
Collaborator

nice thanks for adding this.

@@ -143,8 +175,7 @@ async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
}
)
try:
await asyncio.to_thread(
self.client.insert,
await self.client.insert(
Collaborator

probably we should improve the error in the raise
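One way to improve the raise, sketched below. VectorIOInsertError and FailingClient are assumptions for illustration, not existing llama-stack names; the point is re-raising with context via `raise ... from e`:

```python
import asyncio


class VectorIOInsertError(RuntimeError):
    """Hypothetical error type carrying insert context alongside the cause."""


class FailingClient:
    """Stand-in client whose insert always fails, to show the wrapping."""

    async def insert(self, collection_name, data):
        raise ValueError("schema mismatch")


async def insert_chunks(client, collection_name, rows):
    try:
        await client.insert(collection_name=collection_name, data=rows)
    except Exception as e:
        # Re-raise with what was attempted, instead of a bare `raise e`,
        # so the traceback shows the collection and chunk count as well
        # as the underlying Milvus error.
        raise VectorIOInsertError(
            f"Failed to insert {len(rows)} chunks into '{collection_name}'"
        ) from e


err = None
try:
    asyncio.run(insert_chunks(FailingClient(), "docs", [{"id": 1}]))
except VectorIOInsertError as e:
    err = e
print(err)  # Failed to insert 1 chunks into 'docs'
print(err.__cause__)  # schema mismatch
```

The `from e` chaining keeps the original exception attached as `__cause__`, so nothing is lost while the log line becomes actionable.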

@@ -321,6 +346,15 @@ def __init__(

async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.kvstore)

Collaborator

it'd be good to also add exception handling here. I think someone had an issue with the db_path at some point, and we probably could've caught that bug better since it can be configured in the run.yaml
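A hedged sketch of what that exception handling could look like, using a fake kvstore factory; FakeKVStoreConfig and fake_kvstore_impl are stand-ins for the real config and kvstore_impl(), and the error message wording is an assumption:

```python
import asyncio


class FakeKVStoreConfig:
    """Hypothetical config object; only db_path is modeled."""

    def __init__(self, db_path):
        self.db_path = db_path


async def fake_kvstore_impl(config):
    # Stand-in for kvstore_impl(): fails the way a bad db_path from
    # run.yaml would, with an unhelpful low-level error.
    if config.db_path.startswith("/nonexistent/"):
        raise FileNotFoundError(config.db_path)
    return {"db_path": config.db_path}


async def initialize(config):
    # Wrapping the kvstore setup turns a misconfigured db_path into a
    # clear startup error instead of a confusing crash later on.
    try:
        return await fake_kvstore_impl(config)
    except Exception as e:
        raise RuntimeError(
            f"Failed to initialize kvstore (check kvstore.db_path in run.yaml): {e}"
        ) from e


err = None
try:
    asyncio.run(initialize(FakeKVStoreConfig("/nonexistent/db.sqlite")))
except RuntimeError as e:
    err = e
print(err)
```

A user with a bad path in run.yaml then sees a message pointing at the config key rather than a bare FileNotFoundError.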

Contributor Author

ok, could you propose the exception to add?

also, shall I add it here or in a follow-up PR?

Collaborator

in this PR it's fine to add!

Collaborator

I'm doing a little scope creep, but while we're here it's a good thing to add a little more resilience, as end users will be happy :D

Contributor Author

lgtm 💯

will do tomorrow, I am sleeping on my desk :D

if self.client:
await self.client.close()

async def _recreate_client(self) -> None:
Collaborator

I'm not convinced we need this, and it feels like we shouldn't need it, but maybe I'm missing something.

Contributor Author

it's part of the init-order fix in commit

I really did not have other options; all these failures happened after the migration, which was actually supposed to be straightforward :)

Collaborator

@franciscojavierarceo franciscojavierarceo left a comment

I don't think we need _recreate_client; can you provide more info on why this is necessary?

@Elbehery Elbehery commented Sep 9, 2025

I don't think we need _recreate_client; can you provide more info on why this is necessary?

yes, I saved all the failures from this, as I spent time fixing it and knew I would get a lot of questions

AsyncMilvusClient Migration Issue & Fix

  The Problem

  When migrating from the synchronous MilvusClient to AsyncMilvusClient, the codebase encountered two critical issues that caused widespread test failures:

  Issue 1: Client Initialization Order Bug

  Problem: The AsyncMilvusClient was being passed to MilvusIndex constructors before being initialized.

  # BROKEN CODE
  async def initialize(self) -> None:
      # Create indices with uninitialized client (client = None)
      for vector_db_data in stored_vector_dbs:
          index = MilvusIndex(
              client=self.client,  # ❌ self.client is None here!
              collection_name=vector_db.identifier,
          )

      # Client gets initialized AFTER being passed to indices
      self.client = AsyncMilvusClient(uri=uri)  # ❌ Too late!

  Symptoms:
  - AttributeError: 'NoneType' object has no attribute 'has_collection'
  - Index operations failing because they had no valid client

  Issue 2: AsyncIO Event Loop Issues

  Problem: AsyncMilvusClient connections were established in one event loop but used in different event loops during test execution.

  # BROKEN - Client created in setup loop, used in test loop
  RuntimeError: Task <Task pending> got Future <Future pending> attached to a different loop

  Symptoms:
  - RuntimeError: attached to a different loop errors
  - Tests failing during both execution and teardown
  - Milvus operations hanging or crashing

  The Solution

  Fix 1: Correct Initialization Order

  Solution: Initialize the client first, then create indices that reference it.

  # FIXED CODE
  async def initialize(self) -> None:
      # Initialize client FIRST
      if isinstance(self.config, RemoteMilvusVectorIOConfig):
          self.client = AsyncMilvusClient(**self.config.model_dump(exclude_none=True))
      else:
          uri = os.path.expanduser(self.config.db_path)
          self.client = AsyncMilvusClient(uri=uri)

      # Now create indices with valid client
      for vector_db_data in stored_vector_dbs:
          index = MilvusIndex(
              client=self.client,  # ✅ self.client is properly initialized
              collection_name=vector_db.identifier,
              parent_adapter=self,  # ✅ Added reference for client recreation
          )

  Fix 2: Event Loop Recovery Mechanism

  Solution: Detect event loop errors and automatically recreate the client.

  # ERROR DETECTION & RECOVERY
  async def add_chunks(self, chunks, embeddings):
      try:
          collection_exists = await self.client.has_collection(self.collection_name)
      except Exception as e:
          logger.error(f"Failed to check collection existence: {e}")

          # Detect event loop issues
          if "attached to a different loop" in str(e):
              logger.warning("Recreating client due to event loop issue")
              # Trigger client recreation
              if hasattr(self, '_parent_adapter'):
                  await self._parent_adapter._recreate_client()
                  collection_exists = await self.client.has_collection(self.collection_name)
              else:
                  collection_exists = False
          else:
              collection_exists = False

  Client Recreation Method:
  async def _recreate_client(self) -> None:
      """Recreate the AsyncMilvusClient when event loop issues occur"""
      try:
          # Close old client
          if self.client:
              await self.client.close()
      except Exception as e:
          logger.warning(f"Error closing old client: {e}")

      # Create new client in current event loop
      if isinstance(self.config, RemoteMilvusVectorIOConfig):
          self.client = AsyncMilvusClient(**self.config.model_dump(exclude_none=True))
      else:
          uri = os.path.expanduser(self.config.db_path)
          self.client = AsyncMilvusClient(uri=uri)

      # Update all cached indices to use new client
      for index_wrapper in self.cache.values():
          if hasattr(index_wrapper, 'index') and hasattr(index_wrapper.index, 'client'):
              index_wrapper.index.client = self.client

  Fix 3: Enhanced Error Handling

  Solution: Add graceful error handling throughout the codebase.

  # GRACEFUL TEARDOWN
  async def delete(self):
      try:
          if await self.client.has_collection(self.collection_name):
              await self.client.drop_collection(collection_name=self.collection_name)
      except Exception as e:
          logger.warning(f"Failed to check or delete collection {self.collection_name}: {e}")
          # Continue gracefully - collection might already be deleted or client disconnected

  Why This Approach Works

  1. Proactive Detection: The code detects event loop issues by examining exception messages
  2. Automatic Recovery: When detected, it automatically recreates the client in the current event loop
  3. Reference Updates: All cached indices get updated to use the new client
  4. Graceful Degradation: If recovery fails, operations continue with sensible defaults
  5. Clean Teardown: Tests can complete even when connections are in invalid states

@Copilot Copilot AI left a comment

Pull Request Overview

This PR migrates the Milvus client from synchronous MilvusClient to asynchronous AsyncMilvusClient to improve performance and maintain consistency with the async architecture.

Key changes:

  • Import statements updated from MilvusClient to AsyncMilvusClient
  • Removed asyncio.to_thread() wrappers for all Milvus operations
  • Updated test mocks and fixtures to use AsyncMock for async operations

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File / Description

  • llama_stack/providers/remote/vector_io/milvus/milvus.py: Core migration from sync to async client with error handling improvements
  • tests/unit/providers/vector_io/remote/test_milvus.py: Updated test mocks from MagicMock to AsyncMock for async operations
  • tests/unit/providers/vector_io/conftest.py: Updated fixture to use AsyncMilvusClient and added proper cleanup
  • tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py: Added await keywords to vector index operations


Comment on lines +90 to +93
if hasattr(self, "_parent_adapter"):
await self._parent_adapter._recreate_client()
collections = await self.client.list_collections()
collection_exists = self.collection_name in collections
Copilot AI Sep 9, 2025

After recreating the client through the parent adapter, the self.client reference may still point to the old client. The client reference should be updated to the new client instance after recreation.


@franciscojavierarceo
Collaborator

Initializing the MilvusClient before the index makes sense and we should keep that.

What errors were you seeing that resulted in the alleged event loop issues? Can you provide the exact command you used to reproduce that?

It would be better to reproduce those errors in the CI. Probably you could do that by removing the _recreate_client changes.

@Elbehery
Contributor Author

Elbehery commented Sep 9, 2025

What errors were you seeing that resulted in the alleged event loop issues? Can you provide the exact command you used to reproduce that?

after the migration, unit tests were failing; I analyzed the failures and it was due to the init order

It would be better to reproduce those errors in the CI.

It was caught by the CI tests 👍🏽

Probably you could do that by removing the _recreate_client changes.

I am searching the logs of Claude for more context

@Elbehery Elbehery commented Sep 9, 2025

Initializing the MilvusClient before the index makes sense and we should keep that.

What errors were you seeing that resulted in the alleged event loop issues? Can you provide the exact command you used to reproduce that?

It would be better to reproduce those errors in the CI. Probably you could do that by removing the _recreate_client changes.

these are all I could find

 1. Lines 358-375: The code loops through stored_vector_dbs and creates MilvusIndex objects, passing self.client as the client parameter
  2. Lines 350-356: But self.client is only initialized after the loop that creates the indices

  Here's the problematic flow:

  async def initialize(self) -> None:
      self.kvstore = await kvstore_impl(self.config.kvstore)

      # ❌ Client is still None here when creating indices
      start_key = VECTOR_DBS_PREFIX
      end_key = f"{VECTOR_DBS_PREFIX}\xff"
      stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)

      for vector_db_data in stored_vector_dbs:
          vector_db = VectorDB.model_validate_json(vector_db_data)
          index = VectorDBWithIndex(
              vector_db,
              index=MilvusIndex(
                  client=self.client,  # ❌ self.client is None here!
                  collection_name=vector_db.identifier,
                  # ...
              ),
              # ...
          )
          self.cache[vector_db.identifier] = index

      # ✅ Client gets initialized AFTER being passed to indices  
      if isinstance(self.config, RemoteMilvusVectorIOConfig):
          logger.info(f"Connecting to Milvus server at {self.config.uri}")
          self.client = AsyncMilvusClient(**self.config.model_dump(exclude_none=True))
      else:
          logger.info(f"Connecting to Milvus Lite at: {self.config.db_path}")
          uri = os.path.expanduser(self.config.db_path)
          self.client = AsyncMilvusClient(uri=uri)

  The fix is to move the client initialization before the loop that creates the indices. The client initialization code (lines 350-356) should come before lines 358-375.

but I pasted the actual context here

@Elbehery Elbehery force-pushed the 20250908_migrate_milvusClient_to_asyncMilvusClient branch from 1eaed76 to a9d5bac Compare September 10, 2025 07:46
@franciscojavierarceo
Collaborator

looks like unit tests are failing; I updated your branch, but this was the failure:

FAILED tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py::test_add_chunks_query_vector[chroma] - chromadb.errors.NotFoundError: Error getting collection: Collection [8d484b27-8682-440f-983b-7e7094d84512] does not exists.

Which I think suggests that the event loop issue may not be resolved by the _recreate_client, but maybe I'm wrong there

@Elbehery
Contributor Author

looks like unit tests are failing; I updated your branch, but this was the failure:

FAILED tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py::test_add_chunks_query_vector[chroma] - chromadb.errors.NotFoundError: Error getting collection: Collection [8d484b27-8682-440f-983b-7e7094d84512] does not exists.

Which I think suggests that the event loop issue may not be resolved by the _recreate_client, but maybe I'm wrong there

thanks so much for your deep support 🙏🏽

So I checked the unit-test failure last thing yesterday, and I thought it's not related, as I commented here

Please correct me if I am wrong

I will come back to your reviews soonish, just trying to finish a release :)

@Elbehery Elbehery force-pushed the 20250908_migrate_milvusClient_to_asyncMilvusClient branch from 12b6827 to 1006200 Compare September 10, 2025 13:39
The commit makes the following changes.
-  Import statements updated: MilvusClient → AsyncMilvusClient
-  Removed asyncio.to_thread() wrappers: All Milvus operations now use native async/await
-  Test compatibility: Mock objects and fixtures updated to work with AsyncMilvusClient

Signed-off-by: Mustafa Elbehery <[email protected]>
Signed-off-by: Mustafa Elbehery <[email protected]>
…on() with list_collections()

Signed-off-by: Mustafa Elbehery <[email protected]>
@Elbehery Elbehery force-pushed the 20250908_migrate_milvusClient_to_asyncMilvusClient branch from 1006200 to 295d8b9 Compare September 10, 2025 13:40
Labels: CLA Signed (managed by the Meta Open Source bot)
Successfully merging this pull request may close these issues: Migrate from MilvusClient to AsyncMilvusClient
3 participants