From 15edb593778529f83d94eaea644d7683d960fe12 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 24 Oct 2017 02:34:47 -0700 Subject: [PATCH 1/6] Issue 226: ByteBuf.release() was not called before it's garbage-collected --- .../distributedlog/BKLogSegmentWriter.java | 59 +++++++++++++++++-- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java index 1d65d077b..8584780c6 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java @@ -18,6 +18,7 @@ package org.apache.distributedlog; import static com.google.common.base.Charsets.UTF_8; +import static org.apache.distributedlog.DistributedLogConstants.INVALID_TXID; import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE; import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; @@ -47,6 +48,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; +import org.apache.distributedlog.Entry.Writer; import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.common.stats.OpStatsListener; @@ -71,11 +73,8 @@ import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; import org.apache.distributedlog.logsegment.LogSegmentWriter; import org.apache.distributedlog.util.FailpointUtils; - - import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.SimplePermitLimiter; - import org.apache.distributedlog.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +103,54 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Sizable { static final Logger LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class); + final Writer REJECT_WRITES_WRITER = new Writer() { + @Override + public void writeRecord(LogRecord record, CompletableFuture transmitPromise) + throws LogRecordTooLongException, WriteException { + throw new WriteException(getFullyQualifiedLogSegment(), "Write record is cancelled."); + } + + @Override + public boolean hasUserRecords() { + return false; + } + + @Override + public int getNumRecords() { + return 0; + } + + @Override + public int getNumBytes() { + return 0; + } + + @Override + public long getMaxTxId() { + return INVALID_TXID; + } + + @Override + public ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException { + throw new IOException("GetBuffer is not supported."); + } + + @Override + public DLSN finalizeTransmit(long lssn, long entryId) { + return new DLSN(lssn, entryId, -1L); + } + + @Override + public void completeTransmit(long lssn, long entryId) { + // no-op + } + + @Override + public void abortTransmit(Throwable reason) { + // no-op + } + }; + private final String fullyQualifiedLogSegment; private final String streamName; private final int logSegmentMetadataVersion; @@ -120,8 +167,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private final boolean isDurableWriteEnabled; private DLSN lastDLSN = DLSN.InvalidDLSN; private final long startTxId; - private long lastTxId = DistributedLogConstants.INVALID_TXID; - private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID; + private long lastTxId = INVALID_TXID; + private long lastTxIdAcknowledged = INVALID_TXID; private long outstandingBytes = 0; private long numFlushesSinceRestart = 0; private long numBytes = 0; @@ -555,7 +602,7 @@ private void abortTransmitPacketOnClose(final boolean abort, synchronized (this) { packetPreviousSaved = packetPrevious; packetCurrentSaved = new BKTransmitPacket(recordSetWriter); - recordSetWriter = newRecordSetWriter(); + recordSetWriter = REJECT_WRITES_WRITER; } // Once the last packet been transmitted, apply any remaining promises asynchronously From 0899ecf82d6109a391124045c11b5dcf13b94fe6 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 24 Oct 2017 02:35:50 -0700 Subject: [PATCH 2/6] Issue 224: listing logs should exclude --- .../src/main/java/org/apache/distributedlog/util/DLUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java index 833c9f31b..a0083b244 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java @@ -279,7 +279,7 @@ public static String normalizeClientId(String clientId) { * @return true if it is reserved name, otherwise false. */ public static boolean isReservedStreamName(String name) { - return name.startsWith("."); + return name.startsWith(".") || name.startsWith("<"); } /** From 99b28cc5a71c4ec9b6de48a88049c070e1c3e5f9 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 24 Oct 2017 02:36:41 -0700 Subject: [PATCH 3/6] Issue 225: Create log should create missing path components --- .../metadata/ZKLogStreamMetadataStore.java | 109 ++++++++++++------ 1 file changed, 76 insertions(+), 33 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java index b3250fa2e..c046fc620 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java @@ -35,7 +35,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import org.apache.bookkeeper.common.concurrent.FutureEventListener; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Versioned; @@ -363,6 +363,7 @@ static void ensureMetadataExist(Versioned metadata) { } static void createMissingMetadata(final ZooKeeper zk, + final String basePath, final String logRootPath, final List> metadatas, final List acl, @@ -374,10 +375,10 @@ static void createMissingMetadata(final ZooKeeper zk, CreateMode createMode = CreateMode.PERSISTENT; // log root parent path + String logRootParentPath = Utils.getParent(logRootPath); if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) { pathsToCreate.add(null); } else { - String logRootParentPath = Utils.getParent(logRootPath); pathsToCreate.add(EMPTY_BYTES); zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode)); } @@ -425,7 +426,7 @@ static void createMissingMetadata(final ZooKeeper zk, pathsToCreate.add(null); } else { byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber( - DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO); + DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO); pathsToCreate.add(logSegmentsData); zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode)); } @@ -436,7 +437,7 @@ static void createMissingMetadata(final ZooKeeper zk, } else { pathsToCreate.add(EMPTY_BYTES); zkOps.add(Op.create(logRootPath + ALLOCATION_PATH, - EMPTY_BYTES, acl, createMode)); + EMPTY_BYTES, acl, createMode)); } } if (zkOps.isEmpty()) { @@ -449,6 +450,41 @@ static void createMissingMetadata(final ZooKeeper zk, return; } + getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath)) + .whenComplete(new FutureEventListener>() { + @Override + public void onSuccess(List paths) { + for (String path : paths) { + pathsToCreate.add(EMPTY_BYTES); + zkOps.add( + 0, Op.create(path, EMPTY_BYTES, acl, createMode)); + } + executeCreateMissingPathTxn( + zk, + zkOps, + pathsToCreate, + metadatas, + logRootPath, + promise + ); + } + + @Override + public void onFailure(Throwable cause) { + promise.completeExceptionally(cause); + return; + } + }); + + } + + private static void executeCreateMissingPathTxn(ZooKeeper zk, + List zkOps, + List pathsToCreate, + List> metadatas, + String logRootPath, + CompletableFuture>> promise) { + zk.multi(zkOps, new AsyncCallback.MultiCallback() { @Override public void processResult(int rc, String path, Object ctx, List resultList) { @@ -549,29 +585,30 @@ static CompletableFuture getLog(final URI uri, try { final ZooKeeper zk = zooKeeperClient.get(); return checkLogMetadataPaths(zk, logRootPath, ownAllocator) - .thenCompose(new Function>, CompletableFuture>>>() { - @Override - public CompletableFuture>> apply(List> metadatas) { - CompletableFuture>> promise = - new CompletableFuture>>(); - createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(), - ownAllocator, createIfNotExists, promise); - return promise; - } - }).thenCompose(new Function>, CompletableFuture>() { - @Override - public CompletableFuture apply(List> metadatas) { - try { - return FutureUtils.value( - processLogMetadatas( - uri, - logName, - logIdentifier, - metadatas, - ownAllocator)); - } catch (UnexpectedException e) { - return FutureUtils.exception(e); - } + .thenCompose(metadatas -> { + CompletableFuture>> promise = + new CompletableFuture>>(); + createMissingMetadata( + zk, + uri.getPath(), + logRootPath, + metadatas, + zooKeeperClient.getDefaultACL(), + ownAllocator, + createIfNotExists, + promise); + return promise; + }).thenCompose(metadatas -> { + try { + return FutureUtils.value( + processLogMetadatas( + uri, + logName, + logIdentifier, + metadatas, + ownAllocator)); + } catch (UnexpectedException e) { + return FutureUtils.exception(e); } }); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { @@ -749,16 +786,22 @@ private CompletableFuture renameLogMetadata(URI uri, @VisibleForTesting static CompletableFuture> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) { + ZooKeeper zk; + try { + zk = zkc.get(); + } catch (ZooKeeperConnectionException | InterruptedException e) { + return FutureUtils.exception(e); + } String basePath = uri.getPath(); String logStreamPath = LogMetadata.getLogStreamPath(uri, logName); - LinkedList missingPaths = Lists.newLinkedList(); + return getMissingPaths(zk, basePath, logStreamPath); + } + @VisibleForTesting + static CompletableFuture> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) { + LinkedList missingPaths = Lists.newLinkedList(); CompletableFuture> future = FutureUtils.createFuture(); - try { - existPath(zkc.get(), logStreamPath, basePath, missingPaths, future); - } catch (ZooKeeperConnectionException | InterruptedException e) { - future.completeExceptionally(e); - } + existPath(zk, logStreamPath, basePath, missingPaths, future); return future; } From 51d84aa60e15cad0cbf25876c244525428910617 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Sun, 22 Oct 2017 23:42:11 -0700 Subject: [PATCH 4/6] DLFS Implementation --- .../DistributedLogConstants.java | 2 +- distributedlog-io/dlfs/pom.xml | 117 +++++++ .../distributedlog/fs/DLFileSystem.java | 330 ++++++++++++++++++ .../distributedlog/fs/DLInputStream.java | 244 +++++++++++++ .../distributedlog/fs/DLOutputStream.java | 132 +++++++ .../distributedlog/fs/package-info.java | 22 ++ .../distributedlog/fs/TestDLFSBase.java | 59 ++++ .../distributedlog/fs/TestDLFileSystem.java | 229 ++++++++++++ .../dlfs/src/test/resources/dlfs.conf | 27 ++ distributedlog-io/pom.xml | 38 ++ pom.xml | 1 + 11 files changed, 1200 insertions(+), 1 deletion(-) create mode 100644 distributedlog-io/dlfs/pom.xml create mode 100644 distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java create mode 100644 distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java create mode 100644 distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java create mode 100644 distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java create mode 100644 distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java create mode 100644 distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java create mode 100644 distributedlog-io/dlfs/src/test/resources/dlfs.conf create mode 100644 distributedlog-io/pom.xml diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java index d8bdc3da8..e91b22fee 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java @@ -59,7 +59,7 @@ public class DistributedLogConstants { public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress"; public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs"; public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement"; - static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); + public static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8); // An ACL that gives all permissions to node creators and read permissions only to everyone else. diff --git a/distributedlog-io/dlfs/pom.xml b/distributedlog-io/dlfs/pom.xml new file mode 100644 index 000000000..775f07be1 --- /dev/null +++ b/distributedlog-io/dlfs/pom.xml @@ -0,0 +1,117 @@ + + + + 4.0.0 + + distributedlog + org.apache.distributedlog + 0.6.0-SNAPSHOT + ../.. + + org.apache.distributedlog + dlfs + Apache DistributedLog :: IO :: FileSystem + http://maven.apache.org + + UTF-8 + ${basedir}/lib + + + + org.projectlombok + lombok + ${lombok.version} + provided + + + org.apache.distributedlog + distributedlog-core + ${project.parent.version} + + + org.apache.hadoop + hadoop-common + 2.7.2 + + + com.google.protobuf + protobuf-java + + + com.google.guava + guava + + + + + junit + junit + ${junit.version} + test + + + org.apache.distributedlog + distributedlog-core + ${project.parent.version} + tests + test + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${maven-checkstyle-plugin.version} + + + com.puppycrawl.tools + checkstyle + ${puppycrawl.checkstyle.version} + + + org.apache.distributedlog + distributedlog-build-tools + ${project.version} + + + + distributedlog/checkstyle.xml + distributedlog/suppressions.xml + true + true + false + true + + + + test-compile + + check + + + + + + + diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java new file mode 100644 index 000000000..cc18aa1ff --- /dev/null +++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.fs; + +import com.google.common.collect.Lists; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.exceptions.LogEmptyException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.util.Utils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BufferedFSInputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +/** + * A FileSystem Implementation powered by replicated logs. + */ +@Slf4j +public class DLFileSystem extends FileSystem { + + // + // Settings + // + + public static final String DLFS_CONF_FILE = "dlog.configuration.file"; + + + private URI rootUri; + private Namespace namespace; + private final DistributedLogConfiguration dlConf; + private Path workingDir; + + public DLFileSystem() { + this.dlConf = new DistributedLogConfiguration(); + setWorkingDirectory(new Path(System.getProperty("user.dir", ""))); + } + + @Override + public URI getUri() { + return rootUri; + } + + // + // Initialization + // + + @Override + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + setConf(conf); + + // initialize + + this.rootUri = name; + // load the configuration + String dlConfLocation = conf.get(DLFS_CONF_FILE); + if (null != dlConfLocation) { + try { + this.dlConf.loadConf(new File(dlConfLocation).toURI().toURL()); + log.info("Loaded the distributedlog configuration from {}", dlConfLocation); + } catch (ConfigurationException e) { + log.error("Failed to load the distributedlog configuration from " + dlConfLocation, e); + throw new IOException("Failed to load distributedlog configuration from " + dlConfLocation); + } + } + log.info("Initializing the filesystem at {}", name); + // initialize the namespace + this.namespace = NamespaceBuilder.newBuilder() + .clientId("dlfs-client-" + InetAddress.getLocalHost().getHostName()) + .conf(dlConf) + .regionId(DistributedLogConstants.LOCAL_REGION_ID) + .uri(name) + .build(); + log.info("Initialized the filesystem at {}", name); + } + + @Override + public void close() throws IOException { + // clean up the resource + namespace.close(); + super.close(); + } + + // + // Util Functions + // + + private Path makeAbsolute(Path f) { + if (f.isAbsolute()) { + return f; + } else { + return new Path(workingDir, f); + } + } + + private String getStreamName(Path relativePath) { + return makeAbsolute(relativePath).toUri().getPath().substring(1); + } + + // + // Home & Working Directory + // + + @Override + public Path getHomeDirectory() { + return this.makeQualified(new Path(System.getProperty("user.home", ""))); + } + + protected Path getInitialWorkingDirectory() { + return this.makeQualified(new Path(System.getProperty("user.dir", ""))); + } + + @Override + public void setWorkingDirectory(Path path) { + workingDir = makeAbsolute(path); + checkPath(workingDir); + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + + @Override + public FSDataInputStream open(Path path, int bufferSize) + throws IOException { + try { + DistributedLogManager dlm = namespace.openLog(getStreamName(path)); + LogReader reader; + try { + reader = dlm.openLogReader(DLSN.InitialDLSN); + } catch (LogNotFoundException lnfe) { + throw new FileNotFoundException(path.toString()); + } catch (LogEmptyException lee) { + throw new FileNotFoundException(path.toString()); + } + return new FSDataInputStream( + new BufferedFSInputStream( + new DLInputStream(dlm, reader, 0L), + bufferSize)); + } catch (LogNotFoundException e) { + throw new FileNotFoundException(path.toString()); + } + } + + @Override + public FSDataOutputStream create(Path path, + FsPermission fsPermission, + boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progressable) throws IOException { + // for overwrite, delete the existing file first. + if (overwrite) { + delete(path, false); + } + + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(dlConf); + confLocal.setEnsembleSize(replication); + confLocal.setWriteQuorumSize(replication); + confLocal.setAckQuorumSize(replication); + confLocal.setMaxLogSegmentBytes(blockSize); + return append(path, bufferSize, Optional.of(confLocal)); + } + + @Override + public FSDataOutputStream append(Path path, + int bufferSize, + Progressable progressable) throws IOException { + return append(path, bufferSize, Optional.empty()); + } + + private FSDataOutputStream append(Path path, + int bufferSize, + Optional confLocal) + throws IOException { + try { + DistributedLogManager dlm = namespace.openLog( + getStreamName(path), + confLocal, + Optional.empty(), + Optional.empty()); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); + return new FSDataOutputStream( + new BufferedOutputStream( + new DLOutputStream(dlm, writer), bufferSize + ), + statistics, + writer.getLastTxId() < 0L ? 0L : writer.getLastTxId()); + } catch (LogNotFoundException le) { + throw new FileNotFoundException(path.toString()); + } + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + try { + String logName = getStreamName(path); + if (recursive) { + Iterator logs = namespace.getLogs(logName); + while (logs.hasNext()) { + String child = logs.next(); + Path childPath = new Path(path, child); + delete(childPath, recursive); + } + } + namespace.deleteLog(logName); + return true; + } catch (LogNotFoundException e) { + return true; + } + } + + // + // Not Supported + // + + @Override + public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { + String logName = getStreamName(path); + try { + Iterator logs = namespace.getLogs(logName); + List statusList = Lists.newArrayList(); + while (logs.hasNext()) { + String child = logs.next(); + Path childPath = new Path(path, child); + statusList.add(getFileStatus(childPath)); + } + Collections.sort(statusList, Comparator.comparing(FileStatus::getPath)); + return statusList.toArray(new FileStatus[statusList.size()]); + } catch (LogNotFoundException e) { + throw new FileNotFoundException(path.toString()); + } + } + + + @Override + public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { + String streamName = getStreamName(path); + + // Create a dummy stream to make the path exists. + namespace.createLog(streamName); + return true; + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + String logName = getStreamName(path); + boolean exists = namespace.logExists(logName); + if (!exists) { + throw new FileNotFoundException(path.toString()); + } + + long endPos; + try { + DistributedLogManager dlm = namespace.openLog(logName); + endPos = dlm.getLastTxId(); + } catch (LogNotFoundException e) { + throw new FileNotFoundException(path.toString()); + } catch (LogEmptyException e) { + endPos = 0L; + } + + // we need to store more metadata information on logs for supporting filesystem-like use cases + return new FileStatus( + endPos, + false, + 3, + dlConf.getMaxLogSegmentBytes(), + 0L, + makeAbsolute(path)); + } + + + @Override + public boolean rename(Path src, Path dst) throws IOException { + String srcLog = getStreamName(src); + String dstLog = getStreamName(dst); + namespace.renameLog(srcLog, dstLog); + return true; + } + + @Override + public boolean truncate(Path f, long newLength) throws IOException { + throw new UnsupportedOperationException("Truncate is not supported yet"); + } +} diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java new file mode 100644 index 000000000..c5a810fff --- /dev/null +++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.fs; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.io.InputStream; +import lombok.extern.slf4j.Slf4j; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.hadoop.fs.FSInputStream; + +/** + * The input stream for a distributedlog stream. + */ +@Slf4j +class DLInputStream extends FSInputStream { + + private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB + + private static class RecordStream { + + private final InputStream payloadStream; + private final LogRecordWithDLSN record; + + RecordStream(LogRecordWithDLSN record) { + checkNotNull(record); + + this.record = record; + this.payloadStream = record.getPayLoadInputStream(); + } + + } + + private static RecordStream nextRecordStream(LogReader reader) throws IOException { + LogRecordWithDLSN record = reader.readNext(false); + if (null != record) { + return new RecordStream(record); + } + return null; + } + + private final DistributedLogManager dlm; + private LogReader reader; + private long pos; + private long lastPos; + private RecordStream currentRecord = null; + + DLInputStream(DistributedLogManager dlm, + LogReader reader, + long startPos) + throws IOException { + this.dlm = dlm; + this.reader = reader; + this.pos = startPos; + this.lastPos = readEndPos(); + seek(startPos); + } + + @Override + public void close() throws IOException { + reader.close(); + dlm.close(); + } + + private long readEndPos() throws IOException { + return dlm.getLastTxId(); + } + + // + // FSInputStream + // + + @Override + public void seek(long pos) throws IOException { + if (this.pos == pos) { + return; + } + + if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) { + // close the previous reader + this.reader.close(); + this.reader = dlm.openLogReader(pos); + this.currentRecord = null; + } + + skipTo(pos); + } + + private boolean skipTo(final long position) throws IOException { + while (true) { + if (null == currentRecord) { + currentRecord = nextRecordStream(reader); + } + + if (null == currentRecord) { // the stream is empty now + return false; + } + + long endPos = currentRecord.record.getTransactionId(); + if (endPos < position) { + currentRecord = nextRecordStream(reader); + this.pos = endPos; + continue; + } else if (endPos == position){ + // find the record, but we defer read next record when actual read happens + this.pos = position; + this.currentRecord = null; + return true; + } else { + this.currentRecord.payloadStream.skip( + this.currentRecord.payloadStream.available() - (endPos - position)); + this.pos = position; + return true; + } + } + } + + @Override + public long getPos() throws IOException { + return this.pos; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + // + // Input Stream + // + + @Override + public int read(byte[] b, final int off, final int len) throws IOException { + int remaining = len; + int numBytesRead = 0; + while (remaining > 0) { + if (null == currentRecord) { + currentRecord = nextRecordStream(reader); + } + + if (null == currentRecord) { + if (numBytesRead == 0) { + return -1; + } + break; + } + + int bytesLeft = currentRecord.payloadStream.available(); + if (bytesLeft <= 0) { + currentRecord.payloadStream.close(); + currentRecord = null; + continue; + } + + int numBytesToRead = Math.min(bytesLeft, remaining); + int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead); + if (numBytes < 0) { + continue; + } + numBytesRead += numBytes; + remaining -= numBytes; + } + return numBytesRead; + } + + @Override + public long skip(final long n) throws IOException { + if (n <= 0L) { + return 0L; + } + + long remaining = n; + while (true) { + if (null == currentRecord) { + currentRecord = nextRecordStream(reader); + } + + if (null == currentRecord) { // end of stream + return n - remaining; + } + + int bytesLeft = currentRecord.payloadStream.available(); + long endPos = currentRecord.record.getTransactionId(); + if (remaining > bytesLeft) { + // skip the whole record + remaining -= bytesLeft; + this.pos = endPos; + this.currentRecord = nextRecordStream(reader); + continue; + } else if (remaining == bytesLeft) { + this.pos = endPos; + this.currentRecord = null; + return n; + } else { + currentRecord.payloadStream.skip(remaining); + this.pos = endPos - currentRecord.payloadStream.available(); + return n; + } + } + } + + @Override + public int available() throws IOException { + if (lastPos - pos == 0L) { + lastPos = readEndPos(); + } + return (int) (lastPos - pos); + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public int read() throws IOException { + byte[] data = new byte[1]; + int numBytes = read(data); + if (numBytes <= 0) { + return -1; + } + return data[0]; + } + +} diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java new file mode 100644 index 000000000..50e063c96 --- /dev/null +++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.fs; + +import static org.apache.distributedlog.DistributedLogConstants.CONTROL_RECORD_CONTENT; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.util.Utils; + +/** + * DistributedLog Output Stream. + */ +@Slf4j +class DLOutputStream extends OutputStream { + + private final DistributedLogManager dlm; + private final AsyncLogWriter writer; + + // positions + private final long[] syncPos = new long[1]; + private long writePos = 0L; + + // state + private final AtomicReference exception = new AtomicReference<>(null); + + DLOutputStream(DistributedLogManager dlm, + AsyncLogWriter writer) { + this.dlm = dlm; + this.writer = writer; + this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId(); + this.syncPos[0] = writePos; + } + + public synchronized long position() { + return syncPos[0]; + } + + @Override + public void write(int b) throws IOException { + byte[] data = new byte[] { (byte) b }; + write(data); + } + + @Override + public void write(byte[] b) throws IOException { + write(Unpooled.wrappedBuffer(b)); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + write(Unpooled.wrappedBuffer(b, off, len)); + } + + private synchronized void write(ByteBuf buf) throws IOException { + Throwable cause = exception.get(); + if (null != cause) { + if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new UnexpectedException("Encountered unknown issue", cause); + } + } + + writePos += buf.readableBytes(); + LogRecord record = new LogRecord(writePos, buf); + writer.write(record).whenComplete(new FutureEventListener() { + @Override + public void onSuccess(DLSN value) { + synchronized (syncPos) { + syncPos[0] = record.getTransactionId(); + } + } + + @Override + public void onFailure(Throwable cause) { + exception.compareAndSet(null, cause); + } + }); + } + + @Override + public void flush() throws IOException { + try { + LogRecord record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT)); + record.setControl(); + FutureUtils.result(writer.write(record)); + } catch (IOException ioe) { + throw ioe; + } catch (Exception e) { + log.error("Unexpected exception in DLOutputStream", e); + throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e); + } + } + + @Override + public void close() throws IOException { + LogRecord record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT)); + record.setControl(); + Utils.ioResult( + writer.write(record) + .thenCompose(ignored -> writer.asyncClose()) + .thenCompose(ignored -> dlm.asyncClose())); + } +} diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java new file mode 100644 index 000000000..2af39b76c --- /dev/null +++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A filesystem API built over distributedlog. + */ +package org.apache.distributedlog.fs; \ No newline at end of file diff --git a/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java new file mode 100644 index 000000000..1c67d3663 --- /dev/null +++ b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.fs; + +import java.net.URI; +import org.apache.distributedlog.DLMTestUtil; +import org.apache.distributedlog.TestDistributedLogBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; + +/** + * Integration test for {@link DLFileSystem}. + */ +public abstract class TestDLFSBase extends TestDistributedLogBase { + + @Rule + public final TestName runtime = new TestName(); + + protected static URI dlfsUri; + protected static DLFileSystem fs; + + @BeforeClass + public static void setupDLFS() throws Exception { + setupCluster(); + dlfsUri = DLMTestUtil.createDLMURI(zkPort, ""); + fs = new DLFileSystem(); + Configuration conf = new Configuration(); + conf.set(DLFileSystem.DLFS_CONF_FILE, TestDLFSBase.class.getResource("/dlfs.conf").toURI().getPath()); + fs.initialize(dlfsUri, conf); + fs.setWorkingDirectory(new Path("/")); + } + + @AfterClass + public static void teardownDLFS() throws Exception { + fs.close(); + teardownCluster(); + } + +} diff --git a/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java new file mode 100644 index 000000000..70d8a216d --- /dev/null +++ b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.fs; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.InputStreamReader; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +/** + * Integration test for {@link DLFileSystem}. + */ +@Slf4j +public class TestDLFileSystem extends TestDLFSBase { + + @Test(expected = FileNotFoundException.class) + public void testOpenFileNotFound() throws Exception { + Path path = new Path("not-found-file"); + fs.open(path, 1024); + } + + @Test + public void testBasicIO() throws Exception { + Path path = new Path("/path/to/" + runtime.getMethodName()); + + assertFalse(fs.exists(path)); + + try (FSDataOutputStream out = fs.create(path)) { + for (int i = 0; i < 100; i++) { + out.writeBytes("line-" + i + "\n"); + } + out.flush(); + } + assertTrue(fs.exists(path)); + + File tempFile = new File("/tmp/" + runtime.getMethodName()); + tempFile.delete(); + Path localDst = new Path(tempFile.getPath()); + // copy the file + fs.copyToLocalFile(path, localDst); + // copy the file to dest + fs.copyFromLocalFile(localDst, new Path(runtime.getMethodName() + "-copied")); + + // rename + Path dstPath = new Path(runtime.getMethodName() + "-renamed"); + fs.rename(path, dstPath); + assertFalse(fs.exists(path)); + assertTrue(fs.exists(dstPath)); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(dstPath, 1134)))) { + int lineno = 0; + String line; + while ((line = reader.readLine()) != null) { + assertEquals("line-" + lineno, line); + ++lineno; + } + assertEquals(100, lineno); + } + + + // delete the file + fs.delete(dstPath, false); + assertFalse(fs.exists(dstPath)); + } + + @Test + public void testListStatuses() throws Exception { + Path parentPath = new Path("/path/to/" + runtime.getMethodName()); + assertFalse(fs.exists(parentPath)); + try (FSDataOutputStream parentOut = fs.create(parentPath)) { + parentOut.writeBytes("parent"); + parentOut.flush(); + } + assertTrue(fs.exists(parentPath)); + + int numLogs = 3; + for (int i = 0; i < numLogs; i++) { + Path path = new Path("/path/to/" + runtime.getMethodName() + + "/" + runtime.getMethodName() + "-" + i); + assertFalse(fs.exists(path)); + try (FSDataOutputStream out = fs.create(path)) { + out.writeBytes("line"); + out.flush(); + } + assertTrue(fs.exists(path)); + } + FileStatus[] files = fs.listStatus(new Path("/path/to/" + runtime.getMethodName())); + + assertEquals(3, files.length); + for (int i = 0; i < numLogs; i++) { + FileStatus file = files[i]; + assertEquals(4, file.getLen()); + assertFalse(file.isDirectory()); + assertEquals(3, file.getReplication()); + assertEquals(0L, file.getModificationTime()); + assertEquals( + new Path("/path/to/" + runtime.getMethodName() + "/" + runtime.getMethodName() + "-" + i), + file.getPath()); + } + } + + @Test + public void testMkDirs() throws Exception { + Path path = new Path("/path/to/" + runtime.getMethodName()); + assertFalse(fs.exists(path)); + assertTrue(fs.mkdirs(path)); + assertTrue(fs.exists(path)); + assertTrue(fs.mkdirs(path)); + } + + @Test(expected = UnsupportedOperationException.class) + public void testTruncation() throws Exception { + Path path = new Path("/path/to/" + runtime.getMethodName()); + fs.truncate(path, 10); + } + + @Test + public void testDeleteRecursive() throws Exception { + int numLogs = 3; + for (int i = 0; i < numLogs; i++) { + Path path = new Path("/path/to/" + runtime.getMethodName() + + "/" + runtime.getMethodName() + "-" + i); + assertFalse(fs.exists(path)); + try (FSDataOutputStream out = fs.create(path)) { + out.writeBytes("line"); + out.flush(); + } + assertTrue(fs.exists(path)); + } + + fs.delete(new Path("/path/to/" + runtime.getMethodName()), true); + FileStatus[] files = fs.listStatus(new Path("/path/to/" + runtime.getMethodName())); + assertEquals(0, files.length); + } + + @Test + public void testCreateOverwrite() throws Exception { + Path path = new Path("/path/to/" + runtime.getMethodName()); + assertFalse(fs.exists(path)); + byte[] originData = "original".getBytes(UTF_8); + try (FSDataOutputStream out = fs.create(path)) { + out.write(originData); + out.flush(); + } + + try (FSDataInputStream in = fs.open(path, 1024)) { + assertEquals(originData.length, in.available()); + byte[] readData = new byte[originData.length]; + assertEquals(originData.length, in.read(readData)); + assertArrayEquals(originData, readData); + } + + byte[] overwrittenData = "overwritten".getBytes(UTF_8); + try (FSDataOutputStream out = fs.create(path, true)) { + out.write(overwrittenData); + out.flush(); + } + + try (FSDataInputStream in = fs.open(path, 1024)) { + assertEquals(overwrittenData.length, in.available()); + byte[] readData = new byte[overwrittenData.length]; + assertEquals(overwrittenData.length, in.read(readData)); + assertArrayEquals(overwrittenData, readData); + } + } + + @Test + public void testAppend() throws Exception { + Path path = new Path("/path/to/" + runtime.getMethodName()); + assertFalse(fs.exists(path)); + byte[] originData = "original".getBytes(UTF_8); + try (FSDataOutputStream out = fs.create(path)) { + out.write(originData); + out.flush(); + } + + try (FSDataInputStream in = fs.open(path, 1024)) { + assertEquals(originData.length, in.available()); + byte[] readData = new byte[originData.length]; + assertEquals(originData.length, in.read(readData)); + assertArrayEquals(originData, readData); + } + + byte[] appendData = "append".getBytes(UTF_8); + try (FSDataOutputStream out = fs.append(path, 1024)) { + out.write(appendData); + out.flush(); + } + + try (FSDataInputStream in = fs.open(path, 1024)) { + assertEquals(originData.length + appendData.length, in.available()); + byte[] readData = new byte[originData.length]; + assertEquals(originData.length, in.read(readData)); + assertArrayEquals(originData, readData); + readData = new byte[appendData.length]; + assertEquals(appendData.length, in.read(readData)); + assertArrayEquals(appendData, readData); + } + } + +} diff --git a/distributedlog-io/dlfs/src/test/resources/dlfs.conf b/distributedlog-io/dlfs/src/test/resources/dlfs.conf new file mode 100644 index 000000000..26d2bd9fd --- /dev/null +++ b/distributedlog-io/dlfs/src/test/resources/dlfs.conf @@ -0,0 +1,27 @@ +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +## DLFS settings + +writeLockEnabled=false + +enableImmediateFlush=false + +writerOutputBufferSize=131072 + +numWorkerThreads=1 diff --git a/distributedlog-io/pom.xml b/distributedlog-io/pom.xml new file mode 100644 index 000000000..be82c405a --- /dev/null +++ b/distributedlog-io/pom.xml @@ -0,0 +1,38 @@ + + + + + org.apache.distributedlog + distributedlog + 0.6.0-SNAPSHOT + + 4.0.0 + distributedlog-io + pom + Apache DistributedLog :: IO + + dlfs + + + UTF-8 + UTF-8 + + diff --git a/pom.xml b/pom.xml index bd93cfc2c..8f6b91b42 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ distributedlog-common distributedlog-protocol distributedlog-core + distributedlog-io distributedlog-proxy-protocol distributedlog-proxy-client distributedlog-proxy-server From 187bf1b215af1f3e37a3b930e3b682df7905865e Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 24 Oct 2017 16:12:42 -0700 Subject: [PATCH 5/6] Address findbugs errors --- .../DistributedLogConstants.java | 2 +- .../distributedlog/fs/DLOutputStream.java | 23 +++++++++++++------ pom.xml | 2 +- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java index e91b22fee..d8bdc3da8 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java @@ -59,7 +59,7 @@ public class DistributedLogConstants { public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress"; public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs"; public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement"; - public static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); + static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8); // An ACL that gives all permissions to node creators and read permissions only to everyone else. diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java index 50e063c96..0132cfc27 100644 --- a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java +++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java @@ -18,12 +18,13 @@ package org.apache.distributedlog.fs; -import static org.apache.distributedlog.DistributedLogConstants.CONTROL_RECORD_CONTENT; +import static com.google.common.base.Charsets.UTF_8; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; @@ -41,6 +42,8 @@ @Slf4j class DLOutputStream extends OutputStream { + private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); + private final DistributedLogManager dlm; private final AsyncLogWriter writer; @@ -106,12 +109,20 @@ public void onFailure(Throwable cause) { }); } + private CompletableFuture writeControlRecord() { + LogRecord record; + synchronized (this) { + record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT)); + record.setControl(); + } + return writer.write(record); + } + @Override public void flush() throws IOException { try { - LogRecord record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT)); - record.setControl(); - FutureUtils.result(writer.write(record)); + + FutureUtils.result(writeControlRecord()); } catch (IOException ioe) { throw ioe; } catch (Exception e) { @@ -122,10 +133,8 @@ public void flush() throws IOException { @Override public void close() throws IOException { - LogRecord record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT)); - record.setControl(); Utils.ioResult( - writer.write(record) + writeControlRecord() .thenCompose(ignored -> writer.asyncClose()) .thenCompose(ignored -> dlm.asyncClose())); } diff --git a/pom.xml b/pom.xml index 8f6b91b42..616d755c9 100644 --- a/pom.xml +++ b/pom.xml @@ -243,7 +243,7 @@ **/dependency-reduced-pom.xml **/org/apache/distributedlog/thrift/* **/logs/*.log - **/target/* + **/target/**/* .git/**/* .github/**/* From 045c1f31d3e6943bdf885438afb729e2934d44cb Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 26 Oct 2017 21:46:47 -0700 Subject: [PATCH 6/6] Address comments --- .../java/org/apache/distributedlog/fs/DLFileSystem.java | 8 ++++---- .../java/org/apache/distributedlog/fs/DLOutputStream.java | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java index cc18aa1ff..0670a4a31 100644 --- a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java +++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java @@ -254,10 +254,6 @@ public boolean delete(Path path, boolean recursive) throws IOException { } } - // - // Not Supported - // - @Override public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { String logName = getStreamName(path); @@ -323,6 +319,10 @@ public boolean rename(Path src, Path dst) throws IOException { return true; } + // + // Not Supported + // + @Override public boolean truncate(Path f, long newLength) throws IOException { throw new UnsupportedOperationException("Truncate is not supported yet"); diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java index 0132cfc27..3670bc52b 100644 --- a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java +++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java @@ -121,7 +121,6 @@ record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT)) @Override public void flush() throws IOException { try { - FutureUtils.result(writeControlRecord()); } catch (IOException ioe) { throw ioe;