Skip to content

Commit 1943844

Browse files
authored
Effort to fix testDataStreamLifecycleDownsampleRollingRestart #123769 (#125478)
1 parent 67798dd commit 1943844

File tree

3 files changed

+55
-42
lines changed

3 files changed

+55
-42
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,6 @@ tests:
242242
- class: org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT
243243
method: testChildrenTasksCancelledOnTimeout
244244
issue: https://github.com/elastic/elasticsearch/issues/123568
245-
- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT
246-
method: testDataStreamLifecycleDownsampleRollingRestart
247-
issue: https://github.com/elastic/elasticsearch/issues/123769
248245
- class: org.elasticsearch.xpack.searchablesnapshots.FrozenSearchableSnapshotsIntegTests
249246
method: testCreateAndRestorePartialSearchableSnapshot
250247
issue: https://github.com/elastic/elasticsearch/issues/123773

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2477,9 +2477,19 @@ public static void safeAcquire(int permits, Semaphore semaphore) {
24772477
* @return The value with which the {@code listener} was completed.
24782478
*/
24792479
public static <T> T safeAwait(SubscribableListener<T> listener) {
2480+
return safeAwait(listener, SAFE_AWAIT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
2481+
}
2482+
2483+
/**
2484+
* Wait for the successful completion of the given {@link SubscribableListener}, respecting the provided timeout,
2485+
* preserving the thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2486+
*
2487+
* @return The value with which the {@code listener} was completed.
2488+
*/
2489+
public static <T> T safeAwait(SubscribableListener<T> listener, long timeout, TimeUnit unit) {
24802490
final var future = new TestPlainActionFuture<T>();
24812491
listener.addListener(future);
2482-
return safeGet(future);
2492+
return safeGet(future, timeout, unit);
24832493
}
24842494

24852495
/**
@@ -2509,8 +2519,18 @@ public static <T extends ActionResponse> T safeExecute(ElasticsearchClient clien
25092519
* @return The value with which the {@code future} was completed.
25102520
*/
25112521
public static <T> T safeGet(Future<T> future) {
2522+
return safeGet(future, SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2523+
}
2524+
2525+
/**
2526+
* Wait for the successful completion of the given {@link Future}, respecting the provided timeout, preserving the
2527+
* thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2528+
*
2529+
* @return The value with which the {@code future} was completed.
2530+
*/
2531+
public static <T> T safeGet(Future<T> future, long timeout, TimeUnit unit) {
25122532
try {
2513-
return future.get(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2533+
return future.get(timeout, unit);
25142534
} catch (InterruptedException e) {
25152535
Thread.currentThread().interrupt();
25162536
throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e);

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,34 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1313
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
14-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
15-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
1614
import org.elasticsearch.action.downsample.DownsampleConfig;
17-
import org.elasticsearch.action.support.IndicesOptions;
1815
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
1916
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.service.ClusterService;
2018
import org.elasticsearch.common.settings.Settings;
2119
import org.elasticsearch.core.TimeValue;
2220
import org.elasticsearch.datastreams.DataStreamsPlugin;
2321
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
2422
import org.elasticsearch.plugins.Plugin;
2523
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
24+
import org.elasticsearch.test.ClusterServiceUtils;
2625
import org.elasticsearch.test.ESIntegTestCase;
2726
import org.elasticsearch.test.InternalTestCluster;
28-
import org.elasticsearch.test.junit.annotations.TestLogging;
2927
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
3028
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
3129

3230
import java.util.Collection;
3331
import java.util.List;
3432
import java.util.concurrent.TimeUnit;
3533

34+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
3635
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
3736
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
38-
import static org.hamcrest.Matchers.is;
39-
import static org.hamcrest.Matchers.notNullValue;
4037

4138
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4)
4239
public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
4340
private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class);
44-
public static final int DOC_COUNT = 50_000;
41+
public static final int DOC_COUNT = 25_000;
4542

4643
@Override
4744
protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -55,7 +52,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5552
return settings.build();
5653
}
5754

58-
@TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
5955
public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
6056
final InternalTestCluster cluster = internalCluster();
6157
cluster.startMasterOnlyNodes(1);
@@ -88,38 +84,38 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
8884
// testing so DSL doesn't have to wait for the end_time to lapse)
8985
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
9086
client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
87+
String sourceIndex = getBackingIndices(client(), dataStreamName).get(0);
88+
final String targetIndex = "downsample-5m-" + sourceIndex;
9189

92-
// DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
93-
// downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution.
94-
long sleepTime = randomLongBetween(3000, 4500);
95-
logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime);
96-
Thread.sleep(sleepTime);
97-
List<String> backingIndices = getBackingIndices(client(), dataStreamName);
98-
// first generation index
99-
String sourceIndex = backingIndices.get(0);
90+
/**
91+
* DLM runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
92+
* downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index.
93+
*/
94+
logger.info("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption.");
95+
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.STARTED, TimeValue.timeValueSeconds(8));
10096

101-
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
102-
});
97+
logger.info("-> Starting the disruption.");
98+
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
10399

104-
// if the source index has already been downsampled and moved into the data stream just use its name directly
105-
final String targetIndex = sourceIndex.startsWith("downsample-5m-") ? sourceIndex : "downsample-5m-" + sourceIndex;
106-
assertBusy(() -> {
107-
try {
108-
GetSettingsResponse getSettingsResponse = cluster.client()
109-
.admin()
110-
.indices()
111-
.getSettings(
112-
new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(targetIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
113-
)
114-
.actionGet();
115-
Settings indexSettings = getSettingsResponse.getIndexToSettings().get(targetIndex);
116-
assertThat(indexSettings, is(notNullValue()));
117-
assertThat(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexSettings), is(IndexMetadata.DownsampleTaskStatus.SUCCESS));
118-
assertEquals("5m", IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.get(indexSettings));
119-
} catch (Exception e) {
120-
throw new AssertionError(e);
121-
}
122-
}, 120, TimeUnit.SECONDS);
100+
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.SUCCESS, TimeValue.timeValueSeconds(120));
123101
ensureGreen(targetIndex);
102+
logger.info("-> Relocation has finished");
103+
}
104+
105+
private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.DownsampleTaskStatus expectedStatus, TimeValue timeout) {
106+
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
107+
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
108+
final var indexMetadata = clusterState.metadata().getProject().index(downsampledIndex);
109+
if (indexMetadata == null) {
110+
return false;
111+
}
112+
var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
113+
if (expectedStatus == downsamplingStatus) {
114+
logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus);
115+
return true;
116+
}
117+
return false;
118+
});
119+
safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS);
124120
}
125121
}

0 commit comments

Comments
 (0)