Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
387a648
save
pengpeng-lu May 21, 2025
0fd847e
save
pengpeng-lu May 22, 2025
3ce0259
save
pengpeng-lu May 28, 2025
78bea95
save
pengpeng-lu May 29, 2025
aefa82c
save
pengpeng-lu May 31, 2025
3fe7ba0
save
pengpeng-lu Jun 4, 2025
1336975
save
pengpeng-lu Jun 11, 2025
44a5cb9
clean
pengpeng-lu Jun 12, 2025
cb40b88
save
pengpeng-lu Jun 12, 2025
d82d31c
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Jun 12, 2025
8d523c1
add planner configuration
pengpeng-lu Jun 13, 2025
d015132
save
pengpeng-lu Jun 16, 2025
22a6e85
save
pengpeng-lu Jun 16, 2025
f30c3e3
revert PlannerConfiguration change:
pengpeng-lu Jun 16, 2025
cc9567b
add test
pengpeng-lu Jun 17, 2025
09907fe
style
pengpeng-lu Jun 17, 2025
8343661
checkstyle
pengpeng-lu Jun 17, 2025
30cf435
small things
pengpeng-lu Jul 10, 2025
1f27b7e
implementation comments
pengpeng-lu Jul 21, 2025
61ef12b
save
pengpeng-lu Jul 28, 2025
51e3746
save
pengpeng-lu Jul 29, 2025
c0c8e72
save
pengpeng-lu Aug 1, 2025
b151ab4
style
pengpeng-lu Aug 1, 2025
e170e11
set to_old
pengpeng-lu Aug 1, 2025
5cb1ca0
merge conflict
pengpeng-lu Aug 1, 2025
dcce7df
save
pengpeng-lu Aug 1, 2025
022dc23
1 test fail
pengpeng-lu Aug 6, 2025
69fcaf1
save in the middle of debugging
pengpeng-lu Aug 7, 2025
7fda4f2
fix test
pengpeng-lu Aug 7, 2025
def4939
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 7, 2025
39075d7
merge main
pengpeng-lu Aug 7, 2025
3185c91
add back yaml tests
pengpeng-lu Aug 25, 2025
15fa168
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 25, 2025
4d065e4
remove prefixLength from proto
pengpeng-lu Aug 25, 2025
5866b52
throw ex when error parsing
pengpeng-lu Aug 26, 2025
bc2f064
serialize mode in plans
pengpeng-lu Aug 28, 2025
68fe061
remove serialization in plans
pengpeng-lu Sep 12, 2025
0f91a76
merge main
pengpeng-lu Sep 12, 2025
4c19031
style
pengpeng-lu Sep 12, 2025
05890a0
magic number
pengpeng-lu Sep 17, 2025
2e7c809
comments
pengpeng-lu Sep 18, 2025
4280bdd
fix test and style
pengpeng-lu Sep 18, 2025
04b34a9
more tests
pengpeng-lu Sep 18, 2025
3f89e3d
comments
pengpeng-lu Sep 18, 2025
2909eb9
nit
pengpeng-lu Sep 19, 2025
425073d
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Sep 19, 2025
db08407
add test back
pengpeng-lu Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ private IndexPrefetchRangeKeyValueCursor(@Nonnull final FDBRecordContext context
@Nonnull final AsyncIterator<MappedKeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
@Nonnull SerializationMode serializationMode) {

super(context, iterator, prefixLength, limitManager, valuesLimit);
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand All @@ -69,7 +70,7 @@ public IndexPrefetchRangeKeyValueCursor build() {
AsyncIterator<MappedKeyValue> iterator = getTransaction()
.getMappedRange(getBegin(), getEnd(), mapper, getLimit(), isReverse(), getStreamingMode())
.iterator();
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ private KeyValueCursor(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<KeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
super(context, iterator, prefixLength, limitManager, valuesLimit);
int valuesLimit,
@Nonnull SerializationMode serializationMode) {
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand Down Expand Up @@ -77,7 +78,7 @@ public KeyValueCursor build() {
final AsyncIterator<KeyValue> iterator = getTransaction()
.getRange(getBegin(), getEnd(), getLimit(), isReverse(), getStreamingMode())
.iterator();
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.apple.foundationdb.record.KeyRange;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorProto;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.ScanProperties;
import com.apple.foundationdb.record.TupleRange;
Expand All @@ -42,11 +43,13 @@
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ZeroCopyByteString;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -61,22 +64,26 @@ public abstract class KeyValueCursorBase<K extends KeyValue> extends AsyncIterat
private final int prefixLength;
@Nonnull
private final CursorLimitManager limitManager;
private int valuesLimit;
private final int valuesLimit;
// the pointer may be mutated, but the actual array must never be mutated or continuations will break
@Nullable
private byte[] lastKey;
@Nonnull
private final SerializationMode serializationMode;

protected KeyValueCursorBase(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<K> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
@Nonnull final SerializationMode serializationMode) {
super(context.getExecutor(), iterator);

this.context = context;
this.prefixLength = prefixLength;
this.limitManager = limitManager;
this.valuesLimit = valuesLimit;
this.serializationMode = serializationMode;

context.instrument(FDBStoreTimer.DetailEvents.GET_SCAN_RANGE_RAW_FIRST_CHUNK, iterator.onHasNext());
}
Expand Down Expand Up @@ -131,21 +138,23 @@ public RecordCursorResult<K> getNext() {

@Nonnull
private RecordCursorContinuation continuationHelper() {
return new Continuation(lastKey, prefixLength);
return new Continuation(lastKey, prefixLength, serializationMode);
}

private static class Continuation implements RecordCursorContinuation {
public static class Continuation implements RecordCursorContinuation {
@Nullable
private final byte[] lastKey;
private final int prefixLength;
private final SerializationMode serializationMode;

public Continuation(@Nullable final byte[] lastKey, final int prefixLength) {
public Continuation(@Nullable final byte[] lastKey, final int prefixLength, final SerializationMode serializationMode) {
// Note that doing this without a full copy is dangerous if the array is ever mutated.
// Currently, this never happens and the only thing that changes is which array lastKey points to.
// However, if logic in KeyValueCursor or KeyValue changes, this could break continuations.
// To resolve it, we could resort to doing a full copy here, although that's somewhat expensive.
this.lastKey = lastKey;
this.prefixLength = prefixLength;
this.serializationMode = serializationMode;
}

@Override
Expand All @@ -156,21 +165,83 @@ public boolean isEnd() {
@Nonnull
@Override
public ByteString toByteString() {
if (lastKey == null) {
return ByteString.EMPTY;
if (serializationMode == SerializationMode.TO_OLD) {
// lastKey = null when source iterator hit limit that we passed down.
if (lastKey == null) {
return ByteString.EMPTY;
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
// when prefixLength == lastKey.length, toByteString() also returns ByteString.EMPTY
return base.substring(prefixLength, lastKey.length);
} else {
return toProto().toByteString();
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
return base.substring(prefixLength, lastKey.length);
}

@Nullable
@Override
public byte[] toBytes() {
if (lastKey == null) {
return null;
}
ByteString byteString = toByteString();
return byteString.isEmpty() ? new byte[0] : byteString.toByteArray();
}

@Nullable
public byte[] getInnerContinuationInBytes() {
if (lastKey == null) {
return null;
}
return Arrays.copyOfRange(lastKey, prefixLength, lastKey.length);
}

@Nonnull
public ByteString getInnerContinuationInByteString() {
if (lastKey == null) {
return ByteString.EMPTY;
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
return base.substring(prefixLength, lastKey.length);
}

public static byte[] fromRawBytes(@Nullable byte[] rawBytes, SerializationMode serializationMode) {
if (rawBytes == null) {
return null;
}
if (serializationMode == SerializationMode.TO_OLD) {
return rawBytes;
}
try {
RecordCursorProto.KeyValueCursorContinuation continuationProto = RecordCursorProto.KeyValueCursorContinuation.parseFrom(rawBytes);
if (continuationProto.hasPrefixLength()) {
return continuationProto.getContinuation().toByteArray();
} else {
// parseFrom can parse an old serialization result as the new proto, wrong deserialization
return rawBytes;
}
} catch (InvalidProtocolBufferException ipbe) {
return rawBytes;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, it seems weird here to return rawBytes in these situations. It seems like we should only ever hit them if the user misconfigures things so that an old continuation is deserialized by an old serialization cursor, and we may want to just throw in that case rather than try and be smart about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in step2 when serializationMode is TO_OLD in old version, and TO_NEW in new version. If in version TO_NEW, we could try to deserialization a continuation generated by TO_OLD, in this case we'd like to return rawBytes, so that it behaves like TO_OLD.
From step 3 onwards (after all versions we care is in TO_NEW), InvalidProtocolBufferException should throw an exception. I modified line#221 to throw exception, I think that is a real exception even in step2.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I think the fundamental disagreement is then on the middle phase. I still don't think that we want to have a "duck-typing" middle phase where we provide continuations and the KeyValueCursor takes a guess at what it could mean. I think we want this to mirror what we did with the aggregate cursor where we communicate the serialization mode in the plan(s), and then we know how the cursor is supposed to be parsed.

To make that work, we'd either need to expose the serialization mode through scanIndex and scanRecords, or we'd need to move the continuation wrapping/unwrapping logic into the calling plans

}
}

@Nonnull
private RecordCursorProto.KeyValueCursorContinuation toProto() {
RecordCursorProto.KeyValueCursorContinuation.Builder builder = RecordCursorProto.KeyValueCursorContinuation.newBuilder();
if (lastKey == null) {
// proto.hasContinuation() = false when lastKey = null
return builder.setPrefixLength(prefixLength).build();
} else {
ByteString base = ZeroCopyByteString.wrap(Objects.requireNonNull(lastKey));
// proto.hasContinuation() = ByteString.EMPTY when prefixLength = lastKey.length
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a problem if proto.getContinuation() returns the empty string if the prefix length and is equal to the last key length? I'd think that to fix #3206, the main thing would be to make sure that the final wrapped protobuf value isn't empty in that case, and that parsing it leads to proto.hasContinuation() returning true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a unit test in KeyValueCursorTest to verify:

proto = builder.build(); // proto.hasContinuation() = false in this case
proto = builder.setContinuation(ByteString.EMPTY).build();   // proto.hasContinuation() = true in this case.

return builder.setContinuation(base.substring(prefixLength, lastKey.length)).setPrefixLength(prefixLength).build();
}
}
}

public enum SerializationMode {
TO_OLD,
TO_NEW
}

/**
Expand Down Expand Up @@ -208,9 +279,11 @@ public abstract static class Builder<T extends Builder<T>> {
private StreamingMode streamingMode;
private KeySelector begin;
private KeySelector end;
protected SerializationMode serializationMode;

protected Builder(@Nonnull Subspace subspace) {
this.subspace = subspace;
this.serializationMode = SerializationMode.TO_OLD;
}

/**
Expand Down Expand Up @@ -247,10 +320,12 @@ protected void prepare() {
prefixLength = calculatePrefixLength();

reverse = scanProperties.isReverse();

if (continuation != null) {
final byte[] continuationBytes = new byte[prefixLength + continuation.length];
byte[] realContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);
final byte[] continuationBytes = new byte[prefixLength + realContinuation.length];
System.arraycopy(lowBytes, 0, continuationBytes, 0, prefixLength);
System.arraycopy(continuation, 0, continuationBytes, prefixLength, continuation.length);
System.arraycopy(realContinuation, 0, continuationBytes, prefixLength, realContinuation.length);
if (reverse) {
highBytes = continuationBytes;
highEndpoint = EndpointType.CONTINUATION;
Expand Down Expand Up @@ -334,6 +409,11 @@ public T setHigh(@Nonnull byte[] highBytes, @Nonnull EndpointType highEndpoint)
return self();
}

public T setSerializationMode(@Nonnull final SerializationMode serializationMode) {
this.serializationMode = serializationMode;
return self();
}

/**
* Calculate the key prefix length for the returned values. This will be used to derive the primary key used in
* the calculated continuation.
Expand Down
Loading