diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 853ac9b877d3f..2a7a2ef8874c0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; @@ -21,6 +22,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -39,21 +41,25 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static 
java.util.Collections.emptySet;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.unmodifiableSet;
+import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING;
 import static org.elasticsearch.common.util.set.Sets.newHashSet;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -202,7 +208,7 @@ public void testClusterInfoServiceInformationClearOnError() {
         internalCluster().startNodes(
             2,
             // manually control publishing
-            Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()
+            Settings.builder().put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()
         );
         prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)).get();
         ensureGreen("test");
@@ -334,4 +340,71 @@ public void testClusterInfoServiceInformationClearOnError() {
             );
         }
     }
+
+    /**
+     * Starts a dedicated master and one data node with the write load decider enabled, indexes some documents, forces a
+     * {@link ClusterInfo} refresh on the master, and verifies that the refreshed info carries WRITE thread pool usage stats
+     * for both nodes, with non-zero utilization on the data node.
+     */
+    public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
+        var settings = Settings.builder()
+            .put(
+                WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+            )
+            .build();
+        var masterName = internalCluster().startMasterOnlyNode(settings);
+        var dataNodeName = internalCluster().startDataOnlyNode(settings);
+        ensureStableCluster(2);
+        assertEquals(internalCluster().getMasterName(), masterName);
+        assertNotEquals(internalCluster().getMasterName(), dataNodeName);
+        logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);
+
+        // Track when the data node receives a poll from the master for the write thread pool's stats.
+        final MockTransportService dataNodeMockTransportService = MockTransportService.getInstance(dataNodeName);
+        final CountDownLatch nodeThreadPoolStatsPolledByMaster = new CountDownLatch(1);
+        dataNodeMockTransportService.addRequestHandlingBehavior(
+            TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
+            (handler, request, channel, task) -> {
+                handler.messageReceived(request, channel, task);
+
+                if (nodeThreadPoolStatsPolledByMaster.getCount() > 0) {
+                    logger.info("---> Data node received a request for thread pool stats");
+                }
+                nodeThreadPoolStatsPolledByMaster.countDown();
+            }
+        );
+
+        // Do some writes to create some write thread pool activity. The count is drawn once, not on every loop check.
+        final String indexName = randomIdentifier();
+        final int numberOfDocs = randomIntBetween(1, 1000);
+        for (int i = 0; i < numberOfDocs; i++) {
+            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
+        }
+
+        // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
+        final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
+            InternalClusterInfoService.class,
+            internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
+        );
+        final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+
+        // Verify that the data node received a request for thread pool stats.
+        safeAwait(nodeThreadPoolStatsPolledByMaster);
+
+        final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
+        logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+        assertThat(usageStatsForThreadPools.size(), equalTo(2)); // master and data node
+        var dataNodeId = getNodeId(dataNodeName);
+        var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+        assertNotNull(nodeUsageStatsForThreadPool);
+        logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+        assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+        var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+        assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+        assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+        assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
+        assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
index 25ae21964ba0e..9db3836fdb32f 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -25,7 +25,6 @@
 import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
 import org.elasticsearch.cluster.InternalClusterInfoService;
 import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
-import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -91,7 +90,6 @@
 import
java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -133,11 +131,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList( - InternalSettingsPlugin.class, - BogusEstimatedHeapUsagePlugin.class, - BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class - ); + return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class); } public void testLockTryingToDelete() throws Exception { @@ -332,8 +326,7 @@ public void testNodeWriteLoadsArePresent() { ClusterInfoServiceUtils.refresh(clusterInfoService); nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools(); - /** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation - * generates random usage values */ + /** Verify that each node has usage stats reported. 
*/ ClusterState state = getInstanceFromNode(ClusterService.class).state(); assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); for (DiscoveryNode node : state.nodes()) { @@ -346,7 +339,7 @@ public void testNodeWriteLoadsArePresent() { assertNotNull(writeThreadPoolStats); assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); - assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); } } finally { updateClusterSettings( @@ -935,61 +928,4 @@ public ClusterService getClusterService() { return clusterService.get(); } } - - /** - * A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random - * {@link NodeUsageStatsForThreadPools} for each node in the cluster. - *

- * Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the - * plugin system can pick it up and use it for the test set-up. - */ - public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { - - private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin; - - public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) { - this.plugin = plugin; - } - - @Override - public void collectUsageStats(ActionListener> listener) { - ActionListener.completeWith( - listener, - () -> plugin.getClusterService() - .state() - .nodes() - .stream() - .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId()))) - ); - } - - private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) { - NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - randomNonNegativeInt(), - randomFloat(), - randomNonNegativeLong() - ); - Map statsForThreadPools = new HashMap<>(); - statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); - return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools); - } - } - - /** - * Make a plugin to gain access to the {@link ClusterService} instance. 
- */ - public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin { - - private final SetOnce clusterService = new SetOnce<>(); - - @Override - public Collection createComponents(PluginServices services) { - clusterService.set(services.clusterService()); - return List.of(); - } - - public ClusterService getClusterService() { - return clusterService.get(); - } - } } diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector deleted file mode 100644 index 787ce436c3ca6..0000000000000 --- a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector +++ /dev/null @@ -1,10 +0,0 @@ -# -# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -# or more contributor license agreements. Licensed under the "Elastic License -# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side -# Public License v 1"; you may not use this file except in compliance with, at -# your election, the "Elastic License 2.0", the "GNU Affero General Public -# License v3.0 only", or the "Server Side Public License, v 1". 
-# - -org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageStatsForThreadPoolsCollector diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index ae0ccecf15ed7..b8124c7b077c7 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -340,6 +340,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00); public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00); public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00); + public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_122_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index f6f06f3301a6d..51091c5f0d886 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction; import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction; import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction; @@ -629,6 +630,7 @@ public void reg ActionRegistry actions = new ActionRegistry(); actions.register(TransportNodesInfoAction.TYPE, TransportNodesInfoAction.class); + 
actions.register(TransportNodeUsageStatsForThreadPoolsAction.TYPE, TransportNodeUsageStatsForThreadPoolsAction.class); actions.register(TransportRemoteInfoAction.TYPE, TransportRemoteInfoAction.class); actions.register(TransportNodesCapabilitiesAction.TYPE, TransportNodesCapabilitiesAction.class); actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java new file mode 100644 index 0000000000000..81af31dbcafbb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */
+
+package org.elasticsearch.action.admin.cluster.node.usage;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.transport.AbstractTransportRequest;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Defines the request/response types for {@link TransportNodeUsageStatsForThreadPoolsAction}.
+ */
+public class NodeUsageStatsForThreadPoolsAction {
+    /**
+     * The sender request type that will be resolved to send individual {@link NodeRequest} requests to every node in the cluster.
+     */
+    public static class Request extends BaseNodesRequest {
+        public Request() {
+            // Send all nodes a request by specifying null.
+            super((String[]) null);
+        }
+    }
+
+    /**
+     * Request sent to and received by a cluster node. There are no parameters needed in the node-specific request.
+     */
+    public static class NodeRequest extends AbstractTransportRequest {
+        public NodeRequest(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public NodeRequest() {}
+    }
+
+    /**
+     * A collection of {@link NodeUsageStatsForThreadPools} responses from all the cluster nodes.
+     */
+    public static class Response extends BaseNodesResponse<NodeUsageStatsForThreadPoolsAction.NodeResponse> {
+
+        protected Response(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public Response(
+            ClusterName clusterName,
+            List<NodeUsageStatsForThreadPoolsAction.NodeResponse> nodeResponses,
+            List<FailedNodeException> nodeFailures
+        ) {
+            super(clusterName, nodeResponses, nodeFailures);
+        }
+
+        /**
+         * Combines the responses from each node that was called into a single map (by node ID) for the final {@link Response}.
+         */
+        public Map<String, NodeUsageStatsForThreadPools> getAllNodeUsageStatsForThreadPools() {
+            Map<String, NodeUsageStatsForThreadPools> allNodeUsageStatsForThreadPools = new HashMap<>();
+            for (NodeUsageStatsForThreadPoolsAction.NodeResponse nodeResponse : getNodes()) {
+                // The map key is the node ID carried inside the stats object itself, not the ID of the responding DiscoveryNode.
+                allNodeUsageStatsForThreadPools.put(
+                    nodeResponse.getNodeUsageStatsForThreadPools().nodeId(),
+                    nodeResponse.getNodeUsageStatsForThreadPools()
+                );
+            }
+            return allNodeUsageStatsForThreadPools;
+        }
+
+        @Override
+        protected void writeNodesTo(StreamOutput out, List<NodeUsageStatsForThreadPoolsAction.NodeResponse> nodeResponses) throws IOException {
+            out.writeCollection(nodeResponses);
+        }
+
+        @Override
+        protected List<NodeUsageStatsForThreadPoolsAction.NodeResponse> readNodesFrom(StreamInput in) throws IOException {
+            return in.readCollectionAsList(NodeUsageStatsForThreadPoolsAction.NodeResponse::new);
+        }
+
+        @Override
+        public String toString() {
+            return "NodeUsageStatsForThreadPoolsAction.Response{" + getNodes() + "}";
+        }
+    }
+
+    /**
+     * A {@link NodeUsageStatsForThreadPools} response from a single cluster node.
+     */
+    public static class NodeResponse extends BaseNodeResponse {
+        private final NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools;
+
+        protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
+            super(in, node);
+            this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
+        }
+
+        public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) {
+            super(node);
+            this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
+        }
+
+        public NodeResponse(StreamInput in) throws IOException {
+            super(in);
+            this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
+        }
+
+        public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() {
+            return nodeUsageStatsForThreadPools;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            nodeUsageStatsForThreadPools.writeTo(out);
+        }
+
+        @Override
+        public String toString() {
+            return "NodeUsageStatsForThreadPoolsAction.NodeResponse{"
+                + "nodeId="
+                + getNode().getId()
+                + ", nodeUsageStatsForThreadPools="
+                + nodeUsageStatsForThreadPools
+                + "}";
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
new file mode 100644
index 0000000000000..710028bf54627
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements.
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.node.usage; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Collects some thread pool stats from each data node for purposes of shard allocation balancing. The specific stats are defined in + * {@link NodeUsageStatsForThreadPools}. 
+ */
+public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesAction<
+    NodeUsageStatsForThreadPoolsAction.Request,
+    NodeUsageStatsForThreadPoolsAction.Response,
+    NodeUsageStatsForThreadPoolsAction.NodeRequest,
+    NodeUsageStatsForThreadPoolsAction.NodeResponse,
+    Void> {
+
+    private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class);
+
+    public static final String NAME = "internal:monitor/thread_pool/stats";
+    public static final ActionType<NodeUsageStatsForThreadPoolsAction.Response> TYPE = new ActionType<>(NAME);
+
+    private final ThreadPool threadPool;
+    private final ClusterService clusterService;
+
+    @Inject
+    public TransportNodeUsageStatsForThreadPoolsAction(
+        ThreadPool threadPool,
+        ClusterService clusterService,
+        TransportService transportService,
+        ActionFilters actionFilters
+    ) {
+        super(
+            NAME,
+            clusterService,
+            transportService,
+            actionFilters,
+            NodeUsageStatsForThreadPoolsAction.NodeRequest::new,
+            threadPool.executor(ThreadPool.Names.MANAGEMENT)
+        );
+        this.threadPool = threadPool;
+        this.clusterService = clusterService;
+    }
+
+    @Override
+    protected NodeUsageStatsForThreadPoolsAction.Response newResponse(
+        NodeUsageStatsForThreadPoolsAction.Request request,
+        List<NodeUsageStatsForThreadPoolsAction.NodeResponse> nodeResponses,
+        List<FailedNodeException> nodeFailures
+    ) {
+
+        return new NodeUsageStatsForThreadPoolsAction.Response(clusterService.getClusterName(), nodeResponses, nodeFailures);
+    }
+
+    @Override
+    protected NodeUsageStatsForThreadPoolsAction.NodeRequest newNodeRequest(NodeUsageStatsForThreadPoolsAction.Request request) {
+        return new NodeUsageStatsForThreadPoolsAction.NodeRequest();
+    }
+
+    @Override
+    protected NodeUsageStatsForThreadPoolsAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
+        return new NodeUsageStatsForThreadPoolsAction.NodeResponse(in);
+    }
+
+    @Override
+    protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
+        NodeUsageStatsForThreadPoolsAction.NodeRequest request,
+        Task task
+    ) {
+        // Only the WRITE pool is reported, and it must be a time-tracking executor to expose utilization and queue latency.
+        DiscoveryNode localNode = clusterService.localNode();
+        var writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
+        assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
+        var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
+
+        ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats(
+            trackingForWriteExecutor.getMaximumPoolSize(),
+            (float) trackingForWriteExecutor.pollUtilization(
+                TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
+            ),
+            trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
+        );
+
+        logger.trace("collected write thread pool usage stats for the local node: {}", threadPoolUsageStats);
+
+        Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
+        perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats);
+        return new NodeUsageStatsForThreadPoolsAction.NodeResponse(
+            localNode,
+            new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool)
+        );
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
index 89394c8fa8ba8..19ecbf0ff9f0f 100644
--- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
@@ -50,6 +50,7 @@
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
+import java.util.function.Supplier;

 import static org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING;
 import static org.elasticsearch.core.Strings.format;
@@ -107,6 +108,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

     private final ThreadPool threadPool;
     private final Client client;
+    private final Supplier<ClusterState> clusterStateSupplier;
     private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();

     private final Object mutex = new Object();
@@ -139,6 +141,7 @@ public InternalClusterInfoService(
         this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
         this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
         this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
+        this.clusterStateSupplier = clusterService::state;
         ClusterSettings clusterSettings = clusterService.getClusterSettings();
         clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout);
         clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
@@ -269,18 +272,22 @@ private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writ
     }

     private void fetchNodesUsageStatsForThreadPools() {
-        nodeUsageStatsForThreadPoolsCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() {
-            @Override
-            public void onResponse(Map<String, NodeUsageStatsForThreadPools> writeLoads) {
-                nodeThreadPoolUsageStatsPerNode = writeLoads;
-            }
+        nodeUsageStatsForThreadPoolsCollector.collectUsageStats(
+            client,
+            clusterStateSupplier.get(),
+            ActionListener.releaseAfter(new ActionListener<>() {
+                @Override
+                public void onResponse(Map<String, NodeUsageStatsForThreadPools> threadPoolStats) {
+                    nodeThreadPoolUsageStatsPerNode = threadPoolStats;
+                }

-            @Override
-            public void onFailure(Exception e) {
-                logger.warn("failed to fetch write load estimates for nodes", e);
-                nodeThreadPoolUsageStatsPerNode = Map.of();
-            }
-        }, fetchRefs.acquire()));
+                @Override
+                public void onFailure(Exception e) {
+                    logger.warn("failed to fetch thread pool usage estimates for nodes", e);
+                    nodeThreadPoolUsageStatsPerNode = Map.of();
+                }
+            }, fetchRefs.acquire())
+        );
     }

     private void fetchNodesEstimatedHeapUsage() {
diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java
b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java
index 5e84f29af8412..9b0297cd73abd 100644
--- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java
+++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java
@@ -33,7 +33,7 @@ public NodeUsageStatsForThreadPools(StreamInput in) throws IOException {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(this.nodeId);
-        out.writeMap(threadPoolUsageStatsMap, StreamOutput::writeWriteable);
+        out.writeMap(this.threadPoolUsageStatsMap, StreamOutput::writeWriteable);
     }

     @Override
@@ -47,6 +47,10 @@ public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) return false;
         NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o;
+        // Compare node IDs once, before the loop, so stats from different nodes are unequal even when the other map is empty.
+        if (nodeId.equals(other.nodeId) == false) {
+            return false;
+        }
         for (var entry : other.threadPoolUsageStatsMap.entrySet()) {
             var loadStats = threadPoolUsageStatsMap.get(entry.getKey());
             if (loadStats == null || loadStats.equals(entry.getValue()) == false) {
                 return false;
@@ -70,14 +74,11 @@ public String toString() {
      *
      * @param totalThreadPoolThreads Total number of threads in the thread pool.
      * @param averageThreadPoolUtilization Percent of thread pool threads that are in use, averaged over some period of time.
-     * @param averageThreadPoolQueueLatencyMillis How much time tasks spend in the thread pool queue. Zero if there is nothing being queued
-     *                                            in the write thread pool.
+     * @param maxThreadPoolQueueLatencyMillis The max time any task has spent in the thread pool queue. Zero if no task is queued.
      */
-    public record ThreadPoolUsageStats(
-        int totalThreadPoolThreads,
-        float averageThreadPoolUtilization,
-        long averageThreadPoolQueueLatencyMillis
-    ) implements Writeable {
+    public record ThreadPoolUsageStats(int totalThreadPoolThreads, float averageThreadPoolUtilization, long maxThreadPoolQueueLatencyMillis)
+        implements
+            Writeable {

         public ThreadPoolUsageStats(StreamInput in) throws IOException {
             this(in.readVInt(), in.readFloat(), in.readVLong());
@@ -87,12 +88,12 @@ public ThreadPoolUsageStats(StreamInput in) throws IOException {
         public void writeTo(StreamOutput out) throws IOException {
             out.writeVInt(this.totalThreadPoolThreads);
             out.writeFloat(this.averageThreadPoolUtilization);
-            out.writeVLong(this.averageThreadPoolQueueLatencyMillis);
+            out.writeVLong(this.maxThreadPoolQueueLatencyMillis);
         }

         @Override
         public int hashCode() {
-            return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, averageThreadPoolQueueLatencyMillis);
+            return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis);
         }

         @Override
@@ -101,8 +102,8 @@ public String toString() {
                 + totalThreadPoolThreads
                 + ", averageThreadPoolUtilization="
                 + averageThreadPoolUtilization
-                + ", averageThreadPoolQueueLatencyMillis="
-                + averageThreadPoolQueueLatencyMillis
+                + ", maxThreadPoolQueueLatencyMillis="
+                + maxThreadPoolQueueLatencyMillis
                 + "]";
         }

@@ -113,7 +114,7 @@ public boolean equals(Object o) {
             ThreadPoolUsageStats other = (ThreadPoolUsageStats) o;
             return totalThreadPoolThreads == other.totalThreadPoolThreads
                 && averageThreadPoolUtilization == other.averageThreadPoolUtilization
-                && averageThreadPoolQueueLatencyMillis == other.averageThreadPoolQueueLatencyMillis;
+                && maxThreadPoolQueueLatencyMillis == other.maxThreadPoolQueueLatencyMillis;
         }

     } // ThreadPoolUsageStats
diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java
b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java
index e302a4abed559..e08563a648eb1 100644
--- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java
+++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java
@@ -9,25 +9,56 @@

 package org.elasticsearch.cluster;

+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
+import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
+import org.elasticsearch.client.internal.Client;

 import java.util.Map;

 /**
- * Collects the usage stats (like write thread pool load) estimations for each node in the cluster.
+ * Collects the thread pool usage stats for each node in the cluster.
  * <p>
  * Results are returned as a map of node ID to node usage stats.
  */
-public interface NodeUsageStatsForThreadPoolsCollector {
-    /**
-     * This will be used when there is no NodeUsageLoadCollector available.
-     */
-    NodeUsageStatsForThreadPoolsCollector EMPTY = listener -> listener.onResponse(Map.of());
+public class NodeUsageStatsForThreadPoolsCollector {
+    /**
+     * A collector that never contacts any node and always completes the listener with an empty map.
+     */
+    public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() {
+        @Override
+        public void collectUsageStats(
+            Client client,
+            ClusterState clusterState,
+            ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener
+        ) {
+            listener.onResponse(Map.of());
+        }
+    };

     /**
-     * Collects the write load estimates from the cluster.
+     * Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster.
+     * Completes the listener with an empty map, without contacting any node, when the cluster's minimum transport version predates
+     * {@link TransportVersions#TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION}.
      *
-     * @param listener The listener to receive the write load results.
+     * @param client       The client used to execute {@link TransportNodeUsageStatsForThreadPoolsAction}.
+     * @param clusterState The cluster state whose minimum transport version decides whether the action can be sent.
+     * @param listener     The listener to receive the usage results, keyed by node ID.
      */
-    void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener);
+    public void collectUsageStats(
+        Client client,
+        ClusterState clusterState,
+        ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener
+    ) {
+        if (clusterState.getMinTransportVersion().onOrAfter(TransportVersions.TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) {
+            client.execute(
+                TransportNodeUsageStatsForThreadPoolsAction.TYPE,
+                new NodeUsageStatsForThreadPoolsAction.Request(),
+                listener.map(NodeUsageStatsForThreadPoolsAction.Response::getAllNodeUsageStatsForThreadPools)
+            );
+        } else {
+            listener.onResponse(Map.of());
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java
new file mode 100644
index 0000000000000..0d7c5031e5168
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V.
under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.gateway.GatewayService; + +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +/** + * Monitors the node-level thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via + * {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds. Also maintains node-level + * write-load stats for each data node that can be supplied to callers: these stats are pushed from data nodes whenever an + * actionable change occurs, such as crossing the utilization or queue time thresholds. + * + * Runs on the master node. 
+ * + * TODO (ES-11992): implement + */ +public class NodeUsageStatsForThreadPoolsMonitor { + private static final Logger logger = LogManager.getLogger(NodeUsageStatsForThreadPoolsMonitor.class); + private final WriteLoadConstraintSettings writeLoadConstraintSettings; + private final Supplier clusterStateSupplier; + private final LongSupplier currentTimeMillisSupplier; + private final RerouteService rerouteService; + + public NodeUsageStatsForThreadPoolsMonitor( + ClusterSettings clusterSettings, + LongSupplier currentTimeMillisSupplier, + Supplier clusterStateSupplier, + RerouteService rerouteService + ) { + this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings); + this.clusterStateSupplier = clusterStateSupplier; + this.currentTimeMillisSupplier = currentTimeMillisSupplier; + this.rerouteService = rerouteService; + } + + /** + * Receives a copy of the latest {@link ClusterInfo} whenever the {@link ClusterInfoService} collects it. Processes the new + * {@link org.elasticsearch.cluster.NodeUsageStatsForThreadPools} and initiates rebalancing, via reroute, if a node in the cluster + * exceeds thread pool usage thresholds. 
+ */ + public void onNewInfo(ClusterInfo clusterInfo) { + final ClusterState state = clusterStateSupplier.get(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + logger.debug("skipping monitor as the cluster state is not recovered yet"); + return; + } + + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) { + logger.trace("skipping monitor because the write load decider is disabled"); + return; + } + + logger.trace("processing new cluster info"); + + boolean reroute = false; + String explanation = ""; + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + + // TODO (ES-11992): implement + + if (reroute) { + logger.debug("rerouting shards: [{}]", explanation); + rerouteService.reroute("node usage stats for thread pools monitor", Priority.NORMAL, ActionListener.wrap(ignored -> { + final var reroutedClusterState = clusterStateSupplier.get(); + + // TODO (ES-11992): implement + + }, e -> logger.debug("reroute failed", e))); + } else { + logger.trace("no reroute required"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index cba02ed207b81..23e1cb563f9fd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.RatioValue; import org.elasticsearch.core.TimeValue; @@ -98,4 +99,28 @@ public enum WriteLoadDeciderStatus { 
Setting.Property.Dynamic, Setting.Property.NodeScope ); + + volatile WriteLoadDeciderStatus writeLoadDeciderStatus; // assigned by the settings callbacks registered in the constructor + volatile TimeValue writeLoadDeciderRerouteIntervalSetting; + + WriteLoadConstraintSettings(ClusterSettings clusterSettings) { + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled); + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting); + } + + private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) { + this.writeLoadDeciderStatus = status; + } + + public WriteLoadDeciderStatus getWriteLoadConstraintEnabled() { + return this.writeLoadDeciderStatus; + } + + public TimeValue getWriteLoadDeciderRerouteIntervalSetting() { + return this.writeLoadDeciderRerouteIntervalSetting; + } + + private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) { + this.writeLoadDeciderRerouteIntervalSetting = timeValue; + } } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index bb28ed4a8aff5..5d30a30e22c5d 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -60,6 +60,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; +import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.version.CompatibilityVersions; @@ -785,6 +786,15 @@ private void construct( )::onNewInfo ); + clusterInfoService.addListener( + new NodeUsageStatsForThreadPoolsMonitor( + clusterService.getClusterSettings(), + 
threadPool.relativeTimeInMillisSupplier(), + clusterService::state, + rerouteService + )::onNewInfo + ); + IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class).toList()); modules.add(indicesModule); diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 326002c7d346c..3b44d6b25b7af 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -80,17 +80,13 @@ ClusterInfoService newClusterInfoService( EstimatedHeapUsageCollector.class, () -> EstimatedHeapUsageCollector.EMPTY ); - final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector = pluginsService.loadSingletonServiceProvider( - NodeUsageStatsForThreadPoolsCollector.class, - () -> NodeUsageStatsForThreadPoolsCollector.EMPTY - ); final InternalClusterInfoService service = new InternalClusterInfoService( settings, clusterService, threadPool, client, estimatedHeapUsageCollector, - nodeUsageStatsForThreadPoolsCollector + new NodeUsageStatsForThreadPoolsCollector() ); if (DiscoveryNode.isMasterNode(settings)) { // listen for state changes (this node starts/stops being the elected master, or new nodes are added) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 814aa102ce284..26af726bccc02 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -72,7 +72,7 @@ private static Map randomNodeUsageStatsFor NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolUsageStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(/* totalThreadPoolThreads= */ randomIntBetween(1, 16), /* averageThreadPoolUtilization= */ randomFloat(), - /* 
averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) + /* maxThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) ); Map usageStatsForThreadPools = new HashMap<>(); usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 6e80e0d087993..22da1f662c6c0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -21,11 +21,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; @@ -84,8 +87,8 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final EstimatedHeapUsageCollector mockEstimatedHeapUsageCollector = spy(new StubEstimatedEstimatedHeapUsageCollector()); - final NodeUsageStatsForThreadPoolsCollector mockNodeUsageStatsForThreadPoolsCollector = spy( - new 
StubNodeUsageStatsForThreadPoolsCollector() + final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector = spy( + new NodeUsageStatsForThreadPoolsCollector() ); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( settings, @@ -93,8 +96,20 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { threadPool, client, mockEstimatedHeapUsageCollector, - mockNodeUsageStatsForThreadPoolsCollector + nodeUsageStatsForThreadPoolsCollector ); + final NodeUsageStatsForThreadPoolsMonitor usageMonitor = spy( + new NodeUsageStatsForThreadPoolsMonitor( + clusterService.getClusterSettings(), + threadPool.relativeTimeInMillisSupplier(), + clusterService::state, + new RerouteService() { + @Override + public void reroute(String reason, Priority priority, ActionListener listener) {} + } + ) + ); + clusterInfoService.addListener(usageMonitor::onNewInfo); clusterService.addListener(clusterInfoService); clusterInfoService.addListener(ignored -> {}); @@ -131,14 +146,16 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { for (int i = 0; i < 3; i++) { Mockito.clearInvocations(mockEstimatedHeapUsageCollector); - Mockito.clearInvocations(mockNodeUsageStatsForThreadPoolsCollector); + Mockito.clearInvocations(nodeUsageStatsForThreadPoolsCollector); + Mockito.clearInvocations(usageMonitor); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval - verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(any()); + verify(nodeUsageStatsForThreadPoolsCollector).collectUsageStats(any(), 
any(), any()); + verify(usageMonitor).onNewInfo(any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -163,17 +180,6 @@ public void collectClusterHeapUsage(ActionListener> listener) } } - /** - * Simple for test {@link NodeUsageStatsForThreadPoolsCollector} implementation that returns an empty map of nodeId string to - * {@link NodeUsageStatsForThreadPools}. - */ - private static class StubNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { - @Override - public void collectUsageStats(ActionListener> listener) { - listener.onResponse(Map.of()); - } - } - private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; while (deterministicTaskQueue.getCurrentTimeMillis() < endTime