Skip to content

Commit 765cb35

Browse files
authored
Inflight limit for ReadRows to prevent dynnodes OOM (#16014)
(cherry picked from commit 773d088)
1 parent 57d5fd4 commit 765cb35

File tree

9 files changed

+123
-32
lines changed

9 files changed

+123
-32
lines changed

ydb/core/cms/console/immediate_controls_configurator.cpp

+43-4
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class TImmediateControlsConfigurator : public TActorBootstrapped<TImmediateContr
5151
TIntrusivePtr<TControlBoard> board);
5252
void ApplyConfig(const ::google::protobuf::Message &cfg,
5353
const TString &prefix,
54-
TIntrusivePtr<TControlBoard> board);
54+
TIntrusivePtr<TControlBoard> board,
55+
bool allowDynamicFields = false);
5556
TString MakePrefix(const TString &prefix,
5657
const TString &name);
5758

@@ -114,6 +115,11 @@ void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard>
114115
{
115116
for (int i = 0; i < desc->field_count(); ++i) {
116117
auto *fieldDesc = desc->field(i);
118+
auto name = MakePrefix(prefix, fieldDesc->name());
119+
120+
if (fieldDesc->is_map()) {
121+
continue;
122+
}
117123

118124
Y_ABORT_UNLESS(!fieldDesc->is_repeated(),
119125
"Repeated fields are not allowed in Immediate Controls Config");
@@ -171,17 +177,50 @@ void TImmediateControlsConfigurator::ApplyConfig(const NKikimrConfig::TImmediate
171177

172178
void TImmediateControlsConfigurator::ApplyConfig(const ::google::protobuf::Message &cfg,
173179
const TString &prefix,
174-
TIntrusivePtr<TControlBoard> board)
180+
TIntrusivePtr<TControlBoard> board,
181+
bool allowDynamicFields)
175182
{
176183
auto *desc = cfg.GetDescriptor();
177184
auto *reflection = cfg.GetReflection();
178185
for (int i = 0; i < desc->field_count(); ++i) {
179-
auto *fieldDesc = desc->field(i);
186+
const auto *fieldDesc = desc->field(i);
180187
auto fieldType = fieldDesc->type();
181188
auto name = MakePrefix(prefix, fieldDesc->name());
189+
190+
if (fieldDesc->is_map()) {
191+
auto *mapDesc = fieldDesc->message_type();
192+
auto *mapKey = mapDesc->map_key();
193+
auto *mapValue = mapDesc->map_value();
194+
195+
auto keyType = mapKey->type();
196+
auto valueType = mapValue->type();
197+
198+
Y_ABORT_UNLESS(keyType == google::protobuf::FieldDescriptor::TYPE_STRING,
199+
"Only string keys are allowed in Immediate Controls Config maps");
200+
201+
Y_ABORT_UNLESS(valueType == google::protobuf::FieldDescriptor::TYPE_MESSAGE,
202+
"Only message value are allowed in Immediate Controls Config maps");
203+
204+
auto entryCount = reflection->FieldSize(cfg, fieldDesc);
205+
for (int j = 0; j < entryCount; ++j) {
206+
const auto &entry = reflection->GetRepeatedMessage(cfg, fieldDesc, j);
207+
auto *entryReflection = entry.GetReflection();
208+
auto key = entryReflection->GetString(entry, mapKey);
209+
auto entryName = MakePrefix(name, key);
210+
ApplyConfig(entryReflection->GetMessage(entry, mapValue), entryName, board, true);
211+
}
212+
continue;
213+
}
214+
182215
if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64
183216
|| fieldType == google::protobuf::FieldDescriptor::TYPE_INT64) {
184-
Y_ABORT_UNLESS(Controls.contains(name));
217+
if (!Controls.contains(name)) {
218+
if (!allowDynamicFields) {
219+
Y_ABORT("Missing control for field %s", name.c_str());
220+
}
221+
AddControl(board, fieldDesc, prefix, true);
222+
}
223+
185224
if (reflection->HasField(cfg, fieldDesc)) {
186225
TAtomicBase prev;
187226
if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64)

ydb/core/cms/console/immediate_controls_configurator_ut.cpp

+26
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,32 @@ Y_UNIT_TEST_SUITE(TImmediateControlsConfiguratorTests)
258258
MakeAddAction(ITEM_CONTROLS_EXCEED_MAX));
259259
CompareControls(runtime, ITEM_CONTROLS_MAX.GetConfig().GetImmediateControlsConfig());
260260
}
261+
// Unit test for map-valued immediate controls.
// It submits a config item whose ImmediateControlsConfig carries a
// GRpcControls.RequestConfigs entry keyed "FooBar" with MaxInFlight = 10,
// waits for the configurator update, and then expects the control named
// "GRpcControls.RequestConfigs.FooBar.MaxInFlight" to read back as 10.
262+
Y_UNIT_TEST(TestDynamicMap)
263+
{
264+
TTenantTestRuntime runtime(DefaultConsoleTestConfig());
265+
InitImmediateControlsConfigurator(runtime);
266+
WaitForUpdate(runtime); // initial update
267+
// Build the config item: one map entry ("FooBar") with MaxInFlight set.
268+
NKikimrConsole::TConfigItem dynamicMapValue;
269+
{
270+
auto &cfg = *dynamicMapValue.MutableConfig()->MutableImmediateControlsConfig();
271+
auto *grpcControls = cfg.MutableGRpcControls();
272+
273+
auto *requestConfigs = grpcControls->MutableRequestConfigs();
274+
auto &r = (*requestConfigs)["FooBar"];
275+
r.SetMaxInFlight(10);
276+
}
277+
278+
ConfigureAndWaitUpdate(runtime, MakeAddAction(dynamicMapValue));
279+
280+
auto icb = runtime.GetAppData().Icb;
281+
282+
TControlWrapper wrapper;
283+
// NOTE(review): RegisterSharedControl is used here as a lookup by name;
// presumably it binds `wrapper` to the control the configurator already
// created instead of registering a fresh one - confirm against TControlBoard.
284+
icb->RegisterSharedControl(wrapper, "GRpcControls.RequestConfigs.FooBar.MaxInFlight");
285+
UNIT_ASSERT_VALUES_EQUAL((ui64)(i64)wrapper, 10);
286+
}
261287
}
262288

263289
} // namespace NKikimr

ydb/core/driver_lib/run/run.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -729,8 +729,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
729729
}
730730

731731
if (hasTableService) {
732-
server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxies,
733-
hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
732+
server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, AppData->InFlightLimiterRegistry,
733+
grpcRequestProxies, hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
734734
}
735735

736736
if (hasClickhouseInternal) {

ydb/core/grpc_services/grpc_helper.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace NGRpcService {
66
//using namespace NActors;
77

// Builds the registry name for a (serviceName, requestName) pair and returns
// the limiter that LimiterRegistry->RegisterRequestType yields for that name
// and `limit`.
// Diff hunk: the line under gutter `9-` is the pre-commit form
// ("<service>_<request>"); the line under gutter `9+` replaces it and adds the
// "GRpcControls.RequestConfigs." prefix to the same name.
88
NYdbGrpc::IGRpcRequestLimiterPtr TCreateLimiterCB::operator()(const char* serviceName, const char* requestName, i64 limit) const {
9-
TString fullName = TString(serviceName) + "_" + requestName;
9+
TString fullName = "GRpcControls.RequestConfigs." + TString(serviceName) + "_" + requestName;
1010
return LimiterRegistry->RegisterRequestType(fullName, limit);
1111
}
1212

@@ -34,7 +34,7 @@ NYdbGrpc::IGRpcRequestLimiterPtr TInFlightLimiterRegistry::RegisterRequestType(T
3434
TGuard<TMutex> g(Lock);
3535
if (!PerTypeLimiters.count(name)) {
3636
TControlWrapper control(limit, 0, 1000000);
37-
Icb->RegisterSharedControl(control, name + "_MaxInFlight");
37+
Icb->RegisterSharedControl(control, name + ".MaxInFlight");
3838
PerTypeLimiters[name] = new TRequestInFlightLimiter(control);
3939
}
4040

ydb/core/protos/config.proto

+15-1
Original file line numberDiff line numberDiff line change
@@ -1427,7 +1427,20 @@ message TImmediateControlsConfig {
14271427
DefaultValue: 0 }];
14281428
}
14291429

1430-
optional TDataShardControls DataShardControls = 1;
// Immediate controls for gRPC request handling.
// RequestConfigs maps a string key to a per-request config. The unit test in
// this commit reads the resulting control under the name
// "GRpcControls.RequestConfigs.<key>.MaxInFlight".
// NOTE(review): a MaxInFlight of 0 presumably means "no limit" (ydb_table.h
// declares UNLIMITED_INFLIGHT = 0) - confirm against TRequestInFlightLimiter.
1430+
message TGRpcControls {
1431+
message TRequestConfig {
1432+
optional uint64 MaxInFlight = 1 [(ControlOptions) = {
1433+
Description: "Max in flight requests",
1434+
MinValue: 0,
1435+
MaxValue: 1000000,
1436+
DefaultValue: 0
1437+
}];
1438+
}
1439+
map<string, TRequestConfig> RequestConfigs = 1;
1440+
}
1441+
1442+
1443+
optional TDataShardControls DataShardControls = 1;
14311444
optional TTxLimitControls TxLimitControls = 2;
14321445
optional TCoordinatorControls CoordinatorControls = 3;
14331446
optional TSchemeShardControls SchemeShardControls = 4;
@@ -1437,6 +1450,7 @@ message TImmediateControlsConfig {
14371450
optional TTabletControls TabletControls = 8;
14381451
optional TDSProxyControls DSProxyControls = 9;
14391452
optional TBlobStorageControllerControls BlobStorageControllerControls = 11;
1453+
optional TGRpcControls GRpcControls = 14;
14401454
};
14411455

14421456
message TMeteringConfig {

ydb/core/testlib/test_client.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ namespace Tests {
374374
GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxies[0], true));
375375
GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxies[0], true));
376376
GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxies[0], true));
377-
GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxies, true, 1));
377+
GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies, true, 1));
378378
GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxies[0], true));
379379
GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxies[0], true));
380380
GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true));

ydb/library/grpc/server/grpc_request.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -605,8 +605,9 @@ class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter
605605
typename TBase::TStreamRequestCallback requestCallback,
606606
const char* name,
607607
TLoggerPtr logger,
608-
ICounterBlockPtr counters)
609-
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
608+
ICounterBlockPtr counters,
609+
IGRpcRequestLimiterPtr limiter = nullptr)
610+
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
610611
{
611612
}
612613
};

ydb/services/ydb/ydb_table.cpp

+26-20
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,31 @@ namespace NGRpcService {
99

// Single-proxy constructor.
// Forwards system, counters, proxyId and rlAllowed to TGrpcServiceBase, stores
// handlersPerCompletionQueue raised to at least 1, and keeps the in-flight
// limiter registry in LimiterRegistry_ (lines under `12+` / `18+` gutters are
// the additions made by this commit).
1010
TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
1111
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
12+
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
1213
const NActors::TActorId& proxyId,
1314
bool rlAllowed,
1415
size_t handlersPerCompletionQueue)
1516
: TGrpcServiceBase(system, counters, proxyId, rlAllowed)
1617
, HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
18+
, LimiterRegistry_(inFlightLimiterRegistry)
1719
{
1820
}
1921

// Multi-proxy constructor.
// Same as the single-proxy overload except that a vector of proxy actor ids
// is forwarded to TGrpcServiceBase. handlersPerCompletionQueue is raised to at
// least 1 and the in-flight limiter registry is kept in LimiterRegistry_
// (lines under `24+` / `30+` gutters are the additions made by this commit).
2022
TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
2123
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
24+
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
2225
const TVector<NActors::TActorId>& proxies,
2326
bool rlAllowed,
2427
size_t handlersPerCompletionQueue)
2528
: TGrpcServiceBase(system, counters, proxies, rlAllowed)
2629
, HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
30+
, LimiterRegistry_(inFlightLimiterRegistry)
2731
{
2832
}
2933

3034
void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
3135
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
36+
auto getLimiter = CreateLimiterCb(LimiterRegistry_);
3237

3338
size_t proxyCounter = 0;
3439

@@ -60,23 +65,24 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
6065
} \
6166
}
6267

63-
#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE, REQUEST_TYPE) \
64-
for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
65-
for (auto* cq: CQS) { \
66-
MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
67-
(this, &Service_, cq, \
68-
[this, proxyCounter](NYdbGrpc::IRequestContextBase *ctx) { \
69-
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
70-
ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
71-
new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
72-
(ctx, &CB, TRequestAuxSettings { \
73-
.RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \
74-
.RequestType = NJaegerTracing::ERequestType::TABLE_##REQUEST_TYPE, \
75-
})); \
76-
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
77-
#NAME, logger, getCounterBlock("table", #NAME))->Run(); \
78-
++proxyCounter; \
79-
} \
68+
#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE, REQUEST_TYPE, USE_LIMITER) \
69+
for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
70+
for (auto* cq: CQS) { \
71+
MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
72+
(this, &Service_, cq, \
73+
[this, proxyCounter](NYdbGrpc::IRequestContextBase *ctx) { \
74+
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
75+
ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
76+
new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
77+
(ctx, &CB, TRequestAuxSettings { \
78+
.RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \
79+
.RequestType = NJaegerTracing::ERequestType::TABLE_##REQUEST_TYPE, \
80+
})); \
81+
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
82+
#NAME, logger, getCounterBlock("table", #NAME), \
83+
(USE_LIMITER ? getLimiter("TableService", #NAME, UNLIMITED_INFLIGHT) : nullptr))->Run(); \
84+
++proxyCounter; \
85+
} \
8086
}
8187

8288
ADD_REQUEST_LIMIT(CreateSession, DoCreateSessionRequest, Rps, CREATESESSION)
@@ -101,9 +107,9 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
101107
ADD_REQUEST_LIMIT(ExecuteDataQuery, DoExecuteDataQueryRequest, Ru, EXECUTEDATAQUERY, Auditable)
102108
ADD_REQUEST_LIMIT(BulkUpsert, DoBulkUpsertRequest, Ru, BULKUPSERT, Auditable)
103109

104-
ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress, STREAMEXECUTESCANQUERY)
105-
ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress, STREAMREADTABLE)
106-
ADD_STREAM_REQUEST_LIMIT(ReadRows, ReadRowsRequest, ReadRowsResponse, DoReadRowsRequest, Ru, READROWS)
110+
ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress, STREAMEXECUTESCANQUERY, false)
111+
ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress, STREAMREADTABLE, false)
112+
ADD_STREAM_REQUEST_LIMIT(ReadRows, ReadRowsRequest, ReadRowsResponse, DoReadRowsRequest, Ru, READROWS, true)
107113

108114
#undef ADD_REQUEST_LIMIT
109115
#undef ADD_STREAM_REQUEST_LIMIT

ydb/services/ydb/ydb_table.h

+5
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,30 @@
44
#include <ydb/library/grpc/server/grpc_server.h>
55
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
66
#include <ydb/core/grpc_services/base/base_service.h>
7+
#include <ydb/core/grpc_services/grpc_helper.h>
78

89
namespace NKikimr {
910
namespace NGRpcService {
1011

1112
class TGRpcYdbTableService
1213
: public TGrpcServiceBase<Ydb::Table::V1::TableService>
1314
{
15+
constexpr static i64 UNLIMITED_INFLIGHT = 0;
1416
public:
1517
using TGrpcServiceBase<Ydb::Table::V1::TableService>::TGrpcServiceBase;
1618

1719
TGRpcYdbTableService(
1820
NActors::TActorSystem *system,
1921
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
22+
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
2023
const NActors::TActorId& proxyId,
2124
bool rlAllowed,
2225
size_t handlersPerCompletionQueue = 1);
2326

2427
TGRpcYdbTableService(
2528
NActors::TActorSystem *system,
2629
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
30+
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
2731
const TVector<NActors::TActorId>& proxies,
2832
bool rlAllowed,
2933
size_t handlersPerCompletionQueue);
@@ -33,6 +37,7 @@ class TGRpcYdbTableService
3337

3438
private:
3539
const size_t HandlersPerCompletionQueue;
40+
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> LimiterRegistry_;
3641
};
3742

3843
} // namespace NGRpcService

0 commit comments

Comments
 (0)