Skip to content

Reapply "Update the IOContext rather than the ReadAdvice on IndexInput (#14702)" #14844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public abstract void search(
*
* <p>The default implementation returns {@code this}
*/
public KnnVectorsReader getMergeInstance() {
public KnnVectorsReader getMergeInstance() throws IOException {
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public abstract RandomVectorScorer getRandomVectorScorer(String field, byte[] ta
* <p>The default implementation returns {@code this}
*/
@Override
public FlatVectorsReader getMergeInstance() {
public FlatVectorsReader getMergeInstance() throws IOException {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readVectorEncoding;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
Expand All @@ -45,7 +44,6 @@
import org.apache.lucene.store.FileTypeHint;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ReadAdvice;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.hnsw.RandomVectorScorer;
Expand All @@ -63,23 +61,25 @@ public final class Lucene99FlatVectorsReader extends FlatVectorsReader {
private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
private final IndexInput vectorData;
private final FieldInfos fieldInfos;
private final IOContext dataContext;

public Lucene99FlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer)
throws IOException {
super(scorer);
int versionMeta = readMetadata(state);
this.fieldInfos = state.fieldInfos;
// Flat formats are used to randomly access vectors from their node ID that is stored
// in the HNSW graph.
dataContext =
state.context.withHints(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, DataAccessHint.RANDOM);
try {
vectorData =
openDataInput(
state,
versionMeta,
Lucene99FlatVectorsFormat.VECTOR_DATA_EXTENSION,
Lucene99FlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
// Flat formats are used to randomly access vectors from their node ID that is stored
// in the HNSW graph.
state.context.withHints(
FileTypeHint.DATA, FileDataHint.KNN_VECTORS, DataAccessHint.RANDOM));
dataContext);
} catch (Throwable t) {
IOUtils.closeWhileSuppressingExceptions(t, this);
throw t;
Expand Down Expand Up @@ -177,14 +177,10 @@ public void checkIntegrity() throws IOException {
}

@Override
public FlatVectorsReader getMergeInstance() {
try {
// Update the read advice since vectors are guaranteed to be accessed sequentially for merge
this.vectorData.updateReadAdvice(ReadAdvice.SEQUENTIAL);
return this;
} catch (IOException exception) {
throw new UncheckedIOException(exception);
}
public FlatVectorsReader getMergeInstance() throws IOException {
// Update the read advice since vectors are guaranteed to be accessed sequentially for merge
vectorData.updateIOContext(dataContext.withHints(DataAccessHint.SEQUENTIAL));
return this;
}

private FieldEntry getFieldEntryOrThrow(String field) {
Expand Down Expand Up @@ -276,7 +272,7 @@ public RandomVectorScorer getRandomVectorScorer(String field, byte[] target) thr
public void finishMerge() throws IOException {
// This makes sure that the access pattern hint is reverted back since HNSW implementation
// needs it
this.vectorData.updateReadAdvice(ReadAdvice.RANDOM);
vectorData.updateIOContext(dataContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private Lucene99HnswVectorsReader(
}

@Override
public KnnVectorsReader getMergeInstance() {
public KnnVectorsReader getMergeInstance() throws IOException {
return new Lucene99HnswVectorsReader(this, this.flatVectorsReader.getMergeInstance());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public FieldsReader(final SegmentReadState readState) throws IOException {
}
}

private FieldsReader(final FieldsReader fieldsReader) {
private FieldsReader(final FieldsReader fieldsReader) throws IOException {
this.fieldInfos = fieldsReader.fieldInfos;
for (FieldInfo fi : this.fieldInfos) {
if (fi.hasVectorValues() && fieldsReader.fields.containsKey(fi.number)) {
Expand All @@ -248,7 +248,7 @@ private FieldsReader(final FieldsReader fieldsReader) {
}

@Override
public KnnVectorsReader getMergeInstance() {
public KnnVectorsReader getMergeInstance() throws IOException {
return new FieldsReader(this);
}

Expand Down
4 changes: 2 additions & 2 deletions lucene/core/src/java/org/apache/lucene/store/IndexInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ public String toString() {
public void prefetch(long offset, long length) throws IOException {}

/**
* Optional method: Give a hint to this input about the change in read access pattern. IndexInput
* Optional method: Updates the {@code IOContext} to specify a new read access pattern. IndexInput
* implementations may take advantage of this hint to optimize reads from storage.
*
* <p>The default implementation is a no-op.
*/
public void updateReadAdvice(ReadAdvice readAdvice) throws IOException {}
public void updateIOContext(IOContext context) throws IOException {}

/**
* Returns a hint whether all the contents of this input are resident in physical memory. It's a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ public void prefetch(long offset, long length) throws IOException {
}

@Override
public void updateReadAdvice(ReadAdvice readAdvice) throws IOException {
public void updateIOContext(IOContext context) throws IOException {
updateReadAdvice(toReadAdvice.apply(context));
}

private void updateReadAdvice(ReadAdvice readAdvice) throws IOException {
if (NATIVE_ACCESS.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,17 @@ public long ramBytesUsed() {
public static class AssertingKnnVectorsReader extends KnnVectorsReader
implements HnswGraphProvider {
public final KnnVectorsReader delegate;
final FieldInfos fis;
final boolean mergeInstance;
AtomicInteger mergeInstanceCount = new AtomicInteger();
AtomicInteger finishMergeCount = new AtomicInteger();
private final FieldInfos fis;
private final boolean mergeInstance;
private final AtomicInteger mergeInstanceCount = new AtomicInteger();
private final AtomicInteger finishMergeCount = new AtomicInteger();

AssertingKnnVectorsReader(KnnVectorsReader delegate, FieldInfos fis) {
private AssertingKnnVectorsReader(KnnVectorsReader delegate, FieldInfos fis) {
this(delegate, fis, false);
}

AssertingKnnVectorsReader(KnnVectorsReader delegate, FieldInfos fis, boolean mergeInstance) {
private AssertingKnnVectorsReader(
KnnVectorsReader delegate, FieldInfos fis, boolean mergeInstance) {
assert delegate != null;
this.delegate = delegate;
this.fis = fis;
Expand All @@ -136,6 +137,8 @@ public void checkIntegrity() throws IOException {

@Override
public FloatVectorValues getFloatVectorValues(String field) throws IOException {
assert mergeInstanceCount.get() == finishMergeCount.get() || mergeInstance
: "Called on the wrong instance";
FieldInfo fi = fis.fieldInfo(field);
assert fi != null
&& fi.getVectorDimension() > 0
Expand All @@ -150,6 +153,8 @@ public FloatVectorValues getFloatVectorValues(String field) throws IOException {

@Override
public ByteVectorValues getByteVectorValues(String field) throws IOException {
assert mergeInstanceCount.get() == finishMergeCount.get() || mergeInstance
: "Called on the wrong instance";
FieldInfo fi = fis.fieldInfo(field);
assert fi != null
&& fi.getVectorDimension() > 0
Expand All @@ -165,7 +170,7 @@ public ByteVectorValues getByteVectorValues(String field) throws IOException {
@Override
public void search(String field, float[] target, KnnCollector knnCollector, Bits acceptDocs)
throws IOException {
assert !mergeInstance;
assert mergeInstanceCount.get() == finishMergeCount.get() : "There is an open merge instance";
FieldInfo fi = fis.fieldInfo(field);
assert fi != null
&& fi.getVectorDimension() > 0
Expand All @@ -176,7 +181,7 @@ public void search(String field, float[] target, KnnCollector knnCollector, Bits
@Override
public void search(String field, byte[] target, KnnCollector knnCollector, Bits acceptDocs)
throws IOException {
assert !mergeInstance;
assert mergeInstanceCount.get() == finishMergeCount.get() : "There is an open merge instance";
FieldInfo fi = fis.fieldInfo(field);
assert fi != null
&& fi.getVectorDimension() > 0
Expand All @@ -185,15 +190,28 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits
}

@Override
public KnnVectorsReader getMergeInstance() {
assert !mergeInstance;
public KnnVectorsReader getMergeInstance() throws IOException {
var mergeVectorsReader = delegate.getMergeInstance();
assert mergeVectorsReader != null;
mergeInstanceCount.incrementAndGet();
AtomicInteger parentMergeFinishCount = this.finishMergeCount;

final var parent = this;
return new AssertingKnnVectorsReader(
mergeVectorsReader, AssertingKnnVectorsReader.this.fis, true) {
private boolean finished;

@Override
public void search(
String field, float[] target, KnnCollector knnCollector, Bits acceptDocs) {
assert false : "This instance should only be used for merging";
}

@Override
public void search(
String field, byte[] target, KnnCollector knnCollector, Bits acceptDocs) {
assert false : "This instance should only be used for merging";
}

@Override
public KnnVectorsReader getMergeInstance() {
assert false; // merging from a merge instance it not allowed
Expand All @@ -202,9 +220,10 @@ public KnnVectorsReader getMergeInstance() {

@Override
public void finishMerge() throws IOException {
assert mergeInstance;
assert !finished : "Merging already finished";
finished = true;
delegate.finishMerge();
parent.finishMergeCount.incrementAndGet();
parentMergeFinishCount.incrementAndGet();
}

@Override
Expand All @@ -216,9 +235,7 @@ public void close() {

@Override
public void finishMerge() throws IOException {
assert mergeInstance;
delegate.finishMerge();
finishMergeCount.incrementAndGet();
assert false; // can only finish merge on the merge instance
}

@Override
Expand All @@ -228,8 +245,6 @@ public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {

@Override
public void close() throws IOException {
assert !mergeInstance;
delegate.close();
delegate.close();
assert finishMergeCount.get() <= 0 || mergeInstanceCount.get() == finishMergeCount.get();
Copy link
Contributor Author

@thecoop thecoop Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the line causing the problems (which was modified in the original PR) - its ok for close() to be called when there's an open merge instance, if the merge is aborted - which is what testNoWaitClose is doing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sitting that deep in codecs. @jpountz is much better fitted for the review of this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect Lucene to never close merge instances, since merge instances are conceptually like clones, and Lucene doesn't close clones.

That said, it looks like we should make sure that finishMerge is always called - even if the merge fails or is aborted, otherwise aborted merges would leave readers open with the wrong read advice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an issue with the code calling finishMerge, not in this PR. Shall I merge this change, then look at pinning down the missing finishMerge separately?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've found the cause here

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.RandomAccessInput;
import org.apache.lucene.store.ReadAdvice;
import org.apache.lucene.tests.mockfile.ExtrasFS;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
Expand Down Expand Up @@ -1571,38 +1570,6 @@ public void testPrefetchOnSlice() throws IOException {
doTestPrefetch(TestUtil.nextInt(random(), 1, 1024));
}

public void testUpdateReadAdvice() throws IOException {
try (Directory dir = getDirectory(createTempDir("testUpdateReadAdvice"))) {
final int totalLength = TestUtil.nextInt(random(), 16384, 65536);
byte[] arr = new byte[totalLength];
random().nextBytes(arr);
try (IndexOutput out = dir.createOutput("temp.bin", IOContext.DEFAULT)) {
out.writeBytes(arr, arr.length);
}

try (IndexInput orig = dir.openInput("temp.bin", IOContext.DEFAULT)) {
IndexInput in = random().nextBoolean() ? orig.clone() : orig;
// Read advice updated at start
in.updateReadAdvice(randomFrom(random(), ReadAdvice.values()));
for (int i = 0; i < totalLength; i++) {
int offset = TestUtil.nextInt(random(), 0, (int) in.length() - 1);
in.seek(offset);
assertEquals(arr[offset], in.readByte());
}

// Updating readAdvice in the middle
for (int i = 0; i < 10_000; ++i) {
int offset = TestUtil.nextInt(random(), 0, (int) in.length() - 1);
in.seek(offset);
assertEquals(arr[offset], in.readByte());
if (random().nextBoolean()) {
in.updateReadAdvice(randomFrom(random(), ReadAdvice.values()));
}
}
}
}
}

private void doTestPrefetch(int startOffset) throws IOException {
try (Directory dir = getDirectory(createTempDir())) {
final int totalLength = startOffset + TestUtil.nextInt(random(), 16384, 65536);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.store.FilterIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ReadAdvice;

/**
* Used by MockDirectoryWrapper to create an input stream that keeps track of when it's been closed.
Expand Down Expand Up @@ -186,10 +185,10 @@ public Optional<Boolean> isLoaded() {
}

@Override
public void updateReadAdvice(ReadAdvice readAdvice) throws IOException {
public void updateIOContext(IOContext context) throws IOException {
ensureOpen();
ensureAccessible();
in.updateReadAdvice(readAdvice);
in.updateIOContext(context);
}

@Override
Expand Down
Loading