Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,6 @@ private void setCheckpointPostMergeSyncEnabled(final Boolean enabled) {
CHECKPOINT_POST_MERGE_FLAG);
}

@CommandLine.Option(
names = {"--Xpeertask-system-enabled"},
hidden = true,
description =
"Temporary feature toggle to enable using the new peertask system (default: ${DEFAULT-VALUE})")
private final Boolean isPeerTaskSystemEnabled = false;

@CommandLine.Option(
names = SNAP_TRANSACTION_INDEXING_ENABLED_FLAG,
paramLabel = "<Boolean>",
Expand Down Expand Up @@ -436,15 +429,6 @@ public boolean isSnapsyncServerEnabled() {
return snapsyncServerEnabled;
}

/**
* Flag to indicate whether the peer task system should be used where available
*
* @return true if the peer task system should be used where available
*/
public boolean isPeerTaskSystemEnabled() {
return isPeerTaskSystemEnabled;
}

/**
* Create synchronizer options.
*
Expand Down Expand Up @@ -543,7 +527,6 @@ public SynchronizerConfiguration.Builder toDomainObject() {
.isSnapServerEnabled(snapsyncServerEnabled)
.isSnapSyncTransactionIndexingEnabled(snapTransactionIndexingEnabled)
.build());
builder.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled);
builder.snapSyncSavePreCheckpointHeadersOnlyEnabled(
snapSyncSavePreCheckpointHeadersOnlyEnabled);
builder.receiptsDownloadStepTimeoutMillis(receiptsDownloadStepTimeoutMillis);
Expand Down
12 changes: 4 additions & 8 deletions app/src/test/java/org/hyperledger/besu/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,30 +154,26 @@ public void fullSyncFromGenesis() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), false);
syncFromGenesis(SyncMode.FULL, getFastSyncGenesis());
}

@Test
public void fullSyncFromGenesisUsingPeerTaskSystem() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), true);
syncFromGenesis(SyncMode.FULL, getFastSyncGenesis());
}

private void syncFromGenesis(
final SyncMode mode, final GenesisConfig genesisConfig, final boolean isPeerTaskSystemEnabled)
private void syncFromGenesis(final SyncMode mode, final GenesisConfig genesisConfig)
throws Exception {
final Path dataDirAhead = Files.createTempDirectory(temp, "db-ahead");
final Path dbAhead = dataDirAhead.resolve("database");
final int blockCount = 500;
final NodeKey aheadDbNodeKey = NodeKeyUtils.createFrom(KeyPairUtil.loadKeyPair(dataDirAhead));
final NodeKey behindDbNodeKey = NodeKeyUtils.generate();
final SynchronizerConfiguration syncConfigAhead =
SynchronizerConfiguration.builder()
.syncMode(SyncMode.FULL)
.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled)
.build();
SynchronizerConfiguration.builder().syncMode(SyncMode.FULL).build();
final ObservableMetricsSystem noOpMetricsSystem = new NoOpMetricsSystem();
final var miningParameters = MiningConfiguration.newDefault();
final var dataStorageConfiguration = DataStorageConfiguration.DEFAULT_FOREST_CONFIG;
Expand Down
5 changes: 1 addition & 4 deletions app/src/test/java/org/hyperledger/besu/cli/EphemeryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ public class EphemeryTest extends CommandTestAbstract {
private final Vertx vertx = Vertx.vertx();
private final ObservableMetricsSystem noOpMetricsSystem = new NoOpMetricsSystem();
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
.syncMode(SyncMode.SNAP)
.isPeerTaskSystemEnabled(false)
.build();
SynchronizerConfiguration.builder().syncMode(SyncMode.SNAP).build();

Field cycleIdField;
BigInteger initialCycleId;
Expand Down
1 change: 0 additions & 1 deletion app/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ Xsecp256k1-native-enabled=false
Xaltbn128-native-enabled=false
snapsync-server-enabled=true
Xbonsai-full-flat-db-enabled=true
Xpeertask-system-enabled=false
era1-import-prepipeline-enabled=true
era1-data-uri="mainnet.era1.nimbus.team/"
era1-import-prepipeline-concurrency=4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class SynchronizerConfiguration {
private final long receiptsDownloadStepTimeoutMillis;
private final long backwardHeadersDownloadStepTimeoutMillis;
private final long bodiesDownloadStepTimeoutMillis;
private final boolean isPeerTaskSystemEnabled;
private final boolean snapSyncSavePreCheckpointHeadersOnlyEnabled;

// ERA1 import prepipeline config
Expand Down Expand Up @@ -128,7 +127,6 @@ private SynchronizerConfiguration(
final long receiptsDownloadStepTimeoutMillis,
final long backwardHeadersDownloadStepTimeoutMillis,
final long bodiesDownloadStepTimeoutMillis,
final boolean isPeerTaskSystemEnabled,
final boolean snapSyncSavePreCheckpointHeadersOnlyEnabled,
final boolean era1ImportPrepipelineEnabled,
final URI era1DataUri,
Expand Down Expand Up @@ -158,7 +156,6 @@ private SynchronizerConfiguration(
this.receiptsDownloadStepTimeoutMillis = receiptsDownloadStepTimeoutMillis;
this.backwardHeadersDownloadStepTimeoutMillis = backwardHeadersDownloadStepTimeoutMillis;
this.bodiesDownloadStepTimeoutMillis = bodiesDownloadStepTimeoutMillis;
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
this.snapSyncSavePreCheckpointHeadersOnlyEnabled = snapSyncSavePreCheckpointHeadersOnlyEnabled;
this.era1ImportPrepipelineEnabled = era1ImportPrepipelineEnabled;
this.era1DataUri = era1DataUri;
Expand Down Expand Up @@ -300,10 +297,6 @@ public long getBodiesDownloadStepTimeoutMillis() {
return bodiesDownloadStepTimeoutMillis;
}

public boolean isPeerTaskSystemEnabled() {
return isPeerTaskSystemEnabled;
}

public boolean isSnapSyncSavePreCheckpointHeadersOnlyEnabled() {
return snapSyncSavePreCheckpointHeadersOnlyEnabled;
}
Expand Down Expand Up @@ -347,7 +340,6 @@ public static class Builder {
DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS;
private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING;
private int worldStateTaskCacheSize = DEFAULT_WORLD_STATE_TASK_CACHE_SIZE;
private boolean isPeerTaskSystemEnabled = false;
private boolean snapSyncSavePreCheckpointHeadersOnlyEnabled = true;
private boolean era1ImportPrepipelineEnabled = DEFAULT_ERA1_IMPORT_PREPIPELINE_ENABLED;
private URI era1DataUri = DEFAULT_ERA1_DATA_URI;
Expand Down Expand Up @@ -493,11 +485,6 @@ public Builder bodiesDownloadStepTimeoutMillis(final long bodiesDownloadStepTime
return this;
}

public Builder isPeerTaskSystemEnabled(final boolean isPeerTaskSystemEnabled) {
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
return this;
}

public Builder snapSyncSavePreCheckpointHeadersOnlyEnabled(
final boolean snapSyncSavePreCheckpointHeadersOnlyEnabled) {
this.snapSyncSavePreCheckpointHeadersOnlyEnabled =
Expand Down Expand Up @@ -547,7 +534,6 @@ public SynchronizerConfiguration build() {
receiptsDownloadStepTimeoutMillis,
backwardHeadersDownloadStepTimeoutMillis,
bodiesDownloadStepTimeoutMillis,
isPeerTaskSystemEnabled,
snapSyncSavePreCheckpointHeadersOnlyEnabled,
era1ImportPrepipelineEnabled,
era1DataUri,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlocksFromPeersTask;

import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -79,47 +77,32 @@ public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blo

@VisibleForTesting
protected CompletableFuture<List<Block>> requestBodies(final List<BlockHeader> blockHeaders) {
CompletableFuture<List<Block>> blocksFuture;
if (context.getSynchronizerConfiguration().isPeerTaskSystemEnabled()) {
blocksFuture =
context
.getEthContext()
.getScheduler()
.scheduleServiceTask(
() -> {
GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(
blockHeaders,
context.getProtocolSchedule(),
context.getEthContext().getEthPeers().peerCount());
PeerTaskExecutorResult<List<Block>> taskResult =
context.getEthContext().getPeerTaskExecutor().execute(task);
if (taskResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& taskResult.result().isPresent()) {
return CompletableFuture.completedFuture(taskResult.result().get());
} else {
return CompletableFuture.failedFuture(
new RuntimeException(taskResult.responseCode().toString()));
}
});
} else {
final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask =
RetryingGetBlocksFromPeersTask.forHeaders(
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
context.getEthContext().getEthPeers().peerCount(),
blockHeaders);

blocksFuture =
getBodiesFromPeerTask.run().thenApply(AbstractPeerTask.PeerTaskResult::getResult);
}
return blocksFuture.thenApply(
blocks -> {
LOG.debug("Got {} blocks from peers", blocks.size());
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
return blocks;
});
return context
.getEthContext()
.getScheduler()
.scheduleServiceTask(
() -> {
GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(
blockHeaders,
context.getProtocolSchedule(),
context.getEthContext().getEthPeers().peerCount());
PeerTaskExecutorResult<List<Block>> taskResult =
context.getEthContext().getPeerTaskExecutor().execute(task);
if (taskResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& taskResult.result().isPresent()) {
return CompletableFuture.completedFuture(taskResult.result().get());
} else {
return CompletableFuture.failedFuture(
new RuntimeException(taskResult.responseCode().toString()));
}
})
.thenApply(
blocks -> {
LOG.debug("Got {} blocks from peers", blocks.size());
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
return blocks;
});
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void setUpTest() {

@Test
public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(true);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher();

Mockito.when(
peerTaskExecutor.executeAgainstPeer(
Expand All @@ -138,7 +138,7 @@ public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {

@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() {
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(11), true);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(11));

Mockito.when(
peerTaskExecutor.executeAgainstPeer(
Expand All @@ -155,7 +155,7 @@ public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize

@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() {
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15), true);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));

Mockito.when(
peerTaskExecutor.executeAgainstPeer(
Expand All @@ -172,7 +172,7 @@ public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegment

@Test
public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheRangeBeforeTarget() {
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15), false);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));

final CompletableFuture<List<BlockHeader>> result =
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(11));
Expand All @@ -182,7 +182,7 @@ public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheRangeBeforeTarget() {

@Test
public void shouldReturnEmptyListWhenLastHeaderIsTarget() {
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15), false);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));

final CompletableFuture<List<BlockHeader>> result =
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(15));
Expand All @@ -191,7 +191,7 @@ public void shouldReturnEmptyListWhenLastHeaderIsTarget() {

@Test
public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() {
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15), false);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));

final CompletableFuture<List<BlockHeader>> result =
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(16));
Expand All @@ -201,7 +201,7 @@ public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() {
@Test
public void nextRangeShouldEndAtChainHeadWhenNextRangeHeaderIsAfterHead() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(false);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher();

assertThat(
rangeHeaderFetcher.nextRangeEndsAtChainHead(
Expand All @@ -213,7 +213,7 @@ public void nextRangeShouldEndAtChainHeadWhenNextRangeHeaderIsAfterHead() {
public void nextRangeShouldNotEndAtChainHeadWhenAFinalRangeHeaderIsSpecified() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final RangeHeadersFetcher rangeHeaderFetcher =
createRangeHeaderFetcher(header(remoteChainHeight), false);
createRangeHeaderFetcher(header(remoteChainHeight));

assertThat(
rangeHeaderFetcher.nextRangeEndsAtChainHead(
Expand All @@ -224,7 +224,7 @@ public void nextRangeShouldNotEndAtChainHeadWhenAFinalRangeHeaderIsSpecified() {
@Test
public void shouldReturnRemoteChainHeadWhenNextRangeHeaderIsTheRemoteHead() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(true);
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher();

Mockito.when(
peerTaskExecutor.executeAgainstPeer(
Expand All @@ -245,26 +245,23 @@ public void shouldReturnRemoteChainHeadWhenNextRangeHeaderIsTheRemoteHead() {
assertThat(result).isCompletedWithValue(singletonList(header(remoteChainHeight)));
}

private RangeHeadersFetcher createRangeHeaderFetcher(final boolean isPeerTaskSystemEnabled) {
private RangeHeadersFetcher createRangeHeaderFetcher() {
final EthContext ethContext = ethProtocolManager.ethContext();
return new RangeHeadersFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(SEGMENT_SIZE)
.downloaderHeadersRequestSize(3)
.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled)
.build(),
protocolSchedule,
ethContext);
}

private RangeHeadersFetcher createRangeHeaderFetcher(
final BlockHeader targetHeader, final boolean isPeerTaskSystemEnabled) {
private RangeHeadersFetcher createRangeHeaderFetcher(final BlockHeader targetHeader) {
final EthContext ethContext = ethProtocolManager.ethContext();
return new RangeHeadersFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(SEGMENT_SIZE)
.downloaderHeadersRequestSize(3)
.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled)
.build(),
protocolSchedule,
ethContext,
Expand Down
Loading
Loading