Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the parallel tx set building implementation. #4637

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Builds/VisualStudio/stellar-core.vcxproj
Original file line number Diff line number Diff line change
@@ -518,6 +518,7 @@ exit /b 0
<ClCompile Include="..\..\src\herder\HerderSCPDriver.cpp" />
<ClCompile Include="..\..\src\herder\HerderUtils.cpp" />
<ClCompile Include="..\..\src\herder\LedgerCloseData.cpp" />
<ClCompile Include="..\..\src\herder\ParallelTxSetBuilder.cpp" />
<ClCompile Include="..\..\src\herder\PendingEnvelopes.cpp" />
<ClCompile Include="..\..\src\herder\QuorumIntersectionCheckerImpl.cpp" />
<ClCompile Include="..\..\src\herder\QuorumTracker.cpp" />
@@ -972,6 +973,7 @@ exit /b 0
<ClInclude Include="..\..\src\herder\HerderSCPDriver.h" />
<ClInclude Include="..\..\src\herder\HerderUtils.h" />
<ClInclude Include="..\..\src\herder\LedgerCloseData.h" />
<ClInclude Include="..\..\src\herder\ParallelTxSetBuilder.h" />
<ClInclude Include="..\..\src\herder\PendingEnvelopes.h" />
<ClInclude Include="..\..\src\herder\QuorumIntersectionChecker.h" />
<ClInclude Include="..\..\src\herder\QuorumIntersectionCheckerImpl.h" />
18 changes: 18 additions & 0 deletions Builds/VisualStudio/stellar-core.vcxproj.filters
Original file line number Diff line number Diff line change
@@ -1365,6 +1365,15 @@
<ClCompile Include="..\..\src\catchup\LedgerApplyManagerImpl.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\herder\ParallelTxSetBuilder.cpp">
<Filter>herder</Filter>
</ClCompile>
<ClCompile Include="..\..\src\simulation\ApplyLoad.cpp">
<Filter>simulation</Filter>
</ClCompile>
<ClCompile Include="..\..\src\simulation\TxGenerator.cpp">
<Filter>simulation</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\lib\util\cpptoml.h">
@@ -2360,6 +2369,15 @@
<ClInclude Include="..\..\src\main\QueryServer.h">
<Filter>main</Filter>
</ClInclude>
<ClInclude Include="..\..\src\herder\ParallelTxSetBuilder.h">
<Filter>herder</Filter>
</ClInclude>
<ClInclude Include="..\..\src\simulation\ApplyLoad.h">
<Filter>simulation</Filter>
</ClInclude>
<ClInclude Include="..\..\src\simulation\TxGenerator.h">
<Filter>simulation</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ledger\LedgerStateSnapshot.h">
<Filter>ledger</Filter>
</ClInclude>
483 changes: 483 additions & 0 deletions src/herder/ParallelTxSetBuilder.cpp

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions src/herder/ParallelTxSetBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

// Copyright 2025 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "herder/SurgePricingUtils.h"
#include "herder/TxSetFrame.h"
#include "ledger/NetworkConfig.h"
#include "main/Config.h"

namespace stellar
{
// Builds a sequence of parallel processing stages from the provided
// transactions while respecting the limits defined by the network
// configuration.
// The number of stages and the number of clusters in each stage is determined
// by the provided configurations (`cfg` and `sorobanCfg`).
// The resource limits in transactions are determined based on the input
// `laneConfig`.
// This doesn't support multi-lane surge pricing and thus it's expected
// `laneConfig` to only have a configuration for a single surge pricing lane.
TxStageFrameList buildSurgePricedParallelSorobanPhase(
TxFrameList const& txFrames, Config const& cfg,
SorobanNetworkConfig const& sorobanCfg,
std::shared_ptr<SurgePricingLaneConfig> laneConfig,
std::vector<bool>& hadTxNotFittingLane);

} // namespace stellar
7 changes: 7 additions & 0 deletions src/herder/SurgePricingUtils.cpp
Original file line number Diff line number Diff line change
@@ -326,6 +326,13 @@ SurgePricingPriorityQueue::popTopTxs(
laneLeftUntilLimit[lane] -= res;
}
}
else if (visitRes == VisitTxResult::REJECTED)
{
// If a transaction hasn't been processed, then it is considered to
// be not fitting the lane.
hadTxNotFittingLane[GENERIC_LANE] = true;
hadTxNotFittingLane[lane] = true;
}
erase(currIt);
}
}
25 changes: 14 additions & 11 deletions src/herder/SurgePricingUtils.h
Original file line number Diff line number Diff line change
@@ -133,6 +133,9 @@ class SurgePricingPriorityQueue
// Transaction should be skipped and not counted towards the lane
// limits.
SKIPPED,
// Like `SKIPPED`, but marks the fact that the transaction didn't fit
// into the lane due to reasons beyond the lane's resource limit.
REJECTED,
// Transaction has been processed and should be counted towards the
// lane limits.
PROCESSED
@@ -184,6 +187,17 @@ class SurgePricingPriorityQueue
std::vector<std::pair<TransactionFrameBasePtr, bool>>& txsToEvict)
const;

// Generalized method for visiting and popping the top transactions in the
// queue until the lane limits are reached.
// This is a destructive method that removes all or most of the queue
// elements and thus should be used with care.
void popTopTxs(
bool allowGaps,
std::function<VisitTxResult(TransactionFrameBasePtr const&)> const&
visitor,
std::vector<Resource>& laneResourcesLeftUntilLimit,
std::vector<bool>& hadTxNotFittingLane);

private:
class TxComparator
{
@@ -236,17 +250,6 @@ class SurgePricingPriorityQueue
std::vector<LaneIter> mutable mIters;
};

// Generalized method for visiting and popping the top transactions in the
// queue until the lane limits are reached.
// This is a destructive method that removes all or most of the queue
// elements and thus should be used with care.
void popTopTxs(
bool allowGaps,
std::function<VisitTxResult(TransactionFrameBasePtr const&)> const&
visitor,
std::vector<Resource>& laneResourcesLeftUntilLimit,
std::vector<bool>& hadTxNotFittingLane);

void erase(Iterator const& it);
void erase(size_t lane,
SurgePricingPriorityQueue::TxSortedSet::iterator iter);
155 changes: 120 additions & 35 deletions src/herder/TxSetFrame.cpp
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
#include "crypto/Random.h"
#include "crypto/SHA.h"
#include "database/Database.h"
#include "herder/ParallelTxSetBuilder.h"
#include "herder/SurgePricingUtils.h"
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
@@ -487,8 +488,8 @@ computeLaneBaseFee(TxSetPhase phase, LedgerHeader const& ledgerHeader,
return laneBaseFee;
}

std::pair<TxFrameList, std::shared_ptr<InclusionFeeMap>>
applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app)
std::shared_ptr<SurgePricingLaneConfig>
createSurgePricingLangeConfig(TxSetPhase phase, Application& app)
{
ZoneScoped;
releaseAssert(threadIsMain());
@@ -524,6 +525,16 @@ applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app)

auto limits = app.getLedgerManager().maxLedgerResources(
/* isSoroban */ true);
// When building Soroban tx sets with parallel execution support,
// instructions are accounted for by the build logic, not by the surge
// pricing config, so we need to relax the instruction limit in surge
// pricing logic.
if (protocolVersionStartsFrom(lclHeader.ledgerVersion,
PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION))
{
limits.setVal(Resource::Type::INSTRUCTIONS,
std::numeric_limits<int64_t>::max());
}

auto byteLimit =
std::min(static_cast<int64_t>(MAX_SOROBAN_BYTE_ALLOWANCE),
@@ -533,27 +544,102 @@ applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app)
surgePricingLaneConfig =
std::make_shared<SorobanGenericLaneConfig>(limits);
}
auto includedTxs = SurgePricingPriorityQueue::getMostTopTxsWithinLimits(
return surgePricingLaneConfig;
}

TxFrameList
buildSurgePricedSequentialPhase(
TxFrameList const& txs,
std::shared_ptr<SurgePricingLaneConfig> surgePricingLaneConfig,
std::vector<bool>& hadTxNotFittingLane)
{
ZoneScoped;
return SurgePricingPriorityQueue::getMostTopTxsWithinLimits(
txs, surgePricingLaneConfig, hadTxNotFittingLane);
}

size_t laneCount = surgePricingLaneConfig->getLaneLimits().size();
std::vector<int64_t> lowestLaneFee(laneCount,
std::numeric_limits<int64_t>::max());
for (auto const& tx : includedTxs)
std::pair<std::variant<TxFrameList, TxStageFrameList>,
std::shared_ptr<InclusionFeeMap>>
applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app)
{
ZoneScoped;
auto surgePricingLaneConfig = createSurgePricingLangeConfig(phase, app);
std::vector<bool> hadTxNotFittingLane;
bool isParallelSoroban =
phase == TxSetPhase::SOROBAN &&
protocolVersionStartsFrom(app.getLedgerManager()
.getLastClosedLedgerHeader()
.header.ledgerVersion,
PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION);
std::variant<TxFrameList, TxStageFrameList> includedTxs;
if (isParallelSoroban)
{
includedTxs = buildSurgePricedParallelSorobanPhase(
txs, app.getConfig(),
app.getLedgerManager().getSorobanNetworkConfigReadOnly(),
surgePricingLaneConfig, hadTxNotFittingLane);
}
else
{
size_t lane = surgePricingLaneConfig->getLane(*tx);
auto perOpFee = computePerOpFee(*tx, lclHeader.ledgerVersion);
lowestLaneFee[lane] = std::min(lowestLaneFee[lane], perOpFee);
includedTxs = buildSurgePricedSequentialPhase(
txs, surgePricingLaneConfig, hadTxNotFittingLane);
}

auto visitIncludedTxs =
[&includedTxs](
std::function<void(TransactionFrameBaseConstPtr const&)> visitor) {
std::visit(
[&visitor](auto const& txs) {
using T = std::decay_t<decltype(txs)>;
if constexpr (std::is_same_v<T, TxFrameList>)
{
for (auto const& tx : txs)
{
visitor(tx);
}
}
else if constexpr (std::is_same_v<T, TxStageFrameList>)
{
for (auto const& stage : txs)
{
for (auto const& thread : stage)
{
for (auto const& tx : thread)
{
visitor(tx);
}
}
}
}
else
{
releaseAssert(false);
}
},
includedTxs);
};

std::vector<int64_t> lowestLaneFee;
auto const& lclHeader =
app.getLedgerManager().getLastClosedLedgerHeader().header;

size_t laneCount = surgePricingLaneConfig->getLaneLimits().size();
lowestLaneFee.resize(laneCount, std::numeric_limits<int64_t>::max());
visitIncludedTxs(
[&lowestLaneFee, &surgePricingLaneConfig, &lclHeader](auto const& tx) {
size_t lane = surgePricingLaneConfig->getLane(*tx);
auto perOpFee = computePerOpFee(*tx, lclHeader.ledgerVersion);
lowestLaneFee[lane] = std::min(lowestLaneFee[lane], perOpFee);
});
auto laneBaseFee =
computeLaneBaseFee(phase, lclHeader, *surgePricingLaneConfig,
lowestLaneFee, hadTxNotFittingLane);
auto inclusionFeeMapPtr = std::make_shared<InclusionFeeMap>();
auto& inclusionFeeMap = *inclusionFeeMapPtr;
for (auto const& tx : includedTxs)
{
visitIncludedTxs([&inclusionFeeMap, &laneBaseFee,
&surgePricingLaneConfig](auto const& tx) {
inclusionFeeMap[tx] = laneBaseFee[surgePricingLaneConfig->getLane(*tx)];
}
});

return std::make_pair(includedTxs, inclusionFeeMapPtr);
}
@@ -738,29 +824,28 @@ makeTxSetFromTransactions(PerPhaseTransactionList const& txPhases,
}
#endif
auto phaseType = static_cast<TxSetPhase>(i);
auto [includedTxs, inclusionFeeMap] =
auto [includedTxs, inclusionFeeMapBinding] =
applySurgePricing(phaseType, validatedTxs, app);
if (phaseType != TxSetPhase::SOROBAN ||
protocolVersionIsBefore(app.getLedgerManager()
.getLastClosedLedgerHeader()
.header.ledgerVersion,
PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION))
{
validatedPhases.emplace_back(TxSetPhaseFrame(
phaseType, std::move(includedTxs), inclusionFeeMap));
}
// This is a temporary stub for building a valid parallel tx set
// without any parallelization.
else
{
TxStageFrameList stages;
if (!includedTxs.empty())
{
stages.emplace_back().push_back(includedTxs);
}
validatedPhases.emplace_back(
TxSetPhaseFrame(phaseType, std::move(stages), inclusionFeeMap));
}
auto inclusionFeeMap = inclusionFeeMapBinding;
std::visit(
[&validatedPhases, phaseType, inclusionFeeMap](auto&& txs) {
using T = std::decay_t<decltype(txs)>;
if constexpr (std::is_same_v<T, TxFrameList>)
{
validatedPhases.emplace_back(
TxSetPhaseFrame(phaseType, txs, inclusionFeeMap));
}
else if constexpr (std::is_same_v<T, TxStageFrameList>)
{
validatedPhases.emplace_back(TxSetPhaseFrame(
phaseType, std::move(txs), inclusionFeeMap));
}
else
{
releaseAssert(false);
}
},
includedTxs);
}

auto const& lclHeader = app.getLedgerManager().getLastClosedLedgerHeader();
801 changes: 799 additions & 2 deletions src/herder/test/TxSetTests.cpp

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/main/Config.cpp
Original file line number Diff line number Diff line change
@@ -304,6 +304,10 @@ Config::Config() : NODE_SEED(SecretKey::random())
EMIT_LEDGER_CLOSE_META_EXT_V1 = false;

FORCE_OLD_STYLE_LEADER_ELECTION = false;
// This is not configurable for now. It doesn't need to be a network-wide
// setting, but on the other hand there aren't many good values for it and
// it's not clear what the right way to configure it would be, if at all.
SOROBAN_PHASE_STAGE_COUNT = 1;

#ifdef BUILD_TESTS
TEST_CASES_ENABLED = false;
2 changes: 2 additions & 0 deletions src/main/Config.h
Original file line number Diff line number Diff line change
@@ -752,6 +752,8 @@ class Config : public std::enable_shared_from_this<Config>
bool EMIT_SOROBAN_TRANSACTION_META_EXT_V1;
bool EMIT_LEDGER_CLOSE_META_EXT_V1;

uint32_t SOROBAN_PHASE_STAGE_COUNT;

#ifdef BUILD_TESTS
// If set to true, the application will be aware this run is for a test
// case. This is used right now in the signal handler to exit() instead of