
Commit 86c381c

Fix grpc timeouts when async_req=True (#510)
## Problem

When users pass `async_req=True` and a `timeout` value to GRPC methods, the timeout value was not being honored; instead, a default timeout of 5 seconds was being imposed by the `PineconeGrpcFuture`.

## Solution

The future objects returned from the `grpc` package do not conform to the `Future` interface of Python's `concurrent.futures` package, so we wrap those futures in our own class, `PineconeGrpcFuture`, to adapt the interface and make it more ergonomic to work with. We forgot to pass optional args to the `PineconeGrpcFuture` constructor, so in many cases (upsert, query, update, delete, and fetch are all affected) we were not respecting timeout values passed by the caller.

The fix itself is straightforward: pass the `timeout` value into the `PineconeGrpcFuture`. The majority of the work in this PR relates to testing. I've extracted the tests of the async gRPC functions into a separate folder so we don't need to mess around with so many `@pytest.mark.skip` annotations to label gRPC-specific tests in what are otherwise shared tests between REST and gRPC located in the `tests/integration/data` folder. Besides moving existing tests, I have added some new tests for query, plus a completely new test file, `test_timeouts.py`, that uses a mock gRPC server implementation to simulate the interaction between client-side timeouts and long response times.

To summarize:

- Refactor `async_req=True` tests into a different folder
- Add detailed tests of timeout behavior using a mock server that lets us control request latency in the test
- Tweak instantiation of the `GRPCIndex` class to respect the `ssl_verify` param, if one has been set on the parent `PineconeGRPC` object. This was needed to get the test interactions with the mock server running on localhost passing.

## Type of Change

- [x] Bug fix (non-breaking change which fixes an issue)
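For context, a minimal sketch of the call pattern this fix affects. The API key, host, namespace, and vector values below are placeholders for illustration, not values taken from this PR:

```python
from pinecone.grpc import PineconeGRPC

# Placeholder credentials and host for illustration only.
pc = PineconeGRPC(api_key="YOUR_API_KEY")
index = pc.Index(host="example-index-host.pinecone.io")

# With async_req=True the call returns a PineconeGrpcFuture instead of blocking.
# Before this fix, the timeout argument was dropped and a default of 5 seconds
# was applied when the future was resolved.
future = index.upsert(
    vectors=[("vec1", [0.1, 0.2])],
    namespace="example-namespace",
    async_req=True,
    timeout=30,
)

result = future.result()  # now respects the caller-supplied 30-second timeout
```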
1 parent d1282ce commit 86c381c

18 files changed: +1102 −47 lines

.github/workflows/testing-integration.yaml

Lines changed: 1 addition & 0 deletions
@@ -82,6 +82,7 @@ jobs:
         python_version: ${{ fromJson(inputs.python_versions_json) }}
         test_suite:
           - data
+          - data_grpc_futures
     steps:
       - uses: actions/checkout@v4
       - name: Setup Poetry

pinecone/grpc/base.py

Lines changed: 9 additions & 1 deletion
@@ -29,7 +29,15 @@ def __init__(
         _endpoint_override: Optional[str] = None,
     ):
         self.config = config
-        self.grpc_client_config = grpc_config or GRPCClientConfig()
+        # If grpc_config is passed, use it. Otherwise, build a new one with
+        # default values and passing in the ssl_verify value from the config.
+        if self.config.ssl_verify is None:
+            default_grpc_config = GRPCClientConfig()
+        else:
+            default_grpc_config = GRPCClientConfig(secure=self.config.ssl_verify)
+
+        self.grpc_client_config = grpc_config or default_grpc_config
+
         self.pool_threads = pool_threads

         self._endpoint_override = _endpoint_override

pinecone/grpc/future.py

Lines changed: 13 additions & 4 deletions
@@ -75,10 +75,19 @@ def _timeout(self, timeout: Optional[int] = None) -> int:
         return self._default_timeout

     def _wrap_rpc_exception(self, e):
-        if e._state and e._state.debug_error_string:
-            return PineconeException(e._state.debug_error_string)
-        else:
-            return PineconeException("Unknown GRPC error")
+        # The way the grpc package is using multiple inheritance makes
+        # it a little unclear whether it's safe to always assume that
+        # the e.code(), e.details(), and e.debug_error_string() methods
+        # exist. So, we try/catch to avoid errors.
+        try:
+            grpc_info = {"grpc_error_code": e.code().value[0], "grpc_message": e.details()}
+
+            return PineconeException(f"GRPC error: {grpc_info}")
+        except Exception:
+            try:
+                return PineconeException(f"Unknown GRPC error: {e.debug_error_string()}")
+            except Exception:
+                return PineconeException(f"Unknown GRPC error: {e}")

     def __del__(self):
         self._grpc_future.cancel()
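For illustration, a hedged sketch of how the richer error wrapping surfaces to callers. Here `index` is assumed to be an existing `GRPCIndex` handle, and the import path for `PineconeException` is an assumption:

```python
from pinecone.exceptions import PineconeException  # import path assumed

# If the underlying gRPC call fails (for example, DEADLINE_EXCEEDED when a
# short timeout elapses), resolving the future raises a PineconeException
# whose message now includes the gRPC status code and details when available.
future = index.query(vector=[0.1, 0.2], top_k=1, async_req=True, timeout=1)
try:
    future.result()
except PineconeException as e:
    # e.g. GRPC error: {'grpc_error_code': 4, 'grpc_message': 'Deadline Exceeded'}
    print(e)
```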

pinecone/grpc/index_grpc.py

Lines changed: 18 additions & 5 deletions
@@ -12,6 +12,9 @@
     parse_fetch_response,
     parse_query_response,
     parse_stats_response,
+    parse_upsert_response,
+    parse_update_response,
+    parse_delete_response,
 )
 from .vector_factory_grpc import VectorFactoryGRPC
 from .sparse_values_factory import SparseValuesFactory

@@ -145,7 +148,9 @@ def upsert(
             args_dict = self._parse_non_empty_args([("namespace", namespace)])
             request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
             future = self.runner.run(self.stub.Upsert.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, timeout=timeout, result_transformer=parse_upsert_response
+            )

         if batch_size is None:
             return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)

@@ -297,7 +302,9 @@ def delete(
         request = DeleteRequest(**args_dict, **kwargs)
         if async_req:
             future = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, timeout=timeout, result_transformer=parse_delete_response
+            )
         else:
             return self.runner.run(self.stub.Delete, request, timeout=timeout)

@@ -334,7 +341,9 @@ def fetch(

         if async_req:
             future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
+            return PineconeGrpcFuture(
+                future, result_transformer=parse_fetch_response, timeout=timeout
+            )
         else:
             response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
             return parse_fetch_response(response)

@@ -424,7 +433,9 @@ def query(

         if async_req:
             future = self.runner.run(self.stub.Query.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, result_transformer=parse_query_response, timeout=timeout
+            )
         else:
             response = self.runner.run(self.stub.Query, request, timeout=timeout)
             json_response = json_format.MessageToDict(response)

@@ -535,7 +546,9 @@ def update(
         request = UpdateRequest(id=id, **args_dict)
         if async_req:
             future = self.runner.run(self.stub.Update.future, request, timeout=timeout)
-            return PineconeGrpcFuture(future)
+            return PineconeGrpcFuture(
+                future, timeout=timeout, result_transformer=parse_update_response
+            )
         else:
             return self.runner.run(self.stub.Update, request, timeout=timeout)

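A sketch of the end-to-end effect of the `timeout`/`result_transformer` wiring above, assuming `index` is a `GRPCIndex` handle; the host, namespace, and vector values are placeholders:

```python
# With async_req=True, query now returns a PineconeGrpcFuture that both honors
# the caller-supplied timeout and converts the raw protobuf response into a
# QueryResponse via parse_query_response when the future is resolved.
future = index.query(
    vector=[0.1, 0.2],
    top_k=3,
    namespace="example-namespace",
    async_req=True,
    timeout=10,
)

query_response = future.result()
for match in query_response.matches:
    print(match.id, match.score)
```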
pinecone/grpc/pinecone.py

Lines changed: 1 addition & 0 deletions
@@ -133,5 +133,6 @@ def Index(self, name: str = "", host: str = "", **kwargs):
             source_tag=self._config.source_tag,
             proxy_url=self._config.proxy_url,
             ssl_ca_certs=self._config.ssl_ca_certs,
+            ssl_verify=self._config.ssl_verify,
         )
         return GRPCIndex(index_name=name, config=config, pool_threads=pt, **kwargs)
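A sketch of why the `ssl_verify` passthrough matters for the localhost tests. The API key and address are placeholders, and this assumes a plaintext test server listening on port 50051:

```python
from pinecone.grpc import PineconeGRPC

# ssl_verify=False on the parent client now propagates into the GRPCIndex
# config, which in turn builds a GRPCClientConfig(secure=False) so a channel
# to a local, insecure test server can be established.
pc = PineconeGRPC(api_key="test-api-key", ssl_verify=False)
index = pc.Index(host="localhost:50051")
```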

pinecone/grpc/utils.py

Lines changed: 25 additions & 5 deletions
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Optional, Union
 from google.protobuf import json_format
 from google.protobuf.message import Message

@@ -11,6 +11,7 @@
     SparseValues,
     QueryResponse,
     IndexDescription as DescribeIndexStatsResponse,
+    UpsertResponse,
     NamespaceSummary,
 )
 from pinecone.db_data.dataclasses import FetchResponse

@@ -63,9 +64,28 @@ def parse_usage(usage: dict):
     return Usage(read_units=int(usage.get("readUnits", 0)))


-def parse_query_response(response: dict, _check_type: bool = False):
+def parse_upsert_response(response: Message, _check_type: bool = False):
+    json_response = json_format.MessageToDict(response)
+    upserted_count = json_response.get("upsertedCount", 0)
+    return UpsertResponse(upserted_count=int(upserted_count))
+
+
+def parse_update_response(response: Union[dict, Message], _check_type: bool = False):
+    return {}
+
+
+def parse_delete_response(response: Union[dict, Message], _check_type: bool = False):
+    return {}
+
+
+def parse_query_response(response: Union[dict, Message], _check_type: bool = False):
+    if isinstance(response, Message):
+        json_response = json_format.MessageToDict(response)
+    else:
+        json_response = response
+
     matches = []
-    for item in response.get("matches", []):
+    for item in json_response.get("matches", []):
         sc = ScoredVector(
             id=item["id"],
             score=item.get("score", 0.0),

@@ -80,11 +100,11 @@ def parse_query_response(response: dict, _check_type: bool = False):
     # creating empty `Usage` objects and then passing them into QueryResponse
     # when they are not actually present in the response from the server.
     args = {
-        "namespace": response.get("namespace", ""),
+        "namespace": json_response.get("namespace", ""),
         "matches": matches,
         "_check_type": _check_type,
     }
-    usage = response.get("usage")
+    usage = json_response.get("usage")
     if usage:
         args["usage"] = parse_usage(usage)
     return QueryResponse(**args)
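A small illustration of the widened signature above. The dict literal is a made-up response shape for demonstration only:

```python
from pinecone.grpc.utils import parse_query_response

# parse_query_response now accepts either a protobuf Message or a dict that
# has already been converted with json_format.MessageToDict.
response_dict = {
    "namespace": "example-namespace",
    "matches": [{"id": "vec1", "score": 0.9, "values": [0.1, 0.2]}],
    "usage": {"readUnits": 1},
}

parsed = parse_query_response(response_dict)
print(parsed.namespace)        # "example-namespace"
print(parsed.matches[0].id)    # "vec1"
```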

tests/integration/data_grpc_futures/__init__.py

Whitespace-only changes.
Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
import pytest
import json
import uuid
from ..helpers import get_environment_var, index_tags as index_tags_helper, generate_name
import logging
from pinecone import EmbedModel, CloudProvider, AwsRegion, IndexEmbed
from pinecone.grpc import PineconeGRPC

logger = logging.getLogger(__name__)

RUN_ID = str(uuid.uuid4())

created_indexes = []


@pytest.fixture(scope="session")
def index_tags(request):
    return index_tags_helper(request, RUN_ID)


@pytest.fixture(scope="session")
def pc():
    return PineconeGRPC()


@pytest.fixture(scope="session")
def spec():
    spec_json = get_environment_var(
        "SPEC", '{"serverless": {"cloud": "aws", "region": "us-east-1" }}'
    )
    return json.loads(spec_json)


@pytest.fixture(scope="session")
def model_idx(pc, index_tags, request):
    model_index_name = generate_name(request.node.name, "embed")
    if not pc.has_index(name=model_index_name):
        logger.info(f"Creating index {model_index_name}")
        pc.create_index_for_model(
            name=model_index_name,
            cloud=CloudProvider.AWS,
            region=AwsRegion.US_WEST_2,
            embed=IndexEmbed(
                model=EmbedModel.Multilingual_E5_Large,
                field_map={"text": "my_text_field"},
                metric="cosine",
            ),
            tags=index_tags,
        )
        created_indexes.append(model_index_name)
    else:
        logger.info(f"Index {model_index_name} already exists")

    description = pc.describe_index(name=model_index_name)
    return pc.Index(host=description.host)


def create_index(pc, create_args):
    if not pc.has_index(name=create_args["name"]):
        logger.info(f"Creating index {create_args['name']}")
        pc.create_index(**create_args)
    else:
        logger.info(f"Index {create_args['name']} already exists")

    host = pc.describe_index(name=create_args["name"]).host

    return host


@pytest.fixture(scope="session")
def idx(pc, spec, index_tags, request):
    index_name = generate_name(request.node.name, "dense")
    logger.info(f"Request: {request.node}")
    create_args = {
        "name": index_name,
        "dimension": 2,
        "metric": "cosine",
        "spec": spec,
        "tags": index_tags,
    }
    host = create_index(pc, create_args)
    logger.info(f"Using index {index_name} with host {host} as idx")
    created_indexes.append(index_name)
    return pc.Index(host=host)


@pytest.fixture(scope="session")
def sparse_idx(pc, spec, index_tags, request):
    index_name = generate_name(request.node.name, "sparse")
    create_args = {
        "name": index_name,
        "metric": "dotproduct",
        "spec": spec,
        "vector_type": "sparse",
        "tags": index_tags,
    }
    host = create_index(pc, create_args)
    created_indexes.append(index_name)
    return pc.Index(host=host)


def pytest_sessionfinish(session, exitstatus):
    for index in created_indexes:
        try:
            logger.info(f"Deleting index {index}")
            pc = PineconeGRPC()
            pc.delete_index(name=index, timeout=-1)
        except Exception as e:
            logger.error(f"Error deleting index {index}: {e}")
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
import time
import grpc
import logging
from concurrent import futures
import pinecone.core.grpc.protos.db_data_2025_01_pb2 as pb2
import pinecone.core.grpc.protos.db_data_2025_01_pb2_grpc as pb2_grpc

logger = logging.getLogger(__name__)


class TestVectorService(pb2_grpc.VectorServiceServicer):
    def __init__(self, sleep_seconds=5):
        self.sleep_seconds = sleep_seconds

    def Upsert(self, request, context):
        # Simulate a delay that will cause a timeout
        logger.info("Received an upsert request from test client")
        logger.info(f"Request: {request}")
        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
        time.sleep(self.sleep_seconds)
        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
        logger.info("Returning an upsert response from test server")
        return pb2.UpsertResponse(upserted_count=1)

    def Query(self, request, context):
        # Simulate a delay that will cause a timeout
        logger.info("Received a query request from test client")
        logger.info(f"Request: {request}")

        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
        time.sleep(self.sleep_seconds)
        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
        logger.info("Returning a query response from test server")
        return pb2.QueryResponse(
            results=[],
            matches=[pb2.ScoredVector(id="1", score=1.0, values=[1.0, 2.0, 3.0])],
            namespace="testnamespace",
            usage=pb2.Usage(read_units=1),
        )

    def Update(self, request, context):
        # Simulate a delay that will cause a timeout
        logger.info("Received an update request from test client")
        logger.info(f"Request: {request}")
        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
        time.sleep(self.sleep_seconds)
        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
        logger.info("Returning an update response from test server")
        return pb2.UpdateResponse()

    def Delete(self, request, context):
        # Simulate a delay that will cause a timeout
        logger.info("Received a delete request from test client")
        logger.info(f"Request: {request}")
        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
        time.sleep(self.sleep_seconds)
        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
        logger.info("Returning a delete response from test server")
        return pb2.DeleteResponse()

    def Fetch(self, request, context):
        logger.info("Received a fetch request from test client")
        logger.info(f"Request: {request}")
        logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
        time.sleep(self.sleep_seconds)
        logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
        logger.info("Returning a fetch response from test server")
        return pb2.FetchResponse(
            vectors={
                "1": pb2.Vector(id="1", values=[1.0, 2.0, 3.0]),
                "2": pb2.Vector(id="2", values=[4.0, 5.0, 6.0]),
                "3": pb2.Vector(id="3", values=[7.0, 8.0, 9.0]),
            },
            namespace="testnamespace",
            usage=pb2.Usage(read_units=1),
        )


def create_sleepy_test_server(port=50051, sleep_seconds=5):
    """Creates and returns a configured gRPC server for testing.

    Args:
        port (int): The port number to run the server on
        sleep_seconds (int): The extra latency in seconds for simulated operations

    Returns:
        grpc.Server: A configured and started gRPC server instance
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_VectorServiceServicer_to_server(
        TestVectorService(sleep_seconds=sleep_seconds), server
    )
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    return server
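A rough sketch of how a timeout test can exercise the slow server above. The module path in the import, the test name, and the exact exception type asserted are assumptions, not the actual contents of `test_timeouts.py`:

```python
import pytest
from pinecone.grpc import PineconeGRPC

# Hypothetical module path for the helper defined in the file above.
from tests.integration.data_grpc_futures.stub_backend import create_sleepy_test_server


def test_timeout_shorter_than_server_latency():
    # The server sleeps for 5 seconds but the caller only waits 1 second, so
    # resolving the future should fail instead of silently falling back to the
    # old 5-second default.
    server = create_sleepy_test_server(port=50051, sleep_seconds=5)
    try:
        pc = PineconeGRPC(api_key="test-api-key", ssl_verify=False)
        index = pc.Index(host="localhost:50051")

        future = index.upsert(
            vectors=[("vec1", [0.1, 0.2, 0.3])], async_req=True, timeout=1
        )
        # Exact exception type depends on which deadline fires first
        # (wrapped gRPC DEADLINE_EXCEEDED vs. the future's own timeout).
        with pytest.raises(Exception):
            future.result()
    finally:
        server.stop(grace=0)
```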
