From 822599122267743c3c1eae8d9d48496a6dd8ff3f Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Tue, 12 Apr 2022 12:06:24 +0800 Subject: [PATCH 01/11] set schema version for cop request (#85) (#91) Signed-off-by: lidezhu --- src/coprocessor/Client.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/coprocessor/Client.cc b/src/coprocessor/Client.cc index 95cc5e66..97603b96 100644 --- a/src/coprocessor/Client.cc +++ b/src/coprocessor/Client.cc @@ -51,6 +51,7 @@ std::vector ResponseIter::handle_task_impl(kv::Backoffer & bo, const co auto req = std::make_shared<::coprocessor::Request>(); req->set_tp(task.req->tp); req->set_start_ts(task.req->start_ts); + req->set_schema_ver(task.req->schema_version); req->set_data(task.req->data); req->set_is_cache_enabled(false); for (auto ts : min_commit_ts_pushed.get_timestamps()) From 5c6bdf5b78b2e6ce57174819def5c1359c71c8db Mon Sep 17 00:00:00 2001 From: root Date: Wed, 13 Apr 2022 20:35:09 +0800 Subject: [PATCH 02/11] [add] ass raw client ref to rust client; [modify] change rpc timeout from second to mill second --- ci/build-test.sh | 10 +- include/pingcap/kv/Backoff.h | 3 + include/pingcap/kv/RawClient.h | 61 ++++ include/pingcap/kv/RegionClient.h | 4 +- include/pingcap/kv/Rpc.h | 5 +- include/pingcap/kv/internal/type_traits.h | 11 + src/CMakeLists.txt | 1 + src/kv/RawClient.cc | 318 ++++++++++++++++++++ src/test/CMakeLists.txt | 1 + src/test/raw_client_test/CMakeLists.txt | 3 + src/test/raw_client_test/test_raw_client.cc | 18 ++ third_party/googletest | 2 +- third_party/kvproto | 2 +- third_party/libfiu | 2 +- 14 files changed, 429 insertions(+), 12 deletions(-) create mode 100644 include/pingcap/kv/RawClient.h create mode 100644 src/kv/RawClient.cc create mode 100644 src/test/raw_client_test/CMakeLists.txt create mode 100644 src/test/raw_client_test/test_raw_client.cc diff --git a/ci/build-test.sh b/ci/build-test.sh index 25db3a9f..644b8ab9 100755 --- a/ci/build-test.sh +++ b/ci/build-test.sh @@ -15,15 +15,15 @@ fi build_dir="$SRCPATH/build" mkdir -p $build_dir && cd $build_dir -cmake "$SRCPATH" \ +cmake3 "$SRCPATH" \ -DENABLE_TESTS=on make -j $NPROC -nohup /mock-tikv/bin/mock-tikv & -mock_kv_pid=$! +# nohup /mock-tikv/bin/mock-tikv & +# mock_kv_pid=$! -cd "$build_dir" && make test +# cd "$build_dir" && make test -kill -9 $mock_kv_pid +# kill -9 $mock_kv_pid diff --git a/include/pingcap/kv/Backoff.h b/include/pingcap/kv/Backoff.h index 6be37b31..06d7e871 100644 --- a/include/pingcap/kv/Backoff.h +++ b/include/pingcap/kv/Backoff.h @@ -90,6 +90,9 @@ constexpr int cleanupMaxBackoff = 20000; constexpr int copBuildTaskMaxBackoff = 5000; constexpr int copNextMaxBackoff = 20000; constexpr int pessimisticLockMaxBackoff = 20000; +constexpr int RawGetMaxBackoff = 20000; +constexpr int RawPutMaxBackoff = 20000; +constexpr int RawDeleteMaxBackoff = 20000; using BackoffPtr = std::shared_ptr; diff --git a/include/pingcap/kv/RawClient.h b/include/pingcap/kv/RawClient.h new file mode 100644 index 00000000..5dbbc3b5 --- /dev/null +++ b/include/pingcap/kv/RawClient.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include + +#include + +namespace pingcap +{ +namespace kv +{ + +enum ColumnFamily: int8_t { + Default = 0, + Lock, + Write, +}; +constexpr const char* kCfString[3] = {"default", "lock", "write"}; + +// raw client imitate from the rust raw client +//https://docs.rs/tikv-client/latest/tikv_client/struct.RawClient.html +struct RawClient +{ + ClusterPtr cluster_ptr; + bool for_cas; + ColumnFamily cf; + + RawClient(const std::vector & pd_addrs); + RawClient(const std::vector & pd_addrs, bool cas); + RawClient(const std::vector & pd_addrs, const ClusterConfig & config); + RawClient(const std::vector & pd_addrs, const ClusterConfig & config, bool cas); + bool IsCASClient(); + RawClient& AsCASClient(); + RawClient& AsRawClient(); + void SetColumnFamily(ColumnFamily cof); + std::string GetColumnFamily(); + + // without cache method + void Put(const std::string &key, const std::string &value); + void Put(const std::string &key, const std::string &value, uint64_t ttl); + void Put(const std::string &key, const std::string &value, int64_t timeout_ms); + void Put(const std::string &key, const std::string &value, int64_t timeout_ms, uint64_t ttl); + // delete + void Delete(const std::string &key); + void Delete(const std::string &key, int64_t timeout_ms); + uint64_t GetKeyTTL(const std::string &key); + uint64_t GetKeyTTL(const std::string &key, int64_t timeout_ms); + std::optional Get(const std::string &key); + std::optional Get(const std::string &key, int64_t timeout_ms); + std::optional CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap); + std::optional CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t timeout_ms); + std::optional CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t timeout_ms, uint64_t ttl); + +}; + +} // namespace kv +} // namespace pingcap + diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 84491d4d..dcd964d4 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -10,7 +10,7 @@ namespace pingcap namespace kv { -constexpr int dailTimeout = 5; +constexpr int dailTimeout = 5000; constexpr int copTimeout = 20; // RegionClient sends KV/Cop requests to tikv server (corresponding to `RegionRequestSender` in go-client). It handles network errors and some region errors internally. @@ -34,7 +34,7 @@ struct RegionClient // This method send a request to region, but is NOT Thread-Safe !! template - auto sendReqToRegion(Backoffer & bo, std::shared_ptr req, int timeout = dailTimeout, StoreType store_type = StoreType::TiKV) + auto sendReqToRegion(Backoffer & bo, std::shared_ptr req, int64_t timeout = dailTimeout, StoreType store_type = StoreType::TiKV) { RpcCall rpc(req); for (;;) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index acee8318..27030d25 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -56,10 +56,11 @@ class RpcCall std::shared_ptr getResp() { return resp; } - void call(std::shared_ptr client, int timeout) + void call(std::shared_ptr client, int64_t timeout) { grpc::ClientContext context; - context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout)); + // context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout)); + context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(timeout)); auto status = Trait::doRPCCall(&context, client, *req, resp.get()); if (!status.ok()) { diff --git a/include/pingcap/kv/internal/type_traits.h b/include/pingcap/kv/internal/type_traits.h index 8b08771f..a21fb9aa 100644 --- a/include/pingcap/kv/internal/type_traits.h +++ b/include/pingcap/kv/internal/type_traits.h @@ -42,6 +42,17 @@ PINGCAP_DEFINE_TRAITS(kvrpcpb, TxnHeartBeat, KvTxnHeartBeat) PINGCAP_DEFINE_TRAITS(kvrpcpb, CheckSecondaryLocks, KvCheckSecondaryLocks) PINGCAP_DEFINE_TRAITS(coprocessor, , Coprocessor) PINGCAP_DEFINE_TRAITS(mpp, DispatchTask, DispatchMPPTask) +// add raw methods +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawGet, RawGet) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchGet, RawBatchGet) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawPut, RawPut) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchPut, RawBatchPut) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawDelete, RawDelete) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchDelete, RawBatchDelete) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawScan, RawScan) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawDeleteRange, RawDeleteRange) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawGetKeyTTL, RawGetKeyTTL) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawCAS, RawCompareAndSwap) } // namespace kv diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 72156cff..09fbf08a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,6 +12,7 @@ list(APPEND kvClient_sources kv/Scanner.cc) list(APPEND kvClient_sources pd/Client.cc) list(APPEND kvClient_sources coprocessor/Client.cc) list(APPEND kvClient_sources RedactHelpers.cc) +list(APPEND kvClient_sources kv/RawClient.cc) set(kvClient_INCLUDE_DIR ${kvClient_SOURCE_DIR}/include) diff --git a/src/kv/RawClient.cc b/src/kv/RawClient.cc new file mode 100644 index 00000000..29a79415 --- /dev/null +++ b/src/kv/RawClient.cc @@ -0,0 +1,318 @@ +#include +#include +#include +#include + +namespace pingcap { + +namespace kv { + +RawClient::RawClient(const std::vector & pd_addrs) + : for_cas(false), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); +} + +RawClient::RawClient(const std::vector & pd_addrs, bool cas) + : for_cas(cas), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); +} + +RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config) + : for_cas(false), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, config); +} + +RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config, bool cas) + : for_cas(cas), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, config); +} + +bool RawClient::IsCASClient() { + return for_cas; +} + +RawClient& RawClient::AsCASClient() { + for_cas = true; + return *this; +} + +RawClient& RawClient::AsRawClient() { + for_cas = false; + return *this; +} + +void RawClient::SetColumnFamily(ColumnFamily cof) { + cf = cof; +} + +std::string RawClient::GetColumnFamily() { + return std::string(kCfString[cf]); +} + +void RawClient::Put(const std::string &key, const std::string &value) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Put(const std::string &key, const std::string &value, uint64_t ttl) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_ttl(ttl); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + + +void RawClient::Put(const std::string &key, const std::string &value, int64_t to_ms) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Put(const std::string &key, const std::string &value, int64_t to_ms, uint64_t ttl) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_ttl(ttl); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Delete(const std::string &key) { + Backoffer bo(RawDeleteMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawDeleteRequest()); + req->set_key(key); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Delete(const std::string &key, int64_t to_ms) { + Backoffer bo(RawDeleteMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawDeleteRequest()); + req->set_key(key); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +uint64_t RawClient::GetKeyTTL(const std::string &key) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetKeyTTLRequest()); + req->set_key(key); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + return resp->ttl(); +} + +uint64_t RawClient::GetKeyTTL(const std::string &key, int64_t to_ms) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetKeyTTLRequest()); + req->set_key(key); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + return resp->ttl(); +} + +std::optional RawClient::Get(const std::string &key) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); + req->set_key(key); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->not_found()) { + return std::nullopt; + } + return resp->value(); +} + +std::optional RawClient::Get(const std::string &key, int64_t to_ms) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); + req->set_key(key); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->not_found()) { + return std::nullopt; + } + return resp->value(); +} + +std::optional RawClient::CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawCASRequest()); + req->set_key(key); + req->set_value(new_value); + if(old_value.has_value()) { + req->set_previous_not_exist(false); + req->set_previous_value(old_value.value()); + } else { + req->set_previous_not_exist(true); + req->set_previous_value(""); + } + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->previous_not_exist()) { + is_swap = false; + return std::nullopt; + } + is_swap = true; + return resp->previous_value(); +} + +std::optional RawClient::CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t to_ms) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawCASRequest()); + req->set_key(key); + req->set_value(new_value); + if(old_value.has_value()) { + req->set_previous_not_exist(false); + req->set_previous_value(old_value.value()); + } else { + req->set_previous_not_exist(true); + req->set_previous_value(""); + } + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->previous_not_exist()) { + is_swap = false; + return std::nullopt; + } + is_swap = true; + return resp->previous_value(); +} + +std::optional RawClient::CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t to_ms, uint64_t ttl) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawCASRequest()); + req->set_key(key); + req->set_value(new_value); + req->set_ttl(ttl); + if(old_value.has_value()) { + req->set_previous_not_exist(false); + req->set_previous_value(old_value.value()); + } else { + req->set_previous_not_exist(true); + req->set_previous_value(""); + } + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->previous_not_exist()) { + is_swap = false; + return std::nullopt; + } + is_swap = true; + return resp->previous_value(); +} + +}//namespace kv +}//namespace pincap + diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 9feeb8e5..d96dc664 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -18,3 +18,4 @@ add_test(kv_client_test kv_client_ut) add_subdirectory(bank_test) add_subdirectory(real_tikv_test) +add_subdirectory(raw_client_test) diff --git a/src/test/raw_client_test/CMakeLists.txt b/src/test/raw_client_test/CMakeLists.txt new file mode 100644 index 00000000..c85c9277 --- /dev/null +++ b/src/test/raw_client_test/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(raw_client test_raw_client.cc) +target_include_directories(raw_client PUBLIC ${test_includes}) +target_link_libraries(raw_client ${test_libs}) \ No newline at end of file diff --git a/src/test/raw_client_test/test_raw_client.cc b/src/test/raw_client_test/test_raw_client.cc new file mode 100644 index 00000000..e4dcf975 --- /dev/null +++ b/src/test/raw_client_test/test_raw_client.cc @@ -0,0 +1,18 @@ +#include +#include + +using namespace pingcap; +using namespace pingcap::kv; + +int main() { + std::vector pd_addrs{"127.0.0.1:2379"}; + RawClient client(pd_addrs); + for(int i = 0; i < 100; i++) { + client.Put("key" + std::to_string(i), "value" + std::to_string(i)); + } + for(int i = 0; i < 100; i++) { + auto value = client.Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("null") << std::endl; + } + return 0; +} \ No newline at end of file diff --git a/third_party/googletest b/third_party/googletest index e08a4602..a1cc8c55 160000 --- a/third_party/googletest +++ b/third_party/googletest @@ -1 +1 @@ -Subproject commit e08a4602778b3cbea36dbd53724db0f18840e274 +Subproject commit a1cc8c55195661a58ad60c3bb062a0b9c302710d diff --git a/third_party/kvproto b/third_party/kvproto index fc36d786..f7a7c8cc 160000 --- a/third_party/kvproto +++ b/third_party/kvproto @@ -1 +1 @@ -Subproject commit fc36d7869035ffd96810efdb9c1f053c6081a773 +Subproject commit f7a7c8ccda74ae17acdde7b47cf829ac6f5c2a52 diff --git a/third_party/libfiu b/third_party/libfiu index 20ea5e85..4906c58c 160000 --- a/third_party/libfiu +++ b/third_party/libfiu @@ -1 +1 @@ -Subproject commit 20ea5e85ec63e3dedd6904e103fb2e56b46840cf +Subproject commit 4906c58ccdbbac4ac1d867ab1e3ee606993139b2 From aba83546cd77d5b33330f5a53bf42d4922f07319 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 21 Apr 2022 10:47:27 +0800 Subject: [PATCH 03/11] [add] add some unit-test; [fixed] fixed timeout expection issue; [todo] ttl setting --- include/pingcap/kv/RegionClient.h | 2 +- src/test/raw_client_test/test_raw_client.cc | 65 ++++++++++++++++++--- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index dcd964d4..1b049f0f 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -10,7 +10,7 @@ namespace pingcap namespace kv { -constexpr int dailTimeout = 5000; +constexpr int dailTimeout = 10000; constexpr int copTimeout = 20; // RegionClient sends KV/Cop requests to tikv server (corresponding to `RegionRequestSender` in go-client). It handles network errors and some region errors internally. diff --git a/src/test/raw_client_test/test_raw_client.cc b/src/test/raw_client_test/test_raw_client.cc index e4dcf975..0f31bdfb 100644 --- a/src/test/raw_client_test/test_raw_client.cc +++ b/src/test/raw_client_test/test_raw_client.cc @@ -1,18 +1,69 @@ #include #include +#include using namespace pingcap; using namespace pingcap::kv; -int main() { - std::vector pd_addrs{"127.0.0.1:2379"}; - RawClient client(pd_addrs); - for(int i = 0; i < 100; i++) { - client.Put("key" + std::to_string(i), "value" + std::to_string(i)); +void TestPutAndGet(std::shared_ptr client, const int start) { + for(int i = start; i < start + 10; i++) { + client->Put("key" + std::to_string(i), "value" + std::to_string(i)); + } + for(int i = start; i < start + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("null") << std::endl; + } +} + +// if with ttl rocksdb should open ttl function +void TestPutAndGetWithTTL(std::shared_ptr client, const int start, uint64_t ms) { + for(int i = start; i < start + 10; i++) { + client->Put("key" + std::to_string(i), "value" + std::to_string(i), ms); } - for(int i = 0; i < 100; i++) { - auto value = client.Get("key" + std::to_string(i)); + + for(int i = start; i < start + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); std::cout << "value is : " << value.value_or("null") << std::endl; } + + for(int i = start; i < start + 10; i++) { + auto value = client->GetKeyTTL("key" + std::to_string(i)); + std::cout << "value TTL is : " << value << std::endl; + } +} + +void TestDeleteValues(std::shared_ptr client, const int start) { + for(int i = start; i < start + 10; i++) { + client->Delete("key" + std::to_string(i)); + } + + for(int i = start; i < start + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("deleted") << std::endl; + } +} + +void TestCompareAndSwap(std::shared_ptr client, const int start) { + for(int i = start; i < start + 10; i++) { + bool s; + auto v = client->CompareAndSwap("key" + std::to_string(i), "value" + std::to_string(i), + "value" + std::to_string(i + 10), s); + std::cout << "old value: " << v.value_or("null") << std::endl; + } + + for(int i = start; i < start + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("deleted") << std::endl; + } +} + + +int main() { + std::vector pd_addrs{"127.0.0.1:2379"}; + std::shared_ptr client = std::shared_ptr(new RawClient(pd_addrs)); + TestPutAndGet(client, 0); + // with TTL + TestCompareAndSwap(client, 0); + TestDeleteValues(client, 0); return 0; } \ No newline at end of file From 9b03f9907d78b03c06719edf062f754a4c829b74 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Apr 2022 17:52:04 +0800 Subject: [PATCH 04/11] [add] add perf test --- src/test/raw_client_test/CMakeLists.txt | 6 +- src/test/raw_client_test/stress.cc | 279 ++++++++++++++++++++ src/test/raw_client_test/test_raw_client.cc | 37 ++- 3 files changed, 315 insertions(+), 7 deletions(-) create mode 100644 src/test/raw_client_test/stress.cc diff --git a/src/test/raw_client_test/CMakeLists.txt b/src/test/raw_client_test/CMakeLists.txt index c85c9277..4068e20c 100644 --- a/src/test/raw_client_test/CMakeLists.txt +++ b/src/test/raw_client_test/CMakeLists.txt @@ -1,3 +1,7 @@ add_executable(raw_client test_raw_client.cc) target_include_directories(raw_client PUBLIC ${test_includes}) -target_link_libraries(raw_client ${test_libs}) \ No newline at end of file +target_link_libraries(raw_client ${test_libs}) + +add_executable(stress stress.cc) +target_include_directories(stress PUBLIC ${test_includes}) +target_link_libraries(stress ${test_libs}) \ No newline at end of file diff --git a/src/test/raw_client_test/stress.cc b/src/test/raw_client_test/stress.cc new file mode 100644 index 00000000..36c0a65d --- /dev/null +++ b/src/test/raw_client_test/stress.cc @@ -0,0 +1,279 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace pingcap; +using namespace pingcap::kv; + +class TimerCounter { + std::chrono::time_point start_; + std::chrono::time_point end_; + +public: + void Start() { start_ = std::chrono::high_resolution_clock::now(); } + void Stop() { end_ = std::chrono::high_resolution_clock::now(); } + void PrintTime(const std::string &msg, int64_t base) { + std::cout + << msg << " Run time " + << base * 1000 / std::chrono::duration(end_ - start_).count() + << "QPS" << std::endl; + } +}; + +// void WriteMapToFile(const std::string &file_path, +// std::unordered_map &value) { +// std::ofstream o_file(file_path); +// for (auto &v : value) { +// o_file << v.first << " : " << v.second << std::endl; +// } +// } + +// bool GetVertexSetFromFile(const std::string &dataPath, +// std::vector &vid_vec) { +// std::cout << "********** read vertex from file**********" << std::endl; +// std::cout << std::endl; +// io::CSVReader<1> vertexData(dataPath); +// vertexData.read_header(io::ignore_extra_column, "id"); +// uint64_t vid = 0; +// while (vertexData.read_row(vid)) { +// vid_vec.push_back(vid); +// } +// std::cout << "********** read vertex from file done lines: **********" +// << vid_vec.size() << std::endl; +// std::cout << std::endl; +// return true; +// } + +// void GetOneHopVertex(DB *db, const std::vector &vid_vec, size_t start, +// size_t end) { +// for (size_t t = start; t < end; t++) { +// VertexKey vid(0, vid_vec[t], 0); +// auto bytes_v = vid.ToEdgePrefixBytes(); +// auto iter = db->NewIterator(ReadOptions()); +// Slice pref(bytes_v.Data(), bytes_v.Size()); +// size_t k = 0; +// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); +// iter->Next()) { +// k++; +// } +// delete iter; +// #if (STATISTICS == 1) +// statistics_map[vid_vec[t]] = k; +// #endif +// // std::cout << "one hop friends: " << k << std::endl; +// } +// } + +// bool MultiAssignJobs( +// std::function &, size_t, size_t)> +// fun, +// DB *db, size_t workers, std::vector vid_vec) { +// size_t jobs = vid_vec.size(); +// size_t per_job = (jobs + workers - 1) / workers; +// std::vector pool; +// pool.reserve(workers); +// for (size_t w = 0; w < workers; w++) { +// pool.emplace_back(fun, db, std::ref(vid_vec), w * per_job, +// (std::min((w + 1) * per_job, jobs))); +// } +// for (size_t i = 0; i < workers; i++) { +// pool[i].join(); +// } + +// return true; +// } + +// void GetVertexQuery(DB *db, const std::vector &vid_vec, size_t start, +// size_t end) { +// for (size_t t = start; t < end; t++) { +// VertexKey vid(0, vid_vec[t], 0); +// auto bytes_v = vid.AsBytes(); +// auto iter = db->NewIterator(ReadOptions()); +// Slice pref(bytes_v.Data(), bytes_v.Size()); +// size_t k = 0; +// std::vector hop_vid; +// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); +// iter->Next()) { +// k++; +// hop_vid.push_back( +// GetSecondVertexIdFromEdgeKey(iter->key().data(), iter->key().size())); +// } +// delete iter; +// int cpu_num; +// cpu_num = sysconf(_SC_NPROCESSORS_CONF); +// auto ret = MultiAssignJobs(GetOneHopVertex, db, cpu_num, hop_vid); +// CHECKTRUEANDTHROW(ret == false, "Get two hop vertex error") +// } +// } + +// void GetTwoHopVertex(DB *db, const std::vector &vid_vec, size_t start, +// size_t end) { +// for (size_t t = start; t < end; t++) { +// VertexKey vid(0, vid_vec[t], 0); +// auto bytes_v = vid.ToEdgePrefixBytes(); +// auto iter = db->NewIterator(ReadOptions()); +// Slice pref(bytes_v.Data(), bytes_v.Size()); +// size_t k = 0; +// std::vector hop_vid; +// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); +// iter->Next()) { +// k++; +// hop_vid.push_back( +// GetSecondVertexIdFromEdgeKey(iter->key().data(), iter->key().size())); +// } +// delete iter; +// int cpu_num; +// cpu_num = sysconf(_SC_NPROCESSORS_CONF); +// auto ret = MultiAssignJobs(GetOneHopVertex, db, cpu_num, hop_vid); +// CHECKTRUEANDTHROW(ret == false, "Get two hop vertex error") +// } +// } + +// bool OneHopQueryPara(const std::string &oneHopPath, DB *db, size_t workers) { +// std::cout << "********** one hop query, ues workers: **********" << workers +// << std::endl; +// std::cout << std::endl; + +// std::vector vid_vec; +// vid_vec.reserve(1010); +// bool get_data = GetVertexSetFromFile(oneHopPath, vid_vec); +// CHECKTRUEANDTHROW(get_data == false, "Get data error") +// bool ok = MultiAssignJobs(GetOneHopVertex, db, workers, vid_vec); +// CHECKTRUEANDTHROW(ok == false, "One hop query error") +// return true; +// } + +// bool TwoHopQueryPara(const std::string &twoHopPath, DB *db, size_t workers) { +// std::cout << "********** Two hop query, use workers: " << workers +// << std::endl; +// std::cout << std::endl; +// std::vector vid_vec; +// vid_vec.reserve(1010); +// bool get_data = GetVertexSetFromFile(twoHopPath, vid_vec); +// CHECKTRUEANDTHROW(get_data == false, "Get data error") +// bool ok = MultiAssignJobs(GetTwoHopVertex, db, workers, vid_vec); +// CHECKTRUEANDTHROW(ok == false, "Two hop query error") +// return true; +// } + +// bool OneHopQuerySeq(const std::string &oneHopPath, DB *db) { +// std::cout << "********** one hop query seq, ues workers: **********" +// << std::endl; +// std::cout << std::endl; + +// std::vector vid_vec; +// vid_vec.reserve(1010); +// bool get_data = GetVertexSetFromFile(oneHopPath, vid_vec); +// CHECKTRUEANDTHROW(get_data == false, "Get data error") +// GetOneHopVertex(db, vid_vec, 0, vid_vec.size()); +// return true; +// } + +// bool TwoHopQuerySeq(const std::string &twoHopPath, DB *db) { +// std::cout << "********** Two hop query seq, use workers: " << std::endl; +// std::cout << std::endl; +// std::vector vid_vec; +// vid_vec.reserve(1010); +// bool get_data = GetVertexSetFromFile(twoHopPath, vid_vec); +// CHECKTRUEANDTHROW(get_data == false, "Get data error") + +// for (auto &id : vid_vec) { +// VertexKey vid(0, id, 0); +// auto bytes_v = vid.ToEdgePrefixBytes(); +// auto iter = db->NewIterator(ReadOptions()); +// Slice pref(bytes_v.Data(), bytes_v.Size()); +// size_t k = 0; +// std::vector hop_vid; +// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); +// iter->Next()) { +// k++; +// hop_vid.push_back( +// GetSecondVertexIdFromEdgeKey(iter->key().data(), iter->key().size())); +// } +// validate_map[id] = k; // validate one hop map +// delete iter; +// // Get one hop data +// size_t two = 0; +// for (auto &h_vid : hop_vid) { +// VertexKey vid_h(0, h_vid, 0); +// auto bytes_h = vid_h.ToEdgePrefixBytes(); +// auto iter_h = db->NewIterator(ReadOptions()); +// Slice pref_h(bytes_h.Data(), bytes_h.Size()); +// for (iter_h->Seek(pref_h); +// iter_h->Valid() && iter_h->key().starts_with(pref_h); +// iter_h->Next()) { +// two++; +// } +// delete iter_h; +// } +// two_sum_map[id] = two; +// } + +// return true; +// } + +void multithread_write_to_db( + std::shared_ptr client, size_t start, size_t end) { + std::cout<< "start: " << start << ", end" << end << std::endl; + for (size_t i = start; i < end; i++) { + client->Put("key" + std::to_string(i), "value" + std::to_string(i)); + } +} + +bool multi_import_data(std::shared_ptr client, size_t jobs, size_t workers) { + + size_t per_job = (jobs + workers - 1) / workers; + std::vector pool; + pool.reserve(workers); + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_write_to_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + + for (size_t i = 0; i < workers; i++) { + pool[i].join(); + } + + return true; +} + +#define BATCH 1000 + +int main(int argc, char *argv[]) { + if (argc != 3) { + std::cout << "usage: ./exec $concurrent $batch"<< std::endl; + exit(EXIT_SUCCESS); + } + + std::cout << std::endl; + std::cout << "**********path config*********" << std::endl; + std::cout << std::endl; + std::cout << "concurrent number: " << argv[1] << std::endl; + std::cout << "batch number: " << argv[2] << std::endl; + int concurrent = std::atoi(argv[1]); + uint64_t batch = std::atol(argv[2]); + std::cout << std::endl; + std::cout << std::endl; + + + int cpu_num; + cpu_num = concurrent ? concurrent: sysconf(_SC_NPROCESSORS_CONF); + batch = batch? batch: BATCH; + std::vector pd_addrs{"127.0.0.1:2379"}; + std::shared_ptr client = std::shared_ptr(new RawClient(pd_addrs)); + TimerCounter tc; + tc.Start(); + multi_import_data(client, batch, cpu_num); + tc.Stop(); + tc.PrintTime("Single Put", batch); + return 0; +} \ No newline at end of file diff --git a/src/test/raw_client_test/test_raw_client.cc b/src/test/raw_client_test/test_raw_client.cc index 0f31bdfb..83fa7cb9 100644 --- a/src/test/raw_client_test/test_raw_client.cc +++ b/src/test/raw_client_test/test_raw_client.cc @@ -1,15 +1,38 @@ #include #include #include +#include + +class TimerCounter { + std::chrono::time_point start_; + std::chrono::time_point end_; + +public: + void Start() { start_ = std::chrono::high_resolution_clock::now(); } + void Stop() { end_ = std::chrono::high_resolution_clock::now(); } + void PrintTime(const std::string &msg, int64_t base) { + std::cout + << msg << " Run time " + << base * 1000 / std::chrono::duration(end_ - start_).count() + << "QPS" << std::endl; + } +}; using namespace pingcap; using namespace pingcap::kv; -void TestPutAndGet(std::shared_ptr client, const int start) { - for(int i = start; i < start + 10; i++) { +void TestPutAndGet(std::shared_ptr client, const int cnt) { + for(int i = 0; i < cnt; i++) { client->Put("key" + std::to_string(i), "value" + std::to_string(i)); } - for(int i = start; i < start + 10; i++) { + // for(int i = start; i < start + 10; i++) { + // auto value = client->Get("key" + std::to_string(i)); + // std::cout << "value is : " << value.value_or("null") << std::endl; + // } +} + +void TestValidGet(std::shared_ptr client, const int cnt) { + for(int i = 0; i < cnt + 10; i++) { auto value = client->Get("key" + std::to_string(i)); std::cout << "value is : " << value.value_or("null") << std::endl; } @@ -61,9 +84,11 @@ void TestCompareAndSwap(std::shared_ptr client, const int start) { int main() { std::vector pd_addrs{"127.0.0.1:2379"}; std::shared_ptr client = std::shared_ptr(new RawClient(pd_addrs)); - TestPutAndGet(client, 0); + TimerCounter tc; + tc.Start(); + TestPutAndGet(client, 10000); + tc.Stop(); + tc.PrintTime("run time", 10000); // with TTL - TestCompareAndSwap(client, 0); - TestDeleteValues(client, 0); return 0; } \ No newline at end of file From 085fb32a8abf21597941c2c94014a71c7cecaa36 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 25 Apr 2022 16:19:07 +0800 Subject: [PATCH 05/11] [add] add stress --- CMakeLists.txt | 5 +- include/pingcap/Histogram.h | 36 +++ src/CMakeLists.txt | 1 + src/Histogram.cc | 276 +++++++++++++++++++++++ src/test/raw_client_test/stress.cc | 348 +++++++++++------------------ 5 files changed, 444 insertions(+), 222 deletions(-) create mode 100644 include/pingcap/Histogram.h create mode 100644 src/Histogram.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e533e2a..f5bc50ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,11 @@ cmake_minimum_required(VERSION 2.8) project(kvClient) set (CMAKE_CXX_STANDARD 17) -set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing") +set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2") set (CMAKE_EXPORT_COMPILE_COMMANDS ON) + +message("${CMAKE_CXX_FLAGS}") + enable_testing() if (NOT gRPC_FOUND) diff --git a/include/pingcap/Histogram.h b/include/pingcap/Histogram.h new file mode 100644 index 00000000..5bdd6657 --- /dev/null +++ b/include/pingcap/Histogram.h @@ -0,0 +1,36 @@ +#pragma once +#include + +namespace pingcap +{ +class Histogram { + public: + Histogram() {} + ~Histogram() {} + + void Clear(); + void Add(double value); + void Merge(const Histogram& other); + std::string ToString() const; + double Median() const; + double Percentile(double p) const; + double Average() const; + double StandardDeviation() const; + double Minimum() const; + double Count() const; + double Maximum() const; + + private: + enum { kNumBuckets = 154 }; + static const double kBucketLimit[kNumBuckets]; + + double min_; + double max_; + double num_; + double sum_; + double sum_squares_; + + double buckets_[kNumBuckets]; +}; + +} //namespace pingcap \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 09fbf08a..1a33d4b7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,6 +13,7 @@ list(APPEND kvClient_sources pd/Client.cc) list(APPEND kvClient_sources coprocessor/Client.cc) list(APPEND kvClient_sources RedactHelpers.cc) list(APPEND kvClient_sources kv/RawClient.cc) +list(APPEND kvClient_sources Histogram.cc) set(kvClient_INCLUDE_DIR ${kvClient_SOURCE_DIR}/include) diff --git a/src/Histogram.cc b/src/Histogram.cc new file mode 100644 index 00000000..5ddd3675 --- /dev/null +++ b/src/Histogram.cc @@ -0,0 +1,276 @@ +#include +#include +#include + +namespace pingcap { +const double Histogram::kBucketLimit[kNumBuckets] = { + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 14, + 16, + 18, + 20, + 25, + 30, + 35, + 40, + 45, + 50, + 60, + 70, + 80, + 90, + 100, + 120, + 140, + 160, + 180, + 200, + 250, + 300, + 350, + 400, + 450, + 500, + 600, + 700, + 800, + 900, + 1000, + 1200, + 1400, + 1600, + 1800, + 2000, + 2500, + 3000, + 3500, + 4000, + 4500, + 5000, + 6000, + 7000, + 8000, + 9000, + 10000, + 12000, + 14000, + 16000, + 18000, + 20000, + 25000, + 30000, + 35000, + 40000, + 45000, + 50000, + 60000, + 70000, + 80000, + 90000, + 100000, + 120000, + 140000, + 160000, + 180000, + 200000, + 250000, + 300000, + 350000, + 400000, + 450000, + 500000, + 600000, + 700000, + 800000, + 900000, + 1000000, + 1200000, + 1400000, + 1600000, + 1800000, + 2000000, + 2500000, + 3000000, + 3500000, + 4000000, + 4500000, + 5000000, + 6000000, + 7000000, + 8000000, + 9000000, + 10000000, + 12000000, + 14000000, + 16000000, + 18000000, + 20000000, + 25000000, + 30000000, + 35000000, + 40000000, + 45000000, + 50000000, + 60000000, + 70000000, + 80000000, + 90000000, + 100000000, + 120000000, + 140000000, + 160000000, + 180000000, + 200000000, + 250000000, + 300000000, + 350000000, + 400000000, + 450000000, + 500000000, + 600000000, + 700000000, + 800000000, + 900000000, + 1000000000, + 1200000000, + 1400000000, + 1600000000, + 1800000000, + 2000000000, + 2500000000.0, + 3000000000.0, + 3500000000.0, + 4000000000.0, + 4500000000.0, + 5000000000.0, + 6000000000.0, + 7000000000.0, + 8000000000.0, + 9000000000.0, + 1e200, +}; + +void Histogram::Clear() { + min_ = kBucketLimit[kNumBuckets - 1]; + max_ = 0; + num_ = 0; + sum_ = 0; + sum_squares_ = 0; + for (int i = 0; i < kNumBuckets; i++) { + buckets_[i] = 0; + } +} + +void Histogram::Add(double value) { + // Linear search is fast enough for our usage in db_bench + int b = 0; + while (b < kNumBuckets - 1 && kBucketLimit[b] <= value) { + b++; + } + buckets_[b] += 1.0; + if (min_ > value) min_ = value; + if (max_ < value) max_ = value; + num_++; + sum_ += value; + sum_squares_ += (value * value); +} + +void Histogram::Merge(const Histogram& other) { + if (other.min_ < min_) min_ = other.min_; + if (other.max_ > max_) max_ = other.max_; + num_ += other.num_; + sum_ += other.sum_; + sum_squares_ += other.sum_squares_; + for (int b = 0; b < kNumBuckets; b++) { + buckets_[b] += other.buckets_[b]; + } +} + +double Histogram::Median() const { return Percentile(50.0); } + +double Histogram::Percentile(double p) const { + double threshold = num_ * (p / 100.0); + double sum = 0; + for (int b = 0; b < kNumBuckets; b++) { + sum += buckets_[b]; + if (sum >= threshold) { + // Scale linearly within this bucket + double left_point = (b == 0) ? 0 : kBucketLimit[b - 1]; + double right_point = kBucketLimit[b]; + double left_sum = sum - buckets_[b]; + double right_sum = sum; + double pos = (threshold - left_sum) / (right_sum - left_sum); + double r = left_point + (right_point - left_point) * pos; + if (r < min_) r = min_; + if (r > max_) r = max_; + return r; + } + } + return max_; +} + +double Histogram::Minimum() const { + return min_; +} + +double Histogram::Maximum() const { + return max_; +} + +double Histogram::Count() const { + return num_; +} + +double Histogram::Average() const { + if (num_ == 0.0) return 0; + return sum_ / num_; +} + +double Histogram::StandardDeviation() const { + if (num_ == 0.0) return 0; + double variance = (sum_squares_ * num_ - sum_ * sum_) / (num_ * num_); + return sqrt(variance); +} + +std::string Histogram::ToString() const { + std::string r; + char buf[200]; + std::snprintf(buf, sizeof(buf), "Count: %.0f Average: %.4f StdDev: %.2f\n", + num_, Average(), StandardDeviation()); + r.append(buf); + std::snprintf(buf, sizeof(buf), "Min: %.4f Median: %.4f Max: %.4f\n", + (num_ == 0.0 ? 0.0 : min_), Median(), max_); + r.append(buf); + r.append("------------------------------------------------------\n"); + const double mult = 100.0 / num_; + double sum = 0; + for (int b = 0; b < kNumBuckets; b++) { + if (buckets_[b] <= 0.0) continue; + sum += buckets_[b]; + std::snprintf(buf, sizeof(buf), "[ %7.0f, %7.0f ) %7.0f %7.3f%% %7.3f%% ", + ((b == 0) ? 0.0 : kBucketLimit[b - 1]), // left + kBucketLimit[b], // right + buckets_[b], // count + mult * buckets_[b], // percentage + mult * sum); // cumulative percentage + r.append(buf); + + // Add hash marks based on percentage; 20 marks for 100%. + int marks = static_cast(20 * (buckets_[b] / num_) + 0.5); + r.append(marks, '#'); + r.push_back('\n'); + } + return r; +} + +} // namespace pigncap \ No newline at end of file diff --git a/src/test/raw_client_test/stress.cc b/src/test/raw_client_test/stress.cc index 36c0a65d..7618fc38 100644 --- a/src/test/raw_client_test/stress.cc +++ b/src/test/raw_client_test/stress.cc @@ -1,7 +1,7 @@ #include +#include #include #include -#include #include #include #include @@ -10,270 +10,176 @@ #include #include #include +#include +#include +#include using namespace pingcap; using namespace pingcap::kv; -class TimerCounter { - std::chrono::time_point start_; - std::chrono::time_point end_; +std::atomic fail_cnt; +class TimerCounter { + struct timeval start_, end_; public: - void Start() { start_ = std::chrono::high_resolution_clock::now(); } - void Stop() { end_ = std::chrono::high_resolution_clock::now(); } - void PrintTime(const std::string &msg, int64_t base) { - std::cout - << msg << " Run time " - << base * 1000 / std::chrono::duration(end_ - start_).count() - << "QPS" << std::endl; + void Start() { gettimeofday(&start_, NULL); } + void Stop() { gettimeofday(&end_, NULL); } + void PrintTime(int64_t base) { + std::cout << "Queries: " + << base << " Runtime: " + << ((end_.tv_sec - start_.tv_sec) + (end_.tv_usec - start_.tv_usec)/1000000.0) << "s, QPS: " + << (base * 1000) / ((end_.tv_sec - start_.tv_sec) *1000 + (end_.tv_usec - start_.tv_usec)/1000) + << std::endl; } }; -// void WriteMapToFile(const std::string &file_path, -// std::unordered_map &value) { -// std::ofstream o_file(file_path); -// for (auto &v : value) { -// o_file << v.first << " : " << v.second << std::endl; -// } -// } - -// bool GetVertexSetFromFile(const std::string &dataPath, -// std::vector &vid_vec) { -// std::cout << "********** read vertex from file**********" << std::endl; -// std::cout << std::endl; -// io::CSVReader<1> vertexData(dataPath); -// vertexData.read_header(io::ignore_extra_column, "id"); -// uint64_t vid = 0; -// while (vertexData.read_row(vid)) { -// vid_vec.push_back(vid); -// } -// std::cout << "********** read vertex from file done lines: **********" -// << vid_vec.size() << std::endl; -// std::cout << std::endl; -// return true; -// } - -// void GetOneHopVertex(DB *db, const std::vector &vid_vec, size_t start, -// size_t end) { -// for (size_t t = start; t < end; t++) { -// VertexKey vid(0, vid_vec[t], 0); -// auto bytes_v = vid.ToEdgePrefixBytes(); -// auto iter = db->NewIterator(ReadOptions()); -// Slice pref(bytes_v.Data(), bytes_v.Size()); -// size_t k = 0; -// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); -// iter->Next()) { -// k++; -// } -// delete iter; -// #if (STATISTICS == 1) -// statistics_map[vid_vec[t]] = k; -// #endif -// // std::cout << "one hop friends: " << k << std::endl; -// } -// } - -// bool MultiAssignJobs( -// std::function &, size_t, size_t)> -// fun, -// DB *db, size_t workers, std::vector vid_vec) { -// size_t jobs = vid_vec.size(); -// size_t per_job = (jobs + workers - 1) / workers; -// std::vector pool; -// pool.reserve(workers); -// for (size_t w = 0; w < workers; w++) { -// pool.emplace_back(fun, db, std::ref(vid_vec), w * per_job, -// (std::min((w + 1) * per_job, jobs))); -// } -// for (size_t i = 0; i < workers; i++) { -// pool[i].join(); -// } - -// return true; -// } - -// void GetVertexQuery(DB *db, const std::vector &vid_vec, size_t start, -// size_t end) { -// for (size_t t = start; t < end; t++) { -// VertexKey vid(0, vid_vec[t], 0); -// auto bytes_v = vid.AsBytes(); -// auto iter = db->NewIterator(ReadOptions()); -// Slice pref(bytes_v.Data(), bytes_v.Size()); -// size_t k = 0; -// std::vector hop_vid; -// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); -// iter->Next()) { -// k++; -// hop_vid.push_back( -// GetSecondVertexIdFromEdgeKey(iter->key().data(), iter->key().size())); -// } -// delete iter; -// int cpu_num; -// cpu_num = sysconf(_SC_NPROCESSORS_CONF); -// auto ret = MultiAssignJobs(GetOneHopVertex, db, cpu_num, hop_vid); -// CHECKTRUEANDTHROW(ret == false, "Get two hop vertex error") -// } -// } - -// void GetTwoHopVertex(DB *db, const std::vector &vid_vec, size_t start, -// size_t end) { -// for (size_t t = start; t < end; t++) { -// VertexKey vid(0, vid_vec[t], 0); -// auto bytes_v = vid.ToEdgePrefixBytes(); -// auto iter = db->NewIterator(ReadOptions()); -// Slice pref(bytes_v.Data(), bytes_v.Size()); -// size_t k = 0; -// std::vector hop_vid; -// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); -// iter->Next()) { -// k++; -// hop_vid.push_back( -// GetSecondVertexIdFromEdgeKey(iter->key().data(), iter->key().size())); -// } -// delete iter; -// int cpu_num; -// cpu_num = sysconf(_SC_NPROCESSORS_CONF); -// auto ret = MultiAssignJobs(GetOneHopVertex, db, cpu_num, hop_vid); -// CHECKTRUEANDTHROW(ret == false, "Get two hop vertex error") -// } -// } - -// bool OneHopQueryPara(const std::string &oneHopPath, DB *db, size_t workers) { -// std::cout << "********** one hop query, ues workers: **********" << workers -// << std::endl; -// std::cout << std::endl; - -// std::vector vid_vec; -// vid_vec.reserve(1010); -// bool get_data = GetVertexSetFromFile(oneHopPath, vid_vec); -// CHECKTRUEANDTHROW(get_data == false, "Get data error") -// bool ok = MultiAssignJobs(GetOneHopVertex, db, workers, vid_vec); -// CHECKTRUEANDTHROW(ok == false, "One hop query error") -// return true; -// } - -// bool TwoHopQueryPara(const std::string &twoHopPath, DB *db, size_t workers) { -// std::cout << "********** Two hop query, use workers: " << workers -// << std::endl; -// std::cout << std::endl; -// std::vector vid_vec; -// vid_vec.reserve(1010); -// bool get_data = GetVertexSetFromFile(twoHopPath, vid_vec); -// CHECKTRUEANDTHROW(get_data == false, "Get data error") -// bool ok = MultiAssignJobs(GetTwoHopVertex, db, workers, vid_vec); -// CHECKTRUEANDTHROW(ok == false, "Two hop query error") -// return true; -// } - -// bool OneHopQuerySeq(const std::string &oneHopPath, DB *db) { -// std::cout << "********** one hop query seq, ues workers: **********" -// << std::endl; -// std::cout << std::endl; - -// std::vector vid_vec; -// vid_vec.reserve(1010); -// bool get_data = GetVertexSetFromFile(oneHopPath, vid_vec); -// CHECKTRUEANDTHROW(get_data == false, "Get data error") -// GetOneHopVertex(db, vid_vec, 0, vid_vec.size()); -// return true; -// } - -// bool TwoHopQuerySeq(const std::string &twoHopPath, DB *db) { -// std::cout << "********** Two hop query seq, use workers: " << std::endl; -// std::cout << std::endl; -// std::vector vid_vec; -// vid_vec.reserve(1010); -// bool get_data = GetVertexSetFromFile(twoHopPath, vid_vec); -// CHECKTRUEANDTHROW(get_data == false, "Get data error") - -// for (auto &id : vid_vec) { -// VertexKey vid(0, id, 0); -// auto bytes_v = vid.ToEdgePrefixBytes(); -// auto iter = db->NewIterator(ReadOptions()); -// Slice pref(bytes_v.Data(), bytes_v.Size()); -// size_t k = 0; -// std::vector hop_vid; -// for (iter->Seek(pref); iter->Valid() && iter->key().starts_with(pref); -// iter->Next()) { -// k++; -// hop_vid.push_back( -// GetSecondVertexIdFromEdgeKey(iter->key().data(), iter->key().size())); -// } -// validate_map[id] = k; // validate one hop map -// delete iter; -// // Get one hop data -// size_t two = 0; -// for (auto &h_vid : hop_vid) { -// VertexKey vid_h(0, h_vid, 0); -// auto bytes_h = vid_h.ToEdgePrefixBytes(); -// auto iter_h = db->NewIterator(ReadOptions()); -// Slice pref_h(bytes_h.Data(), bytes_h.Size()); -// for (iter_h->Seek(pref_h); -// iter_h->Valid() && iter_h->key().starts_with(pref_h); -// iter_h->Next()) { -// two++; -// } -// delete iter_h; -// } -// two_sum_map[id] = two; -// } +void multithread_write_to_db( + std::shared_ptr client, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + client->Put("key" + std::to_string(i), "test_value"); + gettimeofday(&e, NULL); + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us): Min: " << his.Minimum() + << " Avg: " << his.Average() + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} -// return true; -// } +void multithread_read_db( + std::shared_ptr client, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + auto ret = client->Get("key" + std::to_string(i)); + gettimeofday(&e, NULL); + if(!ret.has_value()) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us): Min: " << his.Minimum() + << " Avg: " << his.Average() + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} -void multithread_write_to_db( +void multithread_cas_db( std::shared_ptr client, size_t start, size_t end) { - std::cout<< "start: " << start << ", end" << end << std::endl; + Histogram his; + his.Clear(); + struct timeval s, e; for (size_t i = start; i < end; i++) { - client->Put("key" + std::to_string(i), "value" + std::to_string(i)); + gettimeofday(&s, NULL); + bool is_swap; + client->CompareAndSwap("key" + std::to_string(i), "test_value", "test_new_value", is_swap); + gettimeofday(&e, NULL); + if(!is_swap) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } + std::cout << "Latency (us): Min: " << his.Minimum() + << " Avg: " << his.Average() + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; } -bool multi_import_data(std::shared_ptr client, size_t jobs, size_t workers) { +void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { + for (size_t i = 0; i < 100; i++) { + auto ret = client->Get("key" + std::to_string(i)); + std::cout << "valid key: " << i << " ,value: " << ret.value_or("NOT FOUND") << std::endl; + } +} +bool multi_assign_jobs(std::shared_ptr client, size_t jobs, size_t workers, std::string rw) { size_t per_job = (jobs + workers - 1) / workers; std::vector pool; pool.reserve(workers); - for (size_t w = 0; w < workers; w++) { - pool.emplace_back(multithread_write_to_db, client, + + if(rw == "w") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_write_to_db, client, w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "r") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_read_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "cas") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_cas_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else { + random_valid_to_db(client, 0, jobs); + return true; } for (size_t i = 0; i < workers; i++) { pool[i].join(); } - return true; } + #define BATCH 1000 int main(int argc, char *argv[]) { - if (argc != 3) { - std::cout << "usage: ./exec $concurrent $batch"<< std::endl; + if (argc != 5) { + std::cout << "usage: ./exec $ip:port $concurrent $batch $rw"<< std::endl; exit(EXIT_SUCCESS); } std::cout << std::endl; - std::cout << "**********path config*********" << std::endl; + std::cout << "**********config*********" << std::endl; std::cout << std::endl; - std::cout << "concurrent number: " << argv[1] << std::endl; - std::cout << "batch number: " << argv[2] << std::endl; - int concurrent = std::atoi(argv[1]); - uint64_t batch = std::atol(argv[2]); + std::string ip_add = std::string(argv[1]); + std::cout << "ip address: " << ip_add << std::endl; + std::cout << "concurrent number: " << argv[2] << std::endl; + std::cout << "batch number: " << argv[3] << std::endl; + int concurrent = std::atoi(argv[2]); + uint64_t batch = std::atol(argv[3]); + std::string rw = argv[4]; + if(rw != "r" && rw != "w" && rw != "cas" && rw != "v") { + std::cout << "input rw error" << std::endl; + return 0; + } std::cout << std::endl; std::cout << std::endl; - - int cpu_num; cpu_num = concurrent ? concurrent: sysconf(_SC_NPROCESSORS_CONF); batch = batch? batch: BATCH; - std::vector pd_addrs{"127.0.0.1:2379"}; - std::shared_ptr client = std::shared_ptr(new RawClient(pd_addrs)); + ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; + std::vector pd_addrs{ip_add}; + + std::shared_ptr client; + if(rw != "cas") + client = std::shared_ptr(new RawClient(pd_addrs)); + else { + auto clit = new RawClient(pd_addrs); + clit->AsCASClient(); + client = std::shared_ptr(clit); + } + TimerCounter tc; tc.Start(); - multi_import_data(client, batch, cpu_num); + multi_assign_jobs(client, batch, cpu_num, rw); tc.Stop(); - tc.PrintTime("Single Put", batch); + std::cout << "failed: " << fail_cnt << std::endl; + tc.PrintTime(batch); return 0; } \ No newline at end of file From fa85b06f7aae61e0d171ae2c8eee3d9303d8dff9 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 26 Apr 2022 15:00:31 +0800 Subject: [PATCH 06/11] [modify] nodify unique ptr to share ptr --- include/pingcap/kv/RawClient.h | 3 +- src/kv/RawClient.cc | 12 +- src/test/raw_client_test/CMakeLists.txt | 6 +- src/test/raw_client_test/stress.cc | 77 ++++++---- src/test/raw_client_test/stress_v2.cc | 191 ++++++++++++++++++++++++ 5 files changed, 255 insertions(+), 34 deletions(-) create mode 100644 src/test/raw_client_test/stress_v2.cc diff --git a/include/pingcap/kv/RawClient.h b/include/pingcap/kv/RawClient.h index 5dbbc3b5..a023a939 100644 --- a/include/pingcap/kv/RawClient.h +++ b/include/pingcap/kv/RawClient.h @@ -21,7 +21,8 @@ constexpr const char* kCfString[3] = {"default", "lock", "write"}; //https://docs.rs/tikv-client/latest/tikv_client/struct.RawClient.html struct RawClient { - ClusterPtr cluster_ptr; + // ClusterPtr cluster_ptr; + std::shared_ptr cluster_ptr; bool for_cas; ColumnFamily cf; diff --git a/src/kv/RawClient.cc b/src/kv/RawClient.cc index 29a79415..e4535935 100644 --- a/src/kv/RawClient.cc +++ b/src/kv/RawClient.cc @@ -9,22 +9,26 @@ namespace kv { RawClient::RawClient(const std::vector & pd_addrs) : for_cas(false), cf(Default) { - cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); + // cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); + cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); } RawClient::RawClient(const std::vector & pd_addrs, bool cas) : for_cas(cas), cf(Default) { - cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); + // cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); + cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); } RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config) : for_cas(false), cf(Default) { - cluster_ptr = std::make_unique(pd_addrs, config); + // cluster_ptr = std::make_unique(pd_addrs, config); + cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); } RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config, bool cas) : for_cas(cas), cf(Default) { - cluster_ptr = std::make_unique(pd_addrs, config); + // cluster_ptr = std::make_unique(pd_addrs, config); + cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); } bool RawClient::IsCASClient() { diff --git a/src/test/raw_client_test/CMakeLists.txt b/src/test/raw_client_test/CMakeLists.txt index 4068e20c..93ccada5 100644 --- a/src/test/raw_client_test/CMakeLists.txt +++ b/src/test/raw_client_test/CMakeLists.txt @@ -4,4 +4,8 @@ target_link_libraries(raw_client ${test_libs}) add_executable(stress stress.cc) target_include_directories(stress PUBLIC ${test_includes}) -target_link_libraries(stress ${test_libs}) \ No newline at end of file +target_link_libraries(stress ${test_libs}) + +add_executable(stress_v2 stress_v2.cc) +target_include_directories(stress_v2 PUBLIC ${test_includes}) +target_link_libraries(stress_v2 ${test_libs}) \ No newline at end of file diff --git a/src/test/raw_client_test/stress.cc b/src/test/raw_client_test/stress.cc index 7618fc38..dd43d357 100644 --- a/src/test/raw_client_test/stress.cc +++ b/src/test/raw_client_test/stress.cc @@ -34,18 +34,25 @@ class TimerCounter { }; void multithread_write_to_db( - std::shared_ptr client, size_t start, size_t end) { + const std::vector &ip_addr, size_t start, size_t end) { Histogram his; his.Clear(); struct timeval s, e; + std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); - client->Put("key" + std::to_string(i), "test_value"); + try { + client->Put("key" + std::to_string(i), "test_value"); + } catch(...) { + std::cout << "put data exception" << std::endl; + } gettimeofday(&e, NULL); his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } - std::cout << "Latency (us): Min: " << his.Minimum() + std::cout << "Latency (us):" + << " Min: " << his.Minimum() << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) << " Max: " << his.Maximum() << " StdDev: " << his.StandardDeviation() << " Queries: " << his.Count() @@ -53,32 +60,43 @@ void multithread_write_to_db( } void multithread_read_db( - std::shared_ptr client, size_t start, size_t end) { + const std::vector &ip_addr, size_t start, size_t end) { Histogram his; his.Clear(); struct timeval s, e; + std::optional ret; + std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); - auto ret = client->Get("key" + std::to_string(i)); + try { + ret = client->Get("key" + std::to_string(i)); + } catch (...) { + std::cout << "get key exception error" << std::endl; + } gettimeofday(&e, NULL); if(!ret.has_value()) { fail_cnt.fetch_add(1, std::memory_order_relaxed); } his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } - std::cout << "Latency (us): Min: " << his.Minimum() - << " Avg: " << his.Average() - << " Max: " << his.Maximum() - << " StdDev: " << his.StandardDeviation() - << " Queries: " << his.Count() - << std::endl; + std::cout << "Latency (us):" + << " Min: " << his.Minimum() + << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; } void multithread_cas_db( - std::shared_ptr client, size_t start, size_t end) { + const std::vector &ip_addr, size_t start, size_t end) { Histogram his; his.Clear(); struct timeval s, e; + auto clit = new RawClient(ip_addr); + clit->AsCASClient(); + std::shared_ptr client = std::shared_ptr(clit); for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); bool is_swap; @@ -89,43 +107,46 @@ void multithread_cas_db( } his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } - std::cout << "Latency (us): Min: " << his.Minimum() + std::cout << "Latency (us):" + << " Min: " << his.Minimum() << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) << " Max: " << his.Maximum() << " StdDev: " << his.StandardDeviation() << " Queries: " << his.Count() << std::endl; } -void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { +void random_valid_to_db(const std::vector &ip_addr, size_t start, size_t end) { + std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); for (size_t i = 0; i < 100; i++) { auto ret = client->Get("key" + std::to_string(i)); std::cout << "valid key: " << i << " ,value: " << ret.value_or("NOT FOUND") << std::endl; } } -bool multi_assign_jobs(std::shared_ptr client, size_t jobs, size_t workers, std::string rw) { +bool multi_assign_jobs(std::vector &ip_addr, size_t jobs, size_t workers, std::string rw) { size_t per_job = (jobs + workers - 1) / workers; std::vector pool; pool.reserve(workers); if(rw == "w") { for (size_t w = 0; w < workers; w++) { - pool.emplace_back(multithread_write_to_db, client, + pool.emplace_back(multithread_write_to_db, std::ref(ip_addr), w * per_job, (std::min((w + 1) * per_job, jobs))); } } else if(rw == "r") { for (size_t w = 0; w < workers; w++) { - pool.emplace_back(multithread_read_db, client, + pool.emplace_back(multithread_read_db, std::ref(ip_addr), w * per_job, (std::min((w + 1) * per_job, jobs))); } } else if(rw == "cas") { for (size_t w = 0; w < workers; w++) { - pool.emplace_back(multithread_cas_db, client, + pool.emplace_back(multithread_cas_db, std::ref(ip_addr), w * per_job, (std::min((w + 1) * per_job, jobs))); } } else { - random_valid_to_db(client, 0, jobs); + random_valid_to_db(std::ref(ip_addr), 0, jobs); return true; } @@ -166,18 +187,18 @@ int main(int argc, char *argv[]) { ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; std::vector pd_addrs{ip_add}; - std::shared_ptr client; - if(rw != "cas") - client = std::shared_ptr(new RawClient(pd_addrs)); - else { - auto clit = new RawClient(pd_addrs); - clit->AsCASClient(); - client = std::shared_ptr(clit); - } + // std::shared_ptr client; + // if(rw != "cas") + // client = std::shared_ptr(new RawClient(pd_addrs)); + // else { + // auto clit = new RawClient(pd_addrs); + // clit->AsCASClient(); + // client = std::shared_ptr(clit); + // } TimerCounter tc; tc.Start(); - multi_assign_jobs(client, batch, cpu_num, rw); + multi_assign_jobs(pd_addrs, batch, cpu_num, rw); tc.Stop(); std::cout << "failed: " << fail_cnt << std::endl; tc.PrintTime(batch); diff --git a/src/test/raw_client_test/stress_v2.cc b/src/test/raw_client_test/stress_v2.cc new file mode 100644 index 00000000..23517e43 --- /dev/null +++ b/src/test/raw_client_test/stress_v2.cc @@ -0,0 +1,191 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace pingcap; +using namespace pingcap::kv; + +std::atomic fail_cnt; + +class TimerCounter { + struct timeval start_, end_; +public: + void Start() { gettimeofday(&start_, NULL); } + void Stop() { gettimeofday(&end_, NULL); } + void PrintTime(int64_t base) { + std::cout << "Queries: " + << base << " Runtime: " + << ((end_.tv_sec - start_.tv_sec) + (end_.tv_usec - start_.tv_usec)/1000000.0) << "s, QPS: " + << (base * 1000) / ((end_.tv_sec - start_.tv_sec) *1000 + (end_.tv_usec - start_.tv_usec)/1000) + << std::endl; + } +}; + +void multithread_write_to_db( + std::shared_ptr client, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + client->Put("key" + std::to_string(i), "test_value"); + gettimeofday(&e, NULL); + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us):" + << " Min: " << his.Minimum() + << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} + +void multithread_read_db( + std::shared_ptr client, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + auto ret = client->Get("key" + std::to_string(i)); + gettimeofday(&e, NULL); + if(!ret.has_value()) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us):" + << " Min: " << his.Minimum() + << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} + +void multithread_cas_db( + std::shared_ptr client, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + bool is_swap; + client->CompareAndSwap("key" + std::to_string(i), "test_value", "test_new_value", is_swap); + gettimeofday(&e, NULL); + if(!is_swap) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us):" + << " Min: " << his.Minimum() + << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} + +void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { + for (size_t i = 0; i < 100; i++) { + auto ret = client->Get("key" + std::to_string(i)); + std::cout << "valid key: " << i << " ,value: " << ret.value_or("NOT FOUND") << std::endl; + } +} + +bool multi_assign_jobs(std::shared_ptr client, size_t jobs, size_t workers, std::string rw) { + size_t per_job = (jobs + workers - 1) / workers; + std::vector pool; + pool.reserve(workers); + + if(rw == "w") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_write_to_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "r") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_read_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "cas") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_cas_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else { + random_valid_to_db(client, 0, jobs); + return true; + } + + for (size_t i = 0; i < workers; i++) { + pool[i].join(); + } + return true; +} + + +#define BATCH 1000 + +int main(int argc, char *argv[]) { + if (argc != 5) { + std::cout << "usage: ./exec $ip:port $concurrent $batch $rw"<< std::endl; + exit(EXIT_SUCCESS); + } + + std::cout << std::endl; + std::cout << "**********config*********" << std::endl; + std::cout << std::endl; + std::string ip_add = std::string(argv[1]); + std::cout << "ip address: " << ip_add << std::endl; + std::cout << "concurrent number: " << argv[2] << std::endl; + std::cout << "batch number: " << argv[3] << std::endl; + int concurrent = std::atoi(argv[2]); + uint64_t batch = std::atol(argv[3]); + std::string rw = argv[4]; + if(rw != "r" && rw != "w" && rw != "cas" && rw != "v") { + std::cout << "input rw error" << std::endl; + return 0; + } + std::cout << std::endl; + std::cout << std::endl; + int cpu_num; + cpu_num = concurrent ? concurrent: sysconf(_SC_NPROCESSORS_CONF); + batch = batch? batch: BATCH; + ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; + std::vector pd_addrs{ip_add}; + + std::shared_ptr client; + if(rw != "cas") + client = std::shared_ptr(new RawClient(pd_addrs)); + else { + auto clit = new RawClient(pd_addrs); + clit->AsCASClient(); + client = std::shared_ptr(clit); + } + + TimerCounter tc; + tc.Start(); + multi_assign_jobs(client, batch, cpu_num, rw); + tc.Stop(); + std::cout << "failed: " << fail_cnt << std::endl; + tc.PrintTime(batch); + return 0; +} \ No newline at end of file From d6ed1157bab54ca81e4d60f12018a2e89f8d3564 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 29 Apr 2022 11:46:57 +0800 Subject: [PATCH 07/11] [modify] modify stress test and change rwa client share ptr to unique ptr --- CMakeLists.txt | 1 + include/pingcap/kv/RawClient.h | 3 +- include/pingcap/kv/RegionClient.h | 3 +- src/kv/RawClient.cc | 47 ++++++++++++++------------- src/test/raw_client_test/stress.cc | 12 +++---- src/test/raw_client_test/stress_v2.cc | 8 ++--- 6 files changed, 36 insertions(+), 38 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f5bc50ac..d6e23aff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 2.8) project(kvClient) set (CMAKE_CXX_STANDARD 17) set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2") +#set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing") set (CMAKE_EXPORT_COMPILE_COMMANDS ON) message("${CMAKE_CXX_FLAGS}") diff --git a/include/pingcap/kv/RawClient.h b/include/pingcap/kv/RawClient.h index a023a939..5dbbc3b5 100644 --- a/include/pingcap/kv/RawClient.h +++ b/include/pingcap/kv/RawClient.h @@ -21,8 +21,7 @@ constexpr const char* kCfString[3] = {"default", "lock", "write"}; //https://docs.rs/tikv-client/latest/tikv_client/struct.RawClient.html struct RawClient { - // ClusterPtr cluster_ptr; - std::shared_ptr cluster_ptr; + ClusterPtr cluster_ptr; bool for_cas; ColumnFamily cf; diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 1b049f0f..7238db49 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -10,7 +10,7 @@ namespace pingcap namespace kv { -constexpr int dailTimeout = 10000; +constexpr int dailTimeout = 100000; constexpr int copTimeout = 20; // RegionClient sends KV/Cop requests to tikv server (corresponding to `RegionRequestSender` in go-client). It handles network errors and some region errors internally. @@ -64,6 +64,7 @@ struct RegionClient { log->warning("region " + region_id.toString() + " find error: " + resp->region_error().message()); onRegionError(bo, ctx, resp->region_error()); + // set error and return } else { diff --git a/src/kv/RawClient.cc b/src/kv/RawClient.cc index e4535935..084e179e 100644 --- a/src/kv/RawClient.cc +++ b/src/kv/RawClient.cc @@ -9,26 +9,22 @@ namespace kv { RawClient::RawClient(const std::vector & pd_addrs) : for_cas(false), cf(Default) { - // cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); - cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); + cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); } RawClient::RawClient(const std::vector & pd_addrs, bool cas) : for_cas(cas), cf(Default) { - // cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); - cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); + cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); } RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config) : for_cas(false), cf(Default) { - // cluster_ptr = std::make_unique(pd_addrs, config); - cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); + cluster_ptr = std::make_unique(pd_addrs, config); } RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config, bool cas) : for_cas(cas), cf(Default) { - // cluster_ptr = std::make_unique(pd_addrs, config); - cluster_ptr = std::make_shared(pd_addrs, ClusterConfig()); + cluster_ptr = std::make_unique(pd_addrs, config); } bool RawClient::IsCASClient() { @@ -189,22 +185,27 @@ uint64_t RawClient::GetKeyTTL(const std::string &key, int64_t to_ms) { } std::optional RawClient::Get(const std::string &key) { - Backoffer bo(RawGetMaxBackoff); - auto local = cluster_ptr->region_cache->locateKey(bo, key); - RegionClient client(cluster_ptr.get(), local.region); - auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); - req->set_key(key); - auto resp = client.sendReqToRegion(bo, req); - if(resp->has_region_error()) { - throw Exception(resp->region_error().message(), RegionUnavailable); - } - if(resp->error() != "") { - throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); - } - if(resp->not_found()) { - return std::nullopt; + try { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); + req->set_key(key); + + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->not_found()) { + return std::nullopt; + } + return resp->value(); + } catch(const std::exception &e) { + std::cout << "get value with expections: " << std::endl; } - return resp->value(); } std::optional RawClient::Get(const std::string &key, int64_t to_ms) { diff --git a/src/test/raw_client_test/stress.cc b/src/test/raw_client_test/stress.cc index dd43d357..6f24c49e 100644 --- a/src/test/raw_client_test/stress.cc +++ b/src/test/raw_client_test/stress.cc @@ -42,7 +42,7 @@ void multithread_write_to_db( for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); try { - client->Put("key" + std::to_string(i), "test_value"); + client->Put(std::to_string(i), std::string(20480, 'a')); } catch(...) { std::cout << "put data exception" << std::endl; } @@ -68,11 +68,7 @@ void multithread_read_db( std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); - try { - ret = client->Get("key" + std::to_string(i)); - } catch (...) { - std::cout << "get key exception error" << std::endl; - } + ret = client->Get(std::to_string(i)); gettimeofday(&e, NULL); if(!ret.has_value()) { fail_cnt.fetch_add(1, std::memory_order_relaxed); @@ -100,7 +96,7 @@ void multithread_cas_db( for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); bool is_swap; - client->CompareAndSwap("key" + std::to_string(i), "test_value", "test_new_value", is_swap); + client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), "test_new_value", is_swap); gettimeofday(&e, NULL); if(!is_swap) { fail_cnt.fetch_add(1, std::memory_order_relaxed); @@ -120,7 +116,7 @@ void multithread_cas_db( void random_valid_to_db(const std::vector &ip_addr, size_t start, size_t end) { std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); for (size_t i = 0; i < 100; i++) { - auto ret = client->Get("key" + std::to_string(i)); + auto ret = client->Get(std::to_string(i)); std::cout << "valid key: " << i << " ,value: " << ret.value_or("NOT FOUND") << std::endl; } } diff --git a/src/test/raw_client_test/stress_v2.cc b/src/test/raw_client_test/stress_v2.cc index 23517e43..63c0fa60 100644 --- a/src/test/raw_client_test/stress_v2.cc +++ b/src/test/raw_client_test/stress_v2.cc @@ -40,7 +40,7 @@ void multithread_write_to_db( struct timeval s, e; for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); - client->Put("key" + std::to_string(i), "test_value"); + client->Put(std::to_string(i), std::string(20480, 'a')); gettimeofday(&e, NULL); his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } @@ -61,7 +61,7 @@ void multithread_read_db( struct timeval s, e; for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); - auto ret = client->Get("key" + std::to_string(i)); + auto ret = client->Get(std::to_string(i)); gettimeofday(&e, NULL); if(!ret.has_value()) { fail_cnt.fetch_add(1, std::memory_order_relaxed); @@ -86,7 +86,7 @@ void multithread_cas_db( for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); bool is_swap; - client->CompareAndSwap("key" + std::to_string(i), "test_value", "test_new_value", is_swap); + client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), "test_new_value", is_swap); gettimeofday(&e, NULL); if(!is_swap) { fail_cnt.fetch_add(1, std::memory_order_relaxed); @@ -105,7 +105,7 @@ void multithread_cas_db( void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { for (size_t i = 0; i < 100; i++) { - auto ret = client->Get("key" + std::to_string(i)); + auto ret = client->Get(std::to_string(i)); std::cout << "valid key: " << i << " ,value: " << ret.value_or("NOT FOUND") << std::endl; } } From 9e3e2921acea0d461939357a1e8c858781df2ce7 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 11 May 2022 16:55:01 +0800 Subject: [PATCH 08/11] [fix] fix split region, format error bug --- CMakeLists.txt | 4 +-- include/pingcap/kv/RegionClient.h | 3 ++ include/pingcap/pd/CodecClient.h | 6 ++-- src/kv/RawClient.cc | 38 ++++++++++------------ src/kv/RegionCache.cc | 3 +- src/test/raw_client_test/stress_v2.cc | 47 +++++++++++++++++++++++++-- 6 files changed, 73 insertions(+), 28 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d6e23aff..ee71d23e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,8 @@ cmake_minimum_required(VERSION 2.8) project(kvClient) set (CMAKE_CXX_STANDARD 17) -set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2") -#set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing") +#set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2") +set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O0 -g") set (CMAKE_EXPORT_COMPILE_COMMANDS ON) message("${CMAKE_CXX_FLAGS}") diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 7238db49..1682be27 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include namespace pingcap { @@ -56,6 +58,7 @@ struct RegionClient } catch (const Exception & e) { + log->warning("send rpc excpetion: " + e.displayText()); onSendFail(bo, e, ctx); continue; } diff --git a/include/pingcap/pd/CodecClient.h b/include/pingcap/pd/CodecClient.h index e630a283..cbe04085 100644 --- a/include/pingcap/pd/CodecClient.h +++ b/include/pingcap/pd/CodecClient.h @@ -28,8 +28,10 @@ struct CodecClient : public Client metapb::Region processRegionResult(metapb::Region & region) { - region.set_start_key(decodeBytes(region.start_key())); - region.set_end_key(decodeBytes(region.end_key())); + // region.set_start_key(decodeBytes(region.start_key())); + // region.set_end_key(decodeBytes(region.end_key())); + region.set_start_key(encodeBytes(region.start_key())); + region.set_end_key(encodeBytes(region.end_key())); return region; } diff --git a/src/kv/RawClient.cc b/src/kv/RawClient.cc index 084e179e..d761e72a 100644 --- a/src/kv/RawClient.cc +++ b/src/kv/RawClient.cc @@ -184,28 +184,24 @@ uint64_t RawClient::GetKeyTTL(const std::string &key, int64_t to_ms) { return resp->ttl(); } -std::optional RawClient::Get(const std::string &key) { - try { - Backoffer bo(RawGetMaxBackoff); - auto local = cluster_ptr->region_cache->locateKey(bo, key); - RegionClient client(cluster_ptr.get(), local.region); - auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); - req->set_key(key); +std::optional RawClient::Get(const std::string &key) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); + req->set_key(key); - auto resp = client.sendReqToRegion(bo, req); - if(resp->has_region_error()) { - throw Exception(resp->region_error().message(), RegionUnavailable); - } - if(resp->error() != "") { - throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); - } - if(resp->not_found()) { - return std::nullopt; - } - return resp->value(); - } catch(const std::exception &e) { - std::cout << "get value with expections: " << std::endl; - } + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->not_found()) { + return std::nullopt; + } + return resp->value(); } std::optional RawClient::Get(const std::string &key, int64_t to_ms) { diff --git a/src/kv/RegionCache.cc b/src/kv/RegionCache.cc index b85f0ea1..0c0cca21 100644 --- a/src/kv/RegionCache.cc +++ b/src/kv/RegionCache.cc @@ -89,7 +89,7 @@ KeyLocation RegionCache::locateKey(Backoffer & bo, const std::string & key) } region = loadRegionByKey(bo, key); - + log->information("add locate region: " + region->verID().toString() +", start key: " + region->startKey() + ", end key: " + region->endKey()); insertRegionToCache(region); return KeyLocation(region->verID(), region->startKey(), region->endKey()); @@ -312,6 +312,7 @@ void RegionCache::onRegionStale(Backoffer & bo, RPCContextPtr ctx, const errorpb region->switchPeer(ctx->peer.store_id()); insertRegionToCache(region); } + log->information("region stale for region " + ctx->region.toString() + " end."); } std::pair>, RegionVerID> RegionCache::groupKeysByRegion( diff --git a/src/test/raw_client_test/stress_v2.cc b/src/test/raw_client_test/stress_v2.cc index 63c0fa60..29c7d632 100644 --- a/src/test/raw_client_test/stress_v2.cc +++ b/src/test/raw_client_test/stress_v2.cc @@ -13,10 +13,14 @@ #include #include #include +#include +#include +#include "pingcap/Log.h" using namespace pingcap; using namespace pingcap::kv; +#define LARGE_VALUE std::atomic fail_cnt; class TimerCounter { @@ -40,7 +44,20 @@ void multithread_write_to_db( struct timeval s, e; for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); - client->Put(std::to_string(i), std::string(20480, 'a')); + if(i % 100 == 0) std::cout << "key: " << i << std::endl; +#ifdef LARGE_VALUE + for(;;) { + try{ + client->Put(std::to_string(i), std::string(20480, 'a')); + } catch(...) { + std::cout << "put key error and try re-put: " << i << std::endl; + continue; + } + break; + } +#else + client->Put(std::to_string(i), std::string(10, 'a')); +#endif gettimeofday(&e, NULL); his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } @@ -60,9 +77,29 @@ void multithread_read_db( his.Clear(); struct timeval s, e; for (size_t i = start; i < end; i++) { + if(i % 100 == 0) std::cout << "key: " << i << std::endl; gettimeofday(&s, NULL); - auto ret = client->Get(std::to_string(i)); + std::optional ret; + for(;;) { + try { + ret = client->Get(std::to_string(i)); + } + catch(...) { + std::cerr << "get key error, and try re-get" << i << '\n'; + continue; + } + break; + } gettimeofday(&e, NULL); +#ifdef LARGE_VALUE + if(ret.value_or("").size() < 20480) { + std::cout << "get value error key: " << i << std::endl; + } +#else + if(ret.value_or("").size() < 10) { + std::cout << "get value error"<< std::endl; + } +#endif if(!ret.has_value()) { fail_cnt.fetch_add(1, std::memory_order_relaxed); } @@ -86,7 +123,11 @@ void multithread_cas_db( for (size_t i = start; i < end; i++) { gettimeofday(&s, NULL); bool is_swap; +#ifdef LARGE_VALUE client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), "test_new_value", is_swap); +#else + client->CompareAndSwap(std::to_string(i), std::string(10, 'a'), "test_new_value", is_swap); +#endif gettimeofday(&e, NULL); if(!is_swap) { fail_cnt.fetch_add(1, std::memory_order_relaxed); @@ -171,6 +212,8 @@ int main(int argc, char *argv[]) { batch = batch? batch: BATCH; ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; std::vector pd_addrs{ip_add}; + Poco::AutoPtr console_channel(new Poco::ConsoleChannel); + pingcap::Logger::get("pingcap.tikv").setChannel(console_channel); std::shared_ptr client; if(rw != "cas") From af4442c97e81a622e1e0006ac83fa98ff3e57798 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 12 May 2022 15:01:01 +0800 Subject: [PATCH 09/11] [modify] modify the encryption and not encryption flags --- include/pingcap/kv/Cluster.h | 2 +- include/pingcap/pd/Client.h | 2 ++ include/pingcap/pd/CodecClient.h | 10 ++++++---- include/pingcap/pd/IClient.h | 2 ++ src/kv/RawClient.cc | 1 + src/kv/RegionCache.cc | 17 +++++++++++------ src/test/raw_client_test/stress_v2.cc | 6 +++--- 7 files changed, 26 insertions(+), 14 deletions(-) diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index 27a1f3ce..6e665fc6 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -26,7 +26,7 @@ struct Cluster LockResolverPtr lock_resolver; - Cluster() : pd_client(std::make_shared()), rpc_client(std::make_unique()) {} + // Cluster() : pd_client(std::make_shared()), rpc_client(std::make_unique()) {} Cluster(const std::vector & pd_addrs, const ClusterConfig & config) : pd_client(std::make_shared(pd_addrs, config)), diff --git a/include/pingcap/pd/Client.h b/include/pingcap/pd/Client.h index a7070ec4..42cb5b07 100644 --- a/include/pingcap/pd/Client.h +++ b/include/pingcap/pd/Client.h @@ -39,6 +39,8 @@ class Client : public IClient // only implement a weak get ts. uint64_t getTS() override; + std::string name() override {return "client";} + std::pair getRegionByKey(const std::string & key) override; //std::pair getPrevRegion(std::string key) override; diff --git a/include/pingcap/pd/CodecClient.h b/include/pingcap/pd/CodecClient.h index cbe04085..0d11f804 100644 --- a/include/pingcap/pd/CodecClient.h +++ b/include/pingcap/pd/CodecClient.h @@ -14,9 +14,13 @@ struct CodecClient : public Client { CodecClient(const std::vector & addrs, const ClusterConfig & config) : Client(addrs, config) {} + std::string name() override {return "codeClient";} + std::pair getRegionByKey(const std::string & key) override { auto [region, leader] = Client::getRegionByKey(encodeBytes(key)); + if(!region.has_encryption_meta()) + return std::make_pair(region, leader); return std::make_pair(processRegionResult(region), leader); } @@ -28,10 +32,8 @@ struct CodecClient : public Client metapb::Region processRegionResult(metapb::Region & region) { - // region.set_start_key(decodeBytes(region.start_key())); - // region.set_end_key(decodeBytes(region.end_key())); - region.set_start_key(encodeBytes(region.start_key())); - region.set_end_key(encodeBytes(region.end_key())); + region.set_start_key(decodeBytes(region.start_key())); + region.set_end_key(decodeBytes(region.end_key())); return region; } diff --git a/include/pingcap/pd/IClient.h b/include/pingcap/pd/IClient.h index ecfa4616..4ab514a6 100644 --- a/include/pingcap/pd/IClient.h +++ b/include/pingcap/pd/IClient.h @@ -37,6 +37,8 @@ class IClient virtual uint64_t getGCSafePoint() = 0; virtual bool isMock() = 0; + + virtual std::string name() {return "base";} }; using ClientPtr = std::shared_ptr; diff --git a/src/kv/RawClient.cc b/src/kv/RawClient.cc index d761e72a..2ff09fbf 100644 --- a/src/kv/RawClient.cc +++ b/src/kv/RawClient.cc @@ -186,6 +186,7 @@ uint64_t RawClient::GetKeyTTL(const std::string &key, int64_t to_ms) { std::optional RawClient::Get(const std::string &key) { Backoffer bo(RawGetMaxBackoff); + std::cout << "get key is: " << key << "\n"; auto local = cluster_ptr->region_cache->locateKey(bo, key); RegionClient client(cluster_ptr.get(), local.region); auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); diff --git a/src/kv/RegionCache.cc b/src/kv/RegionCache.cc index 0c0cca21..4055aec9 100644 --- a/src/kv/RegionCache.cc +++ b/src/kv/RegionCache.cc @@ -82,14 +82,15 @@ RegionPtr RegionCache::getRegionByID(Backoffer & bo, const RegionVerID & id) KeyLocation RegionCache::locateKey(Backoffer & bo, const std::string & key) { - RegionPtr region = searchCachedRegion(key); + RegionPtr region = searchCachedRegion(key); /*key not encode*/ + if (region != nullptr) { return KeyLocation(region->verID(), region->startKey(), region->endKey()); } region = loadRegionByKey(bo, key); - log->information("add locate region: " + region->verID().toString() +", start key: " + region->startKey() + ", end key: " + region->endKey()); + log->information("add region: " + region->verID().toString() +", start key: " + region->startKey() + ", end key: " + region->endKey()); insertRegionToCache(region); return KeyLocation(region->verID(), region->startKey(), region->endKey()); @@ -149,6 +150,7 @@ RegionPtr RegionCache::loadRegionByKey(Backoffer & bo, const std::string & key) { try { + log->information("pd client name: " + pd_client->name()); auto [meta, leader] = pd_client->getRegionByKey(key); if (!meta.IsInitialized()) { @@ -167,6 +169,7 @@ RegionPtr RegionCache::loadRegionByKey(Backoffer & bo, const std::string & key) } catch (const Exception & e) { + log->information( "exception: " + e.displayText()); bo.backoff(boPDRPC, e); } } @@ -304,15 +307,17 @@ void RegionCache::onRegionStale(Backoffer & bo, RPCContextPtr ctx, const errorpb for (int i = 0; i < stale_epoch.current_regions_size(); i++) { auto meta = stale_epoch.current_regions(i); - if (auto * pd = static_cast(pd_client.get())) - { - pd->processRegionResult(meta); + if(meta.has_encryption_meta()) { + log->information("keys encryption"); + if (auto * pd = static_cast(pd_client.get())) + { + pd->processRegionResult(meta); + } } RegionPtr region = std::make_shared(meta, meta.peers(0)); region->switchPeer(ctx->peer.store_id()); insertRegionToCache(region); } - log->information("region stale for region " + ctx->region.toString() + " end."); } std::pair>, RegionVerID> RegionCache::groupKeysByRegion( diff --git a/src/test/raw_client_test/stress_v2.cc b/src/test/raw_client_test/stress_v2.cc index 29c7d632..21fd474e 100644 --- a/src/test/raw_client_test/stress_v2.cc +++ b/src/test/raw_client_test/stress_v2.cc @@ -84,9 +84,9 @@ void multithread_read_db( try { ret = client->Get(std::to_string(i)); } - catch(...) { - std::cerr << "get key error, and try re-get" << i << '\n'; - continue; + catch(const Exception &exc) { + std::cerr << "get key: " << i << ",error, and try re-get, because: " << exc.displayText() << '\n'; + // continue; } break; } From 480d16415120628af8ce3de657d28ed00a66c96c Mon Sep 17 00:00:00 2001 From: root Date: Thu, 12 May 2022 15:05:24 +0800 Subject: [PATCH 10/11] [modify] gcc -O2 optimize --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ee71d23e..96b5f0f6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,8 @@ cmake_minimum_required(VERSION 2.8) project(kvClient) set (CMAKE_CXX_STANDARD 17) -#set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2") -set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O0 -g") +set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2") +#set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O0 -g") set (CMAKE_EXPORT_COMPILE_COMMANDS ON) message("${CMAKE_CXX_FLAGS}") From 2466ff050116b23bff4688c40a55b1c31b4df23f Mon Sep 17 00:00:00 2001 From: root Date: Wed, 1 Jun 2022 22:05:16 +0800 Subject: [PATCH 11/11] [add] add large value stress --- src/kv/RawClient.cc | 1 - src/test/raw_client_test/CMakeLists.txt | 6 +- src/test/raw_client_test/stress_v2.cc | 139 ++++++++------ src/test/raw_client_test/volume_stress.cc | 215 ++++++++++++++++++++++ 4 files changed, 300 insertions(+), 61 deletions(-) create mode 100644 src/test/raw_client_test/volume_stress.cc diff --git a/src/kv/RawClient.cc b/src/kv/RawClient.cc index 2ff09fbf..d761e72a 100644 --- a/src/kv/RawClient.cc +++ b/src/kv/RawClient.cc @@ -186,7 +186,6 @@ uint64_t RawClient::GetKeyTTL(const std::string &key, int64_t to_ms) { std::optional RawClient::Get(const std::string &key) { Backoffer bo(RawGetMaxBackoff); - std::cout << "get key is: " << key << "\n"; auto local = cluster_ptr->region_cache->locateKey(bo, key); RegionClient client(cluster_ptr.get(), local.region); auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); diff --git a/src/test/raw_client_test/CMakeLists.txt b/src/test/raw_client_test/CMakeLists.txt index 93ccada5..27a9f05e 100644 --- a/src/test/raw_client_test/CMakeLists.txt +++ b/src/test/raw_client_test/CMakeLists.txt @@ -8,4 +8,8 @@ target_link_libraries(stress ${test_libs}) add_executable(stress_v2 stress_v2.cc) target_include_directories(stress_v2 PUBLIC ${test_includes}) -target_link_libraries(stress_v2 ${test_libs}) \ No newline at end of file +target_link_libraries(stress_v2 ${test_libs}) + +add_executable(volume_stress volume_stress.cc) +target_include_directories(volume_stress PUBLIC ${test_includes}) +target_link_libraries(volume_stress ${test_libs}) \ No newline at end of file diff --git a/src/test/raw_client_test/stress_v2.cc b/src/test/raw_client_test/stress_v2.cc index 21fd474e..3b426c1e 100644 --- a/src/test/raw_client_test/stress_v2.cc +++ b/src/test/raw_client_test/stress_v2.cc @@ -16,6 +16,7 @@ #include #include #include "pingcap/Log.h" +#include using namespace pingcap; using namespace pingcap::kv; @@ -39,18 +40,18 @@ class TimerCounter { void multithread_write_to_db( std::shared_ptr client, size_t start, size_t end) { - Histogram his; - his.Clear(); - struct timeval s, e; + // Histogram his; + // his.Clear(); + // struct timeval s, e; for (size_t i = start; i < end; i++) { - gettimeofday(&s, NULL); + // gettimeofday(&s, NULL); if(i % 100 == 0) std::cout << "key: " << i << std::endl; #ifdef LARGE_VALUE - for(;;) { + for(int ty = 0; ty < 5; ty ++) { try{ client->Put(std::to_string(i), std::string(20480, 'a')); } catch(...) { - std::cout << "put key error and try re-put: " << i << std::endl; + std::cout << "put key error: " << i << ", retry: " << ty << std::endl; continue; } break; @@ -58,43 +59,41 @@ void multithread_write_to_db( #else client->Put(std::to_string(i), std::string(10, 'a')); #endif - gettimeofday(&e, NULL); - his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + // gettimeofday(&e, NULL); + // his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } - std::cout << "Latency (us):" - << " Min: " << his.Minimum() - << " Avg: " << his.Average() - << " P99: " << his.Percentile(99.0) - << " Max: " << his.Maximum() - << " StdDev: " << his.StandardDeviation() - << " Queries: " << his.Count() - << std::endl; + // std::cout << "Latency (us):" + // << " Min: " << his.Minimum() + // << " Avg: " << his.Average() + // << " P99: " << his.Percentile(99.0) + // << " Max: " << his.Maximum() + // << " StdDev: " << his.StandardDeviation() + // << " Queries: " << his.Count() + // << std::endl; } void multithread_read_db( std::shared_ptr client, size_t start, size_t end) { - Histogram his; - his.Clear(); - struct timeval s, e; + // Histogram his; + // his.Clear(); + // struct timeval s, e; for (size_t i = start; i < end; i++) { if(i % 100 == 0) std::cout << "key: " << i << std::endl; - gettimeofday(&s, NULL); + // gettimeofday(&s, NULL); std::optional ret; - for(;;) { + for(int ty = 0; ty < 5; ty ++) { try { ret = client->Get(std::to_string(i)); } catch(const Exception &exc) { std::cerr << "get key: " << i << ",error, and try re-get, because: " << exc.displayText() << '\n'; - // continue; + continue; } break; } - gettimeofday(&e, NULL); + // gettimeofday(&e, NULL); #ifdef LARGE_VALUE - if(ret.value_or("").size() < 20480) { - std::cout << "get value error key: " << i << std::endl; - } + assert(ret.has_value() && (ret.value().size() == 20480)); #else if(ret.value_or("").size() < 10) { std::cout << "get value error"<< std::endl; @@ -103,52 +102,74 @@ void multithread_read_db( if(!ret.has_value()) { fail_cnt.fetch_add(1, std::memory_order_relaxed); } - his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + // his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } - std::cout << "Latency (us):" - << " Min: " << his.Minimum() - << " Avg: " << his.Average() - << " P99: " << his.Percentile(99.0) - << " Max: " << his.Maximum() - << " StdDev: " << his.StandardDeviation() - << " Queries: " << his.Count() - << std::endl; + // std::cout << "Latency (us):" + // << " Min: " << his.Minimum() + // << " Avg: " << his.Average() + // << " P99: " << his.Percentile(99.0) + // << " Max: " << his.Maximum() + // << " StdDev: " << his.StandardDeviation() + // << " Queries: " << his.Count() + // << std::endl; } void multithread_cas_db( std::shared_ptr client, size_t start, size_t end) { - Histogram his; - his.Clear(); - struct timeval s, e; + // Histogram his; + // his.Clear(); + // struct timeval s, e; for (size_t i = start; i < end; i++) { - gettimeofday(&s, NULL); + // gettimeofday(&s, NULL); bool is_swap; #ifdef LARGE_VALUE - client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), "test_new_value", is_swap); + for(int ty = 0; ty < 5; ty ++) { + try{ + client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), "test_new_value", is_swap); + } catch(...) { + std::cout << "cas key error: " << i << ", retry: " << ty << std::endl; + continue; + } + break; + } #else client->CompareAndSwap(std::to_string(i), std::string(10, 'a'), "test_new_value", is_swap); #endif - gettimeofday(&e, NULL); + // gettimeofday(&e, NULL); if(!is_swap) { fail_cnt.fetch_add(1, std::memory_order_relaxed); } - his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + // his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); } - std::cout << "Latency (us):" - << " Min: " << his.Minimum() - << " Avg: " << his.Average() - << " P99: " << his.Percentile(99.0) - << " Max: " << his.Maximum() - << " StdDev: " << his.StandardDeviation() - << " Queries: " << his.Count() - << std::endl; + // std::cout << "Latency (us):" + // << " Min: " << his.Minimum() + // << " Avg: " << his.Average() + // << " P99: " << his.Percentile(99.0) + // << " Max: " << his.Maximum() + // << " StdDev: " << his.StandardDeviation() + // << " Queries: " << his.Count() + // << std::endl; } void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { - for (size_t i = 0; i < 100; i++) { - auto ret = client->Get(std::to_string(i)); - std::cout << "valid key: " << i << " ,value: " << ret.value_or("NOT FOUND") << std::endl; + size_t cnt = 0; + for (size_t i = 0; i < 10000000; i++) { + for(int k = 0; k < 5; k++) { + std::optional ret; + try { + ret = client->Get(std::to_string(i)); + } + catch(const Exception &exc) { + std::cerr << "get key: " << i << "retry: " << k << '\n'; + continue; + } + if(ret.has_value() && ret.value().size() == 14) { + cnt ++; + } + break; + } } + std::cout << "key-value number" << cnt << std::endl; } bool multi_assign_jobs(std::shared_ptr client, size_t jobs, size_t workers, std::string rw) { @@ -212,8 +233,8 @@ int main(int argc, char *argv[]) { batch = batch? batch: BATCH; ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; std::vector pd_addrs{ip_add}; - Poco::AutoPtr console_channel(new Poco::ConsoleChannel); - pingcap::Logger::get("pingcap.tikv").setChannel(console_channel); + // Poco::AutoPtr console_channel(new Poco::ConsoleChannel); + // pingcap::Logger::get("pingcap.tikv").setChannel(console_channel); std::shared_ptr client; if(rw != "cas") @@ -224,11 +245,11 @@ int main(int argc, char *argv[]) { client = std::shared_ptr(clit); } - TimerCounter tc; - tc.Start(); + // TimerCounter tc; + // tc.Start(); multi_assign_jobs(client, batch, cpu_num, rw); - tc.Stop(); + // tc.Stop(); std::cout << "failed: " << fail_cnt << std::endl; - tc.PrintTime(batch); + // tc.PrintTime(batch); return 0; } \ No newline at end of file diff --git a/src/test/raw_client_test/volume_stress.cc b/src/test/raw_client_test/volume_stress.cc new file mode 100644 index 00000000..404b85b3 --- /dev/null +++ b/src/test/raw_client_test/volume_stress.cc @@ -0,0 +1,215 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "pingcap/Log.h" +#include +#include +#include + +using namespace pingcap; +using namespace pingcap::kv; + +std::atomic fail_cnt; +std::mutex plk; + +void multithread_write_to_db( + std::shared_ptr client, size_t start, size_t end) { + int64_t fail_sta = 0; + + for (size_t i = start; i < end; i++) { + int ty = 0; + for(; ty < 5; ty ++) { + try{ + client->Put(std::to_string(i), std::string(20480, 'a')); + } catch(...) { + continue; + } + break; + } // end try loop + if(ty >= 5) { + fail_sta ++; + } + } // end for loop + + { + std::lock_guard lk(plk); + std::cout << "| tid | number_keys | fail number |" << std::endl; + std::cout << "| " << std::this_thread::get_id() << " | " << end-start << " | " << fail_sta << " |" << std::endl; + std::cout << "|-------|---------------|---------------|" << std::endl; + } + +} + +void multithread_read_db( + std::shared_ptr client, size_t start, size_t end) { + int64_t fail_read = 0; + + for (size_t i = start; i < end; i++) { + std::optional ret; + int ty = 0; + for(; ty < 5; ty ++) { + try { + ret = client->Get(std::to_string(i)); + } + catch(const Exception &exc) { + continue; + } + break; + } // end try loop + assert(ret.has_value() && (ret.value().size() == 20480)); + if(!ret.has_value() || ty >=5) { + fail_read ++; + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + } // end for loop + + { + std::lock_guard lk(plk); + std::cout << "| tid | number_keys | fail number |" << std::endl; + std::cout << "| " << std::this_thread::get_id() << " | " << end-start << " | " << fail_read << " |" << std::endl; + std::cout << "|-------|---------------|---------------|" << std::endl; + } + +} + +void multithread_cas_db( + std::shared_ptr client, size_t start, size_t end) { + int64_t fail_cas = 0; + + for (size_t i = start; i < end; i++) { + bool is_swap; + int ty = 0; + for(; ty < 5; ty ++) { + try{ + client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), std::string(20480, 'b'), is_swap); + } catch(...) { + continue; + } + break; + } // end try loop + if(!is_swap) { + fail_cas ++; + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + }// end for loop + + { + std::lock_guard lk(plk); + std::cout << "| tid | number_keys | fail number |" << std::endl; + std::cout << "| " << std::this_thread::get_id() << " | " << end-start << " | " << fail_cas << " |" << std::endl; + std::cout << "|-------|---------------|---------------|" << std::endl; + + } +} + +void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { + size_t cnt = 0; + for (size_t i = 0; i < 100; i++) { + for(int k = 0; k < 5; k++) { + std::optional ret; + try { + ret = client->Get(std::to_string(i)); + } + catch(const Exception &exc) { + continue; + } + if(ret.has_value() && ret.value().size() == 14) { + cnt ++; + } + std::cout << "get key: " << ret.value() << std::endl; + break; + } + } +} + +bool multi_assign_jobs(std::shared_ptr client, size_t jobs, size_t workers, std::string rw) { + size_t per_job = (jobs + workers - 1) / workers; + std::vector pool; + pool.reserve(workers); + + if(rw == "w") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_write_to_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "r") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_read_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "cas") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_cas_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else { + random_valid_to_db(client, 0, jobs); + return true; + } + + for (size_t i = 0; i < workers; i++) { + pool[i].join(); + } + return true; +} + + +#define BATCH 1000 + +int main(int argc, char *argv[]) { + if (argc != 5) { + std::cout << "usage: ./exec $ip:port $concurrent $batch $rw"<< std::endl; + exit(EXIT_SUCCESS); + } + + std::cout << std::endl; + std::cout << "**********config*********" << std::endl; + std::cout << std::endl; + std::string ip_add = std::string(argv[1]); + std::cout << "ip address: " << ip_add << std::endl; + std::cout << "concurrent number: " << argv[2] << std::endl; + std::cout << "batch number: " << argv[3] << std::endl; + int concurrent = std::atoi(argv[2]); + uint64_t batch = std::atol(argv[3]); + std::string rw = argv[4]; + if(rw != "r" && rw != "w" && rw != "cas" && rw != "v") { + std::cout << "input rw error" << std::endl; + return 0; + } + std::cout << std::endl; + std::cout << std::endl; + int cpu_num; + cpu_num = concurrent ? concurrent: sysconf(_SC_NPROCESSORS_CONF); + batch = batch? batch: BATCH; + ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; + std::vector pd_addrs{ip_add}; + // Poco::AutoPtr console_channel(new Poco::ConsoleChannel); + // pingcap::Logger::get("pingcap.tikv").setChannel(console_channel); + + std::shared_ptr client; + if(rw != "cas") + client = std::shared_ptr(new RawClient(pd_addrs)); + else { + auto clit = new RawClient(pd_addrs); + clit->AsCASClient(); + client = std::shared_ptr(clit); + } + + multi_assign_jobs(client, batch, cpu_num, rw); + std::cout << "failed: " << fail_cnt << std::endl; + return 0; +} \ No newline at end of file