Skip to content

Commit b23dbf1

Browse files
committed
Add timeout buffer to DownstreamReceiver request handling
Introduced a `timeout_buffer` parameter to enhance flexibility in managing request timeouts. This ensures a buffer is added to the wait time, preventing premature timeout errors during request processing. Updated `setup.py` to bump the version from 3.5.0 to 3.5.1.
1 parent 218d0a1 commit b23dbf1

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

kubemq/queues/downstream_receiver.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from kubemq.common import *
1515
from kubemq.common.helpers import decode_grpc_error, is_channel_error
1616

17-
1817
class DownstreamReceiver:
1918
"""
2019
Class representing a downstream receiver for sending requests to a KubeMQ server.
@@ -55,6 +54,7 @@ def __init__(
5554
queue_size: int = 0,
5655
queue_timeout: float = 0.1,
5756
request_sleep_interval: float = 0.1,
57+
timeout_buffer: float = 0.5,
5858
):
5959
"""Initialize a new DownstreamReceiver.
6060
@@ -65,6 +65,7 @@ def __init__(
6565
queue_size: Maximum size of the request queue (0 for unlimited)
6666
queue_timeout: Timeout in seconds for queue polling
6767
request_sleep_interval: Sleep interval in seconds between requests (0 to disable)
68+
timeout_buffer: Timeout buffer in seconds for request timeouts
6869
"""
6970
self.transport = transport
7071
self.clientStub = transport.kubemq_client()
@@ -77,6 +78,7 @@ def __init__(
7778
self.allow_new_requests = True
7879
self.queue_timeout = queue_timeout
7980
self.request_sleep_interval = request_sleep_interval
81+
self.timeout_buffer = timeout_buffer
8082
threading.Thread(target=self._send_queue_stream, args=(), daemon=True).start()
8183

8284
def send(
@@ -109,7 +111,8 @@ def send(
109111
response_result,
110112
)
111113
self.queue.put(request)
112-
response_result.wait(request.WaitTimeout / 1000)
114+
request_wait_timeout = (request.WaitTimeout / 1000) + self.timeout_buffer
115+
response_result.wait(request_wait_timeout)
113116
response: QueuesDownstreamResponse = response_container.get("response")
114117
with self.lock:
115118
if self.response_tracking.get(request.RequestID):

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
setup(
1111
name="kubemq",
12-
version="3.5.0",
12+
version="3.5.1",
1313
description="KubeMQ SDK for Python",
1414
long_description=README,
1515
long_description_content_type="text/markdown",

0 commit comments

Comments
 (0)