Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
"""
Qdrant Hybrid Search with Native Sparse Embeddings
==================================================

Demonstrates Qdrant hybrid retrieval using a custom embedder that provides
native sparse vectors via `get_sparse_embedding()`.

This example shows how to:
1. index documents with dense + native sparse vectors
2. retrieve documents with hybrid search using the same sparse representation

This is useful for embedders such as bge-m3 or any custom embedder that
exposes sparse vectors natively.
"""

import asyncio
import os
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Tuple

import typer
from agno.agent import Agent
from agno.knowledge.embedder.base import Embedder
from agno.knowledge.knowledge import Knowledge
from agno.models.ollama import Ollama
from agno.vectordb.qdrant import Qdrant
from agno.vectordb.search import SearchType
from rich.prompt import Prompt

try:
import numpy as np
except ImportError as exc:
raise ImportError("numpy not installed. Install with: pip install numpy") from exc

try:
from FlagEmbedding import BGEM3FlagModel
except ImportError as exc:
raise ImportError(
"FlagEmbedding not installed. Install with: pip install FlagEmbedding"
) from exc


# Model / service configuration, overridable via environment variables.
DENSE_MODEL = os.getenv("EMBEDDING_MODEL", "BAAI/bge-m3")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
# NOTE(review): "gemma4:latest" — confirm this Ollama tag exists (Ollama
# currently publishes gemma3 tags); verify before relying on the default.
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "gemma4:latest")
COLLECTION_NAME = "thai-recipes-native-sparse"
# Sample PDF ingested by main() when --load-knowledge is enabled.
DOC_URL = "https://agno-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"

# Lazily created process-wide bge-m3 model; see get_bge_model().
_bge_model: Optional[BGEM3FlagModel] = None


def get_bge_model() -> BGEM3FlagModel:
    """Return the shared bge-m3 model, creating it on the first call."""
    global _bge_model
    model = _bge_model
    if model is None:
        # First use: load the (large) model once and keep it for the process.
        model = BGEM3FlagModel(DENSE_MODEL, use_fp16=True)
        _bge_model = model
    return model


@dataclass
class BgeM3Embedder(Embedder):
    """
    Dense + sparse embedder backed by BAAI/bge-m3.

    Required Embedder interface:
        - get_embedding() -> List[float]

    Optional extension detected by Qdrant via duck typing:
        - get_sparse_embedding() -> {"indices": [...], "values": [...]}
    """

    id: str = DENSE_MODEL
    dimensions: int = 1024

    @staticmethod
    @lru_cache(maxsize=32)
    def _encode(text: str) -> Dict:
        """Run bge-m3 once for *text*, returning both dense and sparse outputs.

        Cached because hybrid indexing calls get_embedding() and
        get_sparse_embedding() back-to-back with the same text, and each
        model.encode() call already computes both representations — the
        cache halves the number of model invocations per document.
        """
        model = get_bge_model()
        return model.encode(
            [text],
            batch_size=1,
            return_dense=True,
            return_sparse=True,
        )

    def get_embedding(self, text: str) -> List[float]:
        """Return the dense embedding for *text* as a plain list of floats."""
        dense_vec = self._encode(text)["dense_vecs"][0]
        return (
            dense_vec.tolist() if isinstance(dense_vec, np.ndarray) else list(dense_vec)
        )

    def get_embedding_and_usage(self, text: str) -> Tuple[List[float], Optional[Dict]]:
        """Return (embedding, usage); bge-m3 reports no token usage, so usage is None."""
        return self.get_embedding(text), None

    async def async_get_embedding(self, text: str) -> List[float]:
        """Async wrapper: offload the blocking encode to the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.get_embedding, text)

    async def async_get_embedding_and_usage(
        self, text: str
    ) -> Tuple[List[float], Optional[Dict]]:
        """Async wrapper around get_embedding_and_usage()."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.get_embedding_and_usage, text)

    def get_sparse_embedding(self, text: str) -> Dict[str, List]:
        """
        Return sparse lexical weights in Qdrant-compatible format.

        Qdrant will prefer this native sparse representation during hybrid
        indexing and retrieval when `get_sparse_embedding()` is available.
        """
        # bge-m3 lexical weights map token-id -> weight; Qdrant sparse
        # vectors want parallel int indices and float values.
        lexical_weights = self._encode(text)["lexical_weights"][0]
        return {
            "indices": [int(k) for k in lexical_weights.keys()],
            "values": [float(v) for v in lexical_weights.values()],
        }


# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------

# Qdrant vector store configured for hybrid (dense + sparse) retrieval.
# Because BgeM3Embedder exposes get_sparse_embedding(), Qdrant uses the
# embedder's native bge-m3 sparse vectors instead of a FastEmbed fallback.
vector_db = Qdrant(
    collection=COLLECTION_NAME,
    url=QDRANT_URL,
    search_type=SearchType.hybrid,
    embedder=BgeM3Embedder(),
)

# Knowledge base that indexes documents into the Qdrant collection above.
knowledge = Knowledge(
    name="Qdrant Native Sparse Knowledge Base",
    vector_db=vector_db,
)


# ---------------------------------------------------------------------------
# Create Agent
# ---------------------------------------------------------------------------


def qdrant_agent(user: str = "user") -> None:
    """Interactive REPL: chat with the knowledge-backed agent until the user quits."""
    agent = Agent(
        user_id=user,
        knowledge=knowledge,
        search_knowledge=True,
        model=Ollama(id=OLLAMA_MODEL),
        markdown=True,
    )

    # Keep prompting until one of the (case-insensitive) exit words is typed.
    quit_words = {"exit", "bye", "quit"}
    while (message := Prompt.ask(f"[bold]{user}[/bold]")).lower() not in quit_words:
        agent.print_response(message)


# ---------------------------------------------------------------------------
# Run Agent
# ---------------------------------------------------------------------------


def main(
    user: str = typer.Option("user", help="User id for the agent session."),
    load_knowledge: bool = typer.Option(
        True,
        "--load-knowledge/--no-load-knowledge",
        help="Load the sample PDF into Qdrant before starting the agent.",
    ),
) -> None:
    """Optionally ingest the sample PDF, then hand control to the chat loop."""
    if load_knowledge:
        # Metadata tags make it possible to filter/inspect these chunks later.
        doc_metadata = {
            "doc_type": "recipe_book",
            "retrieval": "hybrid_native_sparse",
        }
        knowledge.insert(name="Recipes", url=DOC_URL, metadata=doc_metadata)

    qdrant_agent(user=user)


if __name__ == "__main__":
    typer.run(main)
77 changes: 61 additions & 16 deletions libs/agno/agno/vectordb/qdrant/qdrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,30 @@ async def async_name_exists(self, name: str) -> bool: # type: ignore[override]
return len(scroll_result[0]) > 0
return False

def _get_sparse_vector(self, text: str):
"""
Return a sparse vector for the given text.

Priority:
1. Native sparse embeddings from the configured embedder
2. FastEmbed sparse encoder fallback
"""
if not text:
return None

# Prefer native sparse embeddings from the configured embedder
if self.embedder is not None and hasattr(self.embedder, "get_sparse_embedding"):
sparse_vector = self.embedder.get_sparse_embedding(text)
if sparse_vector is not None:
return sparse_vector

# Fallback to FastEmbed sparse encoder, if available
if getattr(self, "sparse_encoder", None) is not None:
sparse_vectors = list(self.sparse_encoder.embed([text]))
if sparse_vectors:
return sparse_vectors[0]
return None

def insert(
self,
content_hash: str,
Expand Down Expand Up @@ -344,12 +368,13 @@ def insert(
if self.search_type in [SearchType.hybrid]:
document.embed(embedder=self.embedder)
vector[self.dense_vector_name] = document.embedding

if self.search_type in [SearchType.keyword, SearchType.hybrid]:
vector[self.sparse_vector_name] = next(
iter(self.sparse_encoder.embed([document.content]))
).as_object() # type: ignore

sparse_embedding = self._get_sparse_vector(document.content)
if sparse_embedding is None:
raise ValueError("Sparse embedding could not be generated for document insert.")
if hasattr(sparse_embedding, "as_object"):
sparse_embedding = sparse_embedding.as_object()
vector[self.sparse_vector_name] = sparse_embedding # type: ignore
# Create payload with document properties
payload = {
"name": document.name,
Expand Down Expand Up @@ -449,13 +474,16 @@ async def process_document(document):
else:
# For other search types, use named vectors
vector = {}
if self.search_type in [SearchType.hybrid]:
vector[self.dense_vector_name] = document.embedding # Already embedded above

if self.search_type in [SearchType.keyword, SearchType.hybrid]:
vector[self.sparse_vector_name] = next(
iter(self.sparse_encoder.embed([document.content]))
).as_object() # type: ignore
sparse_embedding = self._get_sparse_vector(document.content)

if sparse_embedding is None:
raise ValueError("Sparse embedding could not be generated for async document insert.")

if hasattr(sparse_embedding, "as_object"):
sparse_embedding = sparse_embedding.as_object()

vector[self.sparse_vector_name] = sparse_embedding # type: ignore

if self.search_type in [SearchType.keyword, SearchType.hybrid]:
vector[self.sparse_vector_name] = next(iter(self.sparse_encoder.embed([document.content]))).as_object()
Expand Down Expand Up @@ -567,12 +595,16 @@ def _run_hybrid_search_sync(
formatted_filters: Optional[models.Filter],
) -> List[models.ScoredPoint]:
dense_embedding = self.embedder.get_embedding(query)
sparse_embedding = next(iter(self.sparse_encoder.embed([query]))).as_object()
sparse_embedding = self._get_sparse_vector(query)
if sparse_embedding is None:
raise ValueError("Sparse embedding could not be generated for hybrid search.")
if hasattr(sparse_embedding, "as_object"):
sparse_embedding = sparse_embedding.as_object()
call = self.client.query_points(
collection_name=self.collection,
prefetch=[
models.Prefetch(
query=models.SparseVector(**sparse_embedding), # type: ignore # type: ignore
query=models.SparseVector(**sparse_embedding), # type: ignore
limit=limit,
using=self.sparse_vector_name,
),
Expand Down Expand Up @@ -623,7 +655,12 @@ def _run_keyword_search_sync(
limit: int,
formatted_filters: Optional[models.Filter],
) -> List[models.ScoredPoint]:
sparse_embedding = next(iter(self.sparse_encoder.embed([query]))).as_object()
sparse_embedding = self._get_sparse_vector(query)
if sparse_embedding is None:
raise ValueError("Sparse embedding could not be generated for keyword search.")

if hasattr(sparse_embedding, "as_object"):
sparse_embedding = sparse_embedding.as_object()
call = self.client.query_points(
collection_name=self.collection,
query=models.SparseVector(**sparse_embedding), # type: ignore
Expand Down Expand Up @@ -672,7 +709,11 @@ async def _run_keyword_search_async(
limit: int,
formatted_filters: Optional[models.Filter],
) -> List[models.ScoredPoint]:
sparse_embedding = next(iter(self.sparse_encoder.embed([query]))).as_object()
sparse_embedding = self._get_sparse_vector(query)
if sparse_embedding is None:
raise ValueError("Sparse embedding could not be generated for keyword search.")
if hasattr(sparse_embedding, "as_object"):
sparse_embedding = sparse_embedding.as_object()
call = await self.async_client.query_points(
collection_name=self.collection,
query=models.SparseVector(**sparse_embedding), # type: ignore
Expand All @@ -691,7 +732,11 @@ async def _run_hybrid_search_async(
formatted_filters: Optional[models.Filter],
) -> List[models.ScoredPoint]:
dense_embedding = await self.embedder.async_get_embedding(query)
sparse_embedding = next(iter(self.sparse_encoder.embed([query]))).as_object()
sparse_embedding = self._get_sparse_vector(query)
if sparse_embedding is None:
raise ValueError("Sparse embedding could not be generated for hybrid search.")
if hasattr(sparse_embedding, "as_object"):
sparse_embedding = sparse_embedding.as_object()
call = await self.async_client.query_points(
collection_name=self.collection,
prefetch=[
Expand Down
Loading