diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 77ca538f4859..8495e7ffe3e2 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -198,8 +198,9 @@ void Init( yqCounters->GetSubgroup("subcomponent", "http_gateway")); NYql::NConnector::IClient::TPtr connectorClient = nullptr; + if (protoConfig.GetGateways().GetGeneric().HasConnector()) { - connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector()); + connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric()); } if (protoConfig.GetTokenAccessor().GetEnabled()) { diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index 0690dc0a0237..c3f5c6759fbc 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -107,7 +107,7 @@ namespace NKikimr::NKqp { // Initialize Connector client if (queryServiceConfig.HasGeneric()) { GenericGatewaysConfig = queryServiceConfig.GetGeneric(); - ConnectorClient = NYql::NConnector::MakeClientGRPC(GenericGatewaysConfig.GetConnector()); + ConnectorClient = NYql::NConnector::MakeClientGRPC(GenericGatewaysConfig); if (queryServiceConfig.HasMdbTransformHost()) { MdbEndpointGenerator = NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost()); diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 5a68bf340280..578f5c0f1e61 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1215,7 +1215,7 @@ namespace Tests { if (queryServiceConfig.HasGeneric()) { const auto& genericGatewayConfig = queryServiceConfig.GetGeneric(); - connectorClient = NYql::NConnector::MakeClientGRPC(genericGatewayConfig.GetConnector()); + connectorClient = NYql::NConnector::MakeClientGRPC(genericGatewayConfig); auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId(); Runtime->RegisterService( diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp index b7ca8536782b..18a111c3aa85 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp @@ -18,6 +18,8 @@ namespace NYql::NConnector { /// /// Connection Factory which always returns an exact connection /// + /// TODO: check connection healthiness + /// template class TSingleConnectionFactory final : public IConnectionFactory { public: @@ -159,6 +161,27 @@ namespace NYql::NConnector { std::vector>> Pool_; }; + /// + /// Create a factory based on a it's type @sa NYql::EConnectionFactory + /// + std::unique_ptr> CreateFactoryForConnector( + NYql::EConnectionFactory factory, + std::shared_ptr client, + const NYdbGrpc::TGRpcClientConfig& grpcConfig, + const NYdbGrpc::TTcpKeepAliveSettings& keepAlive) { + + switch (factory) { + case SINGLE: + return std::make_unique>( + client, grpcConfig, keepAlive); + + case NEW_FOR_EACH_REQUEST: + default: + return std::make_unique>( + client, grpcConfig, keepAlive); + } + } + /* struct TConnectorMetrics { @@ -208,24 +231,8 @@ namespace NYql::NConnector { class TClientGRPC: public IClient { public: - TClientGRPC(const TGenericConnectorConfig& config) { - // TODO: place in a config file ? - GrpcClient_ = std::make_shared(DEFAULT_CONNECTION_MANAGER_NUM_THREADS); - auto grpcConfig = ConnectorConfigToGrpcConfig(config); - - auto keepAlive = NYdbGrpc::TTcpKeepAliveSettings { - // TODO configure hardcoded values - .Enabled = true, - .Idle = 30, - .Count = 5, - .Interval = 10 - }; - - // TODO: - // 1. Add config parameter to TGenericConnectorConfig to choose factory - // 2. Add support for multiple connector's config - ConnectionFactory_ = std::make_unique>( - GrpcClient_, grpcConfig, keepAlive); + explicit TClientGRPC(const TGenericGatewayConfig& config) { + Init(config); } ~TClientGRPC() { @@ -300,13 +307,49 @@ namespace NYql::NConnector { } private: - std::shared_ptr> GetConnection(const NYql::EGenericDataSourceKind&) { - // TODO: choose appropriate connection factory by data source kind - return ConnectionFactory_->Create(); + void Init(const TGenericGatewayConfig& config) { + // TODO: place in a config file ? + GrpcClient_ = std::make_shared(DEFAULT_CONNECTION_MANAGER_NUM_THREADS); + + auto keepAlive = NYdbGrpc::TTcpKeepAliveSettings { + // TODO: configure hardcoded values + .Enabled = true, + .Idle = 30, + .Count = 5, + .Interval = 10 + }; + + size_t count = 0; + auto cfg = ConnectorConfigToGrpcConfig( + config.GetConnector(), count++); + + DefaultFactory_ = CreateFactoryForConnector( + config.GetConnector().GetFactory(), GrpcClient_, cfg, keepAlive); + + for (auto c : config.GetConnectors()) { + auto cfg = ConnectorConfigToGrpcConfig(c, count++); + std::shared_ptr> f + = CreateFactoryForConnector(c.GetFactory(), GrpcClient_, cfg, keepAlive); + + for (auto k : c.GetForKinds()) { + if (!FactoryForKind_.try_emplace(NYql::EGenericDataSourceKind(k), f).second) { + throw yexception() + << "Duplicate connector is provided for the kind: " + << EGenericDataSourceKind_Name(k); + } + } + } + } + + std::shared_ptr> GetConnection(NYql::EGenericDataSourceKind kind) const { + auto itr = FactoryForKind_.find(kind); + + return FactoryForKind_.end() == itr ? + DefaultFactory_->Create() : itr->second->Create(); } template - TIteratorAsyncResult> DoEmptyStreamResponse(const grpc::StatusCode& code, TString msg) { + TIteratorAsyncResult> DoEmptyStreamResponse(const grpc::StatusCode& code, TString msg) const { auto promise = NThreading::NewPromise>>(); auto status = NYdbGrpc::TGrpcStatus(grpc::Status(code, msg)); @@ -335,7 +378,7 @@ namespace NYql::NConnector { typename TRpcCallback = typename NYdbGrpc::TStreamRequestReadProcessor::TAsyncRequest > TIteratorAsyncResult> ServerSideStreamingCall( - const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) { + const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) const { auto promise = NThreading::NewPromise>>(); auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor::TPtr streamProcessor) mutable { @@ -359,16 +402,22 @@ namespace NYql::NConnector { return promise.GetFuture(); } - NYdbGrpc::TGRpcClientConfig ConnectorConfigToGrpcConfig(const TGenericConnectorConfig& config) { - auto cfg = NYdbGrpc::TGRpcClientConfig(); + NYdbGrpc::TGRpcClientConfig ConnectorConfigToGrpcConfig(const TGenericConnectorConfig& config, size_t order) const { + auto cfg = NYdbGrpc::TGRpcClientConfig(); + + // Connector's name. If order equals to zero, it means that the config belongs "TGenericGatewayConfig.Connector" + // (default connector); otherwise, it is from "TGenericGatewayConfig.ConnectorS" + auto name = TStringBuilder() + << "Connector[" << (order == 0 ? TString("default") : TStringBuilder() << (order - 1)) << "]"; - Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in TGenericConnectorConfig: " << config.DebugString()); - Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in TGenericConnectorConfig: " << config.DebugString()); + Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in " << name << ": " << config.DebugString()); + Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in " << name << ": " << config.DebugString()); cfg.Locator = TStringBuilder() << config.GetEndpoint().host() << ":" << config.GetEndpoint().port(); cfg.EnableSsl = config.GetUseSsl(); - YQL_CLOG(INFO, ProviderGeneric) << "Connector endpoint: " << (config.GetUseSsl() ? "grpcs" : "grpc") << "://" << cfg.Locator; + YQL_CLOG(INFO, ProviderGeneric) << name << " endpoint: " + << (config.GetUseSsl() ? "grpcs" : "grpc") << "://" << cfg.Locator; // Read content of CA cert TString rootCertData; @@ -383,10 +432,18 @@ namespace NYql::NConnector { private: std::shared_ptr GrpcClient_; - std::unique_ptr> ConnectionFactory_; + std::unique_ptr> DefaultFactory_; + std::unordered_map>> FactoryForKind_; }; - IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg) { + IClient::TPtr MakeClientGRPC(const ::NYql::TGenericGatewayConfig& cfg) { + if (!cfg.HasConnector()) { + throw yexception() + << "TGenericGatewayConfig.Connector is empty. " + << "In order to create a ClientGRPC it has to be set"; + } + return std::make_shared(cfg); } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.h b/ydb/library/yql/providers/generic/connector/libcpp/client.h index 929e6a6051c5..aa91aa2a8543 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.h @@ -173,5 +173,5 @@ namespace NYql::NConnector { virtual ~IClient() = default; }; - IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg); + IClient::TPtr MakeClientGRPC(const ::NYql::TGenericGatewayConfig& cfg); } // namespace NYql::NConnector diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 450c3a374377..31c2fe278867 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -1036,7 +1036,7 @@ int RunMain(int argc, const char* argv[]) clusters.emplace(to_lower(cluster.GetName()), TString{GenericProviderName}); } - genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector()); + genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric()); dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver, credentialsFactory)); }