diff --git a/cassandra/cluster.py b/cassandra/cluster.py index fd73803eb8..94caa4166d 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai if self._is_shutdown: return - if not connection: - connection = self._connection + current_connection = connection or self._connection if preloaded_results: log.debug("[control connection] Attempting to use preloaded results for schema agreement") peers_result = preloaded_results[0] local_result = preloaded_results[1] - schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint) + schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint) if schema_mismatches is None: return True @@ -4237,16 +4236,27 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai elapsed = 0 cl = ConsistencyLevel.ONE schema_mismatches = None - select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection) + select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, current_connection) + error_signaled = False while elapsed < total_timeout: + if current_connection != connection or self._connection: + current_connection = connection or self._connection + error_signaled = False + + if current_connection.is_defunct or current_connection.is_closed: + log.debug("[control connection] connection is closed, wait and trying again") + self._time.sleep(0.2) + elapsed = self._time.time() - start + continue + peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout), consistency_level=cl) local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout), consistency_level=cl) try: timeout = min(self._timeout, total_timeout - elapsed) - peers_result, local_result = connection.wait_for_responses( + peers_result, local_result = current_connection.wait_for_responses( peers_query, local_query, timeout=timeout) except OperationTimedOut as timeout: log.debug("[control connection] Timed out waiting for " @@ -4257,10 +4267,12 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai if self._is_shutdown: log.debug("[control connection] Aborting wait for schema match due to shutdown") return None - else: - raise + elif not error_signaled: + self._signal_error() + error_signaled = True + continue - schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint) + schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint) if schema_mismatches is None: return True @@ -4269,7 +4281,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai elapsed = self._time.time() - start log.warning("Node %s is reporting a schema disagreement: %s", - connection.endpoint, schema_mismatches) + current_connection.endpoint, schema_mismatches) return False def _get_schema_mismatches(self, peers_result, local_result, local_address):