Skip to content

[feat][cache]: Refactor cache benchmarker. #225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions confv2/cache-bench.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
--loglevel=0

############################# benchmark
--opt=put
--threads=1
--op_blksize=4194304
--op_blocks=1
--put_writeback=true
--range_retrive=false
--opt=put
--fsid=1
--ino=0
--s3_ak=
--s3_sk=
--s3_endpoint=
--s3_bucket=
--stat_interval_s=3
--ino=1
--blksize=4194304
--blocks=1
--writeback=true
--retrive=false
--async_max_inflight=128
--runtime=300
--time_based=false

############################# block cache
--cache_store=disk
Expand Down
2 changes: 1 addition & 1 deletion confv2/dingo-cache.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
--listen_ip=127.0.0.1
--listen_port=20000
--group_weight=100
--max_range_size_kb=256
--max_range_size_kb=128
--metadata_filepath="/tmp/cache_group_metadata"
--send_heartbeat_interval_ms=1000
--mds_rpc_addrs=127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702
Expand Down
44 changes: 44 additions & 0 deletions src/cache/benchmark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
cache benchmark
===

Quick Start
---

```bash
cache-bench --flagfile bench.conf
```

`bench.conf`:

```
--threads=3
--op=put
--fsid=1
--ino=1
--blksize=4194304
--blocks=100
--writeback=false
--retrive=true
--async_batch=128
--runtime=300
--time_based=true

# Any other client flags here
```

Output
---

```
libaio_read: (g=0): rw=read, bs=(R) 4096KiB-4096KiB, (W) 4096KiB-4096KiB, (T) 4096KiB-4096KiB, ioengine=libaio, iodepth=1
...
fio-3.28
Starting 4 processes

...
[10%] put: 584 op/s 2336 MB/s lat(0.013706 0.042489 0.002988)
[11%] put: 563 op/s 2253 MB/s lat(0.014187 0.044417 0.003051)
...
```

The output shows the performance of the cache benchmark, including operations per second (op/s), throughput in megabytes per second (MB/s), and latency statistics which include average, maximum and minimum latency in seconds.
168 changes: 85 additions & 83 deletions src/cache/benchmark/benchmarker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,139 +24,141 @@

#include <memory>

#include "base/time/time.h"
#include "blockaccess/block_access_log.h"
#include "cache/common/type.h"
#include "cache/config/config.h"
#include "cache/storage/storage_pool.h"
#include "cache/tiercache/tier_block_cache.h"
#include "cache/utils/access_log.h"

namespace dingofs {
namespace cache {

Benchmarker::Benchmarker()
: block_accesser_(std::make_unique<blockaccess::BlockAccesserImpl>(
NewBlockAccessOptions())),
block_cache_(std::make_shared<TierBlockCache>(
BlockCacheOption(), RemoteBlockCacheOption(), block_accesser_.get())),
task_pool_(std::make_unique<TaskThreadPool>("benchmarker")),
countdown_event_(std::make_shared<BthreadCountdownEvent>(FLAGS_threads)),
reporter_(std::make_shared<Reporter>()) {
if (FLAGS_ino == 0) {
FLAGS_ino = base::time::TimeNow().seconds;
}
}

Status Benchmarker::Run() {
// Init logger, block cache, workers
auto status = Init();
if (!status.ok()) {
return status;
}

// Start reporter, workers
status = Start();
if (!status.ok()) {
return status;
: mds_base_(std::make_unique<stub::rpcclient::MDSBaseClient>()),
mds_client_(std::make_shared<stub::rpcclient::MdsClientImpl>()),
storage_pool_(std::make_shared<StoragePoolImpl>(mds_client_)),
collector_(std::make_unique<Collector>()),
reporter_(std::make_shared<Reporter>(collector_)),
task_pool_(std::make_unique<TaskThreadPool>("benchmarker_worker")) {}

Status Benchmarker::Init() { return InitAll(); }

Status Benchmarker::InitAll() {
auto initers = std::vector<std::function<Status()>>{
[this]() { return InitMdsClient(); },
[this]() { return InitStorage(); },
[this]() { return InitBlockCache(); },
[this]() { return InitCollector(); },
[this]() { return InitReporter(); },
[this]() {
InitFactory();
return Status::OK();
},
[this]() {
InitWorkers();
return Status::OK();
}};

for (const auto& initer : initers) {
auto status = initer();
if (!status.ok()) {
return status;
}
}

return Status::OK();
}

void Benchmarker::Shutdown() {
// Wait for all workers to complete
countdown_event_->wait();
void Benchmarker::RunUntilFinish() {
StartAll();
StopAll();
}

LOG(INFO) << "All workers completed, shutting down...";
void Benchmarker::StartAll() {
StartReporter();
StartWorkers();
}

// stop worker, reporter
Stop();
void Benchmarker::StopAll() {
StopWorkers();
StopReporter();
StopCollector();
}

Status Benchmarker::Init() {
auto status = InitBlockCache();
if (!status.ok()) {
LOG(ERROR) << "Init block cache failed: " << status.ToString();
return status;
// init
Status Benchmarker::InitMdsClient() {
auto rc = mds_client_->Init(NewMdsOption(), mds_base_.get());
if (rc != PBFSStatusCode::OK) {
return Status::Internal("init mds client failed");
}
return Status::OK();
}

status = InitWrokers();
Status Benchmarker::InitStorage() {
auto status = storage_pool_->GetStorage(FLAGS_fsid, storage_);
if (!status.ok()) {
LOG(ERROR) << "Init workers failed: " << status.ToString();
LOG(ERROR) << "Init storage failed: " << status.ToString();
return status;
}

return Status::OK();
}

Status Benchmarker::InitBlockCache() {
auto status = block_accesser_->Init();
block_cache_ = std::make_shared<TierBlockCache>(
BlockCacheOption(), RemoteBlockCacheOption(), storage_);

auto status = block_cache_->Init();
if (!status.ok()) {
return status;
LOG(ERROR) << "Init block cache failed: " << status.ToString();
}
return block_cache_->Init();
return status;
}

Status Benchmarker::InitWrokers() {
for (auto i = 0; i < FLAGS_threads; i++) {
auto worker =
std::make_shared<Worker>(i, block_cache_, reporter_, countdown_event_);
auto status = worker->Init();
if (!status.ok()) {
return status;
}

workers_.emplace_back(worker);
Status Benchmarker::InitCollector() {
auto status = collector_->Init();
if (!status.ok()) {
LOG(ERROR) << "Init collector failed: " << status.ToString();
}
return Status::OK();
return status;
}

Status Benchmarker::Start() {
auto status = StartReporter();
Status Benchmarker::InitReporter() {
auto status = reporter_->Init();
if (!status.ok()) {
LOG(ERROR) << "Start reporter failed: " << status.ToString();
return status;
LOG(ERROR) << "Init reporter failed: " << status.ToString();
}
return status;
}

StartWorkers();
void Benchmarker::InitFactory() {
factory_ = NewFactory(block_cache_, FLAGS_op);
}

return Status::OK();
void Benchmarker::InitWorkers() {
CHECK_EQ(task_pool_->Start(FLAGS_threads), 0);
for (auto i = 0; i < FLAGS_threads; i++) {
workers_.emplace_back(std::make_unique<Worker>(i, factory_, collector_));
}
}

Status Benchmarker::StartReporter() { return reporter_->Start(); }
// start
void Benchmarker::StartReporter() { reporter_->Start(); }

void Benchmarker::StartWorkers() {
CHECK_EQ(task_pool_->Start(FLAGS_threads), 0);

for (auto& worker : workers_) {
task_pool_->Enqueue([worker]() { worker->Run(); });
task_pool_->Enqueue([&worker]() { worker->Start(); });
}
}

void Benchmarker::Stop() {
StopWorkers();
StopReporter();
StopBlockCache();
}

// stop
void Benchmarker::StopWorkers() {
for (auto& worker : workers_) {
worker->Shutdown();
worker->Stop();
}
}

void Benchmarker::StopReporter() {
auto status = reporter_->Stop();
if (!status.ok()) {
LOG(ERROR) << "Stop reporter failed: " << status.ToString();
}
}
void Benchmarker::StopReporter() { reporter_->Stop(); }

void Benchmarker::StopBlockCache() {
auto status = block_cache_->Shutdown();
if (!status.ok()) {
LOG(ERROR) << "Shutdown block cache failed: " << status.ToString();
}
}
void Benchmarker::StopCollector() { collector_->Detory(); }

} // namespace cache
} // namespace dingofs
44 changes: 31 additions & 13 deletions src/cache/benchmark/benchmarker.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
#ifndef DINGOFS_SRC_CACHE_BENCHMARK_BENCHMARKER_H_
#define DINGOFS_SRC_CACHE_BENCHMARK_BENCHMARKER_H_

#include "blockaccess/block_accesser.h"
#include "cache/benchmark/factory.h"
#include "cache/benchmark/reporter.h"
#include "cache/benchmark/worker.h"
#include "cache/blockcache/block_cache.h"
#include "cache/common/common.h"
#include "cache/storage/storage.h"
#include "cache/storage/storage_pool.h"
#include "stub/rpcclient/mds_client.h"

namespace dingofs {
namespace cache {
Expand All @@ -35,29 +39,43 @@ class Benchmarker {
public:
Benchmarker();

Status Run();
void Shutdown();
Status Init();

void RunUntilFinish();

private:
Status Init();
// Init
Status InitAll();
Status InitMdsClient();
Status InitStorage();
Status InitBlockCache();
Status InitWrokers();
Status InitCollector();
Status InitReporter();
void InitFactory();
void InitWorkers();

Status Start();
Status StartReporter();
// Start
void StartAll();
void StartReporter();
void StartWorkers();

void Stop();
// Stop
void StopAll();
void StopWorkers();
void StopReporter();
void StopBlockCache();
void StopCollector();

blockaccess::BlockAccesserUPtr block_accesser_;
private:
std::unique_ptr<stub::rpcclient::MDSBaseClient> mds_base_;
std::shared_ptr<stub::rpcclient::MdsClient> mds_client_;
StoragePoolSPtr storage_pool_;
StorageSPtr storage_;
BlockCacheSPtr block_cache_;
TaskThreadPoolUPtr task_pool_;
std::vector<WorkerSPtr> workers_;
BthreadCountdownEventSPtr countdown_event_;
CollectorSPtr collector_;
ReporterSPtr reporter_;
TaskFactorySPtr factory_;
std::vector<WorkerUPtr> workers_;
TaskThreadPoolUPtr task_pool_;
};

} // namespace cache
Expand Down
Loading