diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 5946419d28ae..24d25c5c29df 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -49,6 +49,8 @@ New Features * GITHUB#14792: Introduced OffHeapQuantizedFloatVectorValues class to access float vectors when only quantized byte vectors are available in the index. (Pulkit Gupta) +* GITHUB#15224: Add asynchronous prefetching to DirectIO directories. (Ben Trent) + Improvements --------------------- diff --git a/lucene/misc/src/java/org/apache/lucene/misc/store/DirectIODirectory.java b/lucene/misc/src/java/org/apache/lucene/misc/store/DirectIODirectory.java index 94723546b6c6..6ba2dc3394e6 100644 --- a/lucene/misc/src/java/org/apache/lucene/misc/store/DirectIODirectory.java +++ b/lucene/misc/src/java/org/apache/lucene/misc/store/DirectIODirectory.java @@ -18,6 +18,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; @@ -27,9 +28,16 @@ import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; import java.util.Arrays; +import java.util.Deque; import java.util.Objects; import java.util.OptionalLong; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.zip.CRC32; import java.util.zip.Checksum; import org.apache.lucene.store.AlreadyClosedException; @@ -71,8 +79,11 @@ public class DirectIODirectory extends FilterDirectory { /** Default min expected merge size before direct IO is used (10 MB): */ public static final long DEFAULT_MIN_BYTES_DIRECT = 10 * 1024 * 1024; + public static final int DEFAULT_MAX_PREFETCH_BUFFERS = 16; + private final int blockSize, mergeBufferSize; private final long minBytesDirect; + private final int maxPrefetches; volatile boolean isOpen = true; @@ -118,14 +129,31 @@ public class DirectIODirectory extends FilterDirectory { * @param mergeBufferSize Size of buffer to use for merging. * @param minBytesDirect Merges, or files to be opened for reading, smaller than this will not use * direct IO. See {@link #DEFAULT_MIN_BYTES_DIRECT} and {@link #useDirectIO}. + * @param maxPrefetches The maximum number of prefetch buffers to use when reading with direct IO. * @throws IOException If there is a low-level I/O error */ - public DirectIODirectory(FSDirectory delegate, int mergeBufferSize, long minBytesDirect) + public DirectIODirectory( + FSDirectory delegate, int mergeBufferSize, long minBytesDirect, int maxPrefetches) throws IOException { super(delegate); this.blockSize = Math.toIntExact(Files.getFileStore(delegate.getDirectory()).getBlockSize()); this.mergeBufferSize = mergeBufferSize; this.minBytesDirect = minBytesDirect; + this.maxPrefetches = maxPrefetches; + } + + /** + * Create a new DirectIODirectory for the named location. + * + * @param delegate Directory for non-merges, also used as reference to file system path. + * @param mergeBufferSize Size of buffer to use for merging. + * @param minBytesDirect Merges, or files to be opened for reading, smaller than this will not use + * direct IO. See {@link #DEFAULT_MIN_BYTES_DIRECT} and {@link #useDirectIO}. 
+ * @throws IOException If there is a low-level I/O error + */ + public DirectIODirectory(FSDirectory delegate, int mergeBufferSize, long minBytesDirect) + throws IOException { + this(delegate, mergeBufferSize, minBytesDirect, DEFAULT_MAX_PREFETCH_BUFFERS); } /** @@ -135,7 +163,11 @@ public DirectIODirectory(FSDirectory delegate, int mergeBufferSize, long minByte * @throws IOException If there is a low-level I/O error */ public DirectIODirectory(FSDirectory delegate) throws IOException { - this(delegate, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT); + this( + delegate, + DEFAULT_MERGE_BUFFER_SIZE, + DEFAULT_MIN_BYTES_DIRECT, + DEFAULT_MAX_PREFETCH_BUFFERS); } /** @@ -175,7 +207,8 @@ protected boolean useDirectIO(String name, IOContext context, OptionalLong fileL public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); if (useDirectIO(name, context, OptionalLong.of(fileLength(name)))) { - return new DirectIOIndexInput(getDirectory().resolve(name), blockSize, mergeBufferSize); + return new DirectIOIndexInput( + getDirectory().resolve(name), blockSize, mergeBufferSize, maxPrefetches); } else { return in.openInput(name, context); } @@ -308,6 +341,7 @@ private static final class DirectIOIndexInput extends IndexInput { private final long length; private final boolean isClosable; // clones and slices are not closable private boolean isOpen; + private final DirectIOPrefetcher prefetcher; private long filePos; /** @@ -318,11 +352,14 @@ private static final class DirectIOIndexInput extends IndexInput { * @throws IOException if the operating system or filesystem does not support support Direct I/O * or a sufficient equivalent. */ - public DirectIOIndexInput(Path path, int blockSize, int bufferSize) throws IOException { + public DirectIOIndexInput(Path path, int blockSize, int bufferSize, int maxPrefetches) + throws IOException { super("DirectIOIndexInput(path=\"" + path + "\")"); this.channel = FileChannel.open(path, StandardOpenOption.READ, getDirectOpenOption()); this.blockSize = blockSize; this.buffer = allocateBuffer(bufferSize, blockSize); + this.prefetcher = + new DirectIOPrefetcher(bufferSize, blockSize, channel, maxPrefetches, maxPrefetches * 32); this.isOpen = true; this.isClosable = true; this.length = channel.size(); @@ -340,6 +377,13 @@ private DirectIOIndexInput( this.buffer = allocateBuffer(bufferSize, other.blockSize); this.blockSize = other.blockSize; this.channel = other.channel; + this.prefetcher = + new DirectIOPrefetcher( + bufferSize, + blockSize, + channel, + other.prefetcher.maxConcurrentPrefetches, + other.prefetcher.maxTotalPrefetches); this.isOpen = true; this.isClosable = false; this.length = length; @@ -348,14 +392,45 @@ private DirectIOIndexInput( buffer.limit(0); } - private static ByteBuffer allocateBuffer(int bufferSize, int blockSize) { + static ByteBuffer allocateBuffer(int bufferSize, int blockSize) { return ByteBuffer.allocateDirect(bufferSize + blockSize - 1) .alignedSlice(blockSize) .order(LITTLE_ENDIAN); } + /** + * Prefetches the given range of bytes. The range will be aligned to blockSize and will be + * chopped up into chunks of buffer size. + * + * @param pos the position to prefetch from, must be non-negative and within file length + * @param length the length to prefetch, must be non-negative. This length may cause multiple + * prefetches to be issued, depending on the buffer size. 
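+     * @throws IllegalArgumentException if {@code pos} or {@code length} is negative, or if the
+     *     range extends past the end of this input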
+ */ + @Override + public void prefetch(long pos, long length) throws IOException { + if (prefetcher.maxConcurrentPrefetches == 0) { + return; + } + if (pos < 0 || length < 0 || pos + length > this.length) { + throw new IllegalArgumentException( + "Invalid prefetch range: pos=" + + pos + + ", length=" + + length + + ", fileLength=" + + this.length); + } + // check if our current buffer already contains the requested range + final long absPos = pos + offset; + final long alignedPos = absPos - (absPos % blockSize); + // we only prefetch into a single buffer, even if length exceeds buffer size + // maybe we should improve this... + prefetcher.prefetch(alignedPos, length); + } + @Override public void close() throws IOException { + prefetcher.close(); if (isOpen && isClosable) { channel.close(); isOpen = false; @@ -395,8 +470,12 @@ private void seekInternal(long pos) throws IOException { filePos = alignedPos - buffer.capacity(); final int delta = (int) (absPos - alignedPos); - refill(delta); - buffer.position(delta); + refill(delta, delta); + } + + private void refill(int bytesToRead) throws IOException { + assert filePos % blockSize == 0; + refill(bytesToRead, 0); } @Override @@ -440,26 +519,37 @@ public long readLong() throws IOException { } } - private void refill(int bytesToRead) throws IOException { - filePos += buffer.capacity(); + private void refill(int bytesToRead, int delta) throws IOException { + long nextFilePos = filePos + buffer.capacity(); // BaseDirectoryTestCase#testSeekPastEOF test for consecutive read past EOF, // hence throwing EOFException early to maintain buffer state (position in particular) - if (filePos > offset + length || ((offset + length) - filePos < bytesToRead)) { + if (nextFilePos > offset + length || ((offset + length) - nextFilePos < bytesToRead)) { + filePos = nextFilePos; throw new EOFException("read past EOF: " + this); } buffer.clear(); try { + if (prefetcher.readBytes(nextFilePos, buffer, delta)) { + // handle potentially differently aligned prefetch buffer + // this gets tricky as the prefetch buffer is always blockSize aligned + // but the prefetches might be aligned on an earlier block boundary + // so we need to adjust the filePos accordingly + long currentLogicalPos = nextFilePos + delta; + filePos = currentLogicalPos - buffer.position(); + return; + } + filePos = nextFilePos; // read may return -1 here iff filePos == channel.size(), but that's ok as it just reaches // EOF // when filePos > channel.size(), an EOFException will be thrown from above channel.read(buffer, filePos); + buffer.flip(); + buffer.position(delta); } catch (IOException ioe) { throw new IOException(ioe.getMessage() + ": " + this, ioe); } - - buffer.flip(); } @Override @@ -540,6 +630,7 @@ public void readLongs(long[] dst, int offset, int len) throws IOException { public DirectIOIndexInput clone() { try { var clone = new DirectIOIndexInput("clone:" + this, this, offset, length); + // TODO figure out how to make this async clone.seekInternal(getFilePointer()); return clone; } catch (IOException ioe) { @@ -554,8 +645,205 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw "slice() " + sliceDescription + " out of bounds: " + this); } var slice = new DirectIOIndexInput(sliceDescription, this, this.offset + offset, length); + // TODO figure out how to make this async slice.seekInternal(0L); return slice; } } + + /** A prefetcher that can prefetch multiple chunks of data from a FileChannel using direct IO. 
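+   * Reads are submitted to a virtual-thread executor: at most {@code maxConcurrentPrefetches}
+   * requests are in flight at once, and further requests are queued until a slot frees up, with
+   * at most {@code maxTotalPrefetches} requests outstanding in total.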
+   */
+  private static class DirectIOPrefetcher implements Closeable {
+    private final int maxConcurrentPrefetches;
+    private final int maxTotalPrefetches;
+    private final int blockSize;
+    private final long[] prefetchPos;
+    private final Future<?>[] prefetchThreads;
+    private final TreeMap<Long, Integer> posToSlot;
+    private final Deque<Integer> slots;
+    private final ByteBuffer[] prefetchBuffers;
+    private final IOException[] prefetchExceptions;
+    private final int prefetchBytesSize;
+    private final Deque<Long> pendingPrefetches = new ArrayDeque<>();
+    private final FileChannel channel;
+    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
+
+    DirectIOPrefetcher(
+        int prefetchBytesSize,
+        int blockSize,
+        FileChannel channel,
+        int maxConcurrentPrefetches,
+        int maxTotalPrefetches) {
+      this.blockSize = blockSize;
+      this.maxConcurrentPrefetches = maxConcurrentPrefetches;
+      this.prefetchPos = new long[maxConcurrentPrefetches];
+      this.prefetchThreads = new Future<?>[maxConcurrentPrefetches];
+      this.posToSlot = new TreeMap<>();
+      this.slots = new ArrayDeque<>(maxConcurrentPrefetches);
+      for (int i = 0; i < maxConcurrentPrefetches; i++) {
+        slots.addLast(i);
+      }
+      this.prefetchExceptions = new IOException[maxConcurrentPrefetches];
+      this.prefetchBuffers = new ByteBuffer[maxConcurrentPrefetches];
+      this.prefetchBytesSize = prefetchBytesSize;
+      this.maxTotalPrefetches = maxTotalPrefetches;
+      this.channel = channel;
+    }
+
+    /**
+     * Initiate prefetch of the given range. The range will be aligned to blockSize and chopped up
+     * into chunks of prefetchBytesSize.
+     *
+     * @param pos the position to prefetch from, must be non-negative and within file length
+     * @param length the length to prefetch, must be non-negative.
+     */
+    void prefetch(long pos, long length) {
+      // first determine how many slots we need given the length
+      int numSlots =
+          (int)
+              Math.min((length + prefetchBytesSize - 1) / prefetchBytesSize, Integer.MAX_VALUE - 1);
+      while (numSlots > 0
+          && (this.posToSlot.size() + this.pendingPrefetches.size()) < maxTotalPrefetches) {
+        final int slot;
+        Integer existingSlot = this.posToSlot.get(pos);
+        if (existingSlot != null && prefetchThreads[existingSlot] != null) {
+          // already being prefetched and hasn't been consumed.
+          // return early
+          return;
+        }
+        if (this.posToSlot.size() < maxConcurrentPrefetches && slots.isEmpty() == false) {
+          slot = slots.removeFirst();
+          posToSlot.put(pos, slot);
+          prefetchPos[slot] = pos;
+        } else {
+          slot = -1;
+          pendingPrefetches.addLast(pos);
+        }
+        if (slot != -1) {
+          startPrefetch(pos, slot);
+        }
+        pos += prefetchBytesSize;
+        numSlots--;
+      }
+    }
+
+    /**
+     * Try to read the requested bytes from an already prefetched buffer. If the requested bytes are
+     * not in a prefetched buffer, return false.
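+     * If the covering prefetch has been issued but has not completed yet, this call blocks until
+     * it finishes.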
+ * + * @param pos the absolute position to read from + * @param slice the buffer to read into, must be pre-sized to the required length + * @param delta an offset into the slice buffer to start writing at + * @return true if the requested bytes were read from a prefetched buffer, false otherwise + * @throws IOException if an I/O error occurs + */ + boolean readBytes(long pos, ByteBuffer slice, int delta) throws IOException { + final var entry = this.posToSlot.floorEntry(pos + delta); + if (entry == null) { + return false; + } + final int slot = entry.getValue(); + final long prefetchedPos = entry.getKey(); + // determine if the requested pos is within the prefetched range + if (pos + delta >= prefetchedPos + prefetchBytesSize) { + return false; + } + final Future thread = prefetchThreads[slot]; + if (thread == null) { + // free slot and decrement active prefetches + clearSlotAndMaybeStartPending(slot); + return false; + } + try { + thread.get(); + } catch (ExecutionException | InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("interrupted while waiting for prefetch", e); + } + if (prefetchExceptions[slot] != null) { + IOException e = prefetchExceptions[slot]; + clearSlotAndMaybeStartPending(slot); + throw e; + } + if (prefetchBuffers[slot] == null) { + clearSlotAndMaybeStartPending(slot); + return false; + } + // our buffer sizes are uniform, and match the required buffer size, however, the position + // here + // might be before the requested pos, so offset it + slice.put(prefetchBuffers[slot]); + slice.flip(); + slice.position(Math.toIntExact(pos - prefetchedPos) + delta); + clearSlotAndMaybeStartPending(slot); + return true; + } + + private void clearSlotAndMaybeStartPending(int slot) { + assert prefetchThreads[slot] != null && prefetchThreads[slot].isDone(); + prefetchExceptions[slot] = null; + prefetchThreads[slot] = null; + posToSlot.remove(prefetchPos[slot]); + if (pendingPrefetches.isEmpty()) { + slots.addLast(slot); + return; + } + final long req = pendingPrefetches.removeFirst(); + posToSlot.put(req, slot); + prefetchPos[slot] = req; + startPrefetch(req, slot); + } + + private boolean assertSlotsConsistent() { + posToSlot.forEach( + (k, v) -> { + if (prefetchThreads[v] == null) { + throw new AssertionError( + "posToSlot inconsistent: slot " + + v + + " for pos " + + k + + " has no prefetch thread"); + } + if (prefetchPos[v] != k) { + throw new AssertionError( + "posToSlot inconsistent: slot " + + v + + " for pos " + + k + + " has prefetchPos " + + prefetchPos[v]); + } + }); + return true; + } + + private void startPrefetch(long pos, int slot) { + prefetchExceptions[slot] = null; + Future future = + executor.submit( + () -> { + try { + ByteBuffer prefetchBuffer = this.prefetchBuffers[slot]; + if (prefetchBuffer == null) { + prefetchBuffer = + DirectIOIndexInput.allocateBuffer(prefetchBytesSize, blockSize); + this.prefetchBuffers[slot] = prefetchBuffer; + } else { + prefetchBuffer.clear(); + } + channel.read(prefetchBuffer, pos); + prefetchBuffer.flip(); + } catch (IOException e) { + prefetchExceptions[slot] = e; + } + }); + prefetchThreads[slot] = future; + assert assertSlotsConsistent(); + } + + @Override + public void close() throws IOException { + executor.shutdownNow(); + } + } } diff --git a/lucene/misc/src/test/org/apache/lucene/misc/store/TestDirectIODirectory.java b/lucene/misc/src/test/org/apache/lucene/misc/store/TestDirectIODirectory.java index 32f375cd28f9..8aa8972f4ba9 100644 --- 
a/lucene/misc/src/test/org/apache/lucene/misc/store/TestDirectIODirectory.java +++ b/lucene/misc/src/test/org/apache/lucene/misc/store/TestDirectIODirectory.java @@ -16,7 +16,10 @@ */ package org.apache.lucene.misc.store; -import com.carrotsearch.randomizedtesting.RandomizedTest; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween; +import static org.apache.lucene.misc.store.DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE; +import static org.apache.lucene.misc.store.DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT; + import java.io.EOFException; import java.io.IOException; import java.nio.file.Files; @@ -58,7 +61,11 @@ public static void checkSupported() throws IOException { } private static DirectIODirectory open(Path path) throws IOException { - return new DirectIODirectory(FSDirectory.open(path)) { + return new DirectIODirectory( + FSDirectory.open(path), + DEFAULT_MERGE_BUFFER_SIZE, + DEFAULT_MIN_BYTES_DIRECT, + randomIntBetween(0, 32)) { @Override protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { return true; @@ -102,8 +109,7 @@ public void testIllegalEOFWithFileSizeMultipleOfBlockSize() throws Exception { i.seek(fileSize); // Seeking past EOF should always throw EOFException - expectThrows( - EOFException.class, () -> i.seek(fileSize + RandomizedTest.randomIntBetween(1, 2048))); + expectThrows(EOFException.class, () -> i.seek(fileSize + randomIntBetween(1, 2048))); // Reading immediately after seeking past EOF should throw EOFException expectThrows(EOFException.class, () -> i.readByte()); @@ -126,8 +132,7 @@ public void testReadPastEOFShouldThrowEOFExceptionWithEmptyFile() throws Excepti } try (IndexInput i = dir.openInput("out", newIOContext(random()))) { - expectThrows( - EOFException.class, () -> i.seek(fileSize + RandomizedTest.randomIntBetween(1, 2048))); + expectThrows(EOFException.class, () -> i.seek(fileSize + randomIntBetween(1, 2048))); expectThrows(EOFException.class, () -> i.readByte()); expectThrows(EOFException.class, () -> i.readBytes(new byte[1], 0, 1)); } @@ -153,8 +158,7 @@ public void testSeekPastEOFAndRead() throws Exception { try (IndexInput i = dir.openInput("out", newIOContext(random()))) { // Seeking past EOF should always throw EOFException - expectThrows( - EOFException.class, () -> i.seek(len + RandomizedTest.randomIntBetween(1, 2048))); + expectThrows(EOFException.class, () -> i.seek(len + randomIntBetween(1, 2048))); // Reading immediately after seeking past EOF should throw EOFException expectThrows(EOFException.class, () -> i.readByte()); @@ -165,9 +169,8 @@ public void testSeekPastEOFAndRead() throws Exception { public void testUseDirectIODefaults() throws Exception { Path path = createTempDir("testUseDirectIODefaults"); try (DirectIODirectory dir = new DirectIODirectory(FSDirectory.open(path))) { - long largeSize = DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT + random().nextInt(10_000); - long smallSize = - random().nextInt(Math.toIntExact(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT)); + long largeSize = DEFAULT_MIN_BYTES_DIRECT + random().nextInt(10_000); + long smallSize = random().nextInt(Math.toIntExact(DEFAULT_MIN_BYTES_DIRECT)); int numDocs = random().nextInt(1000); assertFalse(dir.useDirectIO("dummy", IOContext.DEFAULT, OptionalLong.empty())); @@ -243,4 +246,67 @@ public void testSeekSmall() throws IOException { } } } + + public void testPrefetchEdgeCase() throws IOException { + byte[] bytes = new byte[8192 * 32 + randomIntBetween(1, 8192)]; + int offset = 84; + float[] vectorActual = new 
float[768]; + int[] toSeek = new int[] {1, 2, 3, 5, 6, 9, 11, 14, 15, 16, 18, 23, 24, 25, 26, 29, 30, 31}; + int byteSize = 768 * 4; + Path path = createTempDir("testDirectIODirectory"); + random().nextBytes(bytes); + try (Directory dir = + new DirectIODirectory(FSDirectory.open(path), 8192, 8192) { + @Override + protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { + return true; + } + }) { + try (var output = dir.createOutput("test", org.apache.lucene.store.IOContext.DEFAULT)) { + output.writeBytes(bytes, bytes.length); + } + try (var input = dir.openInput("test", org.apache.lucene.store.IOContext.DEFAULT)) { + IndexInput actualSlice = input.slice("vectors", offset, bytes.length - offset); + for (int seek : toSeek) { + actualSlice.prefetch(seek * byteSize, byteSize); + } + for (int seek : toSeek) { + actualSlice.seek(seek * byteSize); + actualSlice.readFloats(vectorActual, 0, vectorActual.length); + assertEquals( + "mismatch at seek: " + seek, (seek + 1) * byteSize, actualSlice.getFilePointer()); + } + } + } + } + + public void testLargePrefetch() throws IOException { + byte[] bytes = new byte[8192 * 10 + randomIntBetween(1, 8192)]; + int offset = randomIntBetween(1, 8192); + int numBytes = randomIntBetween(8192 + 1, 8192 * 8); + random().nextBytes(bytes); + byte[] trueBytes = new byte[numBytes]; + System.arraycopy(bytes, offset, trueBytes, 0, numBytes); + + Path path = createTempDir("testDirectIODirectory"); + try (Directory dir = + new DirectIODirectory(FSDirectory.open(path), 8192, 8192) { + @Override + protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { + return true; + } + }) { + try (var output = dir.createOutput("test", org.apache.lucene.store.IOContext.DEFAULT)) { + output.writeBytes(bytes, bytes.length); + } + try (var input = dir.openInput("test", org.apache.lucene.store.IOContext.DEFAULT)) { + byte[] actualBytes = new byte[numBytes]; + // prefetch everything at once + input.prefetch(offset, numBytes); + input.seek(offset); + input.readBytes(actualBytes, 0, actualBytes.length); + assertArrayEquals(trueBytes, actualBytes); + } + } + } }
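
For reference, a minimal usage sketch of the new prefetch path, assuming direct I/O is available and that useDirectIO elects the direct path for the file being read (the tests above force this by overriding useDirectIO; with the stock implementation, non-merge reads fall back to the delegate directory and its own prefetch support). The directory path, file name, and read range below are placeholders.

import java.io.IOException;
import java.nio.file.Path;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

class PrefetchExample {
  // Placeholder arguments; any range within the file works, aligned or not.
  static void readWithPrefetch(Path dirPath, String fileName, long pos, int len)
      throws IOException {
    try (Directory dir =
            new DirectIODirectory(
                FSDirectory.open(dirPath),
                DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE,
                DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT,
                DirectIODirectory.DEFAULT_MAX_PREFETCH_BUFFERS);
        IndexInput in = dir.openInput(fileName, IOContext.DEFAULT)) {
      // Hint the upcoming read: the prefetcher issues block-aligned reads asynchronously.
      in.prefetch(pos, len);
      byte[] dst = new byte[len];
      in.seek(pos);
      // Served from a prefetched buffer when one covers the requested range.
      in.readBytes(dst, 0, len);
    }
  }
}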