diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java index bd3e30b7825..c61808744ba 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java @@ -433,6 +433,7 @@ public SnapWorldStateDownloadProcess build() { "batchDownloadBlockAccessListsData", tasks -> requestDataStep.requestBlockAccessLists(tasks), maxOutstandingRequests) + .inSingleBatch() .thenProcess( "batchPersistBlockAccessListsData", tasks -> persistDataStep.persist(tasks)) .andFinishWith( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BlockAccessListDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BlockAccessListDataRequest.java index 74a95268a96..88be38da838 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BlockAccessListDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BlockAccessListDataRequest.java @@ -93,8 +93,7 @@ protected int doPersist( 0, Wei.ZERO, Hash.EMPTY_TRIE_HASH, Hash.EMPTY)); final var updatedCode = accountChanges.code(); - final Hash updatedCodeHash = - updatedCode.map(Hash::hash).orElse(trieAccountValue.getCodeHash()); + final Hash updatedCodeHash = resolveUpdatedCodeHash(accountChanges, trieAccountValue); updatedCode.ifPresent( code -> bonsaiUpdater.putCode(accountHash, updatedCodeHash, code)); @@ -105,8 +104,8 @@ protected int doPersist( final PmtStateTrieAccountValue updatedValue = new PmtStateTrieAccountValue( - accountChanges.nonce().orElse(trieAccountValue.getNonce()), - accountChanges.balance().orElse(trieAccountValue.getBalance()), + resolveUpdatedNonce(accountChanges, trieAccountValue), + resolveUpdatedBalance(accountChanges, trieAccountValue), updatedStorageRoot, updatedCodeHash); bonsaiUpdater.putAccountInfoState(accountHash, RLP.encode(updatedValue::writeTo)); @@ -131,6 +130,24 @@ private void applyStorageChanges( } } + private long resolveUpdatedNonce( + final BlockAccessListChanges.AccountFinalChanges accountChanges, + final PmtStateTrieAccountValue trieAccountValue) { + return accountChanges.nonce().orElse(trieAccountValue.getNonce()); + } + + private Wei resolveUpdatedBalance( + final BlockAccessListChanges.AccountFinalChanges accountChanges, + final PmtStateTrieAccountValue trieAccountValue) { + return accountChanges.balance().orElse(trieAccountValue.getBalance()); + } + + private Hash resolveUpdatedCodeHash( + final BlockAccessListChanges.AccountFinalChanges accountChanges, + final PmtStateTrieAccountValue trieAccountValue) { + return accountChanges.code().map(Hash::hash).orElse(trieAccountValue.getCodeHash()); + } + @Override public boolean isResponseReceived() { return blockAccessList.isPresent(); diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AggregatingReadPipe.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AggregatingReadPipe.java new file mode 100644 index 00000000000..903fd17204c --- /dev/null +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AggregatingReadPipe.java @@ -0,0 +1,118 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.services.pipeline; + +import org.hyperledger.besu.plugin.services.metrics.Counter; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Queue; + +/** + * A read pipe that aggregates all incoming batches into a single batch. + * + * @param the type of item included in batches + */ +public class AggregatingReadPipe implements ReadPipe> { + + private final ReadPipe> input; + private final Counter aggregateCounter; + private final Queue pendingItems = new ArrayDeque<>(); + + /** + * Instantiates a new Aggregating read pipe. + * + * @param input the input batches + * @param aggregateCounter the aggregate counter + */ + public AggregatingReadPipe(final ReadPipe> input, final Counter aggregateCounter) { + this.input = input; + this.aggregateCounter = aggregateCounter; + } + + @Override + public boolean hasMore() { + return input.hasMore() || !pendingItems.isEmpty(); + } + + @Override + public boolean isAborted() { + return input.isAborted(); + } + + @Override + public List get() { + drainInputToPendingByGet(); + return emitPendingUpTo(Integer.MAX_VALUE); + } + + @Override + public List poll() { + drainInputToPendingByPoll(); + + if (input.hasMore()) { + return null; + } + + return emitPendingUpTo(Integer.MAX_VALUE); + } + + @Override + public int drainTo(final Collection> output, final int maxElements) { + if (maxElements <= 0) { + return 0; + } + + drainInputToPendingByPoll(); + + final List aggregate = emitPendingUpTo(maxElements); + if (aggregate != null) { + output.add(aggregate); + return aggregate.size(); + } + return 0; + } + + private List emitPendingUpTo(final int maximumSize) { + if (pendingItems.isEmpty()) { + return null; + } + final int outputSize = Math.min(maximumSize, pendingItems.size()); + final List output = new ArrayList<>(outputSize); + for (int i = 0; i < outputSize; i++) { + output.add(pendingItems.remove()); + } + aggregateCounter.inc(); + return output; + } + + private void drainInputToPendingByPoll() { + List batch; + while ((batch = input.poll()) != null) { + pendingItems.addAll(batch); + } + } + + private void drainInputToPendingByGet() { + while (input.hasMore()) { + final List batch = input.get(); + if (batch != null) { + pendingItems.addAll(batch); + } + } + } +} diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java index aaf68f5e3f1..6e5999f09e8 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java @@ -296,6 +296,31 @@ public PipelineBuilder> inBatches( pipelineName); } + /** + * Aggregates all incoming batches into a single batch. + * + *

This stage is intended to be used after {@link #inBatches(int)} (or any stage producing + * {@link List} items), when downstream processing must operate on one final aggregate batch. + * + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + @SuppressWarnings("unchecked") + public PipelineBuilder inSingleBatch() { + return new PipelineBuilder<>( + inputPipe, + stages, + pipes, + lastStageName, + (ReadPipe) + new AggregatingReadPipe<>( + (ReadPipe>) pipeEnd, + outputCounter.labels(lastStageName + "_outputPipe", "aggregated_batches")), + 1, + outputCounter, + tracingEnabled, + pipelineName); + } + /** * Adds a 1-to-many processing stage to the pipeline. For each item in the stream, mapper * is called and each item of the {@link Stream} it returns is output as an individual item. The diff --git a/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/AggregatingReadPipeTest.java b/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/AggregatingReadPipeTest.java new file mode 100644 index 00000000000..b8b827aa5cb --- /dev/null +++ b/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/AggregatingReadPipeTest.java @@ -0,0 +1,120 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.services.pipeline; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +import org.hyperledger.besu.plugin.services.metrics.Counter; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +public class AggregatingReadPipeTest { + + private final Pipe> source = + new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "source_pipe"); + private final Counter aggregateCounter = mock(Counter.class); + private final AggregatingReadPipe aggregatingPipe = + new AggregatingReadPipe<>(source, aggregateCounter); + + @Test + public void shouldGetSingleBatchContainingAllInputBatches() { + source.put(asList("a", "b")); + source.put(asList("c")); + source.put(asList("d", "e")); + source.close(); + + assertThat(aggregatingPipe.get()).containsExactly("a", "b", "c", "d", "e"); + assertThat(aggregatingPipe.get()).isNull(); + + verify(aggregateCounter, times(1)).inc(); + } + + @Test + public void shouldPollSingleBatchOnlyWhenInputIsComplete() { + source.put(asList("a", "b")); + + assertThat(aggregatingPipe.poll()).isNull(); + verifyNoInteractions(aggregateCounter); + + source.put(asList("c")); + source.close(); + + assertThat(aggregatingPipe.poll()).containsExactly("a", "b", "c"); + assertThat(aggregatingPipe.poll()).isNull(); + + verify(aggregateCounter, times(1)).inc(); + } + + @Test + public void shouldDrainToRespectMaxElements() { + source.put(asList("a")); + source.put(asList("b", "c")); + source.close(); + + final List> output = new ArrayList<>(); + + assertThat(aggregatingPipe.drainTo(output, 0)).isZero(); + assertThat(output).isEmpty(); + + assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1); + assertThat(output).containsExactly(asList("a")); + + assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1); + assertThat(output).containsExactly(asList("a"), asList("b")); + + assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1); + assertThat(output).containsExactly(asList("a"), asList("b"), asList("c")); + + assertThat(aggregatingPipe.drainTo(output, 1)).isZero(); + + verify(aggregateCounter, times(3)).inc(); + } + + @Test + public void shouldDrainAvailableDataAndLeaveRemainingForPoll() { + source.put(asList("a", "b")); + + final List> output = new ArrayList<>(); + + assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1); + assertThat(output).containsExactly(asList("a")); + + source.close(); + + assertThat(aggregatingPipe.poll()).containsExactly("b"); + } + + @Test + public void shouldTrackHasMoreAcrossAggregationLifecycle() { + assertThat(aggregatingPipe.hasMore()).isTrue(); + + source.put(asList("a")); + assertThat(aggregatingPipe.poll()).isNull(); + assertThat(aggregatingPipe.hasMore()).isTrue(); + + source.close(); + assertThat(aggregatingPipe.poll()).containsExactly("a"); + assertThat(aggregatingPipe.hasMore()).isFalse(); + } +} diff --git a/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java index 1ec64ab65fc..18da004010c 100644 --- a/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java +++ b/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java @@ -134,6 +134,27 @@ public void shouldCombineIntoBatches() throws Exception { assertThat(output).isEmpty(); } + @Test + public void shouldAggregateAllBatchesIntoSingleBatch() throws Exception { + final BlockingQueue> output = new ArrayBlockingQueue<>(1); + final Pipeline pipeline = + PipelineBuilder.createPipeline( + "source", 20, NO_OP_LABELLED_2_COUNTER, false, "test") + .inBatches(4) + .inSingleBatch() + .andFinishWith("end", output::offer); + + final Pipe input = pipeline.getInputPipe(); + tasks.forEachRemaining(input::put); + input.close(); + + pipeline.start(executorService).get(10, SECONDS); + + assertThat(output.poll(10, SECONDS)) + .containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15); + assertThat(output).isEmpty(); + } + @Test public void shouldProcessAsync() throws Exception { final List output = new ArrayList<>();