Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Choose a connector based on a request's data source kind #16052

Merged
merged 6 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ void Init(
yqCounters->GetSubgroup("subcomponent", "http_gateway"));

NYql::NConnector::IClient::TPtr connectorClient = nullptr;

if (protoConfig.GetGateways().GetGeneric().HasConnector()) {
connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric());
}

if (protoConfig.GetTokenAccessor().GetEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ namespace NKikimr::NKqp {
// Initialize Connector client
if (queryServiceConfig.HasGeneric()) {
GenericGatewaysConfig = queryServiceConfig.GetGeneric();
ConnectorClient = NYql::NConnector::MakeClientGRPC(GenericGatewaysConfig.GetConnector());
ConnectorClient = NYql::NConnector::MakeClientGRPC(GenericGatewaysConfig);

if (queryServiceConfig.HasMdbTransformHost()) {
MdbEndpointGenerator = NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ namespace Tests {
if (queryServiceConfig.HasGeneric()) {
const auto& genericGatewayConfig = queryServiceConfig.GetGeneric();

connectorClient = NYql::NConnector::MakeClientGRPC(genericGatewayConfig.GetConnector());
connectorClient = NYql::NConnector::MakeClientGRPC(genericGatewayConfig);

auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId();
Runtime->RegisterService(
Expand Down
117 changes: 87 additions & 30 deletions ydb/library/yql/providers/generic/connector/libcpp/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace NYql::NConnector {
///
/// Connection Factory which always returns an exact connection
///
/// TODO: check connection healthiness
///
template<typename T>
class TSingleConnectionFactory final : public IConnectionFactory<T> {
public:
Expand Down Expand Up @@ -159,6 +161,27 @@ namespace NYql::NConnector {
std::vector<std::shared_ptr<NYdbGrpc::TServiceConnection<T>>> Pool_;
};

///
/// Create a factory based on a it's type @sa NYql::EConnectionFactory
///
std::unique_ptr<IConnectionFactory<NApi::Connector>> CreateFactoryForConnector(
NYql::EConnectionFactory factory,
std::shared_ptr<NYdbGrpc::TGRpcClientLow> client,
const NYdbGrpc::TGRpcClientConfig& grpcConfig,
const NYdbGrpc::TTcpKeepAliveSettings& keepAlive) {

switch (factory) {
case SINGLE:
return std::make_unique<TSingleConnectionFactory<NApi::Connector>>(
client, grpcConfig, keepAlive);

case NEW_FOR_EACH_REQUEST:
default:
return std::make_unique<TNewOnEachRequestConnectionFactory<NApi::Connector>>(
client, grpcConfig, keepAlive);
}
}

/*

struct TConnectorMetrics {
Expand Down Expand Up @@ -208,24 +231,8 @@ namespace NYql::NConnector {

class TClientGRPC: public IClient {
public:
TClientGRPC(const TGenericConnectorConfig& config) {
// TODO: place in a config file ?
GrpcClient_ = std::make_shared<NYdbGrpc::TGRpcClientLow>(DEFAULT_CONNECTION_MANAGER_NUM_THREADS);
auto grpcConfig = ConnectorConfigToGrpcConfig(config);

auto keepAlive = NYdbGrpc::TTcpKeepAliveSettings {
// TODO configure hardcoded values
.Enabled = true,
.Idle = 30,
.Count = 5,
.Interval = 10
};

// TODO:
// 1. Add config parameter to TGenericConnectorConfig to choose factory
// 2. Add support for multiple connector's config
ConnectionFactory_ = std::make_unique<TNewOnEachRequestConnectionFactory<NApi::Connector>>(
GrpcClient_, grpcConfig, keepAlive);
explicit TClientGRPC(const TGenericGatewayConfig& config) {
Init(config);
}

~TClientGRPC() {
Expand Down Expand Up @@ -300,13 +307,49 @@ namespace NYql::NConnector {
}

private:
std::shared_ptr<NYdbGrpc::TServiceConnection<NApi::Connector>> GetConnection(const NYql::EGenericDataSourceKind&) {
// TODO: choose appropriate connection factory by data source kind
return ConnectionFactory_->Create();
void Init(const TGenericGatewayConfig& config) {
// TODO: place in a config file ?
GrpcClient_ = std::make_shared<NYdbGrpc::TGRpcClientLow>(DEFAULT_CONNECTION_MANAGER_NUM_THREADS);

auto keepAlive = NYdbGrpc::TTcpKeepAliveSettings {
// TODO: configure hardcoded values
.Enabled = true,
.Idle = 30,
.Count = 5,
.Interval = 10
};

size_t count = 0;
auto cfg = ConnectorConfigToGrpcConfig(
config.GetConnector(), count++);

DefaultFactory_ = CreateFactoryForConnector(
config.GetConnector().GetFactory(), GrpcClient_, cfg, keepAlive);

for (auto c : config.GetConnectors()) {
auto cfg = ConnectorConfigToGrpcConfig(c, count++);
std::shared_ptr<IConnectionFactory<NApi::Connector>> f
= CreateFactoryForConnector(c.GetFactory(), GrpcClient_, cfg, keepAlive);

for (auto k : c.GetForKinds()) {
if (!FactoryForKind_.try_emplace(NYql::EGenericDataSourceKind(k), f).second) {
throw yexception()
<< "Duplicate connector is provided for the kind: "
<< EGenericDataSourceKind_Name(k);
}
}
}
}

std::shared_ptr<NYdbGrpc::TServiceConnection<NApi::Connector>> GetConnection(NYql::EGenericDataSourceKind kind) const {
auto itr = FactoryForKind_.find(kind);

return FactoryForKind_.end() == itr ?
DefaultFactory_->Create() : itr->second->Create();
}

template<typename TResponse>
TIteratorAsyncResult<IStreamIterator<TResponse>> DoEmptyStreamResponse(const grpc::StatusCode& code, TString msg) {
TIteratorAsyncResult<IStreamIterator<TResponse>> DoEmptyStreamResponse(const grpc::StatusCode& code, TString msg) const {
auto promise = NThreading::NewPromise<TIteratorResult<IStreamIterator<TResponse>>>();
auto status = NYdbGrpc::TGrpcStatus(grpc::Status(code, msg));

Expand Down Expand Up @@ -335,7 +378,7 @@ namespace NYql::NConnector {
typename TRpcCallback = typename NYdbGrpc::TStreamRequestReadProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest
>
TIteratorAsyncResult<IStreamIterator<TResponse>> ServerSideStreamingCall(
const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) {
const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) const {
auto promise = NThreading::NewPromise<TIteratorResult<IStreamIterator<TResponse>>>();

auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable {
Expand All @@ -359,16 +402,22 @@ namespace NYql::NConnector {
return promise.GetFuture();
}

NYdbGrpc::TGRpcClientConfig ConnectorConfigToGrpcConfig(const TGenericConnectorConfig& config) {
auto cfg = NYdbGrpc::TGRpcClientConfig();
NYdbGrpc::TGRpcClientConfig ConnectorConfigToGrpcConfig(const TGenericConnectorConfig& config, size_t order) const {
auto cfg = NYdbGrpc::TGRpcClientConfig();

// Connector's name. If order equals to zero, it means that the config belongs "TGenericGatewayConfig.Connector"
// (default connector); otherwise, it is from "TGenericGatewayConfig.ConnectorS"
auto name = TStringBuilder()
<< "Connector[" << (order == 0 ? TString("default") : TStringBuilder() << (order - 1)) << "]";

Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in TGenericConnectorConfig: " << config.DebugString());
Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in TGenericConnectorConfig: " << config.DebugString());
Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in " << name << ": " << config.DebugString());
Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in " << name << ": " << config.DebugString());

cfg.Locator = TStringBuilder() << config.GetEndpoint().host() << ":" << config.GetEndpoint().port();
cfg.EnableSsl = config.GetUseSsl();

YQL_CLOG(INFO, ProviderGeneric) << "Connector endpoint: " << (config.GetUseSsl() ? "grpcs" : "grpc") << "://" << cfg.Locator;
YQL_CLOG(INFO, ProviderGeneric) << name << " endpoint: "
<< (config.GetUseSsl() ? "grpcs" : "grpc") << "://" << cfg.Locator;

// Read content of CA cert
TString rootCertData;
Expand All @@ -383,10 +432,18 @@ namespace NYql::NConnector {

private:
std::shared_ptr<NYdbGrpc::TGRpcClientLow> GrpcClient_;
std::unique_ptr<IConnectionFactory<NApi::Connector>> ConnectionFactory_;
std::unique_ptr<IConnectionFactory<NApi::Connector>> DefaultFactory_;
std::unordered_map<NYql::EGenericDataSourceKind,
std::shared_ptr<IConnectionFactory<NApi::Connector>>> FactoryForKind_;
};

IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg) {
IClient::TPtr MakeClientGRPC(const ::NYql::TGenericGatewayConfig& cfg) {
if (!cfg.HasConnector()) {
throw yexception()
<< "TGenericGatewayConfig.Connector is empty. "
<< "In order to create a ClientGRPC it has to be set";
}

return std::make_shared<TClientGRPC>(cfg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,5 @@ namespace NYql::NConnector {
virtual ~IClient() = default;
};

IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg);
IClient::TPtr MakeClientGRPC(const ::NYql::TGenericGatewayConfig& cfg);
} // namespace NYql::NConnector
2 changes: 1 addition & 1 deletion ydb/library/yql/tools/dqrun/dqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ int RunMain(int argc, const char* argv[])
clusters.emplace(to_lower(cluster.GetName()), TString{GenericProviderName});
}

genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector());
genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric());

dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver, credentialsFactory));
}
Expand Down
Loading