Skip to content

Commit 2f8b1ab

Browse files
authored
Bug fixes in telemetry (databricks#659)
* flush fix, sync fix in e2e test Signed-off-by: Sai Shree Pradhan <[email protected]> * sync fix Signed-off-by: Sai Shree Pradhan <[email protected]> --------- Signed-off-by: Sai Shree Pradhan <[email protected]>
1 parent fe8cd57 commit 2f8b1ab

File tree

2 files changed

+28
-17
lines changed

2 files changed

+28
-17
lines changed

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
127127
def close(self):
128128
pass
129129

130+
def _flush(self):
131+
pass
132+
130133

131134
class TelemetryClient(BaseTelemetryClient):
132135
"""

tests/e2e/test_concurrent_telemetry.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from concurrent.futures import wait
12
import random
23
import threading
34
import time
@@ -35,6 +36,7 @@ def telemetry_setup_teardown(self):
3536
if TelemetryClientFactory._executor:
3637
TelemetryClientFactory._executor.shutdown(wait=True)
3738
TelemetryClientFactory._executor = None
39+
TelemetryClientFactory._stop_flush_thread()
3840
TelemetryClientFactory._initialized = False
3941

4042
def test_concurrent_queries_sends_telemetry(self):
@@ -47,8 +49,7 @@ def test_concurrent_queries_sends_telemetry(self):
4749
captured_telemetry = []
4850
captured_session_ids = []
4951
captured_statement_ids = []
50-
captured_responses = []
51-
captured_exceptions = []
52+
captured_futures = []
5253

5354
original_send_telemetry = TelemetryClient._send_telemetry
5455
original_callback = TelemetryClient._telemetry_request_callback
@@ -63,18 +64,9 @@ def callback_wrapper(self_client, future, sent_count):
6364
Wraps the original callback to capture the server's response
6465
or any exceptions from the async network call.
6566
"""
66-
try:
67-
original_callback(self_client, future, sent_count)
68-
69-
# Now, capture the result for our assertions
70-
response = future.result()
71-
response.raise_for_status() # Raise an exception for 4xx/5xx errors
72-
telemetry_response = response.json()
73-
with capture_lock:
74-
captured_responses.append(telemetry_response)
75-
except Exception as e:
76-
with capture_lock:
77-
captured_exceptions.append(e)
67+
with capture_lock:
68+
captured_futures.append(future)
69+
original_callback(self_client, future, sent_count)
7870

7971
with patch.object(TelemetryClient, "_send_telemetry", send_telemetry_wrapper), \
8072
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
@@ -101,10 +93,26 @@ def execute_query_worker(thread_id):
10193
# Run the workers concurrently
10294
run_in_threads(execute_query_worker, num_threads, pass_index=True)
10395

104-
if TelemetryClientFactory._executor:
105-
TelemetryClientFactory._executor.shutdown(wait=True)
96+
timeout_seconds = 60
97+
start_time = time.time()
98+
expected_event_count = num_threads
99+
100+
while len(captured_futures) < expected_event_count and time.time() - start_time < timeout_seconds:
101+
time.sleep(0.1)
102+
103+
done, not_done = wait(captured_futures, timeout=timeout_seconds)
104+
assert not not_done
105+
106+
captured_exceptions = []
107+
captured_responses = []
108+
for future in done:
109+
try:
110+
response = future.result()
111+
response.raise_for_status()
112+
captured_responses.append(response.json())
113+
except Exception as e:
114+
captured_exceptions.append(e)
106115

107-
# --- VERIFICATION ---
108116
assert not captured_exceptions
109117
assert len(captured_responses) > 0
110118

0 commit comments

Comments
 (0)