2121import org .apache .amoro .client .AmsServerInfo ;
2222import org .apache .amoro .config .Configurations ;
2323import org .apache .amoro .properties .AmsHAProperties ;
24+ import org .apache .amoro .shade .thrift .org .apache .commons .lang3 .StringUtils ;
2425import org .apache .amoro .shade .zookeeper3 .org .apache .curator .framework .CuratorFramework ;
2526import org .apache .amoro .shade .zookeeper3 .org .apache .curator .framework .CuratorFrameworkFactory ;
2627import org .apache .amoro .shade .zookeeper3 .org .apache .curator .framework .api .transaction .CuratorOp ;
@@ -45,6 +46,8 @@ public class HighAvailabilityContainer implements LeaderLatchListener {
4546 private final CuratorFramework zkClient ;
4647 private final String tableServiceMasterPath ;
4748 private final String optimizingServiceMasterPath ;
49+ // path to signal that this node has completed ams dispose
50+ private final String disposeCompletePath ;
4851 private final AmsServerInfo tableServiceServerInfo ;
4952 private final AmsServerInfo optimizingServiceServerInfo ;
5053 private volatile CountDownLatch followerLatch ;
@@ -59,6 +62,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception
5962 String haClusterName = serviceConfig .getString (AmoroManagementConf .HA_CLUSTER_NAME );
6063 tableServiceMasterPath = AmsHAProperties .getTableServiceMasterPath (haClusterName );
6164 optimizingServiceMasterPath = AmsHAProperties .getOptimizingServiceMasterPath (haClusterName );
65+ disposeCompletePath = AmsHAProperties .getMasterReleaseConfirmPath (haClusterName );
6266 ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry (1000 , 3 , 5000 );
6367 this .zkClient =
6468 CuratorFrameworkFactory .builder ()
@@ -90,6 +94,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception
9094 zkClient = null ;
9195 tableServiceMasterPath = null ;
9296 optimizingServiceMasterPath = null ;
97+ disposeCompletePath = null ;
9398 tableServiceServerInfo = null ;
9499 optimizingServiceServerInfo = null ;
95100 // block follower latch forever when ha is disabled
@@ -102,6 +107,8 @@ public void waitLeaderShip() throws Exception {
102107 if (leaderLatch != null ) {
103108 leaderLatch .await ();
104109 if (leaderLatch .hasLeadership ()) {
110+ waitPreviousLeaderDisposeComplete ();
111+
105112 CuratorOp tableServiceMasterPathOp =
106113 zkClient
107114 .transactionOp ()
@@ -134,6 +141,83 @@ public void waitFollowerShip() throws Exception {
134141 LOG .info ("Became the follower of AMS" );
135142 }
136143
144+ public void waitPreviousLeaderDisposeComplete () throws Exception {
145+ // 1、Create the path if it does not exist, to ensure it exists for future primary and standby
146+ // node switchover.
147+ if (zkClient .checkExists ().forPath (disposeCompletePath ) == null ) {
148+ createPathIfNeeded (disposeCompletePath );
149+ }
150+
151+ // 2、Determine if there is a previous leader, or if it is different from current node.
152+ boolean hasPreviousOtherLeader = false ;
153+ try {
154+ byte [] masterData = zkClient .getData ().forPath (tableServiceMasterPath );
155+ if (masterData != null && masterData .length > 0 ) {
156+ String masterInfoInZkNode = new String (masterData , StandardCharsets .UTF_8 );
157+ if (!StringUtils .isEmpty (masterInfoInZkNode )) {
158+ try {
159+ // If data cannot be parsed correctly, it indicates that the AMS service is starting for
160+ // the first time.
161+ AmsServerInfo previousLeaderInfo =
162+ JacksonUtil .parseObject (masterInfoInZkNode , AmsServerInfo .class );
163+ if (previousLeaderInfo != null ) {
164+ // If parsing succeeds, check if it's different from current node
165+ String currentInfoStr = JacksonUtil .toJSONString (tableServiceServerInfo );
166+ LOG .debug (
167+ "Current node info JSON: {}, ZK node info JSON: {}" ,
168+ currentInfoStr ,
169+ masterInfoInZkNode );
170+ if (!masterInfoInZkNode .equals (currentInfoStr )) {
171+ hasPreviousOtherLeader = true ;
172+ } else {
173+ LOG .debug (
174+ "Previous leader is the same as current node (self-restart)."
175+ + " No need to wait for dispose signal." );
176+ }
177+ }
178+ } catch (Exception e ) {
179+ LOG .warn (
180+ "Failed to parse master info from ZooKeeper: {}, treating as no previous leader" ,
181+ masterInfoInZkNode ,
182+ e );
183+ // If parsing fails, treat as no previous leader
184+ }
185+ }
186+ }
187+ } catch (KeeperException .NoNodeException e ) {
188+ // No previous leader node found, indicating that this is the first startup of ams.
189+ LOG .debug ("No previous leader node found, indicating that this is the first startup of ams." );
190+ }
191+
192+ if (!hasPreviousOtherLeader ) {
193+ LOG .debug ("No previous other master detected, start service immediately." );
194+ return ;
195+ }
196+
197+ // If disposeCompletePath exists, the following scenarios may occur:
198+ // 1) A primary-standby node switchover occurs, and the former primary
199+ // node has not completed the AMS dispose operation.
200+ // 2) The previous primary node is unreachable due to network issues.
201+ // 3) No primary-standby node switchover occurred, but ZK retains
202+ // information about the previous primary node.
203+ long startTime = System .currentTimeMillis ();
204+ int maxWaitTime = 30000 ; // 30s
205+ while (System .currentTimeMillis () - startTime <= maxWaitTime ) {
206+ // At this point, the disposeCompletePath does not exist,
207+ // indicating that the previous master node has completed
208+ // the AMS service shutdown operation and deleted the path.
209+ if (zkClient .checkExists ().forPath (disposeCompletePath ) == null ) {
210+ LOG .info ("Previous leader has completed dispose. Proceeding." );
211+ return ;
212+ }
213+ }
214+
215+ LOG .debug (
216+ "Timeout ({}ms) waiting for previous other leader to signal dispose complete. Proceeding anyway. "
217+ + "This might indicate the previous leader is unresponsive." ,
218+ maxWaitTime );
219+ }
220+
137221 public void close () {
138222 if (leaderLatch != null ) {
139223 try {
@@ -163,6 +247,27 @@ public void notLeader() {
163247 followerLatch .countDown ();
164248 }
165249
250+ /**
251+ * In HA mode, when the AMS service is stopped, delete the existing `disposeCompletePath` file
252+ * from ZK to indicate that the AMS service has been terminated.
253+ */
254+ public void signalDisposeComplete () {
255+ // when HA is disabled, do nothing
256+ if (zkClient == null ) {
257+ return ;
258+ }
259+
260+ try {
261+ if (zkClient .checkExists ().forPath (disposeCompletePath ) != null ) {
262+ zkClient .delete ().forPath (disposeCompletePath );
263+ return ;
264+ }
265+ LOG .debug ("ams dispose complete signal written." );
266+ } catch (Exception e ) {
267+ LOG .warn ("Failed to write dispose complete signal" , e );
268+ }
269+ }
270+
166271 private AmsServerInfo buildServerInfo (String host , int thriftBindPort , int restBindPort ) {
167272 AmsServerInfo amsServerInfo = new AmsServerInfo ();
168273 amsServerInfo .setHost (host );
0 commit comments