Skip to content
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
387a648
save
pengpeng-lu May 21, 2025
0fd847e
save
pengpeng-lu May 22, 2025
3ce0259
save
pengpeng-lu May 28, 2025
78bea95
save
pengpeng-lu May 29, 2025
aefa82c
save
pengpeng-lu May 31, 2025
3fe7ba0
save
pengpeng-lu Jun 4, 2025
1336975
save
pengpeng-lu Jun 11, 2025
44a5cb9
clean
pengpeng-lu Jun 12, 2025
cb40b88
save
pengpeng-lu Jun 12, 2025
d82d31c
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Jun 12, 2025
8d523c1
add planner configuration
pengpeng-lu Jun 13, 2025
d015132
save
pengpeng-lu Jun 16, 2025
22a6e85
save
pengpeng-lu Jun 16, 2025
f30c3e3
revert PlannerConfiguration change:
pengpeng-lu Jun 16, 2025
cc9567b
add test
pengpeng-lu Jun 17, 2025
09907fe
style
pengpeng-lu Jun 17, 2025
8343661
checkstyle
pengpeng-lu Jun 17, 2025
30cf435
small things
pengpeng-lu Jul 10, 2025
1f27b7e
implementation comments
pengpeng-lu Jul 21, 2025
61ef12b
save
pengpeng-lu Jul 28, 2025
51e3746
save
pengpeng-lu Jul 29, 2025
c0c8e72
save
pengpeng-lu Aug 1, 2025
b151ab4
style
pengpeng-lu Aug 1, 2025
e170e11
set to_old
pengpeng-lu Aug 1, 2025
5cb1ca0
merge conflict
pengpeng-lu Aug 1, 2025
dcce7df
save
pengpeng-lu Aug 1, 2025
022dc23
1 test fail
pengpeng-lu Aug 6, 2025
69fcaf1
save in the middle of debugging
pengpeng-lu Aug 7, 2025
7fda4f2
fix test
pengpeng-lu Aug 7, 2025
def4939
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 7, 2025
39075d7
merge main
pengpeng-lu Aug 7, 2025
3185c91
add back yaml tests
pengpeng-lu Aug 25, 2025
15fa168
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 25, 2025
4d065e4
remove prefixLength from proto
pengpeng-lu Aug 25, 2025
5866b52
throw ex when error parsing
pengpeng-lu Aug 26, 2025
bc2f064
serialize mode in plans
pengpeng-lu Aug 28, 2025
68fe061
remove serialization in plans
pengpeng-lu Sep 12, 2025
0f91a76
merge main
pengpeng-lu Sep 12, 2025
4c19031
style
pengpeng-lu Sep 12, 2025
05890a0
magic number
pengpeng-lu Sep 17, 2025
2e7c809
comments
pengpeng-lu Sep 18, 2025
4280bdd
fix test and style
pengpeng-lu Sep 18, 2025
04b34a9
more tests
pengpeng-lu Sep 18, 2025
3f89e3d
comments
pengpeng-lu Sep 18, 2025
2909eb9
nit
pengpeng-lu Sep 19, 2025
425073d
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Sep 19, 2025
db08407
add test back
pengpeng-lu Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ private IndexPrefetchRangeKeyValueCursor(@Nonnull final FDBRecordContext context
@Nonnull final AsyncIterator<MappedKeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
@Nonnull SerializationMode serializationMode) {

super(context, iterator, prefixLength, limitManager, valuesLimit);
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand All @@ -69,7 +70,7 @@ public IndexPrefetchRangeKeyValueCursor build() {
AsyncIterator<MappedKeyValue> iterator = getTransaction()
.getMappedRange(getBegin(), getEnd(), mapper, getLimit(), isReverse(), getStreamingMode())
.iterator();
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ private KeyValueCursor(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<KeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
super(context, iterator, prefixLength, limitManager, valuesLimit);
int valuesLimit,
@Nonnull SerializationMode serializationMode) {
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand Down Expand Up @@ -77,7 +78,7 @@ public KeyValueCursor build() {
final AsyncIterator<KeyValue> iterator = getTransaction()
.getRange(getBegin(), getEnd(), getLimit(), isReverse(), getStreamingMode())
.iterator();
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.apple.foundationdb.record.KeyRange;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorProto;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.ScanProperties;
import com.apple.foundationdb.record.TupleRange;
Expand All @@ -42,11 +43,12 @@
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ZeroCopyByteString;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -61,22 +63,26 @@
private final int prefixLength;
@Nonnull
private final CursorLimitManager limitManager;
private int valuesLimit;
private final int valuesLimit;
// the pointer may be mutated, but the actual array must never be mutated or continuations will break
@Nullable
private byte[] lastKey;
@Nonnull
private final SerializationMode serializationMode;

protected KeyValueCursorBase(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<K> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
@Nonnull final SerializationMode serializationMode) {
super(context.getExecutor(), iterator);

this.context = context;
this.prefixLength = prefixLength;
this.limitManager = limitManager;
this.valuesLimit = valuesLimit;
this.serializationMode = serializationMode;

context.instrument(FDBStoreTimer.DetailEvents.GET_SCAN_RANGE_RAW_FIRST_CHUNK, iterator.onHasNext());
}
Expand Down Expand Up @@ -131,21 +137,36 @@

@Nonnull
private RecordCursorContinuation continuationHelper() {
return new Continuation(lastKey, prefixLength);
return new Continuation(lastKey, prefixLength, serializationMode);
}

private static class Continuation implements RecordCursorContinuation {
public static class Continuation implements RecordCursorContinuation {
@Nullable
private final byte[] lastKey;
private final int prefixLength;
private final SerializationMode serializationMode;
/*
how we chose this "magic number":
The goal is to make sure an old continuation won't be accidentally parsed as: {magic_number = 6773487359078157740L, inner_continuation = some byte array}
In little endian, MAGIC_NUMBER: 6773487359078157740L is:
new byte[]{ (byte) 0xAC, (byte) 0xCD, 0x73, (byte) 0x98, (byte) 0xDD, 0x42, 0x00, 0x5E };
Note that none of those bytes are valid Tuple codes (except for the 0x00--which is also deliberate).
That includes the byte at position 4, that is \x98, which is the relevant byte if we had a Tuple that began with \x11.
The choice of 0x00 as the penultimate byte is there to protect against the only case where we don't get a valid Tuple back from a scan,
namely PREFIX_STRING scan. In that case, the byte string will begin with some String suffix, which theoretically could be a valid Protobuf value.
But that String suffix will have any \x00 bytes escaped by following it with an \xff byte.
So the sequence \x00\x5e would mean "end-of-string" followed by the beginning of a new Tuple value with code \x5e, which again, is invalid.
*/
private static final long MAGIC_NUMBER = 677_348_735_907_815_774_0L;

public Continuation(@Nullable final byte[] lastKey, final int prefixLength) {
public Continuation(@Nullable final byte[] lastKey, final int prefixLength, final SerializationMode serializationMode) {
// Note that doing this without a full copy is dangerous if the array is ever mutated.
// Currently, this never happens and the only thing that changes is which array lastKey points to.
// However, if logic in KeyValueCursor or KeyValue changes, this could break continuations.
// To resolve it, we could resort to doing a full copy here, although that's somewhat expensive.
this.lastKey = lastKey;
this.prefixLength = prefixLength;
this.serializationMode = serializationMode;
}

@Override
Expand All @@ -156,11 +177,17 @@
@Nonnull
@Override
public ByteString toByteString() {
if (lastKey == null) {
return ByteString.EMPTY;
if (serializationMode == SerializationMode.TO_OLD) {
// lastKey = null when source iterator hit limit that we passed down.
if (lastKey == null) {
return ByteString.EMPTY;
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
// when prefixLength == lastKey.length, toByteString() also returns ByteString.EMPTY
return base.substring(prefixLength, lastKey.length);
} else {
return toProto().toByteString();
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
return base.substring(prefixLength, lastKey.length);
}

@Nullable
Expand All @@ -169,10 +196,49 @@
if (lastKey == null) {
return null;
}
return Arrays.copyOfRange(lastKey, prefixLength, lastKey.length);
ByteString byteString = toByteString();
return byteString.isEmpty() ? new byte[0] : byteString.toByteArray();
}

public static byte[] getInnerContinuation(@Nullable byte[] rawBytes) {
if (rawBytes == null) {
return null;
}
try {
RecordCursorProto.KeyValueCursorContinuation continuationProto = RecordCursorProto.KeyValueCursorContinuation.parseFrom(rawBytes);
if (continuationProto.getMagicNumber() != MAGIC_NUMBER) {
// an old continuation was accidentally deserialized as proto
// after all versions we care is in TO_NEW, should throw an exception here.
return rawBytes;
}
return continuationProto.getInnerContinuation().toByteArray();
} catch (InvalidProtocolBufferException ipbe) {
// in intermediate step when serializationMode is TO_OLD in old version, and TO_NEW in new version
// in version TO_NEW, we could try to deserialization a continuation generated by TO_OLD, in this case we'd like to return rawBytes, so that it behaves like TO_OLD
// after all versions we care is in TO_NEW, InvalidProtocolBufferException should throw an exception.
return rawBytes;
}
}

@Nonnull
private RecordCursorProto.KeyValueCursorContinuation toProto() {
RecordCursorProto.KeyValueCursorContinuation.Builder builder = RecordCursorProto.KeyValueCursorContinuation.newBuilder();
if (lastKey == null) {
// when lastKey is null, proto.hasInnerContinuation() = false
return builder.setMagicNumber(MAGIC_NUMBER).build();
} else {
ByteString base = ZeroCopyByteString.wrap(Objects.requireNonNull(lastKey));
// even when prefixLength = lastKey.length, proto.getInnerContinuation() = ByteString.EMPTY, proto.hasContinuation() = true

Check warning on line 231 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java#L231

Commented Out Code https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3397%2Fpengpeng-lu%2Fkeyvalue_cursor%3AHEAD&id=6C8EE75CB9DF76FAD72F7AFC0EFD7B81
return builder.setInnerContinuation(base.substring(prefixLength, lastKey.length)).setMagicNumber(MAGIC_NUMBER).build();
}
}
}

public enum SerializationMode {
TO_OLD,
TO_NEW
}

/**
* A builder for {@link KeyValueCursorBase}.
* @param <T> the type of the concrete subclass of the builder
Expand Down Expand Up @@ -208,9 +274,11 @@
private StreamingMode streamingMode;
private KeySelector begin;
private KeySelector end;
protected SerializationMode serializationMode;

protected Builder(@Nonnull Subspace subspace) {
this.subspace = subspace;
this.serializationMode = SerializationMode.TO_OLD;
}

/**
Expand Down Expand Up @@ -252,10 +320,12 @@
}

reverse = scanProperties.isReverse();

if (continuation != null) {
final byte[] continuationBytes = new byte[prefixLength + continuation.length];
byte[] realContinuation = KeyValueCursorBase.Continuation.getInnerContinuation(continuation);
final byte[] continuationBytes = new byte[prefixLength + realContinuation.length];
System.arraycopy(lowBytes, 0, continuationBytes, 0, prefixLength);
System.arraycopy(continuation, 0, continuationBytes, prefixLength, continuation.length);
System.arraycopy(realContinuation, 0, continuationBytes, prefixLength, realContinuation.length);
if (reverse) {
highBytes = continuationBytes;
highEndpoint = EndpointType.CONTINUATION;
Expand Down Expand Up @@ -294,7 +364,7 @@

@SpotBugsSuppressWarnings(value = "EI2", justification = "copies are expensive")
public T setContinuation(@Nullable byte[] continuation) {
this.continuation = continuation;
this.continuation = KeyValueCursorBase.Continuation.getInnerContinuation(continuation);
return self();
}

Expand Down Expand Up @@ -339,6 +409,11 @@
return self();
}

public T setSerializationMode(@Nonnull final SerializationMode serializationMode) {
this.serializationMode = serializationMode;
return self();
}

/**
* Calculate the key prefix length for the returned values. This will be used to derive the primary key used in
* the calculated continuation.
Expand Down
Loading
Loading