diff --git a/Builds/VisualStudio/stellar-core.vcxproj b/Builds/VisualStudio/stellar-core.vcxproj index 129a51ce82..eb2fe86d31 100644 --- a/Builds/VisualStudio/stellar-core.vcxproj +++ b/Builds/VisualStudio/stellar-core.vcxproj @@ -518,6 +518,7 @@ exit /b 0 + @@ -972,6 +973,7 @@ exit /b 0 + diff --git a/Builds/VisualStudio/stellar-core.vcxproj.filters b/Builds/VisualStudio/stellar-core.vcxproj.filters index 5f2d5519d3..9724c12b26 100644 --- a/Builds/VisualStudio/stellar-core.vcxproj.filters +++ b/Builds/VisualStudio/stellar-core.vcxproj.filters @@ -1365,6 +1365,15 @@ catchup + + herder + + + simulation + + + simulation + @@ -2360,6 +2369,15 @@ main + + herder + + + simulation + + + simulation + ledger diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp new file mode 100644 index 0000000000..c119a1e8ea --- /dev/null +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -0,0 +1,483 @@ +// Copyright 2025 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "herder/ParallelTxSetBuilder.h" +#include "herder/SurgePricingUtils.h" +#include "herder/TxSetFrame.h" +#include "transactions/TransactionFrameBase.h" +#include "util/BitSet.h" + +#include + +namespace stellar +{ +namespace +{ +// Configuration for parallel partitioning of transactions. +struct ParallelPartitionConfig +{ + ParallelPartitionConfig(Config const& cfg, + SorobanNetworkConfig const& sorobanCfg) + : mStageCount( + std::max(cfg.SOROBAN_PHASE_STAGE_COUNT, static_cast(1))) + , mClustersPerStage(sorobanCfg.ledgerMaxDependentTxClusters()) + , mInstructionsPerCluster(sorobanCfg.ledgerMaxInstructions() / + mStageCount) + { + } + + uint64_t + instructionsPerStage() const + { + return mInstructionsPerCluster * mClustersPerStage; + } + + uint32_t mStageCount = 0; + uint32_t mClustersPerStage = 0; + uint64_t mInstructionsPerCluster = 0; +}; + +// Internal data structure that contains only relevant transaction information +// necessary for building parallel processing stages. +struct BuilderTx +{ + size_t mId = 0; + uint32_t mInstructions = 0; + // Set of ids of transactions that conflict with this transaction. + BitSet mConflictTxs; + + BuilderTx(size_t txId, TransactionFrameBase const& tx) + : mId(txId), mInstructions(tx.sorobanResources().instructions) + { + } +}; + +// Cluster of (potentially transitively) dependent transactions. +// Transactions are considered to be dependent if the have the same key in +// their footprints and for at least one of them this key belongs to read-write +// footprint. +struct Cluster +{ + // Total number of instructions in the cluster. Since transactions are + // dependent, these are always 'sequential' instructions. + uint64_t mInstructions = 0; + // Set of ids of transactions that conflict with this cluster. + BitSet mConflictTxs; + // Set of transaction ids in the cluster. + BitSet mTxIds; + // Id of the bin within a stage in which the cluster is packed. + size_t mutable mBinId = 0; + + explicit Cluster(BuilderTx const& tx) : mInstructions(tx.mInstructions) + { + mConflictTxs.inplaceUnion(tx.mConflictTxs); + mTxIds.set(tx.mId); + } + + void + merge(Cluster const& other) + { + mInstructions += other.mInstructions; + mConflictTxs.inplaceUnion(other.mConflictTxs); + mTxIds.inplaceUnion(other.mTxIds); + } +}; + +// The stage of parallel processing that consists of clusters of dependent +// transactions that can be processed in parallel relative to each other +// The stage contains an arbitrary number of clusters of actually dependent +// transactions and the bin-packing of these clusters into at most +// `mConfig.mClustersPerStage` bins, i.e. into as many clusters as the network +// configuration allows. +class Stage +{ + public: + Stage(ParallelPartitionConfig cfg) : mConfig(cfg) + { + mBinPacking.resize(mConfig.mClustersPerStage); + mBinInstructions.resize(mConfig.mClustersPerStage); + } + + // Tries to add a transaction to the stage and returns true if the + // transaction has been added. + bool + tryAdd(BuilderTx const& tx) + { + ZoneScoped; + // A fast-fail condition to ensure that adding the transaction won't + // exceed the theorethical limit of instructions per stage. + if (mInstructions + tx.mInstructions > mConfig.instructionsPerStage()) + { + return false; + } + // First, find all clusters that conflict with the new transaction. + auto conflictingClusters = getConflictingClusters(tx); + + bool packed = false; + // Then, try creating new clusters by merging the conflicting clusters + // together and adding the new transaction to the resulting cluster. + auto newClusters = createNewClusters(tx, conflictingClusters, packed); + // Fail fast if a new cluster will end up too large to fit into the + // stage. + if (newClusters.empty()) + { + return false; + } + // If the merge didn't cause a perturbation in bin-packing, we can just + // replace the old clusters with the new ones within one of the + // existing bins. + if (packed) + { + mClusters = newClusters; + mInstructions += tx.mInstructions; + return true; + } + // Otherwise, we need try to recompute the bin-packing from scratch. + std::vector newBinInstructions; + auto newPacking = binPacking(newClusters, newBinInstructions); + // Even if the new cluster is below the limit, it may invalidate the + // stage as a whole in case if we can no longer pack the clusters into + // the required number of bins. + if (newPacking.empty()) + { + return false; + } + mClusters = newClusters; + mBinPacking = newPacking; + mInstructions += tx.mInstructions; + mBinInstructions = newBinInstructions; + return true; + } + + // Visit every transaction in the stage. + // The visitor arguments are the index of the bin the transaction is packed + // into and the index of the transaction itself. + void + visitAllTransactions(std::function visitor) const + { + for (auto const& cluster : mClusters) + { + size_t txId = 0; + while (cluster->mTxIds.nextSet(txId)) + { + visitor(cluster->mBinId, txId); + ++txId; + } + } + } + + private: + std::unordered_set + getConflictingClusters(BuilderTx const& tx) const + { + std::unordered_set conflictingClusters; + for (auto const& cluster : mClusters) + { + if (cluster->mConflictTxs.get(tx.mId)) + { + conflictingClusters.insert(cluster.get()); + } + } + return conflictingClusters; + } + + std::vector> + createNewClusters(BuilderTx const& tx, + std::unordered_set const& txConflicts, + bool& packed) + { + int64_t newInstructions = tx.mInstructions; + for (auto const* cluster : txConflicts) + { + newInstructions += cluster->mInstructions; + } + + // Fast-fail condition to ensure that the new cluster doesn't exceed + // the instructions limit. + if (newInstructions > mConfig.mInstructionsPerCluster) + { + return {}; + } + auto newCluster = std::make_shared(tx); + for (auto const* cluster : txConflicts) + { + newCluster->merge(*cluster); + } + // Remove the clusters that were merged from their respective bins. + for (auto const& cluster : txConflicts) + { + mBinInstructions[cluster->mBinId] -= cluster->mInstructions; + mBinPacking[cluster->mBinId].inplaceDifference(cluster->mTxIds); + } + + packed = false; + // Try to simply put the new cluster into any one of the existing bins. + // If we can do that, then we save quite a bit of time on not redoing + // the bin-packing from scratch. + for (size_t binId = 0; binId < mConfig.mClustersPerStage; ++binId) + { + if (mBinInstructions[binId] + newCluster->mInstructions <= + mConfig.mInstructionsPerCluster) + { + mBinInstructions[binId] += newCluster->mInstructions; + mBinPacking[binId].inplaceUnion(newCluster->mTxIds); + newCluster->mBinId = binId; + packed = true; + break; + } + } + + std::vector> newClusters; + newClusters.reserve(mClusters.size() + 1 - txConflicts.size()); + for (auto const& cluster : mClusters) + { + if (txConflicts.find(cluster.get()) == txConflicts.end()) + { + newClusters.push_back(cluster); + } + } + newClusters.push_back(newCluster); + // If we couldn't pack the new cluster without full bin-packing, we + // recover the state of the bins (so that the transaction is not + // considered to have been added yet). + if (!packed) + { + for (auto const& cluster : txConflicts) + { + mBinInstructions[cluster->mBinId] += cluster->mInstructions; + mBinPacking[cluster->mBinId].inplaceUnion(cluster->mTxIds); + } + } + return newClusters; + } + + // Simple bin-packing first-fit-decreasing heuristic + // (https://en.wikipedia.org/wiki/First-fit-decreasing_bin_packing). + // This has around 11/9 maximum approximation ratio, which probably has + // the best complexity/performance tradeoff out of all the heuristics. + std::vector + binPacking(std::vector>& clusters, + std::vector& binInsns) const + { + // We could consider dropping the sort here in order to save some time + // and using just the first-fit heuristic, but that also raises the + // approximation ratio to 1.7. + std::sort(clusters.begin(), clusters.end(), + [](auto const& a, auto const& b) { + return a->mInstructions > b->mInstructions; + }); + size_t const binCount = mConfig.mClustersPerStage; + std::vector bins(binCount); + binInsns.resize(binCount); + std::vector newBinId(clusters.size()); + // Just add every cluster into the first bin it fits into. + for (size_t clusterId = 0; clusterId < clusters.size(); ++clusterId) + { + auto const& cluster = clusters[clusterId]; + bool packed = false; + for (size_t i = 0; i < binCount; ++i) + { + if (binInsns[i] + cluster->mInstructions <= + mConfig.mInstructionsPerCluster) + { + binInsns[i] += cluster->mInstructions; + bins[i].inplaceUnion(cluster->mTxIds); + newBinId[clusterId] = i; + packed = true; + break; + } + } + if (!packed) + { + return std::vector(); + } + } + for (size_t clusterId = 0; clusterId < clusters.size(); ++clusterId) + { + clusters[clusterId]->mBinId = newBinId[clusterId]; + } + return bins; + } + + std::vector> mClusters; + std::vector mBinPacking; + std::vector mBinInstructions; + int64_t mInstructions = 0; + ParallelPartitionConfig mConfig; +}; + +} // namespace + +TxStageFrameList +buildSurgePricedParallelSorobanPhase( + TxFrameList const& txFrames, Config const& cfg, + SorobanNetworkConfig const& sorobanCfg, + std::shared_ptr laneConfig, + std::vector& hadTxNotFittingLane) +{ + ZoneScoped; + // Simplify the transactions to the minimum necessary amount of data. + std::unordered_map + builderTxForTx; + std::vector> builderTxs; + builderTxs.reserve(txFrames.size()); + for (size_t i = 0; i < txFrames.size(); ++i) + { + auto const& txFrame = txFrames[i]; + builderTxs.emplace_back(std::make_unique(i, *txFrame)); + builderTxForTx.emplace(txFrame, builderTxs.back().get()); + } + + // Before trying to include any transactions, find all the pairs of the + // conflicting transactions and mark the conflicts in the builderTxs. + // + // In order to find the conflicts, we build the maps from the footprint + // keys to transactions, then mark the conflicts between the transactions + // that share RW key, or between the transactions that share RO and RW key. + // + // The approach here is optimized towards the low number of conflicts, + // specifically when there are no conflicts at all, the complexity is just + // O(total_footprint_entry_count). The worst case is roughly + // O(max_tx_footprint_size * transaction_count ^ 2), which is equivalent + // to the complexity of the straightforward approach of iterating over all + // the transaction pairs. + // + // This also has the further optimization potential: we could populate the + // key maps and even the conflicting transactions eagerly in tx queue, thus + // amortizing the costs across the whole ledger duration. + UnorderedMap> txsWithRoKey; + UnorderedMap> txsWithRwKey; + for (size_t i = 0; i < txFrames.size(); ++i) + { + auto const& txFrame = txFrames[i]; + auto const& footprint = txFrame->sorobanResources().footprint; + for (auto const& key : footprint.readOnly) + { + txsWithRoKey[key].push_back(i); + } + for (auto const& key : footprint.readWrite) + { + txsWithRwKey[key].push_back(i); + } + } + + for (auto const& [key, rwTxIds] : txsWithRwKey) + { + // RW-RW conflicts + for (size_t i = 0; i < rwTxIds.size(); ++i) + { + for (size_t j = i + 1; j < rwTxIds.size(); ++j) + { + builderTxs[rwTxIds[i]]->mConflictTxs.set(rwTxIds[j]); + builderTxs[rwTxIds[j]]->mConflictTxs.set(rwTxIds[i]); + } + } + // RO-RW conflicts + auto roIt = txsWithRoKey.find(key); + if (roIt != txsWithRoKey.end()) + { + auto const& roTxIds = roIt->second; + for (size_t i = 0; i < roTxIds.size(); ++i) + { + for (size_t j = 0; j < rwTxIds.size(); ++j) + { + builderTxs[roTxIds[i]]->mConflictTxs.set(rwTxIds[j]); + builderTxs[rwTxIds[j]]->mConflictTxs.set(roTxIds[i]); + } + } + } + } + + // Process the transactions in the surge pricing (decreasing fee) order. + // This also automatically ensures that the resource limits are respected + // for all the dimensions besides instructions. + SurgePricingPriorityQueue queue( + /* isHighestPriority */ true, laneConfig, + stellar::rand_uniform(0, std::numeric_limits::max())); + for (auto const& tx : txFrames) + { + queue.add(tx); + } + + ParallelPartitionConfig partitionCfg(cfg, sorobanCfg); + std::vector stages(partitionCfg.mStageCount, partitionCfg); + + // Visit the transactions in the surge pricing queue and try to add them to + // at least one of the stages. + auto visitor = [&stages, + &builderTxForTx](TransactionFrameBaseConstPtr const& tx) { + bool added = false; + auto builderTxIt = builderTxForTx.find(tx); + releaseAssert(builderTxIt != builderTxForTx.end()); + for (auto& stage : stages) + { + if (stage.tryAdd(*builderTxIt->second)) + { + added = true; + break; + } + } + if (added) + { + return SurgePricingPriorityQueue::VisitTxResult::PROCESSED; + } + // If a transaction didn't fit into any of the stages, we consider it + // to have been excluded due to resource limits and thus notify the + // surge pricing queue that surge pricing should be triggered ( + // REJECTED imitates the behavior for exceeding the resource limit + // within the queue itself). + return SurgePricingPriorityQueue::VisitTxResult::REJECTED; + }; + + std::vector laneLeftUntilLimitUnused; + queue.popTopTxs(/* allowGaps */ true, visitor, laneLeftUntilLimitUnused, + hadTxNotFittingLane); + releaseAssert(hadTxNotFittingLane.size() == 1); + + // At this point the stages have been filled with transactions and we just + // need to place the full transactions into the respective stages/clusters. + TxStageFrameList resStages; + resStages.reserve(stages.size()); + for (auto const& stage : stages) + { + auto& resStage = resStages.emplace_back(); + resStage.reserve(partitionCfg.mClustersPerStage); + + std::unordered_map clusterIdToStageCluster; + + stage.visitAllTransactions([&resStage, &txFrames, + &clusterIdToStageCluster](size_t clusterId, + size_t txId) { + auto it = clusterIdToStageCluster.find(clusterId); + if (it == clusterIdToStageCluster.end()) + { + it = clusterIdToStageCluster.emplace(clusterId, resStage.size()) + .first; + resStage.emplace_back(); + } + resStage[it->second].push_back(txFrames[txId]); + }); + // Algorithm ensures that clusters are populated from first to last and + // no empty clusters are generated. + for (auto const& cluster : resStage) + { + releaseAssert(!cluster.empty()); + } + } + // Ensure we don't return any empty stages, which is prohibited by the + // protocol. The algorithm builds the stages such that the stages are + // populated from first to last. + while (!resStages.empty() && resStages.back().empty()) + { + resStages.pop_back(); + } + for (auto const& stage : resStages) + { + releaseAssert(!stage.empty()); + } + + return resStages; +} + +} // namespace stellar diff --git a/src/herder/ParallelTxSetBuilder.h b/src/herder/ParallelTxSetBuilder.h new file mode 100644 index 0000000000..3b5ac69887 --- /dev/null +++ b/src/herder/ParallelTxSetBuilder.h @@ -0,0 +1,29 @@ +#pragma once + +// Copyright 2025 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "herder/SurgePricingUtils.h" +#include "herder/TxSetFrame.h" +#include "ledger/NetworkConfig.h" +#include "main/Config.h" + +namespace stellar +{ +// Builds a sequence of parallel processing stages from the provided +// transactions while respecting the limits defined by the network +// configuration. +// The number of stages and the number of clusters in each stage is determined +// by the provided configurations (`cfg` and `sorobanCfg`). +// The resource limits in transactions are determined based on the input +// `laneConfig`. +// This doesn't support multi-lane surge pricing and thus it's expected +// `laneConfig` to only have a configuration for a single surge pricing lane. +TxStageFrameList buildSurgePricedParallelSorobanPhase( + TxFrameList const& txFrames, Config const& cfg, + SorobanNetworkConfig const& sorobanCfg, + std::shared_ptr laneConfig, + std::vector& hadTxNotFittingLane); + +} // namespace stellar diff --git a/src/herder/SurgePricingUtils.cpp b/src/herder/SurgePricingUtils.cpp index db9d63e6a0..bc473c032a 100644 --- a/src/herder/SurgePricingUtils.cpp +++ b/src/herder/SurgePricingUtils.cpp @@ -326,6 +326,13 @@ SurgePricingPriorityQueue::popTopTxs( laneLeftUntilLimit[lane] -= res; } } + else if (visitRes == VisitTxResult::REJECTED) + { + // If a transaction hasn't been processed, then it is considered to + // be not fitting the lane. + hadTxNotFittingLane[GENERIC_LANE] = true; + hadTxNotFittingLane[lane] = true; + } erase(currIt); } } diff --git a/src/herder/SurgePricingUtils.h b/src/herder/SurgePricingUtils.h index 08473e43a8..c8c96d659c 100644 --- a/src/herder/SurgePricingUtils.h +++ b/src/herder/SurgePricingUtils.h @@ -133,6 +133,9 @@ class SurgePricingPriorityQueue // Transaction should be skipped and not counted towards the lane // limits. SKIPPED, + // Like `SKIPPED`, but marks the fact that the transaction didn't fit + // into the lane due to reasons beyond the lane's resource limit. + REJECTED, // Transaction has been processed and should be counted towards the // lane limits. PROCESSED @@ -184,6 +187,17 @@ class SurgePricingPriorityQueue std::vector>& txsToEvict) const; + // Generalized method for visiting and popping the top transactions in the + // queue until the lane limits are reached. + // This is a destructive method that removes all or most of the queue + // elements and thus should be used with care. + void popTopTxs( + bool allowGaps, + std::function const& + visitor, + std::vector& laneResourcesLeftUntilLimit, + std::vector& hadTxNotFittingLane); + private: class TxComparator { @@ -236,17 +250,6 @@ class SurgePricingPriorityQueue std::vector mutable mIters; }; - // Generalized method for visiting and popping the top transactions in the - // queue until the lane limits are reached. - // This is a destructive method that removes all or most of the queue - // elements and thus should be used with care. - void popTopTxs( - bool allowGaps, - std::function const& - visitor, - std::vector& laneResourcesLeftUntilLimit, - std::vector& hadTxNotFittingLane); - void erase(Iterator const& it); void erase(size_t lane, SurgePricingPriorityQueue::TxSortedSet::iterator iter); diff --git a/src/herder/TxSetFrame.cpp b/src/herder/TxSetFrame.cpp index 989e34406f..50e7cb28cc 100644 --- a/src/herder/TxSetFrame.cpp +++ b/src/herder/TxSetFrame.cpp @@ -9,6 +9,7 @@ #include "crypto/Random.h" #include "crypto/SHA.h" #include "database/Database.h" +#include "herder/ParallelTxSetBuilder.h" #include "herder/SurgePricingUtils.h" #include "ledger/LedgerManager.h" #include "ledger/LedgerTxn.h" @@ -487,8 +488,8 @@ computeLaneBaseFee(TxSetPhase phase, LedgerHeader const& ledgerHeader, return laneBaseFee; } -std::pair> -applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app) +std::shared_ptr +createSurgePricingLangeConfig(TxSetPhase phase, Application& app) { ZoneScoped; releaseAssert(threadIsMain()); @@ -524,6 +525,16 @@ applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app) auto limits = app.getLedgerManager().maxLedgerResources( /* isSoroban */ true); + // When building Soroban tx sets with parallel execution support, + // instructions are accounted for by the build logic, not by the surge + // pricing config, so we need to relax the instruction limit in surge + // pricing logic. + if (protocolVersionStartsFrom(lclHeader.ledgerVersion, + PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION)) + { + limits.setVal(Resource::Type::INSTRUCTIONS, + std::numeric_limits::max()); + } auto byteLimit = std::min(static_cast(MAX_SOROBAN_BYTE_ALLOWANCE), @@ -533,27 +544,102 @@ applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app) surgePricingLaneConfig = std::make_shared(limits); } - auto includedTxs = SurgePricingPriorityQueue::getMostTopTxsWithinLimits( + return surgePricingLaneConfig; +} + +TxFrameList +buildSurgePricedSequentialPhase( + TxFrameList const& txs, + std::shared_ptr surgePricingLaneConfig, + std::vector& hadTxNotFittingLane) +{ + ZoneScoped; + return SurgePricingPriorityQueue::getMostTopTxsWithinLimits( txs, surgePricingLaneConfig, hadTxNotFittingLane); +} - size_t laneCount = surgePricingLaneConfig->getLaneLimits().size(); - std::vector lowestLaneFee(laneCount, - std::numeric_limits::max()); - for (auto const& tx : includedTxs) +std::pair, + std::shared_ptr> +applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app) +{ + ZoneScoped; + auto surgePricingLaneConfig = createSurgePricingLangeConfig(phase, app); + std::vector hadTxNotFittingLane; + bool isParallelSoroban = + phase == TxSetPhase::SOROBAN && + protocolVersionStartsFrom(app.getLedgerManager() + .getLastClosedLedgerHeader() + .header.ledgerVersion, + PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION); + std::variant includedTxs; + if (isParallelSoroban) + { + includedTxs = buildSurgePricedParallelSorobanPhase( + txs, app.getConfig(), + app.getLedgerManager().getSorobanNetworkConfigReadOnly(), + surgePricingLaneConfig, hadTxNotFittingLane); + } + else { - size_t lane = surgePricingLaneConfig->getLane(*tx); - auto perOpFee = computePerOpFee(*tx, lclHeader.ledgerVersion); - lowestLaneFee[lane] = std::min(lowestLaneFee[lane], perOpFee); + includedTxs = buildSurgePricedSequentialPhase( + txs, surgePricingLaneConfig, hadTxNotFittingLane); } + + auto visitIncludedTxs = + [&includedTxs]( + std::function visitor) { + std::visit( + [&visitor](auto const& txs) { + using T = std::decay_t; + if constexpr (std::is_same_v) + { + for (auto const& tx : txs) + { + visitor(tx); + } + } + else if constexpr (std::is_same_v) + { + for (auto const& stage : txs) + { + for (auto const& thread : stage) + { + for (auto const& tx : thread) + { + visitor(tx); + } + } + } + } + else + { + releaseAssert(false); + } + }, + includedTxs); + }; + + std::vector lowestLaneFee; + auto const& lclHeader = + app.getLedgerManager().getLastClosedLedgerHeader().header; + + size_t laneCount = surgePricingLaneConfig->getLaneLimits().size(); + lowestLaneFee.resize(laneCount, std::numeric_limits::max()); + visitIncludedTxs( + [&lowestLaneFee, &surgePricingLaneConfig, &lclHeader](auto const& tx) { + size_t lane = surgePricingLaneConfig->getLane(*tx); + auto perOpFee = computePerOpFee(*tx, lclHeader.ledgerVersion); + lowestLaneFee[lane] = std::min(lowestLaneFee[lane], perOpFee); + }); auto laneBaseFee = computeLaneBaseFee(phase, lclHeader, *surgePricingLaneConfig, lowestLaneFee, hadTxNotFittingLane); auto inclusionFeeMapPtr = std::make_shared(); auto& inclusionFeeMap = *inclusionFeeMapPtr; - for (auto const& tx : includedTxs) - { + visitIncludedTxs([&inclusionFeeMap, &laneBaseFee, + &surgePricingLaneConfig](auto const& tx) { inclusionFeeMap[tx] = laneBaseFee[surgePricingLaneConfig->getLane(*tx)]; - } + }); return std::make_pair(includedTxs, inclusionFeeMapPtr); } @@ -738,29 +824,28 @@ makeTxSetFromTransactions(PerPhaseTransactionList const& txPhases, } #endif auto phaseType = static_cast(i); - auto [includedTxs, inclusionFeeMap] = + auto [includedTxs, inclusionFeeMapBinding] = applySurgePricing(phaseType, validatedTxs, app); - if (phaseType != TxSetPhase::SOROBAN || - protocolVersionIsBefore(app.getLedgerManager() - .getLastClosedLedgerHeader() - .header.ledgerVersion, - PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION)) - { - validatedPhases.emplace_back(TxSetPhaseFrame( - phaseType, std::move(includedTxs), inclusionFeeMap)); - } - // This is a temporary stub for building a valid parallel tx set - // without any parallelization. - else - { - TxStageFrameList stages; - if (!includedTxs.empty()) - { - stages.emplace_back().push_back(includedTxs); - } - validatedPhases.emplace_back( - TxSetPhaseFrame(phaseType, std::move(stages), inclusionFeeMap)); - } + auto inclusionFeeMap = inclusionFeeMapBinding; + std::visit( + [&validatedPhases, phaseType, inclusionFeeMap](auto&& txs) { + using T = std::decay_t; + if constexpr (std::is_same_v) + { + validatedPhases.emplace_back( + TxSetPhaseFrame(phaseType, txs, inclusionFeeMap)); + } + else if constexpr (std::is_same_v) + { + validatedPhases.emplace_back(TxSetPhaseFrame( + phaseType, std::move(txs), inclusionFeeMap)); + } + else + { + releaseAssert(false); + } + }, + includedTxs); } auto const& lclHeader = app.getLedgerManager().getLastClosedLedgerHeader(); diff --git a/src/herder/test/TxSetTests.cpp b/src/herder/test/TxSetTests.cpp index 376cb3a0ea..82b80d7c75 100644 --- a/src/herder/test/TxSetTests.cpp +++ b/src/herder/test/TxSetTests.cpp @@ -3,6 +3,7 @@ // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 #include "crypto/SHA.h" +#include "herder/ParallelTxSetBuilder.h" #include "herder/TxSetFrame.h" #include "herder/test/TestTxSetUtils.h" #include "ledger/LedgerManager.h" @@ -1299,8 +1300,11 @@ TEST_CASE("applicable txset validation - Soroban resources", "[txset][soroban]") std::numeric_limits::max(); sorobanCfg.mLedgerMaxTransactionsSizeBytes = std::numeric_limits::max(); - sorobanCfg.mLedgerMaxDependentTxClusters = - std::numeric_limits::max(); + // sorobanCfg.mLedgerMaxDependentTxClusters = + // std::numeric_limits::max(); + // TODO: need a reasonable lower bound for validating + // this? + sorobanCfg.mLedgerMaxDependentTxClusters = 100; }); TxStageFrameList nonConflictingTxsPerStage = { { @@ -2037,5 +2041,798 @@ TEST_CASE("txset nomination", "[txset]") #endif } +#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION +TEST_CASE("parallel tx set building", "[txset][soroban]") +{ + int const STAGE_COUNT = 4; + int const CLUSTER_COUNT = 8; + + VirtualClock clock; + auto cfg = getTestConfig(); + cfg.LEDGER_PROTOCOL_VERSION = + static_cast(PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION); + cfg.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION = + static_cast(PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION); + cfg.SOROBAN_PHASE_STAGE_COUNT = STAGE_COUNT; + Application::pointer app = createTestApplication(clock, cfg); + overrideSorobanNetworkConfigForTest(*app); + modifySorobanNetworkConfig(*app, [&](SorobanNetworkConfig& sorobanCfg) { + sorobanCfg.mLedgerMaxInstructions = 400'000'000; + sorobanCfg.mLedgerMaxReadLedgerEntries = 3000; + sorobanCfg.mLedgerMaxWriteLedgerEntries = 2000; + sorobanCfg.mLedgerMaxReadBytes = 1'000'000; + sorobanCfg.mLedgerMaxWriteBytes = 100'000; + sorobanCfg.mLedgerMaxTxCount = 1000; + sorobanCfg.mLedgerMaxDependentTxClusters = CLUSTER_COUNT; + }); + auto root = TestAccount::createRoot(*app); + std::map accounts; + int accountId = 1; + SCAddress contract(SC_ADDRESS_TYPE_CONTRACT); + + auto generateKey = [&contract](int i) { + return stellar::contractDataKey( + contract, txtest::makeU32(i), + i % 2 == 0 ? ContractDataDurability::PERSISTENT + : ContractDataDurability::TEMPORARY); + }; + + auto createTx = [&](int instructions, std::vector const& roKeys, + std::vector rwKeys, int64_t inclusionFee = 1000, + int readBytes = 1000, int writeBytes = 100) { + auto it = accounts.find(accountId); + if (it == accounts.end()) + { + it = accounts + .emplace(accountId, root.create(std::to_string(accountId), + 1'000'000'000)) + .first; + } + ++accountId; + auto source = it->second; + SorobanResources resources; + resources.instructions = instructions; + resources.readBytes = readBytes; + resources.writeBytes = writeBytes; + for (auto roKeyId : roKeys) + { + resources.footprint.readOnly.push_back(generateKey(roKeyId)); + } + for (auto rwKeyId : rwKeys) + { + resources.footprint.readWrite.push_back(generateKey(rwKeyId)); + } + auto resourceFee = sorobanResourceFee(*app, resources, 10'000, 40); + // It doesn't really matter what tx does as we're only interested in + // its resources. + auto tx = createUploadWasmTx(*app, source, inclusionFee, resourceFee, + resources); + LedgerSnapshot ls(*app); + REQUIRE( + tx->checkValid(app->getAppConnector(), ls, 0, 0, 0)->isSuccess()); + + return tx; + }; + + auto validateShape = [&](ApplicableTxSetFrame const& txSet, + size_t stageCount, size_t clustersPerStage, + size_t txsPerCluster) { + auto const& phase = + txSet.getPhase(TxSetPhase::SOROBAN).getParallelStages(); + + REQUIRE(phase.size() == stageCount); + for (auto const& stage : phase) + { + REQUIRE(stage.size() == clustersPerStage); + for (auto const& cluster : stage) + { + REQUIRE(cluster.size() == txsPerCluster); + } + } + }; + + auto validateBaseFee = [&](ApplicableTxSetFrame const& txSet, + int64_t baseFee) { + for (auto const& tx : txSet.getPhase(TxSetPhase::SOROBAN)) + { + REQUIRE(*txSet.getTxBaseFee(tx) == baseFee); + } + }; + + SECTION("no conflicts") + { + SECTION("single stage") + { + std::vector sorobanTxs; + for (int i = 0; i < CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(100'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3})); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + validateShape(*txSet, 1, CLUSTER_COUNT, 1); + validateBaseFee(*txSet, 100); + } + SECTION("all stages") + { + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(100'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3})); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 1); + validateBaseFee(*txSet, 100); + } + SECTION("all stages, smaller txs") + { + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT * 5; ++i) + { + sorobanTxs.push_back(createTx(20'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3})); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 5); + validateBaseFee(*txSet, 100); + } + + SECTION("all stages, smaller txs with prioritization") + { + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT * 10; ++i) + { + sorobanTxs.push_back(createTx( + 20'000'000, {4 * i, 4 * i + 1}, {4 * i + 2, 4 * i + 3}, + /* inclusionFee*/ (i + 1) * 1000LL)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 5); + validateBaseFee( + *txSet, 10LL * STAGE_COUNT * CLUSTER_COUNT * 1000 / 2 + 1000); + } + + SECTION("instruction limit reached") + { + modifySorobanNetworkConfig( + *app, [&](SorobanNetworkConfig& sorobanCfg) { + sorobanCfg.mLedgerMaxInstructions = 1'000'000; + }); + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT * 4; ++i) + { + sorobanTxs.push_back(createTx(250'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3}, + /* inclusionFee */ 100 + i)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 1); + validateBaseFee(*txSet, 100 + STAGE_COUNT * CLUSTER_COUNT * 4 - + STAGE_COUNT * CLUSTER_COUNT); + } + SECTION("read bytes limit reached") + { + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(1'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3}, + /* inclusionFee */ 100 + i, + /* readBytes */ 100'000)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, 1, 1, 10); + validateBaseFee(*txSet, 100 + STAGE_COUNT * CLUSTER_COUNT - 10); + } + SECTION("read entries limit reached") + { + modifySorobanNetworkConfig( + *app, [&](SorobanNetworkConfig& sorobanCfg) { + sorobanCfg.mLedgerMaxReadLedgerEntries = 4 * 10 + 3; + }); + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(1'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3}, + /* inclusionFee */ 100 + i, + /* readBytes */ 100'000)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, 1, 1, 10); + validateBaseFee(*txSet, 100 + STAGE_COUNT * CLUSTER_COUNT - 10); + } + SECTION("write bytes limit reached") + { + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(1'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3}, + /* inclusionFee */ 100 + i, + /* readBytes */ 100, + /* writeBytes */ 10'000)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, 1, 1, 10); + validateBaseFee(*txSet, 100 + STAGE_COUNT * CLUSTER_COUNT - 10); + } + SECTION("write entries limit reached") + { + modifySorobanNetworkConfig( + *app, [&](SorobanNetworkConfig& sorobanCfg) { + sorobanCfg.mLedgerMaxWriteLedgerEntries = 2 * 10 + 1; + }); + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(1'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3}, + /* inclusionFee */ 100 + i)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, 1, 1, 10); + validateBaseFee(*txSet, 100 + STAGE_COUNT * CLUSTER_COUNT - 10); + } + SECTION("tx size limit reached") + { + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(1'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3}, + /* inclusionFee */ 100 + i)); + } + modifySorobanNetworkConfig( + *app, [&](SorobanNetworkConfig& sorobanCfg) { + sorobanCfg.mLedgerMaxTransactionsSizeBytes = + xdr::xdr_size(sorobanTxs[0]->getEnvelope()) * 11 - 1; + }); + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, 1, 1, 10); + validateBaseFee(*txSet, 100 + STAGE_COUNT * CLUSTER_COUNT - 10); + } + SECTION("tx count limit reached") + { + modifySorobanNetworkConfig(*app, + [&](SorobanNetworkConfig& sorobanCfg) { + sorobanCfg.mLedgerMaxTxCount = 5; + }); + std::vector sorobanTxs; + for (int i = 0; i < STAGE_COUNT * CLUSTER_COUNT; ++i) + { + sorobanTxs.push_back(createTx(1'000'000, {4 * i, 4 * i + 1}, + {4 * i + 2, 4 * i + 3}, + /* inclusionFee */ 100 + i)); + } + + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, 1, 1, 5); + validateBaseFee(*txSet, 100 + STAGE_COUNT * CLUSTER_COUNT - 5); + } + } + + SECTION("with conflicts") + { + SECTION("all RW conflicting") + { + std::vector sorobanTxs; + for (int i = 0; i < CLUSTER_COUNT * STAGE_COUNT; ++i) + { + sorobanTxs.push_back(createTx(100'000'000, + {4 * i + 1, 4 * i + 2}, + {4 * i + 3, 0, 4 * i + 4}, + /* inclusionFee */ 100 + i)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + validateShape(*txSet, STAGE_COUNT, 1, 1); + validateBaseFee(*txSet, + 100 + CLUSTER_COUNT * STAGE_COUNT - STAGE_COUNT); + } + SECTION("chain of conflicts") + { + std::vector sorobanTxs; + for (int i = 0; i < CLUSTER_COUNT * STAGE_COUNT; ++i) + { + sorobanTxs.push_back(createTx(100'000'000, {i}, {i + 1}, + /* inclusionFee */ 100 + i)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + // It's easy to 'break' the chain by allocating transactions to + // different stages (technically, 2 stages would be sufficient). + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 1); + validateBaseFee(*txSet, 100); + } + SECTION("small conflict clusters") + { + std::vector sorobanTxs; + for (int i = 0; i < CLUSTER_COUNT; ++i) + { + for (int j = 0; j < STAGE_COUNT; ++j) + { + sorobanTxs.push_back( + createTx(100'000'000, {i * STAGE_COUNT + j + 1000}, + {i, i * STAGE_COUNT + j + 10000}, + /* inclusionFee */ 100 + i)); + } + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + // Conflicting transactions can be distributed into separate + // stages. + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 1); + validateBaseFee(*txSet, 100); + } + SECTION("small conflict clusters with excluded txs") + { + std::vector sorobanTxs; + for (int i = 0; i < CLUSTER_COUNT; ++i) + { + for (int j = 0; j < STAGE_COUNT + 1; ++j) + { + sorobanTxs.push_back(createTx( + 100'000'000, {}, {i}, + /* inclusionFee */ 100 + i * (STAGE_COUNT + 1) + j)); + } + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + // Conflicting transactions can be distributed into separate stages + // and lower fee txs in every cluster will be excluded. + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 1); + // 1 cluster worth of txs will be excluded, however, the lowest fee + // transaction in the set has a fee of 101 (generated in cluster 0, + // stage 1). + validateBaseFee(*txSet, 101); + } + SECTION("one sparse conflict cluster") + { + std::vector sorobanTxs; + // A small dense cluster of RW conflicts on entry 1000 with high + // fee to ensure these are included. + for (int i = 0; i < STAGE_COUNT; ++i) + { + sorobanTxs.push_back( + createTx(100'000'000, {}, {i, 1000}, + /* inclusionFee */ 1'000'000 - i)); + } + // Create a (CLUSTER_COUNT - 1) txs with RO-RW conflict with one + // of the transactions in the small dense cluster, so that it's + // possible to fit them all into a stage with one of the dense + // cluster transactions. + for (int i = 0; i < STAGE_COUNT; ++i) + { + for (int j = 0; j < CLUSTER_COUNT - 1; ++j) + { + sorobanTxs.push_back(createTx( + 100'000'000, {i}, {i * CLUSTER_COUNT + j + 10'000}, + /* inclusionFee */ 1000 + i * CLUSTER_COUNT + j)); + } + } + // This is the assumption under which this test operates. + releaseAssert(CLUSTER_COUNT > STAGE_COUNT); + // Add some cheap transactions that conflict with the dense cluster + // that shouldn't be included. + for (int i = 0; i < CLUSTER_COUNT - STAGE_COUNT; ++i) + { + sorobanTxs.push_back(createTx(100'000'000, {i % STAGE_COUNT}, + {i + 100'000}, + /* inclusionFee */ 100 + i)); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + // All transactions can be distributed across stages, but 4 + // transactions simply don't fit into instruction limits (hence 103 + // base fee). + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 1); + validateBaseFee(*txSet, 1000); + } + SECTION("many clusters with small transactions") + { + std::vector sorobanTxs; + for (int i = 0; i < CLUSTER_COUNT; ++i) + { + for (int j = 0; j < 10 * STAGE_COUNT; ++j) + { + sorobanTxs.push_back(createTx( + 10'000'000, {1000 + i * 10 + j}, + {i, 10'000 + i * 10 + j}, + /* inclusionFee */ 100 + i * (STAGE_COUNT + 1) + j)); + } + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + + validateShape(*txSet, STAGE_COUNT, CLUSTER_COUNT, 10); + validateBaseFee(*txSet, 100); + } + SECTION("all RO conflict with one RW") + { + std::vector sorobanTxs; + sorobanTxs.push_back(createTx(100'000'000, {1, 2}, {0, 3, 4}, + /* inclusionFee */ 1'000'000)); + for (int i = 1; i < CLUSTER_COUNT * STAGE_COUNT * 5; ++i) + { + sorobanTxs.push_back(createTx(20'000'000, + {0, 4 * i + 1, 4 * i + 2}, + {4 * i + 3, 4 * i + 4}, + /* inclusionFee */ 100 + i)); + } + + PerPhaseTransactionList phases = {{}, sorobanTxs}; + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + auto const& phase = + txSet->getPhase(TxSetPhase::SOROBAN).getParallelStages(); + + bool wasSingleThreadStage = false; + + for (auto const& stage : phase) + { + if (stage.size() == 1) + { + REQUIRE(!wasSingleThreadStage); + wasSingleThreadStage = true; + REQUIRE(stage[0].size() == 1); + REQUIRE(stage[0][0]->getEnvelope() == + sorobanTxs[0]->getEnvelope()); + continue; + } + REQUIRE(stage.size() == CLUSTER_COUNT); + for (auto const& thread : stage) + { + REQUIRE(thread.size() == 5); + } + } + // We can't include any of the small txs into stage 0, as it's + // occupied by high fee tx that writes entry 0. + validateBaseFee(*txSet, 100 + CLUSTER_COUNT * 5); + } + } + SECTION("smoke test") + { + auto runTest = [&]() { + stellar::uniform_int_distribution<> maxInsnsDistr(20'000'000, + 100'000'000); + stellar::uniform_int_distribution<> keyRangeDistr(50, 1000); + stellar::uniform_int_distribution<> insnsDistr( + 1'000'000, maxInsnsDistr(Catch::rng())); + stellar::uniform_int_distribution<> keyCountDistr(1, 10); + stellar::uniform_int_distribution<> keyDistr( + 1, keyRangeDistr(Catch::rng())); + stellar::uniform_int_distribution<> feeDistr(100, 100'000); + stellar::uniform_int_distribution<> readBytesDistr(100, 10'000); + stellar::uniform_int_distribution<> writeBytesDistr(10, 1000); + std::vector sorobanTxs; + accountId = 1; + for (int iter = 0; iter < 500; ++iter) + { + int roKeyCount = keyCountDistr(Catch::rng()); + int rwKeyCount = keyCountDistr(Catch::rng()); + std::unordered_set usedKeys; + std::vector roKeys; + std::vector rwKeys; + for (int i = 0; i < roKeyCount + rwKeyCount; ++i) + { + int key = keyDistr(Catch::rng()); + while (usedKeys.find(key) != usedKeys.end()) + { + key = keyDistr(Catch::rng()); + } + if (i < roKeyCount) + { + roKeys.push_back(key); + } + else + { + rwKeys.push_back(key); + } + usedKeys.insert(key); + } + sorobanTxs.push_back(createTx(insnsDistr(Catch::rng()), roKeys, + rwKeys, feeDistr(Catch::rng()), + readBytesDistr(Catch::rng()), + writeBytesDistr(Catch::rng()))); + } + PerPhaseTransactionList phases = {{}, sorobanTxs}; + // NB: `makeTxSetFromTransactions` does an XDR roundtrip and + // validation, so just calling it does a good amount of smoke + // testing. + auto [_, txSet] = makeTxSetFromTransactions(phases, *app, 0, 0); + auto const& phase = + txSet->getPhase(TxSetPhase::SOROBAN).getParallelStages(); + // The only thing we can really be sure about is that all the + // stages are utilized, as we have enough transactions. + REQUIRE(phase.size() == STAGE_COUNT); + }; + for (int iter = 0; iter < 10; ++iter) + { + runTest(); + } + } +} + +TEST_CASE("parallel tx set building benchmark", + "[txset][soroban][bench][!hide]") +{ + int const STAGE_COUNT = 4; + int const CLUSTER_COUNT = 16; + int const MEAN_INCLUDED_TX_COUNT = 1000; + int const TX_COUNT_MEMPOOL_MULTIPLIER = 2; + + int const MEAN_INSTRUCTIONS_PER_TX = 10'000'000; + int const MAX_INSTRUCTIONS_PER_TX = 100'000'000; + int const MEAN_READS_PER_TX = 30; + int const MAX_READS_PER_TX = 60; + int const MEAN_WRITES_PER_TX = 10; + int const MAX_WRITES_PER_TX = 30; + int const MEAN_TX_SIZE = 800; + int const MAX_TX_SIZE = 5000; + // The exact values for r/w bytes aren't meaningful for the performance, + // just give them high enough value to create some spread. + int const MEAN_READ_BYTES_PER_TX = 1000; + int const MAX_READ_BYTES_PER_TX = 5000; + int const MEAN_WRITE_BYTES_PER_TX = 500; + int const MAX_WRITE_BYTES_PER_TX = 2000; + + auto cfg = getTestConfig(); + cfg.SOROBAN_PHASE_STAGE_COUNT = STAGE_COUNT; + + // Only per-ledger limits matter for tx set building, as we don't perform + // any validation. + SorobanNetworkConfig sorobanCfg; + sorobanCfg.mLedgerMaxTransactionsSizeBytes = + MEAN_INCLUDED_TX_COUNT * MEAN_TX_SIZE * 2; + sorobanCfg.mLedgerMaxInstructions = + static_cast(MEAN_INSTRUCTIONS_PER_TX) * + MEAN_INCLUDED_TX_COUNT / CLUSTER_COUNT; + sorobanCfg.mLedgerMaxReadLedgerEntries = + MEAN_INCLUDED_TX_COUNT * (MEAN_READS_PER_TX + MEAN_WRITES_PER_TX) * 2; + sorobanCfg.mLedgerMaxReadBytes = + MEAN_INCLUDED_TX_COUNT * MEAN_READ_BYTES_PER_TX * 2; + sorobanCfg.mLedgerMaxWriteLedgerEntries = + MEAN_INCLUDED_TX_COUNT * MEAN_WRITES_PER_TX * 2; + sorobanCfg.mLedgerMaxWriteBytes = + MEAN_INCLUDED_TX_COUNT * MEAN_WRITE_BYTES_PER_TX * 2; + // This doesn't need to be a real limit for this test. + sorobanCfg.mLedgerMaxTxCount = MEAN_INCLUDED_TX_COUNT * 10; + sorobanCfg.mLedgerMaxDependentTxClusters = CLUSTER_COUNT; + + auto limits = sorobanCfg.maxLedgerResources(); + limits.setVal(Resource::Type::INSTRUCTIONS, + std::numeric_limits::max()); + + auto surgePricingLaneConfig = + std::make_shared(limits); + + SCAddress contract(SC_ADDRESS_TYPE_CONTRACT); + auto generateKey = [&contract](int i) { + return stellar::contractDataKey( + contract, txtest::makeU32(i), + i % 2 == 0 ? ContractDataDurability::PERSISTENT + : ContractDataDurability::TEMPORARY); + }; + + auto createTx = [&](int instructions, std::vector const& roKeys, + std::vector const& rwKeys, int64_t inclusionFee, + int readBytes, int writeBytes, int txSize) { + TransactionEnvelope txEnvelope(EnvelopeType::ENVELOPE_TYPE_TX); + txEnvelope.v1().tx.ext.v(1); + txEnvelope.v1().tx.fee = inclusionFee; + auto& resources = txEnvelope.v1().tx.ext.sorobanData().resources; + + resources.instructions = instructions; + resources.readBytes = readBytes; + resources.writeBytes = writeBytes; + for (auto roKeyId : roKeys) + { + resources.footprint.readOnly.push_back(generateKey(roKeyId)); + } + for (auto rwKeyId : rwKeys) + { + resources.footprint.readWrite.push_back(generateKey(rwKeyId)); + } + auto& op = txEnvelope.v1().tx.operations.emplace_back(); + op.body.type(OperationType::INVOKE_HOST_FUNCTION); + + op.body.invokeHostFunctionOp().hostFunction.type( + HostFunctionType::HOST_FUNCTION_TYPE_UPLOAD_CONTRACT_WASM); + auto currSize = xdr::xdr_size(txEnvelope); + if (currSize < txSize) + { + op.body.invokeHostFunctionOp().hostFunction.wasm().resize(txSize - + currSize); + } + + return TransactionFrameBase::makeTransactionFromWire(Hash{}, + txEnvelope); + }; + std::normal_distribution<> insnsDistr(MEAN_INSTRUCTIONS_PER_TX, + 0.2 * MEAN_INSTRUCTIONS_PER_TX); + std::normal_distribution<> txSizeDistr(MEAN_TX_SIZE, 0.1 * MEAN_TX_SIZE); + std::normal_distribution<> readBytesDistr(MEAN_READ_BYTES_PER_TX, + 0.3 * MEAN_READ_BYTES_PER_TX); + std::normal_distribution<> writeBytesDistr(MEAN_WRITE_BYTES_PER_TX, + 0.05 * MEAN_WRITE_BYTES_PER_TX); + std::normal_distribution<> readCountDistr(MEAN_READS_PER_TX, + 0.2 * MEAN_READS_PER_TX); + std::normal_distribution<> writeCountDistr(MEAN_WRITES_PER_TX, + 0.1 * MEAN_WRITES_PER_TX); + stellar::uniform_int_distribution<> feeDistr(100, 100'000); + + auto genValue = [](auto& distribution, int maxValue) { + return std::min( + maxValue, std::max(1, static_cast( + std::round(distribution(Catch::rng()))))); + }; + + auto generateConflictingTxs = [&](double mean_conflicts_per_tx, + double mean_ro_txs_per_conflict, + double mean_rw_txs_per_conflict) { + int maxKey = 0; + int txCount = MEAN_INCLUDED_TX_COUNT * TX_COUNT_MEMPOOL_MULTIPLIER; + std::vector, std::vector>> txKeys( + txCount); + // First, generate non-conflicting keys that follow the distributions + // for the respective resources. + for (int i = 0; i < txCount; ++i) + { + int readCount = genValue(readCountDistr, MAX_READS_PER_TX); + for (int j = 0; j < readCount; ++j) + { + txKeys[i].first.push_back(maxKey++); + } + int writeCount = genValue(writeCountDistr, MAX_WRITES_PER_TX); + for (int j = 0; j < writeCount; ++j) + { + txKeys[i].second.push_back(maxKey++); + } + } + + // Then, generate the conflict 'clusters', such that the same key is + // present in the RO footprint of `mean_ro_txs_per_conflict` on average + // and in the RW footprint of `mean_rw_txs_per_conflict` on average. + // The total number of the conflicts generated is defined such that + // on average a transaction participates in `mean_conflicts_per_tx` + // clusters. + int currConflictKey = 0; + std::poisson_distribution<> roTxCountDistr(mean_ro_txs_per_conflict); + std::poisson_distribution<> rwTxCountDistr(mean_rw_txs_per_conflict); + int conflictsLeft = txCount * mean_conflicts_per_tx; + // Try to exhaust all the conflicts, but make sure we don't get stuck + // in an infinite loop. + for (int iter = 0; iter < 1000 && conflictsLeft > 0; ++iter) + { + int roTxCount = roTxCountDistr(Catch::rng()); + int rwTxCount = std::max(1, rwTxCountDistr(Catch::rng())); + if (roTxCount + rwTxCount < 2) + { + continue; + } + stellar::shuffle(txKeys.begin(), txKeys.end(), Catch::rng()); + int conflictsLeftToAdd = roTxCount + rwTxCount; + for (int i = 0; i < txCount && conflictsLeftToAdd > 0; ++i) + { + auto& keys = conflictsLeftToAdd > roTxCount ? txKeys[i].second + : txKeys[i].first; + + for (int j = 0; j < keys.size(); ++j) + { + if (keys[j] < maxKey) + { + keys[j] = maxKey + currConflictKey; + --conflictsLeftToAdd; + --conflictsLeft; + break; + } + } + } + ++currConflictKey; + } + + TxFrameList txs; + for (int i = 0; i < txCount; ++i) + { + int insns = genValue(insnsDistr, MAX_INSTRUCTIONS_PER_TX); + int txSize = genValue(txSizeDistr, MAX_TX_SIZE); + int readBytes = genValue(readBytesDistr, MAX_READ_BYTES_PER_TX); + int writeBytes = genValue(writeBytesDistr, MAX_WRITE_BYTES_PER_TX); + int readCount = genValue(readCountDistr, MAX_READS_PER_TX); + int writeCount = genValue(writeCountDistr, MAX_WRITES_PER_TX); + txs.push_back(createTx(insns, txKeys[i].first, txKeys[i].second, + feeDistr(Catch::rng()), readBytes, + writeBytes, txSize)); + } + return txs; + }; + + auto runBenchmark = [&](double mean_conflicts_per_tx, + double mean_ro_txs_per_conflict, + double mean_rw_txs_per_conflict) { + const int iterCount = 5; + std::vector allTxs; + for (int i = 0; i < iterCount; ++i) + { + allTxs.push_back(generateConflictingTxs(mean_conflicts_per_tx, + mean_ro_txs_per_conflict, + mean_rw_txs_per_conflict)); + } + + for (int stageCount = 1; stageCount <= 4; ++stageCount) + { + int64_t totalDuration = 0; + int txsIncluded = 0; + int64_t insnsIncluded = 0; + for (int iter = 0; iter < 5; ++iter) + { + cfg.SOROBAN_PHASE_STAGE_COUNT = stageCount; + std::vector hadTxNotFittingLane; + auto start = std::chrono::steady_clock::now(); + auto stages = buildSurgePricedParallelSorobanPhase( + allTxs[iter], cfg, sorobanCfg, surgePricingLaneConfig, + hadTxNotFittingLane); + auto end = std::chrono::steady_clock::now(); + totalDuration += + std::chrono::duration_cast(end - + start) + .count(); + for (auto const& stage : stages) + { + for (auto const& cluster : stage) + { + txsIncluded += cluster.size(); + for (auto const& tx : cluster) + { + insnsIncluded += + tx->sorobanResources().instructions; + } + } + } + } + std::cout << "Stage count: " << stageCount + << ", mean conflicts per tx: " << mean_conflicts_per_tx + << ", mean RO txs per conflict: " + << mean_ro_txs_per_conflict + << ", mean RW txs per conflict: " + << mean_rw_txs_per_conflict + << ", mean txs included: " << txsIncluded / iterCount + << ", insns included %: " + << static_cast(insnsIncluded) / iterCount * + 100.0 / + (sorobanCfg.ledgerMaxInstructions() * + sorobanCfg.ledgerMaxDependentTxClusters()) + << ", mean duration: " << 1e-6 * totalDuration / iterCount + << " ms" << std::endl; + } + }; + runBenchmark(0, 0, 0); + runBenchmark(10, 40, 1); + runBenchmark(20, 40, 1); + runBenchmark(10, 10, 10); +} +#endif } // namespace } // namespace stellar diff --git a/src/main/Config.cpp b/src/main/Config.cpp index e06ea891ae..0c21617442 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -304,6 +304,10 @@ Config::Config() : NODE_SEED(SecretKey::random()) EMIT_LEDGER_CLOSE_META_EXT_V1 = false; FORCE_OLD_STYLE_LEADER_ELECTION = false; + // This is not configurable for now. It doesn't need to be a network-wide + // setting, but on the other hand there aren't many good values for it and + // it's not clear what the right way to configure it would be, if at all. + SOROBAN_PHASE_STAGE_COUNT = 1; #ifdef BUILD_TESTS TEST_CASES_ENABLED = false; diff --git a/src/main/Config.h b/src/main/Config.h index 62e9c4fea0..67e75c0052 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -752,6 +752,8 @@ class Config : public std::enable_shared_from_this bool EMIT_SOROBAN_TRANSACTION_META_EXT_V1; bool EMIT_LEDGER_CLOSE_META_EXT_V1; + uint32_t SOROBAN_PHASE_STAGE_COUNT; + #ifdef BUILD_TESTS // If set to true, the application will be aware this run is for a test // case. This is used right now in the signal handler to exit() instead of