From 9dcfd39346ec0f87f5f14f569aef236fecf23333 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Fri, 10 Jan 2025 15:26:25 -0800 Subject: [PATCH] fix: Reuse vector in LocalPartition (#12002) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12002 X-link: https://github.com/facebookincubator/nimble/pull/122 More than 10% of the CPU are spent on the destruction of local partition output when the load is high. Also add some optimizations for serialization. Optimization on `ByteOutputStream::appendBool` does not show significant gain in the query in example (because they are a lot small batches), but it is net gain and would be significant in large batches, so I leave it in the code. Reviewed By: xiaoxmeng Differential Revision: D67742489 fbshipit-source-id: 8e70dd128f31caa7909ed7c1e2b4ac1e59d7c87d --- velox/common/memory/ByteStream.cpp | 30 -------- velox/common/memory/ByteStream.h | 30 +++++++- velox/exec/LocalPartition.cpp | 88 ++++++++++++++------- velox/exec/LocalPartition.h | 34 ++++++++- velox/exec/ScaleWriterLocalPartition.cpp | 3 +- velox/exec/Task.cpp | 7 +- velox/exec/TaskStructs.h | 3 +- velox/exec/tests/LocalPartitionTest.cpp | 31 +++++++- velox/serializers/PrestoSerializer.cpp | 97 ++++++++++++------------ velox/vector/BaseVector.h | 10 ++- velox/vector/ConstantVector.h | 27 +++++-- velox/vector/DictionaryVector.h | 10 ++- velox/vector/LazyVector.h | 2 +- velox/vector/SequenceVector.h | 6 +- 14 files changed, 244 insertions(+), 134 deletions(-) diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index 115d2bd96a95..9aefec3f5b38 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -197,36 +197,6 @@ size_t ByteOutputStream::size() const { return total + std::max(ranges_.back().position, lastRangeEnd_); } -void ByteOutputStream::appendBool(bool value, int32_t count) { - VELOX_DCHECK(isBits_); - - if (count == 1 && current_->size > current_->position) { - bits::setBit( - reinterpret_cast(current_->buffer), - current_->position, - value); - ++current_->position; - return; - } - - int32_t offset{0}; - for (;;) { - const int32_t bitsFit = - std::min(count - offset, current_->size - current_->position); - bits::fillBits( - reinterpret_cast(current_->buffer), - current_->position, - current_->position + bitsFit, - value); - current_->position += bitsFit; - offset += bitsFit; - if (offset == count) { - return; - } - extend(bits::nbytes(count - offset)); - } -} - void ByteOutputStream::appendBits( const uint64_t* bits, int32_t begin, diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index c8435299c118..20678d59fa72 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -305,7 +305,35 @@ class ByteOutputStream { current_->position += sizeof(T) * values.size(); } - void appendBool(bool value, int32_t count); + inline void appendBool(bool value, int32_t count) { + VELOX_DCHECK(isBits_); + + if (count == 1 && current_->size > current_->position) { + bits::setBit( + reinterpret_cast(current_->buffer), + current_->position, + value); + ++current_->position; + return; + } + + int32_t offset{0}; + for (;;) { + const int32_t bitsFit = + std::min(count - offset, current_->size - current_->position); + bits::fillBits( + reinterpret_cast(current_->buffer), + current_->position, + current_->position + bitsFit, + value); + current_->position += bitsFit; + offset += bitsFit; + if (offset == count) { + return; + } + extend(bits::nbytes(count - offset)); + } + } // A fast path for appending bits into pre-cleared buffers after first extend. inline void diff --git a/velox/exec/LocalPartition.cpp b/velox/exec/LocalPartition.cpp index dce6dff0ad38..867da1a46624 100644 --- a/velox/exec/LocalPartition.cpp +++ b/velox/exec/LocalPartition.cpp @@ -55,6 +55,31 @@ std::vector LocalExchangeMemoryManager::decreaseMemoryUsage( return promises; } +void LocalExchangeVectorPool::push(const RowVectorPtr& vector, int64_t size) { + pool_.withWLock([&](auto& pool) { + if (totalSize_ + size <= capacity_) { + pool.emplace(vector, size); + totalSize_ += size; + } + }); +} + +RowVectorPtr LocalExchangeVectorPool::pop() { + return pool_.withWLock([&](auto& pool) -> RowVectorPtr { + while (!pool.empty()) { + auto [vector, size] = std::move(pool.front()); + pool.pop(); + totalSize_ -= size; + VELOX_CHECK_GE(totalSize_, 0); + if (vector.use_count() == 1) { + return vector; + } + } + VELOX_CHECK_EQ(totalSize_, 0); + return nullptr; + }); +} + void LocalExchangeQueue::addProducer() { queue_.withWLock([&](auto& /*queue*/) { VELOX_CHECK(!noMoreProducers_, "addProducer called after noMoreProducers"); @@ -124,6 +149,7 @@ BlockingReason LocalExchangeQueue::next( ContinueFuture* future, memory::MemoryPool* pool, RowVectorPtr* data) { + int64_t size; std::vector memoryPromises; auto blockingReason = queue_.withWLock([&](auto& queue) { *data = nullptr; @@ -138,8 +164,7 @@ BlockingReason LocalExchangeQueue::next( return BlockingReason::kWaitForProducer; } - int64_t size; - std::tie(*data, size) = queue.front(); + std::tie(*data, size) = std::move(queue.front()); queue.pop(); memoryPromises = memoryManager_->decreaseMemoryUsage(size); @@ -147,6 +172,9 @@ BlockingReason LocalExchangeQueue::next( return BlockingReason::kNotBlocked; }); notify(memoryPromises); + if (*data != nullptr) { + vectorPool_->push(*data, size); + } return blockingReason; } @@ -289,34 +317,44 @@ void LocalPartition::allocateIndexBuffers( RowVectorPtr LocalPartition::wrapChildren( const RowVectorPtr& input, vector_size_t size, - BufferPtr indices) { - VELOX_CHECK_EQ(childVectors_.size(), input->type()->size()); + const BufferPtr& indices, + RowVectorPtr reusable) { + if (!reusable) { + reusable = std::make_shared( + pool(), + input->type(), + nullptr, + size, + std::vector(input->childrenSize())); + } else { + VELOX_CHECK(!reusable->mayHaveNulls()); + VELOX_CHECK_EQ(reusable.use_count(), 1); + reusable->unsafeResize(size); + } - for (auto i = 0; i < input->type()->size(); ++i) { - childVectors_[i] = BaseVector::wrapInDictionary( - BufferPtr(nullptr), indices, size, input->childAt(i)); + for (auto i = 0; i < input->childrenSize(); ++i) { + auto& child = reusable->childAt(i); + if (child && child->encoding() == VectorEncoding::Simple::DICTIONARY && + child.use_count() == 1) { + child->BaseVector::resize(size); + child->setWrapInfo(indices); + child->setValueVector(input->childAt(i)); + } else { + child = BaseVector::wrapInDictionary( + nullptr, indices, size, input->childAt(i)); + } } - return std::make_shared( - input->pool(), input->type(), BufferPtr(nullptr), size, childVectors_); + reusable->updateContainsLazyNotLoaded(); + return reusable; } void LocalPartition::addInput(RowVectorPtr input) { prepareForInput(input); - if (numPartitions_ == 1) { - ContinueFuture future; - auto blockingReason = - queues_[0]->enqueue(input, input->retainedSize(), &future); - if (blockingReason != BlockingReason::kNotBlocked) { - blockingReasons_.push_back(blockingReason); - futures_.push_back(std::move(future)); - } - return; - } - - const auto singlePartition = - partitionFunction_->partition(*input, partitions_); + const auto singlePartition = numPartitions_ == 1 + ? 0 + : partitionFunction_->partition(*input, partitions_); if (singlePartition.has_value()) { ContinueFuture future; auto blockingReason = queues_[singlePartition.value()]->enqueue( @@ -349,7 +387,8 @@ void LocalPartition::addInput(RowVectorPtr input) { // Do not enqueue empty partitions. continue; } - auto partitionData = wrapChildren(input, partitionSize, indexBuffers_[i]); + auto partitionData = wrapChildren( + input, partitionSize, indexBuffers_[i], queues_[i]->getVector()); ContinueFuture future; auto reason = queues_[i]->enqueue( partitionData, totalSize * partitionSize / numInput, &future); @@ -371,9 +410,6 @@ void LocalPartition::prepareForInput(RowVectorPtr& input) { for (auto& child : input->children()) { child->loadedVector(); } - if (childVectors_.empty()) { - childVectors_.resize(input->type()->size()); - } } BlockingReason LocalPartition::isBlocked(ContinueFuture* future) { diff --git a/velox/exec/LocalPartition.h b/velox/exec/LocalPartition.h index a6f4d6bbf597..ae56e51ab7ed 100644 --- a/velox/exec/LocalPartition.h +++ b/velox/exec/LocalPartition.h @@ -53,6 +53,24 @@ class LocalExchangeMemoryManager { std::vector promises_; }; +/// A vector pool to reuse the RowVector and DictionaryVectors. Only +/// exclusively owned vectors will be reused. +class LocalExchangeVectorPool { + public: + explicit LocalExchangeVectorPool(int64_t capacity) : capacity_(capacity) {} + + /// `size' is the estimated size of the `vector' (e.g. taking shared + /// dictionary into consideration). + void push(const RowVectorPtr& vector, int64_t size); + + RowVectorPtr pop(); + + private: + const int64_t capacity_; + int64_t totalSize_{0}; + folly::Synchronized>> pool_; +}; + /// Buffers data for a single partition produced by local exchange. Allows /// multiple producers to enqueue data and multiple consumers fetch data. Each /// producer must be registered with a call to 'addProducer'. 'noMoreProducers' @@ -63,8 +81,11 @@ class LocalExchangeQueue { public: LocalExchangeQueue( std::shared_ptr memoryManager, + std::shared_ptr vectorPool, int partition) - : memoryManager_{std::move(memoryManager)}, partition_{partition} {} + : memoryManager_{std::move(memoryManager)}, + vectorPool_{std::move(vectorPool)}, + partition_{partition} {} std::string toString() const { return fmt::format("LocalExchangeQueue({})", partition_); @@ -99,6 +120,12 @@ class LocalExchangeQueue { /// called before all the data has been processed. No-op otherwise. void close(); + /// Get a reusable vector from the vector pool. Return nullptr if none is + /// available. + RowVectorPtr getVector() { + return vectorPool_->pop(); + } + /// Returns true if all producers have sent no more data signal. bool testingProducersDone() const; @@ -108,6 +135,7 @@ class LocalExchangeQueue { bool isFinishedLocked(const Queue& queue) const; const std::shared_ptr memoryManager_; + const std::shared_ptr vectorPool_; const int partition_; folly::Synchronized queue_; @@ -196,7 +224,8 @@ class LocalPartition : public Operator { RowVectorPtr wrapChildren( const RowVectorPtr& input, vector_size_t size, - BufferPtr indices); + const BufferPtr& indices, + RowVectorPtr reusable); const std::vector> queues_; const size_t numPartitions_; @@ -210,7 +239,6 @@ class LocalPartition : public Operator { /// Reusable buffers for input partitioning. std::vector indexBuffers_; std::vector rawIndices_; - std::vector childVectors_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/ScaleWriterLocalPartition.cpp b/velox/exec/ScaleWriterLocalPartition.cpp index 7d66db45bf3c..73fbd1ac4734 100644 --- a/velox/exec/ScaleWriterLocalPartition.cpp +++ b/velox/exec/ScaleWriterLocalPartition.cpp @@ -183,7 +183,8 @@ void ScaleWriterPartitioningLocalPartition::addInput(RowVectorPtr input) { auto writerInput = wrapChildren( input, writerRowCount, - std::move(writerAssignmmentIndicesBuffers_[i])); + std::move(writerAssignmmentIndicesBuffers_[i]), + queues_[i]->getVector()); ContinueFuture future; auto reason = queues_[i]->enqueue( writerInput, totalInputBytes * writerRowCount / numInput, &future); diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 587c571b76f9..b0c6b3e36238 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2609,11 +2609,12 @@ void Task::createLocalExchangeQueuesLocked( LocalExchangeState exchange; exchange.memoryManager = std::make_shared( queryCtx_->queryConfig().maxLocalExchangeBufferSize()); - + exchange.vectorPool = std::make_shared( + queryCtx_->queryConfig().maxLocalExchangeBufferSize()); exchange.queues.reserve(numPartitions); for (auto i = 0; i < numPartitions; ++i) { - exchange.queues.emplace_back( - std::make_shared(exchange.memoryManager, i)); + exchange.queues.emplace_back(std::make_shared( + exchange.memoryManager, exchange.vectorPool, i)); } const auto partitionNode = diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index cf3a39703fd4..bf3418a3f25b 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -88,6 +88,7 @@ struct SplitsState { /// Stores local exchange queues with the memory manager. struct LocalExchangeState { std::shared_ptr memoryManager; + std::shared_ptr vectorPool; std::vector> queues; std::shared_ptr scaleWriterPartitionBalancer; @@ -132,7 +133,7 @@ struct SplitGroupState { uint32_t numFinishedOutputDrivers{0}; // True if the state contains structures used for connecting ungrouped - // execution pipeline with grouped excution pipeline. In that case we don't + // execution pipeline with grouped execution pipeline. In that case we don't // want to clean up some of these structures. bool mixedExecutionMode{false}; diff --git a/velox/exec/tests/LocalPartitionTest.cpp b/velox/exec/tests/LocalPartitionTest.cpp index f5d6245b266c..e2e74f2d26cc 100644 --- a/velox/exec/tests/LocalPartitionTest.cpp +++ b/velox/exec/tests/LocalPartitionTest.cpp @@ -20,9 +20,8 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/functions/prestosql/window/WindowFunctionsRegistration.h" -using namespace facebook::velox; -using namespace facebook::velox::exec; -using namespace facebook::velox::exec::test; +namespace facebook::velox::exec::test { +namespace { class LocalPartitionTest : public HiveConnectorTestBase { protected: @@ -928,3 +927,29 @@ TEST_F( tasks[0]->requestAbort().wait(); thread.join(); } + +TEST_F(LocalPartitionTest, vectorPool) { + LocalExchangeVectorPool vectorPool(10); + std::vector vectors; + auto makeVector = [&] { + auto vector = + BaseVector::create(ROW({"c0"}, {BIGINT()}), 1, pool()); + vectors.push_back(vector.get()); + return vector; + }; + vectorPool.push(makeVector(), 5); + auto multiReferenced = makeVector(); + vectorPool.push(multiReferenced, 2); + vectorPool.push(makeVector(), 3); + vectorPool.push(makeVector(), 1); + auto vector = vectorPool.pop(); + ASSERT_TRUE(vector != nullptr); + ASSERT_EQ(vector.get(), vectors[0]); + vector = vectorPool.pop(); + ASSERT_TRUE(vector != nullptr); + ASSERT_EQ(vector.get(), vectors[2]); + ASSERT_FALSE(vectorPool.pop()); +} + +} // namespace +} // namespace facebook::velox::exec::test diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 891b6f6a9794..0576557a57f6 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -592,7 +592,7 @@ void read( const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); result->resize(resultOffset + numNewValues); - auto flatResult = result->asFlatVector(); + auto* flatResult = result->asUnchecked>(); auto nullCount = readNulls( source, size, resultOffset, incomingNulls, numIncomingNulls, *flatResult); @@ -609,15 +609,17 @@ void read( return; } } - if (type->isLongDecimal()) { - readDecimalValues( - source, - numNewValues, - resultOffset, - flatResult->nulls(), - nullCount, - values); - return; + if constexpr (std::is_same_v) { + if (type->isLongDecimal()) { + readDecimalValues( + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); + return; + } } if (isUuidType(type)) { readUuidValues( @@ -1497,9 +1499,9 @@ class VectorStream { initializeHeader(typeToEncodingName(type), *streamArena); if (type_->size() > 0) { hasLengths_ = true; - children_.resize(type_->size()); + children_.reserve(type_->size()); for (int32_t i = 0; i < type_->size(); ++i) { - children_[i] = std::make_unique( + children_.emplace_back( type_->childAt(i), std::nullopt, getChildAt(vector, i), @@ -1522,13 +1524,13 @@ class VectorStream { case VectorEncoding::Simple::CONSTANT: { initializeHeader(kRLE, *streamArena); isConstantStream_ = true; - children_.emplace_back(std::make_unique( + children_.emplace_back( type_, std::nullopt, std::nullopt, streamArena, initialNumRows, - opts)); + opts); return; } case VectorEncoding::Simple::DICTIONARY: { @@ -1544,13 +1546,13 @@ class VectorStream { initializeHeader(kDictionary, *streamArena); values_.startWrite(initialNumRows * 4); isDictionaryStream_ = true; - children_.emplace_back(std::make_unique( + children_.emplace_back( type_, std::nullopt, std::nullopt, streamArena, initialNumRows, - opts)); + opts); return; } default: @@ -1724,7 +1726,7 @@ class VectorStream { } VectorStream* childAt(int32_t index) { - return children_[index].get(); + return &children_[index]; } ByteOutputStream& values() { @@ -1750,12 +1752,12 @@ class VectorStream { switch (encoding_.value()) { case VectorEncoding::Simple::CONSTANT: { writeInt32(out, nonNullCount_); - children_[0]->flush(out); + children_[0].flush(out); return; } case VectorEncoding::Simple::DICTIONARY: { writeInt32(out, nonNullCount_); - children_[0]->flush(out); + children_[0].flush(out); values_.flush(out); // Write 24 bytes of 'instance id'. @@ -1779,7 +1781,7 @@ class VectorStream { writeInt32(out, children_.size()); for (auto& child : children_) { - child->flush(out); + child.flush(out); } if (!opts_.nullsFirst) { writeInt32(out, nullCount_ + nonNullCount_); @@ -1789,15 +1791,15 @@ class VectorStream { return; case TypeKind::ARRAY: - children_[0]->flush(out); + children_[0].flush(out); writeInt32(out, nullCount_ + nonNullCount_); lengths_.flush(out); flushNulls(out); return; case TypeKind::MAP: { - children_[0]->flush(out); - children_[1]->flush(out); + children_[0].flush(out); + children_[1].flush(out); // hash table size. -1 means not included in serialization. writeInt32(out, -1); writeInt32(out, nullCount_ + nonNullCount_); @@ -1862,7 +1864,7 @@ class VectorStream { nulls_.startWrite(nulls_.size()); values_.startWrite(values_.size()); for (auto& child : children_) { - child->clear(); + child.clear(); } } @@ -1880,9 +1882,9 @@ class VectorStream { [[fallthrough]]; case TypeKind::MAP: hasLengths_ = true; - children_.resize(type_->size()); + children_.reserve(type_->size()); for (int32_t i = 0; i < type_->size(); ++i) { - children_[i] = std::make_unique( + children_.emplace_back( type_->childAt(i), std::nullopt, getChildAt(vector, i), @@ -1934,7 +1936,7 @@ class VectorStream { ByteOutputStream nulls_; ByteOutputStream lengths_; ByteOutputStream values_; - std::vector> children_; + std::vector children_; bool isDictionaryStream_{false}; bool isConstantStream_{false}; }; @@ -2850,22 +2852,22 @@ void serializeWrapped( int32_t numInner = 0; auto* innerRows = innerRowsHolder.get(numRows); bool mayHaveNulls = vector->mayHaveNulls(); - VectorPtr wrapped; + const VectorPtr* wrapped; if (vector->encoding() == VectorEncoding::Simple::DICTIONARY && !mayHaveNulls) { // Dictionary with no nulls. auto* indices = vector->wrapInfo()->as(); - wrapped = vector->valueVector(); + wrapped = &vector->valueVector(); simd::transpose(indices, rows, innerRows); numInner = numRows; } else { - wrapped = BaseVector::wrappedVectorShared(vector); + wrapped = &BaseVector::wrappedVectorShared(vector); for (int32_t i = 0; i < rows.size(); ++i) { if (mayHaveNulls && vector->isNullAt(rows[i])) { // The wrapper added a null. if (numInner > 0) { serializeColumn( - wrapped, + *wrapped, folly::Range(innerRows, numInner), stream, scratch); @@ -2880,7 +2882,7 @@ void serializeWrapped( if (numInner > 0) { serializeColumn( - wrapped, + *wrapped, folly::Range(innerRows, numInner), stream, scratch); @@ -3669,7 +3671,7 @@ void estimateSerializedSizeInt( } int64_t flushUncompressed( - const std::vector>& streams, + std::vector& streams, int32_t numRows, OutputStream* out, PrestoOutputStreamListener* listener) { @@ -3700,7 +3702,7 @@ int64_t flushUncompressed( writeInt32(out, streams.size()); for (auto& stream : streams) { - stream->flush(out); + stream.flush(out); } // Pause CRC computation @@ -3769,7 +3771,7 @@ void flushSerialization( } FlushSizes flushCompressed( - const std::vector>& streams, + std::vector& streams, const StreamArena& arena, folly::io::Codec& codec, int32_t numRows, @@ -3792,7 +3794,7 @@ FlushSizes flushCompressed( writeInt32(&out, streams.size()); for (auto& stream : streams) { - stream->flush(&out); + stream.flush(&out); } const int32_t uncompressedSize = out.tellp(); @@ -3826,7 +3828,7 @@ FlushSizes flushCompressed( } FlushSizes flushStreams( - const std::vector>& streams, + std::vector& streams, int32_t numRows, const StreamArena& arena, folly::io::Codec& codec, @@ -3947,9 +3949,10 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { const auto numChildren = vector->childrenSize(); StreamArena arena(pool_); - std::vector> streams(numChildren); + std::vector streams; + streams.reserve(numChildren); for (int i = 0; i < numChildren; i++) { - streams[i] = std::make_unique( + streams.emplace_back( rowType->childAt(i), std::nullopt, vector->childAt(i), @@ -3958,7 +3961,7 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { opts_); if (numRows > 0) { - serializeColumn(vector->childAt(i), ranges, streams[i].get(), scratch); + serializeColumn(vector->childAt(i), ranges, &streams[i], scratch); } } @@ -4120,10 +4123,10 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { codec_(common::compressionKindToCodec(opts.compressionKind)) { const auto types = rowType->children(); const auto numTypes = types.size(); - streams_.resize(numTypes); + streams_.reserve(numTypes); for (int i = 0; i < numTypes; ++i) { - streams_[i] = std::make_unique( + streams_.emplace_back( types[i], std::nullopt, std::nullopt, streamArena, numRows, opts); } } @@ -4138,7 +4141,7 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } numRows_ += numNewRows; for (int32_t i = 0; i < vector->childrenSize(); ++i) { - serializeColumn(vector->childAt(i), ranges, streams_[i].get(), scratch); + serializeColumn(vector->childAt(i), ranges, &streams_[i], scratch); } } @@ -4152,14 +4155,14 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } numRows_ += numNewRows; for (int32_t i = 0; i < vector->childrenSize(); ++i) { - serializeColumn(vector->childAt(i), rows, streams_[i].get(), scratch); + serializeColumn(vector->childAt(i), rows, &streams_[i], scratch); } } size_t maxSerializedSize() const override { size_t dataSize = 4; // streams_.size() for (auto& stream : streams_) { - dataSize += stream->serializedSize(); + dataSize += const_cast(stream).serializedSize(); } auto compressedSize = needCompression(*codec_) @@ -4225,7 +4228,7 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { void clear() override { numRows_ = 0; for (auto& stream : streams_) { - stream->clear(); + stream.clear(); } } @@ -4235,7 +4238,7 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { const std::unique_ptr codec_; int32_t numRows_{0}; - std::vector> streams_; + std::vector streams_; // Count of forthcoming compressions to skip. int32_t numCompressionToSkip_{0}; diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index 65e014a193c0..b4a836cdc7eb 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -691,7 +691,7 @@ class BaseVector { VELOX_UNSUPPORTED("Vector is not a wrapper"); } - virtual VectorPtr& valueVector() { + virtual void setValueVector(VectorPtr valueVector) { VELOX_UNSUPPORTED("Vector is not a wrapper"); } @@ -715,8 +715,12 @@ class BaseVector { /// If 'this' is a wrapper, returns the wrap info, interpretation depends on /// encoding. - virtual BufferPtr wrapInfo() const { - throw std::runtime_error("Vector is not a wrapper"); + virtual const BufferPtr& wrapInfo() const { + VELOX_UNSUPPORTED("Vector is not a wrapper"); + } + + virtual void setWrapInfo(BufferPtr wrapInfo) { + VELOX_UNSUPPORTED("Vector is not a wrapper"); } template diff --git a/velox/vector/ConstantVector.h b/velox/vector/ConstantVector.h index affe3d3ee379..c299c40c20a2 100644 --- a/velox/vector/ConstantVector.h +++ b/velox/vector/ConstantVector.h @@ -130,6 +130,9 @@ class ConstantVector final : public SimpleVector { } virtual ~ConstantVector() override { + if (auto* wrapInfo = wrapInfo_.load()) { + delete wrapInfo; + } if (valueVector_) { valueVector_->clearContainingLazyAndWrapped(); } @@ -235,12 +238,21 @@ class ConstantVector final : public SimpleVector { return valueVector_ ? valueVector_->wrappedIndex(index_) : 0; } - BufferPtr wrapInfo() const override { + const BufferPtr& wrapInfo() const override { static const DummyReleaser kDummy; - return BufferView::create( - reinterpret_cast(&index_), - sizeof(vector_size_t), - kDummy); + auto* wrapInfo = wrapInfo_.load(); + if (FOLLY_UNLIKELY(!wrapInfo)) { + wrapInfo = new BufferPtr(BufferView::create( + reinterpret_cast(&index_), + sizeof(vector_size_t), + kDummy)); + BufferPtr* oldWrapInfo = nullptr; + if (!wrapInfo_.compare_exchange_strong(oldWrapInfo, wrapInfo)) { + delete wrapInfo; + wrapInfo = oldWrapInfo; + } + } + return *wrapInfo; } /// Base vector if isScalar() is false (e.g. complex type vector) or if base @@ -249,8 +261,8 @@ class ConstantVector final : public SimpleVector { return valueVector_; } - VectorPtr& valueVector() override { - return valueVector_; + void setValueVector(VectorPtr valueVector) override { + valueVector_ = std::move(valueVector); } /// Index of the element of the base vector that determines the value of this @@ -462,6 +474,7 @@ class ConstantVector final : public SimpleVector { T value_; bool isNull_ = false; bool initialized_{false}; + mutable std::atomic wrapInfo_{nullptr}; // This must be at end to avoid memory corruption. std::conditional_t, char> valueBuffer_; diff --git a/velox/vector/DictionaryVector.h b/velox/vector/DictionaryVector.h index 3791e69f9ff7..3d4d05b58141 100644 --- a/velox/vector/DictionaryVector.h +++ b/velox/vector/DictionaryVector.h @@ -130,14 +130,18 @@ class DictionaryVector : public SimpleVector { return dictionaryValues_; } - VectorPtr& valueVector() override { - return dictionaryValues_; + void setValueVector(VectorPtr valueVector) override { + setDictionaryValues(std::move(valueVector)); } - BufferPtr wrapInfo() const override { + const BufferPtr& wrapInfo() const override { return indices_; } + void setWrapInfo(BufferPtr indices) override { + indices_ = std::move(indices); + } + BufferPtr mutableIndices(vector_size_t size) { BaseVector::resizeIndices( BaseVector::length_, size, BaseVector::pool_, indices_, &rawIndices_); diff --git a/velox/vector/LazyVector.h b/velox/vector/LazyVector.h index 4ef1b0054ef2..d4b8c8ddfa6e 100644 --- a/velox/vector/LazyVector.h +++ b/velox/vector/LazyVector.h @@ -305,7 +305,7 @@ class LazyVector : public BaseVector { return loadedVector()->wrappedIndex(index); } - BufferPtr wrapInfo() const override { + const BufferPtr& wrapInfo() const override { return loadedVector()->wrapInfo(); } diff --git a/velox/vector/SequenceVector.h b/velox/vector/SequenceVector.h index bed33cf0428f..ab277ef7870e 100644 --- a/velox/vector/SequenceVector.h +++ b/velox/vector/SequenceVector.h @@ -125,10 +125,6 @@ class SequenceVector : public SimpleVector { return sequenceValues_; } - VectorPtr& valueVector() override { - return sequenceValues_; - } - BufferPtr getSequenceLengths() const { return sequenceLengths_; } @@ -144,7 +140,7 @@ class SequenceVector : public SimpleVector { return sequenceValues_->size(); } - BufferPtr wrapInfo() const override { + const BufferPtr& wrapInfo() const override { return sequenceLengths_; }