From 85381f515be5d8cda74baf4843bf4c635c517904 Mon Sep 17 00:00:00 2001
From: Thomas Brady <thomas.brady@stellar.org>
Date: Mon, 25 Nov 2024 11:55:44 -0800
Subject: [PATCH] Live to initentry conversion in lowest level live bucket
 merge

---
 src/bucket/BucketOutputIterator.cpp    | 24 +++++++++
 src/bucket/BucketUtils.h               |  1 +
 src/bucket/LiveBucket.h                |  3 ++
 src/bucket/test/BucketListTests.cpp    | 73 ++++++++++++++++++++++++++
 src/bucket/test/BucketManagerTests.cpp |  4 ++
 5 files changed, 105 insertions(+)

diff --git a/src/bucket/BucketOutputIterator.cpp b/src/bucket/BucketOutputIterator.cpp
index dbc9e7c159..93d8c85d18 100644
--- a/src/bucket/BucketOutputIterator.cpp
+++ b/src/bucket/BucketOutputIterator.cpp
@@ -157,6 +157,30 @@ BucketOutputIterator<BucketT>::put(typename BucketT::EntryT const& e)
         mBuf = std::make_unique<typename BucketT::EntryT>();
     }
 
+    // If BucketT is a live bucket, and this is the lowest level of the
+    // bucketlist, we also want to convert each LIVEENTRY to an INITENTRY.
+    // This is because each level of the bucket list contains only one entry
+    // per key, and per CAP-0020, INITENTRY implies that no entry with
+    // the same ledger key exists in an older bucket. Therefore, all entries
+    // of type LIVEENTRY in the lowest level should be of type INITENTRY.
+    if constexpr (std::is_same_v<BucketT, LiveBucket>)
+    {
+        if (!mKeepTombstoneEntries /* lowest level */ &&
+            e.type() == LIVEENTRY &&
+            protocolVersionStartsFrom(
+                mMeta.ledgerVersion,
+                LiveBucket::
+                    FIRST_PROTOCOL_CONVERTING_BOTTOM_LEVEL_LIVE_TO_INIT))
+        {
+            ++mMergeCounters.mOutputIteratorLiveToInitRewrites;
+            ++mMergeCounters.mOutputIteratorBufferUpdates;
+            auto eCopy = e;
+            eCopy.type(INITENTRY);
+            *mBuf = eCopy;
+            return;
+        }
+    }
+
     // In any case, replace *mBuf with e.
     ++mMergeCounters.mOutputIteratorBufferUpdates;
     *mBuf = e;
diff --git a/src/bucket/BucketUtils.h b/src/bucket/BucketUtils.h
index ebc98f79dd..7e66d6932e 100644
--- a/src/bucket/BucketUtils.h
+++ b/src/bucket/BucketUtils.h
@@ -64,6 +64,7 @@ struct MergeCounters
     uint64_t mOutputIteratorTombstoneElisions{0};
     uint64_t mOutputIteratorBufferUpdates{0};
     uint64_t mOutputIteratorActualWrites{0};
+    uint64_t mOutputIteratorLiveToInitRewrites{0};
     MergeCounters& operator+=(MergeCounters const& delta);
     bool operator==(MergeCounters const& other) const;
 };
diff --git a/src/bucket/LiveBucket.h b/src/bucket/LiveBucket.h
index a34a3d8393..deb7ad5f47 100644
--- a/src/bucket/LiveBucket.h
+++ b/src/bucket/LiveBucket.h
@@ -73,6 +73,9 @@ class LiveBucket : public BucketBase,
             ProtocolVersion::V_11;
     static constexpr ProtocolVersion FIRST_PROTOCOL_SHADOWS_REMOVED =
         ProtocolVersion::V_12;
+    static constexpr ProtocolVersion
+        FIRST_PROTOCOL_CONVERTING_BOTTOM_LEVEL_LIVE_TO_INIT =
+            ProtocolVersion::V_23;
 
     static void checkProtocolLegality(BucketEntry const& entry,
                                       uint32_t protocolVersion);
diff --git a/src/bucket/test/BucketListTests.cpp b/src/bucket/test/BucketListTests.cpp
index 5bb6a71d52..f6baf4dc00 100644
--- a/src/bucket/test/BucketListTests.cpp
+++ b/src/bucket/test/BucketListTests.cpp
@@ -454,6 +454,79 @@ TEST_CASE_VERSIONS("hot archive bucket tombstones expire at bottom level",
     });
 }
 
+TEST_CASE_VERSIONS(
+    "live bucket entries converted to init enties at bottom level",
+    "[bucket][bucketlist]")
+{
+    VirtualClock clock;
+    Config const& cfg = getTestConfig();
+
+    for_versions_with_differing_bucket_logic(cfg, [&](Config const& cfg) {
+        Application::pointer app = createTestApplication(clock, cfg);
+        LiveBucketList bl;
+        BucketManager& bm = app->getBucketManager();
+        auto& mergeTimer = bm.getMergeTimer();
+        CLOG_INFO(Bucket, "Establishing random bucketlist");
+        for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i)
+        {
+            auto& level = bl.getLevel(i);
+            level.setCurr(LiveBucket::fresh(
+                bm, getAppLedgerVersion(app), {}, // No init entries.
+                LedgerTestUtils::generateValidUniqueLedgerEntries(8),
+                LedgerTestUtils::generateValidLedgerEntryKeysWithExclusions(
+                    {CONFIG_SETTING}, 5),
+                /*countMergeEvents=*/true, clock.getIOContext(),
+                /*doFsync=*/true));
+            level.setSnap(LiveBucket::fresh(
+                bm, getAppLedgerVersion(app), {},
+                LedgerTestUtils::generateValidUniqueLedgerEntries(8),
+                LedgerTestUtils::generateValidLedgerEntryKeysWithExclusions(
+                    {CONFIG_SETTING}, 5),
+                /*countMergeEvents=*/true, clock.getIOContext(),
+                /*doFsync=*/true));
+        }
+
+        auto countNonBottomLevelEntries = [&] {
+            auto size = 0;
+            for (uint32_t i = 0; i < LiveBucketList::kNumLevels - 1; ++i)
+            {
+                auto& level = bl.getLevel(i);
+                size += countEntries(level.getCurr());
+                size += countEntries(level.getSnap());
+            }
+            return size;
+        };
+
+        auto ledger = 1;
+        // Close ledgers until all entries have merged into the bottom level
+        // bucket
+        while (countNonBottomLevelEntries() != 0)
+        {
+            bl.addBatch(*app, ledger, getAppLedgerVersion(app), {}, {}, {});
+            ++ledger;
+        }
+
+        auto bottomCurr = bl.getLevel(LiveBucketList::kNumLevels - 1).getCurr();
+        EntryCounts<LiveBucket> e(bottomCurr);
+
+        if (protocolVersionStartsFrom(
+                cfg.LEDGER_PROTOCOL_VERSION,
+                LiveBucket::
+                    FIRST_PROTOCOL_CONVERTING_BOTTOM_LEVEL_LIVE_TO_INIT))
+        {
+            // Assert that init entries are converted to live entries
+            // at the lowest level.
+            REQUIRE(e.nLive == 0);
+            REQUIRE(e.nInitOrArchived != 0);
+        }
+        else
+        {
+            REQUIRE(e.nLive != 0);
+            REQUIRE(e.nInitOrArchived == 0);
+        }
+    });
+}
+
 TEST_CASE_VERSIONS("live bucket tombstones expire at bottom level",
                    "[bucket][bucketlist][tombstones]")
 {
diff --git a/src/bucket/test/BucketManagerTests.cpp b/src/bucket/test/BucketManagerTests.cpp
index fc5390653f..bb017c06f3 100644
--- a/src/bucket/test/BucketManagerTests.cpp
+++ b/src/bucket/test/BucketManagerTests.cpp
@@ -760,6 +760,8 @@ class StopAndRestartBucketMergesTest
                       mMergeCounters.mDeadEntryShadowElisions);
             CLOG_INFO(Bucket, "OutputIteratorTombstoneElisions: {}",
                       mMergeCounters.mOutputIteratorTombstoneElisions);
+            CLOG_INFO(Bucket, "OutputIteratorLiveToInitConversions: {}",
+                      mMergeCounters.mOutputIteratorLiveToInitRewrites);
             CLOG_INFO(Bucket, "OutputIteratorBufferUpdates: {}",
                       mMergeCounters.mOutputIteratorBufferUpdates);
             CLOG_INFO(Bucket, "OutputIteratorActualWrites: {}",
@@ -915,6 +917,8 @@ class StopAndRestartBucketMergesTest
 
             CHECK(mMergeCounters.mOutputIteratorTombstoneElisions ==
                   other.mMergeCounters.mOutputIteratorTombstoneElisions);
+            CHECK(mMergeCounters.mOutputIteratorLiveToInitRewrites ==
+                  other.mMergeCounters.mOutputIteratorLiveToInitRewrites);
             CHECK(mMergeCounters.mOutputIteratorBufferUpdates ==
                   other.mMergeCounters.mOutputIteratorBufferUpdates);
             CHECK(mMergeCounters.mOutputIteratorActualWrites ==