Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable inflight limit for ReadRows gRPC call #16014

Open
wants to merge 1 commit into
base: stream-nb-24-3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions ydb/core/cms/console/immediate_controls_configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class TImmediateControlsConfigurator : public TActorBootstrapped<TImmediateContr
TIntrusivePtr<TControlBoard> board);
void ApplyConfig(const ::google::protobuf::Message &cfg,
const TString &prefix,
TIntrusivePtr<TControlBoard> board);
TIntrusivePtr<TControlBoard> board,
bool allowDynamicFields = false);
TString MakePrefix(const TString &prefix,
const TString &name);

Expand Down Expand Up @@ -114,6 +115,11 @@ void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard>
{
for (int i = 0; i < desc->field_count(); ++i) {
auto *fieldDesc = desc->field(i);
auto name = MakePrefix(prefix, fieldDesc->name());

if (fieldDesc->is_map()) {
continue;
}

Y_ABORT_UNLESS(!fieldDesc->is_repeated(),
"Repeated fields are not allowed in Immediate Controls Config");
Expand Down Expand Up @@ -171,17 +177,50 @@ void TImmediateControlsConfigurator::ApplyConfig(const NKikimrConfig::TImmediate

void TImmediateControlsConfigurator::ApplyConfig(const ::google::protobuf::Message &cfg,
const TString &prefix,
TIntrusivePtr<TControlBoard> board)
TIntrusivePtr<TControlBoard> board,
bool allowDynamicFields)
{
auto *desc = cfg.GetDescriptor();
auto *reflection = cfg.GetReflection();
for (int i = 0; i < desc->field_count(); ++i) {
auto *fieldDesc = desc->field(i);
const auto *fieldDesc = desc->field(i);
auto fieldType = fieldDesc->type();
auto name = MakePrefix(prefix, fieldDesc->name());

if (fieldDesc->is_map()) {
auto *mapDesc = fieldDesc->message_type();
auto *mapKey = mapDesc->map_key();
auto *mapValue = mapDesc->map_value();

auto keyType = mapKey->type();
auto valueType = mapValue->type();

Y_ABORT_UNLESS(keyType == google::protobuf::FieldDescriptor::TYPE_STRING,
"Only string keys are allowed in Immediate Controls Config maps");

Y_ABORT_UNLESS(valueType == google::protobuf::FieldDescriptor::TYPE_MESSAGE,
"Only message value are allowed in Immediate Controls Config maps");

auto entryCount = reflection->FieldSize(cfg, fieldDesc);
for (int j = 0; j < entryCount; ++j) {
const auto &entry = reflection->GetRepeatedMessage(cfg, fieldDesc, j);
auto *entryReflection = entry.GetReflection();
auto key = entryReflection->GetString(entry, mapKey);
auto entryName = MakePrefix(name, key);
ApplyConfig(entryReflection->GetMessage(entry, mapValue), entryName, board, true);
}
continue;
}

if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64
|| fieldType == google::protobuf::FieldDescriptor::TYPE_INT64) {
Y_ABORT_UNLESS(Controls.contains(name));
if (!Controls.contains(name)) {
if (!allowDynamicFields) {
Y_ABORT("Missing control for field %s", name.c_str());
}
AddControl(board, fieldDesc, prefix, true);
}

if (reflection->HasField(cfg, fieldDesc)) {
TAtomicBase prev;
if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64)
Expand Down
26 changes: 26 additions & 0 deletions ydb/core/cms/console/immediate_controls_configurator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,32 @@ Y_UNIT_TEST_SUITE(TImmediateControlsConfiguratorTests)
MakeAddAction(ITEM_CONTROLS_EXCEED_MAX));
CompareControls(runtime, ITEM_CONTROLS_MAX.GetConfig().GetImmediateControlsConfig());
}

Y_UNIT_TEST(TestDynamicMap)
{
TTenantTestRuntime runtime(DefaultConsoleTestConfig());
InitImmediateControlsConfigurator(runtime);
WaitForUpdate(runtime); // initial update

NKikimrConsole::TConfigItem dynamicMapValue;
{
auto &cfg = *dynamicMapValue.MutableConfig()->MutableImmediateControlsConfig();
auto *grpcControls = cfg.MutableGRpcControls();

auto *requestConfigs = grpcControls->MutableRequestConfigs();
auto &r = (*requestConfigs)["FooBar"];
r.SetMaxInFlight(10);
}

ConfigureAndWaitUpdate(runtime, MakeAddAction(dynamicMapValue));

auto icb = runtime.GetAppData().Icb;

TControlWrapper wrapper;

icb->RegisterSharedControl(wrapper, "GRpcControls.RequestConfigs.FooBar.MaxInFlight");
UNIT_ASSERT_VALUES_EQUAL((ui64)(i64)wrapper, 10);
}
}

} // namespace NKikimr
4 changes: 2 additions & 2 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,8 +729,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
}

if (hasTableService) {
server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxies,
hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, AppData->InFlightLimiterRegistry,
grpcRequestProxies, hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}

if (hasClickhouseInternal) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/grpc_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace NGRpcService {
//using namespace NActors;

NYdbGrpc::IGRpcRequestLimiterPtr TCreateLimiterCB::operator()(const char* serviceName, const char* requestName, i64 limit) const {
TString fullName = TString(serviceName) + "_" + requestName;
TString fullName = "GRpcControls.RequestConfigs." + TString(serviceName) + "_" + requestName;
return LimiterRegistry->RegisterRequestType(fullName, limit);
}

Expand Down Expand Up @@ -34,7 +34,7 @@ NYdbGrpc::IGRpcRequestLimiterPtr TInFlightLimiterRegistry::RegisterRequestType(T
TGuard<TMutex> g(Lock);
if (!PerTypeLimiters.count(name)) {
TControlWrapper control(limit, 0, 1000000);
Icb->RegisterSharedControl(control, name + "_MaxInFlight");
Icb->RegisterSharedControl(control, name + ".MaxInFlight");
PerTypeLimiters[name] = new TRequestInFlightLimiter(control);
}

Expand Down
16 changes: 15 additions & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,20 @@ message TImmediateControlsConfig {
DefaultValue: 0 }];
}

optional TDataShardControls DataShardControls = 1;
message TGRpcControls {
message TRequestConfig {
optional uint64 MaxInFlight = 1 [(ControlOptions) = {
Description: "Max in flight requests",
MinValue: 0,
MaxValue: 1000000,
DefaultValue: 0
}];
}
map<string, TRequestConfig> RequestConfigs = 1;
}


optional TDataShardControls DataShardControls = 1;
optional TTxLimitControls TxLimitControls = 2;
optional TCoordinatorControls CoordinatorControls = 3;
optional TSchemeShardControls SchemeShardControls = 4;
Expand All @@ -1437,6 +1450,7 @@ message TImmediateControlsConfig {
optional TTabletControls TabletControls = 8;
optional TDSProxyControls DSProxyControls = 9;
optional TBlobStorageControllerControls BlobStorageControllerControls = 11;
optional TGRpcControls GRpcControls = 14;
};

message TMeteringConfig {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxies, true, 1));
GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies, true, 1));
GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true));
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/grpc/server/grpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,9 @@ class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter
typename TBase::TStreamRequestCallback requestCallback,
const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters)
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter = nullptr)
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
{
}
};
Expand Down
46 changes: 26 additions & 20 deletions ydb/services/ydb/ydb_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,31 @@ namespace NGRpcService {

TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
const NActors::TActorId& proxyId,
bool rlAllowed,
size_t handlersPerCompletionQueue)
: TGrpcServiceBase(system, counters, proxyId, rlAllowed)
, HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
, LimiterRegistry_(inFlightLimiterRegistry)
{
}

TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
const TVector<NActors::TActorId>& proxies,
bool rlAllowed,
size_t handlersPerCompletionQueue)
: TGrpcServiceBase(system, counters, proxies, rlAllowed)
, HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
, LimiterRegistry_(inFlightLimiterRegistry)
{
}

void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
auto getLimiter = CreateLimiterCb(LimiterRegistry_);

size_t proxyCounter = 0;

Expand Down Expand Up @@ -60,23 +65,24 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
} \
}

#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE, REQUEST_TYPE) \
for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
for (auto* cq: CQS) { \
MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
(this, &Service_, cq, \
[this, proxyCounter](NYdbGrpc::IRequestContextBase *ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
(ctx, &CB, TRequestAuxSettings { \
.RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \
.RequestType = NJaegerTracing::ERequestType::TABLE_##REQUEST_TYPE, \
})); \
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("table", #NAME))->Run(); \
++proxyCounter; \
} \
#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE, REQUEST_TYPE, USE_LIMITER) \
for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
for (auto* cq: CQS) { \
MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
(this, &Service_, cq, \
[this, proxyCounter](NYdbGrpc::IRequestContextBase *ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
(ctx, &CB, TRequestAuxSettings { \
.RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \
.RequestType = NJaegerTracing::ERequestType::TABLE_##REQUEST_TYPE, \
})); \
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("table", #NAME), \
(USE_LIMITER ? getLimiter("TableService", #NAME, UNLIMITED_INFLIGHT) : nullptr))->Run(); \
++proxyCounter; \
} \
}

ADD_REQUEST_LIMIT(CreateSession, DoCreateSessionRequest, Rps, CREATESESSION)
Expand All @@ -101,9 +107,9 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
ADD_REQUEST_LIMIT(ExecuteDataQuery, DoExecuteDataQueryRequest, Ru, EXECUTEDATAQUERY, Auditable)
ADD_REQUEST_LIMIT(BulkUpsert, DoBulkUpsertRequest, Ru, BULKUPSERT, Auditable)

ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress, STREAMEXECUTESCANQUERY)
ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress, STREAMREADTABLE)
ADD_STREAM_REQUEST_LIMIT(ReadRows, ReadRowsRequest, ReadRowsResponse, DoReadRowsRequest, Ru, READROWS)
ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress, STREAMEXECUTESCANQUERY, false)
ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress, STREAMREADTABLE, false)
ADD_STREAM_REQUEST_LIMIT(ReadRows, ReadRowsRequest, ReadRowsResponse, DoReadRowsRequest, Ru, READROWS, true)

#undef ADD_REQUEST_LIMIT
#undef ADD_STREAM_REQUEST_LIMIT
Expand Down
5 changes: 5 additions & 0 deletions ydb/services/ydb/ydb_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,30 @@
#include <ydb/library/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
#include <ydb/core/grpc_services/base/base_service.h>
#include <ydb/core/grpc_services/grpc_helper.h>

namespace NKikimr {
namespace NGRpcService {

class TGRpcYdbTableService
: public TGrpcServiceBase<Ydb::Table::V1::TableService>
{
constexpr static i64 UNLIMITED_INFLIGHT = 0;
public:
using TGrpcServiceBase<Ydb::Table::V1::TableService>::TGrpcServiceBase;

TGRpcYdbTableService(
NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
const NActors::TActorId& proxyId,
bool rlAllowed,
size_t handlersPerCompletionQueue = 1);

TGRpcYdbTableService(
NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
const TVector<NActors::TActorId>& proxies,
bool rlAllowed,
size_t handlersPerCompletionQueue);
Expand All @@ -33,6 +37,7 @@ class TGRpcYdbTableService

private:
const size_t HandlersPerCompletionQueue;
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> LimiterRegistry_;
};

} // namespace NGRpcService
Expand Down
Loading