Introduce node level circuit breaker settings for k-NN #2509
base: main
Conversation
Signed-off-by: Mark Wu <[email protected]>
 */
private String getKnnCircuitBreakerLimitForNode(Settings settings) {
    // Get this node's circuit breaker tier attribute
    String tierAttribute = clusterService.localNode().getAttributes().get(KNN_CIRCUIT_BREAKER_TIER);
Running into an issue very similar to #2223 for the build failures:
java.lang.AssertionError: should not be called by a cluster state applier. reason [the applied cluster state is not yet available]
    at org.opensearch.cluster.service.ClusterApplierService.assertNotCalledFromClusterStateApplier(ClusterApplierService.java:443)
    at org.opensearch.cluster.service.ClusterApplierService.state(ClusterApplierService.java:229)
    at org.opensearch.cluster.service.ClusterService.state(ClusterService.java:182)
    at org.opensearch.cluster.service.ClusterService.localNode(ClusterService.java:166)
    at org.opensearch.knn.index.KNNSettings.getKnnCircuitBreakerLimitForNode(KNNSettings.java:652)
Since node attributes are not currently dynamically configurable, we can cache the value on node initialization when we refresh the cache. This prevents the update thread from trying to access the attributes via the cluster state.
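A minimal plain-Java sketch of the caching idea described above (names like `NodeTierCache` and `knn_cb_tier` handling are simplified here; the real logic lives in `KNNSettings`): read the attribute once at initialization, when it is safe to touch cluster state, and let later callbacks read only the cached copy.

```java
import java.util.Map;

// Hypothetical sketch: cache the node's circuit-breaker tier once at
// initialization so settings-update threads never read cluster state.
final class NodeTierCache {
    private static volatile String cachedTier; // null until initialized

    // Called once on node initialization, when attributes are safe to read.
    static void initialize(Map<String, String> nodeAttributes) {
        cachedTier = nodeAttributes.get("knn_cb_tier");
    }

    // Safe to call from a cluster-state applier: no cluster-state access.
    static String getTier() {
        return cachedTier;
    }
}
```

The `volatile` field makes the cached value visible to the scheduled update thread without further synchronization, since it is written once during startup.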
Signed-off-by: Mark Wu <[email protected]>
@@ -106,5 +109,26 @@ public void initialize(ThreadPool threadPool, ClusterService clusterService, Cli
    }
};
this.threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueSeconds(CB_TIME_INTERVAL), ThreadPool.Names.GENERIC);

// Update when node is fully joined
clusterService.addLifecycleListener(new LifecycleListener() {
Node attributes aren't available at the time the cache manager computes its size on initialization. With the circuit breaker's additional dependency on node attributes, we may need to check whether the circuit breaker needs to be updated after the node has bootstrapped.
I only see one example of a listener being attached in here, but I attached the listener on KnnCircuitBreaker initialization due to needing to refresh the cache to recompute the size and cache the attribute value. Any feedback on this would be appreciated.
I like this approach. I think it makes sense. @jmazanec15 WDYT?
.forEach(
    (limit) -> parseknnMemoryCircuitBreakerValue(
        settings.get(limit),
        KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
Ran into an issue where not having the validation performed by parseknnMemoryCircuitBreakerValue here would result in dropping the cluster on an uncaught exception if the same method was hit at line 633 with an invalid k-NN CB value. There may be some exception handling earlier in the stack that allows an exception to be caught gracefully here but doesn't exist later on; to maintain the same behavior we validate the given values before trying to update the cache.
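To illustrate the validate-before-apply idea from the comment above, here is a hedged plain-Java sketch (the class and method names are hypothetical; real k-NN limits accept a heap percentage such as "50%" or a byte size such as "500000kb", and this simplified check only mirrors that shape):

```java
// Hypothetical sketch: reject malformed circuit-breaker limits up front so a
// later cache update cannot fail with an uncaught exception.
final class CircuitBreakerLimitValidator {
    static boolean isValidLimit(String value) {
        if (value == null || value.isEmpty()) {
            return false;
        }
        if (value.endsWith("%")) {
            // Percentage form, e.g. "50%": must be in (0, 100]
            try {
                double pct = Double.parseDouble(value.substring(0, value.length() - 1));
                return pct > 0 && pct <= 100;
            } catch (NumberFormatException e) {
                return false;
            }
        }
        // Byte-size form, e.g. "500000kb"
        return value.matches("\\d+(b|kb|mb|gb)");
    }
}
```

Validating at settings-parse time lets the settings framework surface a clear rejection to the caller instead of an uncaught exception on the update path.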
LGTM.
@@ -75,7 +77,8 @@ public class KNNSettings {
public static final String KNN_ALGO_PARAM_EF_SEARCH = "index.knn.algo_param.ef_search";
public static final String KNN_ALGO_PARAM_INDEX_THREAD_QTY = "knn.algo_param.index_thread_qty";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_ENABLED = "knn.memory.circuit_breaker.enabled";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX = "knn.memory.circuit_breaker.limit.";
nit:
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX = KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT + ".";
if (tierAttribute.isPresent()) {
    // Only rebuild the cache if the attribute was present
    logger.info(
        "[KNN] Node specific circuit breaker " + tierAttribute.get() + " classification found. Rebuilding the cache."
Suggested change:
"[KNN] Node specific circuit breaker " + tierAttribute.get() + " classification found. Rebuilding the cache."
"[KNN] Node specific circuit breaker {} classification found. Rebuilding the cache.", tierAttribute.get());
I believe we have an item in the KNNStats API that indicates when the circuit breaker is triggered. Do we need to modify this at all to accommodate the new node-level circuit breaker, and maybe give more details about which circuit breaker was triggered?
The KNNStats API relies on a flag set here in the cache whenever an eviction happens based on a capacity constraint. Only one circuit breaker value can be active at any given moment, and the circuit breaker flag will continue to operate as expected when the cache gets updated with a node-level limit, so I don't think we're breaking any of our existing contracts here. The other pending item revolves around giving customers more visibility into their current circuit breaker limit and usage. While we don't have a method to extract the currently used circuit breaker limit, we can extrapolate from the existing memory metrics if necessary, such as the graph memory usage percentage, which is already available. That said, we could consider adding a circuit breaker limit value API in the future to give customers more flexibility.
// Update when node is fully joined
clusterService.addLifecycleListener(new LifecycleListener() {
    @Override
    public void afterStart() {
Do we really need the node attribute (knn_cb_tier) on node start?
Instead, this should fetch attributes dynamically whenever needed.
Initially I had the attribute fetched dynamically to compute the circuit breaker value whenever we need it. The circuit breaker value is really only used by the CacheManager, and as I mentioned earlier, on cache initialization we don't have access to node attributes yet, so I had two thoughts with this approach:
- Not fetching this attribute on node start means that the cache will continue to maintain the cluster-level size until the cache is rebuilt again with an updated value from the attribute. Not doing this on node start would mean this process would have to happen during some sort of dynamic update of the cache (through a settings update or something else), in which case we'd have to pass in the cache and have it rebuild. My idea here is that we set the correct cache size as soon as the node starts, to ensure there's no discrepancy between what users expect and what the cache size actually is. Since the circuit breaker here already interacts with the NativeCacheManager, I thought it would be the logical place to insert it.
- Another issue relates to the comment I posted: I want to avoid having to reference cluster state configurations during different parts of the lifecycle, and instead have the value persist throughout.
Operating under the assumption that node attributes are considered statically defined as of now, fetching the value on node start seems easier here, but I'm also open to suggestions on how we can go about doing this dynamically.
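The afterStart approach discussed above can be sketched in plain Java (the `NodeLifecycle` class and listener interface here are hypothetical stand-ins for OpenSearch's ClusterService lifecycle machinery): listeners registered during initialization fire only once the node has fully started, when attributes are available, and can then rebuild the cache with the tier-specific limit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of deferring attribute-dependent work until node start.
final class NodeLifecycle {
    // Single-method interface so callbacks can be supplied as lambdas.
    interface LifecycleListener {
        void afterStart();
    }

    private final List<LifecycleListener> listeners = new ArrayList<>();

    void addLifecycleListener(LifecycleListener listener) {
        listeners.add(listener);
    }

    // Invoked once bootstrap is complete; attributes are now readable.
    void start() {
        for (LifecycleListener listener : listeners) {
            listener.afterStart();
        }
    }
}
```

Registering during initialization and executing after start is what closes the gap between the cluster-level default the cache manager computes first and the tier-specific size the node should actually use.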
@@ -127,7 +136,7 @@ public synchronized void rebuildCache() {
rebuildCache(
The entire cache is rebuilt every time a node joins, even if the circuit breaker limit remains unchanged. Can we do this lazily, updating only if the limit actually changes?
From what I understand, the cache gets rebuilt in 3 distinct places within the code:
- On initialization
- On a dynamic settings update
- On indexModule
For option 1, I assumed a node joining to be part of initialization. In that case my thought was that we need to recompute the circuit breaker limit, since the attributes belonging to that node may correspond to a different circuit breaker tier.
As for option 2, existing behavior within k-NN dictates that whenever a dynamic setting gets updated the cache gets rebuilt. This applies to all dynamically configured settings. The configuration system within k-NN throws away no-ops in the updated settings, so each call on this path is guaranteed to have at least one dynamic configuration changed. A good callout here is that the cache is rebuilt every time one of these settings changes (regardless of whether the limit has changed), but I see fixing that as modifying existing k-NN behavior, which I'd like to minimize outside the scope of my PR. The only change I am making here is how we compute the weight of the cache in the event that it needs to be rebuilt.
Option 3 doesn't have a relation to this change, I believe.
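For comparison, the lazy alternative the reviewer suggests could look like the following hedged sketch (hypothetical names; the PR deliberately keeps the existing rebuild-on-any-dynamic-update behavior instead): track the currently applied limit and skip the rebuild when the new value is identical.

```java
// Hypothetical sketch: only rebuild the cache when the effective
// circuit-breaker limit actually changes.
final class LazyCacheRebuild {
    private long currentLimitKb;
    int rebuildCount = 0; // exposed for illustration only

    LazyCacheRebuild(long initialLimitKb) {
        this.currentLimitKb = initialLimitKb;
    }

    void maybeRebuild(long newLimitKb) {
        if (newLimitKb == currentLimitKb) {
            return; // no-op: limit unchanged, skip the expensive rebuild
        }
        currentLimitKb = newLimitKb;
        rebuildCount++; // stands in for rebuilding the native memory cache
    }
}
```

The trade-off noted in the reply above is that short-circuiting like this changes behavior for every dynamic setting, not just the limit, which is why the PR leaves the existing rebuild path intact.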
 * @return ByteSizeValue representing the new circuit breaker limit to apply,
 * or null if no applicable updates found
 */
private ByteSizeValue getUpdatedCircuitBreakerLimit(Settings updatedCbLimits) {
This updates circuit breaker settings every time the cluster settings change; should we only update when the node-specific tier is modified?
I believe I addressed this in my comment above. The cache gets rebuilt every time a dynamic cluster setting changes, so I don't see an option to lazily update based on which setting got updated.
If we look at the current code, the weight is already recomputed every time any dynamic setting gets refreshed. What I'm doing here is simplifying that logic to recompute the max weight, taking node-level attributes into account.
Description
KNN plugin currently uses a cluster-wide circuit breaker. This doesn't work as well as it could when nodes have different memory capacities.
Solution
Added node-specific circuit breaker limits using node attributes. This opens up flexibility for heterogeneous circuit breaker limits.
Usage
opensearch.yml
Implementation
Uses groupSetting for dynamic limit configurations
Testing
Modified KNNCircuitBreakerIT integration tests to include node-level CB.
Used OSB benchmarking to run a modified low load test with/without the node level circuit breaker.
Added node.attr.knn_cb_tier = 'integ' to build.gradle
Initial state (no limits set):
After setting node limit to 500000kb:
Setting the cluster limit to 5kb didn't affect the node with a specific limit, which confirms proper override behavior.
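The override behavior verified above can be sketched as a simple resolution rule (hedged plain-Java sketch; `LimitResolver` and the Map-based settings lookup are illustrative stand-ins for the real Settings plumbing): a node whose knn_cb_tier attribute has a matching knn.memory.circuit_breaker.limit.&lt;tier&gt; setting uses that limit, and otherwise falls back to the cluster-wide limit.

```java
import java.util.Map;

// Hypothetical sketch of node-over-cluster limit resolution.
final class LimitResolver {
    static final String CLUSTER_LIMIT_KEY = "knn.memory.circuit_breaker.limit";
    static final String NODE_LIMIT_PREFIX = CLUSTER_LIMIT_KEY + ".";

    static String resolveLimit(Map<String, String> settings, String tier) {
        if (tier != null) {
            String nodeLimit = settings.get(NODE_LIMIT_PREFIX + tier);
            if (nodeLimit != null) {
                return nodeLimit; // node-level limit wins over the cluster limit
            }
        }
        return settings.get(CLUSTER_LIMIT_KEY); // fall back to cluster-wide limit
    }
}
```

This mirrors the test result: with a cluster limit of 5kb and a tier limit of 500000kb, a node tagged with that tier resolves to 500000kb, while untagged nodes keep 5kb.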
Related Issues
Resolves #2263
Check List
Commits are signed per the DCO using --signoff.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.