diff --git a/.github/workflows/code-quality-checks.yml b/.github/workflows/code-quality-checks.yml index 462d22369..36bfa1bb0 100644 --- a/.github/workflows/code-quality-checks.yml +++ b/.github/workflows/code-quality-checks.yml @@ -61,7 +61,7 @@ jobs: # run test suite #---------------------------------------------- - name: Run tests - run: poetry run python -m pytest tests/unit + run: poetry run python -m pytest tests/unit -v -s run-unit-tests-with-arrow: runs-on: ubuntu-latest strategy: @@ -112,7 +112,7 @@ jobs: # run test suite #---------------------------------------------- - name: Run tests - run: poetry run python -m pytest tests/unit + run: poetry run python -m pytest tests/unit -v -s check-linting: runs-on: ubuntu-latest strategy: diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 26705f3f8..cacf0ddb4 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -305,7 +305,7 @@ def read(self) -> Optional[OAuthToken]: self.use_inline_params = self._set_use_inline_params_with_warning( kwargs.get("use_inline_params", False) ) - + print("Connection init : session_id_hex", self.get_session_id_hex(), flush=True) TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=self.telemetry_enabled, session_id_hex=self.get_session_id_hex(), @@ -440,6 +440,9 @@ def cursor( def close(self) -> None: """Close the underlying session and mark all associated cursors as closed.""" + print( + "Connection close: session_id_hex: ", self.get_session_id_hex(), flush=True + ) self._close() def _close(self, close_cursors=True) -> None: diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index f7fccf47a..2a0b170be 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -367,15 +367,36 @@ def initialize_telemetry_client( """Initialize a telemetry client for a specific connection if telemetry is enabled""" try: + print( + "\nWAITING: Initializing telemetry client: %s", + session_id_hex, + flush=True, + ) with TelemetryClientFactory._lock: + print( + "\nACQUIRED: Initializing telemetry client, got lock: %s", + session_id_hex, + flush=True, + ) TelemetryClientFactory._initialize() + print( + "\n TelemetryClientFactory initialized: %s", + session_id_hex, + flush=True, + ) if session_id_hex not in TelemetryClientFactory._clients: + print( + "\n Session ID not in clients: %s", + session_id_hex, + flush=True, + ) logger.debug( "Creating new TelemetryClient for connection %s", session_id_hex, ) if telemetry_enabled: + print("\n Telemetry enabled: %s", session_id_hex, flush=True) TelemetryClientFactory._clients[ session_id_hex ] = TelemetryClient( @@ -385,11 +406,41 @@ def initialize_telemetry_client( host_url=host_url, executor=TelemetryClientFactory._executor, ) + print( + "\n Telemetry client initialized: %s", + session_id_hex, + flush=True, + ) else: + print( + "\n Telemetry disabled: %s", session_id_hex, flush=True + ) TelemetryClientFactory._clients[ session_id_hex ] = NoopTelemetryClient() + print( + "\n Noop Telemetry client initialized: %s", + session_id_hex, + flush=True, + ) + else: + print( + "\n Session ID already in clients: %s", + session_id_hex, + flush=True, + ) + print( + "\nRELEASED: Telemetry client initialized: %s", + session_id_hex, + flush=True, + ) except Exception as e: + print( + "\nERROR: Failed to initialize telemetry client: %s due to %s", + session_id_hex, + e, + flush=True, + ) logger.debug("Failed to initialize telemetry client: %s", e) # Fallback to NoopTelemetryClient to ensure connection doesn't fail TelemetryClientFactory._clients[session_id_hex] = NoopTelemetryClient() @@ -413,16 +464,23 @@ def get_telemetry_client(session_id_hex): @staticmethod def close(session_id_hex): """Close and remove the telemetry client for a specific connection""" - + print("\nWAITING: Closing telemetry client: %s", session_id_hex, flush=True) with TelemetryClientFactory._lock: - if ( - telemetry_client := TelemetryClientFactory._clients.pop( - session_id_hex, None - ) - ) is not None: + print( + "\nACQUIRED: Closing telemetry client, got lock: %s", + session_id_hex, + flush=True, + ) + # if ( + # telemetry_client := TelemetryClientFactory._clients.pop( + # session_id_hex, None + # ) + # ) is not None: + if session_id_hex in TelemetryClientFactory._clients: logger.debug( "Removing telemetry client for connection %s", session_id_hex ) + telemetry_client = TelemetryClientFactory._clients.pop(session_id_hex) telemetry_client.close() # Shutdown executor if no more clients @@ -430,6 +488,16 @@ def close(session_id_hex): logger.debug( "No more telemetry clients, shutting down thread pool executor" ) + print( + "\nSHUTDOWN: Shutting down thread pool executor: %s", + session_id_hex, + flush=True, + ) TelemetryClientFactory._executor.shutdown(wait=True) TelemetryClientFactory._executor = None TelemetryClientFactory._initialized = False + print( + "\nRELEASED: Thread pool executor shut down: %s", + session_id_hex, + flush=True, + ) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 588b0d70e..3f16d3341 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -353,6 +353,7 @@ def test_context_manager_closes_cursor(self): @patch("%s.client.ThriftBackend" % PACKAGE_NAME) def test_context_manager_closes_connection(self, mock_client_class): + print("stalling test") instance = mock_client_class.return_value mock_open_session_resp = MagicMock(spec=TOpenSessionResp)()