Skip to content

Commit 3342033

Browse files
authored
Fix expiration time in ES|QL async (elastic#135209) (elastic#135240)
Currently, we incorrectly use the initial keep-alive value to compute the expiration time when creating the document that stores the async response. Instead, we should use the latest expiration time from the search task, which is updated by get requests. Closes elastic#135169
1 parent 7d463a2 commit 3342033

File tree

5 files changed

+76
-71
lines changed

5 files changed

+76
-71
lines changed

docs/changelog/135209.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135209
2+
summary: Fix expiration time in ES|QL async
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135169

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ public AsyncTaskManagementService(
175175
public void asyncExecute(
176176
Request request,
177177
TimeValue waitForCompletionTimeout,
178-
TimeValue keepAlive,
179178
boolean keepOnCompletion,
180179
ActionListener<Response> listener
181180
) {
@@ -188,7 +187,7 @@ public void asyncExecute(
188187
operation.execute(
189188
request,
190189
searchTask,
191-
wrapStoringListener(searchTask, waitForCompletionTimeout, keepAlive, keepOnCompletion, listener)
190+
wrapStoringListener(searchTask, waitForCompletionTimeout, keepOnCompletion, listener)
192191
);
193192
operationStarted = true;
194193
} finally {
@@ -203,7 +202,6 @@ public void asyncExecute(
203202
private ActionListener<Response> wrapStoringListener(
204203
T searchTask,
205204
TimeValue waitForCompletionTimeout,
206-
TimeValue keepAlive,
207205
boolean keepOnCompletion,
208206
ActionListener<Response> listener
209207
) {
@@ -225,7 +223,7 @@ private ActionListener<Response> wrapStoringListener(
225223
if (keepOnCompletion) {
226224
storeResults(
227225
searchTask,
228-
new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()),
226+
new StoredAsyncResponse<>(response, searchTask.getExpirationTimeMillis()),
229227
ActionListener.running(() -> acquiredListener.onResponse(response))
230228
);
231229
} else {
@@ -237,7 +235,7 @@ private ActionListener<Response> wrapStoringListener(
237235
// We finished after timeout - saving results
238236
storeResults(
239237
searchTask,
240-
new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()),
238+
new StoredAsyncResponse<>(response, searchTask.getExpirationTimeMillis()),
241239
ActionListener.running(response::decRef)
242240
);
243241
}
@@ -249,7 +247,7 @@ private ActionListener<Response> wrapStoringListener(
249247
if (keepOnCompletion) {
250248
storeResults(
251249
searchTask,
252-
new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()),
250+
new StoredAsyncResponse<>(e, searchTask.getExpirationTimeMillis()),
253251
ActionListener.running(() -> acquiredListener.onFailure(e))
254252
);
255253
} else {
@@ -259,7 +257,7 @@ private ActionListener<Response> wrapStoringListener(
259257
}
260258
} else {
261259
// We finished after timeout - saving exception
262-
storeResults(searchTask, new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()));
260+
storeResults(searchTask, new StoredAsyncResponse<>(e, searchTask.getExpirationTimeMillis()));
263261
}
264262
});
265263
}

x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementServiceTests.java

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.action.LegacyActionRequest;
1313
import org.elasticsearch.action.support.ActionTestUtils;
14+
import org.elasticsearch.action.support.PlainActionFuture;
1415
import org.elasticsearch.cluster.service.ClusterService;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -40,6 +41,7 @@
4041

4142
import static org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService.addCompletionListener;
4243
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4345
import static org.hamcrest.Matchers.notNullValue;
4446
import static org.hamcrest.Matchers.nullValue;
4547

@@ -52,9 +54,11 @@ public class AsyncTaskManagementServiceTests extends ESSingleNodeTestCase {
5254

5355
public static class TestRequest extends LegacyActionRequest {
5456
private final String string;
57+
private final TimeValue keepAlive;
5558

56-
public TestRequest(String string) {
59+
public TestRequest(String string, TimeValue keepAlive) {
5760
this.string = string;
61+
this.keepAlive = keepAlive;
5862
}
5963

6064
@Override
@@ -129,7 +133,7 @@ public TestTask createTask(
129133
headers,
130134
originHeaders,
131135
asyncExecutionId,
132-
TimeValue.timeValueDays(5)
136+
request.keepAlive
133137
);
134138
}
135139

@@ -172,7 +176,7 @@ public void setup() {
172176
);
173177
results = new AsyncResultsService<>(
174178
store,
175-
true,
179+
false,
176180
TestTask.class,
177181
(task, listener, timeout) -> addCompletionListener(transportService.getThreadPool(), task, listener, timeout),
178182
transportService.getTaskManager(),
@@ -212,23 +216,17 @@ public void testReturnBeforeTimeout() throws Exception {
212216
boolean success = randomBoolean();
213217
boolean keepOnCompletion = randomBoolean();
214218
CountDownLatch latch = new CountDownLatch(1);
215-
TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die");
216-
service.asyncExecute(
217-
request,
218-
TimeValue.timeValueMinutes(1),
219-
TimeValue.timeValueMinutes(10),
220-
keepOnCompletion,
221-
ActionListener.wrap(r -> {
222-
assertThat(success, equalTo(true));
223-
assertThat(r.string, equalTo("response for [" + request.string + "]"));
224-
assertThat(r.id, notNullValue());
225-
latch.countDown();
226-
}, e -> {
227-
assertThat(success, equalTo(false));
228-
assertThat(e.getMessage(), equalTo("test exception"));
229-
latch.countDown();
230-
})
231-
);
219+
TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die", TimeValue.timeValueDays(1));
220+
service.asyncExecute(request, TimeValue.timeValueMinutes(1), keepOnCompletion, ActionListener.wrap(r -> {
221+
assertThat(success, equalTo(true));
222+
assertThat(r.string, equalTo("response for [" + request.string + "]"));
223+
assertThat(r.id, notNullValue());
224+
latch.countDown();
225+
}, e -> {
226+
assertThat(success, equalTo(false));
227+
assertThat(e.getMessage(), equalTo("test exception"));
228+
latch.countDown();
229+
}));
232230
assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true));
233231
}
234232

@@ -252,20 +250,14 @@ public void execute(TestRequest request, TestTask task, ActionListener<TestRespo
252250
boolean timeoutOnFirstAttempt = randomBoolean();
253251
boolean waitForCompletion = randomBoolean();
254252
CountDownLatch latch = new CountDownLatch(1);
255-
TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die");
253+
TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die", TimeValue.timeValueDays(1));
256254
AtomicReference<TestResponse> responseHolder = new AtomicReference<>();
257-
service.asyncExecute(
258-
request,
259-
TimeValue.timeValueMillis(1),
260-
TimeValue.timeValueMinutes(10),
261-
keepOnCompletion,
262-
ActionTestUtils.assertNoFailureListener(r -> {
263-
assertThat(r.string, nullValue());
264-
assertThat(r.id, notNullValue());
265-
assertThat(responseHolder.getAndSet(r), nullValue());
266-
latch.countDown();
267-
})
268-
);
255+
service.asyncExecute(request, TimeValue.timeValueMillis(1), keepOnCompletion, ActionTestUtils.assertNoFailureListener(r -> {
256+
assertThat(r.string, nullValue());
257+
assertThat(r.id, notNullValue());
258+
assertThat(responseHolder.getAndSet(r), nullValue());
259+
latch.countDown();
260+
}));
269261
assertThat(latch.await(20, TimeUnit.SECONDS), equalTo(true));
270262

271263
if (timeoutOnFirstAttempt) {
@@ -281,17 +273,11 @@ public void execute(TestRequest request, TestTask task, ActionListener<TestRespo
281273
if (waitForCompletion) {
282274
// now we are waiting for the task to finish
283275
logger.trace("Waiting for response to complete");
284-
AtomicReference<StoredAsyncResponse<TestResponse>> responseRef = new AtomicReference<>();
285-
CountDownLatch getResponseCountDown = getResponse(
286-
responseHolder.get().id,
287-
TimeValue.timeValueSeconds(5),
288-
ActionTestUtils.assertNoFailureListener(responseRef::set)
289-
);
276+
var getFuture = getResponse(responseHolder.get().id, TimeValue.timeValueSeconds(5), TimeValue.MINUS_ONE);
290277

291278
executionLatch.countDown();
292-
assertThat(getResponseCountDown.await(10, TimeUnit.SECONDS), equalTo(true));
279+
var response = safeGet(getFuture);
293280

294-
StoredAsyncResponse<TestResponse> response = responseRef.get();
295281
if (success) {
296282
assertThat(response.getException(), nullValue());
297283
assertThat(response.getResponse(), notNullValue());
@@ -326,26 +312,46 @@ public void execute(TestRequest request, TestTask task, ActionListener<TestRespo
326312
}
327313
}
328314

315+
public void testUpdateKeepAliveToTask() throws Exception {
316+
long now = System.currentTimeMillis();
317+
CountDownLatch executionLatch = new CountDownLatch(1);
318+
AsyncTaskManagementService<TestRequest, TestResponse, TestTask> service = createManagementService(new TestOperation() {
319+
@Override
320+
public void execute(TestRequest request, TestTask task, ActionListener<TestResponse> listener) {
321+
executorService.submit(() -> {
322+
try {
323+
assertThat(executionLatch.await(10, TimeUnit.SECONDS), equalTo(true));
324+
} catch (InterruptedException ex) {
325+
throw new AssertionError(ex);
326+
}
327+
super.execute(request, task, listener);
328+
});
329+
}
330+
});
331+
TestRequest request = new TestRequest(randomAlphaOfLength(10), TimeValue.timeValueHours(1));
332+
PlainActionFuture<TestResponse> submitResp = new PlainActionFuture<>();
333+
try {
334+
service.asyncExecute(request, TimeValue.timeValueMillis(1), true, submitResp);
335+
String id = submitResp.get().id;
336+
assertThat(id, notNullValue());
337+
TimeValue keepAlive = TimeValue.timeValueDays(between(1, 10));
338+
var resp1 = safeGet(getResponse(id, TimeValue.ZERO, keepAlive));
339+
assertThat(resp1.getExpirationTime(), greaterThanOrEqualTo(now + keepAlive.millis()));
340+
} finally {
341+
executionLatch.countDown();
342+
}
343+
}
344+
329345
private StoredAsyncResponse<TestResponse> getResponse(String id, TimeValue timeout) throws InterruptedException {
330-
AtomicReference<StoredAsyncResponse<TestResponse>> response = new AtomicReference<>();
331-
assertThat(
332-
getResponse(id, timeout, ActionTestUtils.assertNoFailureListener(response::set)).await(10, TimeUnit.SECONDS),
333-
equalTo(true)
334-
);
335-
return response.get();
346+
return safeGet(getResponse(id, timeout, TimeValue.MINUS_ONE));
336347
}
337348

338-
private CountDownLatch getResponse(String id, TimeValue timeout, ActionListener<StoredAsyncResponse<TestResponse>> listener) {
339-
CountDownLatch responseLatch = new CountDownLatch(1);
349+
private PlainActionFuture<StoredAsyncResponse<TestResponse>> getResponse(String id, TimeValue timeout, TimeValue keepAlive) {
350+
PlainActionFuture<StoredAsyncResponse<TestResponse>> future = new PlainActionFuture<>();
340351
GetAsyncResultRequest getResultsRequest = new GetAsyncResultRequest(id).setWaitForCompletionTimeout(timeout);
341-
results.retrieveResult(getResultsRequest, ActionListener.wrap(r -> {
342-
listener.onResponse(r);
343-
responseLatch.countDown();
344-
}, e -> {
345-
listener.onFailure(e);
346-
responseLatch.countDown();
347-
}));
348-
return responseLatch;
352+
getResultsRequest.setKeepAlive(keepAlive);
353+
results.retrieveResult(getResultsRequest, future);
354+
return future;
349355
}
350356

351357
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ public void testUpdateKeepAlive() throws Exception {
307307
assertThat(resp.isRunning(), is(false));
308308
}
309309
});
310+
assertThat(getExpirationFromDoc(asyncId), greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
310311
// update the keepAlive after the query has completed
311312
int iters = between(1, 5);
312313
for (int i = 0; i < iters; i++) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,7 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<Esq
187187
private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
188188
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
189189
if (requestIsAsync(request)) {
190-
asyncTaskManagementService.asyncExecute(
191-
request,
192-
request.waitForCompletionTimeout(),
193-
request.keepAlive(),
194-
request.keepOnCompletion(),
195-
listener
196-
);
190+
asyncTaskManagementService.asyncExecute(request, request.waitForCompletionTimeout(), request.keepOnCompletion(), listener);
197191
} else {
198192
innerExecute(task, request, listener);
199193
}

0 commit comments

Comments
 (0)