3030#include " velox/exec/OperatorUtils.h"
3131#include " velox/exec/SortBuffer.h"
3232
33- #include < boost/lexical_cast.hpp>
34- #include < boost/uuid/uuid_generators.hpp>
35- #include < boost/uuid/uuid_io.hpp>
36-
3733using facebook::velox::common::testutil::TestValue;
3834
3935namespace facebook ::velox::connector::hive {
@@ -95,14 +91,12 @@ std::vector<column_index_t> getPartitionChannels(
9591
9692// Returns the column indices of non-partition data columns.
9793std::vector<column_index_t > getNonPartitionChannels (
98- const std::vector<column_index_t >& partitionChannels,
99- const column_index_t childrenSize) {
94+ const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
10095 std::vector<column_index_t > dataChannels;
101- dataChannels.reserve (childrenSize - partitionChannels.size ());
10296
103- for (column_index_t i = 0 ; i < childrenSize; i++) {
104- if ( std::find (partitionChannels. cbegin (), partitionChannels. cend (), i) ==
105- partitionChannels. cend ()) {
97+ for (column_index_t i = 0 ; i < insertTableHandle-> inputColumns (). size ();
98+ i++) {
99+ if (!insertTableHandle-> inputColumns ()[i]-> isPartitionKey ()) {
106100 dataChannels.push_back (i);
107101 }
108102 }
@@ -119,10 +113,6 @@ std::string makePartitionDirectory(
119113 return tableDirectory;
120114}
121115
122- std::string makeUuid () {
123- return boost::lexical_cast<std::string>(boost::uuids::random_generator ()());
124- }
125-
126116std::unordered_map<LocationHandle::TableType, std::string> tableTypeNames () {
127117 return {
128118 {LocationHandle::TableType::kNew , " kNew" },
@@ -383,7 +373,8 @@ HiveDataSink::HiveDataSink(
383373 ? createBucketFunction(
384374 *insertTableHandle->bucketProperty (),
385375 inputType)
386- : nullptr) {}
376+ : nullptr,
377+ getNonPartitionChannels(insertTableHandle)) {}
387378
388379HiveDataSink::HiveDataSink (
389380 RowTypePtr inputType,
@@ -392,7 +383,8 @@ HiveDataSink::HiveDataSink(
392383 CommitStrategy commitStrategy,
393384 const std::shared_ptr<const HiveConfig>& hiveConfig,
394385 uint32_t bucketCount,
395- std::unique_ptr<core::PartitionFunction> bucketFunction)
386+ std::unique_ptr<core::PartitionFunction> bucketFunction,
387+ const std::vector<column_index_t >& dataChannels)
396388 : inputType_(std::move(inputType)),
397389 insertTableHandle_(std::move(insertTableHandle)),
398390 connectorQueryCtx_(connectorQueryCtx),
@@ -412,8 +404,7 @@ HiveDataSink::HiveDataSink(
412404 hiveConfig_->isPartitionPathAsLowerCase(
413405 connectorQueryCtx->sessionProperties ()))
414406 : nullptr),
415- dataChannels_(
416- getNonPartitionChannels (partitionChannels_, inputType_->size ())),
407+ dataChannels_(dataChannels),
417408 bucketCount_(static_cast <int32_t >(bucketCount)),
418409 bucketFunction_(std::move(bucketFunction)),
419410 writerFactory_(
@@ -489,6 +480,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
489480 input->childAt (i)->loadedVector ();
490481 }
491482
483+ splitInputRowsAndEnsureWriters (input);
484+
492485 // All inputs belong to a single non-bucketed partition. The partition id
493486 // must be zero.
494487 if (!isBucketed () && partitionIdGenerator_->numPartitions () == 1 ) {
@@ -497,8 +490,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
497490 return ;
498491 }
499492
500- splitInputRowsAndEnsureWriters ();
501-
502493 for (auto index = 0 ; index < writers_.size (); ++index) {
503494 const vector_size_t partitionSize = partitionSizes_[index];
504495 if (partitionSize == 0 ) {
@@ -670,30 +661,33 @@ bool HiveDataSink::finish() {
670661std::vector<std::string> HiveDataSink::close () {
671662 setState (State::kClosed );
672663 closeInternal ();
664+ return commitMessage ();
665+ }
673666
667+ std::vector<std::string> HiveDataSink::commitMessage () const {
674668 std::vector<std::string> partitionUpdates;
675669 partitionUpdates.reserve (writerInfo_.size ());
676670 for (int i = 0 ; i < writerInfo_.size (); ++i) {
677671 const auto & info = writerInfo_.at (i);
678672 VELOX_CHECK_NOT_NULL (info);
679673 // clang-format off
680- auto partitionUpdateJson = folly::toJson (
681- folly::dynamic::object
682- (" name" , info->writerParameters .partitionName ().value_or (" " ))
683- (" updateMode" ,
684- HiveWriterParameters::updateModeToString (
685- info->writerParameters .updateMode ()))
686- (" writePath" , info->writerParameters .writeDirectory ())
687- (" targetPath" , info->writerParameters .targetDirectory ())
688- (" fileWriteInfos" , folly::dynamic::array (
689- folly::dynamic::object
690- (" writeFileName" , info->writerParameters .writeFileName ())
691- (" targetFileName" , info->writerParameters .targetFileName ())
692- (" fileSize" , ioStats_.at (i)->rawBytesWritten ())))
693- (" rowCount" , info->numWrittenRows )
694- (" inMemoryDataSizeInBytes" , info->inputSizeInBytes )
695- (" onDiskDataSizeInBytes" , ioStats_.at (i)->rawBytesWritten ())
696- (" containsNumberedFileNames" , true ));
674+ auto partitionUpdateJson = folly::toJson (
675+ folly::dynamic::object
676+ (" name" , info->writerParameters .partitionName ().value_or (" " ))
677+ (" updateMode" ,
678+ HiveWriterParameters::updateModeToString (
679+ info->writerParameters .updateMode ()))
680+ (" writePath" , info->writerParameters .writeDirectory ())
681+ (" targetPath" , info->writerParameters .targetDirectory ())
682+ (" fileWriteInfos" , folly::dynamic::array (
683+ folly::dynamic::object
684+ (" writeFileName" , info->writerParameters .writeFileName ())
685+ (" targetFileName" , info->writerParameters .targetFileName ())
686+ (" fileSize" , ioStats_.at (i)->rawBytesWritten ())))
687+ (" rowCount" , info->numWrittenRows )
688+ (" inMemoryDataSizeInBytes" , info->inputSizeInBytes )
689+ (" onDiskDataSizeInBytes" , ioStats_.at (i)->rawBytesWritten ())
690+ (" containsNumberedFileNames" , true ));
697691 // clang-format on
698692 partitionUpdates.push_back (partitionUpdateJson);
699693 }
@@ -740,11 +734,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
740734 VELOX_CHECK_EQ (writers_.size (), writerInfo_.size ());
741735 VELOX_CHECK_EQ (writerIndexMap_.size (), writerInfo_.size ());
742736
743- std::optional<std::string> partitionName;
744- if (isPartitioned ()) {
745- partitionName =
746- partitionIdGenerator_->partitionName (id.partitionId .value ());
747- }
737+ std::optional<std::string> partitionName = getPartitionName (id);
748738
749739 // Without explicitly setting flush policy, the default memory based flush
750740 // policy is used.
@@ -831,15 +821,23 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
831821 options);
832822 writer = maybeCreateBucketSortWriter (std::move (writer));
833823 writers_.emplace_back (std::move (writer));
834- // Extends the buffer used for partition rows calculations.
835- partitionSizes_.emplace_back (0 );
836- partitionRows_.emplace_back (nullptr );
837- rawPartitionRows_.emplace_back (nullptr );
824+
825+ extendBuffersForPartitionedTables ();
838826
839827 writerIndexMap_.emplace (id, writers_.size () - 1 );
840828 return writerIndexMap_[id];
841829}
842830
831+ std::optional<std::string> HiveDataSink::getPartitionName (
832+ const HiveWriterId& id) const {
833+ std::optional<std::string> partitionName;
834+ if (isPartitioned ()) {
835+ partitionName =
836+ partitionIdGenerator_->partitionName (id.partitionId .value ());
837+ }
838+ return partitionName;
839+ }
840+
843841std::unique_ptr<facebook::velox::dwio::common::Writer>
844842HiveDataSink::maybeCreateBucketSortWriter (
845843 std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
@@ -867,6 +865,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
867865 sortWriterFinishTimeSliceLimitMs_);
868866}
869867
868+ void HiveDataSink::extendBuffersForPartitionedTables () {
869+ // Extends the buffer used for partition rows calculations.
870+ partitionSizes_.emplace_back (0 );
871+ partitionRows_.emplace_back (nullptr );
872+ rawPartitionRows_.emplace_back (nullptr );
873+ }
874+
870875HiveWriterId HiveDataSink::getWriterId (size_t row) const {
871876 std::optional<int32_t > partitionId;
872877 if (isPartitioned ()) {
@@ -881,7 +886,25 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
881886 return HiveWriterId{partitionId, bucketId};
882887}
883888
884- void HiveDataSink::splitInputRowsAndEnsureWriters () {
889+ void HiveDataSink::updatePartitionRows (
890+ uint32_t index,
891+ vector_size_t numRows,
892+ vector_size_t row) {
893+ VELOX_DCHECK_LT (index, partitionSizes_.size ());
894+ VELOX_DCHECK_EQ (partitionSizes_.size (), partitionRows_.size ());
895+ VELOX_DCHECK_EQ (partitionRows_.size (), rawPartitionRows_.size ());
896+ if (FOLLY_UNLIKELY (partitionRows_[index] == nullptr ) ||
897+ (partitionRows_[index]->capacity () < numRows * sizeof (vector_size_t ))) {
898+ partitionRows_[index] =
899+ allocateIndices (numRows, connectorQueryCtx_->memoryPool ());
900+ rawPartitionRows_[index] =
901+ partitionRows_[index]->asMutable <vector_size_t >();
902+ }
903+ rawPartitionRows_[index][partitionSizes_[index]] = row;
904+ ++partitionSizes_[index];
905+ }
906+
907+ void HiveDataSink::splitInputRowsAndEnsureWriters (RowVectorPtr /* input */ ) {
885908 VELOX_CHECK (isPartitioned () || isBucketed ());
886909 if (isBucketed () && isPartitioned ()) {
887910 VELOX_CHECK_EQ (bucketIds_.size (), partitionIds_.size ());
@@ -895,18 +918,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
895918 const auto id = getWriterId (row);
896919 const uint32_t index = ensureWriter (id);
897920
898- VELOX_DCHECK_LT (index, partitionSizes_.size ());
899- VELOX_DCHECK_EQ (partitionSizes_.size (), partitionRows_.size ());
900- VELOX_DCHECK_EQ (partitionRows_.size (), rawPartitionRows_.size ());
901- if (FOLLY_UNLIKELY (partitionRows_[index] == nullptr ) ||
902- (partitionRows_[index]->capacity () < numRows * sizeof (vector_size_t ))) {
903- partitionRows_[index] =
904- allocateIndices (numRows, connectorQueryCtx_->memoryPool ());
905- rawPartitionRows_[index] =
906- partitionRows_[index]->asMutable <vector_size_t >();
907- }
908- rawPartitionRows_[index][partitionSizes_[index]] = row;
909- ++partitionSizes_[index];
921+ updatePartitionRows (index, numRows, row);
910922 }
911923
912924 for (uint32_t i = 0 ; i < partitionSizes_.size (); ++i) {
@@ -917,6 +929,15 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
917929 }
918930}
919931
932+ std::string HiveDataSink::makePartitionDirectory (
933+ const std::string& tableDirectory,
934+ const std::optional<std::string>& partitionSubdirectory) const {
935+ if (partitionSubdirectory.has_value ()) {
936+ return fs::path (tableDirectory) / partitionSubdirectory.value ();
937+ }
938+ return tableDirectory;
939+ }
940+
920941HiveWriterParameters HiveDataSink::getWriterParameters (
921942 const std::optional<std::string>& partition,
922943 std::optional<uint32_t > bucketId) const {
0 commit comments