From 3335be08f6a3cf3d7e9e7f69fc39553108a24685 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Sun, 4 May 2025 11:08:07 +0300 Subject: [PATCH 1/3] [pq][federation] add CT_PERS_QUEUE compatibility quite hacky --- .../pq/gateway/native/yql_pq_session.cpp | 11 +++++++- .../pq/provider/yql_pq_dq_integration.cpp | 28 ++++++++++++++++--- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp index 65478d7ece65..cb80a78552d7 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp @@ -83,12 +83,21 @@ NYdb::NDataStreams::V1::TDataStreamsClient& TPqSession::GetDsClient(const TStrin return ClusterDsClients.emplace(cluster, NYdb::NDataStreams::V1::TDataStreamsClient(YdbDriver, GetDsClientOptions(database, cfg, credentialsProviderFactory))).first->second; } -IPqGateway::TAsyncDescribeFederatedTopicResult TPqSession::DescribeFederatedTopic(const TString& cluster, const TString& database, const TString& path, const TString& token) { +IPqGateway::TAsyncDescribeFederatedTopicResult TPqSession::DescribeFederatedTopic(const TString& cluster, const TString& requestedDatabase, const TString& requestedPath, const TString& token) { const auto* config = ClusterConfigs->FindPtr(cluster); if (!config) { ythrow yexception() << "Pq cluster `" << cluster << "` does not exist"; } + TString database = requestedDatabase; + TString path = requestedPath; + if (config->GetClusterType() == TPqClusterConfig::CT_PERS_QUEUE) { + Y_ENSURE(requestedDatabase == "/Root", requestedDatabase); + auto pos = requestedPath.find('/'); + Y_ENSURE(pos != TString::npos, "topic name is expected in format /"); + database = "/logbroker-federation/" + requestedPath.substr(0, pos); + path = requestedPath.substr(pos + 1); + } YQL_ENSURE(config->GetEndpoint(), "Can't describe topic `" << cluster << "`.`" << path << "`: no endpoint"); std::shared_ptr credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, token, config->GetAddBearerToToken()); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index c499d5a255a0..751c0c0a20bf 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -156,12 +156,22 @@ class TPqDqIntegration: public TDqIntegrationBase { TDqPqTopicSource topicSource = maybeTopicSource.Cast(); TPqTopic topic = topicSource.Topic(); - srcDesc.SetTopicPath(TString(topic.Path().Value())); - srcDesc.SetDatabase(TString(topic.Database().Value())); const TStringBuf cluster = topic.Cluster().Value(); const auto* clusterDesc = State_->Configuration->ClustersConfigurationSettings.FindPtr(cluster); YQL_ENSURE(clusterDesc, "Unknown cluster " << cluster); srcDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); + auto topicPath = topic.Path().Value(); + auto topicDatabase = topic.Database().Value(); + if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE) { + auto pos = topicPath.find('/'); + Y_ENSURE(pos != TStringBuf::npos); + Y_ENSURE(topicDatabase == "/Root"); + srcDesc.SetTopicPath(TString(topicPath.substr(pos + 1))); + srcDesc.SetDatabase("/logbroker-federation/" + TString(topicPath.substr(0, pos))); + } else { + srcDesc.SetTopicPath(TString(topicPath)); + srcDesc.SetDatabase(TString(topicDatabase)); + } srcDesc.SetDatabaseId(clusterDesc->DatabaseId); const TStructExprType* fullRowType = topicSource.RowType().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); @@ -273,8 +283,18 @@ class TPqDqIntegration: public TDqIntegrationBase { const auto* clusterDesc = State_->Configuration->ClustersConfigurationSettings.FindPtr(cluster); YQL_ENSURE(clusterDesc, "Unknown cluster " << cluster); sinkDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); - sinkDesc.SetTopicPath(TString(topic.Path().Value())); - sinkDesc.SetDatabase(TString(topic.Database().Value())); + auto topicPath = topic.Path().Value(); + auto topicDatabase = topic.Database().Value(); + if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE) { + auto pos = topicPath.find('/'); + Y_ENSURE(pos != TStringBuf::npos); + Y_ENSURE(topicDatabase == "/Root"); + sinkDesc.SetTopicPath(TString(topicPath.substr(pos + 1))); + sinkDesc.SetDatabase("/logbroker-federation/" + TString(topicPath.substr(0, pos))); + } else { + sinkDesc.SetTopicPath(TString(topicPath)); + sinkDesc.SetDatabase(TString(topicDatabase)); + } size_t const settingsCount = topicSink.Settings().Size(); for (size_t i = 0; i < settingsCount; ++i) { From c1870316a6e8358e9b160683d02c79d4ef25d2bf Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Tue, 6 May 2025 19:13:24 +0300 Subject: [PATCH 2/3] ignore CT_PERS_QUEUE with Database != /Root instead of bailing out --- .../yql/providers/pq/gateway/native/yql_pq_session.cpp | 3 +-- .../yql/providers/pq/provider/yql_pq_dq_integration.cpp | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp index cb80a78552d7..a6cd6b08cadc 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp @@ -91,8 +91,7 @@ IPqGateway::TAsyncDescribeFederatedTopicResult TPqSession::DescribeFederatedTopi TString database = requestedDatabase; TString path = requestedPath; - if (config->GetClusterType() == TPqClusterConfig::CT_PERS_QUEUE) { - Y_ENSURE(requestedDatabase == "/Root", requestedDatabase); + if (config->GetClusterType() == TPqClusterConfig::CT_PERS_QUEUE && requestedDatabase == "/Root") { auto pos = requestedPath.find('/'); Y_ENSURE(pos != TString::npos, "topic name is expected in format /"); database = "/logbroker-federation/" + requestedPath.substr(0, pos); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 751c0c0a20bf..a6c7e35b43e2 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -162,10 +162,9 @@ class TPqDqIntegration: public TDqIntegrationBase { srcDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); auto topicPath = topic.Path().Value(); auto topicDatabase = topic.Database().Value(); - if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE) { + if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE && topicDatabase == "/Root") { auto pos = topicPath.find('/'); Y_ENSURE(pos != TStringBuf::npos); - Y_ENSURE(topicDatabase == "/Root"); srcDesc.SetTopicPath(TString(topicPath.substr(pos + 1))); srcDesc.SetDatabase("/logbroker-federation/" + TString(topicPath.substr(0, pos))); } else { @@ -285,10 +284,9 @@ class TPqDqIntegration: public TDqIntegrationBase { sinkDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); auto topicPath = topic.Path().Value(); auto topicDatabase = topic.Database().Value(); - if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE) { + if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE && topicDatabase == "/Root") { auto pos = topicPath.find('/'); Y_ENSURE(pos != TStringBuf::npos); - Y_ENSURE(topicDatabase == "/Root"); sinkDesc.SetTopicPath(TString(topicPath.substr(pos + 1))); sinkDesc.SetDatabase("/logbroker-federation/" + TString(topicPath.substr(0, pos))); } else { From 7035efddf0a28c439fea6ee377d157c8f979b4b4 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Tue, 6 May 2025 19:19:21 +0300 Subject: [PATCH 3/3] add comment about purpose --- ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp index a6cd6b08cadc..827247db72ec 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp @@ -92,6 +92,8 @@ IPqGateway::TAsyncDescribeFederatedTopicResult TPqSession::DescribeFederatedTopi TString database = requestedDatabase; TString path = requestedPath; if (config->GetClusterType() == TPqClusterConfig::CT_PERS_QUEUE && requestedDatabase == "/Root") { + // RTMR compatibility + // It uses cluster specified in gateways.conf with ClusterType unset (CT_PERS_QUEUE by default) and default Database and its own read/write code auto pos = requestedPath.find('/'); Y_ENSURE(pos != TString::npos, "topic name is expected in format /"); database = "/logbroker-federation/" + requestedPath.substr(0, pos);