Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ public SnapWorldStateDownloadProcess build() {
"batchDownloadBlockAccessListsData",
tasks -> requestDataStep.requestBlockAccessLists(tasks),
maxOutstandingRequests)
.inSingleBatch()
.thenProcess(
"batchPersistBlockAccessListsData", tasks -> persistDataStep.persist(tasks))
.andFinishWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ protected int doPersist(
0, Wei.ZERO, Hash.EMPTY_TRIE_HASH, Hash.EMPTY));

final var updatedCode = accountChanges.code();
final Hash updatedCodeHash =
updatedCode.map(Hash::hash).orElse(trieAccountValue.getCodeHash());
final Hash updatedCodeHash = resolveUpdatedCodeHash(accountChanges, trieAccountValue);
updatedCode.ifPresent(
code -> bonsaiUpdater.putCode(accountHash, updatedCodeHash, code));

Expand All @@ -105,8 +104,8 @@ protected int doPersist(

final PmtStateTrieAccountValue updatedValue =
new PmtStateTrieAccountValue(
accountChanges.nonce().orElse(trieAccountValue.getNonce()),
accountChanges.balance().orElse(trieAccountValue.getBalance()),
resolveUpdatedNonce(accountChanges, trieAccountValue),
resolveUpdatedBalance(accountChanges, trieAccountValue),
updatedStorageRoot,
updatedCodeHash);
bonsaiUpdater.putAccountInfoState(accountHash, RLP.encode(updatedValue::writeTo));
Expand All @@ -131,6 +130,24 @@ private void applyStorageChanges(
}
}

private long resolveUpdatedNonce(
final BlockAccessListChanges.AccountFinalChanges accountChanges,
final PmtStateTrieAccountValue trieAccountValue) {
return accountChanges.nonce().orElse(trieAccountValue.getNonce());
}

private Wei resolveUpdatedBalance(
final BlockAccessListChanges.AccountFinalChanges accountChanges,
final PmtStateTrieAccountValue trieAccountValue) {
return accountChanges.balance().orElse(trieAccountValue.getBalance());
}

private Hash resolveUpdatedCodeHash(
final BlockAccessListChanges.AccountFinalChanges accountChanges,
final PmtStateTrieAccountValue trieAccountValue) {
return accountChanges.code().map(Hash::hash).orElse(trieAccountValue.getCodeHash());
}

@Override
public boolean isResponseReceived() {
return blockAccessList.isPresent();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.pipeline;

import org.hyperledger.besu.plugin.services.metrics.Counter;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;

/**
* A read pipe that aggregates all incoming batches into a single batch.
*
* @param <T> the type of item included in batches
*/
public class AggregatingReadPipe<T> implements ReadPipe<List<T>> {

private final ReadPipe<List<T>> input;
private final Counter aggregateCounter;
private final Queue<T> pendingItems = new ArrayDeque<>();

/**
* Instantiates a new Aggregating read pipe.
*
* @param input the input batches
* @param aggregateCounter the aggregate counter
*/
public AggregatingReadPipe(final ReadPipe<List<T>> input, final Counter aggregateCounter) {
this.input = input;
this.aggregateCounter = aggregateCounter;
}

@Override
public boolean hasMore() {
return input.hasMore() || !pendingItems.isEmpty();
}

@Override
public boolean isAborted() {
return input.isAborted();
}

@Override
public List<T> get() {
drainInputToPendingByGet();
return emitPendingUpTo(Integer.MAX_VALUE);
}

@Override
public List<T> poll() {
drainInputToPendingByPoll();

if (input.hasMore()) {
return null;
}

return emitPendingUpTo(Integer.MAX_VALUE);
}

@Override
public int drainTo(final Collection<List<T>> output, final int maxElements) {
if (maxElements <= 0) {
return 0;
}

drainInputToPendingByPoll();

final List<T> aggregate = emitPendingUpTo(maxElements);
if (aggregate != null) {
output.add(aggregate);
return aggregate.size();
}
return 0;
}

private List<T> emitPendingUpTo(final int maximumSize) {
if (pendingItems.isEmpty()) {
return null;
}
final int outputSize = Math.min(maximumSize, pendingItems.size());
final List<T> output = new ArrayList<>(outputSize);
for (int i = 0; i < outputSize; i++) {
output.add(pendingItems.remove());
}
aggregateCounter.inc();
return output;
}

private void drainInputToPendingByPoll() {
List<T> batch;
while ((batch = input.poll()) != null) {
pendingItems.addAll(batch);
}
}

private void drainInputToPendingByGet() {
while (input.hasMore()) {
final List<T> batch = input.get();
if (batch != null) {
pendingItems.addAll(batch);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,31 @@ public PipelineBuilder<I, List<T>> inBatches(
pipelineName);
}

/**
* Aggregates all incoming batches into a single batch.
*
* <p>This stage is intended to be used after {@link #inBatches(int)} (or any stage producing
* {@link List} items), when downstream processing must operate on one final aggregate batch.
*
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
@SuppressWarnings("unchecked")
public PipelineBuilder<I, T> inSingleBatch() {
return new PipelineBuilder<>(
inputPipe,
stages,
pipes,
lastStageName,
(ReadPipe<T>)
new AggregatingReadPipe<>(
(ReadPipe<List<T>>) pipeEnd,
outputCounter.labels(lastStageName + "_outputPipe", "aggregated_batches")),
1,
outputCounter,
tracingEnabled,
pipelineName);
}

/**
* Adds a 1-to-many processing stage to the pipeline. For each item in the stream, <i>mapper</i>
* is called and each item of the {@link Stream} it returns is output as an individual item. The
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.pipeline;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

import org.hyperledger.besu.plugin.services.metrics.Counter;

import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.Test;

public class AggregatingReadPipeTest {

private final Pipe<List<String>> source =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "source_pipe");
private final Counter aggregateCounter = mock(Counter.class);
private final AggregatingReadPipe<String> aggregatingPipe =
new AggregatingReadPipe<>(source, aggregateCounter);

@Test
public void shouldGetSingleBatchContainingAllInputBatches() {
source.put(asList("a", "b"));
source.put(asList("c"));
source.put(asList("d", "e"));
source.close();

assertThat(aggregatingPipe.get()).containsExactly("a", "b", "c", "d", "e");
assertThat(aggregatingPipe.get()).isNull();

verify(aggregateCounter, times(1)).inc();
}

@Test
public void shouldPollSingleBatchOnlyWhenInputIsComplete() {
source.put(asList("a", "b"));

assertThat(aggregatingPipe.poll()).isNull();
verifyNoInteractions(aggregateCounter);

source.put(asList("c"));
source.close();

assertThat(aggregatingPipe.poll()).containsExactly("a", "b", "c");
assertThat(aggregatingPipe.poll()).isNull();

verify(aggregateCounter, times(1)).inc();
}

@Test
public void shouldDrainToRespectMaxElements() {
source.put(asList("a"));
source.put(asList("b", "c"));
source.close();

final List<List<String>> output = new ArrayList<>();

assertThat(aggregatingPipe.drainTo(output, 0)).isZero();
assertThat(output).isEmpty();

assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
assertThat(output).containsExactly(asList("a"));

assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
assertThat(output).containsExactly(asList("a"), asList("b"));

assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
assertThat(output).containsExactly(asList("a"), asList("b"), asList("c"));

assertThat(aggregatingPipe.drainTo(output, 1)).isZero();

verify(aggregateCounter, times(3)).inc();
}

@Test
public void shouldDrainAvailableDataAndLeaveRemainingForPoll() {
source.put(asList("a", "b"));

final List<List<String>> output = new ArrayList<>();

assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
assertThat(output).containsExactly(asList("a"));

source.close();

assertThat(aggregatingPipe.poll()).containsExactly("b");
}

@Test
public void shouldTrackHasMoreAcrossAggregationLifecycle() {
assertThat(aggregatingPipe.hasMore()).isTrue();

source.put(asList("a"));
assertThat(aggregatingPipe.poll()).isNull();
assertThat(aggregatingPipe.hasMore()).isTrue();

source.close();
assertThat(aggregatingPipe.poll()).containsExactly("a");
assertThat(aggregatingPipe.hasMore()).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,27 @@ public void shouldCombineIntoBatches() throws Exception {
assertThat(output).isEmpty();
}

@Test
public void shouldAggregateAllBatchesIntoSingleBatch() throws Exception {
final BlockingQueue<List<Integer>> output = new ArrayBlockingQueue<>(1);
final Pipeline<Integer> pipeline =
PipelineBuilder.<Integer>createPipeline(
"source", 20, NO_OP_LABELLED_2_COUNTER, false, "test")
.inBatches(4)
.inSingleBatch()
.andFinishWith("end", output::offer);

final Pipe<Integer> input = pipeline.getInputPipe();
tasks.forEachRemaining(input::put);
input.close();

pipeline.start(executorService).get(10, SECONDS);

assertThat(output.poll(10, SECONDS))
.containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
assertThat(output).isEmpty();
}

@Test
public void shouldProcessAsync() throws Exception {
final List<String> output = new ArrayList<>();
Expand Down
Loading