From 81e0fadc82c7fea05f048973111a7abb8cc98906 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 24 Oct 2017 19:35:36 +0800 Subject: [PATCH] Issue 225: Create log should create missing path components Descriptions of the changes in this PR: reuse the methods used by `rename` to create missing path components. (the test is covered by #227) Author: Sijie Guo Reviewers: Jia Zhai This closes #228 from sijie/fix_create_log_pr --- .../metadata/ZKLogStreamMetadataStore.java | 109 ++++++++++++------ 1 file changed, 76 insertions(+), 33 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java index b3250fa2e..c046fc620 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java @@ -35,7 +35,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import org.apache.bookkeeper.common.concurrent.FutureEventListener; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Versioned; @@ -363,6 +363,7 @@ static void ensureMetadataExist(Versioned metadata) { } static void createMissingMetadata(final ZooKeeper zk, + final String basePath, final String logRootPath, final List> metadatas, final List acl, @@ -374,10 +375,10 @@ static void createMissingMetadata(final ZooKeeper zk, CreateMode createMode = CreateMode.PERSISTENT; // log root parent path + String logRootParentPath = Utils.getParent(logRootPath); if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) { pathsToCreate.add(null); } else { - String logRootParentPath = Utils.getParent(logRootPath); pathsToCreate.add(EMPTY_BYTES); zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode)); } @@ -425,7 +426,7 @@ static void createMissingMetadata(final ZooKeeper zk, pathsToCreate.add(null); } else { byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber( - DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO); + DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO); pathsToCreate.add(logSegmentsData); zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode)); } @@ -436,7 +437,7 @@ static void createMissingMetadata(final ZooKeeper zk, } else { pathsToCreate.add(EMPTY_BYTES); zkOps.add(Op.create(logRootPath + ALLOCATION_PATH, - EMPTY_BYTES, acl, createMode)); + EMPTY_BYTES, acl, createMode)); } } if (zkOps.isEmpty()) { @@ -449,6 +450,41 @@ static void createMissingMetadata(final ZooKeeper zk, return; } + getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath)) + .whenComplete(new FutureEventListener>() { + @Override + public void onSuccess(List paths) { + for (String path : paths) { + pathsToCreate.add(EMPTY_BYTES); + zkOps.add( + 0, Op.create(path, EMPTY_BYTES, acl, createMode)); + } + executeCreateMissingPathTxn( + zk, + zkOps, + pathsToCreate, + metadatas, + logRootPath, + promise + ); + } + + @Override + public void onFailure(Throwable cause) { + promise.completeExceptionally(cause); + return; + } + }); + + } + + private static void executeCreateMissingPathTxn(ZooKeeper zk, + List zkOps, + List pathsToCreate, + List> metadatas, + String logRootPath, + CompletableFuture>> promise) { + zk.multi(zkOps, new AsyncCallback.MultiCallback() { @Override public void processResult(int rc, String path, Object ctx, List resultList) { @@ -549,29 +585,30 @@ static CompletableFuture getLog(final URI uri, try { final ZooKeeper zk = zooKeeperClient.get(); return checkLogMetadataPaths(zk, logRootPath, ownAllocator) - .thenCompose(new Function>, CompletableFuture>>>() { - @Override - public CompletableFuture>> apply(List> metadatas) { - CompletableFuture>> promise = - new CompletableFuture>>(); - createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(), - ownAllocator, createIfNotExists, promise); - return promise; - } - }).thenCompose(new Function>, CompletableFuture>() { - @Override - public CompletableFuture apply(List> metadatas) { - try { - return FutureUtils.value( - processLogMetadatas( - uri, - logName, - logIdentifier, - metadatas, - ownAllocator)); - } catch (UnexpectedException e) { - return FutureUtils.exception(e); - } + .thenCompose(metadatas -> { + CompletableFuture>> promise = + new CompletableFuture>>(); + createMissingMetadata( + zk, + uri.getPath(), + logRootPath, + metadatas, + zooKeeperClient.getDefaultACL(), + ownAllocator, + createIfNotExists, + promise); + return promise; + }).thenCompose(metadatas -> { + try { + return FutureUtils.value( + processLogMetadatas( + uri, + logName, + logIdentifier, + metadatas, + ownAllocator)); + } catch (UnexpectedException e) { + return FutureUtils.exception(e); } }); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { @@ -749,16 +786,22 @@ private CompletableFuture renameLogMetadata(URI uri, @VisibleForTesting static CompletableFuture> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) { + ZooKeeper zk; + try { + zk = zkc.get(); + } catch (ZooKeeperConnectionException | InterruptedException e) { + return FutureUtils.exception(e); + } String basePath = uri.getPath(); String logStreamPath = LogMetadata.getLogStreamPath(uri, logName); - LinkedList missingPaths = Lists.newLinkedList(); + return getMissingPaths(zk, basePath, logStreamPath); + } + @VisibleForTesting + static CompletableFuture> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) { + LinkedList missingPaths = Lists.newLinkedList(); CompletableFuture> future = FutureUtils.createFuture(); - try { - existPath(zkc.get(), logStreamPath, basePath, missingPaths, future); - } catch (ZooKeeperConnectionException | InterruptedException e) { - future.completeExceptionally(e); - } + existPath(zk, logStreamPath, basePath, missingPaths, future); return future; }