Skip to content

Commit 3ab8c0f

Browse files
committed
Enable arbitrary commands to be run on cluster mode
1 parent 422252c commit 3ab8c0f

File tree

5 files changed

+131
-25
lines changed

5 files changed

+131
-25
lines changed

cluster_client.cpp

Lines changed: 85 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,54 @@ bool cluster_client::get_key_for_conn(unsigned int conn_id, int iter, unsigned l
358358
}
359359
}
360360

361+
362+
void cluster_client::create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id) {
363+
int cmd_size = 0;
364+
365+
benchmark_debug_log("%s [%s]:\n", cmd->command_name.c_str(), cmd->command.c_str());
366+
367+
for (unsigned int i = 0; i < cmd->command_args.size(); i++) {
368+
const command_arg* arg = &cmd->command_args[i];
369+
370+
if (arg->type == const_type) {
371+
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg);
372+
} else if (arg->type == key_type) {
373+
unsigned long long key_index;
374+
375+
// get key
376+
if (!get_key_for_conn(conn_id, get_arbitrary_obj_iter_type(cmd, m_executed_command_index), &key_index)) {
377+
return;
378+
}
379+
380+
assert(key_index >= 0);
381+
assert(m_key_len > 0);
382+
383+
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_key_buffer, m_key_len);
384+
} else if (arg->type == data_type) {
385+
unsigned int value_len;
386+
const char *value = m_obj_gen->get_value(0, &value_len);
387+
388+
assert(value != NULL);
389+
assert(value_len > 0);
390+
391+
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, value, value_len);
392+
}
393+
}
394+
395+
m_connections[conn_id]->send_arbitrary_command_end(m_executed_command_index, &timestamp, cmd_size);
396+
}
397+
361398
// This function could use some urgent TLC -- but we need to do it without altering the behavior
362399
void cluster_client::create_request(struct timeval timestamp, unsigned int conn_id)
363400
{
401+
// are we using arbitrary command?
402+
if (m_config->arbitrary_commands->is_defined()) {
403+
const arbitrary_command* executed_command = m_config->arbitrary_commands->get_next_executed_command(m_arbitrary_command_ratio_count,
404+
m_executed_command_index);
405+
create_arbitrary_request(executed_command, timestamp, conn_id);
406+
return;
407+
}
408+
364409
// If the Set:Wait ratio is not 0, start off with WAITs
365410
if (m_config->wait_ratio.b &&
366411
(m_tot_wait_ops == 0 ||
@@ -416,16 +461,28 @@ void cluster_client::create_request(struct timeval timestamp, unsigned int conn_
416461
void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp,
417462
request *request, protocol_response *response) {
418463
// update stats
419-
if (request->m_type == rt_get) {
420-
m_stats.update_moved_get_op(&timestamp,
464+
switch (request->m_type) {
465+
case rt_get:
466+
m_stats.update_moved_get_op(&timestamp,
421467
request->m_size + response->get_total_len(),
422468
ts_diff(request->m_sent_time, timestamp));
423-
} else if (request->m_type == rt_set) {
424-
m_stats.update_moved_set_op(&timestamp,
469+
break;
470+
case rt_set:
471+
m_stats.update_moved_set_op(&timestamp,
425472
request->m_size + response->get_total_len(),
426473
ts_diff(request->m_sent_time, timestamp));
427-
} else {
428-
assert(0);
474+
break;
475+
case rt_arbitrary: {
476+
arbitrary_request *ar = static_cast<arbitrary_request *>(request);
477+
m_stats.update_moved_arbitrary_op(&timestamp,
478+
request->m_size + response->get_total_len(),
479+
ts_diff(request->m_sent_time, timestamp),
480+
ar->index);
481+
break;
482+
}
483+
default:
484+
assert(0);
485+
break;
429486
}
430487

431488
// connection already issued 'cluster slots' command, wait for slots mapping to be updated
@@ -444,16 +501,28 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
444501
void cluster_client::handle_ask(unsigned int conn_id, struct timeval timestamp,
445502
request *request, protocol_response *response) {
446503
// update stats
447-
if (request->m_type == rt_get) {
448-
m_stats.update_ask_get_op(&timestamp,
449-
request->m_size + response->get_total_len(),
450-
ts_diff(request->m_sent_time, timestamp));
451-
} else if (request->m_type == rt_set) {
452-
m_stats.update_ask_set_op(&timestamp,
453-
request->m_size + response->get_total_len(),
454-
ts_diff(request->m_sent_time, timestamp));
455-
} else {
456-
assert(0);
504+
switch (request->m_type) {
505+
case rt_get:
506+
m_stats.update_ask_get_op(&timestamp,
507+
request->m_size + response->get_total_len(),
508+
ts_diff(request->m_sent_time, timestamp));
509+
break;
510+
case rt_set:
511+
m_stats.update_ask_set_op(&timestamp,
512+
request->m_size + response->get_total_len(),
513+
ts_diff(request->m_sent_time, timestamp));
514+
break;
515+
case rt_arbitrary: {
516+
arbitrary_request *ar = static_cast<arbitrary_request *>(request);
517+
m_stats.update_ask_arbitrary_op(&timestamp,
518+
request->m_size + response->get_total_len(),
519+
ts_diff(request->m_sent_time, timestamp),
520+
ar->index);
521+
break;
522+
}
523+
default:
524+
assert(0);
525+
break;
457526
}
458527
}
459528

cluster_client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class cluster_client : public client {
5252

5353
// client manager api's
5454
virtual void handle_cluster_slots(protocol_response *r);
55+
virtual void create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id);
5556
virtual void create_request(struct timeval timestamp, unsigned int conn_id);
5657
virtual bool hold_pipeline(unsigned int conn_id);
5758
virtual void handle_response(unsigned int conn_id, struct timeval timestamp,

memtier_benchmark.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,6 @@ static bool verify_cluster_option(struct benchmark_config *cfg) {
313313
} else if (cfg->unix_socket) {
314314
fprintf(stderr, "error: cluster mode dose not support unix-socket option.\n");
315315
return false;
316-
} else if (cfg->arbitrary_commands->is_defined()) {
317-
fprintf(stderr, "error: cluster mode dose not support arbitrary command option.\n");
318-
return false;
319316
}
320317

321318
return true;

run_stats.cpp

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,17 @@ void run_stats::update_moved_set_op(struct timeval* ts, unsigned int bytes, unsi
196196
hdr_record_value(m_set_latency_histogram,latency);
197197
}
198198

199+
void run_stats::update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes,
200+
unsigned int latency, size_t request_index) {
201+
roll_cur_stats(ts);
202+
203+
m_cur_stats.m_ar_commands.at(request_index).update_moved_op(bytes, latency);
204+
m_totals.update_op(bytes, latency);
205+
206+
struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
207+
hdr_record_value(hist,latency);
208+
}
209+
199210
void run_stats::update_ask_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency)
200211
{
201212
roll_cur_stats(ts);
@@ -214,6 +225,17 @@ void run_stats::update_ask_set_op(struct timeval* ts, unsigned int bytes, unsign
214225
hdr_record_value(m_set_latency_histogram,latency);
215226
}
216227

228+
void run_stats::update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes,
229+
unsigned int latency, size_t request_index) {
230+
roll_cur_stats(ts);
231+
232+
m_cur_stats.m_ar_commands.at(request_index).update_ask_op(bytes, latency);
233+
m_totals.update_op(bytes, latency);
234+
235+
struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
236+
hdr_record_value(hist,latency);
237+
}
238+
217239
void run_stats::update_wait_op(struct timeval *ts, unsigned int latency)
218240
{
219241
roll_cur_stats(ts);
@@ -975,11 +997,18 @@ void run_stats::print_moved_sec_column(output_table &table) {
975997

976998
column.elements.push_back(*el.init_str("%12s ", "MOVED/sec"));
977999
column.elements.push_back(*el.init_str("%s", "-------------"));
978-
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_moved_sec));
979-
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_moved_sec));
980-
column.elements.push_back(*el.init_str("%12s ", "---"));
981-
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_moved_sec));
9821000

1001+
if (print_arbitrary_commands_results()) {
1002+
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
1003+
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_moved_sec));
1004+
}
1005+
} else {
1006+
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_moved_sec));
1007+
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_moved_sec));
1008+
column.elements.push_back(*el.init_str("%12s ", "---"));
1009+
1010+
}
1011+
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_moved_sec));
9831012
table.add_column(column);
9841013
}
9851014

@@ -989,11 +1018,17 @@ void run_stats::print_ask_sec_column(output_table &table) {
9891018

9901019
column.elements.push_back(*el.init_str("%12s ", "ASK/sec"));
9911020
column.elements.push_back(*el.init_str("%s", "-------------"));
992-
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_ask_sec));
1021+
if (print_arbitrary_commands_results()) {
1022+
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
1023+
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_ask_sec));
1024+
}
1025+
} else {
1026+
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_ask_sec));
9931027
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_ask_sec));
9941028
column.elements.push_back(*el.init_str("%12s ", "---"));
995-
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ask_sec));
9961029

1030+
}
1031+
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ask_sec));
9971032
table.add_column(column);
9981033
}
9991034

run_stats.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,13 @@ class run_stats {
119119

120120
void update_moved_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
121121
void update_moved_set_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
122+
void update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes,
123+
unsigned int latency, size_t arbitrary_index);
122124

123125
void update_ask_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
124126
void update_ask_set_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
127+
void update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes,
128+
unsigned int latency, size_t arbitrary_index);
125129

126130
void update_wait_op(struct timeval* ts, unsigned int latency);
127131
void update_arbitrary_op(struct timeval *ts, unsigned int bytes,

0 commit comments

Comments
 (0)