Skip to content

Commit eebccee

Browse files
author
meilirensheng2020
committed
[feat][client] Support collect fs metrics to mdsv2
1 parent c877712 commit eebccee

19 files changed

+361
-84
lines changed

src/blockaccess/block_accesser.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static bvar::Adder<uint64_t> block_get_async_num("block_get_async_num");
3636
static bvar::Adder<uint64_t> block_get_sync_num("block_get_sync_num");
3737

3838
using dingofs::utils::kMB;
39+
using stub::metric::ObjectMetric;
3940

4041
Status BlockAccesserImpl::Init() {
4142
if (options_.type == AccesserType::kS3) {
@@ -106,6 +107,11 @@ Status BlockAccesserImpl::Put(const std::string& key, const char* buffer,
106107
(s.ok() ? "ok" : "fail"));
107108
});
108109

110+
// object storage write metrics
111+
BlockAccessMetricGuard metric_guard(&s,
112+
&ObjectMetric::GetInstance().write_object,
113+
length, butil::cpuwide_time_us());
114+
109115
block_put_sync_num << 1;
110116

111117
auto dec = ::absl::MakeCleanup([&]() { block_put_sync_num << -1; });
@@ -131,6 +137,11 @@ void BlockAccesserImpl::AsyncPut(
131137
return absl::StrFormat("async_put_block (%s, %d) : %d", ctx->key,
132138
ctx->buffer_size, ctx->ret_code);
133139
});
140+
// object storage write metrics
141+
Status s = (ctx->ret_code == 0 ? Status::OK() : Status::Unknown(""));
142+
BlockAccessMetricGuard metric_guard(
143+
&s, &ObjectMetric::GetInstance().write_object, ctx->buffer_size,
144+
start_us);
134145

135146
block_put_async_num << -1;
136147
inflight_bytes_throttle_->OnComplete(ctx->buffer_size);
@@ -173,6 +184,11 @@ Status BlockAccesserImpl::Get(const std::string& key, std::string* data) {
173184
(s.ok() ? "ok" : "fail"));
174185
});
175186

187+
// object storage read metrics
188+
BlockAccessMetricGuard metric_guard(&s,
189+
&ObjectMetric::GetInstance().read_object,
190+
data->length(), butil::cpuwide_time_us());
191+
176192
block_get_sync_num << 1;
177193
auto dec = ::absl::MakeCleanup([&]() { block_get_sync_num << -1; });
178194

@@ -198,6 +214,11 @@ void BlockAccesserImpl::AsyncGet(
198214
ctx->offset, ctx->len, ctx->ret_code);
199215
});
200216

217+
// object storage read metrics
218+
Status s = (ctx->ret_code == 0 ? Status::OK() : Status::Unknown(""));
219+
BlockAccessMetricGuard metric_guard(
220+
&s, &ObjectMetric::GetInstance().read_object, ctx->len, start_us);
221+
201222
block_get_async_num << -1;
202223
inflight_bytes_throttle_->OnComplete(ctx->len);
203224

@@ -223,6 +244,11 @@ Status BlockAccesserImpl::Range(const std::string& key, off_t offset,
223244
(s.ok() ? "ok" : "fail"));
224245
});
225246

247+
// object storage read metrics
248+
BlockAccessMetricGuard metric_guard(&s,
249+
&ObjectMetric::GetInstance().read_object,
250+
length, butil::cpuwide_time_us());
251+
226252
block_get_sync_num << 1;
227253
auto dec = ::absl::MakeCleanup([&]() { block_get_sync_num << -1; });
228254

src/blockaccess/block_accesser.h

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "blockaccess/accesser.h"
2828
#include "blockaccess/accesser_common.h"
2929
#include "common/status.h"
30+
#include "stub/metric/metric.h"
3031
#include "utils/throttle.h"
3132

3233
namespace dingofs {
@@ -38,7 +39,7 @@ enum class RetryStrategy : uint8_t {
3839
};
3940

4041
using RetryCallback = std::function<RetryStrategy(int code)>;
41-
42+
using ::dingofs::stub::metric::InterfaceMetric;
4243
// BlockAccesser is a class that provides a way to access block from a data
4344
// source. It is a base class for all data access classes.
4445
class BlockAccesser {
@@ -153,6 +154,28 @@ class BlockAccesserImpl : public BlockAccesser {
153154
std::unique_ptr<AsyncRequestInflightBytesThrottle> inflight_bytes_throttle_;
154155
};
155156

157+
struct BlockAccessMetricGuard {
158+
explicit BlockAccessMetricGuard(Status* status, InterfaceMetric* metric,
159+
size_t count, uint64_t start)
160+
: status_(status), metric_(metric), count_(count), start_(start) {}
161+
~BlockAccessMetricGuard() {
162+
if (status_->ok()) {
163+
metric_->bps.count << count_;
164+
metric_->qps.count << 1;
165+
auto duration = butil::cpuwide_time_us() - start_;
166+
metric_->latency << duration;
167+
metric_->latTotal << duration;
168+
} else {
169+
metric_->eps.count << 1;
170+
}
171+
}
172+
173+
Status* status_;
174+
InterfaceMetric* metric_;
175+
size_t count_;
176+
uint64_t start_;
177+
};
178+
156179
using BlockAccesserSPtr = std::shared_ptr<BlockAccesser>;
157180
using BlockAccesserUPtr = std::unique_ptr<BlockAccesser>;
158181

src/blockaccess/s3/s3_accesser.cpp

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@
2626
namespace dingofs {
2727
namespace blockaccess {
2828

29-
using stub::metric::MetricGuard;
30-
using stub::metric::S3Metric;
31-
3229
bool S3Accesser::Init() {
3330
const auto& s3_info = options_.s3_info;
3431
LOG(INFO) << fmt::format(
@@ -57,12 +54,7 @@ bool S3Accesser::ContainerExist() { return client_->BucketExist(); }
5754

5855
Status S3Accesser::Put(const std::string& key, const char* buffer,
5956
size_t length) {
60-
int rc = 0;
61-
// write s3 metrics
62-
auto start = butil::cpuwide_time_us();
63-
MetricGuard guard(&rc, &S3Metric::GetInstance().write_s3, length, start);
64-
65-
rc = client_->PutObject(S3Key(key), buffer, length);
57+
int rc = client_->PutObject(S3Key(key), buffer, length);
6658
if (rc < 0) {
6759
LOG(ERROR) << fmt::format("[accesser] put object({}) fail, retcode: {}.",
6860
key, rc);
@@ -77,21 +69,14 @@ void S3Accesser::AsyncPut(std::shared_ptr<PutObjectAsyncContext> context) {
7769
auto start_time = butil::cpuwide_time_us();
7870
context->cb = [&, start_time,
7971
origin_cb](const std::shared_ptr<PutObjectAsyncContext>& ctx) {
80-
MetricGuard guard(&ctx->ret_code, &S3Metric::GetInstance().write_s3,
81-
ctx->buffer_size, start_time);
8272
ctx->cb = origin_cb;
8373
ctx->cb(ctx);
8474
};
8575
client_->PutObjectAsync(context);
8676
}
8777

8878
Status S3Accesser::Get(const std::string& key, std::string* data) {
89-
int rc; // read s3 metrics
90-
auto start = butil::cpuwide_time_us();
91-
MetricGuard guard(&rc, &S3Metric::GetInstance().read_s3, data->length(),
92-
start);
93-
94-
rc = client_->GetObject(S3Key(key), data);
79+
int rc = client_->GetObject(S3Key(key), data);
9580
if (rc < 0) {
9681
if (!client_->ObjectExist(S3Key(key))) { // TODO: more efficient
9782
LOG(WARNING) << fmt::format("[accesser] object({}) not found.", key);
@@ -108,11 +93,7 @@ Status S3Accesser::Get(const std::string& key, std::string* data) {
10893

10994
Status S3Accesser::Range(const std::string& key, off_t offset, size_t length,
11095
char* buffer) {
111-
int rc; // read s3 metrics
112-
auto start = butil::cpuwide_time_us();
113-
MetricGuard guard(&rc, &S3Metric::GetInstance().read_s3, length, start);
114-
115-
rc = client_->RangeObject(S3Key(key), buffer, offset, length);
96+
int rc = client_->RangeObject(S3Key(key), buffer, offset, length);
11697
if (rc < 0) {
11798
if (!client_->ObjectExist(S3Key(key))) { // TODO: more efficient
11899
LOG(WARNING) << fmt::format("[accesser] object({}) not found.", key);
@@ -132,8 +113,6 @@ void S3Accesser::AsyncGet(std::shared_ptr<GetObjectAsyncContext> context) {
132113
auto start_time = butil::cpuwide_time_us();
133114
context->cb = [&, start_time,
134115
origin_cb](const std::shared_ptr<GetObjectAsyncContext>& ctx) {
135-
MetricGuard guard(&ctx->ret_code, &S3Metric::GetInstance().read_s3,
136-
ctx->len, start_time);
137116
ctx->cb = origin_cb;
138117
ctx->cb(ctx);
139118
};

src/client/vfs/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ add_subdirectory(data)
1717
add_subdirectory(handle)
1818
add_subdirectory(hub)
1919
add_subdirectory(meta)
20+
add_subdirectory(statistics)
2021

2122
add_library(vfs_lib
2223
vfs_impl.cpp
@@ -30,6 +31,7 @@ target_link_libraries(vfs_lib
3031
vfs_data
3132
vfs_hub
3233
vfs_handle
34+
vfs_statistics
3335
glog::glog
3436
gflags::gflags
3537
fmt::fmt

src/client/vfs/meta/v2/filesystem.cpp

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
#include <fcntl.h>
1818

1919
#include <cstdint>
20+
#include <memory>
2021
#include <string>
2122
#include <utility>
2223
#include <vector>
2324

25+
#include "client/common/dynamic_config.h"
2426
#include "client/vfs/meta/v2/client_id.h"
27+
#include "client/vfs/statistics/fs_stats_manager.h"
2528
#include "client/vfs/vfs_meta.h"
2629
#include "common/status.h"
2730
#include "dingofs/error.pb.h"
@@ -36,6 +39,10 @@ namespace client {
3639
namespace vfs {
3740
namespace v2 {
3841

42+
USING_FLAG(push_metric_interval_millsecond)
43+
44+
using FsStatsManager = dingofs::client::vfs::FsStatsManager;
45+
3946
const uint32_t kMaxHostNameLength = 255;
4047

4148
const uint32_t kMaxXAttrNameLength = 255;
@@ -133,12 +140,14 @@ std::string FileSessionMap::Get(uint64_t fh) {
133140
MDSV2FileSystem::MDSV2FileSystem(mdsv2::FsInfoPtr fs_info,
134141
const ClientId& client_id,
135142
MDSDiscoveryPtr mds_discovery,
136-
MDSClientPtr mds_client)
143+
MDSClientPtr mds_client,
144+
FsStatsManagerUPtr fs_stats_manager)
137145
: name_(fs_info->GetName()),
138146
client_id_(client_id),
139147
fs_info_(fs_info),
140148
mds_discovery_(mds_discovery),
141-
mds_client_(mds_client) {}
149+
mds_client_(mds_client),
150+
fs_stats_manager_(std::move(fs_stats_manager)) {}
142151

143152
MDSV2FileSystem::~MDSV2FileSystem() {} // NOLINT
144153

@@ -271,6 +280,8 @@ void MDSV2FileSystem::Heartbeat() {
271280
}
272281
}
273282

283+
void MDSV2FileSystem::PushFsStatsToMDS() { fs_stats_manager_->PushFsStats(); }
284+
274285
bool MDSV2FileSystem::InitCrontab() {
275286
// Add heartbeat crontab
276287
crontab_configs_.push_back({
@@ -280,6 +291,14 @@ bool MDSV2FileSystem::InitCrontab() {
280291
[this](void*) { this->Heartbeat(); },
281292
});
282293

294+
// Add push fs stats crontab
295+
crontab_configs_.push_back({
296+
"PUSH_FSSTATS",
297+
FLAGS_push_metric_interval_millsecond,
298+
true,
299+
[this](void*) { this->PushFsStatsToMDS(); },
300+
});
301+
283302
crontab_manager_.AddCrontab(crontab_configs_);
284303

285304
return true;
@@ -646,8 +665,12 @@ MDSV2FileSystemUPtr MDSV2FileSystem::Build(const std::string& fs_name,
646665
return nullptr;
647666
}
648667

668+
// create fs stats manager
669+
auto fs_stats_manager = FsStatsManager::New(fs_name, mds_client);
670+
649671
// create filesystem
650-
return MDSV2FileSystem::New(fs_info, client_id, mds_discovery, mds_client);
672+
return MDSV2FileSystem::New(fs_info, client_id, mds_discovery, mds_client,
673+
std::move(fs_stats_manager));
651674
}
652675

653676
} // namespace v2

src/client/vfs/meta/v2/filesystem.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "client/vfs/meta/v2/client_id.h"
2525
#include "client/vfs/meta/v2/mds_client.h"
2626
#include "client/vfs/meta/v2/mds_discovery.h"
27+
#include "client/vfs/statistics/fs_stats_manager.h"
2728
#include "client/vfs/vfs_meta.h"
2829
#include "common/status.h"
2930
#include "dingofs/mdsv2.pb.h"
@@ -77,15 +78,18 @@ class FileSessionMap {
7778
class MDSV2FileSystem : public vfs::MetaSystem {
7879
public:
7980
MDSV2FileSystem(mdsv2::FsInfoPtr fs_info, const ClientId& client_id,
80-
MDSDiscoveryPtr mds_discovery, MDSClientPtr mds_client);
81+
MDSDiscoveryPtr mds_discovery, MDSClientPtr mds_client,
82+
FsStatsManagerUPtr fs_stats_manager);
8183
~MDSV2FileSystem() override;
8284

8385
static MDSV2FileSystemUPtr New(mdsv2::FsInfoPtr fs_info,
8486
const ClientId& client_id,
8587
MDSDiscoveryPtr mds_discovery,
86-
MDSClientPtr mds_client) {
88+
MDSClientPtr mds_client,
89+
FsStatsManagerUPtr fs_stats_manager) {
8790
return std::make_unique<MDSV2FileSystem>(fs_info, client_id, mds_discovery,
88-
mds_client);
91+
mds_client,
92+
std::move(fs_stats_manager));
8993
}
9094

9195
static MDSV2FileSystemUPtr Build(const std::string& fs_name,
@@ -155,6 +159,8 @@ class MDSV2FileSystem : public vfs::MetaSystem {
155159

156160
void Heartbeat();
157161

162+
void PushFsStatsToMDS();
163+
158164
bool InitCrontab();
159165

160166
const std::string name_;
@@ -172,6 +178,8 @@ class MDSV2FileSystem : public vfs::MetaSystem {
172178
std::vector<mdsv2::CrontabConfig> crontab_configs_;
173179
// This is manage crontab, like heartbeat.
174180
mdsv2::CrontabManager crontab_manager_;
181+
// Fs stats manager
182+
FsStatsManagerUPtr fs_stats_manager_;
175183
};
176184

177185
} // namespace v2

src/client/vfs/meta/v2/mds_client.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,25 @@ Status MDSClient::GetFsQuota(FsStat& fs_stat) {
875875
return Status::OK();
876876
}
877877

878+
Status MDSClient::PushFsStatsToMDS(const std::string& fs_name,
879+
const pb::mdsv2::FsStatsData& fs_stat_data) {
880+
CHECK(fs_name != "") << "fs_name is invalid.";
881+
882+
pb::mdsv2::SetFsStatsRequest request;
883+
pb::mdsv2::SetFsStatsResponse response;
884+
885+
request.set_fs_name(fs_name);
886+
request.mutable_stats()->CopyFrom(fs_stat_data);
887+
888+
auto status =
889+
rpc_->SendRequest("MDSService", "SetFsStats", request, response);
890+
if (!status.ok()) {
891+
return status;
892+
}
893+
894+
return Status::OK();
895+
}
896+
878897
bool MDSClient::UpdateRouter() {
879898
pb::mdsv2::FsInfo new_fs_info;
880899
auto status = MDSClient::GetFsInfo(rpc_, fs_info_->GetName(), new_fs_info);

src/client/vfs/meta/v2/mds_client.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ class MDSClient {
104104

105105
Status GetFsQuota(FsStat& fs_stat);
106106

107+
Status PushFsStatsToMDS(const std::string& fs_name,
108+
const pb::mdsv2::FsStatsData& fs_stat_data);
109+
107110
private:
108111
EndPoint GetEndpoint(Ino ino);
109112
EndPoint GetEndpointByParent(int64_t parent);
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright (c) 2025 dingodb.com, Inc. All Rights Reserved
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
add_library(vfs_statistics
16+
fs_stats_manager.cpp
17+
)
18+
19+
target_link_libraries(vfs_statistics
20+
dingofs_common
21+
stub_metric
22+
PROTO_OBJS
23+
)

0 commit comments

Comments
 (0)