diff --git a/CHANGELOG.md b/CHANGELOG.md index 40c7564860066..e0e0e80f4a340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Onboarding new maven snapshots publishing to s3 ([#19619](https://github.com/opensearch-project/OpenSearch/pull/19619)) - Remove MultiCollectorWrapper and use MultiCollector in Lucene instead ([#19595](https://github.com/opensearch-project/OpenSearch/pull/19595)) - Change implementation for `percentiles` aggregation for latency improvement ([#19648](https://github.com/opensearch-project/OpenSearch/pull/19648)) +- Migrate gRPC transport executor from FixedExecutorBuilder to ForkJoinPoolExecutorBuilder for improved performance ([#19370](https://github.com/opensearch-project/OpenSearch/issues/19370)) ### Fixed - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012)) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java index 47db838ef4fb4..511ba833ac1a7 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java @@ -28,7 +28,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.AuxTransport; import org.opensearch.transport.client.Client; @@ -93,7 +93,8 @@ public GrpcPlugin() {} /** * Loads extensions from other plugins. - * This method is called by the OpenSearch plugin system to load extensions from other plugins. + * This method is called by the OpenSearch plugin system to load extensions from + * other plugins. * * @param loader The extension loader to use for loading extensions */ @@ -140,7 +141,8 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) { } } - // Sort by order and create a chain - similar to OpenSearch's ActionFilter pattern + // Sort by order and create a chain - similar to OpenSearch's ActionFilter + // pattern orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order)); if (!orderedList.isEmpty()) { @@ -179,12 +181,12 @@ public AbstractQueryBuilderProtoUtils getQueryUtils() { * Provides auxiliary transports for the plugin. * Creates and returns a map of transport names to transport suppliers. * - * @param settings The node settings - * @param threadPool The thread pool + * @param settings The node settings + * @param threadPool The thread pool * @param circuitBreakerService The circuit breaker service - * @param networkService The network service - * @param clusterSettings The cluster settings - * @param tracer The tracer + * @param networkService The network service + * @param clusterSettings The cluster settings + * @param tracer The tracer * @return A map of transport names to transport suppliers * @throws IllegalStateException if queryRegistry is not initialized */ @@ -218,14 +220,15 @@ public Map> getAuxTransports( /** * Provides secure auxiliary transports for the plugin. * Registered under a distinct key from gRPC transport. - * Consumes pluggable security settings as provided by a SecureAuxTransportSettingsProvider. + * Consumes pluggable security settings as provided by a + * SecureAuxTransportSettingsProvider. * - * @param settings The node settings - * @param threadPool The thread pool - * @param circuitBreakerService The circuit breaker service - * @param networkService The network service - * @param clusterSettings The cluster settings - * @param tracer The tracer + * @param settings The node settings + * @param threadPool The thread pool + * @param circuitBreakerService The circuit breaker service + * @param networkService The network service + * @param clusterSettings The cluster settings + * @param tracer The tracer * @param secureAuxTransportSettingsProvider provides ssl context params * @return A map of transport names to transport suppliers * @throws IllegalStateException if queryRegistry is not initialized @@ -303,31 +306,32 @@ public List> getSettings() { * Returns the executor builders for this plugin's custom thread pools. * Creates a dedicated thread pool for gRPC request processing that integrates * with OpenSearch's thread pool monitoring and management system. + * Uses ForkJoinPool for improved performance through work-stealing and better + * load balancing. * * @param settings the current settings * @return executor builders for this plugin's custom thread pools */ @Override public List> getExecutorBuilders(Settings settings) { - final int executorCount = SETTING_GRPC_EXECUTOR_COUNT.get(settings); - return List.of( - new FixedExecutorBuilder(settings, GRPC_THREAD_POOL_NAME, executorCount, 1000, "thread_pool." + GRPC_THREAD_POOL_NAME) - ); + final int parallelism = SETTING_GRPC_EXECUTOR_COUNT.get(settings); + return List.of(new ForkJoinPoolExecutorBuilder(GRPC_THREAD_POOL_NAME, parallelism, "thread_pool." + GRPC_THREAD_POOL_NAME)); } /** * Creates components used by the plugin. - * Stores the client for later use in creating gRPC services, and the query registry which registers the types of supported GRPC Search queries. + * Stores the client for later use in creating gRPC services, and the query + * registry which registers the types of supported GRPC Search queries. * - * @param client The client - * @param clusterService The cluster service - * @param threadPool The thread pool - * @param resourceWatcherService The resource watcher service - * @param scriptService The script service - * @param xContentRegistry The named content registry - * @param environment The environment - * @param nodeEnvironment The node environment - * @param namedWriteableRegistry The named writeable registry + * @param client The client + * @param clusterService The cluster service + * @param threadPool The thread pool + * @param resourceWatcherService The resource watcher service + * @param scriptService The script service + * @param xContentRegistry The named content registry + * @param environment The environment + * @param nodeEnvironment The node environment + * @param namedWriteableRegistry The named writeable registry * @param indexNameExpressionResolver The index name expression resolver * @param repositoriesServiceSupplier The repositories service supplier * @return A collection of components @@ -373,7 +377,8 @@ public Collection createComponents( } logger.info("Successfully injected registry and registered all {} external converters", queryConverters.size()); - // Update the registry on all converters (including built-in ones) so they can access external converters + // Update the registry on all converters (including built-in ones) so they can + // access external converters queryRegistry.updateRegistryOnAllConverters(); logger.info("Updated registry on all converters to include external converters"); } else { diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java index fd3feb8a38fa3..bd7b073131d78 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java @@ -16,7 +16,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.grpc.ssl.NettyGrpcClient; import org.hamcrest.MatcherAssert; @@ -45,10 +45,10 @@ public void setup() { // Create a ThreadPool with the gRPC executor Settings settings = Settings.builder() - .put("node.name", "test-node") - .put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 4) - .build(); - ExecutorBuilder grpcExecutorBuilder = new FixedExecutorBuilder(settings, "grpc", 4, 1000, "thread_pool.grpc"); + .put("node.name", "test-node") + .put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 4) + .build(); + ExecutorBuilder grpcExecutorBuilder = new ForkJoinPoolExecutorBuilder("grpc", 4, "thread_pool.grpc"); threadPool = new ThreadPool(settings, grpcExecutorBuilder); services = List.of(); @@ -62,7 +62,8 @@ public void cleanup() { } public void testBasicStartAndStop() { - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, + networkService, threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); @@ -73,7 +74,8 @@ public void testBasicStartAndStop() { } public void testGrpcTransportHealthcheck() { - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, + networkService, threadPool)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses()); try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(remoteAddress).build()) { @@ -86,7 +88,8 @@ public void testGrpcTransportHealthcheck() { } public void testGrpcTransportListServices() { - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, + networkService, threadPool)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses()); try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(remoteAddress).build()) { @@ -100,15 +103,18 @@ public void testGrpcTransportListServices() { public void testWithCustomPort() { // Create settings with a specific port - Settings settings = Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "9000-9010").build(); + Settings settings = Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "9000-9010") + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); TransportAddress publishAddress = transport.getBoundAddress().publishAddress(); assertNotNull(publishAddress.address()); - assertTrue("Port should be in the specified range", publishAddress.getPort() >= 9000 && publishAddress.getPort() <= 9010); + assertTrue("Port should be in the specified range", + publishAddress.getPort() >= 9000 && publishAddress.getPort() <= 9010); transport.stop(); } @@ -117,11 +123,12 @@ public void testWithCustomPort() { public void testWithCustomPublishPort() { // Create settings with a specific publish port Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_PORT.getKey(), 9000) - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_PORT.getKey(), 9000) + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); @@ -136,21 +143,21 @@ public void testWithCustomPublishPort() { public void testWithCustomHost() { // Create settings with a specific host Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_HOST.getKey(), "127.0.0.1") - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_HOST.getKey(), "127.0.0.1") + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); TransportAddress publishAddress = transport.getBoundAddress().publishAddress(); assertNotNull(publishAddress.address()); assertEquals( - "Host should match the specified value", - "127.0.0.1", - InetAddresses.toAddrString(publishAddress.address().getAddress()) - ); + "Host should match the specified value", + "127.0.0.1", + InetAddresses.toAddrString(publishAddress.address().getAddress())); transport.stop(); } @@ -159,21 +166,21 @@ public void testWithCustomHost() { public void testWithCustomBindHost() { // Create settings with a specific bind host Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST.getKey(), "127.0.0.1") - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST.getKey(), "127.0.0.1") + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); TransportAddress boundAddress = transport.getBoundAddress().boundAddresses()[0]; assertNotNull(boundAddress.address()); assertEquals( - "Bind host should match the specified value", - "127.0.0.1", - InetAddresses.toAddrString(boundAddress.address().getAddress()) - ); + "Bind host should match the specified value", + "127.0.0.1", + InetAddresses.toAddrString(boundAddress.address().getAddress())); transport.stop(); } @@ -182,21 +189,21 @@ public void testWithCustomBindHost() { public void testWithCustomPublishHost() { // Create settings with a specific publish host Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_HOST.getKey(), "127.0.0.1") - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_HOST.getKey(), "127.0.0.1") + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); TransportAddress publishAddress = transport.getBoundAddress().publishAddress(); assertNotNull(publishAddress.address()); assertEquals( - "Publish host should match the specified value", - "127.0.0.1", - InetAddresses.toAddrString(publishAddress.address().getAddress()) - ); + "Publish host should match the specified value", + "127.0.0.1", + InetAddresses.toAddrString(publishAddress.address().getAddress())); transport.stop(); } @@ -205,11 +212,12 @@ public void testWithCustomPublishHost() { public void testWithCustomWorkerCount() { // Create settings with a specific worker count Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.getKey(), 4) - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.getKey(), 4) + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); @@ -222,11 +230,12 @@ public void testWithCustomWorkerCount() { public void testWithCustomExecutorCount() { // Create settings with a specific executor count Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 8) - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 8) + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); @@ -236,7 +245,8 @@ public void testWithCustomExecutorCount() { ExecutorService executor = transport.getGrpcExecutorForTesting(); assertNotNull("gRPC executor should be created", executor); // Note: The executor is now managed by OpenSearch's ThreadPool system - // We can't easily verify the thread count as it's encapsulated within OpenSearch's executor implementation + // We can't easily verify the thread count as it's encapsulated within + // OpenSearch's executor implementation transport.stop(); } @@ -247,13 +257,15 @@ public void testDefaultExecutorCount() { Settings settings = createSettings(); int expectedExecutorCount = OpenSearchExecutors.allocatedProcessors(settings) * 2; - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); ExecutorService executor = transport.getGrpcExecutorForTesting(); assertNotNull("gRPC executor should be created", executor); // Note: The executor is now managed by OpenSearch's ThreadPool system - // The actual thread count is configured via the FixedExecutorBuilder in the test setup + // The actual parallelism is configured via the ForkJoinPoolExecutorBuilder in + // the test setup transport.stop(); } @@ -262,11 +274,12 @@ public void testDefaultExecutorCount() { public void testSeparateEventLoopGroups() { // Test that boss and worker event loop groups are separate Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.getKey(), 4) - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.getKey(), 4) + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); EventLoopGroup bossGroup = transport.getBossEventLoopGroupForTesting(); @@ -284,7 +297,8 @@ public void testExecutorShutdownOnStop() { // Test that executor is properly shutdown when transport stops Settings settings = createSettings(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); transport.start(); ExecutorService executor = transport.getGrpcExecutorForTesting(); @@ -292,7 +306,8 @@ public void testExecutorShutdownOnStop() { assertFalse("Executor should not be shutdown initially", executor.isShutdown()); transport.stop(); - // Note: The executor is managed by OpenSearch's ThreadPool and is not shutdown when transport stops + // Note: The executor is managed by OpenSearch's ThreadPool and is not shutdown + // when transport stops assertNotNull("Executor should still exist after transport stop", executor); transport.close(); @@ -302,7 +317,8 @@ public void testEventLoopGroupsShutdownOnStop() { // Test that event loop groups are properly shutdown when transport stops Settings settings = createSettings(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); transport.start(); EventLoopGroup bossGroup = transport.getBossEventLoopGroupForTesting(); @@ -325,41 +341,45 @@ public void testEventLoopGroupsShutdownOnStop() { public void testSettingsValidation() { // Test that invalid settings are handled properly Settings invalidSettings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.getKey(), 0) // Invalid: should be >= 1 - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.getKey(), 0) // Invalid: should be >= 1 + .build(); expectThrows( - IllegalArgumentException.class, - () -> { new Netty4GrpcServerTransport(invalidSettings, services, networkService, threadPool); } - ); + IllegalArgumentException.class, + () -> { + new Netty4GrpcServerTransport(invalidSettings, services, networkService, threadPool); + }); } public void testExecutorCountSettingsValidation() { // Test that invalid executor count settings are handled properly Settings invalidSettings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 0) // Invalid: should be >= 1 - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 0) // Invalid: should be >= 1 + .build(); expectThrows( - IllegalArgumentException.class, - () -> { new Netty4GrpcServerTransport(invalidSettings, services, networkService, threadPool); } - ); + IllegalArgumentException.class, + () -> { + new Netty4GrpcServerTransport(invalidSettings, services, networkService, threadPool); + }); } public void testStartFailureTriggersCleanup() { // Create a transport that will fail to start Settings settingsWithInvalidPort = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "999999") // Invalid port - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "999999") // Invalid port + .build(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settingsWithInvalidPort, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settingsWithInvalidPort, services, + networkService, threadPool); // Start should fail expectThrows(Exception.class, transport::start); - // Resources should be cleaned up after failure - the implementation calls doStop() in the finally block + // Resources should be cleaned up after failure - the implementation calls + // doStop() in the finally block ExecutorService executor = transport.getGrpcExecutorForTesting(); EventLoopGroup bossGroup = transport.getBossEventLoopGroupForTesting(); EventLoopGroup workerGroup = transport.getWorkerEventLoopGroupForTesting(); @@ -381,7 +401,8 @@ public void testStartFailureTriggersCleanup() { public void testInterruptedShutdownHandling() throws InterruptedException { Settings settings = createSettings(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); transport.start(); @@ -400,11 +421,12 @@ public void testInterruptedShutdownHandling() throws InterruptedException { public void testInvalidHostBinding() { // Test with invalid bind host to trigger host resolution error Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .put(Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST.getKey(), "invalid.host.that.does.not.exist") - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .put(Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST.getKey(), "invalid.host.that.does.not.exist") + .build(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); // Start should fail due to host resolution failure expectThrows(Exception.class, transport::start); @@ -415,11 +437,12 @@ public void testInvalidHostBinding() { public void testPublishPortResolutionFailure() { // Create settings that will cause publish port resolution to fail Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "0") // Dynamic port - .put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_PORT.getKey(), "65536") // Invalid publish port - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "0") // Dynamic port + .put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_PORT.getKey(), "65536") // Invalid publish port + .build(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); // Start should fail due to publish port resolution expectThrows(Exception.class, transport::start); @@ -430,11 +453,12 @@ public void testPublishPortResolutionFailure() { public void testMultipleBindAddresses() { // Test binding to multiple localhost addresses Settings settings = Settings.builder() - .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) - .putList(Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST.getKey(), "127.0.0.1", "localhost") - .build(); + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()) + .putList(Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST.getKey(), "127.0.0.1", "localhost") + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); BoundTransportAddress boundAddress = transport.getBoundAddress(); @@ -447,7 +471,8 @@ public void testMultipleBindAddresses() { public void testShutdownTimeoutHandling() throws InterruptedException { Settings settings = createSettings(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); transport.start(); @@ -463,7 +488,8 @@ public void testShutdownTimeoutHandling() throws InterruptedException { // Normal shutdown should work transport.stop(); - // Verify everything is shutdown (except executor which is managed by OpenSearch's ThreadPool) + // Verify everything is shutdown (except executor which is managed by + // OpenSearch's ThreadPool) assertNotNull("Executor should still exist", executor); assertTrue("Boss group should be shutdown", bossGroup.isShutdown()); assertTrue("Worker group should be shutdown", workerGroup.isShutdown()); @@ -473,12 +499,14 @@ public void testShutdownTimeoutHandling() throws InterruptedException { public void testResourceCleanupOnClose() { Settings settings = createSettings(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); transport.start(); transport.stop(); - // doClose should handle cleanup gracefully even if resources are already shutdown + // doClose should handle cleanup gracefully even if resources are already + // shutdown transport.close(); // Multiple closes should be safe @@ -487,9 +515,11 @@ public void testResourceCleanupOnClose() { public void testPortRangeHandling() { // Test with a port range - Settings settings = Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "9300-9400").build(); + Settings settings = Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "9300-9400") + .build(); - try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool)) { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool)) { transport.start(); BoundTransportAddress boundAddress = transport.getBoundAddress(); @@ -505,7 +535,8 @@ public void testPortRangeHandling() { public void testGracefulShutdownWithException() { // Test that exceptions during shutdown don't prevent cleanup Settings settings = createSettings(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); transport.start(); @@ -536,7 +567,8 @@ public void testGracefulShutdownWithException() { public void testCloseWithNullResources() { // Test that close() handles null resources gracefully Settings settings = createSettings(); - Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, threadPool); + Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, + threadPool); // Don't start the transport, so resources should be null assertNull("Boss group should be null before start", transport.getBossEventLoopGroupForTesting()); @@ -552,6 +584,7 @@ public void testCloseWithNullResources() { } private static Settings createSettings() { - return Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()).build(); + return Settings.builder() + .put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), OpenSearchTestCase.getPortRange()).build(); } } diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java index cf35ad449c234..31ceb9e70f8d8 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java @@ -13,7 +13,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.grpc.interceptor.GrpcInterceptorChain; import org.junit.After; @@ -50,7 +50,7 @@ public void setup() { // Create a ThreadPool with the gRPC executor Settings settings = Settings.builder().put("node.name", "test-node").put("grpc.netty.executor_count", 4).build(); - ExecutorBuilder grpcExecutorBuilder = new FixedExecutorBuilder(settings, "grpc", 4, 1000, "thread_pool.grpc"); + ExecutorBuilder grpcExecutorBuilder = new ForkJoinPoolExecutorBuilder("grpc", 4, "thread_pool.grpc"); threadPool = new ThreadPool(settings, grpcExecutorBuilder); serverInterceptor = new GrpcInterceptorChain(Collections.emptyList()); }