diff --git a/ydb/library/yql/dq/runtime/dq_async_output.cpp b/ydb/library/yql/dq/runtime/dq_async_output.cpp index fed5195907e4..6ab8d839019a 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_output.cpp @@ -2,6 +2,7 @@ #include "dq_transport.h" #include +#include "arrow/array.h" #include #include @@ -11,7 +12,7 @@ namespace { class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { struct TValueDesc { - std::variant Value; + std::variant Value; ui64 EstimatedSize; TValueDesc(NUdf::TUnboxedValue&& value, ui64 size) @@ -20,6 +21,12 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { { } + TValueDesc(NKikimr::NMiniKQL::TUnboxedValueVector&& values, ui64 size) + : Value(std::move(values)) + , EstimatedSize(size) + { + } + TValueDesc(NDqProto::TWatermark&& watermark, ui64 size) : Value(std::move(watermark)) , EstimatedSize(size) @@ -78,9 +85,18 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { } void WidePush(NUdf::TUnboxedValue* values, ui32 count) override { - Y_UNUSED(values); - Y_UNUSED(count); - YQL_ENSURE(false, "Wide stream is not supported"); + if (ValuesPushed++ % 1000 == 0) { + ReestimateRowBytes(values, count); + } + Y_ABORT_UNLESS(EstimatedRowBytes > 0); + NKikimr::NMiniKQL::TUnboxedValueVector valuesVector; + for (ui32 i = 0; i < count; ++i) { + valuesVector.emplace_back(values[i]); + } + Values.emplace_back(std::move(valuesVector), EstimatedRowBytes); + EstimatedStoredBytes += EstimatedRowBytes; + + ReportChunkIn(1, EstimatedRowBytes); } void Push(NDqProto::TWatermark&& watermark) override { @@ -115,7 +131,7 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { // Calc values count. for (auto iter = Values.cbegin(), end = Values.cend(); - usedBytes < bytes && iter != end && std::holds_alternative(iter->Value); + usedBytes < bytes && iter != end && (std::holds_alternative(iter->Value) || std::holds_alternative(iter->Value)); ++iter) { ++valuesCount; @@ -124,7 +140,16 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { // Reserve size and return data. while (valuesCount--) { - batch.emplace_back(std::move(std::get(Values.front().Value))); + const auto& value = Values.front().Value; + if (std::holds_alternative(value)) { + batch.emplace_back(std::move(std::get(value))); + } else if (std::holds_alternative(value)) { + // Cerr << TString(TStringBuilder() << "-------------------------------- TDqAsyncOutputBuffer::Pop, pop multi value\n"); + auto multiValue = std::get(value); + batch.PushRow(multiValue.data(), multiValue.size()); + } else { + YQL_ENSURE(false, "Unsupported output value"); + } Values.pop_front(); } Y_ABORT_UNLESS(EstimatedStoredBytes >= usedBytes); @@ -184,6 +209,9 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { if (std::holds_alternative(v.Value)) { return false; } + if (std::holds_alternative(v.Value)) { + return false; + } } // Finished and no data values. return true; @@ -195,7 +223,21 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { private: void ReestimateRowBytes(const NUdf::TUnboxedValue& value) { - const ui64 valueSize = TDqDataSerializer::EstimateSize(value, OutputType); + ReestimateRowBytes(TDqDataSerializer::EstimateSize(value, OutputType)); + } + + void ReestimateRowBytes(const NUdf::TUnboxedValue* values, ui32 count) { + const auto* multiType = static_cast(OutputType); + YQL_ENSURE(multiType, "Expected multi type for wide output"); + + ui64 valueSize = 0; + for (ui32 i = 0; i < count; ++i) { + valueSize += TDqDataSerializer::EstimateSize(values[i], multiType->GetElementType(i)); + } + ReestimateRowBytes(valueSize); + } + + void ReestimateRowBytes(ui64 valueSize) { if (EstimatedRowBytes) { EstimatedRowBytes = static_cast(0.6 * valueSize + 0.4 * EstimatedRowBytes); } else { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index da2ba03a2c97..9b2627913a12 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -1,4 +1,11 @@ #include "yql_s3_write_actor.h" +#include "arrow/result.h" +#include "arrow/table.h" +#include "parquet/arrow/writer.h" +#include "arrow/util/type_fwd.h" +#include "arrow/record_batch.h" +#include "yql/essentials/minikql/mkql_program_builder.h" +#include "yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h" #include "yql_s3_actors_util.h" #include @@ -472,6 +479,48 @@ class TS3FileWriteActor : public TActorBootstrapped { }; class TS3WriteActor : public TActorBootstrapped, public IDqComputeActorAsyncOutput { + class TWriteBuffer : public arrow::io::OutputStream { + public: + TWriteBuffer(TS3WriteActor& self) + : Self(self) + { + Y_UNUSED(Self); + } + + arrow::Status Write(const void* data, int64_t nbytes) override { + Y_UNUSED(data); + // Cerr << TString(TStringBuilder() << "-------------------------------- TWriteBuffer::Write, nbytes: " << nbytes << "\n"); + Pos += nbytes; + Self.BatchSize += nbytes; + Self.FileSize += nbytes; + Self.ArrowData << TStringBuf(reinterpret_cast(data), nbytes); + return arrow::Status::OK(); + } + + arrow::Status Close() override { + // Cerr << TString(TStringBuilder() << "-------------------------------- TWriteBuffer::Close\n"); + Self.BatchSize = 0; + Self.FileSize = 0; + Closed = true; + return arrow::Status::OK(); + } + + arrow::Result Tell() const override { + // Cerr << TString(TStringBuilder() << "-------------------------------- TWriteBuffer::Tell, Pos: " << Pos << "\n"); + return Pos; + } + + bool closed() const override { + // Cerr << TString(TStringBuilder() << "-------------------------------- TWriteBuffer::closed: " << Closed << "\n"); + return Closed; + } + + private: + TS3WriteActor& Self; + int64_t Pos = 0; + bool Closed = false; + }; + public: TS3WriteActor(ui64 outputIndex, TCollectStatsLevel statsLevel, @@ -490,11 +539,12 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput IDqComputeActorAsyncOutput::ICallbacks* callbacks, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, bool dirtyWrite, - const TString& token) + const TString& token, std::shared_ptr arrowSchema) : Gateway(std::move(gateway)) , Credentials(std::move(credentials)) , RandomProvider(randomProvider) , RetryPolicy(retryPolicy) + , ArrowSchema(arrowSchema) , OutputIndex(outputIndex) , TxId(txId) , Prefix(prefix) @@ -516,6 +566,18 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput EgressStats.Level = statsLevel; } + void OpenWriter() { + // Choose compression + std::shared_ptr props = + parquet::WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); + + // Opt to store Arrow schema for easier reads back into Arrow + std::shared_ptr arrow_props = + parquet::ArrowWriterProperties::Builder().store_schema()->build(); + + ARROW_OK(parquet::arrow::FileWriter::Open(*ArrowSchema, arrow::default_memory_pool(), std::make_shared(*this), props, arrow_props, &Writer)); + } + void Bootstrap() { LOG_D("TS3WriteActor", "Bootstrap"); Become(&TS3WriteActor::StateFunc); @@ -567,34 +629,106 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput void SendData(TUnboxedValueBatch&& data, i64, const TMaybe&, bool finished) final { std::unordered_set processedActors; - YQL_ENSURE(!data.IsWide(), "Wide stream is not supported yet"); EgressStats.Resume(); - data.ForEachRow([&](const auto& row) { - const auto& key = MakePartitionKey(row); - const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector()); - if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) { - auto fileWrite = std::make_unique( - TxId, - Gateway, - Credentials, - key, - NS3Util::UrlEscapeRet(Url + Path + key + MakeOutputName() + Extension), - Compression, - RetryPolicy, DirtyWrite, Token); - keyIt->second.emplace_back(fileWrite.get()); - RegisterWithSameMailbox(fileWrite.release()); - } - const NUdf::TUnboxedValue& value = Keys.empty() ? row : *row.GetElements(); - TS3FileWriteActor* actor = keyIt->second.back(); - if (value) { - actor->AddData(TString(value.AsStringRef())); - } - if (!Multipart || !value) { - actor->Seal(); + if (data.IsWide()) { + auto doFlushFile = [&](bool closeWriter) { + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, flush file\n"); + if (closeWriter) { + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, finish writing\n"); + ARROW_OK(Writer->Close()); + Writer.reset(); + FileSize = 0; + } + + const auto [keyIt, insertedNew] = FileWriteActors.emplace("", std::vector()); + if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) { + auto fileWrite = std::make_unique( + TxId, + Gateway, + Credentials, + "", + NS3Util::UrlEscapeRet(Url + Path + MakeOutputName() + Extension), + Compression, + RetryPolicy, DirtyWrite, Token); + keyIt->second.emplace_back(fileWrite.get()); + RegisterWithSameMailbox(fileWrite.release()); + } + + TS3FileWriteActor* actor = keyIt->second.back(); + actor->AddData(std::move(ArrowData)); + TStringBuilder b; + ArrowData.swap(b); + ArrowData.reserve(1_MB); + BatchSize = 0; + if (closeWriter) { + actor->Seal(); + } + processedActors.insert(actor); + }; + + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, send wide data, FileSize: " << FileSize << ", BatchSize: " << BatchSize << ", Multipart: " << Multipart << ", width: " << data.Width() << ", number batches: " << data.RowCount() << "\n"); + data.ForEachRowWide([&](NYql::NUdf::TUnboxedValue* values, ui32 width) { + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, handle row\n"); + std::vector> columns; + for (ui32 i = 0; i + 1 < width; ++i) { + auto datum = TArrowBlock::From(values[i]).GetDatum(); + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, make array [" << i << "], is array: " << datum.is_array() << "\n"); + auto arr = datum.make_array(); + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, push array [" << i << "]\n"); + columns.emplace_back(arr); + } + + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, create batch\n"); + auto batch = arrow::RecordBatch::Make(ArrowSchema, TArrowBlock::From(values[width - 1]).GetDatum().scalar_as().value, std::move(columns)); + auto res = arrow::Table::FromRecordBatches(batch->schema(), {batch}); + ARROW_OK(res.status()); + std::shared_ptr arrowTable = std::move(res).ValueOrDie(); + + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, create writer\n"); + if (!Writer) { + OpenWriter(); + } + ARROW_OK(Writer->WriteTable(*arrowTable.get(), batch->num_rows())); + + // Cerr << TString(TStringBuilder() << "-------------------------------- TS3WriteActor::SendData, perform write\n"); + bool finishFile = FileSize > 50_MB || finished || !Multipart; + if (BatchSize > 1_MB || finishFile) { + doFlushFile(finishFile); + } + }); + + if (finished && FileSize) { + doFlushFile(true); } - processedActors.insert(actor); - }); + } else { + data.ForEachRow([&](const auto& row) { + const auto& key = MakePartitionKey(row); + const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector()); + if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) { + auto fileWrite = std::make_unique( + TxId, + Gateway, + Credentials, + key, + NS3Util::UrlEscapeRet(Url + Path + key + MakeOutputName() + Extension), + Compression, + RetryPolicy, DirtyWrite, Token); + keyIt->second.emplace_back(fileWrite.get()); + RegisterWithSameMailbox(fileWrite.release()); + } + + const NUdf::TUnboxedValue& value = Keys.empty() ? row : *row.GetElements(); + TS3FileWriteActor* actor = keyIt->second.back(); + if (value) { + actor->AddData(TString(value.AsStringRef())); + } + if (!Multipart || !value) { + actor->Seal(); + } + processedActors.insert(actor); + }); + } for (TS3FileWriteActor* actor : processedActors) { actor->Go(); @@ -681,6 +815,7 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput IRandomProvider* RandomProvider; TIntrusivePtr DefaultRandomProvider; const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; + std::shared_ptr ArrowSchema; const ui64 OutputIndex; TDqAsyncStats EgressStats; @@ -701,13 +836,18 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput std::unordered_map> FileWriteActors; bool DirtyWrite; TString Token; + + ui64 BatchSize = 0; + ui64 FileSize = 0; + TStringBuilder ArrowData; + std::unique_ptr Writer; }; } // namespace std::pair CreateS3WriteActor( - const NKikimr::NMiniKQL::TTypeEnvironment&, - const NKikimr::NMiniKQL::IFunctionRegistry&, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, IRandomProvider* randomProvider, IHTTPGateway::TPtr gateway, NS3::TSink&& params, @@ -720,6 +860,28 @@ std::pair CreateS3WriteActor( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy) { + const auto pb = std::make_unique(typeEnv, functionRegistry); + + const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr); + YQL_ENSURE(outputItemType->IsStruct(), "Row type is not struct"); + const auto structType = static_cast(outputItemType); + + arrow::SchemaBuilder builder; + + for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) { + TStringBuf memberName = structType->GetMemberName(i); + + auto memberType = structType->GetMemberType(i); + std::shared_ptr dataType; + + YQL_ENSURE(ConvertArrowType(memberType, dataType), "Unsupported arrow type"); + ARROW_OK(builder.AddField(std::make_shared(std::string(memberName), dataType, memberType->IsOptional()))); + } + + auto res = builder.Finish(); + ARROW_OK(res.status()); + std::shared_ptr arrowSchema = std::move(res).ValueOrDie(); + const auto token = secureParams.Value(params.GetToken(), TString{}); const auto actor = new TS3WriteActor( outputIndex, @@ -738,7 +900,8 @@ std::pair CreateS3WriteActor( callbacks, retryPolicy, !params.GetAtomicUploadCommit(), - token); + token, + arrowSchema); return {actor, actor}; } diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index f7121ceaf650..9367eecffb4c 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -154,7 +154,8 @@ {"Index": 0, "Name": "Path", "Type": "TCoAtom"}, {"Index": 1, "Name": "Settings", "Type": "TExprList"}, {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"}, - {"Index": 3, "Name": "Extension", "Type": "TCoAtom"} + {"Index": 3, "Name": "Extension", "Type": "TCoAtom"}, + {"Index": 4, "Name": "RowType", "Type": "TExprBase"} ] }, { diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto index 7ef4a3b59fcd..6cc378289708 100644 --- a/ydb/library/yql/providers/s3/proto/sink.proto +++ b/ydb/library/yql/providers/s3/proto/sink.proto @@ -14,6 +14,7 @@ message TSink { optional string Extension = 8; optional bool Multipart = 9; bool AtomicUploadCommit = 10; + optional string RowType = 11; } message TCommitEffect { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 4da2a377b7ec..aca49b80c065 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -179,10 +179,19 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { const auto format = tgt.Format(); + // CHANGER // + // TTypeAnnotationNode::TListType items; + // for (const auto& x : structType->GetItems()) { + // items.push_back(ctx.MakeType(x->GetItemType())); + // } + // items.push_back(ctx.MakeType(ctx.MakeType(EDataSlot::Uint64))); + // auto baseTargeType = ctx.MakeType(items); + // CHANGER // auto baseTargeType = AnnotateTargetBase(format, keys, structType, ctx); if (!baseTargeType) { return TStatus::Error; } + // CHANGER // auto t = ctx.MakeType( TTypeAnnotationNode::TListType{ @@ -357,7 +366,7 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { } TStatus HandleSink(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 4, ctx)) { + if (!EnsureArgsCount(*input, 5, ctx)) { return TStatus::Error; } input->SetTypeAnn(ctx.MakeType()); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 847d712faf49..7daf28520c68 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -596,6 +596,12 @@ class TS3DqIntegration: public TDqIntegrationBase { sinkDesc.SetMultipart(GetMultipart(settings.Settings().Ref())); sinkDesc.SetAtomicUploadCommit(State_->Configuration->AllowAtomicUploadCommit && State_->Configuration->AtomicUploadCommit.Get().GetOrElse(false)); + const TStructExprType* fullRowType = settings.RowType().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); + // exclude extra columns to get actual row type we need to read from input + auto rowTypeItems = fullRowType->GetItems(); + TExprContext ctx; + sinkDesc.SetRowType(NCommon::WriteTypeToYson(ctx.MakeType(rowTypeItems), NYT::NYson::EYsonFormat::Text)); + protoSettings.PackFrom(sinkDesc); sinkType = "S3Sink"; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index 3feae46c3495..cba2932352bb 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -201,6 +201,13 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { } } + if (format == "parquet" && keys.empty()) { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(target.Pos(), "block_output")); + pair.push_back(ctx.NewAtom(target.Pos(), "true")); + sinkSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair))); + } + if (IsDqPureExpr(input)) { YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as stage with sink."; return keys.empty() ? @@ -209,9 +216,7 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { .Program() .Args({}) .Body() - .Input() - .Input(input) - .Build() + .Input(input) .Format(target.Format()) .KeyColumns().Build() .Settings(sinkOutputSettingsBuilder.Done()) @@ -304,6 +309,7 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { .Name().Build(token) .Build() .Extension().Value(extension).Build() + .RowType(ExpandType(writePos, *input.Ref().GetTypeAnn()->Cast()->GetItemType(), ctx)) .Build() .Done(); @@ -311,6 +317,56 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { .Add(sink); if (keys.empty()) { + const auto* structType = input.Ref().GetTypeAnn()->Cast()->GetItemType()->Cast(); + + TVector columns; + columns.reserve(structType->GetSize()); + for (const auto& item : structType->GetItems()) { + columns.emplace_back(item->GetName()); + } + + // TCoWideToBlocks + + // auto wideToBlocks = ctx.Builder(writePos) + // .Callable("WideToBlocks") + // .Callable(0, "FromFlow") + // .Add(0, std::move(expandMap)) + // .Seal() + // .Seal() + // .Build(); + + auto toFlow = Build(ctx, writePos) + .Args({"in"}) + .Body() + .Input("in") + .Build() + .Done(); + + auto expandMap = MakeExpandMap(writePos, columns, toFlow.Body().Ptr(), ctx); + + auto wideToBlocks = ctx.Builder(writePos) + .Callable("WideToBlocks") + .Callable(0, "FromFlow") + .Add(0, std::move(expandMap)) + .Seal() + .Seal() + .Build(); + + // CHANGER // + // return Build(ctx, writePos) + // .Inputs() + // .Add() + // .Output(dqUnion.Output()) + // .Build() + // .Build() + // .Program() + // .Args(toFlow.Args()) + // .Body(wideToBlocks) + // .Build() + // .Settings().Build() + // .Outputs(outputsBuilder.Done()) + // .Done(); + // CHANGER // return Build(ctx, writePos) .Inputs() .Add() @@ -329,6 +385,7 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { .Settings().Build() .Outputs(outputsBuilder.Done()) .Done(); + // CHANGER // } else { return Build(ctx, writePos) .Inputs() @@ -394,6 +451,31 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { } } + TMaybeNode S3RewriteBlockWrite(TExprBase node, TExprContext& ctx) { + auto sinkOutput = node.Cast(); + // if (sinkOutput.Format() != "parquet" || !sinkOutput.KeyColumns().Empty()) { + return node; + // } + + const auto input = sinkOutput.Input(); + const auto* structType = input.Ref().GetTypeAnn()->Cast()->GetItemType()->Cast(); + + TVector columns; + columns.reserve(structType->GetSize()); + for (const auto& item : structType->GetItems()) { + columns.emplace_back(item->GetName()); + } + + auto expandMap = MakeExpandMap(node.Pos(), columns, input.Ptr(), ctx); + return ctx.Builder(node.Pos()) + .Callable("WideToBlocks") + .Callable(0, "FromFlow") + .Add(0, std::move(expandMap)) + .Seal() + .Seal() + .Build(); + } + private: const TS3State::TPtr State_; }; diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 7bdf66fb8619..6d05dfe70101 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -7,7 +7,7 @@ ActorSystemConfig { } Executor { Type: BASIC - Threads: 6 + Threads: 20 SpinThreshold: 1 Name: "User" } @@ -173,17 +173,18 @@ ResourceBrokerConfig { Weight: 30 Limit { - Memory: 6442450944 + Memory: 64424509440 } } ResourceLimit { - Memory: 6442450944 + Memory: 64424509440 } } TableServiceConfig { BindingsMode: BM_DROP + BlockChannelsMode: BLOCK_CHANNELS_FORCE CompileTimeoutMs: 600000 EnableCreateTableAs: true EnableOlapSink: true diff --git a/ydb/tests/tools/kqprun/ya.make b/ydb/tests/tools/kqprun/ya.make index b39159f0ea40..2c501ed9941d 100644 --- a/ydb/tests/tools/kqprun/ya.make +++ b/ydb/tests/tools/kqprun/ya.make @@ -22,6 +22,7 @@ PEERDIR( PEERDIR( yql/essentials/udfs/common/datetime2 + yql/essentials/udfs/common/digest yql/essentials/udfs/common/re2 yql/essentials/udfs/common/string yql/essentials/udfs/common/yson2