-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Migrate gRPC transport executor to ForkJoinPool #19685
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,7 @@ | |
| import org.opensearch.script.ScriptService; | ||
| import org.opensearch.telemetry.tracing.Tracer; | ||
| import org.opensearch.threadpool.ExecutorBuilder; | ||
| import org.opensearch.threadpool.FixedExecutorBuilder; | ||
| import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder; | ||
| import org.opensearch.threadpool.ThreadPool; | ||
| import org.opensearch.transport.AuxTransport; | ||
| import org.opensearch.transport.client.Client; | ||
|
|
@@ -93,7 +93,8 @@ public GrpcPlugin() {} | |
|
|
||
| /** | ||
| * Loads extensions from other plugins. | ||
| * This method is called by the OpenSearch plugin system to load extensions from other plugins. | ||
| * This method is called by the OpenSearch plugin system to load extensions from | ||
| * other plugins. | ||
| * | ||
| * @param loader The extension loader to use for loading extensions | ||
| */ | ||
|
|
@@ -140,7 +141,8 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) { | |
| } | ||
| } | ||
|
|
||
| // Sort by order and create a chain - similar to OpenSearch's ActionFilter pattern | ||
| // Sort by order and create a chain - similar to OpenSearch's ActionFilter | ||
| // pattern | ||
| orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order)); | ||
|
|
||
| if (!orderedList.isEmpty()) { | ||
|
|
@@ -179,12 +181,12 @@ public AbstractQueryBuilderProtoUtils getQueryUtils() { | |
| * Provides auxiliary transports for the plugin. | ||
| * Creates and returns a map of transport names to transport suppliers. | ||
| * | ||
| * @param settings The node settings | ||
| * @param threadPool The thread pool | ||
| * @param settings The node settings | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you run There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| * @param threadPool The thread pool | ||
| * @param circuitBreakerService The circuit breaker service | ||
| * @param networkService The network service | ||
| * @param clusterSettings The cluster settings | ||
| * @param tracer The tracer | ||
| * @param networkService The network service | ||
| * @param clusterSettings The cluster settings | ||
| * @param tracer The tracer | ||
| * @return A map of transport names to transport suppliers | ||
| * @throws IllegalStateException if queryRegistry is not initialized | ||
| */ | ||
|
|
@@ -218,14 +220,15 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports( | |
| /** | ||
| * Provides secure auxiliary transports for the plugin. | ||
| * Registered under a distinct key from gRPC transport. | ||
| * Consumes pluggable security settings as provided by a SecureAuxTransportSettingsProvider. | ||
| * Consumes pluggable security settings as provided by a | ||
| * SecureAuxTransportSettingsProvider. | ||
| * | ||
| * @param settings The node settings | ||
| * @param threadPool The thread pool | ||
| * @param circuitBreakerService The circuit breaker service | ||
| * @param networkService The network service | ||
| * @param clusterSettings The cluster settings | ||
| * @param tracer The tracer | ||
| * @param settings The node settings | ||
| * @param threadPool The thread pool | ||
| * @param circuitBreakerService The circuit breaker service | ||
| * @param networkService The network service | ||
| * @param clusterSettings The cluster settings | ||
| * @param tracer The tracer | ||
| * @param secureAuxTransportSettingsProvider provides ssl context params | ||
| * @return A map of transport names to transport suppliers | ||
| * @throws IllegalStateException if queryRegistry is not initialized | ||
|
|
@@ -303,31 +306,32 @@ public List<Setting<?>> getSettings() { | |
| * Returns the executor builders for this plugin's custom thread pools. | ||
| * Creates a dedicated thread pool for gRPC request processing that integrates | ||
| * with OpenSearch's thread pool monitoring and management system. | ||
| * Uses ForkJoinPool for improved performance through work-stealing and better | ||
| * load balancing. | ||
| * | ||
| * @param settings the current settings | ||
| * @return executor builders for this plugin's custom thread pools | ||
| */ | ||
| @Override | ||
| public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) { | ||
| final int executorCount = SETTING_GRPC_EXECUTOR_COUNT.get(settings); | ||
| return List.of( | ||
| new FixedExecutorBuilder(settings, GRPC_THREAD_POOL_NAME, executorCount, 1000, "thread_pool." + GRPC_THREAD_POOL_NAME) | ||
| ); | ||
| final int parallelism = SETTING_GRPC_EXECUTOR_COUNT.get(settings); | ||
| return List.of(new ForkJoinPoolExecutorBuilder(GRPC_THREAD_POOL_NAME, parallelism, "thread_pool." + GRPC_THREAD_POOL_NAME)); | ||
| } | ||
|
|
||
| /** | ||
| * Creates components used by the plugin. | ||
| * Stores the client for later use in creating gRPC services, and the query registry which registers the types of supported GRPC Search queries. | ||
| * Stores the client for later use in creating gRPC services, and the query | ||
| * registry which registers the types of supported GRPC Search queries. | ||
| * | ||
| * @param client The client | ||
| * @param clusterService The cluster service | ||
| * @param threadPool The thread pool | ||
| * @param resourceWatcherService The resource watcher service | ||
| * @param scriptService The script service | ||
| * @param xContentRegistry The named content registry | ||
| * @param environment The environment | ||
| * @param nodeEnvironment The node environment | ||
| * @param namedWriteableRegistry The named writeable registry | ||
| * @param client The client | ||
| * @param clusterService The cluster service | ||
| * @param threadPool The thread pool | ||
| * @param resourceWatcherService The resource watcher service | ||
| * @param scriptService The script service | ||
| * @param xContentRegistry The named content registry | ||
| * @param environment The environment | ||
| * @param nodeEnvironment The node environment | ||
| * @param namedWriteableRegistry The named writeable registry | ||
| * @param indexNameExpressionResolver The index name expression resolver | ||
| * @param repositoriesServiceSupplier The repositories service supplier | ||
| * @return A collection of components | ||
|
|
@@ -373,7 +377,8 @@ public Collection<Object> createComponents( | |
| } | ||
| logger.info("Successfully injected registry and registered all {} external converters", queryConverters.size()); | ||
|
|
||
| // Update the registry on all converters (including built-in ones) so they can access external converters | ||
| // Update the registry on all converters (including built-in ones) so they can | ||
| // access external converters | ||
| queryRegistry.updateRegistryOnAllConverters(); | ||
| logger.info("Updated registry on all converters to include external converters"); | ||
| } else { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.