Skip to content

Commit 59d28b0

Browse files
authored
added logs for cloud fetch speed (#654)
1 parent 0a7a6ab commit 59d28b0

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

src/databricks/sql/cloudfetch/downloader.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,14 @@ class DownloadableResultSettings:
5454
link_expiry_buffer_secs (int): Time in seconds to prevent download of a link before it expires. Default 0 secs.
5555
download_timeout (int): Timeout for download requests. Default 60 secs.
5656
max_consecutive_file_download_retries (int): Number of consecutive download retries before shutting down.
57+
min_cloudfetch_download_speed (float): Threshold in MB/s below which to log warning. Default 0.1 MB/s.
5758
"""
5859

5960
is_lz4_compressed: bool
6061
link_expiry_buffer_secs: int = 0
6162
download_timeout: int = 60
6263
max_consecutive_file_download_retries: int = 0
64+
min_cloudfetch_download_speed: float = 0.1
6365

6466

6567
class ResultSetDownloadHandler:
@@ -100,6 +102,8 @@ def run(self) -> DownloadedFile:
100102
self.link, self.settings.link_expiry_buffer_secs
101103
)
102104

105+
start_time = time.time()
106+
103107
with self._http_client.execute(
104108
method=HttpMethod.GET,
105109
url=self.link.fileLink,
@@ -112,6 +116,13 @@ def run(self) -> DownloadedFile:
112116

113117
# Save (and decompress if needed) the downloaded file
114118
compressed_data = response.content
119+
120+
# Log download metrics
121+
download_duration = time.time() - start_time
122+
self._log_download_metrics(
123+
self.link.fileLink, len(compressed_data), download_duration
124+
)
125+
115126
decompressed_data = (
116127
ResultSetDownloadHandler._decompress_data(compressed_data)
117128
if self.settings.is_lz4_compressed
@@ -138,6 +149,32 @@ def run(self) -> DownloadedFile:
138149
self.link.rowCount,
139150
)
140151

152+
def _log_download_metrics(
153+
self, url: str, bytes_downloaded: int, duration_seconds: float
154+
):
155+
"""Log download speed metrics at INFO/WARN levels."""
156+
# Calculate speed in MB/s (ensure float division for precision)
157+
speed_mbps = (float(bytes_downloaded) / (1024 * 1024)) / duration_seconds
158+
159+
urlEndpoint = url.split("?")[0]
160+
# INFO level logging
161+
logger.info(
162+
"CloudFetch download completed: %.4f MB/s, %d bytes in %.3fs from %s",
163+
speed_mbps,
164+
bytes_downloaded,
165+
duration_seconds,
166+
urlEndpoint,
167+
)
168+
169+
# WARN level logging if below threshold
170+
if speed_mbps < self.settings.min_cloudfetch_download_speed:
171+
logger.warning(
172+
"CloudFetch download slower than threshold: %.4f MB/s (threshold: %.1f MB/s) from %s",
173+
speed_mbps,
174+
self.settings.min_cloudfetch_download_speed,
175+
url,
176+
)
177+
141178
@staticmethod
142179
def _validate_link(link: TSparkArrowResultLink, expiry_buffer_secs: int):
143180
"""

tests/unit/test_downloader.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ class DownloaderTests(unittest.TestCase):
2323
Unit tests for checking downloader logic.
2424
"""
2525

26+
def _setup_time_mock_for_download(self, mock_time, end_time):
27+
"""Helper to setup time mock that handles logging system calls."""
28+
call_count = [0]
29+
def time_side_effect():
30+
call_count[0] += 1
31+
if call_count[0] <= 2: # First two calls (validation, start_time)
32+
return 1000
33+
else: # All subsequent calls (logging, duration calculation)
34+
return end_time
35+
mock_time.side_effect = time_side_effect
36+
2637
@patch("time.time", return_value=1000)
2738
def test_run_link_expired(self, mock_time):
2839
settings = Mock()
@@ -90,13 +101,17 @@ def test_run_get_response_not_ok(self, mock_time):
90101
d.run()
91102
self.assertTrue("404" in str(context.exception))
92103

93-
@patch("time.time", return_value=1000)
104+
@patch("time.time")
94105
def test_run_uncompressed_successful(self, mock_time):
106+
self._setup_time_mock_for_download(mock_time, 1000.5)
107+
95108
http_client = DatabricksHttpClient.get_instance()
96109
file_bytes = b"1234567890" * 10
97110
settings = Mock(link_expiry_buffer_secs=0, download_timeout=0, use_proxy=False)
98111
settings.is_lz4_compressed = False
112+
settings.min_cloudfetch_download_speed = 1.0
99113
result_link = Mock(bytesNum=100, expiryTime=1001)
114+
result_link.fileLink = "https://s3.amazonaws.com/bucket/file.arrow?token=abc123"
100115

101116
with patch.object(
102117
http_client,
@@ -115,15 +130,19 @@ def test_run_uncompressed_successful(self, mock_time):
115130

116131
assert file.file_bytes == b"1234567890" * 10
117132

118-
@patch("time.time", return_value=1000)
133+
@patch("time.time")
119134
def test_run_compressed_successful(self, mock_time):
135+
self._setup_time_mock_for_download(mock_time, 1000.2)
136+
120137
http_client = DatabricksHttpClient.get_instance()
121138
file_bytes = b"1234567890" * 10
122139
compressed_bytes = b'\x04"M\x18h@d\x00\x00\x00\x00\x00\x00\x00#\x14\x00\x00\x00\xaf1234567890\n\x00BP67890\x00\x00\x00\x00'
123140

124141
settings = Mock(link_expiry_buffer_secs=0, download_timeout=0, use_proxy=False)
125142
settings.is_lz4_compressed = True
143+
settings.min_cloudfetch_download_speed = 1.0
126144
result_link = Mock(bytesNum=100, expiryTime=1001)
145+
result_link.fileLink = "https://s3.amazonaws.com/bucket/file.arrow?token=xyz789"
127146
with patch.object(
128147
http_client,
129148
"execute",

0 commit comments

Comments
 (0)