diff --git a/.github/workflows/testing-integration.yaml b/.github/workflows/testing-integration.yaml
index 189f5dd9..69d9e7b0 100644
--- a/.github/workflows/testing-integration.yaml
+++ b/.github/workflows/testing-integration.yaml
@@ -82,6 +82,7 @@ jobs:
         python_version: ${{ fromJson(inputs.python_versions_json) }}
         test_suite:
           - data
+          - data_grpc_futures
     steps:
       - uses: actions/checkout@v4
       - name: Setup Poetry
diff --git a/pinecone/grpc/base.py b/pinecone/grpc/base.py
index 5c99305b..8582e8fe 100644
--- a/pinecone/grpc/base.py
+++ b/pinecone/grpc/base.py
@@ -29,7 +29,15 @@ def __init__(
         _endpoint_override: Optional[str] = None,
     ):
         self.config = config
-        self.grpc_client_config = grpc_config or GRPCClientConfig()
+        # If grpc_config is passed, use it. Otherwise, build a new one with
+        # default values, passing in the ssl_verify value from the config.
+        if self.config.ssl_verify is None:
+            default_grpc_config = GRPCClientConfig()
+        else:
+            default_grpc_config = GRPCClientConfig(secure=self.config.ssl_verify)
+
+        self.grpc_client_config = grpc_config or default_grpc_config
+
         self.pool_threads = pool_threads
         self._endpoint_override = _endpoint_override
diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py
index 14fe9cf0..a1ed9061 100644
--- a/pinecone/grpc/future.py
+++ b/pinecone/grpc/future.py
@@ -75,10 +75,19 @@ def _timeout(self, timeout: Optional[int] = None) -> int:
         return self._default_timeout

     def _wrap_rpc_exception(self, e):
-        if e._state and e._state.debug_error_string:
-            return PineconeException(e._state.debug_error_string)
-        else:
-            return PineconeException("Unknown GRPC error")
+        # The grpc package's use of multiple inheritance makes it a little
+        # unclear whether it's always safe to assume that the e.code(),
+        # e.details(), and e.debug_error_string() methods exist. So, we use
+        # try/except to avoid errors.
+        try:
+            grpc_info = {"grpc_error_code": e.code().value[0], "grpc_message": e.details()}
+
+            return PineconeException(f"GRPC error: {grpc_info}")
+        except Exception:
+            try:
+                return PineconeException(f"Unknown GRPC error: {e.debug_error_string()}")
+            except Exception:
+                return PineconeException(f"Unknown GRPC error: {e}")

     def __del__(self):
         self._grpc_future.cancel()
diff --git a/pinecone/grpc/index_grpc.py b/pinecone/grpc/index_grpc.py
index 6da16a07..dae3db37 100644
--- a/pinecone/grpc/index_grpc.py
+++ b/pinecone/grpc/index_grpc.py
@@ -12,6 +12,9 @@
     parse_fetch_response,
     parse_query_response,
     parse_stats_response,
+    parse_upsert_response,
+    parse_update_response,
+    parse_delete_response,
 )
 from .vector_factory_grpc import VectorFactoryGRPC
 from .sparse_values_factory import SparseValuesFactory
@@ -145,7 +148,9 @@ def upsert(
             args_dict = self._parse_non_empty_args([("namespace", namespace)])
             request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
             future = self.runner.run(self.stub.Upsert.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, timeout=timeout, result_transformer=parse_upsert_response
+            )

         if batch_size is None:
             return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)
@@ -297,7 +302,9 @@ def delete(
         request = DeleteRequest(**args_dict, **kwargs)
         if async_req:
             future = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, timeout=timeout, result_transformer=parse_delete_response
+            )
         else:
             return self.runner.run(self.stub.Delete, request, timeout=timeout)
@@ -334,7 +341,9 @@ def fetch(
         if async_req:
             future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
+            return PineconeGrpcFuture(
+                future, result_transformer=parse_fetch_response, timeout=timeout
+            )
         else:
             response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
             return parse_fetch_response(response)
@@ -424,7 +433,9 @@ def query(
         if async_req:
             future = self.runner.run(self.stub.Query.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, result_transformer=parse_query_response, timeout=timeout
+            )
         else:
             response = self.runner.run(self.stub.Query, request, timeout=timeout)
             json_response = json_format.MessageToDict(response)
@@ -535,7 +546,9 @@ def update(
         request = UpdateRequest(id=id, **args_dict)
         if async_req:
             future = self.runner.run(self.stub.Update.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, timeout=timeout, result_transformer=parse_update_response
+            )
         else:
             return self.runner.run(self.stub.Update, request, timeout=timeout)
diff --git a/pinecone/grpc/pinecone.py b/pinecone/grpc/pinecone.py
index 37202fb1..b03da15d 100644
--- a/pinecone/grpc/pinecone.py
+++ b/pinecone/grpc/pinecone.py
@@ -133,5 +133,6 @@ def Index(self, name: str = "", host: str = "", **kwargs):
             source_tag=self._config.source_tag,
             proxy_url=self._config.proxy_url,
             ssl_ca_certs=self._config.ssl_ca_certs,
+            ssl_verify=self._config.ssl_verify,
         )
         return GRPCIndex(index_name=name, config=config, pool_threads=pt, **kwargs)
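For context on the `ssl_verify` plumbing above: when the caller supplies no explicit `GRPCClientConfig`, the tri-state `ssl_verify` setting (`None`/`True`/`False`) now decides whether the default config is built with `secure` set. A minimal sketch of that branch, using a hypothetical stand-in for `GRPCClientConfig` (the real class has more fields; `secure=True` as its default is an assumption based on the `None` branch above):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubGRPCClientConfig:
    # Hypothetical stand-in for GRPCClientConfig; only the relevant field.
    secure: bool = True


def build_default_grpc_config(ssl_verify: Optional[bool]) -> StubGRPCClientConfig:
    # Mirrors the branch in GRPCIndexBase.__init__: an unset (None) ssl_verify
    # keeps the library default, while an explicit True/False is forwarded
    # as the `secure` flag.
    if ssl_verify is None:
        return StubGRPCClientConfig()
    return StubGRPCClientConfig(secure=ssl_verify)


assert build_default_grpc_config(None).secure is True
assert build_default_grpc_config(False).secure is False
```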
diff --git a/pinecone/grpc/utils.py b/pinecone/grpc/utils.py
index c2869e73..92f2ce15 100644
--- a/pinecone/grpc/utils.py
+++ b/pinecone/grpc/utils.py
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Optional, Union

 from google.protobuf import json_format
 from google.protobuf.message import Message
@@ -11,6 +11,7 @@
     SparseValues,
     QueryResponse,
     IndexDescription as DescribeIndexStatsResponse,
+    UpsertResponse,
     NamespaceSummary,
 )
 from pinecone.db_data.dataclasses import FetchResponse
@@ -63,9 +64,28 @@ def parse_usage(usage: dict):
     return Usage(read_units=int(usage.get("readUnits", 0)))


-def parse_query_response(response: dict, _check_type: bool = False):
+def parse_upsert_response(response: Message, _check_type: bool = False):
+    json_response = json_format.MessageToDict(response)
+    upserted_count = json_response.get("upsertedCount", 0)
+    return UpsertResponse(upserted_count=int(upserted_count))
+
+
+def parse_update_response(response: Union[dict, Message], _check_type: bool = False):
+    return {}
+
+
+def parse_delete_response(response: Union[dict, Message], _check_type: bool = False):
+    return {}
+
+
+def parse_query_response(response: Union[dict, Message], _check_type: bool = False):
+    if isinstance(response, Message):
+        json_response = json_format.MessageToDict(response)
+    else:
+        json_response = response
+
     matches = []
-    for item in response.get("matches", []):
+    for item in json_response.get("matches", []):
         sc = ScoredVector(
             id=item["id"],
             score=item.get("score", 0.0),
@@ -80,11 +100,11 @@
     # creating empty `Usage` objects and then passing them into QueryResponse
     # when they are not actually present in the response from the server.
     args = {
-        "namespace": response.get("namespace", ""),
+        "namespace": json_response.get("namespace", ""),
         "matches": matches,
         "_check_type": _check_type,
     }
-    usage = response.get("usage")
+    usage = json_response.get("usage")
     if usage:
         args["usage"] = parse_usage(usage)
     return QueryResponse(**args)
diff --git a/tests/integration/data_grpc_futures/__init__.py b/tests/integration/data_grpc_futures/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/integration/data_grpc_futures/conftest.py b/tests/integration/data_grpc_futures/conftest.py
new file mode 100644
index 00000000..13d2cc2f
--- /dev/null
+++ b/tests/integration/data_grpc_futures/conftest.py
@@ -0,0 +1,109 @@
+import pytest
+import json
+import uuid
+from ..helpers import get_environment_var, index_tags as index_tags_helper, generate_name
+import logging
+from pinecone import EmbedModel, CloudProvider, AwsRegion, IndexEmbed
+from pinecone.grpc import PineconeGRPC
+
+logger = logging.getLogger(__name__)
+
+RUN_ID = str(uuid.uuid4())
+
+created_indexes = []
+
+
+@pytest.fixture(scope="session")
+def index_tags(request):
+    return index_tags_helper(request, RUN_ID)
+
+
+@pytest.fixture(scope="session")
+def pc():
+    return PineconeGRPC()
+
+
+@pytest.fixture(scope="session")
+def spec():
+    spec_json = get_environment_var(
+        "SPEC", '{"serverless": {"cloud": "aws", "region": "us-east-1" }}'
+    )
+    return json.loads(spec_json)
+
+
+@pytest.fixture(scope="session")
+def model_idx(pc, index_tags, request):
+    model_index_name = generate_name(request.node.name, "embed")
+    if not pc.has_index(name=model_index_name):
+        logger.info(f"Creating index {model_index_name}")
+        pc.create_index_for_model(
+            name=model_index_name,
+            cloud=CloudProvider.AWS,
+            region=AwsRegion.US_WEST_2,
+            embed=IndexEmbed(
+                model=EmbedModel.Multilingual_E5_Large,
+                field_map={"text": "my_text_field"},
+                metric="cosine",
+            ),
+            tags=index_tags,
+        )
+        created_indexes.append(model_index_name)
+    else:
+        logger.info(f"Index {model_index_name} already exists")
+
+    description = pc.describe_index(name=model_index_name)
+    return pc.Index(host=description.host)
+
+
+def create_index(pc, create_args):
+    if not pc.has_index(name=create_args["name"]):
+        logger.info(f"Creating index {create_args['name']}")
+        pc.create_index(**create_args)
+    else:
+        logger.info(f"Index {create_args['name']} already exists")
+
+    host = pc.describe_index(name=create_args["name"]).host
+
+    return host
+
+
+@pytest.fixture(scope="session")
+def idx(pc, spec, index_tags, request):
+    index_name = generate_name(request.node.name, "dense")
+    logger.info(f"Request: {request.node}")
+    create_args = {
+        "name": index_name,
+        "dimension": 2,
+        "metric": "cosine",
+        "spec": spec,
+        "tags": index_tags,
+    }
+    host = create_index(pc, create_args)
+    logger.info(f"Using index {index_name} with host {host} as idx")
+    created_indexes.append(index_name)
+    return pc.Index(host=host)
+
+
+@pytest.fixture(scope="session")
+def sparse_idx(pc, spec, index_tags, request):
+    index_name = generate_name(request.node.name, "sparse")
+    create_args = {
+        "name": index_name,
+        "metric": "dotproduct",
+        "spec": spec,
+        "vector_type": "sparse",
+        "tags": index_tags,
+    }
+    host = create_index(pc, create_args)
+    created_indexes.append(index_name)
+    return pc.Index(host=host)
+
+
+def pytest_sessionfinish(session, exitstatus):
+    for index in created_indexes:
+        try:
+            logger.info(f"Deleting index {index}")
+            pc = PineconeGRPC()
+            pc.delete_index(name=index, timeout=-1)
+        except Exception as e:
+            logger.error(f"Error deleting index {index}: {e}")
diff --git a/tests/integration/data_grpc_futures/stub_backend.py b/tests/integration/data_grpc_futures/stub_backend.py
new file mode 100644
index 00000000..d8983db9
--- /dev/null
+++ b/tests/integration/data_grpc_futures/stub_backend.py
@@ -0,0 +1,95 @@
+import time
+import grpc
+import logging
+from concurrent import futures
+import pinecone.core.grpc.protos.db_data_2025_01_pb2 as pb2
+import pinecone.core.grpc.protos.db_data_2025_01_pb2_grpc as pb2_grpc
+
+logger = logging.getLogger(__name__)
+
+
+class TestVectorService(pb2_grpc.VectorServiceServicer):
+    def __init__(self, sleep_seconds=5):
+        self.sleep_seconds = sleep_seconds
+
+    def Upsert(self, request, context):
+        # Simulate a delay that will cause a timeout
+        logger.info("Received an upsert request from test client")
+        logger.info(f"Request: {request}")
+        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
+        time.sleep(self.sleep_seconds)
+        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
+        logger.info("Returning an upsert response from test server")
+        return pb2.UpsertResponse(upserted_count=1)
+
+    def Query(self, request, context):
+        # Simulate a delay that will cause a timeout
+        logger.info("Received a query request from test client")
+        logger.info(f"Request: {request}")
+
+        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
+        time.sleep(self.sleep_seconds)
+        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
+        logger.info("Returning a query response from test server")
+        return pb2.QueryResponse(
+            results=[],
+            matches=[pb2.ScoredVector(id="1", score=1.0, values=[1.0, 2.0, 3.0])],
+            namespace="testnamespace",
+            usage=pb2.Usage(read_units=1),
+        )
+
+    def Update(self, request, context):
+        # Simulate a delay that will cause a timeout
+        logger.info("Received an update request from test client")
+        logger.info(f"Request: {request}")
+        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
+        time.sleep(self.sleep_seconds)
+        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
+        logger.info("Returning an update response from test server")
+        return pb2.UpdateResponse()
+
+    def Delete(self, request, context):
+        # Simulate a delay that will cause a timeout
+        logger.info("Received a delete request from test client")
+        logger.info(f"Request: {request}")
+        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
+        time.sleep(self.sleep_seconds)
+        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
+        logger.info("Returning a delete response from test server")
+        return pb2.DeleteResponse()
+
+    def Fetch(self, request, context):
+        logger.info("Received a fetch request from test client")
+        logger.info(f"Request: {request}")
+        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
+        time.sleep(self.sleep_seconds)
+        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
+        logger.info("Returning a fetch response from test server")
+        return pb2.FetchResponse(
+            vectors={
+                "1": pb2.Vector(id="1", values=[1.0, 2.0, 3.0]),
+                "2": pb2.Vector(id="2", values=[4.0, 5.0, 6.0]),
+                "3": pb2.Vector(id="3", values=[7.0, 8.0, 9.0]),
+            },
+            namespace="testnamespace",
+            usage=pb2.Usage(read_units=1),
+        )
+
+
+def create_sleepy_test_server(port=50051, sleep_seconds=5):
+    """Creates and returns a configured gRPC server for testing.
+
+    Args:
+        port (int): The port number to run the server on
+        sleep_seconds (int): The extra latency in seconds for simulated operations
+
+    Returns:
+        grpc.Server: A configured and started gRPC server instance
+    """
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    pb2_grpc.add_VectorServiceServicer_to_server(
+        TestVectorService(sleep_seconds=sleep_seconds), server
+    )
+    server.add_insecure_port(f"[::]:{port}")
+    server.start()
+    return server
diff --git a/tests/integration/data/test_delete_future.py b/tests/integration/data_grpc_futures/test_delete_future.py
similarity index 74%
rename from tests/integration/data/test_delete_future.py
rename to tests/integration/data_grpc_futures/test_delete_future.py
index 0680737a..2a0eb29d 100644
--- a/tests/integration/data/test_delete_future.py
+++ b/tests/integration/data_grpc_futures/test_delete_future.py
@@ -1,14 +1,10 @@
-import os
-import pytest
 from pinecone import Vector
 from ..helpers import poll_stats_for_namespace, random_string
 import logging
+import time

 logger = logging.getLogger(__name__)

-if os.environ.get("USE_GRPC") == "true":
-    from pinecone.grpc import GRPCDeleteResponse
-

 def seed_vectors(idx, namespace):
     logger.info("Seeding vectors with ids [id1, id2, id3] to namespace '%s'", namespace)
@@ -24,9 +20,6 @@


 class TestDeleteFuture:
-    @pytest.mark.skipif(
-        os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
-    )
     def test_delete_future(self, idx):
         namespace = random_string(10)
@@ -39,11 +32,21 @@
         for future in as_completed([delete_one, delete_two], timeout=10):
             resp = future.result()
-            assert isinstance(resp, GRPCDeleteResponse)
+            assert resp == {}
+
+        time.sleep(10)
+
+        # Verify that the vectors are deleted
+        from concurrent.futures import wait, ALL_COMPLETED
+
+        fetch_results = idx.fetch(ids=["id1", "id2"], namespace=namespace, async_req=True)
+        done, not_done = wait([fetch_results], timeout=10, return_when=ALL_COMPLETED)
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        results = fetch_results.result()
+        assert len(results.vectors) == 0

-    @pytest.mark.skipif(
-        os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
-    )
     def test_delete_future_by_namespace(self, idx):
         namespace = random_string(10)
@@ -55,8 +58,9 @@
         delete_ns1 = idx.delete(namespace=ns1, delete_all=True, async_req=True)
         delete_ns2 = idx.delete(namespace=ns2, delete_all=True, async_req=True)

+        from concurrent.futures import as_completed
         for future in as_completed([delete_ns1, delete_ns2], timeout=10):
             resp = future.result()
-            assert isinstance(resp, GRPCDeleteResponse)
+            assert resp == {}
diff --git a/tests/integration/data/test_fetch_future.py b/tests/integration/data_grpc_futures/test_fetch_future.py
similarity index 95%
rename from tests/integration/data/test_fetch_future.py
rename to tests/integration/data_grpc_futures/test_fetch_future.py
index e100f11a..a503b64a 100644
--- a/tests/integration/data/test_fetch_future.py
+++ b/tests/integration/data_grpc_futures/test_fetch_future.py
@@ -1,18 +1,15 @@
-import os
 import pytest
-from ..helpers import poll_fetch_for_ids_in_namespace, embedding_values, random_string
+from ..helpers import poll_fetch_for_ids_in_namespace, embedding_values, generate_name
 from pinecone import Vector
 import logging
+from pinecone.grpc import PineconeGrpcFuture

 logger = logging.getLogger(__name__)

-if os.environ.get("USE_GRPC") == "true":
-    from pinecone.grpc import PineconeGrpcFuture
-

 @pytest.fixture(scope="session")
 def fetch_namespace_future():
-    return random_string(10)
+    return generate_name("TestFetchFuture", "fetch-namespace")


 def seed(idx, namespace):
@@ -66,9 +63,6 @@ def seed_for_fetch(idx, fetch_namespace_future):

 @pytest.mark.usefixtures("seed_for_fetch")
-@pytest.mark.skipif(
-    os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
-)
 class TestFetchFuture:
     def setup_method(self):
         self.expected_dimension = 2
diff --git a/tests/integration/data_grpc_futures/test_query_future.py b/tests/integration/data_grpc_futures/test_query_future.py
new file mode 100644
index 00000000..3e4a1c56
--- /dev/null
+++ b/tests/integration/data_grpc_futures/test_query_future.py
@@ -0,0 +1,381 @@
+import pytest
+from pinecone import QueryResponse, Vector
+from ..helpers import embedding_values, poll_fetch_for_ids_in_namespace, generate_name
+import logging
+import time
+from pinecone.grpc import GRPCIndex
+from concurrent.futures import wait, ALL_COMPLETED
+
+
+logger = logging.getLogger(__name__)
+
+
+def find_by_id(matches, id):
+    with_id = [match for match in matches if match.id == id]
+    return with_id[0] if len(with_id) > 0 else None
+
+
+@pytest.fixture(scope="session")
+def query_namespace():
+    return generate_name("query_namespace", "test")
+
+
+def seed(idx, namespace):
+    # Upsert without metadata
+    logger.info(f"Seeding vectors without metadata into namespace '{namespace}'")
+    upsert1 = idx.upsert(
+        vectors=[
+            ("1", embedding_values(2)),
+            ("2", embedding_values(2)),
+            ("3", embedding_values(2)),
+        ],
+        namespace=namespace,
+        async_req=True,
+    )
+
+    # Upsert with metadata
+    logger.info(f"Seeding vectors with metadata into namespace '{namespace}'")
+    upsert2 = idx.upsert(
+        vectors=[
+            Vector(
+                id="4", values=embedding_values(2), metadata={"genre": "action", "runtime": 120}
+            ),
+            Vector(id="5", values=embedding_values(2), metadata={"genre": "comedy", "runtime": 90}),
+            Vector(
+                id="6", values=embedding_values(2), metadata={"genre": "romance", "runtime": 240}
"runtime": 240} + ), + ], + namespace=namespace, + async_req=True, + ) + + # Upsert with dict + upsert3 = idx.upsert( + vectors=[ + {"id": "7", "values": embedding_values(2)}, + {"id": "8", "values": embedding_values(2)}, + {"id": "9", "values": embedding_values(2)}, + ], + namespace=namespace, + async_req=True, + ) + + wait([upsert1, upsert2, upsert3], timeout=10, return_when=ALL_COMPLETED) + + poll_fetch_for_ids_in_namespace( + idx, ids=["1", "2", "3", "4", "5", "6", "7", "8", "9"], namespace=namespace + ) + + +@pytest.fixture(scope="class") +def seed_for_query(idx, query_namespace): + seed(idx, query_namespace) + seed(idx, "") + time.sleep(30) + yield + + +@pytest.mark.usefixtures("seed_for_query") +@pytest.mark.parametrize("use_nondefault_namespace", [True, False]) +class TestQueryAsync: + def setup_method(self): + self.expected_dimension = 2 + + def test_query_by_id( + self, idx: GRPCIndex, query_namespace: str, use_nondefault_namespace: bool + ): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_future = idx.query(id="1", namespace=target_namespace, top_k=10, async_req=True) + + done, not_done = wait([query_future], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 1 + assert len(not_done) == 0 + + query_result = query_future.result() + + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + assert query_result.usage is not None + assert query_result.usage["read_units"] is not None + assert query_result.usage["read_units"] > 0 + + # By default, does not include values or metadata + record_with_metadata = find_by_id(query_result.matches, "4") + assert record_with_metadata.metadata is None + assert record_with_metadata.values == [] + + def test_query_by_vector(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), namespace=target_namespace, top_k=10, async_req=True + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + def test_query_by_vector_include_values(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), + namespace=target_namespace, + include_values=True, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) > 0 + assert query_result.matches[0].values is not None + assert len(query_result.matches[0].values) == self.expected_dimension + + def test_query_by_vector_include_metadata(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), + namespace=target_namespace, + include_metadata=True, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 3 + assert find_by_id(query_result.matches, "4").metadata["genre"] == "action" + + def test_query_by_vector_include_values_and_metadata( + self, idx, query_namespace, 
use_nondefault_namespace + ): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + vector=embedding_values(2), + namespace=target_namespace, + include_values=True, + include_metadata=True, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 3 + assert find_by_id(query_result.matches, "4").metadata["genre"] == "action" + assert len(query_result.matches[0].values) == self.expected_dimension + + +class TestQueryEdgeCasesAsync: + def test_query_in_empty_namespace(self, idx): + query_result = idx.query(id="1", namespace="empty", top_k=10, async_req=True).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == "empty" + assert len(query_result.matches) == 0 + + +@pytest.mark.usefixtures("seed_for_query") +@pytest.mark.parametrize("use_nondefault_namespace", [True, False]) +class TestQueryWithFilterAsync: + def test_query_by_id_with_filter(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + query_result = idx.query( + id="1", namespace=target_namespace, filter={"genre": "action"}, top_k=10, async_req=True + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 1 + assert query_result.matches[0].id == "4" + + def test_query_by_id_with_filter_gt(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$gt": 100}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 2 + assert find_by_id(query_result.matches, "4") is not None + assert find_by_id(query_result.matches, "6") is not None + + def test_query_by_id_with_filter_gte(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$gte": 90}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 3 + assert find_by_id(query_result.matches, "4") is not None + assert find_by_id(query_result.matches, "5") is not None + assert find_by_id(query_result.matches, "6") is not None + + def test_query_by_id_with_filter_lt(self, idx, query_namespace, 
use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$lt": 100}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 1 + assert find_by_id(query_result.matches, "5") is not None + + def test_query_by_id_with_filter_lte(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"runtime": {"$lte": 120}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 2 + assert find_by_id(query_result.matches, "4") is not None + assert find_by_id(query_result.matches, "5") is not None + + def test_query_by_id_with_filter_in(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"genre": {"$in": ["romance"]}}, + top_k=10, + async_req=True, + ).result() + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + assert len(query_result.matches) == 1 + assert find_by_id(query_result.matches, "6") is not None + + def test_query_by_id_with_filter_nin(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"genre": {"$nin": ["romance"]}}, + include_metadata=True, + top_k=10, + async_req=True, + ).result() + + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 2 + for match in matches_with_metadata: + assert match.metadata["genre"] != "romance" + + def test_query_by_id_with_filter_eq(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = 
query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"genre": {"$eq": "action"}}, + include_metadata=True, + top_k=10, + async_req=True, + ).result() + + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + for match in query_result.matches: + logger.info(f"Match: id: {match.id} metadata: {match.metadata}") + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 1 + for match in matches_with_metadata: + assert match.metadata["genre"] == "action" + + def test_query_by_id_with_filter_ne(self, idx, query_namespace, use_nondefault_namespace): + target_namespace = query_namespace if use_nondefault_namespace else "" + + # Vector(id='4', values=embedding_values(2), metadata={'genre': 'action', 'runtime': 120 }), + # Vector(id='5', values=embedding_values(2), metadata={'genre': 'comedy', 'runtime': 90 }), + # Vector(id='6', values=embedding_values(2), metadata={'genre': 'romance', 'runtime': 240 }) + query_result = idx.query( + id="1", + namespace=target_namespace, + filter={"genre": {"$ne": "action"}}, + include_metadata=True, + top_k=10, + async_req=True, + ).result() + for match in query_result.matches: + logger.info(f"Match: id: {match.id} metadata: {match.metadata}") + assert isinstance(query_result, QueryResponse) == True + assert query_result.namespace == target_namespace + + matches_with_metadata = [ + match + for match in query_result.matches + if match.metadata is not None and match.metadata != {} + ] + assert len(matches_with_metadata) == 2 + for match in matches_with_metadata: + assert match.metadata["genre"] != "action" + assert match.id != "4" diff --git a/tests/integration/data_grpc_futures/test_timeouts.py b/tests/integration/data_grpc_futures/test_timeouts.py new file mode 100644 index 00000000..6a329a5e --- /dev/null +++ b/tests/integration/data_grpc_futures/test_timeouts.py @@ -0,0 +1,408 @@ +import pytest +from pinecone import QueryResponse, UpsertResponse, FetchResponse, Vector, PineconeException +from ..helpers import embedding_values +from .stub_backend import create_sleepy_test_server +import logging +from pinecone.grpc import GRPCIndex, PineconeGRPC +from concurrent.futures import wait, ALL_COMPLETED + +logger = logging.getLogger(__name__) + +SERVER_SLEEP_SECONDS = 1 + + +@pytest.fixture(scope="session") +def grpc_server(): + logger.info("Starting gRPC test server") + server = create_sleepy_test_server(port=50051, sleep_seconds=SERVER_SLEEP_SECONDS) + yield server + logger.info("Stopping gRPC test server") + server.stop(0) + + +@pytest.fixture(scope="session") +def local_idx(): + pc = PineconeGRPC(api_key="test", ssl_verify=False) + idx = pc.Index(host="localhost:50051") + return idx + + +@pytest.mark.usefixtures("grpc_server") +class TestGrpcAsyncTimeouts_QueryByID: + def test_query_by_id_with_custom_timeout_exceeded(self, local_idx: GRPCIndex): + deadline = SERVER_SLEEP_SECONDS - 0.5 + query_results = local_idx.query( + id="1", namespace="testnamespace", top_k=10, async_req=True, timeout=deadline + ) + + assert query_results._default_timeout == 
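Taken together, these query tests exercise the pattern the PR enables: `query(..., async_req=True)` now returns a `PineconeGrpcFuture` that resolves to a parsed `QueryResponse` (via `parse_query_response`) and is compatible with the `concurrent.futures` helpers. A minimal usage sketch; the index host is hypothetical, and an API key in the environment is assumed:

```python
from concurrent.futures import as_completed

from pinecone.grpc import PineconeGRPC

pc = PineconeGRPC()  # assumes PINECONE_API_KEY is set in the environment
idx = pc.Index(host="my-index-abc123.svc.pinecone.io")  # hypothetical host

# Issue several queries concurrently; each call returns a PineconeGrpcFuture.
futures = [
    idx.query(id=vector_id, top_k=10, async_req=True, timeout=5)
    for vector_id in ["1", "2", "3"]
]

# PineconeGrpcFuture works with as_completed/wait just like a
# thread-pool future, as the tests above demonstrate.
for future in as_completed(futures, timeout=10):
    response = future.result()  # a parsed QueryResponse, per the result transformer
    print(response.namespace, len(response.matches))
```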
diff --git a/tests/integration/data_grpc_futures/test_timeouts.py b/tests/integration/data_grpc_futures/test_timeouts.py
new file mode 100644
index 00000000..6a329a5e
--- /dev/null
+++ b/tests/integration/data_grpc_futures/test_timeouts.py
@@ -0,0 +1,408 @@
+import pytest
+from pinecone import QueryResponse, UpsertResponse, FetchResponse, Vector, PineconeException
+from ..helpers import embedding_values
+from .stub_backend import create_sleepy_test_server
+import logging
+from pinecone.grpc import GRPCIndex, PineconeGRPC
+from concurrent.futures import wait, ALL_COMPLETED
+
+logger = logging.getLogger(__name__)
+
+SERVER_SLEEP_SECONDS = 1
+
+
+@pytest.fixture(scope="session")
+def grpc_server():
+    logger.info("Starting gRPC test server")
+    server = create_sleepy_test_server(port=50051, sleep_seconds=SERVER_SLEEP_SECONDS)
+    yield server
+    logger.info("Stopping gRPC test server")
+    server.stop(0)
+
+
+@pytest.fixture(scope="session")
+def local_idx():
+    pc = PineconeGRPC(api_key="test", ssl_verify=False)
+    idx = pc.Index(host="localhost:50051")
+    return idx
+
+
+@pytest.mark.usefixtures("grpc_server")
+class TestGrpcAsyncTimeouts_QueryByID:
+    def test_query_by_id_with_custom_timeout_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS - 0.5
+        query_results = local_idx.query(
+            id="1", namespace="testnamespace", top_k=10, async_req=True, timeout=deadline
+        )
+
+        assert query_results._default_timeout == deadline
+        done, not_done = wait(
+            [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        with pytest.raises(PineconeException) as e:
+            query_results.result()
+
+        assert "Deadline Exceeded" in str(e.value)
+
+    def test_query_by_id_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS + 1
+        query_results = local_idx.query(
+            id="1", namespace="testnamespace", top_k=10, async_req=True, timeout=deadline
+        )
+
+        assert query_results._default_timeout == deadline
+        done, not_done = wait(
+            [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = query_results.result()
+        assert result is not None
+        assert result.usage.read_units == 1
+
+    def test_query_by_id_with_default_timeout(self, local_idx: GRPCIndex):
+        query_results = local_idx.query(id="1", namespace="testnamespace", top_k=10, async_req=True)
+
+        # Default timeout is 5 seconds, which is longer than the test server sleep
+        assert query_results._default_timeout == 5
+
+        done, not_done = wait([query_results], timeout=10, return_when=ALL_COMPLETED)
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = query_results.result()
+        assert result is not None
+        assert isinstance(result, QueryResponse)
+        assert result.usage.read_units == 1
+        assert result.matches[0].id == "1"
+
+
+@pytest.mark.usefixtures("grpc_server")
+class TestGrpcAsyncTimeouts_QueryByVector:
+    def test_query_by_vector_with_timeout_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS - 0.5
+
+        query_results = local_idx.query(
+            vector=embedding_values(2),
+            namespace="testnamespace",
+            top_k=10,
+            async_req=True,
+            timeout=deadline,
+        )
+
+        assert query_results._default_timeout == deadline
+        done, not_done = wait(
+            [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        with pytest.raises(PineconeException) as e:
+            query_results.result()
+
+        assert "Deadline Exceeded" in str(e.value)
+
+    def test_query_by_vector_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS + 1
+        query_results = local_idx.query(
+            vector=embedding_values(2),
+            namespace="testnamespace",
+            top_k=10,
+            async_req=True,
+            timeout=deadline,
+        )
+
+        assert query_results._default_timeout == deadline
+        done, not_done = wait(
+            [query_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = query_results.result()
+        assert result is not None
+        assert isinstance(result, QueryResponse)
+        assert result.usage.read_units == 1
+        assert result.matches[0].id == "1"
+
+    def test_query_by_vector_with_default_timeout(self, local_idx: GRPCIndex):
+        query_results = local_idx.query(
+            vector=embedding_values(2), namespace="testnamespace", top_k=10, async_req=True
+        )
+
+        # Default timeout is 5 seconds, which is longer than the test server sleep
+        assert query_results._default_timeout == 5
+
+        done, not_done = wait([query_results], timeout=10, return_when=ALL_COMPLETED)
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = query_results.result()
+        assert result is not None
+        assert result.usage.read_units == 1
+
+
+@pytest.mark.usefixtures("grpc_server")
+class TestGrpcAsyncTimeouts_Upsert:
+    def test_upsert_with_custom_timeout_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS - 0.5
+
+        upsert_results = local_idx.upsert(
+            vectors=[Vector(id="1", values=embedding_values(2), metadata={"genre": "action"})],
+            namespace="testnamespace",
+            async_req=True,
+            timeout=deadline,
+        )
+
+        assert upsert_results._default_timeout == deadline
+        done, not_done = wait(
+            [upsert_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        with pytest.raises(PineconeException) as e:
+            upsert_results.result()
+
+        assert "Deadline Exceeded" in str(e.value)
+
+    def test_upsert_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS + 1
+        upsert_results = local_idx.upsert(
+            vectors=[Vector(id="1", values=embedding_values(2), metadata={"genre": "action"})],
+            namespace="testnamespace",
+            async_req=True,
+            timeout=deadline,
+        )
+
+        assert upsert_results._default_timeout == deadline
+        done, not_done = wait(
+            [upsert_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = upsert_results.result()
+        assert result is not None
+        assert isinstance(result, UpsertResponse)
+        assert result.upserted_count == 1
+
+    def test_upsert_with_default_timeout(self, local_idx: GRPCIndex):
+        upsert_results = local_idx.upsert(
+            vectors=[Vector(id="1", values=embedding_values(2), metadata={"genre": "action"})],
+            namespace="testnamespace",
+            async_req=True,
+        )
+
+        # Default timeout is 5 seconds, which is longer than the test server sleep
+        assert upsert_results._default_timeout == 5
+
+        done, not_done = wait([upsert_results], timeout=10, return_when=ALL_COMPLETED)
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = upsert_results.result()
+        assert result is not None
+        assert isinstance(result, UpsertResponse)
+        assert result.upserted_count == 1
+
+
+@pytest.mark.usefixtures("grpc_server")
+class TestGrpcAsyncTimeouts_Update:
+    def test_update_with_custom_timeout_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS - 0.5
+
+        update_results = local_idx.update(
+            id="1",
+            namespace="testnamespace",
+            values=embedding_values(2),
+            async_req=True,
+            timeout=deadline,
+        )
+
+        assert update_results._default_timeout == deadline
+        done, not_done = wait(
+            [update_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        with pytest.raises(PineconeException) as e:
+            update_results.result()
+
+        assert "Deadline Exceeded" in str(e.value)
+
+    def test_update_with_default_timeout(self, local_idx: GRPCIndex):
+        update_results = local_idx.update(
+            id="1", namespace="testnamespace", values=embedding_values(2), async_req=True
+        )
+
+        # Default timeout is 5 seconds, which is longer than the test server sleep
+        assert update_results._default_timeout == 5
+
+        done, not_done = wait([update_results], timeout=10, return_when=ALL_COMPLETED)
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = update_results.result()
+        assert result is not None
+        assert result == {}
+
+    def test_update_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS + 1
+        update_results = local_idx.update(
+            id="1",
+            namespace="testnamespace",
+            values=embedding_values(2),
+            async_req=True,
+            timeout=deadline,
+        )
+
+        assert update_results._default_timeout == deadline
+        done, not_done = wait(
+            [update_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = update_results.result()
+        assert result is not None
+        assert result == {}
+
+
+@pytest.mark.usefixtures("grpc_server")
+class TestGrpcAsyncTimeouts_Delete:
+    def test_delete_with_custom_timeout_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS - 0.5
+
+        delete_results = local_idx.delete(
+            ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline
+        )
+
+        assert delete_results._default_timeout == deadline
+        done, not_done = wait(
+            [delete_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        with pytest.raises(PineconeException) as e:
+            delete_results.result()
+
+        assert "Deadline Exceeded" in str(e.value)
+
+    def test_delete_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS + 1
+        delete_results = local_idx.delete(
+            ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline
+        )
+
+        assert delete_results._default_timeout == deadline
+        done, not_done = wait(
+            [delete_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = delete_results.result()
+        assert result is not None
+        assert result == {}
+
+    def test_delete_with_default_timeout(self, local_idx: GRPCIndex):
+        delete_results = local_idx.delete(
+            ids=["1", "2", "3"], namespace="testnamespace", async_req=True
+        )
+
+        # Default timeout is 5 seconds, which is longer than the test server sleep
+        assert delete_results._default_timeout == 5
+
+        done, not_done = wait([delete_results], timeout=10, return_when=ALL_COMPLETED)
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = delete_results.result()
+        assert result is not None
+        assert result == {}
+
+
+@pytest.mark.usefixtures("grpc_server")
+class TestGrpcAsyncTimeouts_Fetch:
+    def test_fetch_with_custom_timeout_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS - 0.5
+
+        fetch_results = local_idx.fetch(
+            ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline
+        )
+
+        assert fetch_results._default_timeout == deadline
+        done, not_done = wait(
+            [fetch_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        with pytest.raises(PineconeException) as e:
+            fetch_results.result()
+
+        assert "Deadline Exceeded" in str(e.value)
+
+    def test_fetch_with_custom_timeout_not_exceeded(self, local_idx: GRPCIndex):
+        deadline = SERVER_SLEEP_SECONDS + 1
+        fetch_results = local_idx.fetch(
+            ids=["1", "2", "3"], namespace="testnamespace", async_req=True, timeout=deadline
+        )
+
+        assert fetch_results._default_timeout == deadline
+        done, not_done = wait(
+            [fetch_results], timeout=SERVER_SLEEP_SECONDS + 1, return_when=ALL_COMPLETED
+        )
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = fetch_results.result()
+        assert result is not None
+        assert isinstance(result, FetchResponse)
+
+    def test_fetch_with_default_timeout(self, local_idx: GRPCIndex):
+        fetch_results = local_idx.fetch(
+            ids=["1", "2", "3"], namespace="testnamespace", async_req=True
+        )
+
+        # Default timeout is 5 seconds, which is longer than the test server sleep
+        assert fetch_results._default_timeout == 5
+
+        done, not_done = wait([fetch_results], timeout=10, return_when=ALL_COMPLETED)
+
+        assert len(done) == 1
+        assert len(not_done) == 0
+
+        result = fetch_results.result()
+        assert result is not None
+        assert isinstance(result, FetchResponse)
+
+        assert result.vectors["1"].id == "1"
+        assert result.vectors["2"].id == "2"
+        assert result.vectors["3"].id == "3"
+        assert result.usage.read_units == 1
+        assert result.namespace == "testnamespace"
diff --git a/tests/integration/data_grpc_futures/test_update_future.py b/tests/integration/data_grpc_futures/test_update_future.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/integration/data/test_upsert_future.py b/tests/integration/data_grpc_futures/test_upsert_future.py
similarity index 91%
rename from tests/integration/data/test_upsert_future.py
rename to tests/integration/data_grpc_futures/test_upsert_future.py
index 7ca4b27e..321c9cea 100644
--- a/tests/integration/data/test_upsert_future.py
+++ b/tests/integration/data_grpc_futures/test_upsert_future.py
@@ -1,19 +1,15 @@
 import pytest
-import os
 from pinecone import Vector, PineconeException
-from ..helpers import poll_stats_for_namespace, embedding_values, random_string
+from ..helpers import poll_stats_for_namespace, embedding_values, generate_name


 @pytest.fixture(scope="class")
-def namespace_query_async():
-    return random_string(10)
+def namespace_query_async(request):
+    return generate_name(request.node.name, "upsert-namespace")


 @pytest.mark.usefixtures("namespace_query_async")
 class TestUpsertWithAsyncReq:
-    @pytest.mark.skipif(
-        os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
-    )
     def test_upsert_to_namespace(self, idx, namespace_query_async):
         target_namespace = namespace_query_async
@@ -66,9 +62,6 @@ def test_upsert_to_namespace(self, idx, namespace_query_async):

         assert total_upserted == 9

-    @pytest.mark.skipif(
-        os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
-    )
     def test_upsert_to_namespace_when_failed_req(self, idx, namespace_query_async):
         target_namespace = namespace_query_async
diff --git a/tests/integration/helpers/__init__.py b/tests/integration/helpers/__init__.py
index 35b85873..cca1451d 100644
--- a/tests/integration/helpers/__init__.py
+++ b/tests/integration/helpers/__init__.py
@@ -13,6 +13,7 @@
     delete_indexes_from_run,
     default_create_index_params,
 )
+from .names import generate_name

 __all__ = [
     "fake_api_key",
@@ -28,4 +29,5 @@
     "delete_backups_from_run",
     "delete_indexes_from_run",
     "default_create_index_params",
+    "generate_name",
 ]
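The timeout tests above pin down two behaviors worth noting: the `timeout` passed alongside `async_req=True` becomes the future's `_default_timeout`, and an exceeded gRPC deadline surfaces from `result()` as a `PineconeException` whose message contains "Deadline Exceeded". A sketch of that failure path, assuming the sleepy stub server from `stub_backend.py` is running on localhost (it sleeps about one second per call):

```python
from concurrent.futures import ALL_COMPLETED, wait

from pinecone import PineconeException
from pinecone.grpc import PineconeGRPC

pc = PineconeGRPC(api_key="test", ssl_verify=False)  # plaintext local stub, as in the tests
idx = pc.Index(host="localhost:50051")

# The stub sleeps ~1s per call, so a 0.5s deadline should be exceeded.
future = idx.query(id="1", namespace="testnamespace", top_k=10, async_req=True, timeout=0.5)

# The future still completes (with an error), so wait() reports it as done.
done, not_done = wait([future], timeout=5, return_when=ALL_COMPLETED)
assert len(done) == 1 and len(not_done) == 0

try:
    future.result()
except PineconeException as e:
    # _wrap_rpc_exception converts the raw gRPC error into a PineconeException.
    assert "Deadline Exceeded" in str(e)
```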
+ """ + return hashlib.sha256(f"{test_name}-{label}".encode()).hexdigest()[:max_length] diff --git a/tests/unit_grpc/test_futures.py b/tests/unit_grpc/test_futures.py index 43960b00..5393f0af 100644 --- a/tests/unit_grpc/test_futures.py +++ b/tests/unit_grpc/test_futures.py @@ -25,6 +25,9 @@ def __init__(self, mocker): self._state = mocker.Mock() self._state.debug_error_string = "Test gRPC error" + def debug_error_string(self): + return self._state.debug_error_string + class TestPineconeGrpcFuture: def test_wraps_grpc_future_already_done(self, mocker):