Skip to content

[DRAFT] YQ-4161 support block writing in s3 #15648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 49 additions & 7 deletions ydb/library/yql/dq/runtime/dq_async_output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "dq_transport.h"

#include <yql/essentials/utils/yql_panic.h>
#include "arrow/array.h"

#include <deque>
#include <variant>
Expand All @@ -11,7 +12,7 @@ namespace {

class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
struct TValueDesc {
std::variant<NUdf::TUnboxedValue, NDqProto::TWatermark, NDqProto::TCheckpoint> Value;
std::variant<NUdf::TUnboxedValue, NDqProto::TWatermark, NDqProto::TCheckpoint, NKikimr::NMiniKQL::TUnboxedValueVector> Value;
ui64 EstimatedSize;

TValueDesc(NUdf::TUnboxedValue&& value, ui64 size)
Expand All @@ -20,6 +21,12 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
{
}

// Builds a queue entry that holds a vector of unboxed values (the form WidePush stores
// for one wide row). The vector is moved into the variant; `size` is recorded as the
// entry's estimated byte size.
TValueDesc(NKikimr::NMiniKQL::TUnboxedValueVector&& values, ui64 size)
: Value(std::move(values))
, EstimatedSize(size)
{
}

TValueDesc(NDqProto::TWatermark&& watermark, ui64 size)
: Value(std::move(watermark))
, EstimatedSize(size)
Expand Down Expand Up @@ -78,9 +85,18 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
}

// Buffers one wide row as a single queue entry.
//
// `values` points at `count` column values; they are copied (the caller's array is left
// untouched) into a TUnboxedValueVector that is stored in `Values`.
// The row-size estimate is refreshed on the first push and then on every 1000th push;
// every stored row is accounted with the current `EstimatedRowBytes`.
void WidePush(NUdf::TUnboxedValue* values, ui32 count) override {
    if (ValuesPushed++ % 1000 == 0) {
        ReestimateRowBytes(values, count);
    }
    Y_ABORT_UNLESS(EstimatedRowBytes > 0);

    NKikimr::NMiniKQL::TUnboxedValueVector row;
    // The row width is known up front: allocate once instead of growing per column.
    row.reserve(count);
    for (ui32 i = 0; i < count; ++i) {
        row.emplace_back(values[i]);
    }

    Values.emplace_back(std::move(row), EstimatedRowBytes);
    EstimatedStoredBytes += EstimatedRowBytes;

    ReportChunkIn(1, EstimatedRowBytes);
}

void Push(NDqProto::TWatermark&& watermark) override {
Expand Down Expand Up @@ -115,7 +131,7 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {

// Calc values count.
for (auto iter = Values.cbegin(), end = Values.cend();
usedBytes < bytes && iter != end && std::holds_alternative<NUdf::TUnboxedValue>(iter->Value);
usedBytes < bytes && iter != end && (std::holds_alternative<NUdf::TUnboxedValue>(iter->Value) || std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(iter->Value));
++iter)
{
++valuesCount;
Expand All @@ -124,7 +140,16 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {

// Reserve size and return data.
while (valuesCount--) {
batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(Values.front().Value)));
const auto& value = Values.front().Value;
if (std::holds_alternative<NUdf::TUnboxedValue>(value)) {
batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(value)));
} else if (std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(value)) {
// Cerr << TString(TStringBuilder() << "-------------------------------- TDqAsyncOutputBuffer::Pop, pop multi value\n");
auto multiValue = std::get<NKikimr::NMiniKQL::TUnboxedValueVector>(value);
batch.PushRow(multiValue.data(), multiValue.size());
} else {
YQL_ENSURE(false, "Unsupported output value");
}
Values.pop_front();
}
Y_ABORT_UNLESS(EstimatedStoredBytes >= usedBytes);
Expand Down Expand Up @@ -184,6 +209,9 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
if (std::holds_alternative<NUdf::TUnboxedValue>(v.Value)) {
return false;
}
if (std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(v.Value)) {
return false;
}
}
// Finished and no data values.
return true;
Expand All @@ -195,7 +223,21 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {

private:
// Refreshes the running row-size estimate from a single value: its size is estimated
// against `OutputType` exactly once and handed to the ui64 overload, which updates
// `EstimatedRowBytes`.
void ReestimateRowBytes(const NUdf::TUnboxedValue& value) {
    ReestimateRowBytes(TDqDataSerializer::EstimateSize(value, OutputType));
}

// Refreshes the running row-size estimate from one wide row.
//
// `values` points at `count` column values. Each column is estimated against the
// matching element type of the multi `OutputType`, and the sum is handed to the ui64
// overload, which updates `EstimatedRowBytes`.
//
// Fails via YQL_ENSURE when `OutputType` is not a multi type or when the row is wider
// than the declared type.
void ReestimateRowBytes(const NUdf::TUnboxedValue* values, ui32 count) {
    // static_cast cannot detect a wrong dynamic type, so the kind is checked before the cast.
    YQL_ENSURE(OutputType && OutputType->IsMulti(), "Expected multi type for wide output");
    const auto* multiType = static_cast<const NKikimr::NMiniKQL::TMultiType*>(OutputType);

    // Guard the per-column type lookup below against reading past the declared width.
    YQL_ENSURE(count <= multiType->GetElementsCount(),
        "Wide row has " << count << " values, but output type declares " << multiType->GetElementsCount());

    ui64 rowSize = 0;
    for (ui32 i = 0; i < count; ++i) {
        rowSize += TDqDataSerializer::EstimateSize(values[i], multiType->GetElementType(i));
    }
    ReestimateRowBytes(rowSize);
}

void ReestimateRowBytes(ui64 valueSize) {
if (EstimatedRowBytes) {
EstimatedRowBytes = static_cast<ui64>(0.6 * valueSize + 0.4 * EstimatedRowBytes);
} else {
Expand Down
Loading
Loading