diff --git a/ydb/core/cms/console/immediate_controls_configurator.cpp b/ydb/core/cms/console/immediate_controls_configurator.cpp index 6a23585afb60..78389f41bc06 100644 --- a/ydb/core/cms/console/immediate_controls_configurator.cpp +++ b/ydb/core/cms/console/immediate_controls_configurator.cpp @@ -51,7 +51,8 @@ class TImmediateControlsConfigurator : public TActorBootstrapped board); void ApplyConfig(const ::google::protobuf::Message &cfg, const TString &prefix, - TIntrusivePtr board); + TIntrusivePtr board, + bool allowDynamicFields = false); TString MakePrefix(const TString &prefix, const TString &name); @@ -114,6 +115,11 @@ void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr { for (int i = 0; i < desc->field_count(); ++i) { auto *fieldDesc = desc->field(i); + auto name = MakePrefix(prefix, fieldDesc->name()); + + if (fieldDesc->is_map()) { + continue; + } Y_ABORT_UNLESS(!fieldDesc->is_repeated(), "Repeated fields are not allowed in Immediate Controls Config"); @@ -171,17 +177,50 @@ void TImmediateControlsConfigurator::ApplyConfig(const NKikimrConfig::TImmediate void TImmediateControlsConfigurator::ApplyConfig(const ::google::protobuf::Message &cfg, const TString &prefix, - TIntrusivePtr board) + TIntrusivePtr board, + bool allowDynamicFields) { auto *desc = cfg.GetDescriptor(); auto *reflection = cfg.GetReflection(); for (int i = 0; i < desc->field_count(); ++i) { - auto *fieldDesc = desc->field(i); + const auto *fieldDesc = desc->field(i); auto fieldType = fieldDesc->type(); auto name = MakePrefix(prefix, fieldDesc->name()); + + if (fieldDesc->is_map()) { + auto *mapDesc = fieldDesc->message_type(); + auto *mapKey = mapDesc->map_key(); + auto *mapValue = mapDesc->map_value(); + + auto keyType = mapKey->type(); + auto valueType = mapValue->type(); + + Y_ABORT_UNLESS(keyType == google::protobuf::FieldDescriptor::TYPE_STRING, + "Only string keys are allowed in Immediate Controls Config maps"); + + Y_ABORT_UNLESS(valueType == google::protobuf::FieldDescriptor::TYPE_MESSAGE, + "Only message value are allowed in Immediate Controls Config maps"); + + auto entryCount = reflection->FieldSize(cfg, fieldDesc); + for (int j = 0; j < entryCount; ++j) { + const auto &entry = reflection->GetRepeatedMessage(cfg, fieldDesc, j); + auto *entryReflection = entry.GetReflection(); + auto key = entryReflection->GetString(entry, mapKey); + auto entryName = MakePrefix(name, key); + ApplyConfig(entryReflection->GetMessage(entry, mapValue), entryName, board, true); + } + continue; + } + if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64 || fieldType == google::protobuf::FieldDescriptor::TYPE_INT64) { - Y_ABORT_UNLESS(Controls.contains(name)); + if (!Controls.contains(name)) { + if (!allowDynamicFields) { + Y_ABORT("Missing control for field %s", name.c_str()); + } + AddControl(board, fieldDesc, prefix, true); + } + if (reflection->HasField(cfg, fieldDesc)) { TAtomicBase prev; if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64) diff --git a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp index 7e40db01f845..9e13a6510893 100644 --- a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp +++ b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp @@ -258,6 +258,32 @@ Y_UNIT_TEST_SUITE(TImmediateControlsConfiguratorTests) MakeAddAction(ITEM_CONTROLS_EXCEED_MAX)); CompareControls(runtime, ITEM_CONTROLS_MAX.GetConfig().GetImmediateControlsConfig()); } + + Y_UNIT_TEST(TestDynamicMap) + { + TTenantTestRuntime runtime(DefaultConsoleTestConfig()); + InitImmediateControlsConfigurator(runtime); + WaitForUpdate(runtime); // initial update + + NKikimrConsole::TConfigItem dynamicMapValue; + { + auto &cfg = *dynamicMapValue.MutableConfig()->MutableImmediateControlsConfig(); + auto *grpcControls = cfg.MutableGRpcControls(); + + auto *requestConfigs = grpcControls->MutableRequestConfigs(); + auto &r = (*requestConfigs)["FooBar"]; + r.SetMaxInFlight(10); + } + + ConfigureAndWaitUpdate(runtime, MakeAddAction(dynamicMapValue)); + + auto icb = runtime.GetAppData().Icb; + + TControlWrapper wrapper; + + icb->RegisterSharedControl(wrapper, "GRpcControls.RequestConfigs.FooBar.MaxInFlight"); + UNIT_ASSERT_VALUES_EQUAL((ui64)(i64)wrapper, 10); + } } } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index d2bd9600803b..4036950ef9bf 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -729,8 +729,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } if (hasTableService) { - server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxies, - hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); + server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, AppData->InFlightLimiterRegistry, + grpcRequestProxies, hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } if (hasClickhouseInternal) { diff --git a/ydb/core/grpc_services/grpc_helper.cpp b/ydb/core/grpc_services/grpc_helper.cpp index 47918f038672..66626b9a23e1 100644 --- a/ydb/core/grpc_services/grpc_helper.cpp +++ b/ydb/core/grpc_services/grpc_helper.cpp @@ -6,7 +6,7 @@ namespace NGRpcService { //using namespace NActors; NYdbGrpc::IGRpcRequestLimiterPtr TCreateLimiterCB::operator()(const char* serviceName, const char* requestName, i64 limit) const { - TString fullName = TString(serviceName) + "_" + requestName; + TString fullName = "GRpcControls.RequestConfigs." + TString(serviceName) + "_" + requestName; return LimiterRegistry->RegisterRequestType(fullName, limit); } @@ -34,7 +34,7 @@ NYdbGrpc::IGRpcRequestLimiterPtr TInFlightLimiterRegistry::RegisterRequestType(T TGuard g(Lock); if (!PerTypeLimiters.count(name)) { TControlWrapper control(limit, 0, 1000000); - Icb->RegisterSharedControl(control, name + "_MaxInFlight"); + Icb->RegisterSharedControl(control, name + ".MaxInFlight"); PerTypeLimiters[name] = new TRequestInFlightLimiter(control); } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2a1df083bc56..0c9ba41544b0 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1427,7 +1427,20 @@ message TImmediateControlsConfig { DefaultValue: 0 }]; } - optional TDataShardControls DataShardControls = 1; + message TGRpcControls { + message TRequestConfig { + optional uint64 MaxInFlight = 1 [(ControlOptions) = { + Description: "Max in flight requests", + MinValue: 0, + MaxValue: 1000000, + DefaultValue: 0 + }]; + } + map RequestConfigs = 1; + } + + + optional TDataShardControls DataShardControls = 1; optional TTxLimitControls TxLimitControls = 2; optional TCoordinatorControls CoordinatorControls = 3; optional TSchemeShardControls SchemeShardControls = 4; @@ -1437,6 +1450,7 @@ message TImmediateControlsConfig { optional TTabletControls TabletControls = 8; optional TDSProxyControls DSProxyControls = 9; optional TBlobStorageControllerControls BlobStorageControllerControls = 11; + optional TGRpcControls GRpcControls = 14; }; message TMeteringConfig { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index fbc0ed46e7db..631d58d07a72 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -374,7 +374,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxies[0], true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxies, true, 1)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies, true, 1)); GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true)); diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h index 3be0cb44ee9f..ee853ee88f91 100644 --- a/ydb/library/grpc/server/grpc_request.h +++ b/ydb/library/grpc/server/grpc_request.h @@ -605,8 +605,9 @@ class TGRpcRequest: public TGRpcRequestImpl counters, + TIntrusivePtr inFlightLimiterRegistry, const NActors::TActorId& proxyId, bool rlAllowed, size_t handlersPerCompletionQueue) : TGrpcServiceBase(system, counters, proxyId, rlAllowed) , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) + , LimiterRegistry_(inFlightLimiterRegistry) { } TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + TIntrusivePtr inFlightLimiterRegistry, const TVector& proxies, bool rlAllowed, size_t handlersPerCompletionQueue) : TGrpcServiceBase(system, counters, proxies, rlAllowed) , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) + , LimiterRegistry_(inFlightLimiterRegistry) { } void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + auto getLimiter = CreateLimiterCb(LimiterRegistry_); size_t proxyCounter = 0; @@ -60,23 +65,24 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { } \ } -#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE, REQUEST_TYPE) \ - for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ - for (auto* cq: CQS) { \ - MakeIntrusive> \ - (this, &Service_, cq, \ - [this, proxyCounter](NYdbGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ - new TGrpcRequestNoOperationCall \ - (ctx, &CB, TRequestAuxSettings { \ - .RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \ - .RequestType = NJaegerTracing::ERequestType::TABLE_##REQUEST_TYPE, \ - })); \ - }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("table", #NAME))->Run(); \ - ++proxyCounter; \ - } \ +#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE, REQUEST_TYPE, USE_LIMITER) \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive> \ + (this, &Service_, cq, \ + [this, proxyCounter](NYdbGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ + new TGrpcRequestNoOperationCall \ + (ctx, &CB, TRequestAuxSettings { \ + .RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \ + .RequestType = NJaegerTracing::ERequestType::TABLE_##REQUEST_TYPE, \ + })); \ + }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("table", #NAME), \ + (USE_LIMITER ? getLimiter("TableService", #NAME, UNLIMITED_INFLIGHT) : nullptr))->Run(); \ + ++proxyCounter; \ + } \ } ADD_REQUEST_LIMIT(CreateSession, DoCreateSessionRequest, Rps, CREATESESSION) @@ -101,9 +107,9 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST_LIMIT(ExecuteDataQuery, DoExecuteDataQueryRequest, Ru, EXECUTEDATAQUERY, Auditable) ADD_REQUEST_LIMIT(BulkUpsert, DoBulkUpsertRequest, Ru, BULKUPSERT, Auditable) - ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress, STREAMEXECUTESCANQUERY) - ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress, STREAMREADTABLE) - ADD_STREAM_REQUEST_LIMIT(ReadRows, ReadRowsRequest, ReadRowsResponse, DoReadRowsRequest, Ru, READROWS) + ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress, STREAMEXECUTESCANQUERY, false) + ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress, STREAMREADTABLE, false) + ADD_STREAM_REQUEST_LIMIT(ReadRows, ReadRowsRequest, ReadRowsResponse, DoReadRowsRequest, Ru, READROWS, true) #undef ADD_REQUEST_LIMIT #undef ADD_STREAM_REQUEST_LIMIT diff --git a/ydb/services/ydb/ydb_table.h b/ydb/services/ydb/ydb_table.h index 05c9a950cdcf..08060e4d2605 100644 --- a/ydb/services/ydb/ydb_table.h +++ b/ydb/services/ydb/ydb_table.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace NKikimr { namespace NGRpcService { @@ -11,12 +12,14 @@ namespace NGRpcService { class TGRpcYdbTableService : public TGrpcServiceBase { + constexpr static i64 UNLIMITED_INFLIGHT = 0; public: using TGrpcServiceBase::TGrpcServiceBase; TGRpcYdbTableService( NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + TIntrusivePtr inFlightLimiterRegistry, const NActors::TActorId& proxyId, bool rlAllowed, size_t handlersPerCompletionQueue = 1); @@ -24,6 +27,7 @@ class TGRpcYdbTableService TGRpcYdbTableService( NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + TIntrusivePtr inFlightLimiterRegistry, const TVector& proxies, bool rlAllowed, size_t handlersPerCompletionQueue); @@ -33,6 +37,7 @@ class TGRpcYdbTableService private: const size_t HandlersPerCompletionQueue; + TIntrusivePtr LimiterRegistry_; }; } // namespace NGRpcService