From 1ff6df4e98a2f2f986da20be7b04b2e49efc43b6 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Wed, 11 Jun 2025 09:59:45 -0400 Subject: [PATCH 1/7] Fix timeouts for async_req=True with grpc --- .github/workflows/testing-integration.yaml | 1 + pinecone/grpc/base.py | 5 +- pinecone/grpc/future.py | 17 +- pinecone/grpc/index_grpc.py | 23 ++- pinecone/grpc/pinecone.py | 1 + pinecone/grpc/utils.py | 28 +++- tests/integration/data/test_delete_future.py | 62 ------- tests/integration/data/test_fetch_future.py | 162 ------------------- tests/integration/data/test_upsert_future.py | 126 --------------- tests/integration/helpers/__init__.py | 2 + 10 files changed, 63 insertions(+), 364 deletions(-) delete mode 100644 tests/integration/data/test_delete_future.py delete mode 100644 tests/integration/data/test_fetch_future.py delete mode 100644 tests/integration/data/test_upsert_future.py diff --git a/.github/workflows/testing-integration.yaml b/.github/workflows/testing-integration.yaml index 189f5dd9..69d9e7b0 100644 --- a/.github/workflows/testing-integration.yaml +++ b/.github/workflows/testing-integration.yaml @@ -82,6 +82,7 @@ jobs: python_version: ${{ fromJson(inputs.python_versions_json) }} test_suite: - data + - data_grpc_futures steps: - uses: actions/checkout@v4 - name: Setup Poetry diff --git a/pinecone/grpc/base.py b/pinecone/grpc/base.py index 5c99305b..bc98544e 100644 --- a/pinecone/grpc/base.py +++ b/pinecone/grpc/base.py @@ -29,7 +29,10 @@ def __init__( _endpoint_override: Optional[str] = None, ): self.config = config - self.grpc_client_config = grpc_config or GRPCClientConfig() + # If grpc_config is passed, use it. Otherwise, build a new one with + # default values and passing in the ssl_verify value from the config. + self.grpc_client_config = grpc_config or GRPCClientConfig(secure=self.config.ssl_verify) + self.pool_threads = pool_threads self._endpoint_override = _endpoint_override diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index 14fe9cf0..a1ed9061 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -75,10 +75,19 @@ def _timeout(self, timeout: Optional[int] = None) -> int: return self._default_timeout def _wrap_rpc_exception(self, e): - if e._state and e._state.debug_error_string: - return PineconeException(e._state.debug_error_string) - else: - return PineconeException("Unknown GRPC error") + # The way the grpc package is using multiple inheritance makes + # it a little unclear whether it's safe to always assume that + # the e.code(), e.details(), and e.debug_error_string() methods + # exist. So, we try/catch to avoid errors. 
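+        # For illustration only (hypothetical values): a request that hits its
+        # deadline would be wrapped as something like
+        #   PineconeException("GRPC error: {'grpc_error_code': 4, 'grpc_message': 'Deadline Exceeded'}")
+        # since grpc.StatusCode.DEADLINE_EXCEEDED.value[0] is 4.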
+ try: + grpc_info = {"grpc_error_code": e.code().value[0], "grpc_message": e.details()} + + return PineconeException(f"GRPC error: {grpc_info}") + except Exception: + try: + return PineconeException(f"Unknown GRPC error: {e.debug_error_string()}") + except Exception: + return PineconeException(f"Unknown GRPC error: {e}") def __del__(self): self._grpc_future.cancel() diff --git a/pinecone/grpc/index_grpc.py b/pinecone/grpc/index_grpc.py index 6da16a07..dae3db37 100644 --- a/pinecone/grpc/index_grpc.py +++ b/pinecone/grpc/index_grpc.py @@ -12,6 +12,9 @@ parse_fetch_response, parse_query_response, parse_stats_response, + parse_upsert_response, + parse_update_response, + parse_delete_response, ) from .vector_factory_grpc import VectorFactoryGRPC from .sparse_values_factory import SparseValuesFactory @@ -145,7 +148,9 @@ def upsert( args_dict = self._parse_non_empty_args([("namespace", namespace)]) request = UpsertRequest(vectors=vectors, **args_dict, **kwargs) future = self.runner.run(self.stub.Upsert.future, request, timeout=timeout) - return PineconeGrpcFuture(future) + return PineconeGrpcFuture( + future, timeout=timeout, result_transformer=parse_upsert_response + ) if batch_size is None: return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs) @@ -297,7 +302,9 @@ def delete( request = DeleteRequest(**args_dict, **kwargs) if async_req: future = self.runner.run(self.stub.Delete.future, request, timeout=timeout) - return PineconeGrpcFuture(future) + return PineconeGrpcFuture( + future, timeout=timeout, result_transformer=parse_delete_response + ) else: return self.runner.run(self.stub.Delete, request, timeout=timeout) @@ -334,7 +341,9 @@ def fetch( if async_req: future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout) - return PineconeGrpcFuture(future, result_transformer=parse_fetch_response) + return PineconeGrpcFuture( + future, result_transformer=parse_fetch_response, timeout=timeout + ) else: response = self.runner.run(self.stub.Fetch, request, timeout=timeout) return parse_fetch_response(response) @@ -424,7 +433,9 @@ def query( if async_req: future = self.runner.run(self.stub.Query.future, request, timeout=timeout) - return PineconeGrpcFuture(future) + return PineconeGrpcFuture( + future, result_transformer=parse_query_response, timeout=timeout + ) else: response = self.runner.run(self.stub.Query, request, timeout=timeout) json_response = json_format.MessageToDict(response) @@ -535,7 +546,9 @@ def update( request = UpdateRequest(id=id, **args_dict) if async_req: future = self.runner.run(self.stub.Update.future, request, timeout=timeout) - return PineconeGrpcFuture(future) + return PineconeGrpcFuture( + future, timeout=timeout, result_transformer=parse_update_response + ) else: return self.runner.run(self.stub.Update, request, timeout=timeout) diff --git a/pinecone/grpc/pinecone.py b/pinecone/grpc/pinecone.py index 37202fb1..b03da15d 100644 --- a/pinecone/grpc/pinecone.py +++ b/pinecone/grpc/pinecone.py @@ -133,5 +133,6 @@ def Index(self, name: str = "", host: str = "", **kwargs): source_tag=self._config.source_tag, proxy_url=self._config.proxy_url, ssl_ca_certs=self._config.ssl_ca_certs, + ssl_verify=self._config.ssl_verify, ) return GRPCIndex(index_name=name, config=config, pool_threads=pt, **kwargs) diff --git a/pinecone/grpc/utils.py b/pinecone/grpc/utils.py index c2869e73..a42f42cc 100644 --- a/pinecone/grpc/utils.py +++ b/pinecone/grpc/utils.py @@ -11,6 +11,7 @@ SparseValues, QueryResponse, IndexDescription as DescribeIndexStatsResponse, + 
UpsertResponse, NamespaceSummary, ) from pinecone.db_data.dataclasses import FetchResponse @@ -63,9 +64,28 @@ def parse_usage(usage: dict): return Usage(read_units=int(usage.get("readUnits", 0))) -def parse_query_response(response: dict, _check_type: bool = False): +def parse_upsert_response(response: dict | Message, _check_type: bool = False): + json_response = json_format.MessageToDict(response) + upserted_count = json_response.get("upsertedCount", 0) + return UpsertResponse(upserted_count=int(upserted_count)) + + +def parse_update_response(response: dict | Message, _check_type: bool = False): + return {} + + +def parse_delete_response(response: dict | Message, _check_type: bool = False): + return {} + + +def parse_query_response(response: dict | Message, _check_type: bool = False): + if isinstance(response, Message): + json_response = json_format.MessageToDict(response) + else: + json_response = response + matches = [] - for item in response.get("matches", []): + for item in json_response.get("matches", []): sc = ScoredVector( id=item["id"], score=item.get("score", 0.0), @@ -80,11 +100,11 @@ def parse_query_response(response: dict, _check_type: bool = False): # creating empty `Usage` objects and then passing them into QueryResponse # when they are not actually present in the response from the server. args = { - "namespace": response.get("namespace", ""), + "namespace": json_response.get("namespace", ""), "matches": matches, "_check_type": _check_type, } - usage = response.get("usage") + usage = json_response.get("usage") if usage: args["usage"] = parse_usage(usage) return QueryResponse(**args) diff --git a/tests/integration/data/test_delete_future.py b/tests/integration/data/test_delete_future.py deleted file mode 100644 index 0680737a..00000000 --- a/tests/integration/data/test_delete_future.py +++ /dev/null @@ -1,62 +0,0 @@ -import os -import pytest -from pinecone import Vector -from ..helpers import poll_stats_for_namespace, random_string -import logging - -logger = logging.getLogger(__name__) - -if os.environ.get("USE_GRPC") == "true": - from pinecone.grpc import GRPCDeleteResponse - - -def seed_vectors(idx, namespace): - logger.info("Seeding vectors with ids [id1, id2, id3] to namespace '%s'", namespace) - idx.upsert( - vectors=[ - Vector(id="id1", values=[0.1, 0.2]), - Vector(id="id2", values=[0.1, 0.2]), - Vector(id="id3", values=[0.1, 0.2]), - ], - namespace=namespace, - ) - poll_stats_for_namespace(idx, namespace, 3) - - -class TestDeleteFuture: - @pytest.mark.skipif( - os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" - ) - def test_delete_future(self, idx): - namespace = random_string(10) - - seed_vectors(idx, namespace) - - delete_one = idx.delete(ids=["id1"], namespace=namespace, async_req=True) - delete_two = idx.delete(ids=["id2"], namespace=namespace, async_req=True) - - from concurrent.futures import as_completed - - for future in as_completed([delete_one, delete_two], timeout=10): - resp = future.result() - assert isinstance(resp, GRPCDeleteResponse) - - @pytest.mark.skipif( - os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" - ) - def test_delete_future_by_namespace(self, idx): - namespace = random_string(10) - - ns1 = f"{namespace}-1" - ns2 = f"{namespace}-2" - - seed_vectors(idx, ns1) - seed_vectors(idx, ns2) - - delete_ns1 = idx.delete(namespace=ns1, delete_all=True, async_req=True) - delete_ns2 = idx.delete(namespace=ns2, delete_all=True, async_req=True) - from concurrent.futures 
import as_completed - - for future in as_completed([delete_ns1, delete_ns2], timeout=10): - resp = future.result() - assert isinstance(resp, GRPCDeleteResponse) diff --git a/tests/integration/data/test_fetch_future.py b/tests/integration/data/test_fetch_future.py deleted file mode 100644 index e100f11a..00000000 --- a/tests/integration/data/test_fetch_future.py +++ /dev/null @@ -1,162 +0,0 @@ -import os -import pytest -from ..helpers import poll_fetch_for_ids_in_namespace, embedding_values, random_string -from pinecone import Vector -import logging - -logger = logging.getLogger(__name__) - -if os.environ.get("USE_GRPC") == "true": - from pinecone.grpc import PineconeGrpcFuture - - -@pytest.fixture(scope="session") -def fetch_namespace_future(): - return random_string(10) - - -def seed(idx, namespace): - # Upsert without metadata - logger.info("Seeding vectors without metadata to namespace '%s'", namespace) - idx.upsert( - vectors=[ - ("1", embedding_values(2)), - ("2", embedding_values(2)), - ("3", embedding_values(2)), - ], - namespace=namespace, - ) - - # Upsert with metadata - logger.info("Seeding vectors with metadata to namespace '%s'", namespace) - idx.upsert( - vectors=[ - Vector( - id="4", values=embedding_values(2), metadata={"genre": "action", "runtime": 120} - ), - Vector(id="5", values=embedding_values(2), metadata={"genre": "comedy", "runtime": 90}), - Vector( - id="6", values=embedding_values(2), metadata={"genre": "romance", "runtime": 240} - ), - ], - namespace=namespace, - ) - - # Upsert with dict - idx.upsert( - vectors=[ - {"id": "7", "values": embedding_values(2)}, - {"id": "8", "values": embedding_values(2)}, - {"id": "9", "values": embedding_values(2)}, - ], - namespace=namespace, - ) - - poll_fetch_for_ids_in_namespace( - idx, ids=["1", "2", "3", "4", "5", "6", "7", "8", "9"], namespace=namespace - ) - - -@pytest.mark.usefixtures("fetch_namespace_future") -@pytest.fixture(scope="class") -def seed_for_fetch(idx, fetch_namespace_future): - seed(idx, fetch_namespace_future) - seed(idx, "") - yield - - -@pytest.mark.usefixtures("seed_for_fetch") -@pytest.mark.skipif( - os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" -) -class TestFetchFuture: - def setup_method(self): - self.expected_dimension = 2 - - def test_fetch_multiple_by_id(self, idx, fetch_namespace_future): - target_namespace = fetch_namespace_future - - results = idx.fetch(ids=["1", "2", "4"], namespace=target_namespace, async_req=True) - assert isinstance(results, PineconeGrpcFuture) - - from concurrent.futures import wait, FIRST_COMPLETED - - done, _ = wait([results], return_when=FIRST_COMPLETED) - - results = done.pop().result() - assert results.usage is not None - assert results.usage["read_units"] is not None - assert results.usage["read_units"] > 0 - - assert results.namespace == target_namespace - assert len(results.vectors) == 3 - assert results.vectors["1"].id == "1" - assert results.vectors["2"].id == "2" - # Metadata included, if set - assert results.vectors["1"].metadata is None - assert results.vectors["2"].metadata is None - assert results.vectors["4"].metadata is not None - assert results.vectors["4"].metadata["genre"] == "action" - assert results.vectors["4"].metadata["runtime"] == 120 - # Values included - assert results.vectors["1"].values is not None - assert len(results.vectors["1"].values) == self.expected_dimension - - def test_fetch_single_by_id(self, idx, fetch_namespace_future): - target_namespace = fetch_namespace_future - - future = 
idx.fetch(ids=["1"], namespace=target_namespace, async_req=True) - - from concurrent.futures import wait, FIRST_COMPLETED - - done, _ = wait([future], return_when=FIRST_COMPLETED) - results = done.pop().result() - - assert results.namespace == target_namespace - assert len(results.vectors) == 1 - assert results.vectors["1"].id == "1" - assert results.vectors["1"].metadata is None - assert results.vectors["1"].values is not None - assert len(results.vectors["1"].values) == self.expected_dimension - - def test_fetch_nonexistent_id(self, idx, fetch_namespace_future): - target_namespace = fetch_namespace_future - - # Fetch id that is missing - future = idx.fetch(ids=["100"], namespace=target_namespace, async_req=True) - - from concurrent.futures import wait, FIRST_COMPLETED - - done, _ = wait([future], return_when=FIRST_COMPLETED) - results = done.pop().result() - - assert results.namespace == target_namespace - assert len(results.vectors) == 0 - - def test_fetch_nonexistent_namespace(self, idx): - target_namespace = "nonexistent-namespace" - - # Fetch from namespace with no vectors - future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True) - - from concurrent.futures import wait, FIRST_COMPLETED - - done, _ = wait([future], return_when=FIRST_COMPLETED) - results = done.pop().result() - - assert results.namespace == target_namespace - assert len(results.vectors) == 0 - - def test_fetch_unspecified_namespace(self, idx): - # Fetch without specifying namespace gives default namespace results - future = idx.fetch(ids=["1", "4"], async_req=True) - - from concurrent.futures import wait, FIRST_COMPLETED - - done, _ = wait([future], return_when=FIRST_COMPLETED) - results = done.pop().result() - - assert results.namespace == "" - assert results.vectors["1"].id == "1" - assert results.vectors["1"].values is not None - assert results.vectors["4"].metadata is not None diff --git a/tests/integration/data/test_upsert_future.py b/tests/integration/data/test_upsert_future.py deleted file mode 100644 index 7ca4b27e..00000000 --- a/tests/integration/data/test_upsert_future.py +++ /dev/null @@ -1,126 +0,0 @@ -import pytest -import os -from pinecone import Vector, PineconeException -from ..helpers import poll_stats_for_namespace, embedding_values, random_string - - -@pytest.fixture(scope="class") -def namespace_query_async(): - return random_string(10) - - -@pytest.mark.usefixtures("namespace_query_async") -class TestUpsertWithAsyncReq: - @pytest.mark.skipif( - os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" - ) - def test_upsert_to_namespace(self, idx, namespace_query_async): - target_namespace = namespace_query_async - - # Upsert with tuples - upsert1 = idx.upsert( - vectors=[ - ("1", embedding_values()), - ("2", embedding_values()), - ("3", embedding_values()), - ], - namespace=target_namespace, - async_req=True, - ) - - # Upsert with objects - upsert2 = idx.upsert( - vectors=[ - Vector(id="4", values=embedding_values()), - Vector(id="5", values=embedding_values()), - Vector(id="6", values=embedding_values()), - ], - namespace=target_namespace, - async_req=True, - ) - - # Upsert with dict - upsert3 = idx.upsert( - vectors=[ - {"id": "7", "values": embedding_values()}, - {"id": "8", "values": embedding_values()}, - {"id": "9", "values": embedding_values()}, - ], - namespace=target_namespace, - async_req=True, - ) - - poll_stats_for_namespace(idx, target_namespace, 9) - - # Check the vector count reflects some data has been upserted - stats = 
idx.describe_index_stats() - assert stats.total_vector_count >= 9 - assert stats.namespaces[target_namespace].vector_count == 9 - - # Use returned futures - from concurrent.futures import as_completed - - total_upserted = 0 - for future in as_completed([upsert1, upsert2, upsert3], timeout=10): - total_upserted += future.result().upserted_count - - assert total_upserted == 9 - - @pytest.mark.skipif( - os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" - ) - def test_upsert_to_namespace_when_failed_req(self, idx, namespace_query_async): - target_namespace = namespace_query_async - - # Upsert with tuples - upsert1 = idx.upsert( - vectors=[ - ("1", embedding_values()), - ("2", embedding_values()), - ("3", embedding_values()), - ], - namespace=target_namespace, - async_req=True, - ) - - # Upsert with objects - wrong_dimension = 10 - upsert2 = idx.upsert( - vectors=[ - Vector(id="4", values=embedding_values(wrong_dimension)), - Vector(id="5", values=embedding_values(wrong_dimension)), - Vector(id="6", values=embedding_values(wrong_dimension)), - ], - namespace=target_namespace, - async_req=True, - ) - - # Upsert with dict - upsert3 = idx.upsert( - vectors=[ - {"id": "7", "values": embedding_values()}, - {"id": "8", "values": embedding_values()}, - {"id": "9", "values": embedding_values()}, - ], - namespace=target_namespace, - async_req=True, - ) - - from concurrent.futures import wait, ALL_COMPLETED - - done, not_done = wait([upsert1, upsert2, upsert3], timeout=10, return_when=ALL_COMPLETED) - - assert len(done) == 3 - assert len(not_done) == 0 - - total_upserted = 0 - for future in done: - if future.exception(): - assert future is upsert2 - assert isinstance(future.exception(), PineconeException) - assert "Vector dimension 10 does not match the dimension of the index 2" in str( - future.exception() - ) - else: - total_upserted += future.result().upserted_count - assert total_upserted == 6 diff --git a/tests/integration/helpers/__init__.py b/tests/integration/helpers/__init__.py index 35b85873..cca1451d 100644 --- a/tests/integration/helpers/__init__.py +++ b/tests/integration/helpers/__init__.py @@ -13,6 +13,7 @@ delete_indexes_from_run, default_create_index_params, ) +from .names import generate_name __all__ = [ "fake_api_key", @@ -28,4 +29,5 @@ "delete_backups_from_run", "delete_indexes_from_run", "default_create_index_params", + "generate_name", ] From ce07dce4a922c0397a91f226656148c2faecdcac Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Wed, 11 Jun 2025 11:02:50 -0400 Subject: [PATCH 2/7] Add tests --- pinecone/grpc/base.py | 7 +- .../integration/data_grpc_futures/__init__.py | 0 .../integration/data_grpc_futures/conftest.py | 109 +++++ .../data_grpc_futures/stub_backend.py | 95 +++++ .../data_grpc_futures/test_delete_future.py | 66 +++ .../data_grpc_futures/test_fetch_future.py | 156 +++++++ .../data_grpc_futures/test_query_future.py | 350 ++++++++++++++++ .../data_grpc_futures/test_timeouts.py | 391 ++++++++++++++++++ .../data_grpc_futures/test_update_future.py | 0 .../data_grpc_futures/test_upsert_future.py | 119 ++++++ tests/integration/helpers/names.py | 14 + 11 files changed, 1306 insertions(+), 1 deletion(-) create mode 100644 tests/integration/data_grpc_futures/__init__.py create mode 100644 tests/integration/data_grpc_futures/conftest.py create mode 100644 tests/integration/data_grpc_futures/stub_backend.py create mode 100644 tests/integration/data_grpc_futures/test_delete_future.py create mode 100644 
tests/integration/data_grpc_futures/test_fetch_future.py create mode 100644 tests/integration/data_grpc_futures/test_query_future.py create mode 100644 tests/integration/data_grpc_futures/test_timeouts.py create mode 100644 tests/integration/data_grpc_futures/test_update_future.py create mode 100644 tests/integration/data_grpc_futures/test_upsert_future.py create mode 100644 tests/integration/helpers/names.py diff --git a/pinecone/grpc/base.py b/pinecone/grpc/base.py index bc98544e..8582e8fe 100644 --- a/pinecone/grpc/base.py +++ b/pinecone/grpc/base.py @@ -31,7 +31,12 @@ def __init__( self.config = config # If grpc_config is passed, use it. Otherwise, build a new one with # default values and passing in the ssl_verify value from the config. - self.grpc_client_config = grpc_config or GRPCClientConfig(secure=self.config.ssl_verify) + if self.config.ssl_verify is None: + default_grpc_config = GRPCClientConfig() + else: + default_grpc_config = GRPCClientConfig(secure=self.config.ssl_verify) + + self.grpc_client_config = grpc_config or default_grpc_config self.pool_threads = pool_threads diff --git a/tests/integration/data_grpc_futures/__init__.py b/tests/integration/data_grpc_futures/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/data_grpc_futures/conftest.py b/tests/integration/data_grpc_futures/conftest.py new file mode 100644 index 00000000..13d2cc2f --- /dev/null +++ b/tests/integration/data_grpc_futures/conftest.py @@ -0,0 +1,109 @@ +import pytest +import json +import uuid +from ..helpers import get_environment_var, index_tags as index_tags_helper, generate_name +import logging +from pinecone import EmbedModel, CloudProvider, AwsRegion, IndexEmbed +from pinecone.grpc import PineconeGRPC + +logger = logging.getLogger(__name__) + +RUN_ID = str(uuid.uuid4()) + +created_indexes = [] + + +@pytest.fixture(scope="session") +def index_tags(request): + return index_tags_helper(request, RUN_ID) + + +@pytest.fixture(scope="session") +def pc(): + return PineconeGRPC() + + +@pytest.fixture(scope="session") +def spec(): + spec_json = get_environment_var( + "SPEC", '{"serverless": {"cloud": "aws", "region": "us-east-1" }}' + ) + return json.loads(spec_json) + + +@pytest.fixture(scope="session") +def model_idx(pc, index_tags, request): + model_index_name = generate_name(request.node.name, "embed") + if not pc.has_index(name=model_index_name): + logger.info(f"Creating index {model_index_name}") + pc.create_index_for_model( + name=model_index_name, + cloud=CloudProvider.AWS, + region=AwsRegion.US_WEST_2, + embed=IndexEmbed( + model=EmbedModel.Multilingual_E5_Large, + field_map={"text": "my_text_field"}, + metric="cosine", + ), + tags=index_tags, + ) + created_indexes.append(model_index_name) + else: + logger.info(f"Index {model_index_name} already exists") + + description = pc.describe_index(name=model_index_name) + return pc.Index(host=description.host) + + +def create_index(pc, create_args): + if not pc.has_index(name=create_args["name"]): + logger.info(f"Creating index {create_args['name']}") + pc.create_index(**create_args) + else: + logger.info(f"Index {create_args['name']} already exists") + + host = pc.describe_index(name=create_args["name"]).host + + return host + + +@pytest.fixture(scope="session") +def idx(pc, spec, index_tags, request): + index_name = generate_name(request.node.name, "dense") + logger.info(f"Request: {request.node}") + create_args = { + "name": index_name, + "dimension": 2, + "metric": "cosine", + "spec": spec, + "tags": index_tags, + } 
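+    # Note: this dimension is assumed elsewhere in the suite. The seed helpers
+    # generate vectors with embedding_values(2), and the failed-upsert test
+    # asserts a mismatch error against "the dimension of the index 2".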
+ host = create_index(pc, create_args) + logger.info(f"Using index {index_name} with host {host} as idx") + created_indexes.append(index_name) + return pc.Index(host=host) + + +@pytest.fixture(scope="session") +def sparse_idx(pc, spec, index_tags, request): + index_name = generate_name(request.node.name, "sparse") + create_args = { + "name": index_name, + "metric": "dotproduct", + "spec": spec, + "vector_type": "sparse", + "tags": index_tags, + } + host = create_index(pc, create_args) + created_indexes.append(index_name) + return pc.Index(host=host) + + +def pytest_sessionfinish(session, exitstatus): + for index in created_indexes: + try: + logger.info(f"Deleting index {index}") + pc = PineconeGRPC() + pc.delete_index(name=index, timeout=-1) + except Exception as e: + logger.error(f"Error deleting index {index}: {e}") diff --git a/tests/integration/data_grpc_futures/stub_backend.py b/tests/integration/data_grpc_futures/stub_backend.py new file mode 100644 index 00000000..d8983db9 --- /dev/null +++ b/tests/integration/data_grpc_futures/stub_backend.py @@ -0,0 +1,95 @@ +import time +import grpc +import logging +from concurrent import futures +import pinecone.core.grpc.protos.db_data_2025_01_pb2 as pb2 +import pinecone.core.grpc.protos.db_data_2025_01_pb2_grpc as pb2_grpc + +logger = logging.getLogger(__name__) + + +class TestVectorService(pb2_grpc.VectorServiceServicer): + def __init__(self, sleep_seconds=5): + self.sleep_seconds = sleep_seconds + + def Upsert(self, request, context): + # Simulate a delay that will cause a timeout + logger.info("Received an upsert request from test client") + logger.info(f"Request: {request}") + logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call") + time.sleep(self.sleep_seconds) + logger.info(f"Done sleeping for {self.sleep_seconds} seconds") + logger.info("Returning an upsert response from test server") + return pb2.UpsertResponse(upserted_count=1) + + def Query(self, request, context): + # Simulate a delay that will cause a timeout + logger.info("Received a query request from test client") + logger.info(f"Request: {request}") + + logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call") + time.sleep(self.sleep_seconds) + logger.info(f"Done sleeping for {self.sleep_seconds} seconds") + logger.info("Returning a query response from test server") + return pb2.QueryResponse( + results=[], + matches=[pb2.ScoredVector(id="1", score=1.0, values=[1.0, 2.0, 3.0])], + namespace="testnamespace", + usage=pb2.Usage(read_units=1), + ) + + def Update(self, request, context): + # Simulate a delay that will cause a timeout + logger.info("Received an update request from test client") + logger.info(f"Request: {request}") + logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call") + time.sleep(self.sleep_seconds) + logger.info(f"Done sleeping for {self.sleep_seconds} seconds") + logger.info("Returning an update response from test server") + return pb2.UpdateResponse() + + def Delete(self, request, context): + # Simulate a delay that will cause a timeout + logger.info("Received a delete request from test client") + logger.info(f"Request: {request}") + logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call") + time.sleep(self.sleep_seconds) + logger.info(f"Done sleeping for {self.sleep_seconds} seconds") + logger.info("Returning a delete response from test server") + return pb2.DeleteResponse() + + def Fetch(self, request, context): + 
logger.info("Received a fetch request from test client") + logger.info(f"Request: {request}") + logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call") + time.sleep(self.sleep_seconds) + logger.info(f"Done sleeping for {self.sleep_seconds} seconds") + logger.info("Returning a fetch response from test server") + return pb2.FetchResponse( + vectors={ + "1": pb2.Vector(id="1", values=[1.0, 2.0, 3.0]), + "2": pb2.Vector(id="2", values=[4.0, 5.0, 6.0]), + "3": pb2.Vector(id="3", values=[7.0, 8.0, 9.0]), + }, + namespace="testnamespace", + usage=pb2.Usage(read_units=1), + ) + + +def create_sleepy_test_server(port=50051, sleep_seconds=5): + """Creates and returns a configured gRPC server for testing. + + Args: + port (int): The port number to run the server on + sleep_seconds (int): The extra latency in seconds for simulated operations + + Returns: + grpc.Server: A configured and started gRPC server instance + """ + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + pb2_grpc.add_VectorServiceServicer_to_server( + TestVectorService(sleep_seconds=sleep_seconds), server + ) + server.add_insecure_port(f"[::]:{port}") + server.start() + return server diff --git a/tests/integration/data_grpc_futures/test_delete_future.py b/tests/integration/data_grpc_futures/test_delete_future.py new file mode 100644 index 00000000..2a0eb29d --- /dev/null +++ b/tests/integration/data_grpc_futures/test_delete_future.py @@ -0,0 +1,66 @@ +from pinecone import Vector +from ..helpers import poll_stats_for_namespace, random_string +import logging +import time + +logger = logging.getLogger(__name__) + + +def seed_vectors(idx, namespace): + logger.info("Seeding vectors with ids [id1, id2, id3] to namespace '%s'", namespace) + idx.upsert( + vectors=[ + Vector(id="id1", values=[0.1, 0.2]), + Vector(id="id2", values=[0.1, 0.2]), + Vector(id="id3", values=[0.1, 0.2]), + ], + namespace=namespace, + ) + poll_stats_for_namespace(idx, namespace, 3) + + +class TestDeleteFuture: + def test_delete_future(self, idx): + namespace = random_string(10) + + seed_vectors(idx, namespace) + + delete_one = idx.delete(ids=["id1"], namespace=namespace, async_req=True) + delete_two = idx.delete(ids=["id2"], namespace=namespace, async_req=True) + + from concurrent.futures import as_completed + + for future in as_completed([delete_one, delete_two], timeout=10): + resp = future.result() + assert resp == {} + + time.sleep(10) + + # Verify that the vectors are deleted + from concurrent.futures import wait, ALL_COMPLETED + + fetch_results = idx.fetch(ids=["id1", "id2"], namespace=namespace, async_req=True) + done, not_done = wait([fetch_results], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + results = fetch_results.result() + assert len(results.vectors) == 0 + + def test_delete_future_by_namespace(self, idx): + namespace = random_string(10) + + ns1 = f"{namespace}-1" + ns2 = f"{namespace}-2" + + seed_vectors(idx, ns1) + seed_vectors(idx, ns2) + + delete_ns1 = idx.delete(namespace=ns1, delete_all=True, async_req=True) + delete_ns2 = idx.delete(namespace=ns2, delete_all=True, async_req=True) + + from concurrent.futures import as_completed + + for future in as_completed([delete_ns1, delete_ns2], timeout=10): + resp = future.result() + assert resp == {} diff --git a/tests/integration/data_grpc_futures/test_fetch_future.py b/tests/integration/data_grpc_futures/test_fetch_future.py new file mode 100644 index 00000000..a503b64a --- /dev/null +++ 
b/tests/integration/data_grpc_futures/test_fetch_future.py @@ -0,0 +1,156 @@ +import pytest +from ..helpers import poll_fetch_for_ids_in_namespace, embedding_values, generate_name +from pinecone import Vector +import logging +from pinecone.grpc import PineconeGrpcFuture + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="session") +def fetch_namespace_future(): + return generate_name("TestFetchFuture", "fetch-namespace") + + +def seed(idx, namespace): + # Upsert without metadata + logger.info("Seeding vectors without metadata to namespace '%s'", namespace) + idx.upsert( + vectors=[ + ("1", embedding_values(2)), + ("2", embedding_values(2)), + ("3", embedding_values(2)), + ], + namespace=namespace, + ) + + # Upsert with metadata + logger.info("Seeding vectors with metadata to namespace '%s'", namespace) + idx.upsert( + vectors=[ + Vector( + id="4", values=embedding_values(2), metadata={"genre": "action", "runtime": 120} + ), + Vector(id="5", values=embedding_values(2), metadata={"genre": "comedy", "runtime": 90}), + Vector( + id="6", values=embedding_values(2), metadata={"genre": "romance", "runtime": 240} + ), + ], + namespace=namespace, + ) + + # Upsert with dict + idx.upsert( + vectors=[ + {"id": "7", "values": embedding_values(2)}, + {"id": "8", "values": embedding_values(2)}, + {"id": "9", "values": embedding_values(2)}, + ], + namespace=namespace, + ) + + poll_fetch_for_ids_in_namespace( + idx, ids=["1", "2", "3", "4", "5", "6", "7", "8", "9"], namespace=namespace + ) + + +@pytest.mark.usefixtures("fetch_namespace_future") +@pytest.fixture(scope="class") +def seed_for_fetch(idx, fetch_namespace_future): + seed(idx, fetch_namespace_future) + seed(idx, "") + yield + + +@pytest.mark.usefixtures("seed_for_fetch") +class TestFetchFuture: + def setup_method(self): + self.expected_dimension = 2 + + def test_fetch_multiple_by_id(self, idx, fetch_namespace_future): + target_namespace = fetch_namespace_future + + results = idx.fetch(ids=["1", "2", "4"], namespace=target_namespace, async_req=True) + assert isinstance(results, PineconeGrpcFuture) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([results], return_when=FIRST_COMPLETED) + + results = done.pop().result() + assert results.usage is not None + assert results.usage["read_units"] is not None + assert results.usage["read_units"] > 0 + + assert results.namespace == target_namespace + assert len(results.vectors) == 3 + assert results.vectors["1"].id == "1" + assert results.vectors["2"].id == "2" + # Metadata included, if set + assert results.vectors["1"].metadata is None + assert results.vectors["2"].metadata is None + assert results.vectors["4"].metadata is not None + assert results.vectors["4"].metadata["genre"] == "action" + assert results.vectors["4"].metadata["runtime"] == 120 + # Values included + assert results.vectors["1"].values is not None + assert len(results.vectors["1"].values) == self.expected_dimension + + def test_fetch_single_by_id(self, idx, fetch_namespace_future): + target_namespace = fetch_namespace_future + + future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == target_namespace + assert len(results.vectors) == 1 + assert results.vectors["1"].id == "1" + assert results.vectors["1"].metadata is None + assert results.vectors["1"].values is not None + assert 
len(results.vectors["1"].values) == self.expected_dimension + + def test_fetch_nonexistent_id(self, idx, fetch_namespace_future): + target_namespace = fetch_namespace_future + + # Fetch id that is missing + future = idx.fetch(ids=["100"], namespace=target_namespace, async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == target_namespace + assert len(results.vectors) == 0 + + def test_fetch_nonexistent_namespace(self, idx): + target_namespace = "nonexistent-namespace" + + # Fetch from namespace with no vectors + future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == target_namespace + assert len(results.vectors) == 0 + + def test_fetch_unspecified_namespace(self, idx): + # Fetch without specifying namespace gives default namespace results + future = idx.fetch(ids=["1", "4"], async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == "" + assert results.vectors["1"].id == "1" + assert results.vectors["1"].values is not None + assert results.vectors["4"].metadata is not None diff --git a/tests/integration/data_grpc_futures/test_query_future.py b/tests/integration/data_grpc_futures/test_query_future.py new file mode 100644 index 00000000..15a37356 --- /dev/null +++ b/tests/integration/data_grpc_futures/test_query_future.py @@ -0,0 +1,350 @@ +import pytest +from pinecone import QueryResponse, Vector +from ..helpers import embedding_values, poll_fetch_for_ids_in_namespace, generate_name +import logging +import time +from pinecone.grpc import GRPCIndex +from concurrent.futures import wait, ALL_COMPLETED + + +logger = logging.getLogger(__name__) + + +def find_by_id(matches, id): + with_id = [match for match in matches if match.id == id] + return with_id[0] if len(with_id) > 0 else None + + +@pytest.fixture(scope="session") +def query_namespace(): + return generate_name("query_namespace", "test") + + +def seed(idx, namespace): + # Upsert without metadata + logger.info(f"Seeding vectors without metadata into namespace '{namespace}'") + upsert1 = idx.upsert( + vectors=[ + ("1", embedding_values(2)), + ("2", embedding_values(2)), + ("3", embedding_values(2)), + ], + namespace=namespace, + async_req=True, + ) + + # Upsert with metadata + logger.info(f"Seeding vectors with metadata into namespace '{namespace}'") + upsert2 = idx.upsert( + vectors=[ + Vector( + id="4", values=embedding_values(2), metadata={"genre": "action", "runtime": 120} + ), + Vector(id="5", values=embedding_values(2), metadata={"genre": "comedy", "runtime": 90}), + Vector( + id="6", values=embedding_values(2), metadata={"genre": "romance", "runtime": 240} + ), + ], + namespace=namespace, + async_req=True, + ) + + # Upsert with dict + upsert3 = idx.upsert( + vectors=[ + {"id": "7", "values": embedding_values(2)}, + {"id": "8", "values": embedding_values(2)}, + {"id": "9", "values": embedding_values(2)}, + ], + namespace=namespace, + async_req=True, + ) + + wait([upsert1, upsert2, upsert3], timeout=10, return_when=ALL_COMPLETED) + + poll_fetch_for_ids_in_namespace( + idx, ids=["1", "2", "3", "4", "5", "6", "7", "8", "9"], namespace=namespace + ) + + 
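+# Seed data is upserted once per test class; the extra sleep below gives the
+# freshly upserted records time to become queryable before the tests run.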
+@pytest.fixture(scope="class") +def seed_for_query(idx, query_namespace): + seed(idx, query_namespace) + seed(idx, "") + time.sleep(30) + yield + + +@pytest.mark.usefixtures("seed_for_query") +@pytest.mark.parametrize("use_nondefault_namespace", [True, False]) +class TestQueryAsync: + def setup_method(self): + self.expected_dimension = 2 + + def test_query_by_id( + self, idx: GRPCIndex, query_namespace: str, use_nondefault_namespace: bool + ): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_future = idx.query(id="1", namespace=target_namespace, top_k=10, async_req=True) + + done, not_done = wait([query_future], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + query_result = query_future.result() + + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + assert query_result.usage is not None + assert query_result.usage["read_units"] is not None + assert query_result.usage["read_units"] > 0 + + # By default, does not include values or metadata + record_with_metadata = find_by_id(query_result.matches, "4") + assert record_with_metadata.metadata is None + assert record_with_metadata.values == [] + + def test_query_by_vector(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), namespace=target_namespace, top_k=10, async_req=True + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + def test_query_by_vector_include_values(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), + namespace=target_namespace, + include_values=True, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) > 0 + assert query_result.matches[0].values is not None + assert len(query_result.matches[0].values) == self.expected_dimension + + def test_query_by_vector_include_metadata(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), + namespace=target_namespace, + include_metadata=True, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 3 + assert find_by_id(query_result.matches, "4").metadata["genre"] == "action" + + def test_query_by_vector_include_values_and_metadata( + self, idx, query_namespace, use_nondefault_namespace + ): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), + namespace=target_namespace, + include_values=True, + include_metadata=True, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + 
assert len(matches_with_metadata) == 3 + assert find_by_id(query_result.matches, "4").metadata["genre"] == "action" + assert len(query_result.matches[0].values) == self.expected_dimension + + +class TestQueryEdgeCasesAsync: + def test_query_in_empty_namespace(self, idx): + query_result = idx.query(id="1", namespace="empty", top_k=10, async_req=True).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == "empty" + assert len(query_result.matches) == 0 + + +@pytest.mark.usefixtures("seed_for_query") +@pytest.mark.parametrize("use_nondefault_namespace", [True, False]) +class TestQueryWithFilterAsync: + def test_query_by_id_with_filter(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + id="1", namespace=target_namespace, filter={"genre": "action"}, top_k=10, async_req=True + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 1 + assert query_result.matches[0].id == "4" + + def test_query_by_id_with_filter_gt(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$gt": 100}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 2 + assert find_by_id(query_result.matches, "4") is not None + assert find_by_id(query_result.matches, "6") is not None + + def test_query_by_id_with_filter_gte(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$gte": 90}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 3 + assert find_by_id(query_result.matches, "4") is not None + assert find_by_id(query_result.matches, "5") is not None + assert find_by_id(query_result.matches, "6") is not None + + def test_query_by_id_with_filter_lt(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$lt": 100}}, + top_k=10, + async_req=True, + ).result() + assert 
isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 1 + assert find_by_id(query_result.matches, "5") is not None + + def test_query_by_id_with_filter_lte(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$lte": 120}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 2 + assert find_by_id(query_result.matches, "4") is not None + assert find_by_id(query_result.matches, "5") is not None + + def test_query_by_id_with_filter_in(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"genre": {"$in": ["romance"]}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 1 + assert find_by_id(query_result.matches, "6") is not None + + @pytest.mark.skip(reason="Seems like a bug in the server") + def test_query_by_id_with_filter_nin(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", namespace=target_namespace, filter={"genre": {"$nin": ["romance"]}}, top_k=10 + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 2 + assert find_by_id(query_result.matches, "4") is not None + assert find_by_id(query_result.matches, "5") is not None + + def test_query_by_id_with_filter_eq(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"genre": {"$eq": "action"}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 1 + 
assert find_by_id(query_result.matches, "4") is not None + + @pytest.mark.skip(reason="Seems like a bug in the server") + def test_query_by_id_with_filter_ne(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"genre": {"$ne": "action"}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 2 + assert find_by_id(query_result.matches, "5") is not None + assert find_by_id(query_result.matches, "6") is not None diff --git a/tests/integration/data_grpc_futures/test_timeouts.py b/tests/integration/data_grpc_futures/test_timeouts.py new file mode 100644 index 00000000..1e5ed4af --- /dev/null +++ b/tests/integration/data_grpc_futures/test_timeouts.py @@ -0,0 +1,391 @@ +import pytest +from pinecone import QueryResponse, UpsertResponse, FetchResponse, Vector, PineconeException +from ..helpers import embedding_values +from .stub_backend import create_sleepy_test_server +import logging +from pinecone.grpc import GRPCIndex, PineconeGRPC +from concurrent.futures import wait, ALL_COMPLETED + +logger = logging.getLogger(__name__) + +SERVER_SLEEP_SECONDS = 1 + + +@pytest.fixture(scope="session") +def grpc_server(): + logger.info("Starting gRPC test server") + server = create_sleepy_test_server(port=50051, sleep_seconds=SERVER_SLEEP_SECONDS) + yield server + logger.info("Stopping gRPC test server") + server.stop(0) + + +@pytest.fixture(scope="session") +def local_idx(): + pc = PineconeGRPC(api_key="test", ssl_verify=False) + idx = pc.Index(host="localhost:50051") + return idx + + +@pytest.mark.usefixtures("grpc_server") +class TestGrpcAsyncTimeouts_QueryByID: + def test_query_by_id_with_custom_timeout_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS - 0.5 + query_results = local_idx.query( + id="1", namespace="testnamespace", top_k=10, async_req=True, timeout=deadline + ) + + assert query_results._default_timeout == deadline + done, not_done = wait( + [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + with pytest.raises(PineconeException) as e: + query_results.result() + + assert "Deadline Exceeded" in str(e.value) + + def test_query_by_id_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS + 1 + query_results = local_idx.query( + id="1", namespace="testnamespace", top_k=10, async_req=True, timeout=deadline + ) + + assert query_results._default_timeout == deadline + done, not_done = wait( + [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = query_results.result() + assert result is not None + assert result.usage.read_units == 1 + + def test_query_by_id_with_default_timeout(self, local_idx: GRPCIndex): + query_results = local_idx.query(id="1", namespace="testnamespace", top_k=10, async_req=True) + + # Default timeout is 5 seconds, which is longer than the test server sleep + assert 
query_results._default_timeout == 5 + + done, not_done = wait([query_results], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = query_results.result() + assert result is not None + assert isinstance(result, QueryResponse) + assert result.usage.read_units == 1 + assert result.matches[0].id == "1" + + +@pytest.mark.usefixtures("grpc_server") +class TestGrpcAsyncTimeouts_QueryByVector: + def test_query_by_vector_with_timeout(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS - 0.5 + + query_results = local_idx.query( + vector=embedding_values(2), + namespace="testnamespace", + top_k=10, + async_req=True, + timeout=deadline, + ) + + assert query_results._default_timeout == deadline + done, not_done = wait( + [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + with pytest.raises(PineconeException) as e: + query_results.result() + + assert "Deadline Exceeded" in str(e.value) + + def test_query_by_vector_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS + 1 + query_results = local_idx.query( + vector=embedding_values(2), + namespace="testnamespace", + top_k=10, + async_req=True, + timeout=deadline, + ) + + assert query_results._default_timeout == deadline + done, not_done = wait( + [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = query_results.result() + assert result is not None + assert isinstance(result, QueryResponse) + assert result.usage.read_units == 1 + assert result.matches[0].id == "1" + + def test_query_by_vector_with_default_timeout(self, local_idx: GRPCIndex): + query_results = local_idx.query( + vector=embedding_values(2), namespace="testnamespace", top_k=10, async_req=True + ) + + # Default timeout is 5 seconds, which is longer than the test server sleep + assert query_results._default_timeout == 5 + + done, not_done = wait([query_results], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = query_results.result() + assert result is not None + assert result.usage.read_units == 1 + + +@pytest.mark.usefixtures("grpc_server") +class TestGrpcAsyncTimeouts_Upsert: + def test_upsert_with_custom_timeout_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS - 0.5 + + upsert_results = local_idx.upsert( + vectors=[Vector(id="1", values=embedding_values(2), metadata={"genre": "action"})], + namespace="testnamespace", + async_req=True, + timeout=deadline, + ) + + assert upsert_results._default_timeout == deadline + done, not_done = wait( + [upsert_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + with pytest.raises(PineconeException) as e: + upsert_results.result() + + assert "Deadline Exceeded" in str(e.value) + + def test_upsert_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS + 1 + upsert_results = local_idx.upsert( + vectors=[Vector(id="1", values=embedding_values(2), metadata={"genre": "action"})], + namespace="testnamespace", + async_req=True, + timeout=deadline, + ) + + assert upsert_results._default_timeout == deadline + done, not_done = wait( + [upsert_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = 
upsert_results.result() + assert result is not None + assert isinstance(result, UpsertResponse) + assert result.upserted_count == 1 + + def test_upsert_with_default_timeout(self, local_idx: GRPCIndex): + upsert_results = local_idx.upsert( + vectors=[Vector(id="1", values=embedding_values(2), metadata={"genre": "action"})], + namespace="testnamespace", + async_req=True, + ) + + # Default timeout is 5 seconds, which is longer than the test server sleep + assert upsert_results._default_timeout == 5 + + done, not_done = wait([upsert_results], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = upsert_results.result() + assert result is not None + assert isinstance(result, UpsertResponse) + assert result.upserted_count == 1 + + +@pytest.mark.usefixtures("grpc_server") +class TestGrpcAsyncTimeouts_Update: + def test_update_with_custom_timeout_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS - 0.5 + + update_results = local_idx.update( + id="1", + namespace="testnamespace", + values=embedding_values(2), + async_req=True, + timeout=deadline, + ) + + assert update_results._default_timeout == deadline + done, not_done = wait( + [update_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + with pytest.raises(PineconeException) as e: + update_results.result() + + assert "Deadline Exceeded" in str(e.value) + + def test_update_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS + 1 + update_results = local_idx.update( + id="1", + namespace="testnamespace", + values=embedding_values(2), + async_req=True, + timeout=deadline, + ) + + assert update_results._default_timeout == deadline + done, not_done = wait( + [update_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = update_results.result() + assert result is not None + assert result == {} + + +@pytest.mark.usefixtures("grpc_server") +class TestGrpcAsyncTimeouts_Delete: + def test_delete_with_custom_timeout_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS - 0.5 + + delete_results = local_idx.delete( + ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline + ) + + assert delete_results._default_timeout == deadline + done, not_done = wait( + [delete_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + with pytest.raises(PineconeException) as e: + delete_results.result() + + assert "Deadline Exceeded" in str(e.value) + + def test_delete_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS + 1 + delete_results = local_idx.delete( + ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline + ) + + assert delete_results._default_timeout == deadline + done, not_done = wait( + [delete_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = delete_results.result() + assert result is not None + assert result == {} + + def test_delete_with_default_timeout(self, local_idx: GRPCIndex): + delete_results = local_idx.delete( + ids=["1", "2", "3"], namespace="testnamespace", async_req=True + ) + + # Default timeout is 5 seconds, which is longer than the test server sleep + assert delete_results._default_timeout == 
5 + + done, not_done = wait([delete_results], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = delete_results.result() + assert result is not None + assert result == {} + + +@pytest.mark.usefixtures("grpc_server") +class TestGrpcAsyncTimeouts_Fetch: + def test_fetch_with_custom_timeout_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS - 0.5 + + fetch_results = local_idx.fetch( + ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline + ) + + assert fetch_results._default_timeout == deadline + done, not_done = wait( + [fetch_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + with pytest.raises(PineconeException) as e: + fetch_results.result() + + assert "Deadline Exceeded" in str(e.value) + + def test_fetch_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS + 1 + fetch_results = local_idx.fetch( + ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline + ) + + assert fetch_results._default_timeout == deadline + done, not_done = wait( + [fetch_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED + ) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = fetch_results.result() + assert result is not None + assert isinstance(result, FetchResponse) + + def test_fetch_with_default_timeout(self, local_idx: GRPCIndex): + fetch_results = local_idx.fetch( + ids=["1", "2", "3"], namespace="testnamespace", async_req=True + ) + + # Default timeout is 5 seconds, which is longer than the test server sleep + assert fetch_results._default_timeout == 5 + + done, not_done = wait([fetch_results], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = fetch_results.result() + assert result is not None + assert isinstance(result, FetchResponse) + + assert result.vectors["1"].id == "1" + assert result.vectors["2"].id == "2" + assert result.vectors["3"].id == "3" + assert result.usage.read_units == 1 + assert result.namespace == "testnamespace" diff --git a/tests/integration/data_grpc_futures/test_update_future.py b/tests/integration/data_grpc_futures/test_update_future.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/data_grpc_futures/test_upsert_future.py b/tests/integration/data_grpc_futures/test_upsert_future.py new file mode 100644 index 00000000..321c9cea --- /dev/null +++ b/tests/integration/data_grpc_futures/test_upsert_future.py @@ -0,0 +1,119 @@ +import pytest +from pinecone import Vector, PineconeException +from ..helpers import poll_stats_for_namespace, embedding_values, generate_name + + +@pytest.fixture(scope="class") +def namespace_query_async(request): + return generate_name(request.node.name, "upsert-namespace") + + +@pytest.mark.usefixtures("namespace_query_async") +class TestUpsertWithAsyncReq: + def test_upsert_to_namespace(self, idx, namespace_query_async): + target_namespace = namespace_query_async + + # Upsert with tuples + upsert1 = idx.upsert( + vectors=[ + ("1", embedding_values()), + ("2", embedding_values()), + ("3", embedding_values()), + ], + namespace=target_namespace, + async_req=True, + ) + + # Upsert with objects + upsert2 = idx.upsert( + vectors=[ + Vector(id="4", values=embedding_values()), + Vector(id="5", values=embedding_values()), + Vector(id="6", values=embedding_values()), + ], + namespace=target_namespace, + 
async_req=True, + ) + + # Upsert with dict + upsert3 = idx.upsert( + vectors=[ + {"id": "7", "values": embedding_values()}, + {"id": "8", "values": embedding_values()}, + {"id": "9", "values": embedding_values()}, + ], + namespace=target_namespace, + async_req=True, + ) + + poll_stats_for_namespace(idx, target_namespace, 9) + + # Check that the vector count reflects the upserted data + stats = idx.describe_index_stats() + assert stats.total_vector_count >= 9 + assert stats.namespaces[target_namespace].vector_count == 9 + + # Use returned futures + from concurrent.futures import as_completed + + total_upserted = 0 + for future in as_completed([upsert1, upsert2, upsert3], timeout=10): + total_upserted += future.result().upserted_count + + assert total_upserted == 9 + + def test_upsert_to_namespace_when_failed_req(self, idx, namespace_query_async): + target_namespace = namespace_query_async + + # Upsert with tuples + upsert1 = idx.upsert( + vectors=[ + ("1", embedding_values()), + ("2", embedding_values()), + ("3", embedding_values()), + ], + namespace=target_namespace, + async_req=True, + ) + + # Upsert with objects + wrong_dimension = 10 + upsert2 = idx.upsert( + vectors=[ + Vector(id="4", values=embedding_values(wrong_dimension)), + Vector(id="5", values=embedding_values(wrong_dimension)), + Vector(id="6", values=embedding_values(wrong_dimension)), + ], + namespace=target_namespace, + async_req=True, + ) + + # Upsert with dict + upsert3 = idx.upsert( + vectors=[ + {"id": "7", "values": embedding_values()}, + {"id": "8", "values": embedding_values()}, + {"id": "9", "values": embedding_values()}, + ], + namespace=target_namespace, + async_req=True, + ) + + from concurrent.futures import wait, ALL_COMPLETED + + done, not_done = wait([upsert1, upsert2, upsert3], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 3 + assert len(not_done) == 0 + + total_upserted = 0 + for future in done: + if future.exception(): + assert future is upsert2 + assert isinstance(future.exception(), PineconeException) + assert "Vector dimension 10 does not match the dimension of the index 2" in str( + future.exception() + ) + else: + total_upserted += future.result().upserted_count + assert total_upserted == 6 diff --git a/tests/integration/helpers/names.py b/tests/integration/helpers/names.py new file mode 100644 index 00000000..8d24d8e3 --- /dev/null +++ b/tests/integration/helpers/names.py @@ -0,0 +1,14 @@ +import hashlib + + +def generate_name(test_name: str, label: str, max_length: int = 20) -> str: + """ + The goal of this function is to produce names that are unique across the + test suite but deterministic when the test is run multiple times, when ordering + changes, when different subsets of tests are run, etc. + + To accommodate this, we hash the test name and label. We truncate the hexdigest + since the full length of 64 characters exceeds the allowed length of some fields + in the API. For example, index names must be 45 characters or less.
+ """ + return hashlib.sha256(f"{test_name}-{label}".encode()).hexdigest()[:max_length] From 38d3d8fec5f9b66e6f244820208fc32599e7406a Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Wed, 11 Jun 2025 11:16:03 -0400 Subject: [PATCH 3/7] Fix types --- pinecone/grpc/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinecone/grpc/utils.py b/pinecone/grpc/utils.py index a42f42cc..673479fb 100644 --- a/pinecone/grpc/utils.py +++ b/pinecone/grpc/utils.py @@ -64,7 +64,7 @@ def parse_usage(usage: dict): return Usage(read_units=int(usage.get("readUnits", 0))) -def parse_upsert_response(response: dict | Message, _check_type: bool = False): +def parse_upsert_response(response: Message, _check_type: bool = False): json_response = json_format.MessageToDict(response) upserted_count = json_response.get("upsertedCount", 0) return UpsertResponse(upserted_count=int(upserted_count)) From 07fdd9badf4ad406833bc44fe941da7a77567a2a Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Wed, 11 Jun 2025 11:25:55 -0400 Subject: [PATCH 4/7] Adjust unit test --- tests/unit_grpc/test_futures.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit_grpc/test_futures.py b/tests/unit_grpc/test_futures.py index 43960b00..5393f0af 100644 --- a/tests/unit_grpc/test_futures.py +++ b/tests/unit_grpc/test_futures.py @@ -25,6 +25,9 @@ def __init__(self, mocker): self._state = mocker.Mock() self._state.debug_error_string = "Test gRPC error" + def debug_error_string(self): + return self._state.debug_error_string + class TestPineconeGrpcFuture: def test_wraps_grpc_future_already_done(self, mocker): From 1cd842c564a9d883c97cffc72778a8013ce673be Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Wed, 11 Jun 2025 13:20:56 -0400 Subject: [PATCH 5/7] Fix type issues for py3.9 --- pinecone/grpc/utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pinecone/grpc/utils.py b/pinecone/grpc/utils.py index 673479fb..92f2ce15 100644 --- a/pinecone/grpc/utils.py +++ b/pinecone/grpc/utils.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Union from google.protobuf import json_format from google.protobuf.message import Message @@ -70,15 +70,15 @@ def parse_upsert_response(response: Message, _check_type: bool = False): return UpsertResponse(upserted_count=int(upserted_count)) -def parse_update_response(response: dict | Message, _check_type: bool = False): +def parse_update_response(response: Union[dict, Message], _check_type: bool = False): return {} -def parse_delete_response(response: dict | Message, _check_type: bool = False): +def parse_delete_response(response: Union[dict, Message], _check_type: bool = False): return {} -def parse_query_response(response: dict | Message, _check_type: bool = False): +def parse_query_response(response: Union[dict, Message], _check_type: bool = False): if isinstance(response, Message): json_response = json_format.MessageToDict(response) else: From f1f2ee9b2f807c5d77c0161971cc60fe2004a011 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Thu, 12 Jun 2025 11:18:11 -0400 Subject: [PATCH 6/7] Add missing test --- .../data_grpc_futures/test_timeouts.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tests/integration/data_grpc_futures/test_timeouts.py b/tests/integration/data_grpc_futures/test_timeouts.py index 1e5ed4af..6a329a5e 100644 --- a/tests/integration/data_grpc_futures/test_timeouts.py +++ b/tests/integration/data_grpc_futures/test_timeouts.py @@ -86,7 +86,7 @@ def 
test_query_by_id_with_default_timeout(self, local_idx: GRPCIndex): @pytest.mark.usefixtures("grpc_server") class TestGrpcAsyncTimeouts_QueryByVector: - def test_query_by_vector_with_timeout(self, local_idx: GRPCIndex): + def test_query_by_vector_with_timeout_exceeded(self, local_idx: GRPCIndex): deadline = SERVER_SLEEP_SECONDS - 0.5 query_results = local_idx.query( @@ -246,6 +246,23 @@ def test_update_with_custom_timeout_exceeded(self, local_idx: GRPCIndex): assert "Deadline Exceeded" in str(e.value) + def test_update_with_default_timeout(self, local_idx: GRPCIndex): + update_results = local_idx.update( + id="1", namespace="testnamespace", values=embedding_values(2), async_req=True + ) + + # Default timeout is 5 seconds, which is longer than the test server sleep + assert update_results._default_timeout == 5 + + done, not_done = wait([update_results], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + result = update_results.result() + assert result is not None + assert result == {} + def test_update_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex): deadline = SERVER_SLEEP_SECONDS + 1 update_results = local_idx.update( From a83b7cf0f1eab5a8f70bee181de1e976ba299256 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Thu, 12 Jun 2025 13:37:28 -0400 Subject: [PATCH 7/7] Enable tests that were previously skipped --- .../data_grpc_futures/test_query_future.py | 53 +++++++++++++++---- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/tests/integration/data_grpc_futures/test_query_future.py b/tests/integration/data_grpc_futures/test_query_future.py index 15a37356..3e4a1c56 100644 --- a/tests/integration/data_grpc_futures/test_query_future.py +++ b/tests/integration/data_grpc_futures/test_query_future.py @@ -295,7 +295,6 @@ def test_query_by_id_with_filter_in(self, idx, query_namespace, use_nondefault_n assert len(query_result.matches) == 1 assert find_by_id(query_result.matches, "6") is not None - @pytest.mark.skip(reason="Seems like a bug in the server") def test_query_by_id_with_filter_nin(self, idx, query_namespace, use_nondefault_namespace): target_namespace = query_namespace if use_nondefault_namespace else "" @@ -303,13 +302,25 @@ def test_query_by_id_with_filter_nin(self, idx, query_namespace, use_nondefault_ # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) query_result = idx.query( - id="1", namespace=target_namespace, filter={"genre": {"$nin": ["romance"]}}, top_k=10 + id="1", + namespace=target_namespace, + filter={"genre": {"$nin": ["romance"]}}, + include_metadata=True, + top_k=10, + async_req=True, ).result() + assert isinstance(query_result, QueryResponse) == True assert query_result.namespace == target_namespace - assert len(query_result.matches) == 2 - assert find_by_id(query_result.matches, "4") is not None - assert find_by_id(query_result.matches, "5") is not None + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 2 + for match in matches_with_metadata: + assert match.metadata["genre"] != "romance" def test_query_by_id_with_filter_eq(self, idx, query_namespace, use_nondefault_namespace): target_namespace = query_namespace if use_nondefault_namespace else "" @@ -321,15 +332,25 @@ def test_query_by_id_with_filter_eq(self, idx, query_namespace, use_nondefault_n 
id="1", namespace=target_namespace, filter={"genre": {"$eq": "action"}}, + include_metadata=True, top_k=10, async_req=True, ).result() + assert isinstance(query_result, QueryResponse) == True assert query_result.namespace == target_namespace - assert len(query_result.matches) == 1 - assert find_by_id(query_result.matches, "4") is not None + for match in query_result.matches: + logger.info(f"Match: id: {match.id} metadata: {match.metadata}") + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 1 + for match in matches_with_metadata: + assert match.metadata["genre"] == "action" - @pytest.mark.skip(reason="Seems like a bug in the server") def test_query_by_id_with_filter_ne(self, idx, query_namespace, use_nondefault_namespace): target_namespace = query_namespace if use_nondefault_namespace else "" @@ -340,11 +361,21 @@ def test_query_by_id_with_filter_ne(self, idx, query_namespace, use_nondefault_n id="1", namespace=target_namespace, filter={"genre": {"$ne": "action"}}, + include_metadata=True, top_k=10, async_req=True, ).result() + for match in query_result.matches: + logger.info(f"Match: id: {match.id} metadata: {match.metadata}") assert isinstance(query_result, QueryResponse) == True assert query_result.namespace == target_namespace - assert len(query_result.matches) == 2 - assert find_by_id(query_result.matches, "5") is not None - assert find_by_id(query_result.matches, "6") is not None + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 2 + for match in matches_with_metadata: + assert match.metadata["genre"] != "action" + assert match.id != "4"