Skip to content

bugfix: stalling test issue (close in TelemetryClientFactory) #609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: telemetry
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/code-quality-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
# run test suite
#----------------------------------------------
- name: Run tests
run: poetry run python -m pytest tests/unit
run: poetry run python -m pytest tests/unit -v -s
run-unit-tests-with-arrow:
runs-on: ubuntu-latest
strategy:
Expand Down Expand Up @@ -112,7 +112,7 @@ jobs:
# run test suite
#----------------------------------------------
- name: Run tests
run: poetry run python -m pytest tests/unit
run: poetry run python -m pytest tests/unit -v -s
check-linting:
runs-on: ubuntu-latest
strategy:
Expand Down
5 changes: 4 additions & 1 deletion src/databricks/sql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def read(self) -> Optional[OAuthToken]:
self.use_inline_params = self._set_use_inline_params_with_warning(
kwargs.get("use_inline_params", False)
)

print("Connection init : session_id_hex", self.get_session_id_hex(), flush=True)
TelemetryClientFactory.initialize_telemetry_client(
telemetry_enabled=self.telemetry_enabled,
session_id_hex=self.get_session_id_hex(),
Expand Down Expand Up @@ -440,6 +440,9 @@ def cursor(

def close(self) -> None:
"""Close the underlying session and mark all associated cursors as closed."""
print(
"Connection close: session_id_hex: ", self.get_session_id_hex(), flush=True
)
self._close()

def _close(self, close_cursors=True) -> None:
Expand Down
80 changes: 74 additions & 6 deletions src/databricks/sql/telemetry/telemetry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,36 @@ def initialize_telemetry_client(
"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
try:

print(
"\nWAITING: Initializing telemetry client: %s",
session_id_hex,
flush=True,
)
with TelemetryClientFactory._lock:
print(
"\nACQUIRED: Initializing telemetry client, got lock: %s",
session_id_hex,
flush=True,
)
TelemetryClientFactory._initialize()
print(
"\n TelemetryClientFactory initialized: %s",
session_id_hex,
flush=True,
)

if session_id_hex not in TelemetryClientFactory._clients:
print(
"\n Session ID not in clients: %s",
session_id_hex,
flush=True,
)
logger.debug(
"Creating new TelemetryClient for connection %s",
session_id_hex,
)
if telemetry_enabled:
print("\n Telemetry enabled: %s", session_id_hex, flush=True)
TelemetryClientFactory._clients[
session_id_hex
] = TelemetryClient(
Expand All @@ -385,11 +406,41 @@ def initialize_telemetry_client(
host_url=host_url,
executor=TelemetryClientFactory._executor,
)
print(
"\n Telemetry client initialized: %s",
session_id_hex,
flush=True,
)
else:
print(
"\n Telemetry disabled: %s", session_id_hex, flush=True
)
TelemetryClientFactory._clients[
session_id_hex
] = NoopTelemetryClient()
print(
"\n Noop Telemetry client initialized: %s",
session_id_hex,
flush=True,
)
else:
print(
"\n Session ID already in clients: %s",
session_id_hex,
flush=True,
)
print(
"\nRELEASED: Telemetry client initialized: %s",
session_id_hex,
flush=True,
)
except Exception as e:
print(
"\nERROR: Failed to initialize telemetry client: %s due to %s",
session_id_hex,
e,
flush=True,
)
logger.debug("Failed to initialize telemetry client: %s", e)
# Fallback to NoopTelemetryClient to ensure connection doesn't fail
TelemetryClientFactory._clients[session_id_hex] = NoopTelemetryClient()
Expand All @@ -413,23 +464,40 @@ def get_telemetry_client(session_id_hex):
@staticmethod
def close(session_id_hex):
"""Close and remove the telemetry client for a specific connection"""

print("\nWAITING: Closing telemetry client: %s", session_id_hex, flush=True)
with TelemetryClientFactory._lock:
if (
telemetry_client := TelemetryClientFactory._clients.pop(
session_id_hex, None
)
) is not None:
print(
"\nACQUIRED: Closing telemetry client, got lock: %s",
session_id_hex,
flush=True,
)
# if (
# telemetry_client := TelemetryClientFactory._clients.pop(
# session_id_hex, None
# )
# ) is not None:
if session_id_hex in TelemetryClientFactory._clients:
logger.debug(
"Removing telemetry client for connection %s", session_id_hex
)
telemetry_client = TelemetryClientFactory._clients.pop(session_id_hex)
telemetry_client.close()

# Shutdown executor if no more clients
if not TelemetryClientFactory._clients and TelemetryClientFactory._executor:
logger.debug(
"No more telemetry clients, shutting down thread pool executor"
)
print(
"\nSHUTDOWN: Shutting down thread pool executor: %s",
session_id_hex,
flush=True,
)
TelemetryClientFactory._executor.shutdown(wait=True)
TelemetryClientFactory._executor = None
TelemetryClientFactory._initialized = False
print(
"\nRELEASED: Thread pool executor shut down: %s",
session_id_hex,
flush=True,
)
1 change: 1 addition & 0 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def test_context_manager_closes_cursor(self):

@patch("%s.client.ThriftBackend" % PACKAGE_NAME)
def test_context_manager_closes_connection(self, mock_client_class):
print("stalling test")
instance = mock_client_class.return_value

mock_open_session_resp = MagicMock(spec=TOpenSessionResp)()
Expand Down
Loading