Skip to content

[471] - Close underlying HTTP Client on closing Connection #674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4437a2a
Refactor codebase to use a unified http client
vikrantpuppala Aug 8, 2025
30c04a6
Some more fixes and aligned tests
vikrantpuppala Aug 8, 2025
4294600
Fix all tests
vikrantpuppala Aug 8, 2025
3155211
fmt
vikrantpuppala Aug 8, 2025
1143838
preliminary connection closure func
Varun0157 Aug 9, 2025
68cc822
unit test for backend closure
Varun0157 Aug 9, 2025
ef1d9fd
remove redundant comment
Varun0157 Aug 9, 2025
4bb2e4b
assert SEA http client closure in unit tests
Varun0157 Aug 10, 2025
734dd06
correct docstring
Varun0157 Aug 10, 2025
d00e3c8
fix e2e
vikrantpuppala Aug 11, 2025
000d3a3
fix unit
vikrantpuppala Aug 11, 2025
cba3da7
more fixes
vikrantpuppala Aug 11, 2025
2a1f719
more fixes
vikrantpuppala Aug 11, 2025
1dd40a1
review comments
vikrantpuppala Aug 12, 2025
3847aca
fix warnings
vikrantpuppala Aug 12, 2025
d9a4797
fix check-types
vikrantpuppala Aug 12, 2025
ba2a3a9
remove separate http client for telemetry
vikrantpuppala Aug 12, 2025
d1f045e
more clean up
vikrantpuppala Aug 12, 2025
ea3b0b0
Merge remote-tracking branch 'target/http-client-refactor-2' into clo…
Varun0157 Aug 13, 2025
4e66230
remove excess release_connection call
Varun0157 Aug 13, 2025
bf0a2f6
Merge remote-tracking branch 'target/main' into close-conn
Varun0157 Aug 13, 2025
67020f1
formatting (black) - fix some closures
Varun0157 Aug 13, 2025
496d7f7
Revert "formatting (black) - fix some closures"
Varun0157 Aug 13, 2025
84ec33a
add more http_client closures
Varun0157 Aug 13, 2025
76ce5ce
remove excess close call
Varun0157 Aug 13, 2025
4452725
wait for _flush before closing HTTP client
Varun0157 Aug 14, 2025
d90ac80
make close() async
Varun0157 Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/databricks/sql/auth/thrift_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def startRetryTimer(self):
self.retry_policy and self.retry_policy.start_retry_timer()

def open(self):

# self.__pool replaces the self.__http used by the original THttpClient
_pool_kwargs = {"maxsize": self.max_connections}

Expand Down Expand Up @@ -140,19 +139,21 @@ def open(self):
else:
self.__pool = pool_class(self.host, self.port, **_pool_kwargs)

def release_connection(self):
    """Drain and return the active HTTP connection to the pool.

    Unlike close(), this does not shut down the pool itself; it only hands
    the current response's connection back so it can be reused by the next
    request.
    """
    # Explicit `if` instead of the `x and x.f()` side-effect idiom: clearer
    # intent, same short-circuit semantics when no response is held.
    if self.__resp:
        self.__resp.drain_conn()
        self.__resp.release_conn()
    self.__resp = None

def close(self):
    """Shut down the underlying urllib3 connection pool for good."""
    pool = self.__pool
    pool.close()

def read(self, sz):
    """Read up to sz bytes from the currently held HTTP response."""
    response = self.__resp
    return response.read(sz)

def isOpen(self):
    """Report whether a response (and thus an active connection) is held."""
    has_response = self.__resp is not None
    return has_response

def flush(self):

# Pull data out of buffer that will be sent in this request
data = self.__wbuf.getvalue()
self.__wbuf = BytesIO()
Expand Down
21 changes: 18 additions & 3 deletions src/databricks/sql/backend/sea/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def open_session(

return SessionId.from_sea_session_id(session_id)

def close_session(self, session_id: SessionId) -> None:
def _close_session(self, session_id: SessionId) -> None:
"""
Closes an existing session with the Databricks SQL service.
Expand All @@ -285,8 +285,6 @@ def close_session(self, session_id: SessionId) -> None:
OperationalError: If there's an error closing the session
"""

logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id)

if session_id.backend_type != BackendType.SEA:
raise ValueError("Not a valid SEA session ID")
sea_session_id = session_id.to_sea_session_id()
Expand All @@ -302,6 +300,23 @@ def close_session(self, session_id: SessionId) -> None:
data=request_data.to_dict(),
)

def close_session(self, session_id: SessionId) -> None:
    """
    Closes the session and the underlying HTTP client.

    Args:
        session_id: The session identifier returned by open_session()

    Raises:
        ValueError: If the session ID is invalid
        OperationalError: If there's an error closing the session
    """

    logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id)

    try:
        self._close_session(session_id)
    finally:
        # Always release the HTTP client's pooled connections, even when the
        # server-side session close raises — otherwise the pool leaks.
        self._http_client.close()
Comment on lines +317 to +318
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is a session and http client closed together? I feel it is easier to imagine the backend module as a function module for interacting with SQL EXEC. So, with that perspective, there should be minimal state in this and the backend should receive resources by higher level classes and those higher level classes should take care of closing the resources (for example, the connection/cursor should pass a http client to this and close the http client. Same goes for session: cursor maintains the session obtained functionally through this backend and should take care of closing the session). Please let me know if this doesn't look okay.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the philosophy overall, but I think passing the HTTP client into the backend from a higher-level object is likely infeasible right now since both backends have distinct, custom HTTP clients that expose different methods and are processed differently. Until we unify them, it makes sense to make the backend responsible for its own HTTP client instantiation, as a result of which it should be responsible for closing it as well.

Both HTTP Clients do have largely the same functionality, so unifying them (for the most part) could be a separate PR, at which point we can pass in the HTTP client from the Session class.
Let me know if I should do it in this PR instead.

Note that the pattern of instantiating the required HTTP client in the constructor of the concrete backend is common across Thrift and SEA.


def _extract_description_from_manifest(
self, manifest: ResultManifest
) -> List[Tuple]:
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/sql/backend/sea/utils/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def _open(self):
def close(self):
    """Close the connection pool, releasing all pooled HTTP connections."""
    # Guard against close() being called before the pool was ever opened.
    if self._pool:
        self._pool.close()

def using_proxy(self) -> bool:
"""Check if proxy is being used."""
Expand Down
4 changes: 2 additions & 2 deletions src/databricks/sql/backend/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ def attempt_request(attempt):
)
)
finally:
# Calling `close()` here releases the active HTTP connection back to the pool
self._transport.close()
# Calling `release_connection()` here releases the active HTTP connection back to the pool
self._transport.release_connection()

return RequestErrorInfo(
error=error,
Expand Down
32 changes: 29 additions & 3 deletions src/databricks/sql/telemetry/telemetry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future
from concurrent.futures import wait
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional, TYPE_CHECKING
from databricks.sql.telemetry.models.event import (
Expand Down Expand Up @@ -182,6 +183,7 @@ def __init__(
self._user_agent = None
self._events_batch = []
self._lock = threading.RLock()
self._pending_futures = set()
self._driver_connection_params = None
self._host_url = host_url
self._executor = executor
Expand Down Expand Up @@ -245,6 +247,9 @@ def _send_telemetry(self, events):
timeout=900,
)

with self._lock:
self._pending_futures.add(future)

future.add_done_callback(
lambda fut: self._telemetry_request_callback(fut, sent_count=sent_count)
)
Expand Down Expand Up @@ -303,6 +308,9 @@ def _telemetry_request_callback(self, future, sent_count: int):

except Exception as e:
logger.debug("Telemetry request failed with exception: %s", e)
finally:
with self._lock:
self._pending_futures.discard(future)

def _export_telemetry_log(self, **telemetry_event_kwargs):
"""
Expand Down Expand Up @@ -356,10 +364,29 @@ def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
)

def close(self):
    """Schedules the client to be closed in the background.

    Returns immediately; the flush/wait/close sequence runs in
    _close_and_wait on the executor so teardown stays off the caller's
    critical path.
    """
    logger.debug(
        "Scheduling background closure for TelemetryClient of connection %s",
        self._session_id_hex,
    )
    self._executor.submit(self._close_and_wait)

def _close_and_wait(self):
    """Flush remaining events and wait for them to complete before closing."""
    try:
        self._flush()

        # Snapshot under the lock; _telemetry_request_callback may mutate
        # _pending_futures concurrently from worker threads.
        with self._lock:
            futures_to_wait_on = list(self._pending_futures)

        if futures_to_wait_on:
            logger.debug(
                "Waiting for %s pending telemetry requests to complete.",
                len(futures_to_wait_on),
            )
            wait(futures_to_wait_on)
    finally:
        # Close the HTTP client even if flushing or waiting raised, so the
        # connection pool is never leaked.
        self._http_client.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch!
the issue here would be that _flush is async, it is going to start a bunch of futures to perform the POST /telemetry-ext but then the main thread is going to close the http_client which is then going to cause all those futures to fail. We need to do something smarter here to ensure that all futures are completed in the background before closing this http client (also in the background)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have merged the updated main into my branch.

Thanks for the catch, I didn't realise _flush was async.
Regarding the above: waiting for the futures to complete would essentially make the TelemetryClient's close() synchronous, right? Is this an acceptable tradeoff? It seems necessary if we want to prevent the TCP connection from remaining open.

I'll go ahead and implement this change by adding some state that tracks each future and waits for them to complete before invoking the close() of the HTTP Client, but we can revert it or iterate on it if it seems like the telemetry changes begin to hamper the main process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wouldn't want to make the close() synchronous, I was trying this out, but the delay of this being sync is just too much. ideally we would like to perform async close of http_client based on future completion



class TelemetryClientFactory:
"""
Expand Down Expand Up @@ -460,7 +487,6 @@ def initialize_telemetry_client(
):
"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
try:

with TelemetryClientFactory._lock:
TelemetryClientFactory._initialize()

Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_sea_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def test_session_management(self, sea_client, mock_http_client, thrift_session_i
path=sea_client.SESSION_PATH_WITH_ID.format("test-session-789"),
data={"session_id": "test-session-789", "warehouse_id": "abc123"},
)
mock_http_client.close.assert_called_once()

# Test close_session with invalid ID type
with pytest.raises(ValueError) as excinfo:
Expand Down
9 changes: 8 additions & 1 deletion tests/unit/test_thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1436,8 +1436,12 @@ def test_op_handle_respected_in_close_command(self, tcli_service_class):
)

@patch("databricks.sql.backend.thrift_backend.TCLIService.Client", autospec=True)
def test_session_handle_respected_in_close_session(self, tcli_service_class):
@patch("databricks.sql.auth.thrift_http_client.THttpClient", autospec=True)
def test_session_handle_respected_in_close_session(
self, mock_http_client_class, tcli_service_class
):
tcli_service_instance = tcli_service_class.return_value
mock_http_client_instance = mock_http_client_class.return_value
thrift_backend = ThriftDatabricksClient(
"foobar",
443,
Expand All @@ -1447,12 +1451,15 @@ def test_session_handle_respected_in_close_session(self, tcli_service_class):
ssl_options=SSLOptions(),
http_client=MagicMock(),
)
thrift_backend._transport = mock_http_client_instance

session_id = SessionId.from_thrift_handle(self.session_handle)
thrift_backend.close_session(session_id)
self.assertEqual(
tcli_service_instance.CloseSession.call_args[0][0].sessionHandle,
self.session_handle,
)
mock_http_client_instance.close.assert_called_once()

@patch("databricks.sql.backend.thrift_backend.TCLIService.Client", autospec=True)
def test_non_arrow_non_column_based_set_triggers_exception(
Expand Down