From fe267012a3d9de5e5b8baa928da791aedd37efca Mon Sep 17 00:00:00 2001 From: Alec Grieser Date: Tue, 11 Mar 2025 17:31:46 +0000 Subject: [PATCH 1/2] update gradle.properties to 4.2 --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 375a36e369..a8e8c345ff 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ # rootProject.name=fdb-record-layer -version=4.1.9.0 +version=4.2.0.0 releaseBuild=false # this should be false for release branches (i.e. if there is no -SNAPSHOT on the above version) From af5e914dfc8bc2ba0bf8453a54ea3ccca502f3f9 Mon Sep 17 00:00:00 2001 From: Alec Grieser Date: Thu, 13 Mar 2025 10:34:41 +0000 Subject: [PATCH 2/2] Streaming aggregate cursor now requires 4.1.9.0 or later for continuation deserialization This cleans up the `StreamingAggregateCursor` and its helper classes to remove the `createDefaultIfEmpty` option. We'd been carrying that option around as we introduced that field in #3092 and wanted to preserve behavior when upgrading from older versions. The intention had been that all new plans would set the option starting in 4.1, but a bug (fixed in #3211; see #3096) means that we didn't actually enable it until 4.1.9.0. That means that after this change, we'd require 4.1.9.0 or newer in order to safely continue these queries. In theory, we could wait with this change, but 4.1.9.0 has enough fixes that I actually think our recommendation should be that anyone upgrading from 4.0 or below go straight to 4.1.9.0, and then they can proceed safely to a newer version with this change. I was able to validate that this change is compatible with 4.1.9.0 via the cross-version tests run during PRB. When I ran the full mixed mode tests with the `aggregate-index-tests.yamsql`, I found that only 4.1.9.0 worked, which is expected. This resolves #3092. --- .../cursors/aggregate/AggregateCursor.java | 14 ++------- .../cursors/aggregate/StreamGrouping.java | 4 --- .../RecordQueryStreamingAggregationPlan.java | 29 +++++++------------ 3 files changed, 14 insertions(+), 33 deletions(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java index ad000a659a..b9a885e310 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java @@ -25,7 +25,6 @@ import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorContinuation; import com.apple.foundationdb.record.RecordCursorResult; -import com.apple.foundationdb.record.RecordCursorStartContinuation; import com.apple.foundationdb.record.RecordCursorVisitor; import com.apple.foundationdb.record.query.plan.plans.QueryResult; import com.google.common.base.Verify; @@ -50,7 +49,6 @@ public class AggregateCursor implements RecordCursor streamGrouping; - private final boolean isCreateDefaultOnEmpty; // Previous record processed by this cursor @Nullable private RecordCursorResult previousResult; @@ -59,11 +57,9 @@ public class AggregateCursor implements RecordCursor previousValidResult; public AggregateCursor(@Nonnull RecordCursor inner, - @Nonnull final StreamGrouping streamGrouping, - boolean isCreateDefaultOnEmpty) { + @Nonnull final StreamGrouping streamGrouping) { this.inner = inner; this.streamGrouping = streamGrouping; - this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty; } @Nonnull @@ -77,7 +73,7 @@ public CompletableFuture> onNext() { return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> { previousResult = innerResult; if (!innerResult.hasNext()) { - if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) { + if (!isNoRecords()) { streamGrouping.finalizeGroup(); } return false; @@ -93,11 +89,7 @@ public CompletableFuture> onNext() { }), getExecutor()).thenApply(vignore -> { if (isNoRecords()) { // Edge case where there are no records at all - if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) { - return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START); - } else { - return RecordCursorResult.exhausted(); - } + return RecordCursorResult.exhausted(); } // Use the last valid result for the continuation as we need non-terminal one here. RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation(); diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java index 721a920657..805841c437 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java @@ -193,8 +193,4 @@ private Object evalGroupingKey(@Nullable final Object currentObject) { final EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject); return Objects.requireNonNull(groupingKeyValue).eval(store, nestedContext); } - - public boolean isResultOnEmpty() { - return groupingKeyValue == null; - } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java index 17772363f0..3398ff2e9a 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java @@ -27,6 +27,7 @@ import com.apple.foundationdb.record.PlanDeserializer; import com.apple.foundationdb.record.PlanHashable; import com.apple.foundationdb.record.PlanSerializationContext; +import com.apple.foundationdb.record.RecordCoreArgumentException; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.cursors.aggregate.AggregateCursor; import com.apple.foundationdb.record.cursors.aggregate.StreamGrouping; @@ -96,13 +97,6 @@ public class RecordQueryStreamingAggregationPlan implements RecordQueryPlanWithC private final CorrelationIdentifier aggregateAlias; @Nonnull private final Value completeResultValue; - // - // This flag is needed to distinguish if we need to create a default value on-empty or not (i.e. - // RecordQueryDefaultOnEmptyPlan will do that going forward). We will always plan with that flag set to false going - // forward, but we accept and honor this field coming from proto if we are continuing OR if it not there we imply - // true. - // https://github.com/FoundationDB/fdb-record-layer/issues/3107 - private final boolean isCreateDefaultOnEmpty; /** * Construct a new plan. @@ -119,15 +113,13 @@ private RecordQueryStreamingAggregationPlan(@Nonnull final Quantifier.Physical i @Nonnull final AggregateValue aggregateValue, @Nonnull final CorrelationIdentifier groupingKeyAlias, @Nonnull final CorrelationIdentifier aggregateAlias, - @Nonnull final Value completeResultValue, - final boolean isCreateDefaultOnEmpty) { + @Nonnull final Value completeResultValue) { this.inner = inner; this.groupingKeyValue = groupingKeyValue; this.aggregateValue = aggregateValue; this.groupingKeyAlias = groupingKeyAlias; this.aggregateAlias = aggregateAlias; this.completeResultValue = completeResultValue; - this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty; } @Nonnull @@ -148,7 +140,7 @@ public RecordCursor executePlan(@Nonnull FDBRec (FDBRecordStoreBase)store, context, inner.getAlias()); - return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty) + return new AggregateCursor<>(innerCursor, streamGrouping) .skipThenLimit(executeProperties.getSkip(), executeProperties.getReturnedRowLimit()); } @@ -206,8 +198,7 @@ public RecordQueryStreamingAggregationPlan translateCorrelations(@Nonnull final translatedAggregateValue, groupingKeyAlias, aggregateAlias, - completeResultValue, - isCreateDefaultOnEmpty); + completeResultValue); } @Nonnull @@ -218,8 +209,7 @@ public RecordQueryStreamingAggregationPlan withChild(@Nonnull final Reference ch aggregateValue, groupingKeyAlias, aggregateAlias, - completeResultValue, - isCreateDefaultOnEmpty); + completeResultValue); } @Nonnull @@ -371,7 +361,7 @@ public PRecordQueryStreamingAggregationPlan toProto(@Nonnull final PlanSerializa builder.setGroupingKeyAlias(groupingKeyAlias.getId()) .setAggregateAlias(aggregateAlias.getId()) .setCompleteResultValue(completeResultValue.toValueProto(serializationContext)) - .setIsCreateDefaultOnEmpty(isCreateDefaultOnEmpty); + .setIsCreateDefaultOnEmpty(false); return builder.build(); } @@ -396,7 +386,10 @@ public static RecordQueryStreamingAggregationPlan fromProto(@Nonnull final PlanS final CorrelationIdentifier aggregateAlias = CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateAlias())); final Value completeResultValue = Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getCompleteResultValue())); final boolean isCreateDefaultOnEmpty = recordQueryStreamingAggregationPlanProto.hasIsCreateDefaultOnEmpty() ? recordQueryStreamingAggregationPlanProto.getIsCreateDefaultOnEmpty() : true; - return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue, isCreateDefaultOnEmpty); + if (isCreateDefaultOnEmpty) { + throw new RecordCoreArgumentException("cannot create streaming aggregate plan with default value on empty"); + } + return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue); } @Nonnull @@ -428,7 +421,7 @@ public static RecordQueryStreamingAggregationPlan of(@Nonnull final Quantifier.P final var referencedAggregateValue = ObjectValue.of(aggregateAlias, aggregateValue.getResultType()); return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, - resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue), false); + resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue)); } /**