Skip to content

Commit fb3e32c

Browse files
authored
Fix : Unable to poll results in async API (#515)
* Fixed the async issue * Added unit tests * Minor change * Changed to 5 * Increased time
1 parent 3111158 commit fb3e32c

File tree

3 files changed

+66
-10
lines changed

3 files changed

+66
-10
lines changed

src/databricks/sql/thrift_backend.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -899,8 +899,12 @@ def execute_command(
899899
sessionHandle=session_handle,
900900
statement=operation,
901901
runAsync=True,
902-
getDirectResults=ttypes.TSparkGetDirectResults(
903-
maxRows=max_rows, maxBytes=max_bytes
902+
# For async operation we don't want the direct results
903+
getDirectResults=None
904+
if async_op
905+
else ttypes.TSparkGetDirectResults(
906+
maxRows=max_rows,
907+
maxBytes=max_bytes,
904908
),
905909
canReadArrowResult=True if pyarrow else False,
906910
canDecompressLZ4Result=lz4_compression,

tests/e2e/common/large_queries_mixin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_long_running_query(self):
9494
scale_factor = 1
9595
with self.cursor() as cursor:
9696
while duration < min_duration:
97-
assert scale_factor < 512, "Detected infinite loop"
97+
assert scale_factor < 1024, "Detected infinite loop"
9898
start = time.time()
9999

100100
cursor.execute(

tests/e2e/test_driver.py

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,19 +177,22 @@ def test_cloud_fetch(self):
177177
for i in range(len(cf_result)):
178178
assert cf_result[i] == noop_result[i]
179179

180-
def test_execute_async(self):
181-
def isExecuting(operation_state):
182-
return not operation_state or operation_state in [
183-
ttypes.TOperationState.RUNNING_STATE,
184-
ttypes.TOperationState.PENDING_STATE,
185-
]
180+
181+
class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
182+
def isExecuting(self, operation_state):
183+
return not operation_state or operation_state in [
184+
ttypes.TOperationState.RUNNING_STATE,
185+
ttypes.TOperationState.PENDING_STATE,
186+
]
187+
188+
def test_execute_async__long_running(self):
186189

187190
long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
188191
with self.cursor() as cursor:
189192
cursor.execute_async(long_running_query)
190193

191194
## Polling after every POLLING_INTERVAL seconds
192-
while isExecuting(cursor.get_query_state()):
195+
while self.isExecuting(cursor.get_query_state()):
193196
time.sleep(self.POLLING_INTERVAL)
194197
log.info("Polling the status in test_execute_async")
195198

@@ -198,6 +201,55 @@ def isExecuting(operation_state):
198201

199202
assert result[0].asDict() == {"count(1)": 0}
200203

204+
def test_execute_async__small_result(self):
205+
small_result_query = "SELECT 1"
206+
207+
with self.cursor() as cursor:
208+
cursor.execute_async(small_result_query)
209+
210+
## Fake sleep for 5 secs
211+
time.sleep(5)
212+
213+
## Polling after every POLLING_INTERVAL seconds
214+
while self.isExecuting(cursor.get_query_state()):
215+
time.sleep(self.POLLING_INTERVAL)
216+
log.info("Polling the status in test_execute_async")
217+
218+
cursor.get_async_execution_result()
219+
result = cursor.fetchall()
220+
221+
assert result[0].asDict() == {"1": 1}
222+
223+
def test_execute_async__large_result(self):
224+
x_dimension = 1000
225+
y_dimension = 1000
226+
large_result_query = f"""
227+
SELECT
228+
x.id AS x_id,
229+
y.id AS y_id,
230+
FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date
231+
FROM
232+
RANGE({x_dimension}) x
233+
JOIN
234+
RANGE({y_dimension}) y
235+
"""
236+
237+
with self.cursor() as cursor:
238+
cursor.execute_async(large_result_query)
239+
240+
## Fake sleep for 5 secs
241+
time.sleep(5)
242+
243+
## Polling after every POLLING_INTERVAL seconds
244+
while self.isExecuting(cursor.get_query_state()):
245+
time.sleep(self.POLLING_INTERVAL)
246+
log.info("Polling the status in test_execute_async")
247+
248+
cursor.get_async_execution_result()
249+
result = cursor.fetchall()
250+
251+
assert len(result) == x_dimension * y_dimension
252+
201253

202254
# Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
203255
# tests

0 commit comments

Comments
 (0)