Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Path Payment Memoization POC #4644

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/ledger/LedgerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ class LedgerCloseData;
class Database;
class SorobanMetrics;

// Maps the entire path hash to a map of {sendValue, minimumFailedRecvValue}
// Essentially, for the given path hash, we check all memos from TXs that have
// an equal or greater sendValue. If one of these asked to receive less than the
// current op and failed, we can guarantee the op will also fail.
using PathPaymentStrictSendMap = UnorderedMap<Hash, std::map<int64_t, int64_t>>;

// Change map definition to use unordered_map with asset pair as key
using AssetToPathsMap =
UnorderedMap<AssetPair, std::vector<Hash>, AssetPairHash>;

/**
* LedgerManager maintains, in memory, a logical pair of ledgers:
*
Expand Down Expand Up @@ -206,5 +216,25 @@ class LedgerManager
}

virtual bool isApplying() const = 0;

// Clear the path payment strict send failure cache
virtual void clearPathPaymentStrictSendCache() = 0;

// Cache a failed path payment strict send attempt
virtual void cachePathPaymentStrictSendFailure(
Hash const& pathHash, int64_t sendAmount, int64_t receiveAmount,
Asset const& source, std::vector<Asset> const& assets) = 0;

// Get cached failures for a path, or end iterator if not found
virtual PathPaymentStrictSendMap::const_iterator
getPathPaymentStrictSendCache(Hash const& pathHash) const = 0;

// Get the end iterator for the path payment cache
virtual PathPaymentStrictSendMap::const_iterator
getPathPaymentStrictSendCacheEnd() const = 0;

// Invalidate paths containing the given asset pair (in that order)
virtual void
invalidatePathPaymentCachesForAssetPair(AssetPair const& pair) = 0;
};
}
100 changes: 100 additions & 0 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ LedgerManagerImpl::LedgerManagerImpl(Application& app)

{
setupLedgerCloseMetaStream();

mPathPaymentStrictSendFailureCache.reserve(1000);
mAssetToPaths.reserve(1000);
}

void
Expand Down Expand Up @@ -1017,6 +1020,8 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData,
emitNextMeta();
}

// releaseAssertOrThrow(ledgerSeq != 53514768);

// The next 7 steps happen in a relatively non-obvious, subtle order.
// This is unfortunate and it would be nice if we could make it not
// be so subtle, but for the time being this is where we are.
Expand Down Expand Up @@ -1113,7 +1118,11 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData,
CLOG_DEBUG(Perf, "Applied ledger {} in {} seconds", ledgerSeq,
ledgerTimeSeconds.count());
FrameMark;

// Clear the path payment cache at the start of processing a new ledger
clearPathPaymentStrictSendCache();
}

void
LedgerManagerImpl::deleteOldEntries(Database& db, uint32_t ledgerSeq,
uint32_t count)
Expand Down Expand Up @@ -1874,4 +1883,95 @@ LedgerManagerImpl::ledgerClosed(

return res;
}

void
LedgerManagerImpl::clearPathPaymentStrictSendCache()
{
mPathPaymentStrictSendFailureCache.clear();
mAssetToPaths.clear();
}

void
LedgerManagerImpl::cachePathPaymentStrictSendFailure(
Hash const& pathHash, int64_t sendAmount, int64_t receiveAmount,
Asset const& source, std::vector<Asset> const& assets)
{
ZoneScoped;
auto iter = mPathPaymentStrictSendFailureCache.find(pathHash);
if (iter == mPathPaymentStrictSendFailureCache.end())
{
// If path hash does not exist, populate
auto val = std::map<int64_t, int64_t>();
val[sendAmount] = receiveAmount;
mPathPaymentStrictSendFailureCache.emplace(pathHash, std::move(val));
}
else
{
// If hash path exists, but this send amount does not exist, insert the
// new send amount
auto& sendAmountToMinReceiveAmount = iter->second;
if (auto sendToRecIter = sendAmountToMinReceiveAmount.find(sendAmount);
sendToRecIter == sendAmountToMinReceiveAmount.end())
{
sendAmountToMinReceiveAmount[sendAmount] = receiveAmount;
}
else
{
// Else, update receive amount if it's lower than the current
// minimum for the given send amount
if (sendToRecIter->second > receiveAmount)
{
sendToRecIter->second = receiveAmount;
}
}
}

auto insert = [&](AssetPair const& pair) {
auto iter = mAssetToPaths.find(pair);
if (iter == mAssetToPaths.end())
{
mAssetToPaths.emplace(pair, std::vector<Hash>{pathHash});
}
else
{
iter->second.push_back(pathHash);
}
};

// Convert path into buy-sell pairs
insert(AssetPair{assets[0], source});
for (size_t i = 0; i < assets.size() - 1; i++)
{
insert(AssetPair{assets[i + 1], assets[i]});
}
}

PathPaymentStrictSendMap::const_iterator
LedgerManagerImpl::getPathPaymentStrictSendCache(Hash const& pathHash) const
{
return mPathPaymentStrictSendFailureCache.find(pathHash);
}

PathPaymentStrictSendMap::const_iterator
LedgerManagerImpl::getPathPaymentStrictSendCacheEnd() const
{
return mPathPaymentStrictSendFailureCache.end();
}

void
LedgerManagerImpl::invalidatePathPaymentCachesForAssetPair(
AssetPair const& pair)
{
ZoneScoped;

auto it = mAssetToPaths.find(pair);
if (it != mAssetToPaths.end())
{
for (auto const& pathHash : it->second)
{
mPathPaymentStrictSendFailureCache.erase(pathHash);
}
mAssetToPaths.erase(it);
}
}
}
20 changes: 20 additions & 0 deletions src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ class LedgerManagerImpl : public LedgerManager
// is currently closing a ledger or has ledgers queued to apply.
bool mCurrentlyApplyingLedger{false};

// Cache for path payment strict send operations
// Hash(subPath) -> (sendAmount -> set of receiveAmounts)
PathPaymentStrictSendMap mPathPaymentStrictSendFailureCache;

// Maps assets to the paths that contain them
AssetToPathsMap mAssetToPaths;

static std::vector<MutableTxResultPtr> processFeesSeqNums(
ApplicableTxSetFrame const& txSet, AbstractLedgerTxn& ltxOuter,
std::unique_ptr<LedgerCloseMetaFrame> const& ledgerCloseMeta,
Expand Down Expand Up @@ -251,5 +258,18 @@ class LedgerManagerImpl : public LedgerManager
{
return mCurrentlyApplyingLedger;
}

void clearPathPaymentStrictSendCache() override;
void cachePathPaymentStrictSendFailure(
Hash const& pathHash, int64_t sendAmount, int64_t receiveAmount,
Asset const& source, std::vector<Asset> const& assets) override;
PathPaymentStrictSendMap::const_iterator
getPathPaymentStrictSendCache(Hash const& pathHash) const override;

PathPaymentStrictSendMap::const_iterator
getPathPaymentStrictSendCacheEnd() const override;

void
invalidatePathPaymentCachesForAssetPair(AssetPair const& pair) override;
};
}
1 change: 1 addition & 0 deletions src/ledger/LedgerTxn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ LedgerTxn::commit() noexcept
void
LedgerTxn::Impl::commit() noexcept
{
ZoneNamedN(commitZone, "child_commit", true);
maybeUpdateLastModifiedThenInvokeThenSeal([&](EntryMap const& entries) {
// getEntryIterator has the strong exception safety guarantee
// commitChild has the strong exception safety guarantee
Expand Down
4 changes: 4 additions & 0 deletions src/transactions/LiquidityPoolDepositOpFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ LiquidityPoolDepositOpFrame::doApply(
throw std::runtime_error("insufficient liquidity pool limit");
}

app.getLedgerManager().invalidatePathPaymentCachesForAssetPair(
AssetPair{cpp().assetA, cpp().assetB});
app.getLedgerManager().invalidatePathPaymentCachesForAssetPair(
AssetPair{cpp().assetB, cpp().assetA});
innerResult(res).code(LIQUIDITY_POOL_DEPOSIT_SUCCESS);
return true;
}
Expand Down
5 changes: 5 additions & 0 deletions src/transactions/LiquidityPoolWithdrawOpFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ LiquidityPoolWithdrawOpFrame::doApply(
throw std::runtime_error("insufficient reserveB");
}

app.getLedgerManager().invalidatePathPaymentCachesForAssetPair(AssetPair{
constantProduct().params.assetA, constantProduct().params.assetB});
app.getLedgerManager().invalidatePathPaymentCachesForAssetPair(AssetPair{
constantProduct().params.assetB, constantProduct().params.assetA});

innerResult(res).code(LIQUIDITY_POOL_WITHDRAW_SUCCESS);
return true;
}
Expand Down
20 changes: 20 additions & 0 deletions src/transactions/ManageOfferOpFrameBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ ManageOfferOpFrameBase::doApply(
uint32_t flags = 0;
LedgerEntry::_ext_t extension;

Price oldSellSheepOfferPrice(Price(0, 0));
int64_t oldSheepAmount = 0;

if (mOfferID)
{ // modifying an old offer
auto sellSheepOffer = stellar::loadOffer(ltx, getSourceID(), mOfferID);
Expand All @@ -244,6 +247,9 @@ ManageOfferOpFrameBase::doApply(
return false;
}

oldSellSheepOfferPrice = sellSheepOffer.current().data.offer().price;
oldSheepAmount = sellSheepOffer.current().data.offer().amount;

// We are releasing the liabilites associated with this offer. This is
// required in order to produce available balance for the offer to be
// executed. Both trust lines must be reset since it is possible that
Expand Down Expand Up @@ -539,6 +545,20 @@ ManageOfferOpFrameBase::doApply(
}

ltx.commit();

// Deleting an offer does not invalidate any cached fails
if (!isDeleteOffer())
{
// Unfortunately, due to rounding errors we need to invalidate both side
// of the offer. Sometimes, even if an offer gets "worse", (like if the
// amount offer decreases), different rounding behavior can actually
// cause the "worse" offer to now favor the taker.
app.getLedgerManager().invalidatePathPaymentCachesForAssetPair(
AssetPair{mSheep, mWheat});
app.getLedgerManager().invalidatePathPaymentCachesForAssetPair(
AssetPair{mWheat, mSheep});
}

return true;
}

Expand Down
42 changes: 37 additions & 5 deletions src/transactions/OfferExchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ int64_t
canSellAtMost(LedgerTxnHeader const& header, LedgerTxnEntry const& account,
Asset const& asset, TrustLineWrapper const& trustLine)
{
ZoneScoped;
if (asset.type() == ASSET_TYPE_NATIVE)
{
// can only send above the minimum balance
Expand All @@ -73,6 +74,7 @@ int64_t
canSellAtMost(LedgerTxnHeader const& header, ConstLedgerTxnEntry const& account,
Asset const& asset, ConstTrustLineWrapper const& trustLine)
{
ZoneScoped;
if (asset.type() == ASSET_TYPE_NATIVE)
{
// can only send above the minimum balance
Expand All @@ -91,6 +93,7 @@ int64_t
canBuyAtMost(LedgerTxnHeader const& header, LedgerTxnEntry const& account,
Asset const& asset, TrustLineWrapper const& trustLine)
{
ZoneScoped;
if (asset.type() == ASSET_TYPE_NATIVE)
{
return std::max({getMaxAmountReceive(header, account), int64_t(0)});
Expand All @@ -107,6 +110,7 @@ int64_t
canBuyAtMost(LedgerTxnHeader const& header, ConstLedgerTxnEntry const& account,
Asset const& asset, ConstTrustLineWrapper const& trustLine)
{
ZoneScoped;
if (asset.type() == ASSET_TYPE_NATIVE)
{
return std::max({getMaxAmountReceive(header, account), int64_t(0)});
Expand Down Expand Up @@ -786,6 +790,7 @@ adjustOffer(LedgerTxnHeader const& header, LedgerTxnEntry& offer,
TrustLineWrapper const& wheatLine, Asset const& sheep,
TrustLineWrapper const& sheepLine)
{
ZoneScoped;
OfferEntry& oe = offer.current().data.offer();
int64_t maxWheatSend =
std::min({oe.amount, canSellAtMost(header, account, wheat, wheatLine)});
Expand Down Expand Up @@ -1206,8 +1211,18 @@ crossOfferV10(AbstractLedgerTxn& ltx, LedgerTxnEntry& sellingWheatOffer,
auto res = (offer.amount == 0) ? CrossOfferResult::eOfferTaken
: CrossOfferResult::eOfferPartial;
{
ZoneNamedN(commitZone, "offer_commit_parent", true);
// Construct == 238 MS
LedgerTxn ltxInner(ltx);

ZoneNamedN(constructZone, "offer_ltx_construct_end", true);

// Copy header == 21 ms
header = ltxInner.loadHeader();

ZoneNamedN(loadZone, "offer_ltx_header_end", true);

// Body: 457 MS
sellingWheatOffer = loadOffer(ltxInner, accountBID, offerID);
if (res == CrossOfferResult::eOfferTaken)
{
Expand All @@ -1220,16 +1235,25 @@ crossOfferV10(AbstractLedgerTxn& ltx, LedgerTxnEntry& sellingWheatOffer,
{
acquireLiabilities(ltxInner, header, sellingWheatOffer);
}
ltxInner.commit();

{
// Commit == 257 MS
ZoneNamedN(commitZone, "offer_commit_actual", true);
ltxInner.commit();
}
}

// Note: The previous block creates a nested LedgerTxn so all entries are
// deactivated at this point. Specifically, you cannot use sellingWheatOffer
// or offer (which is a reference) since it is not active (and may have been
// erased) at this point.
offerTrail.emplace_back(
makeClaimAtom(ltx.loadHeader().current().ledgerVersion, accountBID,
offerID, wheat, numWheatReceived, sheep, numSheepSend));
{

ZoneNamedN(claimAtomZone, "offer_trail", true);
offerTrail.emplace_back(makeClaimAtom(
ltx.loadHeader().current().ledgerVersion, accountBID, offerID,
wheat, numWheatReceived, sheep, numSheepSend));
}
return res;
}

Expand Down Expand Up @@ -1513,7 +1537,11 @@ convertWithOffers(

while (needMore)
{
ZoneNamedN(offerStart, "convert_with_offers_start", true);
LedgerTxn ltx(ltxOuter);

ZoneNamedN(convertOffersConstruct, "convert_with_offers_post_construct",
true);
auto wheatOffer = ltx.loadBestOffer(sheep, wheat);
if (!wheatOffer)
{
Expand Down Expand Up @@ -1583,7 +1611,11 @@ convertWithOffers(
{
return ConvertResult::ePartial;
}
ltx.commit();

{
ZoneNamedN(commitZone, "convert_with_offers_commit", true);
ltx.commit();
}

sheepSend += numSheepSend;
maxSheepSend -= numSheepSend;
Expand Down
Loading
Loading