Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
387a648
save
pengpeng-lu May 21, 2025
0fd847e
save
pengpeng-lu May 22, 2025
3ce0259
save
pengpeng-lu May 28, 2025
78bea95
save
pengpeng-lu May 29, 2025
aefa82c
save
pengpeng-lu May 31, 2025
3fe7ba0
save
pengpeng-lu Jun 4, 2025
1336975
save
pengpeng-lu Jun 11, 2025
44a5cb9
clean
pengpeng-lu Jun 12, 2025
cb40b88
save
pengpeng-lu Jun 12, 2025
d82d31c
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Jun 12, 2025
8d523c1
add planner configuration
pengpeng-lu Jun 13, 2025
d015132
save
pengpeng-lu Jun 16, 2025
22a6e85
save
pengpeng-lu Jun 16, 2025
f30c3e3
revert PlannerConfiguration change:
pengpeng-lu Jun 16, 2025
cc9567b
add test
pengpeng-lu Jun 17, 2025
09907fe
style
pengpeng-lu Jun 17, 2025
8343661
checkstyle
pengpeng-lu Jun 17, 2025
30cf435
small things
pengpeng-lu Jul 10, 2025
1f27b7e
implementation comments
pengpeng-lu Jul 21, 2025
61ef12b
save
pengpeng-lu Jul 28, 2025
51e3746
save
pengpeng-lu Jul 29, 2025
c0c8e72
save
pengpeng-lu Aug 1, 2025
b151ab4
style
pengpeng-lu Aug 1, 2025
e170e11
set to_old
pengpeng-lu Aug 1, 2025
5cb1ca0
merge conflict
pengpeng-lu Aug 1, 2025
dcce7df
save
pengpeng-lu Aug 1, 2025
022dc23
1 test fail
pengpeng-lu Aug 6, 2025
69fcaf1
save in the middle of debugging
pengpeng-lu Aug 7, 2025
7fda4f2
fix test
pengpeng-lu Aug 7, 2025
def4939
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 7, 2025
39075d7
merge main
pengpeng-lu Aug 7, 2025
3185c91
add back yaml tests
pengpeng-lu Aug 25, 2025
15fa168
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Aug 25, 2025
4d065e4
remove prefixLength from proto
pengpeng-lu Aug 25, 2025
5866b52
throw ex when error parsing
pengpeng-lu Aug 26, 2025
bc2f064
serialize mode in plans
pengpeng-lu Aug 28, 2025
68fe061
remove serialization in plans
pengpeng-lu Sep 12, 2025
0f91a76
merge main
pengpeng-lu Sep 12, 2025
4c19031
style
pengpeng-lu Sep 12, 2025
05890a0
magic number
pengpeng-lu Sep 17, 2025
2e7c809
comments
pengpeng-lu Sep 18, 2025
4280bdd
fix test and style
pengpeng-lu Sep 18, 2025
04b34a9
more tests
pengpeng-lu Sep 18, 2025
3f89e3d
comments
pengpeng-lu Sep 18, 2025
2909eb9
nit
pengpeng-lu Sep 19, 2025
425073d
Merge branch 'main' into keyvalue_cursor
pengpeng-lu Sep 19, 2025
db08407
add test back
pengpeng-lu Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ public class ExecuteProperties {
// how record scan limit reached is handled -- false: return early with continuation, true: throw exception
private final boolean failOnScanLimitReached;
private final boolean isDryRun;
private final boolean kvCursorContSerializeToNew;

private final CursorStreamingMode defaultCursorStreamingMode;

@SuppressWarnings("java:S107")
private ExecuteProperties(int skip, int rowLimit, @Nonnull IsolationLevel isolationLevel, long timeLimit,
@Nonnull ExecuteState state, boolean failOnScanLimitReached, @Nonnull CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun) {
@Nonnull ExecuteState state, boolean failOnScanLimitReached, @Nonnull CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun, boolean kvCursorContSerializeToNew) {
this.skip = skip;
this.rowLimit = rowLimit;
this.isolationLevel = isolationLevel;
Expand All @@ -86,6 +87,7 @@ private ExecuteProperties(int skip, int rowLimit, @Nonnull IsolationLevel isolat
this.failOnScanLimitReached = failOnScanLimitReached;
this.defaultCursorStreamingMode = defaultCursorStreamingMode;
this.isDryRun = isDryRun;
this.kvCursorContSerializeToNew = kvCursorContSerializeToNew;
}

@Nonnull
Expand All @@ -102,7 +104,7 @@ public ExecuteProperties setSkip(final int skip) {
if (skip == this.skip) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

public boolean isDryRun() {
Expand All @@ -114,9 +116,11 @@ public ExecuteProperties setDryRun(final boolean isDryRun) {
if (isDryRun == this.isDryRun) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

public boolean isKvCursorContSerializeToNew() {return kvCursorContSerializeToNew;}


/**
* Get the limit on the number of rows that will be returned as it would be passed to FDB.
Expand All @@ -137,7 +141,7 @@ public ExecuteProperties setReturnedRowLimit(final int rowLimit) {
if (newLimit == this.rowLimit) {
return this;
}
return copy(skip, newLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, newLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand Down Expand Up @@ -184,7 +188,7 @@ public ExecuteState getState() {
*/
@Nonnull
public ExecuteProperties setState(@Nonnull ExecuteState newState) {
return copy(skip, rowLimit, timeLimit, isolationLevel, newState, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, newState, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -193,7 +197,7 @@ public ExecuteProperties setState(@Nonnull ExecuteState newState) {
*/
@Nonnull
public ExecuteProperties clearState() {
return copy(skip, rowLimit, timeLimit, isolationLevel, new ExecuteState(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, new ExecuteState(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -209,15 +213,15 @@ public ExecuteProperties setFailOnScanLimitReached(boolean failOnScanLimitReache
if (failOnScanLimitReached == this.failOnScanLimitReached) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

@Nonnull
public ExecuteProperties clearReturnedRowLimit() {
if (getReturnedRowLimit() == ReadTransaction.ROW_LIMIT_UNLIMITED) {
return this;
}
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -229,7 +233,7 @@ public ExecuteProperties clearRowAndTimeLimits() {
if (getTimeLimit() == UNLIMITED_TIME && getReturnedRowLimit() == ReadTransaction.ROW_LIMIT_UNLIMITED) {
return this;
}
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, UNLIMITED_TIME, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, UNLIMITED_TIME, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -241,7 +245,7 @@ public ExecuteProperties clearSkipAndLimit() {
if (skip == 0 && rowLimit == ReadTransaction.ROW_LIMIT_UNLIMITED) {
return this;
}
return copy(0, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(0, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -254,7 +258,7 @@ public ExecuteProperties clearSkipAndAdjustLimit() {
return this;
}
return copy(0, rowLimit == ReadTransaction.ROW_LIMIT_UNLIMITED ? ReadTransaction.ROW_LIMIT_UNLIMITED : rowLimit + skip,
timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand Down Expand Up @@ -305,7 +309,7 @@ public ExecuteProperties setDefaultCursorStreamingMode(CursorStreamingMode defau
if (defaultCursorStreamingMode == this.defaultCursorStreamingMode) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -315,7 +319,7 @@ public ExecuteProperties setDefaultCursorStreamingMode(CursorStreamingMode defau
*/
@Nonnull
public ExecuteProperties resetState() {
return copy(skip, rowLimit, timeLimit, isolationLevel, state.reset(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state.reset(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -333,8 +337,8 @@ public ExecuteProperties resetState() {
@SuppressWarnings("java:S107")
@Nonnull
protected ExecuteProperties copy(int skip, int rowLimit, long timeLimit, @Nonnull IsolationLevel isolationLevel,
@Nonnull ExecuteState state, boolean failOnScanLimitReached, CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun) {
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
@Nonnull ExecuteState state, boolean failOnScanLimitReached, CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun, boolean kvCursorContSerializeToNew) {
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

@Nonnull
Expand Down Expand Up @@ -408,6 +412,7 @@ public static class Builder {
private ExecuteState executeState = null;
private boolean failOnScanLimitReached = false;
private boolean isDryRun = false;
private boolean kvCursorContSerializeToNew = false;
private CursorStreamingMode defaultCursorStreamingMode = CursorStreamingMode.ITERATOR;

private Builder() {
Expand All @@ -422,6 +427,7 @@ private Builder(ExecuteProperties executeProperties) {
this.failOnScanLimitReached = executeProperties.failOnScanLimitReached;
this.defaultCursorStreamingMode = executeProperties.defaultCursorStreamingMode;
this.isDryRun = executeProperties.isDryRun;
this.kvCursorContSerializeToNew = executeProperties.kvCursorContSerializeToNew;
}

@Nonnull
Expand Down Expand Up @@ -455,6 +461,12 @@ public Builder setDryRun(boolean isDryRun) {
return this;
}

@Nonnull
public Builder setKvCursorContSerializeToNew(boolean kvCursorContSerializeToNew) {
this.kvCursorContSerializeToNew = kvCursorContSerializeToNew;
return this;
}

@Nonnull
public Builder setReturnedRowLimit(int rowLimit) {
this.rowLimit = validateAndNormalizeRowLimit(rowLimit);
Expand Down Expand Up @@ -607,7 +619,7 @@ public ExecuteProperties build() {
} else {
state = new ExecuteState(RecordScanLimiterFactory.enforce(scannedRecordsLimit), ByteScanLimiterFactory.enforce(scannedBytesLimit));
}
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ private IndexPrefetchRangeKeyValueCursor(@Nonnull final FDBRecordContext context
@Nonnull final AsyncIterator<MappedKeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
SerializationMode serializationMode) {

super(context, iterator, prefixLength, limitManager, valuesLimit);
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand All @@ -69,7 +70,7 @@ public IndexPrefetchRangeKeyValueCursor build() {
AsyncIterator<MappedKeyValue> iterator = getTransaction()
.getMappedRange(getBegin(), getEnd(), mapper, getLimit(), isReverse(), getStreamingMode())
.iterator();
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ private KeyValueCursor(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<KeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
super(context, iterator, prefixLength, limitManager, valuesLimit);
int valuesLimit,
SerializationMode serializationMode) {
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand Down Expand Up @@ -77,7 +78,7 @@ public KeyValueCursor build() {
final AsyncIterator<KeyValue> iterator = getTransaction()
.getRange(getBegin(), getEnd(), getLimit(), isReverse(), getStreamingMode())
.iterator();
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Loading
Loading