Add the capability to turboload segments onto historicals #17775

Open
wants to merge 20 commits into base: master

Conversation

adarshsanjeev
Contributor

Add the capability for historicals to focus on loading segments at the cost of query performance

Context
One of the things that takes the longest when a historical starts up is segment loading. This includes the time to fetch the segments from deep storage, unzip them, and load them into memory. This can be improved by using a larger number of threads to speed up the loading.
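As a rough illustration of the idea (a minimal sketch with hypothetical names, not the actual Druid loading code), fanning the per-segment work out over a pool lets startup time scale with the number of threads:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: each segment is fetched, unzipped and loaded on one of
// numThreads workers, so wall-clock time is roughly numSegments / numThreads
// times the per-segment cost instead of the full sequential sum.
public class ParallelSegmentLoadSketch
{
  public static void loadAll(List<String> segmentIds, int numThreads) throws InterruptedException
  {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    for (String segmentId : segmentIds) {
      pool.submit(() -> {
        // placeholder for: fetch from deep storage, unzip, load into memory
        System.out.println("loaded " + segmentId);
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
  }
}
```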

Release Notes

  • Added a new dynamic coordinator configuration, turboLoadHistoricals. Historicals in this list load segments using their bootstrap thread pool, which loads segments faster at the cost of query performance.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@maytasm
Contributor

maytasm commented Mar 4, 2025

One thing I noticed is that when the cluster is starting up, the historical will not (and cannot) serve any queries until all the segments are loaded (I think there is a config for this, which is enabled by default). In this case, we should always use turboLoadHistoricals (by default), since the historical cannot serve queries anyway.

@kfaraz
Contributor

kfaraz commented Mar 4, 2025

> One thing I noticed is that when the cluster is starting up, the historical will not (and cannot) serve any queries until all the segments are loaded (I think there is a config for this, which is enabled by default). In this case, we should always use turboLoadHistoricals (by default), since the historical cannot serve queries anyway.

@maytasm , IIUC, you are referring to the bootstrap segments, which include broadcast segments and segments already present on the historical disk.
For these bootstrap segments, we already use the turbo loading thread pool.

The change here allows a historical to optionally be put in turbo mode to load non-bootstrap segments, i.e. segments assigned later by the coordinator.
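To make the distinction concrete, a minimal sketch of how an executor might be chosen per load request (the class and field names are illustrative, not the actual SegmentLoadDropHandler code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: bootstrap segments always use the larger bootstrap pool,
// while segments assigned later by the coordinator use it only when the server
// has been placed in turbo mode.
public class LoadExecutorSelectorSketch
{
  private final ExecutorService loadingExec;    // sized by numLoadingThreads
  private final ExecutorService bootstrapExec;  // sized by numBootstrapThreads
  private final AtomicBoolean turboMode = new AtomicBoolean(false);

  public LoadExecutorSelectorSketch(ExecutorService loadingExec, ExecutorService bootstrapExec)
  {
    this.loadingExec = loadingExec;
    this.bootstrapExec = bootstrapExec;
  }

  public void setTurboMode(boolean enabled)
  {
    turboMode.set(enabled);
  }

  public ExecutorService executorFor(boolean isBootstrapSegment)
  {
    return (isBootstrapSegment || turboMode.get()) ? bootstrapExec : loadingExec;
  }
}
```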

Contributor

@kfaraz kfaraz left a comment


Thanks a lot for the changes, @adarshsanjeev ! This will be very helpful for cluster upgrades.

I have done a surface review and left some feedback.
Will take another deeper look today.

@FrankChen021
Member

> One thing I noticed is that when the cluster is starting up, the historical will not (and cannot) serve any queries until all the segments are loaded (I think there is a config for this, which is enabled by default). In this case, we should always use turboLoadHistoricals (by default), since the historical cannot serve queries anyway.

> @maytasm , IIUC, you are referring to the bootstrap segments, which include broadcast segments and segments already present on the historical disk. For these bootstrap segments, we already use the turbo loading thread pool.

> The change here allows a historical to optionally be put in turbo mode to load non-bootstrap segments, i.e. segments assigned later by the coordinator.

But the description (the Context section) of this PR is different from what the changes really do. I think we need to update the PR description so that it is not misleading.

@@ -91,6 +94,10 @@ public SegmentLoadDropHandler(
        Executors.newScheduledThreadPool(
            config.getNumLoadingThreads(),
            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
        ),
        Executors.newScheduledThreadPool(
            config.getNumBootstrapThreads(),
Member


note: this will create a threadpool with a core pool size of getNumBootstrapThreads; if those threads are not doing anything, it will just hold on to the stack memory they need.

I wonder if this feature really needs a separate executor - does it really have to be running with a non-zero core pool size all the time?

Comment on lines 94 to 96
        Executors.newScheduledThreadPool(
            config.getNumLoadingThreads(),
            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
Member


It's unclear to me why we need a second executor pool; it would possibly be better to use a more aggressively tuned thread pool first and then go back to using a more conservative one.

The usage of Executors.newScheduledThreadPool is a bit unfortunate, as it will retain all these threads forever; a ThreadPoolExecutor with allowCoreThreadTimeOut could go back to 0 threads when not in use.
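For reference, a minimal sketch of the kind of pool suggested above (the sizing parameter is illustrative, not an actual Druid config field):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: a pool that can grow to numBootstrapThreads while a
// loading burst is in progress, but releases all of its threads (and their
// stacks) after 60 seconds of inactivity.
public class IdleReleasingPoolSketch
{
  public static ThreadPoolExecutor create(int numBootstrapThreads)
  {
    ThreadPoolExecutor exec = new ThreadPoolExecutor(
        numBootstrapThreads,            // corePoolSize
        numBootstrapThreads,            // maximumPoolSize
        60L, TimeUnit.SECONDS,          // idle timeout, applied to core threads below
        new LinkedBlockingQueue<>()
    );
    // Unlike Executors.newScheduledThreadPool, this lets the pool shrink back
    // to zero threads once loading is done.
    exec.allowCoreThreadTimeOut(true);
    return exec;
  }
}
```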

Member

@vtlim vtlim left a comment


Left some comments on the docs

@@ -885,7 +885,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.kill.maxInterval`|The largest interval, as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations), of segments to delete per kill task. Set to zero, e.g. `PT0S`, for unlimited. This only applies when `druid.coordinator.kill.on=true`.|`P30D`|
|`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`|
|`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute|
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1|
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service. If the value is not provided, automatically sets the value to the `numLoadingThreads` on the historical. | `druid.segmentCache.numLoadingThreads` |
Member


If numLoadingThreads will be the default, does the previous sentence need to change to say "smaller than or equal to"?

Also need to capitalize Historical.

@@ -953,6 +953,7 @@ The following table shows the dynamic configuration properties for the Coordinat
|`decommissioningNodes`|List of Historical servers to decommission. Coordinator will not assign new segments to decommissioning servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `maxSegmentsToMove`.|none|
|`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
|`turboLoadingNodes`|List of Historical servers to place in turbo loading mode. This causes the historical to load segments faster at the cost of query performance. For any performance increase, the runtime parameter `druid.coordinator.loadqueuepeon.http.batchSize` must not be configured. |none|
Member


Suggested change
|`turboLoadingNodes`|List of Historical servers to place in turbo loading mode. This causes the historical to load segments faster at the cost of query performance. For any performance increase, the runtime parameter `druid.coordinator.loadqueuepeon.http.batchSize` must not be configured. |none|
|`turboLoadingNodes`|List of Historical servers to place in turbo loading mode. These Historicals will load segments more quickly but at the cost of query performance. For any performance increase, don't configure the runtime parameter `druid.coordinator.loadqueuepeon.http.batchSize`. |none|

Member


It's not immediately clear how turboLoadingNodes relates to batchSize. Does it mean that configuring batchSize together with turboLoadingNodes hurts query performance more than turbo mode alone? Or does it mean that not configuring the batch size will lead to a query performance increase when in turbo mode?
