Skip to content

Commit

Permalink
fix(tiering): update iterator after await throttle (#2440)
Browse files Browse the repository at this point in the history
* fix(tiering): update iterator after await throttle

Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Jan 21, 2024
1 parent 8f454b2 commit cdbeb2f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ OpResult<optional<string>> SetCmd::Set(const SetParams& params, string_view key,
TieredStorage::EligibleForOffload(value)) { // external storage enabled.
// TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid
// afterwards. handle this
shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it);
shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it, key);
}

if (manual_journal_ && op_args_.shard->journal()) {
Expand Down
60 changes: 38 additions & 22 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ PrimeIterator TieredStorage::Load(DbIndex db_index, PrimeIterator it, string_vie
return it;
}

error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it, string_view key) {
CHECK_EQ(OBJ_STRING, it->second.ObjType());
DCHECK(!it->second.IsExternal());
DCHECK(!it->second.HasIoPending());
Expand All @@ -453,24 +453,12 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
db_arr_[db_index] = new PerDb;
}

unsigned max_pending_writes = GetFlag(FLAGS_tiered_storage_max_pending_writes);
unsigned throttle_usec = GetFlag(FLAGS_tiered_storage_throttle_us);
if (num_active_requests_ >= max_pending_writes && throttle_usec > 0) {
chrono::steady_clock::time_point next =
chrono::steady_clock::now() + chrono::microseconds(throttle_usec);
stats_.throttled_write_cnt++;

// TODO: we should reset `it` in case concurrent inserts invalidated the iterator.
// This can happen because we operate on the table from different fibers
// (for example, due to inline operations or evictions).
throttle_ec_.await_until([&]() { return num_active_requests_ < max_pending_writes; }, next);
}

if (blob_len > kMaxSmallBin) {
if (num_active_requests_ < max_pending_writes) {
WriteSingle(db_index, it, blob_len);
auto [schedule, res_it] = CanScheduleOffload(db_index, it, key);
if (schedule) {
WriteSingle(db_index, res_it, blob_len);
} else {
VLOG(2) << "Skip WriteSingle for: " << it->first.ToString();
VLOG(2) << "Skip WriteSingle for: " << key;
}
return error_code{};
}
Expand All @@ -487,15 +475,17 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
// TODO: we need to track in stats all the cases where we omit offloading attempt.
CHECK_LT(bin_record.pending_entries.size(), max_entries);

VLOG(2) << "ScheduleOffload:" << it->first.ToString();
VLOG(2) << "ScheduleOffload:" << key;
bin_record.pending_entries.insert(it->first.AsRef());
it->second.SetIoPending(true);

if (bin_record.pending_entries.size() < max_entries)
return error_code{}; // gather more.

bool flush_succeeded = false;
if (num_active_requests_ < max_pending_writes) {

auto [schedule, res_it] = CanScheduleOffload(db_index, it, key);
if (schedule) {
flush_succeeded = FlushPending(db_index, bin_index);

// if we reached high utilization of the file range - try to grow the file.
Expand All @@ -505,10 +495,10 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
}

if (!flush_succeeded) {
VLOG(2) << "flush failed remove entry: " << it->first.ToString();
VLOG(2) << "flush failed remove entry: " << key;
// we could not flush because I/O is saturated, so lets remove the last item.
bin_record.pending_entries.erase(it->first.AsRef());
it->second.SetIoPending(false);
bin_record.pending_entries.erase(res_it->first.AsRef());
res_it->second.SetIoPending(false);
++stats_.flush_skip_cnt;
}

Expand Down Expand Up @@ -631,6 +621,32 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
++stats_.tiered_writes;
}

// Decides whether an offload write for `key` may be issued now, throttling the caller if the
// number of active requests has reached --tiered_storage_max_pending_writes.
//
// db_index - database the entry belongs to.
// it       - iterator to the entry as seen by the caller before this call.
// key      - key of the entry; used to look the entry up again if `it` went stale.
//
// Returns {can_write, iterator}. `can_write` is true when the number of active requests is below
// the limit at return time. `iterator` is either `it` itself or, if `it` no longer refers to
// `key` after the wait, the result of a fresh lookup in the prime table.
std::pair<bool, PrimeIterator> TieredStorage::CanScheduleOffload(DbIndex db_index, PrimeIterator it,
                                                                 string_view key) {
  const unsigned pending_limit = GetFlag(FLAGS_tiered_storage_max_pending_writes);
  const unsigned throttle_usec = GetFlag(FLAGS_tiered_storage_throttle_us);

  // Fast path: either there is spare capacity, or throttling is disabled. In both cases we do
  // not wait, so the caller's iterator is returned untouched.
  const bool saturated = num_active_requests_ >= pending_limit;
  if (!saturated || throttle_usec == 0) {
    return std::make_pair(!saturated, it);
  }

  // Saturated and throttling is enabled: wait on throttle_ec_ until capacity frees up or the
  // deadline (now + throttle_usec) is reached.
  chrono::steady_clock::time_point deadline =
      chrono::steady_clock::now() + chrono::microseconds(throttle_usec);
  ++stats_.throttled_write_cnt;
  throttle_ec_.await_until([&] { return num_active_requests_ < pending_limit; }, deadline);

  // The wait above may have let other code run against the table, so re-validate `it`.
  PrimeTable* table = db_slice_.GetTables(db_index).first;
  PrimeIterator current = it;
  if (!it.IsOccupied() || it->first != key) {
    current = table->Find(key);
    // While a value is being offloaded in the database write flow, a write lock is held on the
    // key, so nothing else may modify or remove it until the lock is released. The lookup must
    // therefore succeed.
    CHECK(!current.is_done());
    VLOG(1) << "Update iterator after await";
  }

  // Capacity is re-checked here because the wait may have ended on the deadline.
  return std::make_pair(num_active_requests_ < pending_limit, current);
}

bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
PerDb* db = db_arr_[db_index];

Expand Down
7 changes: 6 additions & 1 deletion src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TieredStorage {
PrimeIterator Load(DbIndex db_index, PrimeIterator it, std::string_view key);

// Schedules unloading of the item, pointed by the iterator.
std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it);
std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it, std::string_view key);

void CancelIo(DbIndex db_index, PrimeIterator it);

Expand All @@ -50,6 +50,11 @@ class TieredStorage {

void WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_len);

// Returns a pair consisting of a bool denoting whether we can write to disk, and an updated
// iterator, as this function can yield. 'it' should not be used after the call to this function.
std::pair<bool, PrimeIterator> CanScheduleOffload(DbIndex db_index, PrimeIterator it,
std::string_view key);

bool FlushPending(DbIndex db_index, unsigned bin_index);

void InitiateGrow(size_t size);
Expand Down

0 comments on commit cdbeb2f

Please sign in to comment.