Skip to content

Commit

Permalink
polished sample, added distance function direction helper
Browse files Browse the repository at this point in the history
  • Loading branch information
eavanvalkenburg committed Feb 20, 2025
1 parent d5b8389 commit d07c6bc
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 108 deletions.
51 changes: 34 additions & 17 deletions python/samples/concepts/memory/new_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import numpy as np

from samples.concepts.resources.utils import Colors
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import (
AzureTextEmbedding,
Expand Down Expand Up @@ -41,6 +42,7 @@
VectorTextSearchMixin,
vectorstoremodel,
)
from semantic_kernel.data.const import DISTANCE_FUNCTION_DIRECTION_HELPER


def get_data_model_array(index_kind: IndexKind, distance_function: DistanceFunction) -> type:
Expand Down Expand Up @@ -106,9 +108,10 @@ class DataModelList:


collection_name = "test"
distance_function = DistanceFunction.COSINE_SIMILARITY
# Depending on the vector database, the index kind and distance function may need to be adjusted,
# since not all combinations are supported by all databases.
DataModel = get_data_model_array(IndexKind.IVF_FLAT, DistanceFunction.COSINE_SIMILARITY)
DataModel = get_data_model_array(IndexKind.IVF_FLAT, distance_function)

# A list of VectorStoreRecordCollection that can be used.
# Available collections are:
Expand Down Expand Up @@ -177,12 +180,13 @@ class DataModelList:
def print_record(result: VectorSearchResult | None = None, record: DataModel | None = None):
    """Print a single record to stdout, colorized, optionally with its search score.

    Args:
        result: A search result. When truthy, its ``record`` attribute is the
            record that gets printed, and its ``score`` is printed as well if
            it is not None.
        record: A plain record to print. Overridden by ``result.record`` when
            ``result`` is truthy.
    """
    if result:
        record = result.record
    if record is None:
        # Called without a result and without a record: nothing to print.
        # Without this guard the attribute accesses below raise AttributeError.
        return
    print(Colors.CGREEN + f" Found id: {record.id}" + Colors.CEND)
    if result and result.score is not None:
        print(Colors.CGREEN + f" Score: {result.score}" + Colors.CEND)
    # The white color opened on the content line is closed by the CEND that
    # terminates the tag line, so both lines share one color span.
    print(Colors.CWHITE + f" Content: {record.content}")
    print(f" Tag: {record.tag}" + Colors.CEND)
    if record.vector is not None:
        print(Colors.CWHITE + f" Vector (first five): {record.vector[:5]}" + Colors.CEND)


async def main(collection: str, use_azure_openai: bool, embedding_model: str):
Expand All @@ -195,7 +199,7 @@ async def main(collection: str, use_azure_openai: bool, embedding_model: str):
embedder = OpenAITextEmbedding(service_id=service_id, ai_model_id=embedding_model)
kernel.add_service(embedder)
async with collections[collection]() as record_collection:
print(f"Creating {collection} collection!")
print(Colors.CGREY + f"Creating {collection} collection!" + Colors.CEND)
await record_collection.delete_collection()
await record_collection.create_collection_if_not_exists()

Expand All @@ -211,16 +215,22 @@ async def main(collection: str, use_azure_openai: bool, embedding_model: str):
title="Semantic Kernel Languages",
tag="general",
)
record3 = DataModel(
content="```python\nfrom semantic_kernel import Kernel\nkernel = Kernel()\n```",
id="d5c9913a-e015-4944-b960-5d4a84bca002",
title="Code sample",
tag="code",
)

print("Adding records!")
print(Colors.CBLUE + "Adding records!" + Colors.CEND)
records = await VectorStoreRecordUtils(kernel).add_vector_to_records(
[record1, record2], data_model_type=DataModel
[record1, record2, record3], data_model_type=DataModel
)

keys = await record_collection.upsert_batch(records)
print(f" Upserted {keys=}")
print("Getting records!")
results = await record_collection.get_batch([record1.id, record2.id])
print(Colors.CBLUE + "Getting records!" + Colors.CEND)
results = await record_collection.get_batch([record1.id, record2.id, record3.id])
if results:
[print_record(record=result) for result in results]
else:
Expand All @@ -230,9 +240,11 @@ async def main(collection: str, use_azure_openai: bool, embedding_model: str):
include_vectors=True,
filter=VectorSearchFilter.equal_to("tag", "general"),
)
print("-" * 30)
print(Colors.CBLUE + "Searching for 'python', with filter 'tag=general'" + Colors.CEND)
if isinstance(record_collection, VectorTextSearchMixin):
print("-" * 30)
print("Using text search")
print(Colors.CBLUE + "Using text search" + Colors.CEND)
try:
search_results = await record_collection.text_search("python", options)
if search_results.total_count == 0:
Expand All @@ -244,8 +256,9 @@ async def main(collection: str, use_azure_openai: bool, embedding_model: str):
if isinstance(record_collection, VectorizedSearchMixin):
print("-" * 30)
print(
"Using vectorized search, depending on the distance function, "
"the better score might be higher or lower."
Colors.CBLUE + f"Using vectorized search, for {distance_function.value}, "
f"the {'higher' if DISTANCE_FUNCTION_DIRECTION_HELPER[distance_function](1, 0) else 'lower'} the score the better" # noqa: E501
f"" + Colors.CEND
)
try:
search_results = await record_collection.vectorized_search(
Expand All @@ -260,7 +273,11 @@ async def main(collection: str, use_azure_openai: bool, embedding_model: str):
print("Vectorized search could not execute.")
if isinstance(record_collection, VectorizableTextSearchMixin):
print("-" * 30)
print("Using vectorizable text search")
# Announce the vectorizable text search branch (the call below is
# `vectorizable_text_search`, not `vectorized_search`), together with a hint on
# how to read the scores: the direction helper's operator applied to (1, 0) is
# True when a higher score is better for the configured distance function.
print(
    Colors.CBLUE + f"Using vectorizable text search, for {distance_function.value}, "
    f"the {'higher' if DISTANCE_FUNCTION_DIRECTION_HELPER[distance_function](1, 0) else 'lower'} the score the better"  # noqa: E501
    + Colors.CEND
)
try:
search_results = await record_collection.vectorizable_text_search("python", options)
if search_results.total_count == 0:
Expand All @@ -270,9 +287,9 @@ async def main(collection: str, use_azure_openai: bool, embedding_model: str):
except Exception:
print("Vectorizable text search could not execute.")
print("-" * 30)
print("Deleting collection!")
print(Colors.CBLUE + "Deleting collection!" + Colors.CEND)
await record_collection.delete_collection()
print("Done!")
print(Colors.CGREY + "Done!" + Colors.CEND)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,7 @@ async def create_collection(self, **kwargs) -> None:
for more information.
Other kwargs are passed to the create_collection method.
"""
collection = await self._get_database().create_collection(self.collection_name, **kwargs)
await collection.create_index({
field.name: "text"
for field in self.data_model_definition.fields.values()
if isinstance(field, VectorStoreRecordDataField) and (field.is_filterable or field.is_full_text_searchable)
})
await self._get_database().create_collection(self.collection_name, **kwargs)
await self._get_database().command(command=self._get_vector_index(**kwargs))

def _get_vector_index(self, **kwargs: Any) -> dict[str, Any]:
Expand Down Expand Up @@ -219,37 +214,6 @@ async def _inner_vectorized_search(
total_count=None, # no way to get a count before looping through the result cursor
)

async def _inner_text_search(
self,
options: VectorSearchOptions,
search_text: str,
**kwargs: Any,
) -> KernelSearchResults[VectorSearchResult[TModel]]:
search_query = {"$text": {"$search": search_text}}
if options.filter.filters:
search_query = {
"$and": [
search_query,
self._build_filter_dict(options.filter),
]
}
projection_query: dict[str, int | dict] = {
field: 1
for field in self.data_model_definition.get_field_names(
include_vector_fields=options.include_vectors,
include_key_field=False, # _id is always included
)
}
projection_query[MONGODB_SCORE_FIELD] = {"$meta": "textScore"}
return KernelSearchResults(
results=self._get_vector_search_results_from_cursor(
filter=search_query,
projection=projection_query,
options=options,
),
total_count=None, # no way to get a count before looping through the result cursor
)

async def _get_vector_search_results_from_cursor(
self,
filter: dict[str, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,4 @@
DistanceFunction.MANHATTAN: cityblock,
DistanceFunction.HAMMING: hamming,
DistanceFunction.DOT_PROD: dot,
"default": cosine,
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
from collections.abc import AsyncIterable, Callable, Mapping, Sequence
from typing import Any, ClassVar, TypeVar

from pydantic import Field

if sys.version_info >= (3, 12):
from typing import override # pragma: no cover
else:
from typing_extensions import override # pragma: no cover

from pydantic import Field

from semantic_kernel.connectors.memory.in_memory.const import DISTANCE_FUNCTION_MAP
from semantic_kernel.data.const import DistanceFunction
from semantic_kernel.data.const import DISTANCE_FUNCTION_DIRECTION_HELPER, DistanceFunction
from semantic_kernel.data.filter_clauses.any_tags_equal_to_filter_clause import AnyTagsEqualTo
from semantic_kernel.data.filter_clauses.equal_to_filter_clause import EqualTo
from semantic_kernel.data.filter_clauses.filter_clause_base import FilterClauseBase
Expand Down Expand Up @@ -149,7 +149,10 @@ async def _inner_search_vectorized(
raise ValueError("Vector field name must be provided in options for vector search.")
field = options.vector_field_name
assert isinstance(self.data_model_definition.fields.get(field), VectorStoreRecordVectorField) # nosec
distance_metric = self.data_model_definition.fields.get(field).distance_function or "default" # type: ignore
distance_metric = (
self.data_model_definition.fields.get(field).distance_function # type: ignore
or DistanceFunction.COSINE_DISTANCE
)
distance_func = DISTANCE_FUNCTION_MAP[distance_metric]

for key, record in self._get_filtered_records(options).items():
Expand All @@ -160,10 +163,13 @@ async def _inner_search_vectorized(
distance_func,
invert_score=distance_metric == DistanceFunction.COSINE_SIMILARITY,
)
if distance_metric in [DistanceFunction.COSINE_SIMILARITY, DistanceFunction.DOT_PROD]:
sorted_records = dict(sorted(return_records.items(), key=lambda item: item[1], reverse=True))
else:
sorted_records = dict(sorted(return_records.items(), key=lambda item: item[1]))
sorted_records = dict(
sorted(
return_records.items(),
key=lambda item: item[1],
reverse=DISTANCE_FUNCTION_DIRECTION_HELPER[distance_metric](1, 0),
)
)
if sorted_records:
return KernelSearchResults(
results=self._get_vector_search_results_from_results(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from importlib import metadata
from typing import Any, ClassVar, Generic, TypeVar

from semantic_kernel.data.vector_search.vector_text_search import VectorTextSearchMixin

if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
else:
Expand Down Expand Up @@ -59,7 +57,6 @@
class MongoDBAtlasCollection(
VectorSearchBase[str, TModel],
VectorizedSearchMixin[TModel],
VectorTextSearchMixin[TModel],
Generic[TModel],
):
"""MongoDB Atlas collection implementation."""
Expand Down Expand Up @@ -254,48 +251,7 @@ async def _inner_search(
) -> KernelSearchResults[VectorSearchResult[TModel]]:
if vector is not None:
return await self._inner_vectorized_search(options, vector, **kwargs)
if search_text is not None:
return await self._inner_text_search(options, search_text, **kwargs)
raise VectorStoreOperationException("Vector or text is required for search.")

async def _inner_text_search(
self,
options: VectorSearchOptions,
search_text: str,
**kwargs: Any,
) -> KernelSearchResults[VectorSearchResult[TModel]]:
collection = self._get_collection()
search_query: dict[str, Any] = {
"limit": options.top + options.skip,
"query": search_text,
"path": [
field.name
for field in self.data_model_definition.fields.values()
if isinstance(field, VectorStoreRecordDataField) and field.is_full_text_searchable
],
}
if options.filter.filters:
search_query["filter"] = self._build_filter_dict(options.filter)

projection_query: dict[str, int | dict] = {
field: 1
for field in self.data_model_definition.get_field_names(
include_vector_fields=options.include_vectors,
include_key_field=False, # _id is always included
)
}
projection_query[MONGODB_SCORE_FIELD] = {"$meta": "searchScore"}
try:
raw_results = await collection.aggregate([
{"$search": {"text": search_query}},
{"$project": projection_query},
])
except Exception as exc:
raise VectorSearchExecutionException("Failed to search the collection.") from exc
return KernelSearchResults(
results=self._get_vector_search_results_from_results(raw_results, options),
total_count=None, # no way to get a count before looping through the result cursor
)
raise VectorStoreOperationException("Vector is required for search.")

async def _inner_vectorized_search(
self,
Expand Down
13 changes: 13 additions & 0 deletions python/semantic_kernel/data/const.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.

import operator
from collections.abc import Callable
from enum import Enum
from typing import Final

Expand Down Expand Up @@ -91,3 +93,14 @@ class DistanceFunction(str, Enum):
EUCLIDEAN_SQUARED_DISTANCE = "euclidean_squared_distance"
MANHATTAN = "manhattan"
HAMMING = "hamming"


# Maps each distance function to a comparison operator that encodes which
# direction of the score is "better".
# Callers evaluate the operator as `op(1, 0)`:
#   - operator.gt(1, 0) is True  -> a higher score is better,
#   - operator.le(1, 0) is False -> a lower score is better.
# The boolean result can therefore be passed straight to `sorted(..., reverse=...)`.
DISTANCE_FUNCTION_DIRECTION_HELPER: Final[dict[DistanceFunction, Callable[[int | float, int | float], bool]]] = {
    # Higher score is better.
    DistanceFunction.COSINE_SIMILARITY: operator.gt,
    # Lower score is better.
    DistanceFunction.COSINE_DISTANCE: operator.le,
    # Higher score is better.
    DistanceFunction.DOT_PROD: operator.gt,
    # Lower score is better for the remaining entries.
    DistanceFunction.EUCLIDEAN_DISTANCE: operator.le,
    DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE: operator.le,
    DistanceFunction.MANHATTAN: operator.le,
    DistanceFunction.HAMMING: operator.le,
}

0 comments on commit d07c6bc

Please sign in to comment.