chore: Add stats print for slot migrations #4456

Merged: 7 commits, Jan 15, 2025
Changes from 4 commits
45 changes: 30 additions & 15 deletions src/server/journal/cmd_serializer.cc
@@ -26,13 +26,18 @@ class CommandAggregator {
}

enum class CommitMode { kAuto, kNoCommit };
void AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {

// Returns whether CommitPending() was called
bool AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
agg_bytes_ += arg.size();
members_.push_back(std::move(arg));

if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) {
CommitPending();
return true;
}

return false;
}

private:
@@ -65,26 +70,27 @@ CmdSerializer::CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer
: cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) {
}

void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
size_t CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
// We send RESTORE commands for small objects, or objects we don't support breaking.
bool use_restore_serialization = true;
size_t flushes = 1;
Collaborator:
After diving into the code again, regarding your earlier comment that flushes are the same as the number of commands: this is not true. We are actually accumulating the commands, so you don't know whether the data was actually written to the socket.

Collaborator (author), @chakaz, Jan 15, 2025:
Renamed to commands.

if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) {
switch (pv.ObjType()) {
case OBJ_SET:
SerializeSet(key, pv);
flushes = SerializeSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_ZSET:
SerializeZSet(key, pv);
flushes = SerializeZSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_HASH:
SerializeHash(key, pv);
flushes = SerializeHash(key, pv);
use_restore_serialization = false;
break;
case OBJ_LIST:
SerializeList(key, pv);
flushes = SerializeList(key, pv);
use_restore_serialization = false;
break;
case OBJ_STRING:
@@ -105,6 +111,7 @@ void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const
SerializeStickIfNeeded(key, pk);
SerializeExpireIfNeeded(key, expire_ms);
}
return flushes;
}

void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) {
@@ -139,54 +146,62 @@ void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms)
SerializeCommand("PEXIRE", {key, absl::StrCat(expire_ms)});
}

void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); },
max_serialization_buffer_size_);

size_t flushes = 0;
container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
flushes += aggregator.AddArg(ce.ToString());
return true;
});
return flushes;
}

void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); },
max_serialization_buffer_size_);

size_t flushes = 0;
container_utils::IterateSortedSet(
pv.GetRobjWrapper(),
[&](container_utils::ContainerEntry ce, double score) {
aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(ce.ToString());
flushes += aggregator.AddArg(ce.ToString());
return true;
},
/*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true);
return flushes;
}

void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); },
max_serialization_buffer_size_);

size_t flushes = 0;
container_utils::IterateMap(
pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(v.ToString());
flushes += aggregator.AddArg(v.ToString());
return true;
});
return flushes;
}

void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); },
max_serialization_buffer_size_);

size_t flushes = 0;
container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
flushes += aggregator.AddArg(ce.ToString());
return true;
});
return flushes;
}

void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv,
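For illustration, a minimal standalone sketch of the counting pattern introduced above, using toy types rather than the actual Dragonfly classes (ToyAggregator and its members are hypothetical names); it deliberately ignores how the real CommandAggregator handles members still pending when iteration ends.

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for CommandAggregator: AddArg() reports whether it committed the
// accumulated members as one command, so callers can sum those reports.
class ToyAggregator {
 public:
  explicit ToyAggregator(size_t max_bytes) : max_bytes_(max_bytes) {}

  // Returns true if the pending members were committed as a single command.
  bool AddArg(std::string arg) {
    bytes_ += arg.size();
    members_.push_back(std::move(arg));
    if (bytes_ >= max_bytes_) {
      Commit();
      return true;
    }
    return false;
  }

 private:
  void Commit() {
    std::cout << "SADD key with " << members_.size() << " members\n";
    members_.clear();
    bytes_ = 0;
  }

  size_t max_bytes_;
  size_t bytes_ = 0;
  std::vector<std::string> members_;
};

int main() {
  ToyAggregator agg(/*max_bytes=*/8);
  std::vector<std::string> set_members = {"a", "bb", "ccc", "dddd", "eeeee"};
  size_t commands = 0;  // mirrors how SerializeSet() sums AddArg() results
  for (std::string& member : set_members) {
    commands += agg.AddArg(std::move(member));  // bool converts to 0 or 1
  }
  std::cout << "commands counted: " << commands << "\n";
}

With an 8-byte threshold the first four members trigger one commit, so the counter ends at 1; the last member stays pending.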
13 changes: 7 additions & 6 deletions src/server/journal/cmd_serializer.h
@@ -23,18 +23,19 @@ class CmdSerializer {

explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size);

void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);
// Returns how many commands we broke this entry into (like multiple HSETs etc)
size_t SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

private:
void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args);
void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk);
void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms);

void SerializeSet(std::string_view key, const PrimeValue& pv);
void SerializeZSet(std::string_view key, const PrimeValue& pv);
void SerializeHash(std::string_view key, const PrimeValue& pv);
void SerializeList(std::string_view key, const PrimeValue& pv);
size_t SerializeSet(std::string_view key, const PrimeValue& pv);
size_t SerializeZSet(std::string_view key, const PrimeValue& pv);
size_t SerializeHash(std::string_view key, const PrimeValue& pv);
size_t SerializeList(std::string_view key, const PrimeValue& pv);
void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

23 changes: 21 additions & 2 deletions src/server/journal/streamer.cc
@@ -224,6 +224,7 @@ void RestoreStreamer::Run() {
auto* blocking_counter = db_slice_->BlockingCounter();
std::lock_guard blocking_counter_guard(*blocking_counter);

stats_.buckets_loop++;
WriteBucket(it);
});

@@ -232,10 +233,19 @@ void RestoreStreamer::Run() {
last_yield = 0;
}
} while (cursor);

VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
<< ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
}

void RestoreStreamer::SendFinalize(long attempt) {
VLOG(1) << "RestoreStreamer LSN opcode for : " << db_slice_->shard_id() << " attempt " << attempt;
VLOG(1) << "RestoreStreamer LSN of " << my_slots_.ToSlotRanges().ToString() << ", shard "
<< db_slice_->shard_id() << " attempt " << attempt << " with " << stats_.commands
<< " commands. Buckets looped " << stats_.buckets_loop << ", buckets on_db_update "
<< stats_.buckets_on_db_update << ", buckets skipped " << stats_.buckets_skipped
<< ", buckets written " << stats_.buckets_written << ". Keys skipped "
<< stats_.keys_skipped << ", keys written " << stats_.keys_written;

journal::Entry entry(journal::Op::LSN, attempt);

io::StringSink sink;
@@ -287,21 +297,28 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {

void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
if (it.GetVersion() < snapshot_version_) {
stats_.buckets_written++;

it.SetVersion(snapshot_version_);
string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) {
const auto& pv = it->second;
string_view key = it->first.GetSlice(&key_buffer);
if (ShouldWrite(key)) {
stats_.keys_written++;
uint64_t expire = 0;
if (pv.HasExpire()) {
auto eit = db_slice_->databases()[0]->expire.Find(it->first);
expire = db_slice_->ExpireTime(eit);
}

WriteEntry(key, it->first, pv, expire);
} else {
stats_.keys_skipped++;
}
}
} else {
stats_.buckets_skipped++;
}
ThrottleIfNeeded();
}
@@ -310,6 +327,8 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
std::lock_guard guard(big_value_mu_);
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

stats_.buckets_on_db_update++;
Collaborator:
I think it would be better to count how many buckets were actually serialized in OnDbChange, i.e. WriteBucket would return true if the bucket was serialized and false if it was skipped.
This way, if we have a very weak test that sends only a few commands while we have almost finished the traversal, we would skip WriteBucket in this flow and would not be able to detect such weak tests.

Collaborator (author):
To clarify, do you want this in order to handle a potential CVCUponInsert() touching multiple buckets? If so, we can increase the counter there; if not, I think I did not understand your comment.

Collaborator:
I want to know that buckets were actually serialized from this flow, and not only that the command was called, because if it is called after the bucket has already been serialized then we don't do anything (we skip writing the bucket).

Collaborator (author):
Oh, gotcha, sure thing!


PrimeTable* table = db_slice_->GetTables(0).first;

if (const PrimeTable::bucket_iterator* bit = req.update()) {
@@ -331,7 +350,7 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
ThrottleIfNeeded();
},
ServerState::tlocal()->serialization_max_chunk_size);
serializer.SerializeEntry(key, pk, pv, expire_ms);
stats_.commands += serializer.SerializeEntry(key, pk, pv, expire_ms);
Collaborator:
This is actually counting the number of flushes to the sync that we have (the number of times you call commit), right? Not the number of commands generated from each entry.

Collaborator (author):
Excluding timeout and stickiness, they should be the same.

Collaborator:
OK, I see now: this is not actually the number of times we flush to the sync, but the number of times we write to the pending_buffer.

Collaborator (author):
Yes.

}

} // namespace dfly
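To make the OnDbChange discussion above concrete, here is a standalone sketch of the reviewer's suggestion, with toy types and hypothetical names rather than the real RestoreStreamer: WriteBucket() reports whether it actually serialized the bucket, so the on_db_update counter only grows when serialization really happened.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct ToyBucket {
  uint64_t version = 0;
  std::vector<int> keys;
};

class ToyStreamer {
 public:
  explicit ToyStreamer(uint64_t snapshot_version) : snapshot_version_(snapshot_version) {}

  // Returns true only if the bucket had not yet been serialized for this snapshot.
  bool WriteBucket(ToyBucket& bucket) {
    if (bucket.version >= snapshot_version_) {
      ++buckets_skipped_;
      return false;  // already covered, e.g. by the main traversal
    }
    bucket.version = snapshot_version_;
    keys_written_ += bucket.keys.size();
    ++buckets_written_;
    return true;
  }

  // Called on concurrent updates; counts only buckets serialized from this flow.
  void OnDbChange(ToyBucket& bucket) {
    if (WriteBucket(bucket)) {
      ++buckets_on_db_update_;
    }
  }

  void PrintStats() const {
    std::cout << "buckets written " << buckets_written_ << ", skipped " << buckets_skipped_
              << ", on_db_update " << buckets_on_db_update_ << ", keys written " << keys_written_
              << "\n";
  }

 private:
  uint64_t snapshot_version_;
  size_t buckets_written_ = 0;
  size_t buckets_skipped_ = 0;
  size_t buckets_on_db_update_ = 0;
  size_t keys_written_ = 0;
};

int main() {
  ToyStreamer streamer(/*snapshot_version=*/5);
  ToyBucket a{/*version=*/3, {1, 2, 3}};
  ToyBucket b{/*version=*/3, {4}};
  streamer.WriteBucket(a);  // main traversal serializes bucket a
  streamer.OnDbChange(a);   // concurrent update: already serialized, not counted
  streamer.OnDbChange(b);   // concurrent update: serialized here, counted
  streamer.PrintStats();    // buckets written 2, skipped 1, on_db_update 1, keys written 4
}

This mirrors the intent of counting serializations rather than invocations: a test that barely exercises concurrent writes would show up as on_db_update staying near zero.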
11 changes: 11 additions & 0 deletions src/server/journal/streamer.h
@@ -106,13 +106,24 @@ class RestoreStreamer : public JournalStreamer {
void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

struct Stats {
size_t buckets_skipped = 0;
size_t buckets_written = 0;
size_t buckets_loop = 0;
size_t buckets_on_db_update = 0;
size_t keys_written = 0;
size_t keys_skipped = 0;
size_t commands = 0;
};

DbSlice* db_slice_;
DbTableArray db_array_;
uint64_t snapshot_version_ = 0;
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;
ThreadLocalMutex big_value_mu_;
Stats stats_;
};

} // namespace dfly
34 changes: 33 additions & 1 deletion tests/dragonfly/cluster_test.py
@@ -214,6 +214,21 @@ async def get_node_id(connection):
return id


def stop_and_get_restore_log(instance):
instance.stop()
lines = instance.find_in_logs("RestoreStreamer LSN")
assert len(lines) == 1
line = lines[0]
logging.debug(f"Streamer log line: {line}")
return line


def extract_int_after_prefix(prefix, line):
match = re.search(prefix + "(\\d+)", line)
assert match
return int(match.group(1))


@dfly_args({})
class TestNotEmulated:
async def test_cluster_commands_fails_when_not_emulate(self, async_client: aioredis.Redis):
@@ -2035,6 +2050,18 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
logging.debug(f"Memory before {mem_before} after {mem_after}")
assert mem_after < mem_before * 1.1

line = stop_and_get_restore_log(nodes[0].instance)

# 'with X commands' - how many breakdowns we used for the keys
assert extract_int_after_prefix("with ", line) > 5_000_000

assert extract_int_after_prefix("Keys skipped ", line) == 0
assert extract_int_after_prefix("buckets skipped ", line) == 0
assert extract_int_after_prefix("keys written ", line) > 90

# We don't send updates during the migration
assert extract_int_after_prefix("buckets on_db_update ", line) == 0


@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.parametrize("chunk_size", [1_000_000, 30])
@@ -2056,7 +2083,6 @@ async def test_cluster_migration_while_seeding(
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []
client0 = nodes[0].client
client1 = nodes[1].client

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

@@ -2098,6 +2124,12 @@ async def test_cluster_migration_while_seeding(
capture = await seeder.capture_fake_redis()
assert await seeder.compare(capture, instances[1].port)

line = stop_and_get_restore_log(nodes[0].instance)
assert extract_int_after_prefix("Keys skipped ", line) == 0
assert extract_int_after_prefix("buckets skipped ", line) > 0
assert extract_int_after_prefix("keys written ", line) > 9_500
assert extract_int_after_prefix("buckets on_db_update ", line) > 0


def parse_lag(replication_info: str):
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
9 changes: 5 additions & 4 deletions tests/dragonfly/instance.py
@@ -380,16 +380,17 @@ async def metrics(self):
for metric_family in text_string_to_metric_families(data)
}

def is_in_logs(self, pattern):
def find_in_logs(self, pattern):
if self.proc is not None:
raise RuntimeError("Must close server first")

results = []
matcher = re.compile(pattern)
for path in self.log_files:
for line in open(path):
if matcher.search(line):
return True
return False
results.append(line)
return results

@property
def rss(self):
@@ -416,7 +417,7 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn
args.setdefault("noversion_check", None)
# MacOs does not set it automatically, so we need to set it manually
args.setdefault("maxmemory", "8G")
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1"
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1,streamer=1"
args.setdefault("vmodule", vmod)
args.setdefault("jsonpathv2")
