diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp index 65478d7ece65..827247db72ec 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp @@ -83,12 +83,22 @@ NYdb::NDataStreams::V1::TDataStreamsClient& TPqSession::GetDsClient(const TStrin return ClusterDsClients.emplace(cluster, NYdb::NDataStreams::V1::TDataStreamsClient(YdbDriver, GetDsClientOptions(database, cfg, credentialsProviderFactory))).first->second; } -IPqGateway::TAsyncDescribeFederatedTopicResult TPqSession::DescribeFederatedTopic(const TString& cluster, const TString& database, const TString& path, const TString& token) { +IPqGateway::TAsyncDescribeFederatedTopicResult TPqSession::DescribeFederatedTopic(const TString& cluster, const TString& requestedDatabase, const TString& requestedPath, const TString& token) { const auto* config = ClusterConfigs->FindPtr(cluster); if (!config) { ythrow yexception() << "Pq cluster `" << cluster << "` does not exist"; } + TString database = requestedDatabase; + TString path = requestedPath; + if (config->GetClusterType() == TPqClusterConfig::CT_PERS_QUEUE && requestedDatabase == "/Root") { + // RTMR compatibility + // It uses cluster specified in gateways.conf with ClusterType unset (CT_PERS_QUEUE by default) and default Database and its own read/write code + auto pos = requestedPath.find('/'); + Y_ENSURE(pos != TString::npos, "topic name is expected in format /"); + database = "/logbroker-federation/" + requestedPath.substr(0, pos); + path = requestedPath.substr(pos + 1); + } YQL_ENSURE(config->GetEndpoint(), "Can't describe topic `" << cluster << "`.`" << path << "`: no endpoint"); std::shared_ptr credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, token, config->GetAddBearerToToken()); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index c499d5a255a0..a6c7e35b43e2 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -156,12 +156,21 @@ class TPqDqIntegration: public TDqIntegrationBase { TDqPqTopicSource topicSource = maybeTopicSource.Cast(); TPqTopic topic = topicSource.Topic(); - srcDesc.SetTopicPath(TString(topic.Path().Value())); - srcDesc.SetDatabase(TString(topic.Database().Value())); const TStringBuf cluster = topic.Cluster().Value(); const auto* clusterDesc = State_->Configuration->ClustersConfigurationSettings.FindPtr(cluster); YQL_ENSURE(clusterDesc, "Unknown cluster " << cluster); srcDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); + auto topicPath = topic.Path().Value(); + auto topicDatabase = topic.Database().Value(); + if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE && topicDatabase == "/Root") { + auto pos = topicPath.find('/'); + Y_ENSURE(pos != TStringBuf::npos); + srcDesc.SetTopicPath(TString(topicPath.substr(pos + 1))); + srcDesc.SetDatabase("/logbroker-federation/" + TString(topicPath.substr(0, pos))); + } else { + srcDesc.SetTopicPath(TString(topicPath)); + srcDesc.SetDatabase(TString(topicDatabase)); + } srcDesc.SetDatabaseId(clusterDesc->DatabaseId); const TStructExprType* fullRowType = topicSource.RowType().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); @@ -273,8 +282,17 @@ class TPqDqIntegration: public TDqIntegrationBase { const auto* clusterDesc = State_->Configuration->ClustersConfigurationSettings.FindPtr(cluster); YQL_ENSURE(clusterDesc, "Unknown cluster " << cluster); sinkDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); - sinkDesc.SetTopicPath(TString(topic.Path().Value())); - sinkDesc.SetDatabase(TString(topic.Database().Value())); + auto topicPath = topic.Path().Value(); + auto topicDatabase = topic.Database().Value(); + if (clusterDesc->ClusterType == NYql::TPqClusterConfig::CT_PERS_QUEUE && topicDatabase == "/Root") { + auto pos = topicPath.find('/'); + Y_ENSURE(pos != TStringBuf::npos); + sinkDesc.SetTopicPath(TString(topicPath.substr(pos + 1))); + sinkDesc.SetDatabase("/logbroker-federation/" + TString(topicPath.substr(0, pos))); + } else { + sinkDesc.SetTopicPath(TString(topicPath)); + sinkDesc.SetDatabase(TString(topicDatabase)); + } size_t const settingsCount = topicSink.Settings().Size(); for (size_t i = 0; i < settingsCount; ++i) {