diff --git a/docs/api-reference/tasks-api.md b/docs/api-reference/tasks-api.md index d94be5b0c5fd..69a0a0153619 100644 --- a/docs/api-reference/tasks-api.md +++ b/docs/api-reference/tasks-api.md @@ -1059,9 +1059,9 @@ Host: http://ROUTER_IP:ROUTER_PORT 2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter 2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Metrics reporters closed 2023-07-03T22:11:17,935 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-kafka-supervisor-dcanhmig-1 unregistered - 2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.Announcer - Unannouncing [/druid/internal-discovery/PEON/localhost:8100] + 2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing [/druid/internal-discovery/PEON/localhost:8100] 2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced self [{"druidNode":{"service":"druid/middleManager","host":"localhost","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","serverType":"indexer-executor","priority":0},"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__default"}}}]. 
- 2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.Announcer - Unannouncing [/druid/announcements/localhost:8100] + 2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing [/druid/announcements/localhost:8100] 2023-07-03T22:11:17,996 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: { "id" : "index_kafka_social_media_0e905aa31037879_nommnaeg", "status" : "SUCCESS", diff --git a/docs/configuration/index.md b/docs/configuration/index.md index cd1b8dfc1f9e..ed8612d9d629 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -148,6 +148,7 @@ We recommend just setting the base ZK path and the ZK service host, but all ZK p |`druid.zk.service.connectionTimeoutMs`|ZooKeeper connection timeout, in milliseconds.|`15000`| |`druid.zk.service.compress`|Boolean flag for whether or not created Znodes should be compressed.|`true`| |`druid.zk.service.acl`|Boolean flag for whether or not to enable ACL security for ZooKeeper. If ACL is enabled, zNode creators will have all permissions.|`false`| +|`druid.zk.service.pathChildrenCacheStrategy`|Dictates the underlying caching strategy for service announcements. Set to `true` to have announcers use Apache Curator's PathChildrenCache strategy; otherwise, the NodeCache strategy is used. 
Consider using the NodeCache strategy when you are dealing with a huge number of ZooKeeper watches in your cluster.|`true`| #### Path configuration diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java index 7e7c09893b9d..669a5acf4cbc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -26,10 +26,10 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.druid.curator.CuratorUtils; import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.guice.annotations.DirectExecutorAnnouncer; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; @@ -69,6 +69,7 @@ public WorkerCuratorCoordinator( IndexerZkConfig indexerZkConfig, RemoteTaskRunnerConfig config, CuratorFramework curatorFramework, + @DirectExecutorAnnouncer Announcer announcer, Worker worker ) { @@ -76,8 +77,7 @@ public WorkerCuratorCoordinator( this.config = config; this.curatorFramework = curatorFramework; this.worker = worker; - - this.announcer = new Announcer(curatorFramework, Execs.directExecutor()); + this.announcer = announcer; this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost())); this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost())); @@ -87,7 +87,7 @@ public WorkerCuratorCoordinator( @LifecycleStart public void 
start() throws Exception { - log.info("WorkerCuratorCoordinator good to go sir. Server[%s]", worker.getHost()); + log.info("WorkerCuratorCoordinator good to go. Server[%s]", worker.getHost()); synchronized (lock) { if (started) { return; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index c8a754625358..3a3dc2d5333f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -28,6 +28,7 @@ import org.apache.curator.test.TestingCluster; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.IndexingServiceCondition; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; @@ -47,6 +48,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.rpc.indexing.NoopOverlordClient; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.IndexIO; @@ -141,6 +143,7 @@ public String getBase() ), new TestRemoteTaskRunnerConfig(new Period("PT1S")), cf, + new NodeAnnouncer(cf, Execs.directExecutor()), worker ); workerCuratorCoordinator.start(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java index 0ad900dd8c44..df8e34ab1111 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java @@ -26,6 +26,7 @@ import org.apache.curator.test.TestingCluster; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; import org.apache.druid.curator.ZkEnablementConfig; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; @@ -33,6 +34,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.server.initialization.ZkPathsConfig; import org.easymock.EasyMock; @@ -95,6 +97,7 @@ public String getBase() }, null, null, null, null), new RemoteTaskRunnerConfig(), cf, + new NodeAnnouncer(cf, Execs.directExecutor()), worker ); curatorCoordinator.start(); diff --git a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java index 7a53ee941d7d..e98a457b0f37 100644 --- a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java +++ b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java @@ -63,6 +63,9 @@ public class CuratorConfig @JsonProperty("maxZkRetries") private int maxZkRetries = 29; + @JsonProperty("pathChildrenCacheStrategy") + private boolean pathChildrenCacheStrategy = true; + public static CuratorConfig create(String hosts) { CuratorConfig config = new CuratorConfig(); @@ -141,4 +144,9 @@ public int getMaxZkRetries() { return maxZkRetries; } + + public boolean getPathChildrenCacheStrategy() + { + return 
pathChildrenCacheStrategy; + } } diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java new file mode 100644 index 000000000000..cecdb8dbcd53 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator.announcement; + +/** + * The {@link Announceable} is a representation of an announcement to be made in ZooKeeper. + */ +class Announceable +{ + /** + * Represents the path in ZooKeeper where the announcement will be made. + */ + final String path; + + /** + * Holds the actual data to be announced. + */ + final byte[] bytes; + + /** + * Indicates whether parent nodes should be removed if the announcement is created successfully. + * This can be useful for cleaning up unused paths in ZooKeeper. 
+ */ + final boolean removeParentsIfCreated; + + public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) + { + this.path = path; + this.bytes = bytes; + this.removeParentsIfCreated = removeParentsIfCreated; + } + + // This should be used for updates only, where removeParentsIfCreated is not relevant. + public Announceable(String path, byte[] bytes) + { + // removeParentsIfCreated is irrelevant, so we can use dummy value "false". + this(path, bytes, false); + } +} diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 4abd24045245..8852920071b4 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -19,429 +19,17 @@ package org.apache.druid.curator.announcement; -import com.google.common.annotations.VisibleForTesting; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.transaction.CuratorTransaction; -import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.cache.PathChildrenCacheFactory; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.utils.CloseableUtils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; 
-import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Announces things on Zookeeper. - */ -public class Announcer +public interface Announcer { - private static final Logger log = new Logger(Announcer.class); - - private final CuratorFramework curator; - private final PathChildrenCacheFactory factory; - private final ExecutorService pathChildrenCacheExecutor; - - private final List toAnnounce = new ArrayList<>(); - private final List toUpdate = new ArrayList<>(); - private final ConcurrentMap listeners = new ConcurrentHashMap<>(); - private final ConcurrentMap> announcements = new ConcurrentHashMap<>(); - private final List parentsIBuilt = new CopyOnWriteArrayList<>(); - - // Used for testing - private Set addedChildren; - - private boolean started = false; - - public Announcer( - CuratorFramework curator, - ExecutorService exec - ) - { - this.curator = curator; - this.pathChildrenCacheExecutor = exec; - this.factory = new PathChildrenCacheFactory.Builder() - .withCacheData(false) - .withCompressed(true) - .withExecutorService(exec) - .withShutdownExecutorOnClose(false) - .build(); - } - - @VisibleForTesting - void initializeAddedChildren() - { - addedChildren = new HashSet<>(); - } - - @VisibleForTesting - Set getAddedChildren() - { - return addedChildren; - } - - @LifecycleStart - public void start() - { - log.debug("Starting Announcer."); - synchronized (toAnnounce) { - if (started) { - return; - } - - started = true; - - for (Announceable announceable : toAnnounce) { - announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated); - } - toAnnounce.clear(); - - for (Announceable announceable : toUpdate) { - update(announceable.path, announceable.bytes); - } - toUpdate.clear(); - } - } - - @LifecycleStop - public void stop() - { - log.debug("Stopping Announcer."); - synchronized (toAnnounce) { - if (!started) { - return; - } - - started = false; - - try { - CloseableUtils.closeAll(listeners.values()); - } - catch (IOException e) { - throw new RuntimeException(e); - } - finally { - pathChildrenCacheExecutor.shutdown(); - } - - for (Map.Entry> entry : announcements.entrySet()) { - String basePath = entry.getKey(); - - for (String announcementPath : entry.getValue().keySet()) { - unannounce(ZKPaths.makePath(basePath, announcementPath)); - } - } - - if (!parentsIBuilt.isEmpty()) { - CuratorTransaction transaction = curator.inTransaction(); - for (String parent : parentsIBuilt) { - try { - transaction = transaction.delete().forPath(parent).and(); - } - catch (Exception e) { - log.info(e, "Unable to delete parent[%s], boooo.", parent); - } - } - try { - ((CuratorTransactionFinal) transaction).commit(); - } - catch (Exception e) { - log.info(e, "Unable to commit transaction. Please feed the hamsters"); - } - } - } - } - - /** - * Like announce(path, bytes, true). - */ - public void announce(String path, byte[] bytes) - { - announce(path, bytes, true); - } - - /** - * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node - * and monitor it to make sure that it always exists until it is unannounced or this object is closed. 
- * - * @param path The path to announce at - * @param bytes The payload to announce - * @param removeParentIfCreated remove parent of "path" if we had created that parent - */ - public void announce(String path, byte[] bytes, boolean removeParentIfCreated) - { - synchronized (toAnnounce) { - if (!started) { - toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated)); - return; - } - } - - final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - - final String parentPath = pathAndNode.getPath(); - boolean buildParentPath = false; - - ConcurrentMap subPaths = announcements.get(parentPath); - - if (subPaths == null) { - try { - if (curator.checkExists().forPath(parentPath) == null) { - buildParentPath = true; - } - } - catch (Exception e) { - log.debug(e, "Problem checking if the parent existed, ignoring."); - } - - // I don't have a watcher on this path yet, create a Map and start watching. - announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>()); - - // Guaranteed to be non-null, but might be a map put in there by another thread. - final ConcurrentMap finalSubPaths = announcements.get(parentPath); - - // Synchronize to make sure that I only create a listener once. - synchronized (finalSubPaths) { - if (!listeners.containsKey(parentPath)) { - final PathChildrenCache cache = factory.make(curator, parentPath); - cache.getListenable().addListener( - new PathChildrenCacheListener() - { - private final AtomicReference> pathsLost = new AtomicReference<>(null); - - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - // NOTE: ZooKeeper does not guarantee that we will get every event, and thus PathChildrenCache doesn't - // as well. If one of the below events are missed, Announcer might not work properly. 
- log.debug("Path[%s] got event[%s]", parentPath, event); - switch (event.getType()) { - case CHILD_REMOVED: - final ChildData child = event.getData(); - final ZKPaths.PathAndNode childPath = ZKPaths.getPathAndNode(child.getPath()); - final byte[] value = finalSubPaths.get(childPath.getNode()); - if (value != null) { - log.info("Node[%s] dropped, reinstating.", child.getPath()); - createAnnouncement(child.getPath(), value); - } - break; - case CONNECTION_LOST: - // Lost connection, which means session is broken, take inventory of what has been seen. - // This is to protect from a race condition in which the ephemeral node could have been - // created but not actually seen by the PathChildrenCache, which means that it won't know - // that it disappeared and thus will not generate a CHILD_REMOVED event for us. Under normal - // circumstances, this can only happen upon connection loss; but technically if you have - // an adversary in the system, they could also delete the ephemeral node before the cache sees - // it. This does not protect from that case, so don't have adversaries. 
- - Set pathsToReinstate = new HashSet<>(); - for (String node : finalSubPaths.keySet()) { - String path = ZKPaths.makePath(parentPath, node); - log.info("Node[%s] is added to reinstate.", path); - pathsToReinstate.add(path); - } - - if (!pathsToReinstate.isEmpty() && !pathsLost.compareAndSet(null, pathsToReinstate)) { - log.info("Already had a pathsLost set!?[%s]", parentPath); - } - break; - case CONNECTION_RECONNECTED: - final Set thePathsLost = pathsLost.getAndSet(null); - - if (thePathsLost != null) { - for (String path : thePathsLost) { - log.info("Reinstating [%s]", path); - final ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path); - createAnnouncement(path, announcements.get(split.getPath()).get(split.getNode())); - } - } - break; - case CHILD_ADDED: - if (addedChildren != null) { - addedChildren.add(event.getData().getPath()); - } - // fall through - case INITIALIZED: - case CHILD_UPDATED: - case CONNECTION_SUSPENDED: - // do nothing - } - } - } - ); - - synchronized (toAnnounce) { - if (started) { - if (buildParentPath) { - createPath(parentPath, removeParentIfCreated); - } - startCache(cache); - listeners.put(parentPath, cache); - } - } - } - } - - subPaths = finalSubPaths; - } - - boolean created = false; - synchronized (toAnnounce) { - if (started) { - byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes); - - if (oldBytes == null) { - created = true; - } else if (!Arrays.equals(oldBytes, bytes)) { - throw new IAE("Cannot reannounce different values under the same path"); - } - } - } - - if (created) { - try { - createAnnouncement(path, bytes); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - public void update(final String path, final byte[] bytes) - { - synchronized (toAnnounce) { - if (!started) { - // removeParentsIfCreated is not relevant for updates; use dummy value "false". 
- toUpdate.add(new Announceable(path, bytes, false)); - return; - } - } - - final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - - final String parentPath = pathAndNode.getPath(); - final String nodePath = pathAndNode.getNode(); - - ConcurrentMap subPaths = announcements.get(parentPath); - - if (subPaths == null || subPaths.get(nodePath) == null) { - throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); - } - - synchronized (toAnnounce) { - try { - byte[] oldBytes = subPaths.get(nodePath); - - if (!Arrays.equals(oldBytes, bytes)) { - subPaths.put(nodePath, bytes); - updateAnnouncement(path, bytes); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - private String createAnnouncement(final String path, byte[] value) throws Exception - { - return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value); - } - - private Stat updateAnnouncement(final String path, final byte[] value) throws Exception - { - return curator.setData().compressed().inBackground().forPath(path, value); - } - - /** - * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer - * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. - *

- * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. - * - * @param path the path to unannounce - */ - public void unannounce(String path) - { - final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - final String parentPath = pathAndNode.getPath(); - - final ConcurrentMap subPaths = announcements.get(parentPath); - - if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) { - log.debug("Path[%s] not announced, cannot unannounce.", path); - return; - } - log.info("Unannouncing [%s]", path); + void start(); - try { - curator.inTransaction().delete().forPath(path).and().commit(); - } - catch (KeeperException.NoNodeException e) { - log.info("Node[%s] didn't exist anyway...", path); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } + void stop(); - private void startCache(PathChildrenCache cache) - { - try { - cache.start(); - } - catch (Throwable e) { - throw CloseableUtils.closeAndWrapInCatch(e, cache); - } - } + void announce(String path, byte[] bytes); - private void createPath(String parentPath, boolean removeParentsIfCreated) - { - try { - curator.create().creatingParentsIfNeeded().forPath(parentPath); - if (removeParentsIfCreated) { - parentsIBuilt.add(parentPath); - } - log.debug("Created parentPath[%s], %s remove on stop.", parentPath, removeParentsIfCreated ? 
"will" : "will not"); - } - catch (Exception e) { - log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); - } - } + void announce(String path, byte[] bytes, boolean removeParentIfCreated); - private static class Announceable - { - final String path; - final byte[] bytes; - final boolean removeParentsIfCreated; + void update(String path, byte[] bytes); - public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) - { - this.path = path; - this.bytes = bytes; - this.removeParentsIfCreated = removeParentsIfCreated; - } - } + void unannounce(String path); } diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java new file mode 100644 index 000000000000..bff7c36c85aa --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.curator.announcement; + +import com.google.common.annotations.VisibleForTesting; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; + +/** + * The {@link NodeAnnouncer} class is responsible for announcing a single node + * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path + * and monitors its existence to ensure that it remains active until it is + * explicitly unannounced or the object is closed. + * + *

+ * This class uses Apache Curator's {@code CuratorCache} recipe in single-node mode (the replacement for the + * deprecated NodeCache recipe) to track a single node, along with the status of its parent nodes. See + * {@link PathChildrenAnnouncer} for an announcer that uses the PathChildrenCache recipe instead. + *

+ */ +public class NodeAnnouncer implements Announcer +{ + private static final Logger log = new Logger(NodeAnnouncer.class); + + private final CuratorFramework curator; + private final ExecutorService nodeCacheExecutor; + + private final ConcurrentHashMap listeners = new ConcurrentHashMap<>(); + private final ConcurrentHashMap announcedPaths = new ConcurrentHashMap<>(); + + @GuardedBy("toAnnounce") + private boolean started = false; + + /** + * This list holds paths that need to be announced. If a path is added to this list + * in the {@link #announce(String, byte[], boolean)} method before the connection to ZooKeeper is established, + * it will be stored here and announced later during the {@link #start} method. + */ + @GuardedBy("toAnnounce") + private final List toAnnounce = new ArrayList<>(); + + /** + * This list holds paths that need to be updated. If a path is added to this list + * in the {@link #update} method before the connection to ZooKeeper is established, + * it will be stored here and updated later during the {@link #start} method. + */ + @GuardedBy("toAnnounce") + private final List toUpdate = new ArrayList<>(); + + /** + * This list keeps track of all the paths created by this node announcer. + * When the {@link #stop} method is called, + * the node announcer is responsible for deleting all paths stored in this list. 
+ */ + @GuardedBy("toAnnounce") + private final List parentsIBuilt = new CopyOnWriteArrayList<>(); + + public NodeAnnouncer(CuratorFramework curator, ExecutorService exec) + { + this.curator = curator; + this.nodeCacheExecutor = exec; + } + + @VisibleForTesting + Set getAddedPaths() + { + return announcedPaths.keySet(); + } + + @LifecycleStart + @Override + public void start() + { + log.debug("Starting Announcer"); + synchronized (toAnnounce) { + if (started) { + log.debug("Announcer has already been started by another thread, ignoring start request."); + return; + } + + started = true; + + for (Announceable announceable : toAnnounce) { + announce(announceable.path, announceable.bytes, announceable.removeParentsIfCreated); + } + toAnnounce.clear(); + + for (Announceable announceable : toUpdate) { + update(announceable.path, announceable.bytes); + } + toUpdate.clear(); + } + } + + @LifecycleStop + @Override + public void stop() + { + log.debug("Stopping Announcer"); + synchronized (toAnnounce) { + if (!started) { + log.debug("Announcer has already been stopped by another thread, ignoring stop request."); + return; + } + + started = false; + closeResources(); + } + } + + @GuardedBy("toAnnounce") + private void closeResources() + { + try { + // Close all caches... 
+ CloseableUtils.closeAll(listeners.values()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + finally { + nodeCacheExecutor.shutdown(); + } + + for (String announcementPath : announcedPaths.keySet()) { + unannounce(announcementPath); + } + + if (!parentsIBuilt.isEmpty()) { + CuratorMultiTransaction transaction = curator.transaction(); + + ArrayList operations = new ArrayList<>(); + for (String parent : parentsIBuilt) { + try { + operations.add(curator.transactionOp().delete().forPath(parent)); + } + catch (Exception e) { + log.info(e, "Unable to delete parent[%s] when closing Announcer.", parent); + } + } + + try { + transaction.forOperations(operations); + } + catch (Exception e) { + log.info(e, "Unable to commit transaction when closing Announcer."); + } + } + } + + /** + * Overload of {@link #announce(String, byte[], boolean)}, but removes parent node of path after announcement. + */ + @Override + public void announce(String path, byte[] bytes) + { + announce(path, bytes, true); + } + + /** + * Announces the provided bytes at the given path. + * + *

+ * Announcing with {@link NodeAnnouncer} creates an ephemeral znode at the specified path and listens for changes + * on that znode, re-creating it if it is deleted. The znode exists until it is unannounced or {@link #stop()} is called. + *

+ * + * @param path The path to announce at + * @param bytes The payload to announce + * @param removeParentIfCreated remove parent of "path" if we had created that parent during announcement + */ + @Override + public void announce(String path, byte[] bytes, boolean removeParentIfCreated) + { + synchronized (toAnnounce) { + if (!started) { + log.debug("Announcer has not started yet, queuing announcement for later processing..."); + toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated)); + return; + } + } + + final String parentPath = ZKPaths.getPathAndNode(path).getPath(); + byte[] announcedPayload = announcedPaths.get(path); + + // If announcedPayload is null, this means that we have yet to announce this path. + // There is a possibility that the parent paths do not exist, so we check if we need to create the parent path first. + if (announcedPayload == null) { + boolean buildParentPath = false; + try { + buildParentPath = curator.checkExists().forPath(parentPath) == null; + } + catch (Exception e) { + log.debug(e, "Problem checking if the parent existed, ignoring."); + } + + // Synchronize to make sure that I only create a listener once. 
+ synchronized (toAnnounce) { + if (!listeners.containsKey(path)) { + final CuratorCache cache = createCacheForPath(path); + + if (started) { + if (buildParentPath) { + createPath(parentPath, removeParentIfCreated); + } + startCache(cache); + listeners.put(path, cache); + } + } + } + } + + boolean created = false; + synchronized (toAnnounce) { + if (started) { + byte[] oldBytes = announcedPaths.putIfAbsent(path, bytes); + + if (oldBytes == null) { + created = true; + } else if (!Arrays.equals(oldBytes, bytes)) { + throw new IAE("Cannot reannounce different values under the same path."); + } + } + } + + if (created) { + try { + createAnnouncement(path, bytes); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @GuardedBy("toAnnounce") + private CuratorCache createCacheForPath(String path) + { + final CuratorCache cache = CuratorCache.build(curator, path, CuratorCache.Options.SINGLE_NODE_CACHE); + + cache.listenable().addListener( + (type, oldData, data) -> { + if (type == CuratorCacheListener.Type.NODE_DELETED) { + final byte[] previouslyAnnouncedData = announcedPaths.get(path); + if (previouslyAnnouncedData != null) { + try { + log.info("ZooKeeper Node[%s] dropped, reinstating...", path); + createAnnouncement(path, previouslyAnnouncedData); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + }, nodeCacheExecutor + ); + + return cache; + } + + @Override + public void update(final String path, final byte[] bytes) + { + synchronized (toAnnounce) { + if (!started) { + log.debug("Announcer has not started yet, queuing updates for later processing..."); + toUpdate.add(new Announceable(path, bytes)); + return; + } + + byte[] oldBytes = announcedPaths.get(path); + + if (oldBytes == null) { + throw new ISE("Cannot update path[%s] that hasn't been announced!", path); + } + + try { + if (!Arrays.equals(oldBytes, bytes)) { + announcedPaths.put(path, bytes); + updateAnnouncement(path, bytes); + } + } + catch (Exception e) { + 
throw new RuntimeException(e); + } + } + } + + private void createAnnouncement(final String path, byte[] value) throws Exception + { + curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value); + } + + private void updateAnnouncement(final String path, final byte[] value) throws Exception + { + curator.setData().compressed().inBackground().forPath(path, value); + } + + /** + * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer + * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. + *

+ * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. + * + * @param path the path to unannounce + */ + @Override + public void unannounce(String path) + { + synchronized (toAnnounce) { + final byte[] value = announcedPaths.remove(path); + + if (value == null) { + log.debug("Path[%s] not announced, cannot unannounce.", path); + return; + } + } + + log.info("unannouncing [%s]", path); + + try { + CuratorOp deleteOp = curator.transactionOp().delete().forPath(path); + curator.transaction().forOperations(deleteOp); + } + catch (KeeperException.NoNodeException e) { + log.info("Unannounced node[%s] that does not exist.", path); + } + catch (KeeperException.NotEmptyException e) { + log.warn("Unannouncing non-empty path[%s]", path); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void startCache(CuratorCache cache) + { + try { + cache.start(); + } + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, cache); + } + } + + @GuardedBy("toAnnounce") + private void createPath(String parentPath, boolean removeParentsIfCreated) + { + try { + curator.create().creatingParentsIfNeeded().forPath(parentPath); + if (removeParentsIfCreated) { + // We keep track of all parents we have built, so we can delete them later on when needed. + parentsIBuilt.add(parentPath); + } + + log.debug("Created parentPath[%s], %s remove on stop.", parentPath, removeParentsIfCreated ? 
"will" : "will not"); + } + catch (KeeperException.NodeExistsException e) { + log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); + } + catch (Exception e) { + log.error(e, "Unhandled exception when creating parentPath[%s].", parentPath); + } + } +} diff --git a/server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java new file mode 100644 index 000000000000..9ce965551114 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.curator.cache.PathChildrenCacheFactory;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The {@link PathChildrenAnnouncer} class manages the announcement of a node, and watches all child
+ * and sibling nodes under the specified path in a ZooKeeper ensemble. It monitors these nodes
+ * to ensure their existence and manage their lifecycle collectively.
+ *
+ * <p>
+ * This class uses Apache Curator's PathChildrenCache recipe under the hood to track all znodes
+ * under the specified node's parent. See {@link NodeAnnouncer} for an announcer that
+ * uses the NodeCache recipe instead.
+ * </p>
+ *
+ * <p>
+ * Announcements and updates requested before {@link #start()} are queued and replayed on start.
+ * {@code started} and both queues are only accessed while holding the {@code toAnnounce} monitor.
+ * {@link #stop()} also shuts down the {@link ExecutorService} passed to the constructor.
+ * </p>
+ */
+public class PathChildrenAnnouncer implements Announcer
+{
+  private static final Logger log = new Logger(PathChildrenAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final PathChildrenCacheFactory factory;
+  private final ExecutorService pathChildrenCacheExecutor;
+
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+  // parent path -> cache watching that parent's children
+  private final ConcurrentHashMap<String, PathChildrenCache> listeners = new ConcurrentHashMap<>();
+  // parent path -> (child node name -> announced payload)
+  private final ConcurrentHashMap<String, ConcurrentHashMap<String, byte[]>> announcements = new ConcurrentHashMap<>();
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  // Used for testing
+  private Set<String> addedChildren;
+
+  private boolean started = false;
+
+  /**
+   * @param curator client used for all ZooKeeper operations
+   * @param exec    executor handed to every {@link PathChildrenCache}; it is shut down by {@link #stop()}
+   */
+  public PathChildrenAnnouncer(
+      CuratorFramework curator,
+      ExecutorService exec
+  )
+  {
+    this.curator = curator;
+    this.pathChildrenCacheExecutor = exec;
+    this.factory = new PathChildrenCacheFactory.Builder()
+        .withCacheData(false)
+        .withCompressed(true)
+        .withExecutorService(exec)
+        .withShutdownExecutorOnClose(false)
+        .build();
+  }
+
+  /**
+   * Test hook: enables recording of the paths seen in {@code CHILD_ADDED} events.
+   */
+  @VisibleForTesting
+  void initializeAddedChildren()
+  {
+    addedChildren = new HashSet<>();
+  }
+
+  /**
+   * Test hook: the paths recorded since {@link #initializeAddedChildren()}, or null if recording was never enabled.
+   */
+  @VisibleForTesting
+  Set<String> getAddedChildren()
+  {
+    return addedChildren;
+  }
+
+  /**
+   * Marks the announcer started and replays every queued announcement, then every queued update.
+   * Calling it on an already started announcer is a no-op.
+   */
+  @LifecycleStart
+  @Override
+  public void start()
+  {
+    log.debug("Starting Announcer.");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Announcer has already been started by another thread, ignoring start request.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  /**
+   * Closes all caches, shuts down the executor, unannounces every announced path and finally deletes, in one
+   * transaction, the parent paths recorded in {@code parentsIBuilt}. Failures of that last transaction are only
+   * logged. Calling it on an announcer that is not started is a no-op.
+   */
+  @LifecycleStop
+  @Override
+  public void stop()
+  {
+    log.debug("Stopping Announcer.");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Announcer has already been stopped by another thread, ignoring stop request.");
+        return;
+      }
+
+      started = false;
+
+      try {
+        CloseableUtils.closeAll(listeners.values());
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      finally {
+        pathChildrenCacheExecutor.shutdown();
+      }
+
+      for (Map.Entry<String, ConcurrentHashMap<String, byte[]>> entry : announcements.entrySet()) {
+        String basePath = entry.getKey();
+
+        for (String announcementPath : entry.getValue().keySet()) {
+          unannounce(ZKPaths.makePath(basePath, announcementPath));
+        }
+      }
+
+      if (!parentsIBuilt.isEmpty()) {
+        CuratorMultiTransaction transaction = curator.transaction();
+
+        List<CuratorOp> operations = new ArrayList<>();
+        for (String parent : parentsIBuilt) {
+          try {
+            operations.add(curator.transactionOp().delete().forPath(parent));
+          }
+          catch (Exception e) {
+            log.info(e, "Unable to delete parent[%s] when closing Announcer.", parent);
+          }
+        }
+
+        try {
+          transaction.forOperations(operations);
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to commit transaction when closing Announcer.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Announces {@code bytes} at {@code path} by delegating to {@link #announce(String, byte[], boolean)} with
+   * {@code removeParentIfCreated = true}: if this call has to create the parent znode, the parent is recorded
+   * and deleted again when {@link #stop()} runs (not immediately after the announcement).
+   */
+  @Override
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link PathChildrenAnnouncer} will create an ephemeral znode at the specified path, and uses its parent
+   * path to watch all the siblings and children znodes of your specified path. The watched nodes will always exist
+   * until it is unannounced, or until {@link #stop()} is called.
+   * </p>
+   *
+   * <p>
+   * If the announcer has not been started, the request is queued and nothing else happens in this call.
+   * Announcing the same payload twice is a no-op; announcing a different payload under an already announced
+   * path throws {@code IAE}.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created that parent during announcement
+   */
+  @Override
+  public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Announcer has not started yet, queuing announcement for later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+
+    final String parentPath = pathAndNode.getPath();
+    boolean buildParentPath = false;
+
+    ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
+
+    if (subPaths == null) {
+      try {
+        if (curator.checkExists().forPath(parentPath) == null) {
+          buildParentPath = true;
+        }
+      }
+      catch (Exception e) {
+        // Best effort: on failure buildParentPath stays false and no parent is created below.
+        log.debug(e, "Problem checking if the parent existed, ignoring.");
+      }
+
+      final ConcurrentHashMap<String, byte[]> finalSubPaths =
+          announcements.computeIfAbsent(parentPath, key -> new ConcurrentHashMap<>());
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (finalSubPaths) {
+        if (!listeners.containsKey(parentPath)) {
+          final PathChildrenCache cache = factory.make(curator, parentPath);
+          cache.getListenable().addListener(
+              new PathChildrenCacheListener()
+              {
+                private final AtomicReference<Set<String>> pathsLost = new AtomicReference<>(null);
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                {
+                  // NOTE: ZooKeeper does not guarantee that we will get every event, and thus PathChildrenCache doesn't
+                  // as well. If one of the below events are missed, Announcer might not work properly.
+                  log.debug("Path[%s] got event[%s]", parentPath, event);
+                  switch (event.getType()) {
+                    case CHILD_REMOVED:
+                      final ChildData child = event.getData();
+                      final ZKPaths.PathAndNode childPath = ZKPaths.getPathAndNode(child.getPath());
+                      final byte[] value = finalSubPaths.get(childPath.getNode());
+                      if (value != null) {
+                        log.info("Node[%s] dropped, reinstating.", child.getPath());
+                        createAnnouncement(child.getPath(), value);
+                      }
+                      break;
+                    case CONNECTION_LOST:
+                      // Lost connection, which means session is broken, take inventory of what has been seen.
+                      // This is to protect from a race condition in which the ephemeral node could have been
+                      // created but not actually seen by the PathChildrenCache, which means that it won't know
+                      // that it disappeared and thus will not generate a CHILD_REMOVED event for us. Under normal
+                      // circumstances, this can only happen upon connection loss; but technically if you have
+                      // an adversary in the system, they could also delete the ephemeral node before the cache sees
+                      // it. This does not protect from that case, so don't have adversaries.
+
+                      Set<String> pathsToReinstate = new HashSet<>();
+                      for (String node : finalSubPaths.keySet()) {
+                        String path = ZKPaths.makePath(parentPath, node);
+                        log.info("Node[%s] is added to reinstate.", path);
+                        pathsToReinstate.add(path);
+                      }
+
+                      if (!pathsToReinstate.isEmpty() && !pathsLost.compareAndSet(null, pathsToReinstate)) {
+                        log.info("Already had a pathsLost set!?[%s]", parentPath);
+                      }
+                      break;
+                    case CONNECTION_RECONNECTED:
+                      final Set<String> thePathsLost = pathsLost.getAndSet(null);
+
+                      if (thePathsLost != null) {
+                        for (String path : thePathsLost) {
+                          final ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path);
+                          final byte[] payload = finalSubPaths.get(split.getNode());
+                          if (payload == null) {
+                            // The path was unannounced while we were disconnected. Reinstating it would either
+                            // bring back a znode the caller removed or fail on the null payload and abort the
+                            // reinstatement of the remaining paths.
+                            log.debug("Path[%s] is no longer announced, not reinstating.", path);
+                            continue;
+                          }
+                          log.info("Reinstating [%s]", path);
+                          createAnnouncement(path, payload);
+                        }
+                      }
+                      break;
+                    case CHILD_ADDED:
+                      if (addedChildren != null) {
+                        addedChildren.add(event.getData().getPath());
+                      }
+                      // fall through
+                    case INITIALIZED:
+                    case CHILD_UPDATED:
+                    case CONNECTION_SUSPENDED:
+                      // do nothing
+                  }
+                }
+              }
+          );
+
+          synchronized (toAnnounce) {
+            // started is re-read under the lock; the cache is only started and registered while started is true.
+            if (started) {
+              if (buildParentPath) {
+                createPath(parentPath, removeParentIfCreated);
+              }
+              startCache(cache);
+              listeners.put(parentPath, cache);
+            }
+          }
+        }
+      }
+
+      subPaths = finalSubPaths;
+    }
+
+    boolean created = false;
+    synchronized (toAnnounce) {
+      if (started) {
+        // putIfAbsent decides atomically whether this call is the first announcement of the node.
+        byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);
+
+        if (oldBytes == null) {
+          created = true;
+        } else if (!Arrays.equals(oldBytes, bytes)) {
+          throw new IAE("Cannot reannounce different values under the same path");
+        }
+      }
+    }
+
+    if (created) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Replaces the payload of an already announced path. If the announcer has not been started, the update is
+   * queued and replayed by {@link #start()}. An update with an identical payload is a no-op.
+   *
+   * @param path  the announced path to update
+   * @param bytes the new payload
+   * @throws ISE if the announcer is started and {@code path} is not (or no longer) announced
+   */
+  @Override
+  public void update(final String path, final byte[] bytes)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Announcer has not started yet, queuing updates for later processing...");
+        toUpdate.add(new Announceable(path, bytes));
+        return;
+      }
+
+      final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+      final String nodePath = pathAndNode.getNode();
+
+      final ConcurrentMap<String, byte[]> subPaths = announcements.get(pathAndNode.getPath());
+      final byte[] oldBytes = subPaths == null ? null : subPaths.get(nodePath);
+
+      if (oldBytes == null) {
+        throw new ISE("Cannot update path[%s] that hasn't been announced!", path);
+      }
+
+      if (Arrays.equals(oldBytes, bytes)) {
+        return;
+      }
+
+      // replace() instead of put(): unannounce() does not take the toAnnounce lock, and put() would silently
+      // re-register a node that was unannounced between the check above and this write.
+      if (subPaths.replace(nodePath, bytes) == null) {
+        throw new ISE("Cannot update path[%s] that hasn't been announced!", path);
+      }
+
+      try {
+        updateAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Issues a background create of a compressed, ephemeral znode holding {@code value} at {@code path}.
+   */
+  private void createAnnouncement(final String path, byte[] value) throws Exception
+  {
+    curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value);
+  }
+
+  /**
+   * Issues a background, compressed {@code setData} of {@code value} on the znode at {@code path}.
+   */
+  private void updateAnnouncement(final String path, final byte[] value) throws Exception
+  {
+    curator.setData().compressed().inBackground().forPath(path, value);
+  }
+
+  /**
+   * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
+   * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
+   * <p>
+   * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
+   * <p>
+   * A path that was never announced is only logged at debug level. A missing znode is logged at info and a
+   * non-empty znode at warn; any other failure of the delete is rethrown as a {@link RuntimeException}.
+   *
+   * @param path the path to unannounce
+   */
+  @Override
+  public void unannounce(String path)
+  {
+    final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+    final String parentPath = pathAndNode.getPath();
+
+    final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
+
+    // Removing the entry first means the CHILD_REMOVED listener finds no payload and does not reinstate the node.
+    if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) {
+      log.debug("Path[%s] not announced, cannot unannounce.", path);
+      return;
+    }
+    log.info("Unannouncing [%s]", path);
+
+    try {
+      CuratorOp deleteOp = curator.transactionOp().delete().forPath(path);
+      curator.transaction().forOperations(deleteOp);
+    }
+    catch (KeeperException.NoNodeException e) {
+      log.info("Unannounced node[%s] that does not exist.", path);
+    }
+    catch (KeeperException.NotEmptyException e) {
+      log.warn("Unannouncing non-empty path[%s]", path);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Starts {@code cache}. On any {@link Throwable} the failure is handed to
+   * {@code CloseableUtils.closeAndWrapInCatch} together with the cache and the result is thrown.
+   */
+  private void startCache(PathChildrenCache cache)
+  {
+    try {
+      cache.start();
+    }
+    catch (Throwable e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, cache);
+    }
+  }
+
+  /**
+   * Creates {@code parentPath}, including missing ancestors. Failures are logged and swallowed: an existing
+   * node is reported at info level, any other exception at error level.
+   *
+   * @param parentPath             the parent znode to create
+   * @param removeParentsIfCreated whether to record {@code parentPath} for deletion in {@link #stop()}
+   */
+  private void createPath(String parentPath, boolean removeParentsIfCreated)
+  {
+    try {
+      curator.create().creatingParentsIfNeeded().forPath(parentPath);
+      if (removeParentsIfCreated) {
+        parentsIBuilt.add(parentPath);
+      }
+      log.debug("Created parentPath[%s], %s remove on stop.", parentPath, removeParentsIfCreated ? "will" : "will not");
+    }
+    catch (KeeperException.NodeExistsException e) {
+      log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
+    }
+    catch (Exception e) {
+      log.error(e, "Unhandled exception when creating parentPath[%s].", parentPath);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java index 5b536c65d39d..34d54dbb56ca 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java @@ -28,6 +28,7 @@ import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.SingleThreadedAnnouncer; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.DruidNode; @@ -47,7 +48,11 @@ static String makeNodeAnnouncementPath(ZkPathsConfig config, NodeRole nodeRole, private final ObjectMapper jsonMapper; @Inject - public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper) + public CuratorDruidNodeAnnouncer( + @SingleThreadedAnnouncer Announcer announcer, + ZkPathsConfig config, + @Json ObjectMapper jsonMapper + ) { this.announcer = announcer; this.config = config; diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java index 6b0c96641a65..da2d38f64c9f 100644 --- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java +++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java @@ -24,8 +24,13 @@ import com.google.inject.Module; import com.google.inject.Provides; import
org.apache.curator.framework.CuratorFramework;
+import org.apache.druid.curator.CuratorConfig;
 import org.apache.druid.curator.ZkEnablementConfig;
 import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
+import org.apache.druid.curator.announcement.PathChildrenAnnouncer;
+import org.apache.druid.guice.annotations.DirectExecutorAnnouncer;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
 import org.apache.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
@@ -64,9 +69,28 @@ public void configure(Binder binder)
   }
 
+  /**
+   * Provides the {@link Announcer} bound under {@link SingleThreadedAnnouncer}. Returns a
+   * {@link PathChildrenAnnouncer} when {@code config.getPathChildrenCacheStrategy()} is true and a
+   * {@link NodeAnnouncer} otherwise; either one gets its own {@code Execs.singleThreaded("Announcer-%s")} executor.
+   */
   @Provides
+  @SingleThreadedAnnouncer
   @ManageLifecycleAnnouncements
-  public Announcer getAnnouncer(CuratorFramework curator)
+  public Announcer getAnnouncerWithSingleThreadedExecutorService(CuratorFramework curator, CuratorConfig config)
   {
-    return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));
+    boolean usingPathChildrenCacheAnnouncer = config.getPathChildrenCacheStrategy();
+    if (usingPathChildrenCacheAnnouncer) {
+      return new PathChildrenAnnouncer(curator, Execs.singleThreaded("Announcer-%s"));
+    } else {
+      return new NodeAnnouncer(curator, Execs.singleThreaded("Announcer-%s"));
+    }
+  }
+
+  /**
+   * Provides the {@link Announcer} bound under {@link DirectExecutorAnnouncer}. Same strategy selection as the
+   * single-threaded provider, but the announcer is built with {@code Execs.directExecutor()}.
+   */
+  @Provides
+  @DirectExecutorAnnouncer
+  @ManageLifecycleAnnouncements
+  public Announcer getAnnouncerWithDirectExecutorService(CuratorFramework curator, CuratorConfig config)
+  {
+    boolean usingPathChildrenCacheAnnouncer = config.getPathChildrenCacheStrategy();
+    if (usingPathChildrenCacheAnnouncer) {
+      return new PathChildrenAnnouncer(curator, Execs.directExecutor());
+    } else {
+      return new NodeAnnouncer(curator, Execs.directExecutor());
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java b/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java new file
mode 100644
index 000000000000..0d675469222b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Guice binding annotation that selects the announcer which {@code AnnouncerModule} builds with
+ * {@code Execs.directExecutor()}. Usable on fields, parameters and methods; retained at runtime.
+ */
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DirectExecutorAnnouncer
+{
+}
diff --git a/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java b/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java new file mode 100644 index 000000000000..8f815301ecfa --- /dev/null +++ b/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Guice binding annotation that selects the announcer which {@code AnnouncerModule} builds with
+ * {@code Execs.singleThreaded("Announcer-%s")}. Usable on fields, parameters and methods; retained at runtime.
+ */
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SingleThreadedAnnouncer
+{
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java index d95dee729af0..d75b788205eb 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -33,6 +33,7 @@ import org.apache.druid.common.utils.UUIDUtils; import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.guice.annotations.SingleThreadedAnnouncer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -91,7 +92,7 @@ public
BatchDataSegmentAnnouncer( DruidServerMetadata server, final BatchDataSegmentAnnouncerConfig config, ZkPathsConfig zkPaths, - Provider announcerProvider, + @SingleThreadedAnnouncer Provider announcerProvider, ObjectMapper jsonMapper, ZkEnablementConfig zkEnablementConfig ) diff --git a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java index 1f456bd3445d..dea15d840aad 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.guice.annotations.SingleThreadedAnnouncer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -48,7 +49,7 @@ public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnou public CuratorDataSegmentServerAnnouncer( DruidServerMetadata server, ZkPathsConfig config, - Announcer announcer, + @SingleThreadedAnnouncer Announcer announcer, ObjectMapper jsonMapper ) { diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java index 86ee872dafed..f5ed9ba924e2 100644 --- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java @@ -38,7 +38,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.client.ServerView; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; -import org.apache.druid.curator.announcement.Announcer; 
+import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -88,7 +88,7 @@ public class BatchServerInventoryViewTest private TestingCluster testingCluster; private CuratorFramework cf; private ObjectMapper jsonMapper; - private Announcer announcer; + private NodeAnnouncer nodeAnnouncer; private BatchDataSegmentAnnouncer segmentAnnouncer; private DataSegmentServerAnnouncer serverAnnouncer; private Set testSegments; @@ -116,11 +116,8 @@ public void setUp() throws Exception jsonMapper = TestHelper.makeJsonMapper(); - announcer = new Announcer( - cf, - Execs.directExecutor() - ); - announcer.start(); + nodeAnnouncer = new NodeAnnouncer(cf, Execs.directExecutor()); + nodeAnnouncer.start(); DruidServerMetadata serverMetadata = new DruidServerMetadata( "id", @@ -144,7 +141,7 @@ public String getBase() serverAnnouncer = new CuratorDataSegmentServerAnnouncer( serverMetadata, zkPathsConfig, - announcer, + nodeAnnouncer, jsonMapper ); serverAnnouncer.announce(); @@ -160,7 +157,7 @@ public int getSegmentsPerNode() } }, zkPathsConfig, - announcer, + nodeAnnouncer, jsonMapper ); @@ -225,7 +222,7 @@ public void tearDown() throws Exception batchServerInventoryView.stop(); filteredBatchServerInventoryView.stop(); serverAnnouncer.unannounce(); - announcer.stop(); + nodeAnnouncer.stop(); cf.close(); testingCluster.stop(); } @@ -425,7 +422,10 @@ private void waitForUpdateEvents(int count) public void testSameTimeZnode() throws Exception { final int numThreads = INITIAL_SEGMENTS / 10; - final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads, "BatchServerInventoryViewTest-%d")); + final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded( + numThreads, + "BatchServerInventoryViewTest-%d" + )); segmentAnnouncer.announceSegments(testSegments); @@ -474,7 
+474,7 @@ public String getBase()
             return TEST_BASE_PATH;
           }
         },
-        announcer,
+        nodeAnnouncer,
         jsonMapper
     );
     List segments = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
new file mode 100644
index 000000000000..6841f87b3142
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Tests for {@link NodeAnnouncer}, using the server and Curator client that {@link CuratorTestBase} provides.
+ *
+ * <p>{@code @Timeout} values in this class are in seconds, which is the JUnit 5 default unit.
+ */
+public class NodeAnnouncerTest extends CuratorTestBase
+{
+  private ExecutorService exec;
+
+  @BeforeEach
+  public void setUp() throws Exception
+  {
+    setupServerAndCurator();
+    exec = Execs.singleThreaded("test-node-announcer-sanity-%s");
+    curator.start();
+    curator.blockUntilConnected();
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    try {
+      tearDownServerAndCurator();
+    }
+    finally {
+      // setUp() creates a new executor for every test; shut it down so its thread does not outlive the test.
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  /** Announcing a path under a parent that does not exist yet creates the parent too. */
+  @Test
+  @Timeout(60)
+  public void testCreateParentPath() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/newParent/testPath";
+    final String parentPath = ZKPaths.getPathAndNode(testPath).getPath();
+
+    announcer.start();
+    Assertions.assertNull(curator.checkExists().forPath(parentPath), "Parent path should not exist before announcement");
+    announcer.announce(testPath, billy);
+
+    // Wait for the announcement to be processed
+    while (curator.checkExists().forPath(testPath) == null) {
+      Thread.sleep(100);
+    }
+
+    Assertions.assertNotNull(curator.checkExists().forPath(parentPath), "Parent path should be created");
+    Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  /** Re-announcing the same payload is accepted; a different payload on the same path is rejected with IAE. */
+  @Test
+  @Timeout(60)
+  public void testAnnounceSamePathWithDifferentPayloadThrowsIAE() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testPath";
+
+    announcer.start();
+    announcer.announce(testPath, billy);
+    while (curator.checkExists().forPath(testPath) == null) {
+      Thread.sleep(100);
+    }
+    Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath));
+
+    // Nothing wrong when we announce same payload on the same path.
+    announcer.announce(testPath, billy);
+
+    // Expect an exception when announcing a different payload
+    IAE exception = Assertions.assertThrows(IAE.class, () -> announcer.announce(testPath, tilly));
+    Assertions.assertEquals("Cannot reannounce different values under the same path.", exception.getMessage());
+
+    // Confirm that the announcement remains unchanged.
+    Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  /** An update queued before start() takes precedence over an announce queued for the same path. */
+  @Test
+  public void testUpdateBeforeStartingNodeAnnouncer() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testAnnounce";
+
+    // Queue update before the announcer is started
+    announcer.update(testPath, tilly);
+    announcer.announce(testPath, billy);
+    announcer.start();
+
+    // Verify that the update took precedence
+    Assertions.assertArrayEquals(tilly, curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  /** update() replaces the payload of an announced path; updating with the same payload changes nothing. */
+  @Test
+  public void testUpdateSuccessfully() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testUpdate";
+
+    announcer.start();
+    announcer.announce(testPath, billy);
+    Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath));
+
+    // Update with the same payload: nothing should change.
+    announcer.update(testPath, billy);
+    Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath));
+
+    // Update with a new payload.
+    announcer.update(testPath, tilly);
+    Assertions.assertArrayEquals(tilly, curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  /** Updating a path that was never announced fails with ISE. */
+  @Test
+  public void testUpdateNonExistentPath()
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/testUpdate";
+
+    announcer.start();
+
+    ISE exception = Assertions.assertThrows(ISE.class, () -> announcer.update(testPath, billy));
+    Assertions.assertEquals("Cannot update path[/testUpdate] that hasn't been announced!", exception.getMessage());
+    announcer.stop();
+  }
+
+  /** Covers announce before and after start(), re-creation of a deleted node, unannounce, and cleanup on stop(). */
+  @Test
+  @Timeout(60)
+  public void testSanity() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath1 = "/test1";
+    final String testPath2 = "/somewhere/test2";
+    announcer.announce(testPath1, billy);
+
+    Assertions.assertNull(curator.checkExists().forPath(testPath1), "/test1 does not exist before announcer start");
+    Assertions.assertNull(curator.checkExists().forPath(testPath2), "/somewhere/test2 does not exist before announcer start");
+
+    announcer.start();
+    while (!announcer.getAddedPaths().contains("/test1")) {
+      Thread.sleep(100);
+    }
+
+    try {
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1), "/test1 has data");
+      Assertions.assertNull(curator.checkExists().forPath(testPath2), "/somewhere/test2 still does not exist");
+
+      announcer.announce(testPath2, billy);
+
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1), "/test1 still has data");
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2), "/somewhere/test2 has data");
+
+      // Listen for the re-creation before deleting the node so the CREATE event cannot be missed.
+      final CountDownLatch latch = createCountdownLatchForPaths(testPath1);
+      final CuratorOp deleteOp = curator.transactionOp().delete().forPath(testPath1);
+      final Collection<CuratorTransactionResult> results = curator.transaction().forOperations(deleteOp);
+      Assertions.assertEquals(1, results.size(), "Expected one result from the delete op");
+      final CuratorTransactionResult result = results.iterator().next();
+      Assertions.assertEquals(Code.OK.intValue(), result.getError(), "Expected OK code on delete");
+
+      Assertions.assertTrue(timing.forWaiting().awaitLatch(latch), "Wait for /test1 to be recreated");
+
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1), "Expected /test1 data to be restored");
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2), "Expected /somewhere/test2 data to remain");
+
+      announcer.unannounce(testPath1);
+      Assertions.assertNull(curator.checkExists().forPath(testPath1), "Expected /test1 to be unannounced");
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2), "Expected /somewhere/test2 to remain");
+    }
+    finally {
+      announcer.stop();
+    }
+
+    Assertions.assertNull(curator.checkExists().forPath(testPath1), "Expected /test1 to remain unannounced");
+    Assertions.assertNull(curator.checkExists().forPath(testPath2), "Expected /somewhere/test2 to be unannounced");
+  }
+
+  /** Announcements are re-created after the session is killed and are removed once the announcer stops. */
+  @Test
+  @Timeout(60)
+  public void testSessionKilled() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    try {
+      CuratorOp createOp = curator.transactionOp().create().forPath("/somewhere");
+      curator.transaction().forOperations(createOp);
+      announcer.start();
+
+      final byte[] billy = StringUtils.toUtf8("billy");
+      final String testPath1 = "/test1";
+      final String testPath2 = "/somewhere/test2";
+      final String[] paths = new String[]{testPath1, testPath2};
+      announcer.announce(testPath1, billy);
+      announcer.announce(testPath2, billy);
+
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1));
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2));
+
+      final CountDownLatch latch = createCountdownLatchForPaths(paths);
+      KillSession.kill(curator.getZookeeperClient().getZooKeeper(), server.getConnectString());
+
+      Assertions.assertTrue(timing.forWaiting().awaitLatch(latch), "Await latch after killing session");
+
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1));
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2));
+
+      announcer.stop();
+
+      while ((curator.checkExists().forPath(testPath1) != null) ||
+             (curator.checkExists().forPath(testPath2) != null)) {
+        Thread.sleep(100);
+      }
+
+      Assertions.assertNull(curator.checkExists().forPath(testPath1));
+      Assertions.assertNull(curator.checkExists().forPath(testPath2));
+    }
+    finally {
+      announcer.stop();
+    }
+  }
+
+  /** A parent created by the announcer is removed again on stop() when removeParentsIfCreated is true. */
+  @Test
+  public void testRemovesParentIfCreated() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/somewhere/test";
+    final String parent = ZKPaths.getPathAndNode(testPath).getPath();
+
+    announcer.start();
+    try {
+      Assertions.assertNull(curator.checkExists().forPath(parent));
+
+      awaitAnnounce(announcer, testPath, billy, true);
+
+      Assertions.assertNotNull(curator.checkExists().forPath(parent));
+    }
+    finally {
+      announcer.stop();
+    }
+
+    Assertions.assertNull(curator.checkExists().forPath(parent));
+  }
+
+  /** A parent that existed before the announcement is left in place with an unchanged mzxid. */
+  @Test
+  public void testLeavesBehindParentPathIfAlreadyExists() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/somewhere/test2";
+    final String parent = ZKPaths.getPathAndNode(testPath).getPath();
+
+    curator.create().forPath(parent);
+    final Stat initialStat = curator.checkExists().forPath(parent);
+
+    announcer.start();
+    try {
+      Assertions.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
+
+      awaitAnnounce(announcer, testPath, billy, true);
+
+      Assertions.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
+    }
+    finally {
+      announcer.stop();
+    }
+
+    Assertions.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
+  }
+
+  /** A parent created by the announcer is kept after stop() when removeParentsIfCreated is false. */
+  @Test
+  public void testLeavesParentPathsUntouchedWhenInstructed() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/somewhere/test2";
+    final String parent = ZKPaths.getPathAndNode(testPath).getPath();
+
+    announcer.start();
+    try {
+      Assertions.assertNull(curator.checkExists().forPath(parent));
+
+      awaitAnnounce(announcer, testPath, billy, false);
+
+      Assertions.assertNotNull(curator.checkExists().forPath(parent));
+    }
+    finally {
+      announcer.stop();
+    }
+
+    Assertions.assertNotNull(curator.checkExists().forPath(parent));
+  }
+
+  /**
+   * Announces {@code bytes} at {@code path} and waits until a CREATE event for that path has been observed.
+   */
+  private void awaitAnnounce(
+      final NodeAnnouncer announcer,
+      final String path,
+      final byte[] bytes,
+      boolean removeParentsIfCreated
+  )
+  {
+    final CountDownLatch latch = createCountdownLatchForPaths(path);
+    announcer.announce(path, bytes, removeParentsIfCreated);
+    // Bounded wait: an announcement that never arrives fails the test instead of hanging it.
+    Assertions.assertTrue(timing.forWaiting().awaitLatch(latch), "Wait for " + path + " to be announced");
+  }
+
+  /**
+   * Registers a Curator listener and returns a latch that opens once every given path has had a CREATE event.
+   */
+  private CountDownLatch createCountdownLatchForPaths(String... paths)
+  {
+    final Set<String> pending = ConcurrentHashMap.newKeySet();
+    pending.addAll(Arrays.asList(paths));
+    final CountDownLatch latch = new CountDownLatch(pending.size());
+    curator.getCuratorListenable().addListener((client, event) -> {
+      final String createdPath = event.getPath();
+      // remove() succeeds only the first time a path is seen, so a repeated CREATE cannot open the latch early.
+      if (event.getType() == CuratorEventType.CREATE && createdPath != null && pending.remove(createdPath)) {
+        latch.countDown();
+      }
+    });
+
+    return latch;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
similarity index 78%
rename from server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java
rename to server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
index e12d8bc47bb7..095a38f69ce0 100644
--- a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java
+++ b/server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.curator.announcement;
 
-import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
@@ -39,16 +38,17 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 
 /**
+ *
  */
-public class AnnouncerTest extends CuratorTestBase
+public class PathChildrenAnnouncerTest extends CuratorTestBase
 {
-  private static final Logger log = new Logger(AnnouncerTest.class);
+  private static final Logger log = new Logger(PathChildrenAnnouncerTest.class);
   private ExecutorService exec;
 
   @Before
@@ -56,6 +56,8 @@ public void setUp() throws Exception
   {
     setupServerAndCurator();
     exec = Execs.singleThreaded("test-announcer-sanity-%s");
+    curator.start();
+    curator.blockUntilConnected();
   }
 
   @After
@@ -67,9 +69,7 @@ public void tearDown()
   @Test(timeout =
60_000L) public void testSanity() throws Exception { - curator.start(); - curator.blockUntilConnected(); - Announcer announcer = new Announcer(curator, exec); + PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec); announcer.initializeAddedChildren(); final byte[] billy = StringUtils.toUtf8("billy"); @@ -98,19 +98,8 @@ public void testSanity() throws Exception curator.getData().decompressed().forPath(testPath2) ); - final CountDownLatch latch = new CountDownLatch(1); - curator.getCuratorListenable().addListener( - new CuratorListener() - { - @Override - public void eventReceived(CuratorFramework client, CuratorEvent event) - { - if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(testPath1)) { - latch.countDown(); - } - } - } - ); + final CountDownLatch latch = createCountdownLatchForPaths(testPath1); + final CuratorOp deleteOp = curator.transactionOp().delete().forPath(testPath1); final Collection results = curator.transaction().forOperations(deleteOp); Assert.assertEquals(1, results.size()); @@ -149,39 +138,24 @@ public void eventReceived(CuratorFramework client, CuratorEvent event) @Test(timeout = 60_000L) public void testSessionKilled() throws Exception { - curator.start(); - curator.blockUntilConnected(); - Announcer announcer = new Announcer(curator, exec); + PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec); try { - curator.inTransaction().create().forPath("/somewhere").and().commit(); + CuratorOp createOp = curator.transactionOp().create().forPath("/somewhere"); + curator.transaction().forOperations(createOp); announcer.start(); final byte[] billy = StringUtils.toUtf8("billy"); final String testPath1 = "/test1"; final String testPath2 = "/somewhere/test2"; - final Set paths = Sets.newHashSet(testPath1, testPath2); + final String[] paths = new String[]{testPath1, testPath2}; announcer.announce(testPath1, billy); announcer.announce(testPath2, billy); Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath1)); Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2)); - final CountDownLatch latch = new CountDownLatch(1); - curator.getCuratorListenable().addListener( - new CuratorListener() - { - @Override - public void eventReceived(CuratorFramework client, CuratorEvent event) - { - if (event.getType() == CuratorEventType.CREATE) { - paths.remove(event.getPath()); - if (paths.isEmpty()) { - latch.countDown(); - } - } - } - } - ); + final CountDownLatch latch = createCountdownLatchForPaths(paths); + KillSession.kill(curator.getZookeeperClient().getZooKeeper(), server.getConnectString()); Assert.assertTrue(timing.forWaiting().awaitLatch(latch)); @@ -204,11 +178,9 @@ public void eventReceived(CuratorFramework client, CuratorEvent event) } @Test - public void testCleansUpItsLittleTurdlings() throws Exception + public void testRemovesParentIfCreated() throws Exception { - curator.start(); - curator.blockUntilConnected(); - Announcer announcer = new Announcer(curator, exec); + PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec); final byte[] billy = StringUtils.toUtf8("billy"); final String testPath = "/somewhere/test2"; @@ -230,14 +202,12 @@ public void testCleansUpItsLittleTurdlings() throws Exception } @Test - public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception + public void testLeavesBehindParentPathIfAlreadyExists() throws Exception { - curator.start(); - curator.blockUntilConnected(); - Announcer announcer = new Announcer(curator, exec); + PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec); final byte[] billy = StringUtils.toUtf8("billy"); - final String testPath = "/somewhere/test2"; + final String testPath = "/somewhere/test"; final String parent = ZKPaths.getPathAndNode(testPath).getPath(); curator.create().forPath(parent); @@ -259,14 +229,12 @@ public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception 
} @Test - public void testLeavesBehindTurdlingsWhenToldTo() throws Exception + public void testLeavesParentPathsUntouchedWhenInstructed() throws Exception { - curator.start(); - curator.blockUntilConnected(); - Announcer announcer = new Announcer(curator, exec); + PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec); final byte[] billy = StringUtils.toUtf8("billy"); - final String testPath = "/somewhere/test2"; + final String testPath = "/somewhere/test"; final String parent = ZKPaths.getPathAndNode(testPath).getPath(); announcer.start(); @@ -285,26 +253,33 @@ public void testLeavesBehindTurdlingsWhenToldTo() throws Exception } private void awaitAnnounce( - final Announcer announcer, + final PathChildrenAnnouncer announcer, final String path, final byte[] bytes, boolean removeParentsIfCreated ) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch = createCountdownLatchForPaths(path); + announcer.announce(path, bytes, removeParentsIfCreated); + latch.await(); + } + + private CountDownLatch createCountdownLatchForPaths(String... 
path) + { + final CountDownLatch latch = new CountDownLatch(path.length); curator.getCuratorListenable().addListener( new CuratorListener() { @Override public void eventReceived(CuratorFramework client, CuratorEvent event) { - if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(path)) { + if (event.getType() == CuratorEventType.CREATE && Arrays.asList(path).contains(event.getPath())) { latch.countDown(); } } } ); - announcer.announce(path, bytes, removeParentsIfCreated); - latch.await(); + + return latch; } } diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java index 93466876e6ed..7d9368a8e1e5 100644 --- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java +++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.NodeRole; @@ -70,10 +70,7 @@ public void testAnnouncementAndDiscovery() throws Exception curator.start(); curator.blockUntilConnected(); - Announcer announcer = new Announcer( - curator, - Execs.directExecutor() - ); + NodeAnnouncer announcer = new NodeAnnouncer(curator, Execs.directExecutor()); announcer.start(); CuratorDruidNodeAnnouncer druidNodeAnnouncer = new CuratorDruidNodeAnnouncer( diff --git a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java 
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index 98644cdc5655..c3e04f04d6d2 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -31,7 +31,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.segment.TestHelper; @@ -284,7 +284,10 @@ public void testSkipDimensions() throws Exception List zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH); for (String zNode : zNodes) { - DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode))); + DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(JOINER.join( + TEST_SEGMENTS_PATH, + zNode + ))); Assert.assertEquals(announcedSegment, firstSegment); Assert.assertTrue(announcedSegment.getDimensions().isEmpty()); Assert.assertTrue(announcedSegment.getMetrics().isEmpty()); @@ -307,7 +310,10 @@ public void testSkipLoadSpec() throws Exception List zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH); for (String zNode : zNodes) { - DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode))); + DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(JOINER.join( + TEST_SEGMENTS_PATH, + zNode + ))); Assert.assertEquals(announcedSegment, firstSegment); Assert.assertNull(announcedSegment.getLoadSpec()); } @@ -402,7 +408,8 @@ public void testSchemaAnnounce() throws 
Exception segmentAnnouncer.announceSegmentSchemas( taskId, new SegmentSchemas(Collections.singletonList(absoluteSchema1)), - new SegmentSchemas(Collections.singletonList(absoluteSchema1))); + new SegmentSchemas(Collections.singletonList(absoluteSchema1)) + ); ChangeRequestsSnapshot snapshot; @@ -618,7 +625,7 @@ public Set read(String path) } } - private static class TestAnnouncer extends Announcer + private static class TestAnnouncer extends NodeAnnouncer { private final ConcurrentHashMap> numPathAnnounced = new ConcurrentHashMap<>(); @@ -630,7 +637,9 @@ private TestAnnouncer(CuratorFramework curator, ExecutorService exec) @Override public void announce(String path, byte[] bytes, boolean removeParentIfCreated) { - numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>()).computeIfAbsent(bytes, k -> new AtomicInteger(0)).incrementAndGet(); + numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(bytes, k -> new AtomicInteger(0)) + .incrementAndGet(); super.announce(path, bytes, removeParentIfCreated); } } diff --git a/website/.spelling b/website/.spelling index 8d9be4dfd7ee..c484813d8da7 100644 --- a/website/.spelling +++ b/website/.spelling @@ -196,6 +196,7 @@ Murmur3 MVCC MV_TO_ARRAY NFS +NodeCache OCF OIDC OLAP @@ -207,6 +208,7 @@ OutputStream ParAccel ParseSpec ParseSpecs +PathChildrenCache Protobuf protobuf pull-deps