Skip to content

feat: add 'testing_time' limit option to dfly_bench #4487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ABSL_FLAG(uint16_t, p, 6379, "Server port");
ABSL_FLAG(uint32_t, c, 20, "Number of connections per thread");
ABSL_FLAG(uint32_t, qps, 20, "QPS schedule at which the generator sends requests to the server");
ABSL_FLAG(uint32_t, n, 1000, "Number of requests to send per connection");
ABSL_FLAG(uint32_t, test_time, 0, "Testing time in seconds");
ABSL_FLAG(uint32_t, d, 16, "Value size in bytes ");
ABSL_FLAG(string, h, "localhost", "server hostname/ip");
ABSL_FLAG(uint64_t, key_minimum, 0, "Min value for keys used");
Expand Down Expand Up @@ -242,9 +243,11 @@ struct ClientStats {
// Per connection driver.
class Driver {
public:
explicit Driver(uint32_t num_reqs, ClientStats* stats, ProactorBase* p)
: num_reqs_(num_reqs), stats_(*stats) {
explicit Driver(uint32_t num_reqs, uint32_t time_limit, ClientStats* stats, ProactorBase* p)
: num_reqs_(num_reqs), time_limit_(time_limit), stats_(*stats) {
socket_.reset(p->CreateSocket());
if (time_limit_ > 0)
num_reqs_ = UINT32_MAX;
}

Driver(const Driver&) = delete;
Expand All @@ -255,6 +258,8 @@ class Driver {
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);

float done() const {
if (time_limit_ > 0)
return double(absl::GetCurrentTimeNanos() - start_ns_) / (time_limit_ * 1e9);
return double(received_) / num_reqs_;
}

Expand All @@ -273,7 +278,8 @@ class Driver {
bool might_hit;
};

uint32_t num_reqs_, received_ = 0;
uint32_t num_reqs_, time_limit_, received_ = 0;
int64_t start_ns_ = 0;

ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_;
Expand All @@ -291,7 +297,7 @@ class TLocalClient {
explicit TLocalClient(ProactorBase* p) : p_(p) {
drivers_.resize(GetFlag(FLAGS_c));
for (auto& driver : drivers_) {
driver.reset(new Driver{GetFlag(FLAGS_n), &stats, p_});
driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_});
}
}

Expand Down Expand Up @@ -415,16 +421,20 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
}

void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
const int64_t start = absl::GetCurrentTimeNanos();
start_ns_ = absl::GetCurrentTimeNanos();
unsigned pipeline = GetFlag(FLAGS_pipeline);

stats_.num_clients++;

int64_t time_limit_ns =
time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX;
for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos();

if (now > time_limit_ns) {
break;
}
if (cycle_ns) {
int64_t target_ts = start + i * (*cycle_ns);
int64_t target_ts = start_ns_ + i * (*cycle_ns);
int64_t sleep_ns = target_ts - now;
if (reqs_.size() > 10 && sleep_ns <= 0) {
sleep_ns = 10'000;
Expand Down Expand Up @@ -468,7 +478,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {

int64_t finish = absl::GetCurrentTimeNanos();
VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took "
<< StrFormat("%.1fs", double(finish - start) / 1000000000)
<< StrFormat("%.1fs", double(finish - start_ns_) / 1000'000'000)
<< ". Waiting for server processing";

// TODO: to change to a condvar or something.
Expand Down Expand Up @@ -662,6 +672,7 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
uint64_t num_last_resp_cnt = 0;

uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n);
uint32_t time_limit = GetFlag(FLAGS_test_time);

while (*finish_signal == false) {
// we sleep with resolution of 1s but print with lower frequency to be more responsive
Expand Down Expand Up @@ -692,7 +703,8 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
uint64_t total_ms = (now - start_time) / 1'000'000;
uint64_t period_ms = (now - last_print) / 1'000'000;
uint64_t period_resp_cnt = stats.num_responses - num_last_resp_cnt;
double done_perc = double(stats.num_responses) * 100 / resp_goal;
double done_perc = time_limit > 0 ? double(total_ms) / (10 * time_limit)
: double(stats.num_responses) * 100 / resp_goal;
double hitrate = stats.hit_opportunities > 0
? 100 * double(stats.hit_count) / double(stats.hit_opportunities)
: 0;
Expand Down Expand Up @@ -767,10 +779,11 @@ int main(int argc, char* argv[]) {

uint32_t thread_key_step = 0;
const uint32_t qps = GetFlag(FLAGS_qps);
const int64_t interval = qps ? 1000000000LL / qps : 0;
const int64_t interval = qps ? 1'000'000'000LL / qps : 0;
uint64_t num_reqs = GetFlag(FLAGS_n);
uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size();
uint64_t total_requests = num_reqs * total_conn_num;
uint32_t time_limit = GetFlag(FLAGS_test_time);

if (dist_type == SEQUENTIAL) {
thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size());
Expand All @@ -781,9 +794,10 @@ int main(int argc, char* argv[]) {
}
}

CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection, or " << total_requests << " requests overall";

if (!time_limit) {
CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection, or " << total_requests << " requests overall";
}
if (interval) {
CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps)
<< " rps per connection, i.e. request every " << interval / 1000 << "us";
Expand Down Expand Up @@ -826,7 +840,8 @@ int main(int argc, char* argv[]) {

CONSOLE_INFO << "\nTotal time: " << duration
<< ". Overall number of requests: " << summary.num_responses
<< ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan");
<< ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan")
<< ", P99 lat: " << summary.hist.Percentile(99) << "us";

if (summary.num_errors) {
CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";
Expand Down
Loading