Skip to content
This repository has been archived by the owner on Oct 16, 2024. It is now read-only.

Commit

Permalink
Issue 225: Create log should create missing path components
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

reuse the methods used by `rename` to create missing path components.

(the test is covered by #227)

Author: Sijie Guo <[email protected]>

Reviewers: Jia Zhai <None>

This closes #228 from sijie/fix_create_log_pr
  • Loading branch information
sijie authored and jiazhai committed Oct 24, 2017
1 parent 715b5ec commit 81e0fad
Showing 1 changed file with 76 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
Expand Down Expand Up @@ -363,6 +363,7 @@ static void ensureMetadataExist(Versioned<byte[]> metadata) {
}

static void createMissingMetadata(final ZooKeeper zk,
final String basePath,
final String logRootPath,
final List<Versioned<byte[]>> metadatas,
final List<ACL> acl,
Expand All @@ -374,10 +375,10 @@ static void createMissingMetadata(final ZooKeeper zk,
CreateMode createMode = CreateMode.PERSISTENT;

// log root parent path
String logRootParentPath = Utils.getParent(logRootPath);
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
pathsToCreate.add(null);
} else {
String logRootParentPath = Utils.getParent(logRootPath);
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
}
Expand Down Expand Up @@ -425,7 +426,7 @@ static void createMissingMetadata(final ZooKeeper zk,
pathsToCreate.add(null);
} else {
byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
pathsToCreate.add(logSegmentsData);
zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
}
Expand All @@ -436,7 +437,7 @@ static void createMissingMetadata(final ZooKeeper zk,
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
EMPTY_BYTES, acl, createMode));
EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
Expand All @@ -449,6 +450,41 @@ static void createMissingMetadata(final ZooKeeper zk,
return;
}

getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
.whenComplete(new FutureEventListener<List<String>>() {
@Override
public void onSuccess(List<String> paths) {
for (String path : paths) {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(
0, Op.create(path, EMPTY_BYTES, acl, createMode));
}
executeCreateMissingPathTxn(
zk,
zkOps,
pathsToCreate,
metadatas,
logRootPath,
promise
);
}

@Override
public void onFailure(Throwable cause) {
promise.completeExceptionally(cause);
return;
}
});

}

private static void executeCreateMissingPathTxn(ZooKeeper zk,
List<Op> zkOps,
List<byte[]> pathsToCreate,
List<Versioned<byte[]>> metadatas,
String logRootPath,
CompletableFuture<List<Versioned<byte[]>>> promise) {

zk.multi(zkOps, new AsyncCallback.MultiCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
Expand Down Expand Up @@ -549,29 +585,30 @@ static CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
try {
final ZooKeeper zk = zooKeeperClient.get();
return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
.thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<List<Versioned<byte[]>>>>() {
@Override
public CompletableFuture<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
CompletableFuture<List<Versioned<byte[]>>> promise =
new CompletableFuture<List<Versioned<byte[]>>>();
createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
ownAllocator, createIfNotExists, promise);
return promise;
}
}).thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<LogMetadataForWriter>>() {
@Override
public CompletableFuture<LogMetadataForWriter> apply(List<Versioned<byte[]>> metadatas) {
try {
return FutureUtils.value(
processLogMetadatas(
uri,
logName,
logIdentifier,
metadatas,
ownAllocator));
} catch (UnexpectedException e) {
return FutureUtils.exception(e);
}
.thenCompose(metadatas -> {
CompletableFuture<List<Versioned<byte[]>>> promise =
new CompletableFuture<List<Versioned<byte[]>>>();
createMissingMetadata(
zk,
uri.getPath(),
logRootPath,
metadatas,
zooKeeperClient.getDefaultACL(),
ownAllocator,
createIfNotExists,
promise);
return promise;
}).thenCompose(metadatas -> {
try {
return FutureUtils.value(
processLogMetadatas(
uri,
logName,
logIdentifier,
metadatas,
ownAllocator));
} catch (UnexpectedException e) {
return FutureUtils.exception(e);
}
});
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
Expand Down Expand Up @@ -749,16 +786,22 @@ private CompletableFuture<Void> renameLogMetadata(URI uri,

@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperConnectionException | InterruptedException e) {
return FutureUtils.exception(e);
}
String basePath = uri.getPath();
String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
LinkedList<String> missingPaths = Lists.newLinkedList();
return getMissingPaths(zk, basePath, logStreamPath);
}

@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
LinkedList<String> missingPaths = Lists.newLinkedList();
CompletableFuture<List<String>> future = FutureUtils.createFuture();
try {
existPath(zkc.get(), logStreamPath, basePath, missingPaths, future);
} catch (ZooKeeperConnectionException | InterruptedException e) {
future.completeExceptionally(e);
}
existPath(zk, logStreamPath, basePath, missingPaths, future);
return future;
}

Expand Down

0 comments on commit 81e0fad

Please sign in to comment.