From cdbeb2f6c23d2e2310f902b41ac9b79b7bd6660d Mon Sep 17 00:00:00 2001 From: adiholden Date: Sun, 21 Jan 2024 11:01:03 +0200 Subject: [PATCH] fix(tiering): update iterator after await throttle (#2440) * fix(tiering): update iterator after await throttle Signed-off-by: adi_holden --- src/server/string_family.cc | 2 +- src/server/tiered_storage.cc | 60 +++++++++++++++++++++++------------- src/server/tiered_storage.h | 7 ++++- 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 257443b4bc87..59ccff5e14aa 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -626,7 +626,7 @@ OpResult> SetCmd::Set(const SetParams& params, string_view key, TieredStorage::EligibleForOffload(value)) { // external storage enabled. // TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid // afterwards. handle this - shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it); + shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it, key); } if (manual_journal_ && op_args_.shard->journal()) { diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 4fb583a7bb33..c1db615876d8 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -438,7 +438,7 @@ PrimeIterator TieredStorage::Load(DbIndex db_index, PrimeIterator it, string_vie return it; } -error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) { +error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it, string_view key) { CHECK_EQ(OBJ_STRING, it->second.ObjType()); DCHECK(!it->second.IsExternal()); DCHECK(!it->second.HasIoPending()); @@ -453,24 +453,12 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) { db_arr_[db_index] = new PerDb; } - unsigned max_pending_writes = GetFlag(FLAGS_tiered_storage_max_pending_writes); - unsigned throttle_usec = GetFlag(FLAGS_tiered_storage_throttle_us); - if (num_active_requests_ >= max_pending_writes && throttle_usec > 0) { - chrono::steady_clock::time_point next = - chrono::steady_clock::now() + chrono::microseconds(throttle_usec); - stats_.throttled_write_cnt++; - - // TODO: we should reset `it` in case concurrent inserts invalidated the iterator. - // This can happen because we operate on the table from differrent fibers - // (for example, due to inline operations or evictions). - throttle_ec_.await_until([&]() { return num_active_requests_ < max_pending_writes; }, next); - } - if (blob_len > kMaxSmallBin) { - if (num_active_requests_ < max_pending_writes) { - WriteSingle(db_index, it, blob_len); + auto [schedule, res_it] = CanScheduleOffload(db_index, it, key); + if (schedule) { + WriteSingle(db_index, res_it, blob_len); } else { - VLOG(2) << "Skip WriteSingle for: " << it->first.ToString(); + VLOG(2) << "Skip WriteSingle for: " << key; } return error_code{}; } @@ -487,7 +475,7 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) { // TODO: we need to track in stats all the cases where we omit offloading attempt. CHECK_LT(bin_record.pending_entries.size(), max_entries); - VLOG(2) << "ScheduleOffload:" << it->first.ToString(); + VLOG(2) << "ScheduleOffload:" << key; bin_record.pending_entries.insert(it->first.AsRef()); it->second.SetIoPending(true); @@ -495,7 +483,9 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) { return error_code{}; // gather more. bool flush_succeeded = false; - if (num_active_requests_ < max_pending_writes) { + + auto [schedule, res_it] = CanScheduleOffload(db_index, it, key); + if (schedule) { flush_succeeded = FlushPending(db_index, bin_index); // if we reached high utilization of the file range - try to grow the file. @@ -505,10 +495,10 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) { } if (!flush_succeeded) { - VLOG(2) << "flush failed remove entry: " << it->first.ToString(); + VLOG(2) << "flush failed remove entry: " << key; // we could not flush because I/O is saturated, so lets remove the last item. - bin_record.pending_entries.erase(it->first.AsRef()); - it->second.SetIoPending(false); + bin_record.pending_entries.erase(res_it->first.AsRef()); + res_it->second.SetIoPending(false); ++stats_.flush_skip_cnt; } @@ -631,6 +621,32 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_ ++stats_.tiered_writes; } +std::pair TieredStorage::CanScheduleOffload(DbIndex db_index, PrimeIterator it, + string_view key) { + unsigned max_pending_writes = GetFlag(FLAGS_tiered_storage_max_pending_writes); + unsigned throttle_usec = GetFlag(FLAGS_tiered_storage_throttle_us); + PrimeIterator res_it = it; + if (num_active_requests_ >= max_pending_writes && throttle_usec > 0) { + chrono::steady_clock::time_point next = + chrono::steady_clock::now() + chrono::microseconds(throttle_usec); + stats_.throttled_write_cnt++; + + throttle_ec_.await_until([&]() { return num_active_requests_ < max_pending_writes; }, next); + + PrimeTable* pt = db_slice_.GetTables(db_index).first; + if (!it.IsOccupied() || it->first != key) { + res_it = pt->Find(key); + // During the database write flow, when offloading a value, we acquire a write lock on the + // key. No other operations are allowed to modify or remove the key until the lock is + // released. + CHECK(!res_it.is_done()); + VLOG(1) << "Update iterator after await"; + } + } + + return std::make_pair((num_active_requests_ < max_pending_writes), res_it); +} + bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) { PerDb* db = db_arr_[db_index]; diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 9ee33a6e320e..42bef9f9dd50 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -29,7 +29,7 @@ class TieredStorage { PrimeIterator Load(DbIndex db_index, PrimeIterator it, std::string_view key); // Schedules unloading of the item, pointed by the iterator. - std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it); + std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it, std::string_view key); void CancelIo(DbIndex db_index, PrimeIterator it); @@ -50,6 +50,11 @@ class TieredStorage { void WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_len); + // Returns a pair consisting of an bool denoting whether we can write to disk, and updated + // iterator as this function can yield. 'it' should not be used after the call to this function. + std::pair CanScheduleOffload(DbIndex db_index, PrimeIterator it, + std::string_view key); + bool FlushPending(DbIndex db_index, unsigned bin_index); void InitiateGrow(size_t size);