Skip to content

Commit 2ac5031

Browse files
committed
Use a timeout listener
1 parent ca896c5 commit 2ac5031

File tree

2 files changed

+28
-88
lines changed

2 files changed

+28
-88
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/DeploymentManager.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.ActionListener;
1717
import org.elasticsearch.action.search.SearchRequest;
1818
import org.elasticsearch.action.search.TransportSearchAction;
19+
import org.elasticsearch.action.support.ListenerTimeouts;
1920
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2021
import org.elasticsearch.client.internal.Client;
2122
import org.elasticsearch.common.Strings;
@@ -79,6 +80,7 @@ public class DeploymentManager {
7980
private static final Logger logger = LogManager.getLogger(DeploymentManager.class);
8081
private static final AtomicLong requestIdCounter = new AtomicLong(1);
8182
public static final int NUM_RESTART_ATTEMPTS = 3;
83+
private static final TimeValue WORKER_QUEUE_COMPLETION_TIMEOUT = TimeValue.timeValueMinutes(5);
8284

8385
private final Client client;
8486
private final NamedXContentRegistry xContentRegistry;
@@ -674,25 +676,38 @@ private synchronized void stopProcessAfterCompletingPendingWork(ActionListener<A
674676
prepareInternalStateForShutdown();
675677

676678
// Waiting for the process worker to finish the pending work could
677-
// take a long time. Best not to block the thread so register
678-
// a function with the process worker that is called when the
679-
// work is finished. Then proceed to closing the native process
679+
// take a long time. To avoid blocking the calling thread, register
680+
// a function with the process worker queue that is called when the
681+
// worker queue is finished. Then proceed to closing the native process
680682
// and wait for all results to be processed; the second part can be
681683
// done synchronously as it is not expected to take long.
682-
// The ShutdownTracker will handle this.
683-
684-
// Shutdown tracker will stop the process work and start a race with
685-
// a timeout condition.
686-
new ShutdownTracker(() -> {
687-
// Stopping the process worker timed out, kill the process
688-
logger.warn(format("[%s] Timed out waiting for process worker to complete, forcing a shutdown", task.getDeploymentId()));
689-
forcefullyStopProcess();
690-
}, () -> {
684+
685+
// This listener closes the native process and waits for the results
686+
// after the worker queue has finished
687+
var closeProcessListener = listener.delegateResponse((l, r) -> {
691688
// process worker stopped within allotted time, close process
692689
closeProcessAndWaitForResultProcessor();
693690
closeNlpTaskProcessor();
694-
}, threadPool, priorityProcessWorker, listener);
691+
l.onResponse(AcknowledgedResponse.TRUE);
692+
});
693+
694+
// Timeout listener waits for the worker queue to finish; if the timeout expires first, the process is forcefully stopped
695+
var listenWithTimeout = ListenerTimeouts.wrapWithTimeout(
696+
threadPool,
697+
WORKER_QUEUE_COMPLETION_TIMEOUT,
698+
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
699+
closeProcessListener,
700+
(l) -> {
701+
// Stopping the process worker timed out, kill the process
702+
logger.warn(
703+
format("[%s] Timed out waiting for process worker to complete, forcing a shutdown", task.getDeploymentId())
704+
);
705+
forcefullyStopProcess();
706+
l.onResponse(AcknowledgedResponse.FALSE);
707+
}
708+
);
695709

710+
priorityProcessWorker.shutdownWithCallback(() -> listenWithTimeout.onResponse(AcknowledgedResponse.TRUE));
696711
}
697712

698713
private void closeProcessAndWaitForResultProcessor() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/ShutdownTracker.java

Lines changed: 0 additions & 75 deletions
This file was deleted.

0 commit comments

Comments
 (0)