diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 6ae88749b5e43..a3158a7c6017c 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -3477,12 +3477,33 @@ impl Coordinator { } /// Publishes a notice message to all sessions. + /// + /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`] + /// so we keep it around. + #[allow(dead_code)] pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) { for meta in self.active_conns.values() { let _ = meta.notice_tx.send(notice.clone()); } } + /// Returns a closure that will publish a notice to all sessions that were active at the time + /// this method was called. + pub(crate) fn broadcast_notice_tx( + &self, + ) -> Box () + Send + 'static> { + let senders: Vec<_> = self + .active_conns + .values() + .map(|meta| meta.notice_tx.clone()) + .collect(); + Box::new(move |notice| { + for tx in senders { + let _ = tx.send(notice.clone()); + } + }) + } + pub(crate) fn active_conns(&self) -> &BTreeMap { &self.active_conns } diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index b3d34d24b5998..7f237a310d158 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -756,13 +756,8 @@ impl Coordinator { ); let builtin_table_updates = vec![builtin_table_retraction, builtin_table_addition]; - - self.builtin_table_update() - .execute(builtin_table_updates) - .await - .0 - .instrument(info_span!("coord::message_cluster_event::table_updates")) - .await; + // Returns a Future that completes when the Builtin Table write is completed. + let builtin_table_completion = self.builtin_table_update().defer(builtin_table_updates); let cluster = self.catalog().get_cluster(event.cluster_id); let replica = cluster.replica(event.replica_id).expect("Replica exists"); @@ -771,12 +766,24 @@ impl Coordinator { .get_cluster_replica_status(event.cluster_id, event.replica_id); if old_replica_status != new_replica_status { - self.broadcast_notice(AdapterNotice::ClusterReplicaStatusChanged { + let notifier = self.broadcast_notice_tx(); + let notice = AdapterNotice::ClusterReplicaStatusChanged { cluster: cluster.name.clone(), replica: replica.name.clone(), status: new_replica_status, time: event.time, - }); + }; + // In a separate task, so we don't block the Coordinator, wait for the builtin + // table update to complete, and then notify active sessions. + mz_ore::task::spawn( + || format!("cluster_event-{}-{}", event.cluster_id, event.replica_id), + async move { + // Wait for the builtin table updates to complete. + builtin_table_completion.await; + // Notify all sessions that were active at the time the cluster status changed. + (notifier)(notice) + }, + ); } } }