Skip to content

Commit e70b016

Browse files
authored
Add query support for elasticity changes in the system
1 parent aef263a commit e70b016

File tree

11 files changed

+478
-165
lines changed

11 files changed

+478
-165
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
1919
23.3 and higher and will not be immediately available in the cloud service
2020
- Cloud only: added support for Cloud feature Global Active Tables. This
2121
includes new requests and classes:
22-
22+
2323
- AddReplicaRequest
2424
- DropReplicaRequest
2525
- ReplicaStatsRequest/Result
@@ -29,6 +29,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
2929
as well as additional replica-related information
3030
and interfaces in TableResult
3131

32+
## Changed
33+
34+
- Modified internal query processing to better support elasticity operations
3235

3336
## Fixed
3437

src/borneo/client.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
OperationNotSupportedException, RequestSizeLimitException)
2323
from .http import RateLimiterMap, RequestUtils
2424
from .kv import StoreAccessTokenProvider
25-
from .operations import GetTableRequest, QueryResult, TableRequest, WriteRequest
25+
from .operations import (
26+
GetTableRequest, QueryRequest, QueryResult, TableRequest, WriteRequest)
2627
from .query import QueryDriver
2728
from .serdeutil import SerdeUtil
2829
from .stats import StatsControl
@@ -94,6 +95,8 @@ def __init__(self, config, logger):
9495
self._sess.mount(self._url.scheme + '://', adapter)
9596
if self._proxy_host is not None:
9697
self._check_and_set_proxy(self._sess)
98+
self.query_version = QueryDriver.QUERY_VERSION
99+
self._topology_info = None
97100
self.serial_version = config.get_serial_version()
98101
# StoreAccessTokenProvider means onprem
99102
self._is_cloud = not isinstance(self._auth_provider, StoreAccessTokenProvider)
@@ -202,7 +205,6 @@ def execute(self, request):
202205
'QueryRequest has no QueryDriver, but is prepared', 2)
203206
driver = QueryDriver(request)
204207
driver.set_client(self)
205-
driver.set_topology_info(request.topology_info())
206208
return QueryResult(request, False)
207209
"""
208210
If we are here, then this is either (a) a simple query or (b) an
@@ -216,7 +218,11 @@ def execute(self, request):
216218
"""
217219
self._trace(
218220
'QueryRequest has no QueryDriver and is not prepared', 2)
221+
request.incr_batch_counter()
222+
219223
timeout_ms = request.get_timeout()
224+
if request is QueryRequest or request.is_query_request():
225+
request.set_topo_seq_num(self.get_topo_seq_num())
220226
headers = {'Host': self._url.hostname,
221227
'Content-Type': 'application/octet-stream',
222228
'Connection': 'keep-alive',
@@ -256,9 +262,27 @@ def execute(self, request):
256262
request_utils = RequestUtils(
257263
self._sess, self._logutils, request, self._retry_handler, self,
258264
self._rate_limiter_map)
259-
return request_utils.do_post_request(self._request_uri, headers,
260-
content, timeout_ms,
261-
self._stats_control)
265+
res = request_utils.do_post_request(self._request_uri, headers,
266+
content, timeout_ms,
267+
self._stats_control)
268+
self._set_topology_info(res.get_topology_info())
269+
if res is QueryResult and request.is_query_request():
270+
request.set_query_traces(res.get_query_traces())
271+
return res
272+
273+
@synchronized
274+
def _set_topology_info(self, topo):
275+
if topo is None:
276+
return
277+
if self.get_topo_seq_num() < topo.get_seq_num():
278+
self._topology_info = topo
279+
280+
def get_topo_seq_num(self):
281+
return -1 if self._topology_info is None else \
282+
self._topology_info.get_seq_num()
283+
284+
def get_topology(self):
285+
return self._topology_info
262286

263287
# set the session cookie if in return headers (see RequestUtils in http.py)
264288
@synchronized
@@ -484,6 +508,24 @@ def _update_table_limiters(self, table_name):
484508
'Background thread added limiters for table "' + table_name +
485509
'"')
486510

511+
def decrement_query_version(self):
512+
"""
513+
Decrements the query version, if it is greater than the minimum.
514+
For internal use only.
515+
516+
The current minimum value is V3.
517+
:returns: true if the version was decremented, false otherwise.
518+
:rtype: bool
519+
"""
520+
if self.query_version > QueryDriver.QUERY_V3:
521+
self.query_version -= 1
522+
msg = ('Unsupported query version error, decrementing ' +
523+
'version to ' + str(self.query_version) +
524+
' and retrying')
525+
self._logutils.log_info(msg)
526+
return True
527+
return False
528+
487529
def decrement_serial_version(self):
488530
"""
489531
Decrements the serial version, if it is greater than the minimum.
@@ -534,6 +576,8 @@ def serialize_request(self, request, headers):
534576
:returns: the bytearray that contains the content.
535577
:rtype: bytearray
536578
"""
579+
# in case it's a query or prepare
580+
request.set_query_version(self.query_version)
537581
content = self._write_content(request)
538582
headers.update({'Content-Length': str(len(content))})
539583
return content

src/borneo/common.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,7 +1075,7 @@ class PreparedStatement(object):
10751075
"""
10761076
OPCODE_SELECT = 5
10771077

1078-
def __init__(self, sql_text, query_plan, query_schema, topology_info,
1078+
def __init__(self, sql_text, query_plan, query_schema,
10791079
proxy_statement, driver_plan, num_iterators, num_registers,
10801080
external_vars, namespace, table_name, operation):
10811081
"""
@@ -1091,8 +1091,6 @@ def __init__(self, sql_text, query_plan, query_schema, topology_info,
10911091
self._sql_text = sql_text
10921092
self._query_plan = query_plan
10931093
self._query_schema = query_schema
1094-
# Applicable to advanced queries only.
1095-
self._topology_info = topology_info
10961094
# The serialized PreparedStatement created at the backend store. It is
10971095
# opaque for the driver. It is received from the proxy and sent back to
10981096
# the proxy every time a new batch of results is needed.
@@ -1140,7 +1138,7 @@ def copy_statement(self):
11401138
:rtype: PreparedStatement
11411139
"""
11421140
return PreparedStatement(
1143-
self._sql_text, self._query_plan, self._query_schema, self._topology_info,
1141+
self._sql_text, self._query_plan, self._query_schema,
11441142
self._proxy_statement, self._driver_query_plan, self._num_iterators,
11451143
self._num_registers, self._variables, self._namespace,
11461144
self._table_name, self._operation)
@@ -1227,16 +1225,6 @@ def print_driver_plan(self):
12271225
return self._driver_query_plan.display()
12281226
return None
12291227

1230-
@synchronized
1231-
def set_topology_info(self, topology_info):
1232-
if topology_info is None:
1233-
return
1234-
if self._topology_info is None:
1235-
self._topology_info = topology_info
1236-
return
1237-
if self._topology_info.get_seq_num() < topology_info.get_seq_num():
1238-
self._topology_info = topology_info
1239-
12401228
def set_variable(self, variable, value):
12411229
"""
12421230
Binds an external variable to a given value. The variable is identified
@@ -1277,14 +1265,6 @@ def set_variable(self, variable, value):
12771265
raise IllegalArgumentException(
12781266
'There is no external variable at position ' + str(variable))
12791267

1280-
def topology_info(self):
1281-
return self._topology_info
1282-
1283-
@synchronized
1284-
def topology_seq_num(self):
1285-
return (-1 if self._topology_info is None else
1286-
self._topology_info.get_seq_num())
1287-
12881268

12891269
class PutOption(object):
12901270
"""

src/borneo/exception.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ class UnsupportedQueryVersionException(NoSQLException):
301301
"""
302302

303303
def __init__(self, message):
304-
super(UnauthorizedException, self).__init__(message)
304+
super(UnsupportedQueryVersionException, self).__init__(message)
305305

306306

307307
class IndexExistsException(ResourceExistsException):

src/borneo/http.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from .exception import (
1818
IllegalStateException, NoSQLException,
1919
ReadThrottlingException, RequestTimeoutException, RetryableException,
20-
SecurityInfoNotReadyException, UnsupportedProtocolException,
21-
WriteThrottlingException)
20+
SecurityInfoNotReadyException, UnsupportedQueryVersionException,
21+
UnsupportedProtocolException, WriteThrottlingException)
2222
from .serdeutil import SerdeUtil
2323

2424
try:
@@ -323,7 +323,6 @@ def _do_request(self, method, uri, headers, payload, timeout_ms,
323323
stats_config.observe(self._request, req_size,
324324
len(response.content),
325325
network_time)
326-
327326
# check for a Set-Cookie header
328327
cookie = response.headers.get('Set-Cookie', None)
329328
if cookie is not None and cookie.startswith('session='):
@@ -405,6 +404,19 @@ def _do_request(self, method, uri, headers, payload, timeout_ms,
405404
self._request.increment_retries()
406405
exception = re
407406
continue
407+
except UnsupportedQueryVersionException as uqve:
408+
if self._client.decrement_query_version():
409+
if self._request is not None:
410+
payload = self._client.serialize_request(self._request,
411+
headers)
412+
self._request.increment_retries()
413+
# don't set exception for this case -- it is misleading
414+
# exception = uqve
415+
continue
416+
self._logutils.log_error(
417+
'Client execution UnsupportedQueryVersionException: ' +
418+
str(uqve))
419+
raise uqve
408420
except UnsupportedProtocolException as upe:
409421
if self._client.decrement_serial_version():
410422
if self._request is not None:

0 commit comments

Comments
 (0)