Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
387a648
save
pengpeng-lu May 21, 2025
0fd847e
save
pengpeng-lu May 22, 2025
3ce0259
save
pengpeng-lu May 28, 2025
78bea95
save
pengpeng-lu May 29, 2025
aefa82c
save
pengpeng-lu May 31, 2025
3fe7ba0
save
pengpeng-lu Jun 4, 2025
1336975
save
pengpeng-lu Jun 11, 2025
44a5cb9
clean
pengpeng-lu Jun 12, 2025
cb40b88
save
pengpeng-lu Jun 12, 2025
d82d31c
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Jun 12, 2025
8d523c1
add planner configuration
pengpeng-lu Jun 13, 2025
d015132
save
pengpeng-lu Jun 16, 2025
22a6e85
save
pengpeng-lu Jun 16, 2025
f30c3e3
revert PlannerConfiguration change:
pengpeng-lu Jun 16, 2025
cc9567b
add test
pengpeng-lu Jun 17, 2025
09907fe
style
pengpeng-lu Jun 17, 2025
8343661
checkstyle
pengpeng-lu Jun 17, 2025
30cf435
small things
pengpeng-lu Jul 10, 2025
1f27b7e
implementation comments
pengpeng-lu Jul 21, 2025
61ef12b
save
pengpeng-lu Jul 28, 2025
51e3746
save
pengpeng-lu Jul 29, 2025
c0c8e72
save
pengpeng-lu Aug 1, 2025
b151ab4
style
pengpeng-lu Aug 1, 2025
e170e11
set to_old
pengpeng-lu Aug 1, 2025
5cb1ca0
merge conflict
pengpeng-lu Aug 1, 2025
dcce7df
save
pengpeng-lu Aug 1, 2025
022dc23
1 test fail
pengpeng-lu Aug 6, 2025
69fcaf1
save in the middle of debugging
pengpeng-lu Aug 7, 2025
7fda4f2
fix test
pengpeng-lu Aug 7, 2025
def4939
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 7, 2025
39075d7
merge main
pengpeng-lu Aug 7, 2025
3185c91
add back yaml tests
pengpeng-lu Aug 25, 2025
15fa168
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 25, 2025
4d065e4
remove prefixLength from proto
pengpeng-lu Aug 25, 2025
5866b52
throw ex when error parsing
pengpeng-lu Aug 26, 2025
bc2f064
serialize mode in plans
pengpeng-lu Aug 28, 2025
68fe061
remove serialization in plans
pengpeng-lu Sep 12, 2025
0f91a76
merge main
pengpeng-lu Sep 12, 2025
4c19031
style
pengpeng-lu Sep 12, 2025
05890a0
magic number
pengpeng-lu Sep 17, 2025
2e7c809
comments
pengpeng-lu Sep 18, 2025
4280bdd
fix test and style
pengpeng-lu Sep 18, 2025
04b34a9
more tests
pengpeng-lu Sep 18, 2025
3f89e3d
comments
pengpeng-lu Sep 18, 2025
2909eb9
nit
pengpeng-lu Sep 19, 2025
425073d
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Sep 19, 2025
db08407
add test back
pengpeng-lu Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,20 @@
private final byte[] lastKey;
private final int prefixLength;
private final SerializationMode serializationMode;
private static final long MAGIC_NUMBER = 1234L;
/*
how we chose this "magic number":
The goal is to make sure an old continuation won't be accidentally parsed as: {magic_number = 1234567890L, inner_continuation = some byte array}
An example that can be parsed as above is:
byte[] data = new byte[] {0x11, (byte) 0xD2, 0x02, (byte) 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, 0x0A, 0x01, 0x14};
where 0x11 is wire tag for sfixed64,
[(byte) 0xD2, 0x02, (byte) 0x96, 0x49, 0x00, 0x00, 0x00, 0x00] is 1234567890L
0x0A is wire tag for bytes
0x01 represents that the byte array contains 1 byte
0x14 is the value of the byte array

Because 0xD2 is a negative number in java, it is not possible for a continuation to reach this number.
*/
private static final long MAGIC_NUMBER = 1234567890L;

public Continuation(@Nullable final byte[] lastKey, final int prefixLength, final SerializationMode serializationMode) {
// Note that doing this without a full copy is dangerous if the array is ever mutated.
Expand Down Expand Up @@ -188,13 +201,10 @@
return byteString.isEmpty() ? new byte[0] : byteString.toByteArray();
}

public static byte[] fromRawBytes(@Nullable byte[] rawBytes, SerializationMode serializationMode) {
public static byte[] fromRawBytes(@Nullable byte[] rawBytes) {
if (rawBytes == null) {
return null;
}
if (serializationMode == SerializationMode.TO_OLD) {
return rawBytes;
}
try {
RecordCursorProto.KeyValueCursorContinuation continuationProto = RecordCursorProto.KeyValueCursorContinuation.parseFrom(rawBytes);
if (continuationProto.getMagicNumber() != MAGIC_NUMBER) {
Expand All @@ -219,7 +229,7 @@
return builder.setMagicNumber(MAGIC_NUMBER).build();
} else {
ByteString base = ZeroCopyByteString.wrap(Objects.requireNonNull(lastKey));
// even when prefixLength = lastKey.length, proto.getInnerContinuation() = ByteString.EMPTY, proto.hasContinuation() = true

Check warning on line 232 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java#L232

Commented Out Code https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3397%2Fpengpeng-lu%2Fkeyvalue_cursor%3AHEAD&id=6C8EE75CB9DF76FAD72F7AFC0EFD7B81
return builder.setInnerContinuation(base.substring(prefixLength, lastKey.length)).setMagicNumber(MAGIC_NUMBER).build();
}
}
Expand Down Expand Up @@ -313,7 +323,7 @@
reverse = scanProperties.isReverse();

if (continuation != null) {
byte[] realContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);
byte[] realContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation);
final byte[] continuationBytes = new byte[prefixLength + realContinuation.length];
System.arraycopy(lowBytes, 0, continuationBytes, 0, prefixLength);
System.arraycopy(realContinuation, 0, continuationBytes, prefixLength, realContinuation.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
*/
@API(API.Status.INTERNAL)
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class RecordQueryIndexPlan implements RecordQueryPlanWithNoChildren,

Check warning on line 136 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java#L136

Too many constructors (6/5) https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3397%2Fpengpeng-lu%2Fkeyvalue_cursor%3AHEAD&id=B0915E539FFB18ED90523CCF94A11F26
RecordQueryPlanWithComparisons,
RecordQueryPlanWithIndex,
PlannerGraphRewritable,
Expand Down Expand Up @@ -180,7 +180,7 @@
this(indexName, commonPrimaryKey, scanParameters, useIndexPrefetch, fetchIndexRecords, reverse, strictlySorted, Optional.empty(), new Type.Any(), QueryPlanConstraint.noConstraint());
}

public RecordQueryIndexPlan(@Nonnull final String indexName,

Check warning on line 183 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java#L183

Method `RecordQueryIndexPlan` has 10 parameters but no more than 7 parameters are allowed https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3397%2Fpengpeng-lu%2Fkeyvalue_cursor%3AHEAD&id=11C6F46A0D2E82D36A4A3424DEB55B5A
@Nullable final KeyExpression commonPrimaryKey,
@Nonnull final IndexScanParameters scanParameters,
@Nonnull final IndexFetchMethod indexFetchMethod,
Expand Down Expand Up @@ -222,7 +222,7 @@
}

@VisibleForTesting
public RecordQueryIndexPlan(@Nonnull final String indexName,

Check warning on line 225 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java#L225

Method `RecordQueryIndexPlan` has 11 parameters but no more than 7 parameters are allowed https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3397%2Fpengpeng-lu%2Fkeyvalue_cursor%3AHEAD&id=6F597311CDC3FECD4D2DBFCCDD569EAB
@Nullable final KeyExpression commonPrimaryKey,
@Nonnull final IndexScanParameters scanParameters,
@Nonnull final IndexFetchMethod indexFetchMethod,
Expand Down Expand Up @@ -310,7 +310,7 @@
final RecordMetaData metaData = store.getRecordMetaData();
final Index index = metaData.getIndex(indexName);
final IndexScanBounds scanBounds = scanParameters.bind(store, index, context);
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation);

return store.scanIndexRemoteFetch(index, scanBounds, innerContinuation, executeProperties.asScanProperties(isReverse()), IndexOrphanBehavior.ERROR)
.map(store::queriedRecord)
Expand All @@ -324,7 +324,7 @@
final RecordMetaData metaData = store.getRecordMetaData();
final Index index = metaData.getIndex(indexName);
final IndexScanBounds scanBounds = scanParameters.bind(store, index, context);
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation);

if (!IndexScanType.BY_VALUE_OVER_SCAN.equals(getScanType())) {
return store.scanIndex(index, scanBounds, innerContinuation, executeProperties.asScanProperties(reverse));
Expand Down Expand Up @@ -793,7 +793,7 @@
return null;
}
// Add the prefix back to the inner continuation
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation);
return ByteArrayUtil.join(prefixBytes, innerContinuation);
}

Expand All @@ -802,7 +802,7 @@
if (continuation.isEnd()) {
return continuation;
}
byte[] continuationBytes = KeyValueCursorBase.Continuation.fromRawBytes(continuation.toBytes(), serializationMode);
byte[] continuationBytes = KeyValueCursorBase.Continuation.fromRawBytes(continuation.toBytes());
if (continuationBytes != null && ByteArrayUtil.startsWith(continuationBytes, prefixBytes)) {
// Strip away the prefix. Note that ByteStrings re-use the underlying ByteArray, so this can
// save a copy.
Expand Down Expand Up @@ -835,7 +835,7 @@
if (bytes == null) {
synchronized (this) {
if (bytes == null) {
byte[] baseContinuationBytes = KeyValueCursorBase.Continuation.fromRawBytes(baseContinuation.toBytes(), serializationMode);
byte[] baseContinuationBytes = KeyValueCursorBase.Continuation.fromRawBytes(baseContinuation.toBytes());
if (baseContinuationBytes == null) {
return null;
}
Expand All @@ -849,7 +849,7 @@
@Nonnull
@Override
public ByteString toByteString() {
byte[] result = KeyValueCursorBase.Continuation.fromRawBytes(baseContinuation.toBytes(), serializationMode);
byte[] result = KeyValueCursorBase.Continuation.fromRawBytes(baseContinuation.toBytes());
return result == null ? ByteString.EMPTY : ByteString.copyFrom(result).substring(prefixLength);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ public class RecordQueryScanPlan implements RecordQueryPlanWithNoChildren, Recor
private final Optional<? extends WithPrimaryKeyMatchCandidate> matchCandidateOptional;
@Nonnull
private final Supplier<ComparisonRanges> comparisonRangesSupplier;
@Nonnull
private final KeyValueCursorBase.SerializationMode serializationMode;

/**
* Overloaded constructor.
Expand Down Expand Up @@ -146,17 +144,6 @@ public RecordQueryScanPlan(@Nullable Set<String> recordTypes,
this(recordTypes, flowedType, commonPrimaryKey, comparisons, reverse, strictlySorted, Optional.of(matchCandidate));
}

public RecordQueryScanPlan(@Nullable Set<String> recordTypes,
@Nonnull Type flowedType,
@Nullable KeyExpression commonPrimaryKey,
@Nonnull ScanComparisons comparisons,
boolean reverse,
boolean strictlySorted,
@Nonnull final Optional<? extends WithPrimaryKeyMatchCandidate> matchCandidateOptional) {
this(recordTypes, flowedType, commonPrimaryKey, comparisons, reverse, strictlySorted, matchCandidateOptional, KeyValueCursorBase.SerializationMode.TO_OLD);
}


/**
* Overloaded constructor.
* @param recordTypes a super set of record types of the records that this scan operator can produce
Expand All @@ -175,8 +162,7 @@ public RecordQueryScanPlan(@Nullable Set<String> recordTypes,
@Nonnull ScanComparisons comparisons,
boolean reverse,
boolean strictlySorted,
@Nonnull final Optional<? extends WithPrimaryKeyMatchCandidate> matchCandidateOptional,
@Nonnull final KeyValueCursorBase.SerializationMode serializationMode) {
@Nonnull final Optional<? extends WithPrimaryKeyMatchCandidate> matchCandidateOptional) {
this.recordTypes = recordTypes == null ? null : ImmutableSet.copyOf(recordTypes);
this.flowedType = flowedType;
this.commonPrimaryKey = commonPrimaryKey;
Expand All @@ -185,7 +171,6 @@ public RecordQueryScanPlan(@Nullable Set<String> recordTypes,
this.strictlySorted = strictlySorted;
this.matchCandidateOptional = matchCandidateOptional;
this.comparisonRangesSupplier = Suppliers.memoize(this::computeComparisonRanges);
this.serializationMode = serializationMode;
}

@Nonnull
Expand All @@ -195,7 +180,7 @@ public <M extends Message> RecordCursor<QueryResult> executePlan(@Nonnull final
@Nullable final byte[] continuation,
@Nonnull final ExecuteProperties executeProperties) {
final TupleRange range = comparisons.toTupleRange(store, context);
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);
byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation);

return store.scanRecords(
range.getLow(), range.getHigh(), range.getLowEndpoint(), range.getHighEndpoint(), innerContinuation,
Expand Down
2 changes: 1 addition & 1 deletion fdb-record-layer-core/src/main/proto/record_cursor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,5 @@ message RangeCursorContinuation {

message KeyValueCursorContinuation {
optional bytes inner_continuation = 1;
optional int64 magic_number = 2;
optional sfixed64 magic_number = 2;
}
Loading