Fix Huge Number of Watches in ZooKeeper #17482
base: master
* Tear down nodeAnnouncer
* Remove useless Logger and ExecutorService
* Init CuratorListener by lambda
* Improve explicit type
* Using CuratorMultiTransaction instead of CuratorTransaction
* Add @GuardedBy("toAnnounce") for toUpdate field
* Improve docs
Ready for review, PTAL @kaijianding @leventov @jihoonson @asdf2014 @gianm
I did some benchmarking on a single cluster, deploying two Druid instances into two namespaces. We can see lower memory usage in ZooKeeper after the change. I conducted the benchmark by submitting 5 instances of the Ingestion. Querying: I ran a Python script that issued 10 groups of queries against the ingested datasources. Each group of queries was repeated 500 times. To simulate a concurrent querying environment, I allocated 9 threads to send these queries.
This looks quite interesting. How has your experience been using this?
hi @cryptoe, I have been using this for a week and have seen an overall 6% decrease in memory usage for the entire cluster. In terms of CPU usage, there is a 7% increase. This may be because I am no longer using a Druid cluster with lots of ZooKeeper activity. Both streaming and batch tasks are ingesting properly after the change. I can work towards creating a feature flag for this so people with the resources can also try it out. 😄
// I don't have a watcher on this path yet, create a Map and start watching.
announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());

// Guaranteed to be non-null, but might be a map put in there by another thread.
// Guaranteed to be non-null, but might be a map put in here by another thread.
note: this seems like the usecase of computeIfAbsent
refactored.
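For readers following this thread, here is a minimal sketch of the difference between the two forms. It is not the code from this PR: the class name, the method names, and the example path are hypothetical, and only the `announcements` map shape is taken from the hunk above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the announcer's bookkeeping; not the PR's class.
final class AnnouncementRegistrySketch
{
  // parent path -> (child node name -> announced payload)
  private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements =
      new ConcurrentHashMap<>();

  // Two-step form: a new map is allocated on every call, even when the key
  // already exists, and a second lookup is needed to fetch the winner.
  ConcurrentMap<String, byte[]> subPathsTwoStep(String parentPath)
  {
    announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());
    return announcements.get(parentPath);
  }

  // Single-step form: the mapping function runs only when the key is absent,
  // and the call returns whichever map ended up stored under the key.
  ConcurrentMap<String, byte[]> subPathsSingleStep(String parentPath)
  {
    return announcements.computeIfAbsent(parentPath, key -> new ConcurrentHashMap<>());
  }

  public static void main(String[] args)
  {
    AnnouncementRegistrySketch registry = new AnnouncementRegistrySketch();
    // "/druid/announcements" is an illustrative path only.
    ConcurrentMap<String, byte[]> first = registry.subPathsSingleStep("/druid/announcements");
    ConcurrentMap<String, byte[]> second = registry.subPathsTwoStep("/druid/announcements");
    // Both calls resolve to the same stored map instance.
    System.out.println(first == second);
  }
}
```

Both forms are safe on a `ConcurrentHashMap`; the single-step form just avoids the throwaway allocation and the extra lookup.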
@GuardedBy("toAnnounce")
private NodeCache setupNodeCache(String path)
{
  final NodeCache cache = new NodeCache(curator, path, true);
NodeCache seems to be deprecated; could we start using a non-deprecated API, or is there a reason to use the deprecated one?
Replaced with the more recent CuratorCache implementation.
I also tried to update PathChildrenCache to CuratorCache, but I had problems passing the integration tests when doing so. Since this PR focuses on NodeAnnouncer, I feel we can leave this for later.
server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
please don't use junit4 for newly added tests
Updated with JUnit5
public static String getParentPath(String path)
{
  return ZKPaths.getPathAndNode(path).getPath();
}

public static String getParentNode(String path)
{
  return ZKPaths.getPathAndNode(path).getNode();
}
not sure about the value of these methods... they could be inlined, so that the ZK API is used directly
Removed ZKPathsUtils.
final String parentPath = ZKPathsUtils.getParentPath(path);
byte[] announcedPayload = announcedPaths.get(path);

// If announcedPayload is null, this means that we have yet to announce this path.
// There is a possibility that the parent paths do not exist, so we check if we need to create the parent path first.
if (announcedPayload == null) {
  boolean buildParentPath = false;
  try {
    buildParentPath = curator.checkExists().forPath(parentPath) == null;
  }
  catch (Exception e) {
    log.warn(e, "Failed to check existence of parent path. Proceeding without creating parent path.");
  }

  // Synchronize to make sure that I only create a listener once.
  synchronized (toAnnounce) {
    if (!listeners.containsKey(path)) {
      final NodeCache cache = setupNodeCache(path);

      if (started) {
        if (buildParentPath) {
          createPath(parentPath, removeParentIfCreated);
        }
        startCache(cache);
        listeners.put(path, cache);
      }
    }
  }
}
can you pack this into a method like ensureParentPathExists or something?
I find the Exception handling pretty strange here and in the createPath method.
I don't really understand why we continue if curator.checkExists().forPath(parentPath) throws an exception?
I referred to #195 when writing this. It seems the exception does not affect the functionality, so I retained the behaviour from Announcer.
{
  synchronized (toAnnounce) {
    if (!started) {
      return false; // Do nothing if not started
could the following lead to lost events:
- L212 started is true
- stop() is called -> sets started to false
- L293 return false
is this a valid usecase? can an announcer be started/stopped multiple times?
(I think not); so it might have been better to just throw an error here, or remove it entirely
Made necessary changes.
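The thread does not say which option was taken. As an illustration of the "throw an error" variant only, here is a minimal sketch assuming a hypothetical `StartGuardSketch` class with a private lock. It is not the PR's `NodeAnnouncer`, and `IllegalStateException` stands in for whatever exception type the project would use.

```java
// Hypothetical lifecycle guard; names and exception type are assumptions.
final class StartGuardSketch
{
  private final Object lock = new Object();
  private boolean started = false;
  private int appliedUpdates = 0;

  void start()
  {
    synchronized (lock) {
      started = true;
    }
  }

  void stop()
  {
    synchronized (lock) {
      started = false;
    }
  }

  // Fails loudly instead of silently returning false, so an update that
  // races with stop() surfaces as an error rather than a dropped event.
  void update(String path, byte[] bytes)
  {
    synchronized (lock) {
      if (!started) {
        throw new IllegalStateException("Cannot update [" + path + "]: announcer is not started");
      }
      // A real announcer would write the payload to ZooKeeper here.
      appliedUpdates++;
    }
  }

  int appliedUpdates()
  {
    synchronized (lock) {
      return appliedUpdates;
    }
  }
}
```

The check and the update share one lock, so a concurrent `stop()` either happens before the check (and the caller sees the exception) or after the update completes.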
    createPath(parentPath, removeParentIfCreated);
  }
  startCache(cache);
  listeners.put(path, cache);
I don't think there is an upper limit on the number of caches; could that cause any problems?
I don't think we will reach (2^32) - 1 elements. Besides, this PR aims to reduce memory consumption, so we should be safe.
final byte[] updatedAnnouncementData = announcedPaths.compute(path, (key, oldBytes) -> {
  if (oldBytes == null) {
    return bytes; // Insert the new value
  } else if (!Arrays.equals(oldBytes, bytes)) {
    throw new IAE("Cannot reannounce different values under the same path.");
  }
  return oldBytes; // No change if values are equal
});

// Return true if we have updated the paths.
return Arrays.equals(updatedAnnouncementData, bytes);
note: this seems like an exception or true
currentBytes = computeIfAbsent(path, key -> bytes)
if (Arrays.equals(currentBytes, bytes)) {
  return;
}
throw ...
Changed
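A runnable sketch of the suggested shape follows, under stated assumptions: the class and method names are hypothetical, and `IllegalArgumentException` replaces Druid's `IAE` so the example depends only on the JDK. It illustrates the reviewer's pseudocode and is not the change that was actually committed.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical holder for announced payloads; not the PR's class.
final class ReannounceCheckSketch
{
  private final ConcurrentMap<String, byte[]> announcedPaths = new ConcurrentHashMap<>();

  // Records the payload on first announcement. A repeat announcement with
  // the same bytes is a no-op; different bytes are rejected.
  void recordAnnouncement(String path, byte[] bytes)
  {
    final byte[] currentBytes = announcedPaths.computeIfAbsent(path, key -> bytes);
    if (!Arrays.equals(currentBytes, bytes)) {
      throw new IllegalArgumentException("Cannot reannounce different values under the same path.");
    }
  }

  public static void main(String[] args)
  {
    ReannounceCheckSketch sketch = new ReannounceCheckSketch();
    sketch.recordAnnouncement("/node", new byte[]{1, 2});
    sketch.recordAnnouncement("/node", new byte[]{1, 2}); // same bytes: accepted
    try {
      sketch.recordAnnouncement("/node", new byte[]{3}); // different bytes: rejected
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Because the method either returns normally or throws, the boolean result in the original hunk carries no extra information, which is the point of the review note.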
Hi @kgyrtkirk, I have made changes according to your suggestions. The PR description is edited accordingly.
Fixes #6647
Description
This PR is built upon #6683 and #9172 and aims to reduce the number of ZooKeeper watch counts.
Fixed Huge Number of Watches in ZooKeeper
The current Announcer.java relies on Apache Curator's PathChildrenCache. In its present form, the announcement mechanism watches the immediate parent of the specified path. As a result, the ZooKeeper ensemble monitors all child nodes under the parent path, including sibling nodes and children of the specified path. This produces an unnecessarily large number of ZooKeeper watches.

The new NodeAnnouncer.java class mirrors Announcer.java but uses NodeCache instead to watch a single node during announcement. By eliminating the watches on child nodes, this approach significantly reduces the total number of watches in ZooKeeper. Users can opt in to the new NodeAnnouncer by setting the feature flag druid.zk.service.pathChildrenCacheStrategy=false.

Tests conducted on the production server also indicate a decrease in watch counts resulting from this change.
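
As a rough illustration, opting in might look like the fragment below. The property name and value are the ones given in this description; placing it in a common runtime properties file is an assumption about a typical Druid deployment, not something this PR specifies.

```properties
# Assumed location: the cluster's common runtime properties file.
# false = use the single-node NodeAnnouncer instead of the PathChildrenCache-based announcer.
druid.zk.service.pathChildrenCacheStrategy=false
```

Given the note that follows about mixing announcers, the flag would presumably need the same value on every service that shares announcement parent paths.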
Note:
Using the two announcer classes simultaneously may result in a KeeperException.NotEmptyException. This happens when two nodes share the same parent. Since neither announcer has a full picture of the nodes under that parent, the exception is thrown when the following occurs:
- Announcer removes all of its tracked children nodes.
- Announcer tries to remove the parent node.
- NodeAnnouncer is still watching one or more child nodes, so the attempt by Announcer to remove the parent node results in the exception.
Documentation
- NodeAnnouncer.
Refactoring
- Announceable class out of Announcer.
- Announcer.java to PathChildrenAnnouncer.java
- Announcer interface to facilitate dependency injection for different flavours of caching strategies.
- ZKPathsUtils.java to abstract the retrieval of ZooKeeper path and ZooKeeper node.
Release note
New: A new opt-in caching strategy is provided that uses a much smaller number of ZooKeeper watches for service announcement.
Key changed/added classes in this PR
- Announcer.java -> PathChildrenAnnouncer.java
- Announcer.java interface (new)
- NodeAnnouncer.java
- Announceable.java
- AnnouncerModule.java
- CuratorConfig.java
- DirectExecutorAnnouncer & SingleThreadedAnnouncer annotations for Guice.
- docs/configuration/index.md & .spelling for docs.
This PR has: