diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 44a13f25..1739c455 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,14 +53,14 @@ jobs: - name: Test OSS-CLUSTER TCP timeout-minutes: 10 run: | - OSS_STANDALONE=0 OSS_CLUSTER=1 \ + OSS_STANDALONE=0 OSS_CLUSTER=1 VERBOSE=1 \ ./tests/run_tests.sh - name: Test OSS-CLUSTER TCP TLS timeout-minutes: 10 if: matrix.platform == 'ubuntu-latest' run: | - OSS_STANDALONE=0 OSS_CLUSTER=1 TLS=1 \ + OSS_STANDALONE=0 OSS_CLUSTER=1 TLS=1 VERBOSE=1 \ ./tests/run_tests.sh - name: Capture code coverage diff --git a/client.cpp b/client.cpp index 979dcdd9..af1802da 100755 --- a/client.cpp +++ b/client.cpp @@ -72,6 +72,10 @@ bool client::setup_client(benchmark_config *config, abstract_protocol *protocol, else if (config->distinct_client_seed) m_obj_gen->set_random_seed(config->next_client_idx); + // Setup first arbitrary command + if (config->arbitrary_commands->is_defined()) + advance_arbitrary_command_index(); + // Parallel key-pattern determined according to the first command if ((config->arbitrary_commands->is_defined() && config->arbitrary_commands->at(0).key_pattern == 'P') || (config->key_pattern[key_pattern_set]=='P')) { @@ -244,25 +248,36 @@ bool client::hold_pipeline(unsigned int conn_id) { return false; } -void client::create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id) { +get_key_response client::get_key_for_conn(unsigned int command_index, unsigned int conn_id, unsigned long long* key_index) { + int iter; + if (m_config->arbitrary_commands->is_defined()) + iter = arbitrary_obj_iter_type(command_index); + else + iter = obj_iter_type(m_config, command_index); + + *key_index = m_obj_gen->get_key_index(iter); + m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index); + + return available_for_conn; +} + +bool client::create_arbitrary_request(unsigned int command_index, struct timeval& timestamp, unsigned int conn_id) { int cmd_size = 0; - benchmark_debug_log("%s [%s]:\n", cmd->command_name.c_str(), cmd->command.c_str()); + const arbitrary_command& cmd = get_arbitrary_command(command_index); - for (unsigned int i = 0; i < cmd->command_args.size(); i++) { - const command_arg* arg = &cmd->command_args[i]; + benchmark_debug_log("%s: %s:\n", m_connections[conn_id]->get_readable_id(), cmd.command.c_str()); + for (unsigned int i = 0; i < cmd.command_args.size(); i++) { + const command_arg* arg = &cmd.command_args[i]; if (arg->type == const_type) { cmd_size += m_connections[conn_id]->send_arbitrary_command(arg); } else if (arg->type == key_type) { - int iter = get_arbitrary_obj_iter_type(cmd, m_executed_command_index); - unsigned int key_len; - const char *key = m_obj_gen->get_key(iter, &key_len); - - assert(key != NULL); - assert(key_len > 0); - - cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, key, key_len); + unsigned long long key_index; + get_key_response res = get_key_for_conn(command_index, conn_id, &key_index); + /* If key not available for this connection, we have a bug of sending partial request */ + assert(res == available_for_conn); + cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_key_buffer, m_key_len); } else if (arg->type == data_type) { unsigned int value_len; const char *value = m_obj_gen->get_value(0, &value_len); @@ -274,8 +289,68 @@ void client::create_arbitrary_request(const arbitrary_command* cmd, struct timev } } - m_connections[conn_id]->send_arbitrary_command_end(m_executed_command_index, ×tamp, cmd_size); - m_reqs_generated++; + m_connections[conn_id]->send_arbitrary_command_end(command_index, ×tamp, cmd_size); + return true; +} + +bool client::create_wait_request(struct timeval& timestamp, unsigned int conn_id) { + unsigned int num_slaves = m_obj_gen->random_range(m_config->num_slaves.min, m_config->num_slaves.max); + unsigned int timeout = m_obj_gen->normal_distribution(m_config->wait_timeout.min, + m_config->wait_timeout.max, 0, + ((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min); + + m_connections[conn_id]->send_wait_command(×tamp, num_slaves, timeout); + return true; +} + +bool client::create_set_request(struct timeval& timestamp, unsigned int conn_id) { + unsigned long long key_index; + get_key_response res = get_key_for_conn(SET_CMD_IDX, conn_id, &key_index); + if (res == not_available) + return false; + + if (res == available_for_conn) { + unsigned int value_len; + const char *value = m_obj_gen->get_value(key_index, &value_len); + + m_connections[conn_id]->send_set_command(×tamp, m_key_buffer, m_key_len, + value, value_len, m_obj_gen->get_expiry(), + m_config->data_offset); + } + + return true; +} + +bool client::create_get_request(struct timeval& timestamp, unsigned int conn_id) { + unsigned long long key_index; + get_key_response res = get_key_for_conn(GET_CMD_IDX, conn_id, &key_index); + if (res == not_available) + return false; + + if (res == available_for_conn) { + m_connections[conn_id]->send_get_command(×tamp, m_key_buffer, m_key_len, m_config->data_offset); + } + + return true; +} + +bool client::create_mget_request(struct timeval& timestamp, unsigned int conn_id) { + unsigned long long key_index; + unsigned int keys_count = m_config->ratio.b - m_get_ratio_count; + if ((int)keys_count > m_config->multi_key_get) + keys_count = m_config->multi_key_get; + + m_keylist->clear(); + for (unsigned int i = 0; i < keys_count; i++) { + get_key_response res = get_key_for_conn(GET_CMD_IDX, conn_id, &key_index); + /* Not supported in cluster mode */ + assert(res == available_for_conn); + + m_keylist->add_key(m_key_buffer, m_key_len); + } + + m_connections[conn_id]->send_mget_command(×tamp, m_keylist); + return true; } // This function could use some urgent TLC -- but we need to do it without altering the behavior @@ -283,10 +358,10 @@ void client::create_request(struct timeval timestamp, unsigned int conn_id) { // are we using arbitrary command? if (m_config->arbitrary_commands->is_defined()) { - const arbitrary_command* executed_command = m_config->arbitrary_commands->get_next_executed_command(m_arbitrary_command_ratio_count, - m_executed_command_index); - create_arbitrary_request(executed_command, timestamp, conn_id); - + if (create_arbitrary_request(m_executed_command_index, timestamp, conn_id)) { + advance_arbitrary_command_index(); + m_reqs_generated++; + } return; } @@ -294,67 +369,38 @@ void client::create_request(struct timeval timestamp, unsigned int conn_id) if (m_config->wait_ratio.b && (m_tot_wait_ops == 0 || (m_tot_set_ops/m_tot_wait_ops > m_config->wait_ratio.a/m_config->wait_ratio.b))) { + if (!create_wait_request(timestamp, conn_id)) + return; - m_tot_wait_ops++; - - unsigned int num_slaves = m_obj_gen->random_range(m_config->num_slaves.min, m_config->num_slaves.max); - unsigned int timeout = m_obj_gen->normal_distribution(m_config->wait_timeout.min, - m_config->wait_timeout.max, 0, - ((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min); - - m_connections[conn_id]->send_wait_command(×tamp, num_slaves, timeout); m_reqs_generated++; + m_tot_wait_ops++; } + // are we set or get? this depends on the ratio else if (m_set_ratio_count < m_config->ratio.a) { - // set command - data_object *obj = m_obj_gen->get_object(obj_iter_type(m_config, 0)); - unsigned int key_len; - const char *key = obj->get_key(&key_len); - unsigned int value_len; - const char *value = obj->get_value(&value_len); + if (!create_set_request(timestamp, conn_id)) + return; - m_connections[conn_id]->send_set_command(×tamp, key, key_len, - value, value_len, obj->get_expiry(), - m_config->data_offset); - m_reqs_generated++; m_set_ratio_count++; + m_reqs_generated++; m_tot_set_ops++; } else if (m_get_ratio_count < m_config->ratio.b) { - // get command - int iter = obj_iter_type(m_config, 2); - - if (m_config->multi_key_get > 0) { - unsigned int keys_count; - - keys_count = m_config->ratio.b - m_get_ratio_count; - if ((int)keys_count > m_config->multi_key_get) - keys_count = m_config->multi_key_get; + // GET command + if (!m_config->multi_key_get) { + if (!create_get_request(timestamp, conn_id)) + return; - m_keylist->clear(); - while (m_keylist->get_keys_count() < keys_count) { - unsigned int keylen; - const char *key = m_obj_gen->get_key(iter, &keylen); - - assert(key != NULL); - assert(keylen > 0); - - m_keylist->add_key(key, keylen); - } - - m_connections[conn_id]->send_mget_command(×tamp, m_keylist); - m_reqs_generated++; - m_get_ratio_count += keys_count; - } else { - unsigned int keylen; - const char *key = m_obj_gen->get_key(iter, &keylen); - assert(key != NULL); - assert(keylen > 0); - - m_connections[conn_id]->send_get_command(×tamp, key, keylen, m_config->data_offset); - m_reqs_generated++; m_get_ratio_count++; + m_reqs_generated++; + return; } + + // MGET command + if (!create_mget_request(timestamp, conn_id)) + return; + + m_get_ratio_count += m_config->multi_key_get; + m_reqs_generated++; } else { // overlap counters m_get_ratio_count = m_set_ratio_count = 0; diff --git a/client.h b/client.h index 06da93bd..cff0519d 100755 --- a/client.h +++ b/client.h @@ -47,6 +47,11 @@ struct benchmark_config; class object_generator; class data_object; +#define SET_CMD_IDX 0 +#define GET_CMD_IDX 2 + +enum get_key_response { not_available, available_for_conn, available_for_other_conn }; + class client : public connections_manager { protected: @@ -56,6 +61,10 @@ class client : public connections_manager { bool m_initialized; bool m_end_set; + // key buffer + char m_key_buffer[250]; + int m_key_len; + // test related benchmark_config* m_config; object_generator* m_obj_gen; @@ -77,13 +86,18 @@ class client : public connections_manager { client(client_group* group); client(struct event_base *event_base, benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen); virtual ~client(); - virtual bool setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen); - virtual int prepare(void); - + bool setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen); + int prepare(void); bool initialized(void); - run_stats* get_stats(void) { return &m_stats; } + virtual get_key_response get_key_for_conn(unsigned int command_index, unsigned int conn_id, unsigned long long* key_index); + virtual bool create_arbitrary_request(unsigned int command_index, struct timeval& timestamp, unsigned int conn_id); + bool create_wait_request(struct timeval& timestamp, unsigned int conn_id); + bool create_set_request(struct timeval& timestamp, unsigned int conn_id); + bool create_get_request(struct timeval& timestamp, unsigned int conn_id); + bool create_mget_request(struct timeval& timestamp, unsigned int conn_id); + // client manager api's unsigned long long get_reqs_processed() { return m_reqs_processed; @@ -110,13 +124,33 @@ class client : public connections_manager { virtual bool finished(void); virtual void set_start_time(); virtual void set_end_time(); - virtual void create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id); virtual void create_request(struct timeval timestamp, unsigned int conn_id); virtual bool hold_pipeline(unsigned int conn_id); virtual int connect(void); virtual void disconnect(void); // + /* Get current executed arbitrary command */ + const arbitrary_command & get_arbitrary_command(unsigned int command_index) { + return m_config->arbitrary_commands->at(command_index); + } + + /* Set the arbitrary command index to the next to be executed */ + void advance_arbitrary_command_index() { + while(true) { + if (m_arbitrary_command_ratio_count < get_arbitrary_command(m_executed_command_index).ratio) { + m_arbitrary_command_ratio_count++; + return; + } else { + m_arbitrary_command_ratio_count = 0; + m_executed_command_index++; + if (m_executed_command_index == m_config->arbitrary_commands->size()) { + m_executed_command_index = 0; + } + } + } + + } // Utility function to get the object iterator type based on the config inline int obj_iter_type(benchmark_config *cfg, unsigned char index) { @@ -132,10 +166,11 @@ class client : public connections_manager { } } - inline int get_arbitrary_obj_iter_type(const arbitrary_command* cmd, unsigned int index) { - if (cmd->key_pattern == 'R') { + inline int arbitrary_obj_iter_type(unsigned int index) { + const arbitrary_command& cmd = get_arbitrary_command(index); + if (cmd.key_pattern == 'R') { return OBJECT_GENERATOR_KEY_RANDOM; - } else if (cmd->key_pattern == 'G') { + } else if (cmd.key_pattern == 'G') { return OBJECT_GENERATOR_KEY_GAUSSIAN; } else { return index; diff --git a/cluster_client.cpp b/cluster_client.cpp index 1333167b..ba17ad6e 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -297,7 +297,7 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) { return true; } - // don't exceed requests + /* Don't exceed requests. */ if (m_config->requests) { if (m_key_index_pools[conn_id]->empty() && m_reqs_generated >= m_config->requests) { @@ -308,108 +308,103 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) { return false; } -bool cluster_client::get_key_for_conn(unsigned int conn_id, int iter, unsigned long long* key_index) { - // first check if we already have key in pool +get_key_response cluster_client::get_key_for_conn(unsigned int command_index, unsigned int conn_id, unsigned long long* key_index) { + // first check if we already have a key in the pool if (!m_key_index_pools[conn_id]->empty()) { *key_index = m_key_index_pools[conn_id]->front(); m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index); m_key_index_pools[conn_id]->pop(); - return true; + return available_for_conn; } - // keep generate key till it match for this connection, or requests reached - while (true) { - // generate key - *key_index = m_obj_gen->get_key_index(iter); - m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index); + // generate key + client::get_key_for_conn(command_index, conn_id, key_index); - unsigned int hslot = calc_hslot_crc16_cluster(m_key_buffer, m_key_len); + unsigned int hslot = calc_hslot_crc16_cluster(m_key_buffer, m_key_len); - // check if the key match for this connection - if (m_slot_to_shard[hslot] == conn_id) { - m_reqs_generated++; - return true; - } + // check if the key match for this connection + if (m_slot_to_shard[hslot] == conn_id) { + benchmark_debug_log("%s generated key=[%.*s] for itself\n", m_connections[conn_id]->get_readable_id(), m_key_len, m_key_buffer); + return available_for_conn; + } - // handle key for other connection - unsigned int other_conn_id = m_slot_to_shard[hslot]; + // handle key for other connection + unsigned int other_conn_id = m_slot_to_shard[hslot]; - // in case we generated key for connection that is disconnected, 'slot to shard' map may need to be updated - if (m_connections[other_conn_id]->get_connection_state() == conn_disconnected) { - m_connections[conn_id]->set_cluster_slots(); - return false; - } + // in case we generated key for connection that is disconnected, 'slot to shard' map may need to be updated + if (m_connections[other_conn_id]->get_connection_state() == conn_disconnected) { + m_connections[conn_id]->set_cluster_slots(); + return not_available; + } - // in case connection is during cluster slots command, his slots mapping not relevant - if (m_connections[other_conn_id]->get_cluster_slots_state() != setup_done) - continue; + // in case connection is during cluster slots command, his slots mapping not relevant + if (m_connections[other_conn_id]->get_cluster_slots_state() != setup_done) + return not_available; - // store key for other connection, if queue is not full - key_index_pool* key_idx_pool = m_key_index_pools[other_conn_id]; - if (key_idx_pool->size() < KEY_INDEX_QUEUE_MAX_SIZE) { - key_idx_pool->push(*key_index); - m_reqs_generated++; - } + key_index_pool* key_idx_pool = m_key_index_pools[other_conn_id]; + if (key_idx_pool->size() >= KEY_INDEX_QUEUE_MAX_SIZE) + return not_available; - // don't exceed requests - if (m_config->requests > 0 && m_reqs_generated >= m_config->requests) - return false; - } + // store command and key for the other connection + benchmark_debug_log("%s generated key=[%.*s] for %s\n", m_connections[conn_id]->get_readable_id(), m_key_len, m_key_buffer, m_connections[other_conn_id]->get_readable_id()); + + key_idx_pool->push(command_index); + key_idx_pool->push(*key_index); + return available_for_other_conn; } -// This function could use some urgent TLC -- but we need to do it without altering the behavior -void cluster_client::create_request(struct timeval timestamp, unsigned int conn_id) -{ - // If the Set:Wait ratio is not 0, start off with WAITs - if (m_config->wait_ratio.b && - (m_tot_wait_ops == 0 || - (m_tot_set_ops/m_tot_wait_ops > m_config->wait_ratio.a/m_config->wait_ratio.b))) { +bool cluster_client::create_arbitrary_request(unsigned int command_index, struct timeval& timestamp, unsigned int conn_id) { + /* In arbitrary request, where we send the command arg by arg, we need to check for a key command, + * if the generated key belongs to this connection before starting to send it */ + assert(m_key_index_pools[conn_id]->empty()); - m_tot_wait_ops++; + /* keyless command can be used by any connection */ + if (get_arbitrary_command(command_index).keys_count == 0) { + client::create_arbitrary_request(command_index, timestamp, conn_id); + return true; + } - unsigned int num_slaves = m_obj_gen->random_range(m_config->num_slaves.min, m_config->num_slaves.max); - unsigned int timeout = m_obj_gen->normal_distribution(m_config->wait_timeout.min, - m_config->wait_timeout.max, 0, - ((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min); + unsigned long long key_index; + get_key_response res = get_key_for_conn(command_index, conn_id, &key_index); - m_connections[conn_id]->send_wait_command(×tamp, num_slaves, timeout); - m_reqs_generated++; - } - // are we set or get? this depends on the ratio - else if (m_set_ratio_count < m_config->ratio.a) { - // set command - unsigned long long key_index; + if (res == not_available) + return false; - // get key - if (!get_key_for_conn(conn_id, obj_iter_type(m_config, 0), &key_index)) { - return; - } + /* If we generated a key for a different connection, we will use it later */ + if (res == available_for_other_conn) + return true; - // get value - unsigned int value_len; - const char *value = m_obj_gen->get_value(key_index, &value_len); - - m_connections[conn_id]->send_set_command(×tamp, m_key_buffer, m_key_len, - value, value_len, m_obj_gen->get_expiry(), - m_config->data_offset); - m_set_ratio_count++; - m_tot_set_ops++; - } else if (m_get_ratio_count < m_config->ratio.b) { - // get command - unsigned long long key_index; - - // get key - if (!get_key_for_conn(conn_id, obj_iter_type(m_config, 2), &key_index)) { - return; - } + /* We got a key for this connection, put it back into the pool and + * use it inside client::create_arbitrary_request() */ + m_key_index_pools[conn_id]->push(key_index); + client::create_arbitrary_request(command_index, timestamp, conn_id); - m_connections[conn_id]->send_get_command(×tamp, m_key_buffer, m_key_len, m_config->data_offset); - m_get_ratio_count++; - } else { - // overlap counters - m_get_ratio_count = m_set_ratio_count = 0; + return true; +} + +void cluster_client::create_request(struct timeval timestamp, unsigned int conn_id) { + /* If pool is empty continue with base class */ + if (m_key_index_pools[conn_id]->empty()) { + client::create_request(timestamp, conn_id); + return; } + + unsigned int pool_size = m_key_index_pools[conn_id]->size(); + unsigned int command_index = m_key_index_pools[conn_id]->front(); + m_key_index_pools[conn_id]->pop(); + + if (m_config->arbitrary_commands->is_defined()) + client::create_arbitrary_request(command_index, timestamp, conn_id); + else if (command_index == SET_CMD_IDX) + create_set_request(timestamp, conn_id); + else if (command_index == GET_CMD_IDX) + create_get_request(timestamp, conn_id); + else + assert("Unexpected command index"); + + /* Make sure we used pair of command and key index */ + assert(m_key_index_pools[conn_id]->size() == pool_size - 2); } // In case of -MOVED response, we sends CLUSTER SLOTS command to get the new topology @@ -424,6 +419,12 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp m_stats.update_moved_set_op(×tamp, request->m_size + response->get_total_len(), ts_diff(request->m_sent_time, timestamp)); + } else if (request->m_type == rt_arbitrary) { + arbitrary_request *ar = static_cast(request); + m_stats.update_moved_arbitrary_op(×tamp, + request->m_size + response->get_total_len(), + ts_diff(request->m_sent_time, timestamp), + ar->index); } else { assert(0); } @@ -452,6 +453,12 @@ void cluster_client::handle_ask(unsigned int conn_id, struct timeval timestamp, m_stats.update_ask_set_op(×tamp, request->m_size + response->get_total_len(), ts_diff(request->m_sent_time, timestamp)); + } else if (request->m_type == rt_arbitrary) { + arbitrary_request *ar = static_cast(request); + m_stats.update_ask_arbitrary_op(×tamp, + request->m_size + response->get_total_len(), + ts_diff(request->m_sent_time, timestamp), + ar->index); } else { assert(0); } diff --git a/cluster_client.h b/cluster_client.h index 493ba395..c792f67b 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -32,15 +32,11 @@ class cluster_client : public client { std::vector m_key_index_pools; unsigned int m_slot_to_shard[16384]; - char m_key_buffer[250]; - int m_key_len; - virtual int connect(void); virtual void disconnect(void); shard_connection* create_shard_connection(abstract_protocol* abs_protocol); bool connect_shard_connection(shard_connection* sc, char* address, char* port); - bool get_key_for_conn(unsigned int conn_id, int iter, unsigned long long* key_index); void handle_moved(unsigned int conn_id, struct timeval timestamp, request *request, protocol_response *response); void handle_ask(unsigned int conn_id, struct timeval timestamp, @@ -50,6 +46,9 @@ class cluster_client : public client { cluster_client(client_group* group); virtual ~cluster_client(); + virtual get_key_response get_key_for_conn(unsigned int command_index, unsigned int conn_id, unsigned long long* key_index); + virtual bool create_arbitrary_request(unsigned int command_index, struct timeval& timestamp, unsigned int conn_id); + // client manager api's virtual void handle_cluster_slots(protocol_response *r); virtual void create_request(struct timeval timestamp, unsigned int conn_id); diff --git a/config_types.cpp b/config_types.cpp index 3e9f9285..60ce4c90 100644 --- a/config_types.cpp +++ b/config_types.cpp @@ -308,7 +308,7 @@ static int hex_digit_to_int(char c) { } } -arbitrary_command::arbitrary_command(const char* cmd) : command(cmd), key_pattern('R'), ratio(1) { +arbitrary_command::arbitrary_command(const char* cmd) : command(cmd), key_pattern('R'), keys_count(0), ratio(1) { // command name is the first word in the command size_t pos = command.find(" "); if (pos == std::string::npos) { diff --git a/config_types.h b/config_types.h index 787e0428..323d6a7c 100644 --- a/config_types.h +++ b/config_types.h @@ -130,6 +130,7 @@ struct arbitrary_command { std::string command; std::string command_name; char key_pattern; + unsigned int keys_count; unsigned int ratio; }; @@ -163,23 +164,6 @@ struct arbitrary_command_list { return !commands_list.empty(); } - const arbitrary_command* get_next_executed_command(unsigned int& ratio_count, unsigned int& executed_command_index) const { - while(true) { - const arbitrary_command* executed_command = &commands_list[executed_command_index]; - - if (ratio_count < executed_command->ratio) { - ratio_count++; - return executed_command; - } else { - ratio_count = 0; - executed_command_index++; - if (executed_command_index == size()) { - executed_command_index = 0; - } - } - } - } - unsigned int get_max_command_name_length() const { unsigned int max_length = 0; diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index 0ef1f360..22ad2d20 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -328,9 +328,6 @@ static bool verify_cluster_option(struct benchmark_config *cfg) { } else if (cfg->unix_socket) { fprintf(stderr, "error: cluster mode dose not support unix-socket option.\n"); return false; - } else if (cfg->arbitrary_commands->is_defined()) { - fprintf(stderr, "error: cluster mode dose not support arbitrary command option.\n"); - return false; } return true; @@ -1293,6 +1290,11 @@ int main(int argc, char *argv[]) exit(1); } + // Cluster mode supports only a single key commands + if (cfg.cluster_mode && cfg.arbitrary_commands->at(i).keys_count > 1) { + benchmark_error_log("error: Cluster mode supports only a single key commands\n"); + exit(1); + } delete tmp_protocol; } diff --git a/protocol.cpp b/protocol.cpp index 7a1575ec..5bb14dcd 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -716,7 +716,7 @@ bool redis_protocol::format_arbitrary_command(arbitrary_command &cmd) { benchmark_error_log("error: key placeholder can't combined with other data\n"); return false; } - + cmd.keys_count++; current_arg->type = key_type; } else if (current_arg->data.find(DATA_PLACEHOLDER) != std::string::npos) { if (current_arg->data.length() != strlen(DATA_PLACEHOLDER)) { diff --git a/run_stats.cpp b/run_stats.cpp index a848a84e..dc09540e 100644 --- a/run_stats.cpp +++ b/run_stats.cpp @@ -211,6 +211,17 @@ void run_stats::update_moved_set_op(struct timeval* ts, unsigned int bytes, unsi hdr_record_value(inst_m_set_latency_histogram,latency); } +void run_stats::update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes, + unsigned int latency, size_t request_index) { + roll_cur_stats(ts); + + m_cur_stats.m_ar_commands.at(request_index).update_moved_op(bytes, latency); + m_totals.update_op(bytes, latency); + + struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index); + hdr_record_value(hist,latency); +} + void run_stats::update_ask_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency) { roll_cur_stats(ts); @@ -231,6 +242,17 @@ void run_stats::update_ask_set_op(struct timeval* ts, unsigned int bytes, unsign hdr_record_value(inst_m_set_latency_histogram,latency); } +void run_stats::update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes, + unsigned int latency, size_t request_index) { + roll_cur_stats(ts); + + m_cur_stats.m_ar_commands.at(request_index).update_ask_op(bytes, latency); + m_totals.update_op(bytes, latency); + + struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index); + hdr_record_value(hist,latency); +} + void run_stats::update_wait_op(struct timeval *ts, unsigned int latency) { roll_cur_stats(ts); @@ -996,11 +1018,18 @@ void run_stats::print_moved_sec_column(output_table &table) { column.elements.push_back(*el.init_str("%12s ", "MOVED/sec")); column.elements.push_back(*el.init_str("%s", "-------------")); - column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_moved_sec)); - column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_moved_sec)); - column.elements.push_back(*el.init_str("%12s ", "---")); - column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_moved_sec)); + if (print_arbitrary_commands_results()) { + for (unsigned int i=0; itype == key_type) { - benchmark_debug_log("key: value[%.*s]\n", val_len, val); + benchmark_debug_log("key=[%.*s]\n", val_len, val); } else { - benchmark_debug_log("data: value_len=%u\n", val_len); + benchmark_debug_log("value_len=%u\n", val_len); } cmd_size = m_protocol->write_arbitrary_command(val, val_len); diff --git a/tests/run_tests.sh b/tests/run_tests.sh index 210daf00..58f12b0d 100755 --- a/tests/run_tests.sh +++ b/tests/run_tests.sh @@ -62,6 +62,7 @@ run_tests() { OSS_STANDALONE=${OSS_STANDALONE:-1} OSS_CLUSTER=${OSS_CLUSTER:-0} SHARDS=${SHARDS:-3} +TEST=${TEST:-""} TLS_KEY=$ROOT/tests/tls/redis.key TLS_CERT=$ROOT/tests/tls/redis.crt @@ -70,6 +71,7 @@ REDIS_SERVER=${REDIS_SERVER:-redis-server} MEMTIER_BINARY=$ROOT/memtier_benchmark RLTEST_ARGS=" --oss-redis-path $REDIS_SERVER" +[[ "$TEST" != "" ]] && RLTEST_ARGS+=" --test $TEST" [[ $VERBOSE == 1 ]] && RLTEST_ARGS+=" -v" [[ $TLS == 1 ]] && RLTEST_ARGS+=" --tls-cert-file $TLS_CERT --tls-key-file $TLS_KEY --tls-ca-cert-file $TLS_CACERT --tls" diff --git a/tests/tests_oss_simple_flow.py b/tests/tests_oss_simple_flow.py index c229e7df..30541869 100644 --- a/tests/tests_oss_simple_flow.py +++ b/tests/tests_oss_simple_flow.py @@ -234,7 +234,6 @@ def test_default_set_get_1_1(env): # run each test on different env def test_default_set_get_3_runs(env): - env.skipOnCluster() run_count = 3 benchmark_specs = {"name": env.testName, "args": ['--run-count={}'.format(run_count)]} addTLSArgs(benchmark_specs, env) @@ -262,7 +261,6 @@ def test_default_set_get_3_runs(env): def test_default_arbitrary_command_pubsub(env): - env.skipOnCluster() benchmark_specs = {"name": env.testName, "args": ['--command=publish \"__key__\" \"__data__\"']} addTLSArgs(benchmark_specs, env) config = get_default_memtier_config() @@ -282,8 +280,27 @@ def test_default_arbitrary_command_pubsub(env): debugPrintMemtierOnError(config, env) +def test_default_arbitrary_command_keyless(env): + benchmark_specs = {"name": env.testName, "args": ['--command=PING']} + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config() + master_nodes_list = env.getMasterNodesList() + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + # Create a temporary directory + test_dir = tempfile.mkdtemp() + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + if not benchmark.run(): + debugPrintMemtierOnError(config, env) + + def test_default_arbitrary_command_set(env): - env.skipOnCluster() benchmark_specs = {"name": env.testName, "args": ['--command=SET __key__ __data__']} addTLSArgs(benchmark_specs, env) config = get_default_memtier_config() @@ -310,7 +327,6 @@ def test_default_arbitrary_command_set(env): def test_default_arbitrary_command_hset(env): - env.skipOnCluster() benchmark_specs = {"name": env.testName, "args": ['--command=HSET __key__ field1 __data__']} addTLSArgs(benchmark_specs, env) config = get_default_memtier_config() @@ -335,3 +351,30 @@ def test_default_arbitrary_command_hset(env): overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats) assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count) + +def test_default_arbitrary_command_hset_multi_data_placeholders(env): + benchmark_specs = {"name": env.testName, "args": ['--command=HSET __key__ field1 __data__ field2 __data__ field3 __data__']} + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config() + master_nodes_list = env.getMasterNodesList() + overall_expected_request_count = get_expected_request_count(config) + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + # Create a temporary directory + test_dir = tempfile.mkdtemp() + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # benchmark.run() returns True if the return code of memtier_benchmark was 0 + memtier_ok = benchmark.run() + debugPrintMemtierOnError(config, env) + + master_nodes_connections = env.getOSSMasterNodesConnectionList() + merged_command_stats = {'cmdstat_hset': {'calls': 0}} + overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats) + assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, + overall_request_count)