Fix grpc timeouts when async_req=True #510


Merged: 7 commits, Jun 12, 2025

1 change: 1 addition & 0 deletions .github/workflows/testing-integration.yaml
@@ -82,6 +82,7 @@ jobs:
python_version: ${{ fromJson(inputs.python_versions_json) }}
test_suite:
- data
- data_grpc_futures
steps:
- uses: actions/checkout@v4
- name: Setup Poetry
10 changes: 9 additions & 1 deletion pinecone/grpc/base.py
@@ -29,7 +29,15 @@ def __init__(
_endpoint_override: Optional[str] = None,
):
self.config = config
self.grpc_client_config = grpc_config or GRPCClientConfig()
# If grpc_config is passed, use it. Otherwise, build a new one with
# default values, passing in the ssl_verify value from the config.
if self.config.ssl_verify is None:
Contributor:

I'm a little unclear on why we are checking the ssl_verify property before setting default_grpc_config = GRPCClientConfig()

@jhamon (Collaborator, Author), Jun 12, 2025:

The config object here has an optional boolean field called ssl_verify that can have three values: None (by default, if unset by the user), True, or False. The user sets these by passing a value to the Pinecone or PineconeGRPC constructor like this: pc = PineconeGRPC(ssl_verify=False)

Separately, GRPC configurations can be set at the index level with GRPCClientConfig by passing pc.Index(name='my-index', grpc_config=grpc_config). GRPCClientConfig is implemented as a NamedTuple, an immutable data structure. This object controls the SSL behavior and many other things within the grpc code. For SSL verification, the relevant property in this object is called secure. I discovered in testing that unlike ssl_verify, secure is a true boolean (True/False only), and if you set it to None you will get errors.

For configuring SSL verify behavior in GRPC, there are three scenarios we need to consider:

  • The user never set anything in particular regarding SSL verification; they want the default behavior everywhere. By the time this constructor is executing, config.ssl_verify will be None so we use the GRPCClientConfig() with no modifications to configure the GRPC channel. This has SSL verification on by default.
  • The user passed PineconeGRPC(ssl_verify=False) but did not specify a grpc_config object for the index client; in that scenario, we want to thread that setting through even to the index level and override the default grpc behavior for ssl verification.
  • The user explicitly passed grpc_config to the index constructor with pc.Index(name='foo', grpc_config=grpc_config). In this scenario, we use the configuration the user has provided.

The lesson here is that we should endeavor to have only one way to configure things if at all possible. Merging multiple config sources, resolution order, etc. gets complicated in a hurry.
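
As a sketch of those three scenarios (api_key and host values are placeholders):

from pinecone.grpc import PineconeGRPC, GRPCClientConfig

# Scenario 1: nothing set; config.ssl_verify stays None, so the index
# client falls back to a plain GRPCClientConfig() (verification on).
pc = PineconeGRPC(api_key="key")
index = pc.Index(host="host")

# Scenario 2: ssl_verify=False on the client is threaded down into
# GRPCClientConfig(secure=False) at the index level.
pc = PineconeGRPC(api_key="key", ssl_verify=False)
index = pc.Index(host="host")

# Scenario 3: an explicit grpc_config on the index wins outright.
pc = PineconeGRPC(api_key="key")
index = pc.Index(host="host", grpc_config=GRPCClientConfig(secure=False))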

default_grpc_config = GRPCClientConfig()
else:
default_grpc_config = GRPCClientConfig(secure=self.config.ssl_verify)
@jhamon (Collaborator, Author):

This change allows me to test against a local grpc server implementation, with the PineconeGRPC(ssl_verify=False) setting carried through all the way to the index class.


self.grpc_client_config = grpc_config or default_grpc_config

self.pool_threads = pool_threads

self._endpoint_override = _endpoint_override
17 changes: 13 additions & 4 deletions pinecone/grpc/future.py
@@ -75,10 +75,19 @@ def _timeout(self, timeout: Optional[int] = None) -> int:
return self._default_timeout

def _wrap_rpc_exception(self, e):
if e._state and e._state.debug_error_string:
return PineconeException(e._state.debug_error_string)
else:
return PineconeException("Unknown GRPC error")
# The way the grpc package is using multiple inheritance makes
# it a little unclear whether it's safe to always assume that
# the e.code(), e.details(), and e.debug_error_string() methods
# exist. So, we use try/except to avoid errors.
try:
grpc_info = {"grpc_error_code": e.code().value[0], "grpc_message": e.details()}

return PineconeException(f"GRPC error: {grpc_info}")
except Exception:
try:
return PineconeException(f"Unknown GRPC error: {e.debug_error_string()}")
except Exception:
return PineconeException(f"Unknown GRPC error: {e}")

def __del__(self):
self._grpc_future.cancel()
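A caller-side sketch of what this wrapping surfaces, assuming `future` came from an async_req=True call and that PineconeGrpcFuture.result() follows the usual concurrent.futures interface:

from pinecone.exceptions import PineconeException

try:
    future.result(timeout=1)
except PineconeException as e:
    # With the new wrapping, a deadline failure prints something like:
    # GRPC error: {'grpc_error_code': 4, 'grpc_message': 'Deadline Exceeded'}
    print(e)
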
23 changes: 18 additions & 5 deletions pinecone/grpc/index_grpc.py
@@ -12,6 +12,9 @@
parse_fetch_response,
parse_query_response,
parse_stats_response,
parse_upsert_response,
parse_update_response,
parse_delete_response,
)
from .vector_factory_grpc import VectorFactoryGRPC
from .sparse_values_factory import SparseValuesFactory
@@ -145,7 +148,9 @@ def upsert(
args_dict = self._parse_non_empty_args([("namespace", namespace)])
request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
future = self.runner.run(self.stub.Upsert.future, request, timeout=timeout)
return PineconeGrpcFuture(future)
return PineconeGrpcFuture(
future, timeout=timeout, result_transformer=parse_upsert_response
)

if batch_size is None:
return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)
@@ -297,7 +302,9 @@ def delete(
request = DeleteRequest(**args_dict, **kwargs)
if async_req:
future = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
return PineconeGrpcFuture(future)
return PineconeGrpcFuture(
future, timeout=timeout, result_transformer=parse_delete_response
)
else:
return self.runner.run(self.stub.Delete, request, timeout=timeout)

@@ -334,7 +341,9 @@ def fetch(

if async_req:
future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
return PineconeGrpcFuture(
future, result_transformer=parse_fetch_response, timeout=timeout
)
else:
response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
return parse_fetch_response(response)
@@ -424,7 +433,9 @@ def query(

if async_req:
future = self.runner.run(self.stub.Query.future, request, timeout=timeout)
return PineconeGrpcFuture(future)
return PineconeGrpcFuture(
future, result_transformer=parse_query_response, timeout=timeout
)
else:
response = self.runner.run(self.stub.Query, request, timeout=timeout)
json_response = json_format.MessageToDict(response)
@@ -535,7 +546,9 @@ def update(
request = UpdateRequest(id=id, **args_dict)
if async_req:
future = self.runner.run(self.stub.Update.future, request, timeout=timeout)
return PineconeGrpcFuture(future)
return PineconeGrpcFuture(
future, timeout=timeout, result_transformer=parse_update_response
)
else:
return self.runner.run(self.stub.Update, request, timeout=timeout)

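Taken together, these index_grpc.py changes mean every async_req=True call now returns a PineconeGrpcFuture that carries both the timeout and a result transformer. A hedged usage sketch (host and vector values are placeholders):

from pinecone.grpc import PineconeGRPC

pc = PineconeGRPC(api_key="key")
index = pc.Index(host="my-index-host")

# The future now knows its timeout and how to parse its result, so
# .result() raises instead of hanging when the server stalls, and a
# successful call returns a parsed response object.
future = index.upsert(
    vectors=[("vec1", [0.1, 0.2])],
    namespace="ns1",
    async_req=True,
    timeout=5,
)
response = future.result()  # parsed via parse_upsert_response
print(response.upserted_count)
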
1 change: 1 addition & 0 deletions pinecone/grpc/pinecone.py
@@ -133,5 +133,6 @@ def Index(self, name: str = "", host: str = "", **kwargs):
source_tag=self._config.source_tag,
proxy_url=self._config.proxy_url,
ssl_ca_certs=self._config.ssl_ca_certs,
ssl_verify=self._config.ssl_verify,
)
return GRPCIndex(index_name=name, config=config, pool_threads=pt, **kwargs)
30 changes: 25 additions & 5 deletions pinecone/grpc/utils.py
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Union
from google.protobuf import json_format
from google.protobuf.message import Message

@@ -11,6 +11,7 @@
SparseValues,
QueryResponse,
IndexDescription as DescribeIndexStatsResponse,
UpsertResponse,
NamespaceSummary,
)
from pinecone.db_data.dataclasses import FetchResponse
@@ -63,9 +64,28 @@ def parse_usage(usage: dict):
return Usage(read_units=int(usage.get("readUnits", 0)))


def parse_query_response(response: dict, _check_type: bool = False):
def parse_upsert_response(response: Message, _check_type: bool = False):
json_response = json_format.MessageToDict(response)
upserted_count = json_response.get("upsertedCount", 0)
return UpsertResponse(upserted_count=int(upserted_count))


def parse_update_response(response: Union[dict, Message], _check_type: bool = False):
return {}


def parse_delete_response(response: Union[dict, Message], _check_type: bool = False):
return {}
Comment on lines +73 to +78

@jhamon (Collaborator, Author):

These two requests don't actually return anything, so there's nothing to do here.



def parse_query_response(response: Union[dict, Message], _check_type: bool = False):
if isinstance(response, Message):
json_response = json_format.MessageToDict(response)
else:
json_response = response

matches = []
for item in response.get("matches", []):
for item in json_response.get("matches", []):
sc = ScoredVector(
id=item["id"],
score=item.get("score", 0.0),
@@ -80,11 +100,11 @@ def parse_query_response(response: dict, _check_type: bool = False):
# creating empty `Usage` objects and then passing them into QueryResponse
# when they are not actually present in the response from the server.
args = {
"namespace": response.get("namespace", ""),
"namespace": json_response.get("namespace", ""),
"matches": matches,
"_check_type": _check_type,
}
usage = response.get("usage")
usage = json_response.get("usage")
if usage:
args["usage"] = parse_usage(usage)
return QueryResponse(**args)
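A minimal sketch of the dual-input handling added to parse_query_response, using the proto module path from the test stub below (import path assumed from this diff's file location):

from google.protobuf import json_format
from pinecone.grpc.utils import parse_query_response
import pinecone.core.grpc.protos.db_data_2025_01_pb2 as pb2

msg = pb2.QueryResponse(
    matches=[pb2.ScoredVector(id="1", score=0.9, values=[0.1, 0.2])],
    namespace="ns1",
)
# Works when handed the raw protobuf Message...
from_message = parse_query_response(msg)
# ...and when handed an already-converted dict.
from_dict = parse_query_response(json_format.MessageToDict(msg))
assert from_message.namespace == from_dict.namespace == "ns1"
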
Empty file.
109 changes: 109 additions & 0 deletions tests/integration/data_grpc_futures/conftest.py
@@ -0,0 +1,109 @@
import pytest
import json
import uuid
from ..helpers import get_environment_var, index_tags as index_tags_helper, generate_name
import logging
from pinecone import EmbedModel, CloudProvider, AwsRegion, IndexEmbed
from pinecone.grpc import PineconeGRPC

logger = logging.getLogger(__name__)

RUN_ID = str(uuid.uuid4())

created_indexes = []


@pytest.fixture(scope="session")
def index_tags(request):
return index_tags_helper(request, RUN_ID)


@pytest.fixture(scope="session")
def pc():
return PineconeGRPC()


@pytest.fixture(scope="session")
def spec():
spec_json = get_environment_var(
"SPEC", '{"serverless": {"cloud": "aws", "region": "us-east-1" }}'
)
return json.loads(spec_json)


@pytest.fixture(scope="session")
def model_idx(pc, index_tags, request):
model_index_name = generate_name(request.node.name, "embed")
if not pc.has_index(name=model_index_name):
logger.info(f"Creating index {model_index_name}")
pc.create_index_for_model(
name=model_index_name,
cloud=CloudProvider.AWS,
region=AwsRegion.US_WEST_2,
embed=IndexEmbed(
model=EmbedModel.Multilingual_E5_Large,
field_map={"text": "my_text_field"},
metric="cosine",
),
tags=index_tags,
)
created_indexes.append(model_index_name)
else:
logger.info(f"Index {model_index_name} already exists")

description = pc.describe_index(name=model_index_name)
return pc.Index(host=description.host)


def create_index(pc, create_args):
if not pc.has_index(name=create_args["name"]):
logger.info(f"Creating index {create_args['name']}")
pc.create_index(**create_args)
else:
logger.info(f"Index {create_args['name']} already exists")

host = pc.describe_index(name=create_args["name"]).host

return host


@pytest.fixture(scope="session")
def idx(pc, spec, index_tags, request):
index_name = generate_name(request.node.name, "dense")
logger.info(f"Request: {request.node}")
create_args = {
"name": index_name,
"dimension": 2,
"metric": "cosine",
"spec": spec,
"tags": index_tags,
}
host = create_index(pc, create_args)
logger.info(f"Using index {index_name} with host {host} as idx")
created_indexes.append(index_name)
return pc.Index(host=host)


@pytest.fixture(scope="session")
def sparse_idx(pc, spec, index_tags, request):
index_name = generate_name(request.node.name, "sparse")
create_args = {
"name": index_name,
"metric": "dotproduct",
"spec": spec,
"vector_type": "sparse",
"tags": index_tags,
}
host = create_index(pc, create_args)
created_indexes.append(index_name)
return pc.Index(host=host)


def pytest_sessionfinish(session, exitstatus):
for index in created_indexes:
try:
logger.info(f"Deleting index {index}")
pc = PineconeGRPC()
pc.delete_index(name=index, timeout=-1)
except Exception as e:
logger.error(f"Error deleting index {index}: {e}")
95 changes: 95 additions & 0 deletions tests/integration/data_grpc_futures/stub_backend.py
@@ -0,0 +1,95 @@
import time
import grpc
import logging
from concurrent import futures
import pinecone.core.grpc.protos.db_data_2025_01_pb2 as pb2
import pinecone.core.grpc.protos.db_data_2025_01_pb2_grpc as pb2_grpc

logger = logging.getLogger(__name__)


class TestVectorService(pb2_grpc.VectorServiceServicer):
def __init__(self, sleep_seconds=5):
self.sleep_seconds = sleep_seconds

def Upsert(self, request, context):
# Simulate a delay that will cause a timeout
logger.info("Received an upsert request from test client")
logger.info(f"Request: {request}")
logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
time.sleep(self.sleep_seconds)
logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
logger.info("Returning an upsert response from test server")
return pb2.UpsertResponse(upserted_count=1)

def Query(self, request, context):
# Simulate a delay that will cause a timeout
logger.info("Received a query request from test client")
logger.info(f"Request: {request}")

logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
time.sleep(self.sleep_seconds)
logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
logger.info("Returning a query response from test server")
return pb2.QueryResponse(
results=[],
matches=[pb2.ScoredVector(id="1", score=1.0, values=[1.0, 2.0, 3.0])],
namespace="testnamespace",
usage=pb2.Usage(read_units=1),
)

def Update(self, request, context):
# Simulate a delay that will cause a timeout
logger.info("Received an update request from test client")
logger.info(f"Request: {request}")
logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
time.sleep(self.sleep_seconds)
logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
logger.info("Returning an update response from test server")
return pb2.UpdateResponse()

def Delete(self, request, context):
# Simulate a delay that will cause a timeout
logger.info("Received a delete request from test client")
logger.info(f"Request: {request}")
logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
time.sleep(self.sleep_seconds)
logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
logger.info("Returning a delete response from test server")
return pb2.DeleteResponse()

def Fetch(self, request, context):
logger.info("Received a fetch request from test client")
logger.info(f"Request: {request}")
logger.info(f"Sleeping for {self.sleep_seconds} seconds to simulate a slow server call")
time.sleep(self.sleep_seconds)
logger.info(f"Done sleeping for {self.sleep_seconds} seconds")
logger.info("Returning a fetch response from test server")
return pb2.FetchResponse(
vectors={
"1": pb2.Vector(id="1", values=[1.0, 2.0, 3.0]),
"2": pb2.Vector(id="2", values=[4.0, 5.0, 6.0]),
"3": pb2.Vector(id="3", values=[7.0, 8.0, 9.0]),
},
namespace="testnamespace",
usage=pb2.Usage(read_units=1),
)


def create_sleepy_test_server(port=50051, sleep_seconds=5):
"""Creates and returns a configured gRPC server for testing.

Args:
port (int): The port number to run the server on
sleep_seconds (int): The extra latency in seconds for simulated operations

Returns:
grpc.Server: A configured and started gRPC server instance
"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
pb2_grpc.add_VectorServiceServicer_to_server(
TestVectorService(sleep_seconds=sleep_seconds), server
)
server.add_insecure_port(f"[::]:{port}")
server.start()
return server
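
A hypothetical test sketch pairing this stub with the client changes above to force a timeout (the exact host and config wiring in the real integration tests may differ):

from pinecone.grpc import PineconeGRPC, GRPCClientConfig

server = create_sleepy_test_server(port=50051, sleep_seconds=5)
try:
    pc = PineconeGRPC(api_key="test-key", ssl_verify=False)
    index = pc.Index(
        host="localhost:50051",
        grpc_config=GRPCClientConfig(secure=False),
    )
    # The stub sleeps for 5 seconds, so a 1-second timeout should surface
    # an error from future.result() instead of hanging indefinitely.
    future = index.upsert(vectors=[("vec1", [0.1, 0.2])], async_req=True, timeout=1)
    future.result()
except Exception as e:
    print(f"Expected timeout-style failure: {e}")
finally:
    server.stop(grace=None)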