|
24 | 24 |
|
25 | 25 | #include <memory>
|
26 | 26 |
|
27 |
| -#include "base/time/time.h" |
28 |
| -#include "blockaccess/block_access_log.h" |
| 27 | +#include "cache/common/type.h" |
29 | 28 | #include "cache/config/config.h"
|
| 29 | +#include "cache/storage/storage_pool.h" |
30 | 30 | #include "cache/tiercache/tier_block_cache.h"
|
31 |
| -#include "cache/utils/access_log.h" |
32 | 31 |
|
33 | 32 | namespace dingofs {
|
34 | 33 | namespace cache {
|
35 | 34 |
|
36 | 35 | Benchmarker::Benchmarker()
|
37 |
| - : block_accesser_(std::make_unique<blockaccess::BlockAccesserImpl>( |
38 |
| - NewBlockAccessOptions())), |
39 |
| - block_cache_(std::make_shared<TierBlockCache>( |
40 |
| - BlockCacheOption(), RemoteBlockCacheOption(), block_accesser_.get())), |
41 |
| - task_pool_(std::make_unique<TaskThreadPool>("benchmarker")), |
42 |
| - countdown_event_(std::make_shared<BthreadCountdownEvent>(FLAGS_threads)), |
43 |
| - reporter_(std::make_shared<Reporter>()) { |
44 |
| - if (FLAGS_ino == 0) { |
45 |
| - FLAGS_ino = base::time::TimeNow().seconds; |
46 |
| - } |
47 |
| -} |
48 |
| - |
49 |
| -Status Benchmarker::Run() { |
50 |
| - // Init logger, block cache, workers |
51 |
| - auto status = Init(); |
52 |
| - if (!status.ok()) { |
53 |
| - return status; |
54 |
| - } |
55 |
| - |
56 |
| - // Start reporter, workers |
57 |
| - status = Start(); |
58 |
| - if (!status.ok()) { |
59 |
| - return status; |
| 36 | + : mds_base_(std::make_unique<stub::rpcclient::MDSBaseClient>()), |
| 37 | + mds_client_(std::make_shared<stub::rpcclient::MdsClientImpl>()), |
| 38 | + collector_(std::make_unique<Collector>()), |
| 39 | + reporter_(std::make_shared<Reporter>(collector_)), |
| 40 | + task_pool_(std::make_unique<TaskThreadPool>("benchmarker_worker")) {} |
| 41 | + |
| 42 | +Status Benchmarker::Init() { return InitAll(); } |
| 43 | + |
| 44 | +Status Benchmarker::InitAll() { |
| 45 | + auto initers = std::vector<std::function<Status()>>{ |
| 46 | + [this]() { return InitMdsClient(); }, |
| 47 | + [this]() { return InitStorage(); }, |
| 48 | + [this]() { return InitBlockCache(); }, |
| 49 | + [this]() { return InitCollector(); }, |
| 50 | + [this]() { return InitReporter(); }, |
| 51 | + [this]() { |
| 52 | + InitFactory(); |
| 53 | + return Status::OK(); |
| 54 | + }, |
| 55 | + [this]() { |
| 56 | + InitWorkers(); |
| 57 | + return Status::OK(); |
| 58 | + }}; |
| 59 | + |
| 60 | + for (const auto& initer : initers) { |
| 61 | + auto status = initer(); |
| 62 | + if (!status.ok()) { |
| 63 | + return status; |
| 64 | + } |
60 | 65 | }
|
61 | 66 |
|
62 | 67 | return Status::OK();
|
63 | 68 | }
|
64 | 69 |
|
65 |
| -void Benchmarker::Shutdown() { |
66 |
| - // Wait for all workers to complete |
67 |
| - countdown_event_->wait(); |
| 70 | +void Benchmarker::RunUntilFinish() { |
| 71 | + StartAll(); |
| 72 | + StopAll(); |
| 73 | +} |
68 | 74 |
|
69 |
| - LOG(INFO) << "All workers completed, shutting down..."; |
| 75 | +void Benchmarker::StartAll() { |
| 76 | + StartReporter(); |
| 77 | + StartWorkers(); |
| 78 | +} |
70 | 79 |
|
71 |
| - // stop worker, reporter |
72 |
| - Stop(); |
| 80 | +void Benchmarker::StopAll() { |
| 81 | + StopWorkers(); |
| 82 | + StopReporter(); |
| 83 | + StopCollector(); |
73 | 84 | }
|
74 | 85 |
|
75 |
| -Status Benchmarker::Init() { |
76 |
| - auto status = InitBlockCache(); |
77 |
| - if (!status.ok()) { |
78 |
| - LOG(ERROR) << "Init block cache failed: " << status.ToString(); |
79 |
| - return status; |
| 86 | +// init |
| 87 | +Status Benchmarker::InitMdsClient() { |
| 88 | + auto rc = mds_client_->Init(NewMdsOption(), mds_base_.get()); |
| 89 | + if (rc != PBFSStatusCode::OK) { |
| 90 | + return Status::Internal("init mds client failed"); |
80 | 91 | }
|
| 92 | + return Status::OK(); |
| 93 | +} |
81 | 94 |
|
82 |
| - status = InitWrokers(); |
| 95 | +Status Benchmarker::InitStorage() { |
| 96 | + auto storage_pool = StoragePoolImpl(mds_client_); |
| 97 | + auto status = storage_pool.GetStorage(FLAGS_fsid, storage_); |
83 | 98 | if (!status.ok()) {
|
84 |
| - LOG(ERROR) << "Init workers failed: " << status.ToString(); |
| 99 | + LOG(ERROR) << "Init storage failed: " << status.ToString(); |
85 | 100 | return status;
|
86 | 101 | }
|
87 |
| - |
88 | 102 | return Status::OK();
|
89 | 103 | }
|
90 | 104 |
|
91 | 105 | Status Benchmarker::InitBlockCache() {
|
92 |
| - auto status = block_accesser_->Init(); |
| 106 | + block_cache_ = std::make_shared<TierBlockCache>( |
| 107 | + BlockCacheOption(), RemoteBlockCacheOption(), storage_); |
| 108 | + |
| 109 | + auto status = block_cache_->Init(); |
93 | 110 | if (!status.ok()) {
|
94 |
| - return status; |
| 111 | + LOG(ERROR) << "Init block cache failed: " << status.ToString(); |
95 | 112 | }
|
96 |
| - return block_cache_->Init(); |
| 113 | + return status; |
97 | 114 | }
|
98 | 115 |
|
99 |
| -Status Benchmarker::InitWrokers() { |
100 |
| - for (auto i = 0; i < FLAGS_threads; i++) { |
101 |
| - auto worker = |
102 |
| - std::make_shared<Worker>(i, block_cache_, reporter_, countdown_event_); |
103 |
| - auto status = worker->Init(); |
104 |
| - if (!status.ok()) { |
105 |
| - return status; |
106 |
| - } |
107 |
| - |
108 |
| - workers_.emplace_back(worker); |
| 116 | +Status Benchmarker::InitCollector() { |
| 117 | + auto status = collector_->Init(); |
| 118 | + if (!status.ok()) { |
| 119 | + LOG(ERROR) << "Init collector failed: " << status.ToString(); |
109 | 120 | }
|
110 |
| - return Status::OK(); |
| 121 | + return status; |
111 | 122 | }
|
112 | 123 |
|
113 |
| -Status Benchmarker::Start() { |
114 |
| - auto status = StartReporter(); |
| 124 | +Status Benchmarker::InitReporter() { |
| 125 | + auto status = reporter_->Init(); |
115 | 126 | if (!status.ok()) {
|
116 |
| - LOG(ERROR) << "Start reporter failed: " << status.ToString(); |
117 |
| - return status; |
| 127 | + LOG(ERROR) << "Init reporter failed: " << status.ToString(); |
118 | 128 | }
|
| 129 | + return status; |
| 130 | +} |
119 | 131 |
|
120 |
| - StartWorkers(); |
| 132 | +void Benchmarker::InitFactory() { |
| 133 | + factory_ = NewFactory(block_cache_, FLAGS_op); |
| 134 | +} |
121 | 135 |
|
122 |
| - return Status::OK(); |
| 136 | +void Benchmarker::InitWorkers() { |
| 137 | + CHECK_EQ(task_pool_->Start(FLAGS_threads), 0); |
| 138 | + for (auto i = 0; i < FLAGS_threads; i++) { |
| 139 | + workers_.emplace_back(std::make_unique<Worker>(i, factory_, collector_)); |
| 140 | + } |
123 | 141 | }
|
124 | 142 |
|
125 |
| -Status Benchmarker::StartReporter() { return reporter_->Start(); } |
| 143 | +// start |
| 144 | +void Benchmarker::StartReporter() { reporter_->Start(); } |
126 | 145 |
|
127 | 146 | void Benchmarker::StartWorkers() {
|
128 |
| - CHECK_EQ(task_pool_->Start(FLAGS_threads), 0); |
129 |
| - |
130 | 147 | for (auto& worker : workers_) {
|
131 |
| - task_pool_->Enqueue([worker]() { worker->Run(); }); |
| 148 | + task_pool_->Enqueue([&worker]() { worker->Start(); }); |
132 | 149 | }
|
133 | 150 | }
|
134 | 151 |
|
135 |
| -void Benchmarker::Stop() { |
136 |
| - StopWorkers(); |
137 |
| - StopReporter(); |
138 |
| - StopBlockCache(); |
139 |
| -} |
140 |
| - |
| 152 | +// stop |
141 | 153 | void Benchmarker::StopWorkers() {
|
142 | 154 | for (auto& worker : workers_) {
|
143 |
| - worker->Shutdown(); |
| 155 | + worker->Stop(); |
144 | 156 | }
|
145 | 157 | }
|
146 | 158 |
|
147 |
| -void Benchmarker::StopReporter() { |
148 |
| - auto status = reporter_->Stop(); |
149 |
| - if (!status.ok()) { |
150 |
| - LOG(ERROR) << "Stop reporter failed: " << status.ToString(); |
151 |
| - } |
152 |
| -} |
| 159 | +void Benchmarker::StopReporter() { reporter_->Stop(); } |
153 | 160 |
|
154 |
| -void Benchmarker::StopBlockCache() { |
155 |
| - auto status = block_cache_->Shutdown(); |
156 |
| - if (!status.ok()) { |
157 |
| - LOG(ERROR) << "Shutdown block cache failed: " << status.ToString(); |
158 |
| - } |
159 |
| -} |
| 161 | +void Benchmarker::StopCollector() { collector_->Detory(); } |
160 | 162 |
|
161 | 163 | } // namespace cache
|
162 | 164 | } // namespace dingofs
|
0 commit comments