diff --git a/cookbook/07_knowledge/09_archive/vector_dbs/qdrant_db_hybrid_search_native_sparse.py b/cookbook/07_knowledge/09_archive/vector_dbs/qdrant_db_hybrid_search_native_sparse.py new file mode 100644 index 0000000000..2164cb26b7 --- /dev/null +++ b/cookbook/07_knowledge/09_archive/vector_dbs/qdrant_db_hybrid_search_native_sparse.py @@ -0,0 +1,183 @@ +""" +Qdrant Hybrid Search with Native Sparse Embeddings +================================================== + +Demonstrates Qdrant hybrid retrieval using a custom embedder that provides +native sparse vectors via `get_sparse_embedding()`. + +This example shows how to: +1. index documents with dense + native sparse vectors +2. retrieve documents with hybrid search using the same sparse representation + +This is useful for embedders such as bge-m3 or any custom embedder that +exposes sparse vectors natively. +""" + +import asyncio +import os +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple + +import typer +from agno.agent import Agent +from agno.knowledge.embedder.base import Embedder +from agno.knowledge.knowledge import Knowledge +from agno.models.ollama import Ollama +from agno.vectordb.qdrant import Qdrant +from agno.vectordb.search import SearchType +from rich.prompt import Prompt + +try: + import numpy as np +except ImportError as exc: + raise ImportError("numpy not installed. Install with: pip install numpy") from exc + +try: + from FlagEmbedding import BGEM3FlagModel +except ImportError as exc: + raise ImportError( + "FlagEmbedding not installed. Install with: pip install FlagEmbedding" + ) from exc + + +DENSE_MODEL = os.getenv("EMBEDDING_MODEL", "BAAI/bge-m3") +QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333") +OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "gemma4:latest") +COLLECTION_NAME = "thai-recipes-native-sparse" +DOC_URL = "https://agno-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf" + +_bge_model: Optional[BGEM3FlagModel] = None + + +def get_bge_model() -> BGEM3FlagModel: + """Lazily initialize the bge-m3 model singleton.""" + global _bge_model + if _bge_model is None: + _bge_model = BGEM3FlagModel(DENSE_MODEL, use_fp16=True) + return _bge_model + + +@dataclass +class BgeM3Embedder(Embedder): + """ + Dense + sparse embedder backed by BAAI/bge-m3. 
+ + Required Embedder interface: + - get_embedding() -> List[float] + + Optional extension detected by Qdrant via duck typing: + - get_sparse_embedding() -> {"indices": [...], "values": [...]} + """ + + id: str = DENSE_MODEL + dimensions: int = 1024 + + @staticmethod + def _encode(text: str) -> Dict: + model = get_bge_model() + return model.encode( + [text], + batch_size=1, + return_dense=True, + return_sparse=True, + ) + + def get_embedding(self, text: str) -> List[float]: + dense_vec = self._encode(text)["dense_vecs"][0] + return ( + dense_vec.tolist() if isinstance(dense_vec, np.ndarray) else list(dense_vec) + ) + + def get_embedding_and_usage(self, text: str) -> Tuple[List[float], Optional[Dict]]: + return self.get_embedding(text), None + + async def async_get_embedding(self, text: str) -> List[float]: + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, self.get_embedding, text) + + async def async_get_embedding_and_usage( + self, text: str + ) -> Tuple[List[float], Optional[Dict]]: + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, self.get_embedding_and_usage, text) + + def get_sparse_embedding(self, text: str) -> Dict[str, List]: + """ + Return sparse lexical weights in Qdrant-compatible format. + + Qdrant will prefer this native sparse representation during hybrid + indexing and retrieval when `get_sparse_embedding()` is available. + """ + lexical_weights = self._encode(text)["lexical_weights"][0] + return { + "indices": [int(k) for k in lexical_weights.keys()], + "values": [float(v) for v in lexical_weights.values()], + } + + +# --------------------------------------------------------------------------- +# Setup +# --------------------------------------------------------------------------- + +vector_db = Qdrant( + collection=COLLECTION_NAME, + url=QDRANT_URL, + search_type=SearchType.hybrid, + embedder=BgeM3Embedder(), +) + +knowledge = Knowledge( + name="Qdrant Native Sparse Knowledge Base", + vector_db=vector_db, +) + + +# --------------------------------------------------------------------------- +# Create Agent +# --------------------------------------------------------------------------- + + +def qdrant_agent(user: str = "user") -> None: + agent = Agent( + user_id=user, + knowledge=knowledge, + search_knowledge=True, + model=Ollama(id=OLLAMA_MODEL), + markdown=True, + ) + + while True: + message = Prompt.ask(f"[bold]{user}[/bold]") + if message.lower() in {"exit", "bye", "quit"}: + break + agent.print_response(message) + + +# --------------------------------------------------------------------------- +# Run Agent +# --------------------------------------------------------------------------- + + +def main( + user: str = typer.Option("user", help="User id for the agent session."), + load_knowledge: bool = typer.Option( + True, + "--load-knowledge/--no-load-knowledge", + help="Load the sample PDF into Qdrant before starting the agent.", + ), +) -> None: + if load_knowledge: + knowledge.insert( + name="Recipes", + url=DOC_URL, + metadata={ + "doc_type": "recipe_book", + "retrieval": "hybrid_native_sparse", + }, + ) + + qdrant_agent(user=user) + + +if __name__ == "__main__": + typer.run(main) diff --git a/libs/agno/agno/vectordb/qdrant/qdrant.py b/libs/agno/agno/vectordb/qdrant/qdrant.py index b28d341a38..15bb8dc633 100644 --- a/libs/agno/agno/vectordb/qdrant/qdrant.py +++ b/libs/agno/agno/vectordb/qdrant/qdrant.py @@ -305,6 +305,30 @@ async def async_name_exists(self, name: str) -> bool: # type: ignore[override] return 
len(scroll_result[0]) > 0 return False + def _get_sparse_vector(self, text: str): + """ + Return a sparse vector for the given text. + + Priority: + 1. Native sparse embeddings from the configured embedder + 2. FastEmbed sparse encoder fallback + """ + if not text: + return None + + # Prefer native sparse embeddings from the configured embedder + if self.embedder is not None and hasattr(self.embedder, "get_sparse_embedding"): + sparse_vector = self.embedder.get_sparse_embedding(text) + if sparse_vector is not None: + return sparse_vector + + # Fallback to FastEmbed sparse encoder, if available + if getattr(self, "sparse_encoder", None) is not None: + sparse_vectors = list(self.sparse_encoder.embed([text])) + if sparse_vectors: + return sparse_vectors[0] + return None + def insert( self, content_hash: str, @@ -344,12 +368,13 @@ def insert( if self.search_type in [SearchType.hybrid]: document.embed(embedder=self.embedder) vector[self.dense_vector_name] = document.embedding - if self.search_type in [SearchType.keyword, SearchType.hybrid]: - vector[self.sparse_vector_name] = next( - iter(self.sparse_encoder.embed([document.content])) - ).as_object() # type: ignore - + sparse_embedding = self._get_sparse_vector(document.content) + if sparse_embedding is None: + raise ValueError("Sparse embedding could not be generated for document insert.") + if hasattr(sparse_embedding, "as_object"): + sparse_embedding = sparse_embedding.as_object() + vector[self.sparse_vector_name] = sparse_embedding # type: ignore # Create payload with document properties payload = { "name": document.name, @@ -449,13 +474,16 @@ async def process_document(document): else: # For other search types, use named vectors vector = {} - if self.search_type in [SearchType.hybrid]: - vector[self.dense_vector_name] = document.embedding # Already embedded above - if self.search_type in [SearchType.keyword, SearchType.hybrid]: - vector[self.sparse_vector_name] = next( - iter(self.sparse_encoder.embed([document.content])) - ).as_object() # type: ignore + sparse_embedding = self._get_sparse_vector(document.content) + + if sparse_embedding is None: + raise ValueError("Sparse embedding could not be generated for async document insert.") + + if hasattr(sparse_embedding, "as_object"): + sparse_embedding = sparse_embedding.as_object() + + vector[self.sparse_vector_name] = sparse_embedding # type: ignore if self.search_type in [SearchType.keyword, SearchType.hybrid]: vector[self.sparse_vector_name] = next(iter(self.sparse_encoder.embed([document.content]))).as_object() @@ -567,12 +595,16 @@ def _run_hybrid_search_sync( formatted_filters: Optional[models.Filter], ) -> List[models.ScoredPoint]: dense_embedding = self.embedder.get_embedding(query) - sparse_embedding = next(iter(self.sparse_encoder.embed([query]))).as_object() + sparse_embedding = self._get_sparse_vector(query) + if sparse_embedding is None: + raise ValueError("Sparse embedding could not be generated for hybrid search.") + if hasattr(sparse_embedding, "as_object"): + sparse_embedding = sparse_embedding.as_object() call = self.client.query_points( collection_name=self.collection, prefetch=[ models.Prefetch( - query=models.SparseVector(**sparse_embedding), # type: ignore # type: ignore + query=models.SparseVector(**sparse_embedding), # type: ignore limit=limit, using=self.sparse_vector_name, ), @@ -623,7 +655,12 @@ def _run_keyword_search_sync( limit: int, formatted_filters: Optional[models.Filter], ) -> List[models.ScoredPoint]: - sparse_embedding = 
next(iter(self.sparse_encoder.embed([query]))).as_object() + sparse_embedding = self._get_sparse_vector(query) + if sparse_embedding is None: + raise ValueError("Sparse embedding could not be generated for keyword search.") + + if hasattr(sparse_embedding, "as_object"): + sparse_embedding = sparse_embedding.as_object() call = self.client.query_points( collection_name=self.collection, query=models.SparseVector(**sparse_embedding), # type: ignore @@ -672,7 +709,11 @@ async def _run_keyword_search_async( limit: int, formatted_filters: Optional[models.Filter], ) -> List[models.ScoredPoint]: - sparse_embedding = next(iter(self.sparse_encoder.embed([query]))).as_object() + sparse_embedding = self._get_sparse_vector(query) + if sparse_embedding is None: + raise ValueError("Sparse embedding could not be generated for keyword search.") + if hasattr(sparse_embedding, "as_object"): + sparse_embedding = sparse_embedding.as_object() call = await self.async_client.query_points( collection_name=self.collection, query=models.SparseVector(**sparse_embedding), # type: ignore @@ -691,7 +732,11 @@ async def _run_hybrid_search_async( formatted_filters: Optional[models.Filter], ) -> List[models.ScoredPoint]: dense_embedding = await self.embedder.async_get_embedding(query) - sparse_embedding = next(iter(self.sparse_encoder.embed([query]))).as_object() + sparse_embedding = self._get_sparse_vector(query) + if sparse_embedding is None: + raise ValueError("Sparse embedding could not be generated for hybrid search.") + if hasattr(sparse_embedding, "as_object"): + sparse_embedding = sparse_embedding.as_object() call = await self.async_client.query_points( collection_name=self.collection, prefetch=[ diff --git a/libs/agno/tests/unit/vectordb/test_qdrantdb.py b/libs/agno/tests/unit/vectordb/test_qdrantdb.py index 3b7ef97fa5..01ef176439 100644 --- a/libs/agno/tests/unit/vectordb/test_qdrantdb.py +++ b/libs/agno/tests/unit/vectordb/test_qdrantdb.py @@ -6,6 +6,7 @@ from agno.knowledge.document import Document from agno.vectordb.qdrant import Qdrant +from agno.vectordb.search import SearchType @pytest.fixture @@ -453,6 +454,7 @@ def sync_get_embedding(text: str): return mock_embedding mock.get_embedding = sync_get_embedding + mock.get_sparse_embedding = Mock(return_value=None) async def async_get_embedding(text: str): mock.async_call_count += 1 @@ -492,6 +494,9 @@ async def test_async_hybrid_search_uses_async_embedder(tracking_embedder): """ db = Qdrant(embedder=tracking_embedder, collection="test_collection") + # Force fallback to sparse encoder + db.embedder.get_sparse_embedding = Mock(return_value=None) + # Mock the sparse encoder mock_sparse_embedding = Mock() mock_sparse_embedding.as_object.return_value = {"indices": [1, 2, 3], "values": [0.1, 0.2, 0.3]} @@ -540,3 +545,109 @@ async def test_concurrent_async_searches_no_blocking(tracking_embedder): # All should use async embedder assert tracking_embedder.async_call_count == 5, "async_get_embedding should be called 5 times" assert tracking_embedder.sync_call_count == 0, "sync get_embedding should NOT be called" + + +def test_get_sparse_vector_returns_native_sparse_embedding(mock_embedder): + db = Qdrant(embedder=mock_embedder, collection="test_collection") + native_sparse = {"indices": [1, 3], "values": [0.5, 0.8]} + + mock_embedder.get_sparse_embedding = Mock(return_value=native_sparse) + db.sparse_encoder = Mock() + + result = db._get_sparse_vector("hello world") + + assert result == native_sparse + mock_embedder.get_sparse_embedding.assert_called_once_with("hello 
world") + db.sparse_encoder.embed.assert_not_called() + + +def test_get_sparse_vector_falls_back_to_sparse_encoder_when_native_is_none(mock_embedder): + db = Qdrant(embedder=mock_embedder, collection="test_collection") + mock_embedder.get_sparse_embedding = Mock(return_value=None) + + sparse_obj = Mock() + db.sparse_encoder = Mock() + db.sparse_encoder.embed.return_value = iter([sparse_obj]) + + result = db._get_sparse_vector("hello world") + + assert result == sparse_obj + mock_embedder.get_sparse_embedding.assert_called_once_with("hello world") + db.sparse_encoder.embed.assert_called_once_with(["hello world"]) + + +def test_get_sparse_vector_returns_none_for_empty_text(mock_embedder): + db = Qdrant(embedder=mock_embedder, collection="test_collection") + mock_embedder.get_sparse_embedding = Mock() + db.sparse_encoder = Mock() + + result = db._get_sparse_vector("") + + assert result is None + mock_embedder.get_sparse_embedding.assert_not_called() + db.sparse_encoder.embed.assert_not_called() + + +def test_run_keyword_search_sync_uses_sparse_helper(mock_embedder): + db = Qdrant(embedder=mock_embedder, collection="test_collection") + db.search_type = SearchType.keyword + db.sparse_vector_name = "sparse" + + sparse_obj = Mock() + sparse_obj.as_object.return_value = {"indices": [1], "values": [0.7]} + + db._get_sparse_vector = Mock(return_value=sparse_obj) + db._client = Mock() + + query_response = Mock() + query_response.points = [] + db._client.query_points.return_value = query_response + + results = db._run_keyword_search_sync("hello", limit=5, formatted_filters=None) + + db._get_sparse_vector.assert_called_once_with("hello") + db._client.query_points.assert_called_once() + assert results == [] + + +def test_run_hybrid_search_sync_uses_sparse_helper(mock_embedder): + db = Qdrant(embedder=mock_embedder, collection="test_collection") + db.search_type = SearchType.hybrid + db.sparse_vector_name = "sparse" + db.dense_vector_name = "dense" + + mock_embedder.get_embedding = Mock(return_value=[0.1] * 1024) + + sparse_obj = Mock() + sparse_obj.as_object.return_value = {"indices": [1], "values": [0.7]} + + db._get_sparse_vector = Mock(return_value=sparse_obj) + db._client = Mock() + + query_response = Mock() + query_response.points = [] + db._client.query_points.return_value = query_response + + results = db._run_hybrid_search_sync("hello", limit=5, formatted_filters=None) + + mock_embedder.get_embedding.assert_called_once_with("hello") + db._get_sparse_vector.assert_called_once_with("hello") + db._client.query_points.assert_called_once() + assert results == [] + + +def test_run_keyword_search_sync_raises_when_sparse_embedding_is_none(mock_embedder): + db = Qdrant(embedder=mock_embedder, collection="test_collection") + db._get_sparse_vector = Mock(return_value=None) + + with pytest.raises(ValueError, match="Sparse embedding could not be generated for keyword search."): + db._run_keyword_search_sync("hello", limit=5, formatted_filters=None) + + +def test_run_hybrid_search_sync_raises_when_sparse_embedding_is_none(mock_embedder): + db = Qdrant(embedder=mock_embedder, collection="test_collection") + mock_embedder.get_embedding = Mock(return_value=[0.1] * 1024) + db._get_sparse_vector = Mock(return_value=None) + + with pytest.raises(ValueError, match="Sparse embedding could not be generated for hybrid search."): + db._run_hybrid_search_sync("hello", limit=5, formatted_filters=None)