@@ -607,13 +607,12 @@ public CompletableFuture<Void> closeAsync() {
607
607
}
608
608
}
609
609
610
- closeLocalMetadataStore ();
610
+ asyncCloseFutures .add (closeLocalMetadataStore ());
611
+ if (configMetadataSynchronizer != null ) {
612
+ asyncCloseFutures .add (configMetadataSynchronizer .closeAsync ());
613
+ }
611
614
if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore ) {
612
615
configurationMetadataStore .close ();
613
- if (configMetadataSynchronizer != null ) {
614
- configMetadataSynchronizer .close ();
615
- configMetadataSynchronizer = null ;
616
- }
617
616
}
618
617
619
618
if (transactionExecutorProvider != null ) {
@@ -1160,14 +1159,16 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
1160
1159
.build ());
1161
1160
}
1162
1161
1163
- protected void closeLocalMetadataStore () throws Exception {
1162
+ protected CompletableFuture < Void > closeLocalMetadataStore () throws Exception {
1164
1163
if (localMetadataStore != null ) {
1165
1164
localMetadataStore .close ();
1166
1165
}
1167
1166
if (localMetadataSynchronizer != null ) {
1168
- localMetadataSynchronizer .close ();
1167
+ CompletableFuture < Void > closeSynchronizer = localMetadataSynchronizer .closeAsync ();
1169
1168
localMetadataSynchronizer = null ;
1169
+ return closeSynchronizer ;
1170
1170
}
1171
+ return CompletableFuture .completedFuture (null );
1171
1172
}
1172
1173
1173
1174
protected void startLeaderElectionService () {
@@ -1928,4 +1929,69 @@ public CompletableFuture<TopicCompactionService> newTopicCompactionService(Strin
1928
1929
return CompletableFuture .failedFuture (e );
1929
1930
}
1930
1931
}
1932
+
1933
+ public void initConfigMetadataSynchronizerIfNeeded () {
1934
+ mutex .lock ();
1935
+ try {
1936
+ final String newTopic = config .getConfigurationMetadataSyncEventTopic ();
1937
+ final PulsarMetadataEventSynchronizer oldSynchronizer = configMetadataSynchronizer ;
1938
+ // Skip if not support.
1939
+ if (!(configurationMetadataStore instanceof MetadataStoreExtended )) {
1940
+ LOG .info (
1941
+ "Skip to update Metadata Synchronizer because of the Configuration Metadata Store using[{}]"
1942
+ + " does not support." , configurationMetadataStore .getClass ().getName ());
1943
+ return ;
1944
+ }
1945
+ // Skip if no changes.
1946
+ // case-1: both null.
1947
+ // case-2: both topics are the same.
1948
+ if ((oldSynchronizer == null && StringUtils .isBlank (newTopic ))) {
1949
+ LOG .info ("Skip to update Metadata Synchronizer because the topic[null] does not changed." );
1950
+ }
1951
+ if (StringUtils .isNotBlank (newTopic ) && oldSynchronizer != null ) {
1952
+ TopicName newTopicName = TopicName .get (newTopic );
1953
+ TopicName oldTopicName = TopicName .get (oldSynchronizer .getTopicName ());
1954
+ if (newTopicName .equals (oldTopicName )) {
1955
+ LOG .info ("Skip to update Metadata Synchronizer because the topic[{}] does not changed." ,
1956
+ oldTopicName );
1957
+ }
1958
+ }
1959
+ // Update(null or not null).
1960
+ // 1.set the new one.
1961
+ // 2.close the old one.
1962
+ // 3.async start the new one.
1963
+ if (StringUtils .isBlank (newTopic )) {
1964
+ configMetadataSynchronizer = null ;
1965
+ } else {
1966
+ configMetadataSynchronizer = new PulsarMetadataEventSynchronizer (this , newTopic );
1967
+ }
1968
+ // close the old one and start the new one.
1969
+ PulsarMetadataEventSynchronizer newSynchronizer = configMetadataSynchronizer ;
1970
+ MetadataStoreExtended metadataStoreExtended = (MetadataStoreExtended ) configurationMetadataStore ;
1971
+ metadataStoreExtended .updateMetadataEventSynchronizer (newSynchronizer );
1972
+ Runnable startNewSynchronizer = () -> {
1973
+ if (newSynchronizer == null ) {
1974
+ return ;
1975
+ }
1976
+ try {
1977
+ newSynchronizer .start ();
1978
+ } catch (Exception e ) {
1979
+ // It only occurs when get internal client fails.
1980
+ LOG .error ("Start Metadata Synchronizer with topic {} failed." ,
1981
+ newTopic , e );
1982
+ }
1983
+ };
1984
+ executor .submit (() -> {
1985
+ if (oldSynchronizer != null ) {
1986
+ oldSynchronizer .closeAsync ().whenComplete ((ignore , ex ) -> {
1987
+ startNewSynchronizer .run ();
1988
+ });
1989
+ } else {
1990
+ startNewSynchronizer .run ();
1991
+ }
1992
+ });
1993
+ } finally {
1994
+ mutex .unlock ();
1995
+ }
1996
+ }
1931
1997
}
0 commit comments