Skip to content

Commit e321e2b

Browse files
authored
Merge pull request #29576 from andrwng/lsm-refresh
lsm: add refresh to read-only database
2 parents aedbc37 + d59a772 commit e321e2b

File tree

8 files changed

+236
-27
lines changed

8 files changed

+236
-27
lines changed

src/v/lsm/db/impl.cc

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ ss::future<lookup_result> impl::get(internal::key_view key) {
155155
auto current = _versions->current();
156156
version::get_stats stats{};
157157
auto result = co_await current->get(key, &stats);
158-
if (current->update_stats(stats)) {
158+
if (!_opts->readonly && current->update_stats(stats)) {
159159
maybe_schedule_compaction();
160160
}
161161
co_return result;
@@ -177,7 +177,7 @@ impl::create_iterator(iterator_options opts) {
177177
[this](internal::key_view key) {
178178
return _versions->current()->record_read_sample(key).then(
179179
[this](bool compaction_needed) {
180-
if (compaction_needed) {
180+
if (!_opts->readonly && compaction_needed) {
181181
maybe_schedule_compaction();
182182
}
183183
});
@@ -237,6 +237,14 @@ ss::future<> impl::flush() {
237237
return impl::flush(ssx::instant::infinite_future());
238238
}
239239

240+
ss::future<bool> impl::refresh() {
241+
if (!_opts->readonly) {
242+
throw invalid_argument_exception(
243+
"refresh() can only be called on a read-only database");
244+
}
245+
co_return co_await _versions->refresh();
246+
}
247+
240248
ss::future<> impl::close() {
241249
vlog(log.trace, "close_start");
242250
_as.request_abort_ex(abort_requested_exception("database closing"));
@@ -274,7 +282,7 @@ ss::future<> impl::recover() {
274282
}
275283

276284
void impl::maybe_schedule_compaction() {
277-
if (_as.abort_requested()) {
285+
if (_as.abort_requested() || _opts->readonly) {
278286
return;
279287
}
280288
if (!_flush_task && _imm) {

src/v/lsm/db/impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ class impl {
8888
// Flush with no deadline.
8989
ss::future<> flush();
9090

91+
// Reload the manifest from disk, adding a new version to the version set.
92+
// Only valid if in read-only mode.
93+
ss::future<bool> refresh();
94+
9195
// Close the database, no more operations should happen to the database at
9296
// this point.
9397
//

src/v/lsm/db/tests/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ redpanda_cc_gtest(
166166
deps = [
167167
"//src/v/base",
168168
"//src/v/bytes:iobuf",
169+
"//src/v/lsm/core:exceptions",
169170
"//src/v/lsm/core/internal:keys",
170171
"//src/v/lsm/core/internal:options",
171172
"//src/v/lsm/db:impl",

src/v/lsm/db/tests/impl_test.cc

Lines changed: 170 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
*/
1111

1212
#include "gtest/gtest.h"
13+
#include "lsm/core/exceptions.h"
1314
#include "lsm/core/internal/keys.h"
1415
#include "lsm/core/internal/options.h"
1516
#include "lsm/db/impl.h"
@@ -126,7 +127,17 @@ class ImplTest : public testing::Test {
126127
void SetUp() override {
127128
// Make a smaller sized database so we get some actual leveling
128129
// happening.
129-
_options = ss::make_lw_shared<lsm::internal::options>({
130+
_options = make_options();
131+
_underlying_data_persistence = lsm::io::make_memory_data_persistence();
132+
_meta_persistence = lsm::io::make_memory_metadata_persistence(
133+
&_meta_persistence_controller);
134+
_tracking_data = std::make_unique<tracking_data_persistence>(
135+
_underlying_data_persistence.get());
136+
open();
137+
}
138+
139+
ss::lw_shared_ptr<lsm::internal::options> make_options() {
140+
return ss::make_lw_shared<lsm::internal::options>({
130141
.levels = lsm::internal::options::make_levels(
131142
{.max_total_bytes = 1_MiB, .max_file_size = 256_KiB},
132143
/*multiplier=*/2,
@@ -136,22 +147,22 @@ class ImplTest : public testing::Test {
136147
.write_buffer_size = 256_KiB,
137148
.level_one_compaction_trigger = 2,
138149
});
139-
_underlying_data_persistence = lsm::io::make_memory_data_persistence();
140-
_meta_persistence = lsm::io::make_memory_metadata_persistence(
141-
&_meta_persistence_controller);
142-
_tracking_data = std::make_unique<tracking_data_persistence>(
143-
_underlying_data_persistence.get());
144-
open();
145150
}
146151

147152
void TearDown() override {
148153
_db->close().get();
154+
for (auto& db : _other_dbs) {
155+
db->close().get();
156+
}
149157
_underlying_data_persistence->close().get();
150158
_meta_persistence->close().get();
151159
_shadow.clear();
152160
}
153161

154-
void write_at_least(size_t size) {
162+
void write_at_least(size_t size, lsm::db::impl* db = nullptr) {
163+
if (!db) {
164+
db = _db.get();
165+
}
155166
auto batch = ss::make_lw_shared<lsm::db::memtable>();
156167
decltype(_shadow) shadow_batch;
157168
auto seqno = _db->max_applied_seqno().value_or(0_seqno);
@@ -168,15 +179,15 @@ class ImplTest : public testing::Test {
168179
shadow_entry(value.share(), key.seqno()));
169180
batch->put(key, value.share());
170181
}
171-
_db->apply(std::move(batch)).get();
182+
db->apply(std::move(batch)).get();
172183
// Only apply writes if db write was a success
173184
for (auto& [k, v] : shadow_batch) {
174185
_shadow.insert_or_assign(k, std::move(v));
175186
}
176187
}
177188

178-
testing::AssertionResult matches_shadow() {
179-
return matches_shadow(_shadow, nullptr);
189+
testing::AssertionResult matches_shadow(lsm::db::impl* db = nullptr) {
190+
return matches_shadow(_shadow, nullptr, db);
180191
}
181192

182193
shadow_map clone_shadow_map() {
@@ -187,9 +198,14 @@ class ImplTest : public testing::Test {
187198
return s;
188199
}
189200

190-
testing::AssertionResult
191-
matches_shadow(const shadow_map& shadow, lsm::db::snapshot* snapshot) {
192-
auto iter = _db->create_iterator({.snapshot = snapshot}).get();
201+
testing::AssertionResult matches_shadow(
202+
const shadow_map& shadow,
203+
lsm::db::snapshot* snapshot,
204+
lsm::db::impl* db = nullptr) {
205+
if (!db) {
206+
db = _db.get();
207+
}
208+
auto iter = db->create_iterator({.snapshot = snapshot}).get();
193209
auto it = shadow.begin();
194210
std::vector<std::string> errors;
195211
for (iter->seek_to_first().get(); iter->valid(); iter->next().get()) {
@@ -232,16 +248,24 @@ class ImplTest : public testing::Test {
232248
_db->close().get();
233249
_tracking_data->reset_tracking();
234250
}
235-
void open() {
236-
_db = lsm::db::impl::open(
237-
_options,
238-
{
239-
.data = std::make_unique<proxy_data_persistence>(
240-
_tracking_data.get()),
241-
.metadata = std::make_unique<proxy_metadata_persistence>(
242-
_meta_persistence.get()),
243-
})
244-
.get();
251+
void open() { _db = do_open(_options); }
252+
253+
lsm::db::impl* open(ss::lw_shared_ptr<lsm::internal::options> opts) {
254+
_other_dbs.emplace_back(do_open(std::move(opts)));
255+
return _other_dbs.back().get();
256+
}
257+
258+
std::unique_ptr<lsm::db::impl>
259+
do_open(ss::lw_shared_ptr<lsm::internal::options> opts) {
260+
return lsm::db::impl::open(
261+
opts,
262+
{
263+
.data = std::make_unique<proxy_data_persistence>(
264+
_tracking_data.get()),
265+
.metadata = std::make_unique<proxy_metadata_persistence>(
266+
_meta_persistence.get()),
267+
})
268+
.get();
245269
}
246270

247271
auto max_applied_seqno() { return _db->max_applied_seqno(); }
@@ -268,6 +292,7 @@ class ImplTest : public testing::Test {
268292
std::unique_ptr<lsm::io::metadata_persistence> _meta_persistence;
269293
std::unique_ptr<tracking_data_persistence> _tracking_data;
270294
std::unique_ptr<lsm::db::impl> _db;
295+
std::vector<std::unique_ptr<lsm::db::impl>> _other_dbs;
271296
};
272297

273298
TEST_F(ImplTest, MemtableIsFlushed) {
@@ -527,4 +552,125 @@ TEST_F(ImplTest, GetFindsKeysWithDifferentSeqno) {
527552
EXPECT_FALSE(result.is_missing());
528553
}
529554

555+
TEST_F(ImplTest, RefreshOnWritableDbThrows) {
556+
EXPECT_THROW(_db->refresh().get(), lsm::invalid_argument_exception);
557+
}
558+
559+
TEST_F(ImplTest, RefreshEmpty) {
560+
auto read_opts = make_options();
561+
read_opts->readonly = true;
562+
563+
// Sanity check that the read-only database sees nothing.
564+
auto* read_db = open(read_opts);
565+
EXPECT_TRUE(matches_shadow(read_db));
566+
567+
// Refresh shouldn't have issues with the persistence being empty.
568+
read_db->refresh().get();
569+
EXPECT_TRUE(matches_shadow(read_db));
570+
EXPECT_FALSE(read_db->max_applied_seqno().has_value());
571+
}
572+
573+
TEST_F(ImplTest, RefreshNoOp) {
574+
write_at_least(512_KiB);
575+
_db->flush().get();
576+
577+
auto read_opts = make_options();
578+
read_opts->readonly = true;
579+
auto* read_db = open(read_opts);
580+
EXPECT_TRUE(matches_shadow(read_db));
581+
582+
auto max_persisted = _db->max_persisted_seqno();
583+
write_at_least(512_KiB);
584+
585+
// Without flushing, refreshing should be a no-op.
586+
EXPECT_FALSE(read_db->refresh().get());
587+
EXPECT_LT(read_db->max_applied_seqno(), _db->max_applied_seqno());
588+
EXPECT_EQ(read_db->max_persisted_seqno(), max_persisted);
589+
}
590+
591+
TEST_F(ImplTest, RefreshSeesChanges) {
592+
write_at_least(512_KiB);
593+
_db->flush().get();
594+
write_at_least(512_KiB);
595+
_db->flush().get();
596+
597+
auto read_opts = make_options();
598+
read_opts->readonly = true;
599+
auto* read_db = open(read_opts);
600+
EXPECT_TRUE(matches_shadow(read_db));
601+
EXPECT_EQ(read_db->max_applied_seqno(), _db->max_applied_seqno());
602+
EXPECT_EQ(read_db->max_persisted_seqno(), _db->max_persisted_seqno());
603+
604+
// Write new data.
605+
write_at_least(512_KiB);
606+
EXPECT_FALSE(matches_shadow(read_db));
607+
608+
// Even when flushing the data shouldn't be visible.
609+
_db->flush().get();
610+
EXPECT_FALSE(matches_shadow(read_db));
611+
EXPECT_NE(read_db->max_applied_seqno(), _db->max_applied_seqno());
612+
EXPECT_NE(read_db->max_persisted_seqno(), _db->max_persisted_seqno());
613+
614+
// But once we refresh it should match.
615+
EXPECT_TRUE(read_db->refresh().get());
616+
EXPECT_TRUE(matches_shadow(read_db));
617+
EXPECT_EQ(read_db->max_applied_seqno(), _db->max_applied_seqno());
618+
EXPECT_EQ(read_db->max_persisted_seqno(), _db->max_persisted_seqno());
619+
}
620+
621+
TEST_F(ImplTest, RefreshWithSnapshot) {
622+
write_at_least(512_KiB);
623+
_db->flush().get();
624+
625+
auto read_opts = make_options();
626+
read_opts->readonly = true;
627+
auto* read_db = open(read_opts);
628+
EXPECT_TRUE(matches_shadow(read_db));
629+
630+
auto snap = read_db->create_snapshot();
631+
auto shadow = clone_shadow_map();
632+
EXPECT_TRUE(matches_shadow(shadow, snap->get(), read_db));
633+
634+
write_at_least(512_KiB);
635+
_db->flush().get();
636+
637+
// We should only match one we refresh.
638+
EXPECT_FALSE(matches_shadow(read_db));
639+
EXPECT_TRUE(read_db->refresh().get());
640+
EXPECT_TRUE(matches_shadow(read_db));
641+
642+
// The snapshot should still match.
643+
EXPECT_TRUE(matches_shadow(shadow, snap->get(), read_db));
644+
}
645+
646+
TEST_F(ImplTest, RefreshFailsForLowerSeqno) {
647+
write_at_least(512_KiB);
648+
_db->flush().get();
649+
650+
// Nefarious case: open another database so it gets a low seqno and we can
651+
// flush a seqno lower than the refreshed database below.
652+
auto* another_db = open(make_options());
653+
654+
// Flush a couple more times to bump the seqno.
655+
write_at_least(512_KiB);
656+
_db->flush().get();
657+
write_at_least(512_KiB);
658+
_db->flush().get();
659+
660+
// Open a read-only database.
661+
auto read_opts = make_options();
662+
read_opts->readonly = true;
663+
auto* read_db = open(read_opts);
664+
665+
// Write some data to the other database and flush, lowering the sequence
666+
// number.
667+
write_at_least(512_KiB, another_db);
668+
another_db->flush().get();
669+
670+
// Refreshing shouldn't work.
671+
auto before_refresh = read_db->max_applied_seqno();
672+
EXPECT_ANY_THROW(read_db->refresh().get());
673+
EXPECT_EQ(read_db->max_applied_seqno(), before_refresh);
674+
}
675+
530676
} // namespace

src/v/lsm/db/version_set.cc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,42 @@ ss::future<> version_set::recover() {
564564
_last_seqno = m->last_seqno;
565565
}
566566

567+
ss::future<bool> version_set::refresh() {
568+
auto m = co_await read_manifest();
569+
if (!m) {
570+
co_return false;
571+
}
572+
573+
if (m->next_file_id < _next_file_id) {
574+
throw corruption_exception(
575+
"manifest next_file_id {} is less than current {}",
576+
m->next_file_id(),
577+
_next_file_id());
578+
}
579+
if (_last_seqno.has_value() && m->last_seqno < _last_seqno.value()) {
580+
throw corruption_exception(
581+
"manifest last_seqno {} is less than current {}",
582+
m->last_seqno(),
583+
_last_seqno.value()());
584+
}
585+
586+
if (m->next_file_id == _next_file_id && m->last_seqno == _last_seqno) {
587+
co_return false;
588+
}
589+
590+
finalize(m->version.get());
591+
set_current(std::move(m->version));
592+
_next_file_id = m->next_file_id;
593+
_last_seqno = m->last_seqno;
594+
co_return true;
595+
}
596+
567597
void version_set::finalize(version* v) {
598+
if (_options->readonly) {
599+
// No need to compute any compaction states, as we won't run compaction
600+
// in read-only mode.
601+
return;
602+
}
568603
// Precompute the best level for the next compaction
569604
internal::level best_level = 0_level;
570605
double best_score = static_cast<double>(v->_files[0_level].size())

src/v/lsm/db/version_set.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ class version_set : public file_id_allocator {
162162
// layer.
163163
ss::future<> recover();
164164

165+
// Reload the manifest from disk. Returns true if the manifest was updated,
166+
// false if no change. Throws if the manifest would regress state.
167+
ss::future<bool> refresh();
168+
165169
// The latest seqno applied to the LSM tree.
166170
std::optional<internal::sequence_number> last_seqno() const {
167171
return _last_seqno;

src/v/lsm/lsm.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ snapshot database::create_snapshot() {
217217

218218
write_batch database::create_write_batch() { return write_batch{_impl.get()}; }
219219

220+
ss::future<bool> database::refresh() { return _impl->refresh(); }
221+
220222
write_batch::write_batch(db::impl* db)
221223
: _batch(ss::make_lw_shared<db::memtable>())
222224
, _db(db) {}

0 commit comments

Comments
 (0)