3232
3333package org .opensearch .indices .state ;
3434
35+ import org .apache .logging .log4j .LogManager ;
36+ import org .apache .logging .log4j .Logger ;
3537import org .opensearch .ExceptionsHelper ;
3638import org .opensearch .action .ActionRequestValidationException ;
3739import org .opensearch .action .admin .indices .close .CloseIndexRequestBuilder ;
4648import org .opensearch .cluster .node .DiscoveryNode ;
4749import org .opensearch .cluster .routing .ShardRouting ;
4850import org .opensearch .common .settings .Settings ;
51+ import org .opensearch .common .unit .TimeValue ;
4952import org .opensearch .common .util .set .Sets ;
5053import org .opensearch .core .common .unit .ByteSizeUnit ;
5154import org .opensearch .core .common .unit .ByteSizeValue ;
6467import java .util .List ;
6568import java .util .Locale ;
6669import java .util .concurrent .CountDownLatch ;
70+ import java .util .concurrent .TimeUnit ;
6771import java .util .stream .Collectors ;
6872import java .util .stream .IntStream ;
6973
7680import static org .hamcrest .Matchers .containsString ;
7781import static org .hamcrest .Matchers .empty ;
7882import static org .hamcrest .Matchers .equalTo ;
83+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
7984import static org .hamcrest .Matchers .hasSize ;
8085import static org .hamcrest .Matchers .is ;
8186import static org .hamcrest .Matchers .not ;
8489
8590public class CloseIndexIT extends OpenSearchIntegTestCase {
8691
92+ private final Logger logger = LogManager .getLogger (CloseIndexIT .class );
8793 private static final int MAX_DOCS = 25_000 ;
94+ private static final TimeValue EXTENDED_TIMEOUT = TimeValue .timeValueSeconds (60 );
95+ private static final TimeValue STANDARD_TIMEOUT = TimeValue .timeValueSeconds (30 );
8896
8997 @ Override
9098 public Settings indexSettings () {
@@ -98,6 +106,7 @@ public Settings indexSettings() {
98106 }
99107
100108 public void testCloseMissingIndex () {
109+ ensureStableCluster (internalCluster ().size ());
101110 IndexNotFoundException e = expectThrows (IndexNotFoundException .class , () -> client ().admin ().indices ().prepareClose ("test" ).get ());
102111 assertThat (e .getMessage (), is ("no such index [test]" ));
103112 }
@@ -113,7 +122,12 @@ public void testCloseOneMissingIndex() {
113122
114123 public void testCloseOneMissingIndexIgnoreMissing () throws Exception {
115124 createIndex ("test1" );
116- assertBusy (() -> assertAcked (client ().admin ().indices ().prepareClose ("test1" , "test2" ).setIndicesOptions (lenientExpandOpen ())));
125+ ensureGreen ("test1" );
126+ assertBusy (
127+ () -> assertAcked (client ().admin ().indices ().prepareClose ("test1" , "test2" ).setIndicesOptions (lenientExpandOpen ())),
128+ STANDARD_TIMEOUT .getSeconds (),
129+ TimeUnit .SECONDS
130+ );
117131 assertIndexIsClosed ("test1" );
118132 }
119133
@@ -147,7 +161,9 @@ public void testCloseIndex() throws Exception {
147161 .collect (toList ())
148162 );
149163
150- assertBusy (() -> closeIndices (indexName ));
164+ ensureGreen (indexName );
165+ refresh (indexName );
166+ assertBusy (() -> closeIndices (indexName ), STANDARD_TIMEOUT .getSeconds (), TimeUnit .SECONDS );
151167 assertIndexIsClosed (indexName );
152168
153169 assertAcked (client ().admin ().indices ().prepareOpen (indexName ));
@@ -168,8 +184,10 @@ public void testCloseAlreadyClosedIndex() throws Exception {
168184 .collect (toList ())
169185 );
170186 }
187+ ensureGreen (indexName );
188+ refresh (indexName );
171189 // First close should be fully acked
172- assertBusy (() -> closeIndices (indexName ));
190+ assertBusy (() -> closeIndices (indexName ), STANDARD_TIMEOUT . getSeconds (), TimeUnit . SECONDS );
173191 assertIndexIsClosed (indexName );
174192
175193 // Second close should be acked too
@@ -178,7 +196,7 @@ public void testCloseAlreadyClosedIndex() throws Exception {
178196 CloseIndexResponse response = client ().admin ().indices ().prepareClose (indexName ).setWaitForActiveShards (activeShardCount ).get ();
179197 assertAcked (response );
180198 assertTrue (response .getIndices ().isEmpty ());
181- });
199+ }, STANDARD_TIMEOUT . getSeconds (), TimeUnit . SECONDS );
182200 assertIndexIsClosed (indexName );
183201 }
184202
@@ -193,7 +211,11 @@ public void testCloseUnassignedIndex() throws Exception {
193211 assertThat (clusterState .metadata ().indices ().get (indexName ).getState (), is (IndexMetadata .State .OPEN ));
194212 assertThat (clusterState .routingTable ().allShards ().stream ().allMatch (ShardRouting ::unassigned ), is (true ));
195213
196- assertBusy (() -> closeIndices (client ().admin ().indices ().prepareClose (indexName ).setWaitForActiveShards (ActiveShardCount .NONE )));
214+ assertBusy (
215+ () -> closeIndices (client ().admin ().indices ().prepareClose (indexName ).setWaitForActiveShards (ActiveShardCount .NONE )),
216+ STANDARD_TIMEOUT .getSeconds (),
217+ TimeUnit .SECONDS
218+ );
197219 assertIndexIsClosed (indexName );
198220 }
199221
@@ -210,7 +232,10 @@ public void testConcurrentClose() throws InterruptedException {
210232 .mapToObj (i -> client ().prepareIndex (indexName ).setId (String .valueOf (i )).setSource ("num" , i ))
211233 .collect (toList ())
212234 );
213- ensureYellowAndNoInitializingShards (indexName );
235+ ensureGreen (indexName );
236+ refresh (indexName );
237+ // Wait for cluster to stabilize before concurrent operations
238+ ensureStableCluster (internalCluster ().size ());
214239
215240 final CountDownLatch startClosing = new CountDownLatch (1 );
216241 final Thread [] threads = new Thread [randomIntBetween (2 , 5 )];
@@ -247,7 +272,9 @@ public void testCloseWhileIndexingDocuments() throws Exception {
247272 indexer .setFailureAssertion (t -> assertException (t , indexName ));
248273
249274 waitForDocs (randomIntBetween (10 , 50 ), indexer );
250- assertBusy (() -> closeIndices (indexName ));
275+ ensureGreen (indexName );
276+ refresh (indexName );
277+ assertBusy (() -> closeIndices (indexName ), EXTENDED_TIMEOUT .getSeconds (), TimeUnit .SECONDS );
251278 indexer .stopAndAwaitStopped ();
252279 nbDocs += indexer .totalIndexedDocs ();
253280 }
@@ -274,6 +301,9 @@ public void testCloseWhileDeletingIndices() throws Exception {
274301 }
275302 indices [i ] = indexName ;
276303 }
304+ ensureGreen (indices );
305+ refresh (indices );
306+ ensureStableCluster (internalCluster ().size ());
277307 assertThat (client ().admin ().cluster ().prepareState ().get ().getState ().metadata ().indices ().size (), equalTo (indices .length ));
278308
279309 final List <Thread > threads = new ArrayList <>();
@@ -297,11 +327,14 @@ public void testCloseWhileDeletingIndices() throws Exception {
297327 threads .add (new Thread (() -> {
298328 try {
299329 latch .await ();
330+ // Add small random delay to reduce exact simultaneous operations
331+ Thread .sleep (randomIntBetween (0 , 50 ));
300332 } catch (InterruptedException e ) {
333+ Thread .currentThread ().interrupt ();
301334 throw new AssertionError (e );
302335 }
303336 try {
304- client ().admin ().indices ().prepareClose (indexToClose ).setTimeout ("60s" ).get ();
337+ client ().admin ().indices ().prepareClose (indexToClose ).setTimeout (STANDARD_TIMEOUT ).get ();
305338 } catch (final Exception e ) {
306339 assertException (e , indexToClose );
307340 }
@@ -312,9 +345,25 @@ public void testCloseWhileDeletingIndices() throws Exception {
312345 thread .start ();
313346 }
314347 latch .countDown ();
348+
349+ // Wait for all threads with timeout to prevent hanging
350+ boolean allCompleted = true ;
315351 for (Thread thread : threads ) {
316- thread .join ();
352+ thread .join (STANDARD_TIMEOUT .millis ());
353+ if (thread .isAlive ()) {
354+ logger .warn ("Thread {} did not complete in time, interrupting" , thread .getName ());
355+ thread .interrupt ();
356+ allCompleted = false ;
357+ }
317358 }
359+
360+ if (!allCompleted ) {
361+ // Give interrupted threads a moment to clean up
362+ Thread .sleep (1000 );
363+ }
364+
365+ // Wait for cluster state to stabilize after concurrent operations
366+ waitForClusterStateConvergence ();
318367 }
319368
320369 public void testConcurrentClosesAndOpens () throws Exception {
@@ -437,6 +486,9 @@ public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception {
437486 .collect (toList ())
438487 );
439488 ensureGreen (indexName );
489+ refresh (indexName );
490+ // Wait for cluster to stabilize
491+ ensureStableCluster (internalCluster ().size ());
440492
441493 // Closing an index should execute noop peer recovery
442494 assertAcked (client ().admin ().indices ().prepareClose (indexName ).get ());
@@ -459,7 +511,12 @@ public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception {
459511 */
460512 public void testRecoverExistingReplica () throws Exception {
461513 final String indexName = "test-recover-existing-replica" ;
462- internalCluster ().ensureAtLeastNumDataNodes (2 );
514+ final int minDataNodes = 2 ;
515+ internalCluster ().ensureAtLeastNumDataNodes (minDataNodes );
516+
517+ // Wait for initial cluster stability
518+ ensureStableCluster (internalCluster ().size ());
519+
463520 List <String > dataNodes = randomSubsetOf (
464521 2 ,
465522 Sets .newHashSet (clusterService ().state ().nodes ().getDataNodes ().values ().iterator ())
@@ -484,20 +541,54 @@ public void testRecoverExistingReplica() throws Exception {
484541 .collect (toList ())
485542 );
486543 ensureGreen (indexName );
544+ refresh (indexName );
545+ // Wait for cluster to stabilize and ensure all nodes see the same state
546+ ensureStableCluster (internalCluster ().size ());
547+ waitForClusterStateConvergence ();
487548 client ().admin ().indices ().prepareFlush (indexName ).get ();
549+
550+ // Store the original cluster size before restarting node
551+ final int originalClusterSize = internalCluster ().size ();
552+
488553 // index more documents while one shard copy is offline
489554 internalCluster ().restartNode (dataNodes .get (1 ), new InternalTestCluster .RestartCallback () {
490555 @ Override
491556 public Settings onNodeStopped (String nodeName ) throws Exception {
557+ Thread .sleep (1000 );
558+
492559 Client client = client (dataNodes .get (0 ));
560+ try {
561+ assertBusy (() -> {
562+ ClusterState state = client .admin ().cluster ().prepareState ().get ().getState ();
563+ // The cluster should have one less node now
564+ assertThat (state .nodes ().getSize (), equalTo (originalClusterSize - 1 ));
565+ }, STANDARD_TIMEOUT .getSeconds (), TimeUnit .SECONDS );
566+ } catch (Exception e ) {
567+ logger .warn ("Failed to verify cluster state after node stop" , e );
568+ }
569+
493570 int moreDocs = randomIntBetween (1 , 50 );
494571 for (int i = 0 ; i < moreDocs ; i ++) {
495572 client .prepareIndex (indexName ).setSource ("num" , i ).get ();
496573 }
497- assertAcked (client .admin ().indices ().prepareClose (indexName ));
574+
575+ // Wait for cluster to stabilize with the remaining nodes
576+ try {
577+ ensureStableCluster (originalClusterSize - 1 );
578+ } catch (Exception e ) {
579+ logger .warn ("Cluster not stable after node stop, continuing anyway" , e );
580+ }
581+
582+ assertAcked (client .admin ().indices ().prepareClose (indexName ).setTimeout (STANDARD_TIMEOUT ));
498583 return super .onNodeStopped (nodeName );
499584 }
500585 });
586+
587+ // Wait for node to fully rejoin and cluster to stabilize
588+ assertBusy (() -> { ensureStableCluster (originalClusterSize ); }, EXTENDED_TIMEOUT .getSeconds (), TimeUnit .SECONDS );
589+
590+ waitForClusterStateConvergence ();
591+
501592 assertIndexIsClosed (indexName );
502593 ensureGreen (indexName );
503594 internalCluster ().assertSameDocIdsOnShards ();
@@ -515,6 +606,8 @@ public Settings onNodeStopped(String nodeName) throws Exception {
515606 public void testRelocatedClosedIndexIssue () throws Exception {
516607 final String indexName = "closed-index" ;
517608 final List <String > dataNodes = internalCluster ().startDataOnlyNodes (2 );
609+ // Wait for cluster to stabilize after adding nodes
610+ ensureStableCluster (internalCluster ().size ());
518611 // allocate shard to first data node
519612 createIndex (
520613 indexName ,
@@ -561,14 +654,21 @@ public void testResyncPropagatePrimaryTerm() throws Exception {
561654 .collect (toList ())
562655 );
563656 ensureGreen (indexName );
564- assertAcked (client ().admin ().indices ().prepareClose (indexName ));
657+ refresh (indexName );
658+ waitForClusterStateConvergence ();
659+
660+ assertAcked (client ().admin ().indices ().prepareClose (indexName ).setTimeout (STANDARD_TIMEOUT ));
565661 assertIndexIsClosed (indexName );
566662 ensureGreen (indexName );
663+
567664 String nodeWithPrimary = clusterService ().state ()
568665 .nodes ()
569666 .get (clusterService ().state ().routingTable ().index (indexName ).shard (0 ).primaryShard ().currentNodeId ())
570667 .getName ();
668+
571669 internalCluster ().restartNode (nodeWithPrimary , new InternalTestCluster .RestartCallback ());
670+ assertBusy (() -> { ensureStableCluster (internalCluster ().size ()); }, EXTENDED_TIMEOUT .getSeconds (), TimeUnit .SECONDS );
671+
572672 ensureGreen (indexName );
573673 long primaryTerm = clusterService ().state ().metadata ().index (indexName ).primaryTerm (0 );
574674 for (String nodeName : internalCluster ().nodesInclude (indexName )) {
@@ -584,6 +684,7 @@ private static void closeIndices(final String... indices) {
584684 }
585685
586686 private static void closeIndices (final CloseIndexRequestBuilder requestBuilder ) {
687+ requestBuilder .setTimeout (STANDARD_TIMEOUT );
587688 final CloseIndexResponse response = requestBuilder .get ();
588689 assertThat (response .isAcknowledged (), is (true ));
589690 assertThat (response .isShardsAcknowledged (), is (true ));
@@ -613,25 +714,49 @@ private static void closeIndices(final CloseIndexRequestBuilder requestBuilder)
613714 }
614715
615716 static void assertIndexIsClosed (final String ... indices ) {
616- final ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
617- for (String index : indices ) {
618- final IndexMetadata indexMetadata = clusterState .metadata ().indices ().get (index );
619- assertThat (indexMetadata .getState (), is (IndexMetadata .State .CLOSE ));
620- final Settings indexSettings = indexMetadata .getSettings ();
621- assertThat (indexSettings .hasValue (MetadataIndexStateService .VERIFIED_BEFORE_CLOSE_SETTING .getKey ()), is (true ));
622- assertThat (indexSettings .getAsBoolean (MetadataIndexStateService .VERIFIED_BEFORE_CLOSE_SETTING .getKey (), false ), is (true ));
623- assertThat (clusterState .routingTable ().index (index ), notNullValue ());
624- assertThat (clusterState .blocks ().hasIndexBlock (index , MetadataIndexStateService .INDEX_CLOSED_BLOCK ), is (true ));
625- assertThat (
626- "Index " + index + " must have only 1 block with [id=" + MetadataIndexStateService .INDEX_CLOSED_BLOCK_ID + "]" ,
627- clusterState .blocks ()
628- .indices ()
629- .getOrDefault (index , emptySet ())
630- .stream ()
631- .filter (clusterBlock -> clusterBlock .id () == MetadataIndexStateService .INDEX_CLOSED_BLOCK_ID )
632- .count (),
633- equalTo (1L )
634- );
717+ for (int retry = 0 ; retry < 3 ; retry ++) {
718+ try {
719+ final ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
720+ boolean allClosed = true ;
721+ for (String index : indices ) {
722+ final IndexMetadata indexMetadata = clusterState .metadata ().indices ().get (index );
723+ if (indexMetadata == null || indexMetadata .getState () != IndexMetadata .State .CLOSE ) {
724+ allClosed = false ;
725+ break ;
726+ }
727+ }
728+ if (!allClosed && retry < 2 ) {
729+ Thread .sleep (500 );
730+ continue ;
731+ }
732+
733+ for (String index : indices ) {
734+ final IndexMetadata indexMetadata = clusterState .metadata ().indices ().get (index );
735+ assertThat (indexMetadata .getState (), is (IndexMetadata .State .CLOSE ));
736+ final Settings indexSettings = indexMetadata .getSettings ();
737+ assertThat (indexSettings .hasValue (MetadataIndexStateService .VERIFIED_BEFORE_CLOSE_SETTING .getKey ()), is (true ));
738+ assertThat (
739+ indexSettings .getAsBoolean (MetadataIndexStateService .VERIFIED_BEFORE_CLOSE_SETTING .getKey (), false ),
740+ is (true )
741+ );
742+ assertThat (clusterState .routingTable ().index (index ), notNullValue ());
743+ assertThat (clusterState .blocks ().hasIndexBlock (index , MetadataIndexStateService .INDEX_CLOSED_BLOCK ), is (true ));
744+ assertThat (
745+ "Index " + index + " must have only 1 block with [id=" + MetadataIndexStateService .INDEX_CLOSED_BLOCK_ID + "]" ,
746+ clusterState .blocks ()
747+ .indices ()
748+ .getOrDefault (index , emptySet ())
749+ .stream ()
750+ .filter (clusterBlock -> clusterBlock .id () == MetadataIndexStateService .INDEX_CLOSED_BLOCK_ID )
751+ .count (),
752+ equalTo (1L )
753+ );
754+ }
755+ break ;
756+ } catch (InterruptedException e ) {
757+ Thread .currentThread ().interrupt ();
758+ throw new RuntimeException (e );
759+ }
635760 }
636761 }
637762
@@ -672,4 +797,23 @@ void assertNoFileBasedRecovery(String indexName) {
672797 }
673798 }
674799 }
800+
801+ private void waitForClusterStateConvergence () {
802+ try {
803+ final long stateVersion = client ().admin ().cluster ().prepareState ().get ().getState ().version ();
804+ assertBusy (() -> {
805+ for (String nodeName : internalCluster ().getNodeNames ()) {
806+ ClusterState nodeState = client (nodeName ).admin ().cluster ().prepareState ().setLocal (true ).get ().getState ();
807+ assertThat (
808+ "Node " + nodeName + " has not caught up to cluster state version " + stateVersion ,
809+ nodeState .version (),
810+ greaterThanOrEqualTo (stateVersion )
811+ );
812+ }
813+ }, STANDARD_TIMEOUT .getSeconds (), TimeUnit .SECONDS );
814+ Thread .sleep (100 );
815+ } catch (Exception e ) {
816+ logger .warn ("Failed to wait for cluster state convergence" , e );
817+ }
818+ }
675819}
0 commit comments