diff --git a/src/server/dfly_bench.cc b/src/server/dfly_bench.cc index 2dd4ff1c7c1e..2c16b3338060 100644 --- a/src/server/dfly_bench.cc +++ b/src/server/dfly_bench.cc @@ -30,6 +30,7 @@ ABSL_FLAG(uint16_t, p, 6379, "Server port"); ABSL_FLAG(uint32_t, c, 20, "Number of connections per thread"); ABSL_FLAG(uint32_t, qps, 20, "QPS schedule at which the generator sends requests to the server"); ABSL_FLAG(uint32_t, n, 1000, "Number of requests to send per connection"); +ABSL_FLAG(uint32_t, test_time, 0, "Testing time in seconds"); ABSL_FLAG(uint32_t, d, 16, "Value size in bytes "); ABSL_FLAG(string, h, "localhost", "server hostname/ip"); ABSL_FLAG(uint64_t, key_minimum, 0, "Min value for keys used"); @@ -242,9 +243,11 @@ struct ClientStats { // Per connection driver. class Driver { public: - explicit Driver(uint32_t num_reqs, ClientStats* stats, ProactorBase* p) - : num_reqs_(num_reqs), stats_(*stats) { + explicit Driver(uint32_t num_reqs, uint32_t time_limit, ClientStats* stats, ProactorBase* p) + : num_reqs_(num_reqs), time_limit_(time_limit), stats_(*stats) { socket_.reset(p->CreateSocket()); + if (time_limit_ > 0) + num_reqs_ = UINT32_MAX; } Driver(const Driver&) = delete; @@ -255,6 +258,8 @@ class Driver { void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen); float done() const { + if (time_limit_ > 0) + return double(absl::GetCurrentTimeNanos() - start_ns_) / (time_limit_ * 1e9); return double(received_) / num_reqs_; } @@ -273,7 +278,8 @@ class Driver { bool might_hit; }; - uint32_t num_reqs_, received_ = 0; + uint32_t num_reqs_, time_limit_, received_ = 0; + int64_t start_ns_ = 0; ClientStats& stats_; unique_ptr socket_; @@ -291,7 +297,7 @@ class TLocalClient { explicit TLocalClient(ProactorBase* p) : p_(p) { drivers_.resize(GetFlag(FLAGS_c)); for (auto& driver : drivers_) { - driver.reset(new Driver{GetFlag(FLAGS_n), &stats, p_}); + driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_}); } } @@ -415,16 +421,20 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) { } void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { - const int64_t start = absl::GetCurrentTimeNanos(); + start_ns_ = absl::GetCurrentTimeNanos(); unsigned pipeline = GetFlag(FLAGS_pipeline); stats_.num_clients++; - + int64_t time_limit_ns = + time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX; for (unsigned i = 0; i < num_reqs_; ++i) { int64_t now = absl::GetCurrentTimeNanos(); + if (now > time_limit_ns) { + break; + } if (cycle_ns) { - int64_t target_ts = start + i * (*cycle_ns); + int64_t target_ts = start_ns_ + i * (*cycle_ns); int64_t sleep_ns = target_ts - now; if (reqs_.size() > 10 && sleep_ns <= 0) { sleep_ns = 10'000; @@ -468,7 +478,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { int64_t finish = absl::GetCurrentTimeNanos(); VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took " - << StrFormat("%.1fs", double(finish - start) / 1000000000) + << StrFormat("%.1fs", double(finish - start_ns_) / 1000'000'000) << ". Waiting for server processing"; // TODO: to change to a condvar or something. @@ -662,6 +672,7 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) { uint64_t num_last_resp_cnt = 0; uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n); + uint32_t time_limit = GetFlag(FLAGS_test_time); while (*finish_signal == false) { // we sleep with resolution of 1s but print with lower frequency to be more responsive @@ -692,7 +703,8 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) { uint64_t total_ms = (now - start_time) / 1'000'000; uint64_t period_ms = (now - last_print) / 1'000'000; uint64_t period_resp_cnt = stats.num_responses - num_last_resp_cnt; - double done_perc = double(stats.num_responses) * 100 / resp_goal; + double done_perc = time_limit > 0 ? double(total_ms) / (10 * time_limit) + : double(stats.num_responses) * 100 / resp_goal; double hitrate = stats.hit_opportunities > 0 ? 100 * double(stats.hit_count) / double(stats.hit_opportunities) : 0; @@ -767,10 +779,11 @@ int main(int argc, char* argv[]) { uint32_t thread_key_step = 0; const uint32_t qps = GetFlag(FLAGS_qps); - const int64_t interval = qps ? 1000000000LL / qps : 0; + const int64_t interval = qps ? 1'000'000'000LL / qps : 0; uint64_t num_reqs = GetFlag(FLAGS_n); uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size(); uint64_t total_requests = num_reqs * total_conn_num; + uint32_t time_limit = GetFlag(FLAGS_test_time); if (dist_type == SEQUENTIAL) { thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size()); @@ -781,9 +794,10 @@ int main(int argc, char* argv[]) { } } - CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs - << " requests per each connection, or " << total_requests << " requests overall"; - + if (!time_limit) { + CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs + << " requests per each connection, or " << total_requests << " requests overall"; + } if (interval) { CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps) << " rps per connection, i.e. request every " << interval / 1000 << "us"; @@ -826,7 +840,8 @@ int main(int argc, char* argv[]) { CONSOLE_INFO << "\nTotal time: " << duration << ". Overall number of requests: " << summary.num_responses - << ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan"); + << ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan") + << ", P99 lat: " << summary.hist.Percentile(99) << "us"; if (summary.num_errors) { CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";