Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ ABSL_FLAG(string, ratio, "1:10", "Set:Get ratio");
ABSL_FLAG(string, command, "",
"custom command with __key__ placeholder for keys, "
"__data__ for values, __score__ for doubles");
ABSL_FLAG(bool, random_data, true,
"If true, generate random data for each request, otherwise uses incremental sequences."
"Applies for __score__ and __data__ placeholders.");
ABSL_FLAG(string, P, "", "protocol can be empty (for RESP) or memcache_text");

ABSL_FLAG(bool, tcp_nodelay, false, "If true, set nodelay option on tcp socket");
Expand Down Expand Up @@ -94,6 +97,8 @@ using tcp = ::boost::asio::ip::tcp;
using absl::StrCat;

thread_local base::Xoroshiro128p bit_gen;
thread_local uint64_t seq_val = 1;

atomic_bool terminate_requested = false;

#if __INTELLISENSE__
Expand All @@ -104,22 +109,28 @@ enum Protocol { RESP, MC_TEXT } protocol;
enum DistType { UNIFORM, NORMAL, ZIPFIAN, SEQUENTIAL } dist_type{UNIFORM};
constexpr uint16_t kNumSlots = 16384;

static string GetRandomHex(size_t len, bool ascii) {
static string GetRandomBlob(size_t len, bool ascii) {
static bool is_random = GetFlag(FLAGS_random_data);

std::string res(len, '\0');
size_t indx = 0;

for (; indx + 16 <= len; indx += 16) { // 2 chars per byte
absl::numbers_internal::FastHexToBufferZeroPad16(bit_gen(), res.data() + indx);
absl::numbers_internal::FastHexToBufferZeroPad16(is_random ? bit_gen() : seq_val++,
res.data() + indx);
}

DCHECK_LE(indx, len);

if (indx < len) {
char buf[24];
absl::numbers_internal::FastHexToBufferZeroPad16(bit_gen(), buf);

for (unsigned j = 0; indx < len;) {
res[indx++] = buf[j++];
uint64_t next_val = is_random ? bit_gen() : seq_val++;
unsigned count = len - indx;

// extract hex chars from least significant nibble, as it's the one that changes
// with sequential values.
for (unsigned j = 0; j < count; ++j) {
res[indx++] = (next_val & 0x0F) + 'A'; // to ascii (not really hex, but ok for random data)
next_val >>= 4;
}
}

Expand Down Expand Up @@ -255,7 +266,7 @@ class CommandGenerator {
}

private:
enum TemplateType { KEY, VALUE, SCORE };
enum TemplateType : uint8_t { KEY, VALUE, SCORE };

string FillSet(string_view key);
string FillGet(string_view key);
Expand Down Expand Up @@ -364,7 +375,7 @@ string CommandGenerator::Next(SlotRange range) {
size_t value_len = IsRandomValueLen()
? absl::Uniform(bit_gen, value_len_min_, value_len_max_)
: fixed_len_value_.size();
str = GetRandomHex(value_len, is_ascii_);
str = GetRandomBlob(value_len, is_ascii_);
break;
}
case SCORE: {
Expand All @@ -385,7 +396,7 @@ string CommandGenerator::FillSet(string_view key) {
string random_len_value;

if (IsRandomValueLen()) {
random_len_value = GetRandomHex(absl::Uniform(bit_gen, value_len_min_, value_len_max_), true);
random_len_value = GetRandomBlob(absl::Uniform(bit_gen, value_len_min_, value_len_max_), true);
value = random_len_value;
}

Expand Down Expand Up @@ -690,6 +701,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
pipeline = num_reqs_ - i * pipeline;
}

string out_buf;
for (unsigned j = 0; j < pipeline; ++j) {
// TODO: this skews the distribution if slot ranges are uneven.
// Ideally we would like to pick randomly a single slot from all the ranges we have
Expand All @@ -698,26 +710,34 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
slot_range = shard_slots_.NextSlotRange(ep_, i);
}

string cmd = cmd_gen->Next(slot_range);
absl::StrAppend(&out_buf, cmd_gen->Next(slot_range));

Req req;
req.start = absl::GetCurrentTimeNanos();
req.might_hit = cmd_gen->might_hit();

reqs_.push(req);

error_code ec = socket_->Write(io::Buffer(cmd));
if (ec && FiberSocketBase::IsConnClosed(ec)) {
// TODO: report failure
VLOG(1) << "Connection closed";
break;
if (out_buf.size() >= 8192) {
error_code ec = socket_->Write(io::Buffer(out_buf));
out_buf.clear();
if (ec && FiberSocketBase::IsConnClosed(ec)) {
// TODO: report failure
VLOG(1) << "Connection closed";
break;
}
CHECK(!ec) << ec.message();
}
CHECK(!ec) << ec.message();
if (cmd_gen->noreply()) {
PopRequest();
}
}

if (!out_buf.empty()) {
error_code ec = socket_->Write(io::Buffer(out_buf));
CHECK(!ec || FiberSocketBase::IsConnClosed(ec)) << ec.message();
}

now = absl::GetCurrentTimeNanos();
if (cycle_ns) {
int64_t target_ts = start_ns_ + i * (*cycle_ns);
Expand Down
Loading