diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 16ebab47b57e..a85d44c74af3 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -18,6 +18,7 @@ #include +#include "velox/dwio/common/OnDemandUnitLoader.h" #include "velox/dwio/common/TypeUtils.h" #include "velox/dwio/common/exception/Exception.h" #include "velox/dwio/dwrf/reader/ColumnReader.h" @@ -28,29 +29,196 @@ namespace facebook::velox::dwrf { using dwio::common::ColumnSelector; using dwio::common::FileFormat; +using dwio::common::LoadUnit; using dwio::common::ReaderOptions; using dwio::common::RowReaderOptions; +using dwio::common::UnitLoader; +using dwio::common::UnitLoaderFactory; + +class DwrfUnit : public LoadUnit { + public: + DwrfUnit( + const StripeReaderBase& stripeReaderBase, + const StrideIndexProvider& strideIndexProvider, + dwio::common::ColumnReaderStatistics& columnReaderStatistics, + uint32_t stripeIndex, + std::shared_ptr columnSelector, + RowReaderOptions options) + : stripeReaderBase_{stripeReaderBase}, + strideIndexProvider_{strideIndexProvider}, + columnReaderStatistics_{columnReaderStatistics}, + stripeIndex_{stripeIndex}, + columnSelector_{std::move(columnSelector)}, + options_{std::move(options)}, + stripeInfo_{ + stripeReaderBase.getReader().getFooter().stripes(stripeIndex_)} {} + + ~DwrfUnit() override = default; + + // Perform the IO (read) + void load() override; + + // Unload the unit to free memory + void unload() override; + + // Number of rows in the unit + uint64_t getNumRows() override; + + // Number of bytes that the IO will read + uint64_t getIoSize() override; + + std::unique_ptr& getColumnReader() { + return columnReader_; + } + + std::unique_ptr& + getSelectiveColumnReader() { + return selectiveColumnReader_; + } + + private: + void ensureDecoders(); + void loadDecoders(); + + // Immutables + const StripeReaderBase& stripeReaderBase_; + const StrideIndexProvider& strideIndexProvider_; + dwio::common::ColumnReaderStatistics& columnReaderStatistics_; + const uint32_t stripeIndex_; + const std::shared_ptr columnSelector_; + const RowReaderOptions options_; + const StripeInformationWrapper stripeInfo_; + + // Mutables + bool preloaded_; + std::optional cachedIoSize_; + std::shared_ptr stripeReadState_; + std::unique_ptr stripeStreams_; + std::unique_ptr columnReader_; + std::unique_ptr selectiveColumnReader_; + std::shared_ptr stripeDictionaryCache_; +}; + +void DwrfUnit::load() { + ensureDecoders(); + loadDecoders(); +} + +void DwrfUnit::unload() { + cachedIoSize_.reset(); + stripeStreams_.reset(); + columnReader_.reset(); + selectiveColumnReader_.reset(); + stripeDictionaryCache_.reset(); + stripeReadState_.reset(); +} + +uint64_t DwrfUnit::getNumRows() { + return stripeInfo_.numberOfRows(); +} + +uint64_t DwrfUnit::getIoSize() { + if (cachedIoSize_) { + return *cachedIoSize_; + } + ensureDecoders(); + cachedIoSize_ = + stripeReadState_->stripeMetadata->stripeInput->nextFetchSize(); + return *cachedIoSize_; +} + +void DwrfUnit::ensureDecoders() { + if (columnReader_ || selectiveColumnReader_) { + return; + } + + preloaded_ = options_.getPreloadStripe(); + + stripeReadState_ = std::make_shared( + stripeReaderBase_.readerBaseShared(), + stripeReaderBase_.fetchStripe(stripeIndex_, preloaded_)); + + stripeStreams_ = std::make_unique( + stripeReadState_, + *columnSelector_, + options_, + stripeInfo_.offset(), + stripeInfo_.numberOfRows(), + strideIndexProvider_, + stripeIndex_); + + auto scanSpec = options_.getScanSpec().get(); + auto requestedType = columnSelector_->getSchemaWithId(); + auto fileType = stripeReaderBase_.getReader().getSchemaWithId(); + FlatMapContext flatMapContext; + flatMapContext.keySelectionCallback = options_.getKeySelectionCallback(); + memory::AllocationPool pool(&stripeReaderBase_.getReader().getMemoryPool()); + StreamLabels streamLabels(pool); + + if (scanSpec) { + selectiveColumnReader_ = SelectiveDwrfReader::build( + requestedType, + fileType, + *stripeStreams_, + streamLabels, + columnReaderStatistics_, + scanSpec, + flatMapContext, + true); // isRoot + selectiveColumnReader_->setIsTopLevel(); + } else { + columnReader_ = ColumnReader::build( // enqueue streams + requestedType, + fileType, + *stripeStreams_, + streamLabels, + options_.getDecodingExecutor().get(), + options_.getDecodingParallelismFactor(), + flatMapContext); + } + DWIO_ENSURE( + (columnReader_ != nullptr) != (selectiveColumnReader_ != nullptr), + "ColumnReader was not created"); +} + +void DwrfUnit::loadDecoders() { + // load data plan according to its updated selector + // during column reader construction + // if planReads is off which means stripe data loaded as whole + if (!preloaded_) { + VLOG(1) << "[DWRF] Load read plan for stripe " << stripeIndex_; + stripeStreams_->loadReadPlan(); + } + + stripeDictionaryCache_ = stripeStreams_->getStripeDictionaryCache(); +} + +namespace { + +DwrfUnit* castDwrfUnit(LoadUnit* unit) { + VELOX_CHECK(unit != nullptr); + auto* dwrfUnit = dynamic_cast(unit); + VELOX_CHECK(dwrfUnit != nullptr); + return dwrfUnit; +} + +} // namespace DwrfRowReader::DwrfRowReader( const std::shared_ptr& reader, const RowReaderOptions& opts) : StripeReaderBase(reader), + strideIndex_{0}, options_(opts), - executor_{options_.getDecodingExecutor()}, decodingTimeUsCallback_{options_.getDecodingTimeUsCallback()}, - stripeCountCallback_{options_.getStripeCountCallback()}, columnSelector_{std::make_shared( - ColumnSelector::apply(opts.getSelector(), reader->getSchema()))} { - if (executor_) { - LOG(INFO) << "Using parallel decoding with a parallelism factor of " - << options_.getDecodingParallelismFactor(); - } + ColumnSelector::apply(opts.getSelector(), reader->getSchema()))}, + currentUnit_{nullptr} { auto& fileFooter = getReader().getFooter(); uint32_t numberOfStripes = fileFooter.stripesSize(); currentStripe_ = numberOfStripes; stripeCeiling_ = 0; currentRowInStripe_ = 0; - newStripeReadyForRead_ = false; rowsInCurrentStripe_ = 0; uint64_t rowTotal = 0; @@ -77,8 +245,10 @@ DwrfRowReader::DwrfRowReader( if (stripeCeiling_ == 0) { stripeCeiling_ = firstStripe_; } - if (stripeCountCallback_) { - stripeCountCallback_(stripeCeiling_ - firstStripe_); + + auto stripeCountCallback = options_.getStripeCountCallback(); + if (stripeCountCallback) { + stripeCountCallback(stripeCeiling_ - firstStripe_); } if (currentStripe_ == 0) { @@ -105,12 +275,40 @@ DwrfRowReader::DwrfRowReader( dwio::common::typeutils::checkTypeCompatibility( *getReader().getSchema(), *columnSelector_, createExceptionContext); - stripeLoadBatons_.reserve(numberOfStripes); - for (int i = 0; i < numberOfStripes; i++) { - stripeLoadBatons_.emplace_back(std::make_unique>()); + unitLoader_ = getUnitLoader(); +} + +std::unique_ptr& DwrfRowReader::getColumnReader() { + VELOX_DCHECK(currentUnit_ != nullptr); + return currentUnit_->getColumnReader(); +} + +std::unique_ptr& +DwrfRowReader::getSelectiveColumnReader() { + VELOX_DCHECK(currentUnit_ != nullptr); + return currentUnit_->getSelectiveColumnReader(); +} + +std::unique_ptr DwrfRowReader::getUnitLoader() { + std::vector> loadUnits; + loadUnits.reserve(stripeCeiling_ - firstStripe_); + for (auto stripe = firstStripe_; stripe < stripeCeiling_; stripe++) { + loadUnits.emplace_back(std::make_unique( + /* stripeReaderBase */ *this, + /* strideIndexProvider */ *this, + columnReaderStatistics_, + stripe, + columnSelector_, + options_)); + } + std::shared_ptr unitLoaderFactory = + options_.getUnitLoaderFactory(); + if (!unitLoaderFactory) { + unitLoaderFactory = + std::make_shared( + options_.getBlockedOnIoCallback()); } - stripeLoadStatuses_ = folly::Synchronized( - std::vector(numberOfStripes, FetchStatus::NOT_STARTED)); + return unitLoaderFactory->create(std::move(loadUnits)); } uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { @@ -119,10 +317,6 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { return 0; } - DWIO_ENSURE( - !prefetchHasOccurred_, - "Prefetch already called. Currently, seek after prefetch is disallowed in DwrfRowReader"); - // If we are reading only a portion of the file // (bounded by firstStripe_ and stripeCeiling_), // seeking before or after the portion of interest should return no data. @@ -157,33 +351,43 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { return previousRow_; } + const auto previousStripe = currentStripe_; + const auto previousRowInStripe = currentRowInStripe_; currentStripe_ = seekToStripe; currentRowInStripe_ = rowNumber - firstRowOfStripe_[currentStripe_]; previousRow_ = rowNumber; - // Reset baton, since seek flow can load a stripe more than once and is - // synchronous for now. - VLOG(1) << "Resetting baton at " << currentStripe_; - stripeLoadBatons_[currentStripe_] = std::make_unique>(); - stripeLoadStatuses_.wlock()->operator[](currentStripe_) = - FetchStatus::NOT_STARTED; - - VLOG(1) << "rowNumber: " << rowNumber << " currentStripe_: " << currentStripe_ - << " firstStripe_: " << firstStripe_ - << " stripeCeiling_: " << stripeCeiling_; - - // Because prefetch and seek are currently incompatible, there should only - // ever be 1 stripe fetched at this point. - VLOG(1) << "Erasing " << currentStripe_ << " from prefetched_"; - prefetchedStripeStates_.wlock()->erase(currentStripe_); - newStripeReadyForRead_ = false; - startNextStripe(); - - if (selectiveColumnReader_) { - selectiveColumnReader_->skip(currentRowInStripe_); - } else { - columnReader_->skip(currentRowInStripe_); - } + if (currentStripe_ != previousStripe) { + // Different stripe. Let's load the new stripe. + currentUnit_ = nullptr; + loadCurrentStripe(); + if (currentRowInStripe_ > 0) { + skip(currentRowInStripe_); + } + } else if (currentRowInStripe_ < previousRowInStripe) { + // Same stripe but we have to seek backwards. + if (currentUnit_) { + // We had a loaded stripe, we have to reload. + LOG(WARNING) << "Reloading stripe " << currentStripe_ + << " because we have to seek backwards on it from row " + << previousRowInStripe << " to row " << currentRowInStripe_; + currentUnit_->unload(); + currentUnit_->load(); + } else { + // We had no stripe loaded. Let's load the current one. + loadCurrentStripe(); + } + if (currentRowInStripe_ > 0) { + skip(currentRowInStripe_); + } + } else if (currentRowInStripe_ > previousRowInStripe) { + // We have to seek forward on the same stripe. We can just skip. + if (!currentUnit_) { + // Load the current stripe if no stripe was loaded. + loadCurrentStripe(); + } + skip(currentRowInStripe_ - previousRowInStripe); + } // otherwise the seek ended on the same stripe, same row return previousRow_; } @@ -236,7 +440,7 @@ uint64_t DwrfRowReader::skipRows(uint64_t numberOfRowsToSkip) { } void DwrfRowReader::checkSkipStrides(uint64_t strideSize) { - if (!selectiveColumnReader_ || strideSize == 0 || + if (!getSelectiveColumnReader() || strideSize == 0 || currentRowInStripe_ % strideSize != 0) { return; } @@ -245,7 +449,7 @@ void DwrfRowReader::checkSkipStrides(uint64_t strideSize) { StatsContext context( getReader().getWriterName(), getReader().getWriterVersion()); DwrfData::FilterRowGroupsResult res; - selectiveColumnReader_->filterRowGroups(strideSize, context, res); + getSelectiveColumnReader()->filterRowGroups(strideSize, context, res); if (auto& metadataFilter = options_.getMetadataFilter()) { metadataFilter->eval(res.metadataFilterResults, res.filterResult); } @@ -266,7 +470,7 @@ void DwrfRowReader::checkSkipStrides(uint64_t strideSize) { skippedStrides_++; } if (foundStridesToSkip && currentRowInStripe_ < rowsInCurrentStripe_) { - selectiveColumnReader_->seekToRowGroup(currentStride); + getSelectiveColumnReader()->seekToRowGroup(currentStride); } } @@ -274,7 +478,7 @@ void DwrfRowReader::readNext( uint64_t rowsToRead, const dwio::common::Mutation* mutation, VectorPtr& result) { - if (!selectiveColumnReader_) { + if (!getSelectiveColumnReader()) { std::optional startTime; if (decodingTimeUsCallback_) { // We'll use wall time since we have parallel decoding. @@ -286,7 +490,7 @@ void DwrfRowReader::readNext( VELOX_CHECK( mutation == nullptr, "Mutation pushdown is only supported in selective reader"); - columnReader_->next(rowsToRead, result); + getColumnReader()->next(rowsToRead, result); if (startTime.has_value()) { decodingTimeUsCallback_( std::chrono::duration_cast( @@ -296,12 +500,20 @@ void DwrfRowReader::readNext( return; } if (!options_.getAppendRowNumberColumn()) { - selectiveColumnReader_->next(rowsToRead, result, mutation); + getSelectiveColumnReader()->next(rowsToRead, result, mutation); return; } readWithRowNumber(rowsToRead, mutation, result); } +uint64_t DwrfRowReader::skip(uint64_t numValues) { + if (getSelectiveColumnReader()) { + return getSelectiveColumnReader()->skip(numValues); + } else { + return getColumnReader()->skip(numValues); + } +} + void DwrfRowReader::readWithRowNumber( uint64_t rowsToRead, const dwio::common::Mutation* mutation, @@ -332,7 +544,7 @@ void DwrfRowReader::readWithRowNumber( rowVector->size(), std::move(children)); } - selectiveColumnReader_->next(rowsToRead, result, mutation); + getSelectiveColumnReader()->next(rowsToRead, result, mutation); FlatVector* flatRowNum = nullptr; if (rowNumVector && BaseVector::isVectorWritable(rowNumVector)) { flatRowNum = rowNumVector->asFlatVector(); @@ -350,7 +562,7 @@ void DwrfRowReader::readWithRowNumber( std::vector()); flatRowNum = rowNumVector->asUnchecked>(); } - auto rowOffsets = selectiveColumnReader_->outputRows(); + auto rowOffsets = getSelectiveColumnReader()->outputRows(); VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); auto* rawRowNum = flatRowNum->mutableRawValues(); for (int i = 0; i < rowOffsets.size(); ++i) { @@ -387,7 +599,7 @@ int64_t DwrfRowReader::nextRowNumber() { goto advanceToNextStripe; } } - startNextStripe(); + loadCurrentStripe(); } checkSkipStrides(strideSize); if (currentRowInStripe_ < rowsInCurrentStripe_) { @@ -396,7 +608,7 @@ int64_t DwrfRowReader::nextRowNumber() { advanceToNextStripe: ++currentStripe_; currentRowInStripe_ = 0; - newStripeReadyForRead_ = false; + currentUnit_ = nullptr; } atEnd_ = true; return kAtEnd; @@ -438,211 +650,30 @@ uint64_t DwrfRowReader::next( // reading of the data. auto strideSize = getReader().getFooter().rowIndexStride(); strideIndex_ = strideSize > 0 ? currentRowInStripe_ / strideSize : 0; + unitLoader_->onRead(currentStripe_, currentRowInStripe_, rowsToRead); readNext(rowsToRead, mutation, result); currentRowInStripe_ += rowsToRead; return rowsToRead; } void DwrfRowReader::resetFilterCaches() { - if (selectiveColumnReader_) { - selectiveColumnReader_->resetFilterCaches(); + if (getSelectiveColumnReader()) { + getSelectiveColumnReader()->resetFilterCaches(); recomputeStridesToSkip_ = true; } // For columnReader_, this is no-op. } -std::optional> -DwrfRowReader::prefetchUnits() { - auto rowsInStripe = getReader().getRowsPerStripe(); - DWIO_ENSURE(firstStripe_ <= rowsInStripe.size()); - DWIO_ENSURE(stripeCeiling_ <= rowsInStripe.size()); - DWIO_ENSURE(firstStripe_ <= stripeCeiling_); - - std::vector res; - res.reserve(stripeCeiling_ - firstStripe_); - - for (auto stripe = firstStripe_; stripe < stripeCeiling_; ++stripe) { - res.push_back( - {.rowCount = rowsInStripe[stripe], - .prefetch = std::bind(&DwrfRowReader::prefetch, this, stripe)}); - } - return res; -} - -DwrfRowReader::FetchResult DwrfRowReader::fetch(uint32_t stripeIndex) { - FetchStatus prevStatus; - stripeLoadStatuses_.withWLock([&](auto& stripeLoadStatus) { - if (stripeIndex < 0 || stripeIndex >= stripeLoadStatus.size()) { - prevStatus = FetchStatus::ERROR; - } - - prevStatus = stripeLoadStatus[stripeIndex]; - if (prevStatus == FetchStatus::NOT_STARTED) { - stripeLoadStatus[stripeIndex] = FetchStatus::IN_PROGRESS; - } - }); - - DWIO_ENSURE( - prevStatus != FetchStatus::ERROR, "Fetch request was out of bounds"); - - if (prevStatus != FetchStatus::NOT_STARTED) { - bool finishedLoading = prevStatus == FetchStatus::FINISHED; - - VLOG(1) << "Stripe " << stripeIndex << " was not loaded, as it was already " - << (finishedLoading ? "finished loading" : "in progress"); - return finishedLoading ? FetchResult::kAlreadyFetched - : FetchResult::kInProgress; - } - - DWIO_ENSURE( - !prefetchedStripeStates_.rlock()->contains(stripeIndex), - "prefetched stripe state already exists for stripeIndex " + - std::to_string(stripeIndex) + ", LIKELY RACE CONDITION"); - - auto startTime = std::chrono::high_resolution_clock::now(); - PrefetchedStripeState stripeState; - - bool preload = options_.getPreloadStripe(); - auto state = std::make_shared( - readerBaseShared(), fetchStripe(stripeIndex, preload)); - - stripeState.stripeReadState = state; - - auto stripe = getReader().getFooter().stripes(stripeIndex); - StripeStreamsImpl stripeStreams( - state, - getColumnSelector(), - options_, - stripe.offset(), - stripe.numberOfRows(), - *this, - stripeIndex); - - auto scanSpec = options_.getScanSpec().get(); - auto requestedType = getColumnSelector().getSchemaWithId(); - auto fileType = getReader().getSchemaWithId(); - FlatMapContext flatMapContext; - flatMapContext.keySelectionCallback = options_.getKeySelectionCallback(); - memory::AllocationPool pool(&getReader().getMemoryPool()); - StreamLabels streamLabels(pool); - - if (scanSpec) { - stripeState.selectiveColumnReader = SelectiveDwrfReader::build( - requestedType, - fileType, - stripeStreams, - streamLabels, - columnReaderStatistics_, - scanSpec, - flatMapContext, - true); // isRoot - stripeState.selectiveColumnReader->setIsTopLevel(); - } else { - stripeState.columnReader = ColumnReader::build( // enqueue streams - requestedType, - fileType, - stripeStreams, - streamLabels, - executor_.get(), - options_.getDecodingParallelismFactor(), - flatMapContext); - } - DWIO_ENSURE( - (stripeState.columnReader != nullptr) != - (stripeState.selectiveColumnReader != nullptr), - "ColumnReader was not created"); - - // load data plan according to its updated selector - // during column reader construction - // if planReads is off which means stripe data loaded as whole - if (!preload) { - VLOG(1) << "[DWRF] Load read plan for stripe " << currentStripe_; - stripeStreams.loadReadPlan(); - } - - stripeState.stripeDictionaryCache = stripeStreams.getStripeDictionaryCache(); - stripeState.preloaded = preload; - prefetchedStripeStates_.wlock()->operator[](stripeIndex) = - std::move(stripeState); - - auto endTime = std::chrono::high_resolution_clock::now(); - VLOG(1) << " time to complete prefetch: " - << std::chrono::duration_cast( - endTime - startTime) - .count(); - stripeLoadBatons_[stripeIndex]->post(); - VLOG(1) << "done in fetch and baton posted for " << stripeIndex << ", thread " - << std::this_thread::get_id(); - - stripeLoadStatuses_.wlock()->operator[](stripeIndex) = FetchStatus::FINISHED; - return FetchResult::kFetched; -} - -DwrfRowReader::FetchResult DwrfRowReader::prefetch(uint32_t stripeToFetch) { - DWIO_ENSURE(stripeToFetch < stripeCeiling_); - prefetchHasOccurred_ = true; - - VLOG(1) << "Unlocked lock and calling fetch for " << stripeToFetch - << ", thread " << std::this_thread::get_id(); - return fetch(stripeToFetch); -} - -// Guarantee stripe we are currently on is available and loaded -void DwrfRowReader::safeFetchNextStripe() { - auto startTime = std::chrono::high_resolution_clock::now(); - auto fetchResult = fetch(currentStripe_); - // If result is fetched by this thread or in progress in another thread, - // record time spent in this function as time blocked on IO. - bool shouldRecordTimeBlocked = fetchResult != FetchResult::kAlreadyFetched; - - // Check result of fetch to avoid synchronization if we fetched on this - // thread. - if (fetchResult != FetchResult::kFetched) { - // Now we know the stripe was or is being loaded on another thread, - // Await the baton for this stripe before we return to ensure load is done. - VLOG(1) << "Waiting on baton for stripe: " << currentStripe_; - stripeLoadBatons_[currentStripe_]->wait(); - VLOG(1) << "Acquired baton for stripe " << currentStripe_; - } - auto reportBlockedOnIoMetric = options_.getBlockedOnIoCallback(); - if (reportBlockedOnIoMetric) { - if (shouldRecordTimeBlocked) { - auto timeBlockedOnIo = - std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - startTime); - reportBlockedOnIoMetric(timeBlockedOnIo.count()); - } else { - // We still want to populate stat if we are not blocking on IO. - reportBlockedOnIoMetric(0); - }; - } - - DWIO_ENSURE(prefetchedStripeStates_.rlock()->contains(currentStripe_)); -} - -void DwrfRowReader::startNextStripe() { - if (newStripeReadyForRead_ || currentStripe_ >= stripeCeiling_) { +void DwrfRowReader::loadCurrentStripe() { + if (currentUnit_ || currentStripe_ >= stripeCeiling_) { return; } - columnReader_.reset(); - selectiveColumnReader_.reset(); - safeFetchNextStripe(); - prefetchedStripeStates_.withWLock([&](auto& prefetchedStripeStates) { - DWIO_ENSURE(prefetchedStripeStates.contains(currentStripe_)); - - auto stripe = getReader().getFooter().stripes(currentStripe_); - rowsInCurrentStripe_ = stripe.numberOfRows(); - - auto& state = prefetchedStripeStates[currentStripe_]; - columnReader_ = std::move(state.columnReader); - selectiveColumnReader_ = std::move(state.selectiveColumnReader); - stripeDictionaryCache_ = std::move(state.stripeDictionaryCache); - stripeMetadata_ = std::move(state.stripeReadState->stripeMetadata); - - prefetchedStripeStates.erase(currentStripe_); - }); - newStripeReadyForRead_ = true; + VELOX_CHECK_GE(currentStripe_, firstStripe_); + strideIndex_ = 0; + const auto loadUnitIdx = currentStripe_ - firstStripe_; + currentUnit_ = castDwrfUnit(&unitLoader_->getLoadedUnit(loadUnitIdx)); + rowsInCurrentStripe_ = currentUnit_->getNumRows(); } size_t DwrfRowReader::estimatedReaderMemory() const { @@ -777,12 +808,12 @@ DwrfReader::DwrfReader( options.isFileColumnNamesReadAsLowerCase(), options.randomSkip())), options_(options) { - // If we are not using column names to map table columns to file columns, then - // we use indices. In that case we need to ensure the names completely match, - // because we are still mapping columns by names further down the code. - // So we rename column names in the file schema to match table schema. - // We test the options to have 'fileSchema' (actually table schema) as most of - // the unit tests fail to provide it. + // If we are not using column names to map table columns to file columns, + // then we use indices. In that case we need to ensure the names completely + // match, because we are still mapping columns by names further down the + // code. So we rename column names in the file schema to match table schema. + // We test the options to have 'fileSchema' (actually table schema) as most + // of the unit tests fail to provide it. if ((not options_.isUseColumnNamesForColumnMapping()) and (options_.getFileSchema() != nullptr)) { updateColumnNamesFromTableSchema(); @@ -841,7 +872,8 @@ TypePtr updateColumnNames( const TypePtr& tableType, const std::string& fileFieldName, const std::string& tableFieldName) { - // Check type kind equality. If not equal, no point to continue down the tree. + // Check type kind equality. If not equal, no point to continue down the + // tree. if (fileType->kind() != tableType->kind()) { logTypeInequality(*fileType, *tableType, fileFieldName, tableFieldName); return fileType; @@ -1096,7 +1128,7 @@ std::unique_ptr DwrfReader::createDwrfRowReader( // background have a reader tree and can preload the first // stripe. Also the reader tree needs to exist in order to receive // adaptation from a previous reader. - rowReader->startNextStripe(); + rowReader->loadCurrentStripe(); } return rowReader; } diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index 86c9908ba080..296581d87986 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -19,11 +19,13 @@ #include "folly/Executor.h" #include "folly/synchronization/Baton.h" #include "velox/dwio/common/ReaderFactory.h" +#include "velox/dwio/common/UnitLoader.h" #include "velox/dwio/dwrf/reader/SelectiveDwrfReader.h" namespace facebook::velox::dwrf { class ColumnReader; +class DwrfUnit; class DwrfRowReader : public StrideIndexProvider, public StripeReaderBase, @@ -112,25 +114,17 @@ class DwrfRowReader : public StrideIndexProvider, return it->second; } - // Creates column reader tree and may start prefetch of frequently read - // columns. - void startNextStripe(); + void loadCurrentStripe(); - void safeFetchNextStripe(); - - std::optional> prefetchUnits() override; + std::optional> prefetchUnits() override { + return std::nullopt; + } int64_t nextRowNumber() override; int64_t nextReadSize(uint64_t size) override; private: - // Represents the status of a stripe being fetched. - enum class FetchStatus { NOT_STARTED, IN_PROGRESS, FINISHED, ERROR }; - - FetchResult fetch(uint32_t stripeIndex); - FetchResult prefetch(uint32_t stripeToFetch); - // footer std::vector firstRowOfStripe_; mutable std::shared_ptr selectedSchema_; @@ -143,49 +137,14 @@ class DwrfRowReader : public StrideIndexProvider, // stripe in the RowReader's bounds is 3, then stripeCeiling_ is 4. uint32_t stripeCeiling_; uint64_t currentRowInStripe_; - bool newStripeReadyForRead_; uint64_t rowsInCurrentStripe_; uint64_t strideIndex_; - std::shared_ptr stripeDictionaryCache_; dwio::common::RowReaderOptions options_; - std::shared_ptr executor_; std::function decodingTimeUsCallback_; - std::function stripeCountCallback_; - - struct PrefetchedStripeState { - bool preloaded; - std::unique_ptr columnReader; - std::unique_ptr selectiveColumnReader; - std::shared_ptr stripeDictionaryCache; - std::shared_ptr stripeReadState; - }; - - // stripeLoadStatuses_ and prefetchedStripeStates_ will never be acquired - // simultaneously on the same thread. - // Key is stripe index - folly::Synchronized> - prefetchedStripeStates_; - - // Indicates the status of load requests. The ith element in - // stripeLoadStatuses_ represents the status of the ith stripe. - folly::Synchronized> stripeLoadStatuses_; - - // Currently, seek logic relies on reloading the stripe every time the row is - // seeked to, even if the row was present in the already loaded stripe. This - // is a temporary flag to disable seek on a reader which has already - // prefetched, until we implement a good way to support both. - std::atomic prefetchHasOccurred_{false}; - - // Used to indicate which stripes are finished loading. If stripeLoadBatons[i] - // is posted, it means the ith stripe has finished loading - std::vector>> stripeLoadBatons_; // column selector std::shared_ptr columnSelector_; - std::unique_ptr columnReader_; - std::unique_ptr selectiveColumnReader_; - std::unique_ptr stripeMetadata_; const uint64_t* stridesToSkip_; int stridesToSkipSize_; // Record of strides to skip in each visited stripe. Used for diagnostics. @@ -202,6 +161,9 @@ class DwrfRowReader : public StrideIndexProvider, bool atEnd_{false}; + std::unique_ptr unitLoader_; + DwrfUnit* currentUnit_; + // internal methods std::optional estimatedRowSizeHelper( @@ -228,6 +190,15 @@ class DwrfRowReader : public StrideIndexProvider, uint64_t rowsToRead, const dwio::common::Mutation*, VectorPtr& result); + + uint64_t skip(uint64_t numValues); + + std::unique_ptr& getColumnReader(); + + std::unique_ptr& + getSelectiveColumnReader(); + + std::unique_ptr getUnitLoader(); }; class DwrfReader : public dwio::common::Reader { diff --git a/velox/dwio/dwrf/reader/StripeReaderBase.cpp b/velox/dwio/dwrf/reader/StripeReaderBase.cpp index c1240b99e6e7..cfb7b2796b5a 100644 --- a/velox/dwio/dwrf/reader/StripeReaderBase.cpp +++ b/velox/dwio/dwrf/reader/StripeReaderBase.cpp @@ -25,7 +25,7 @@ using dwio::common::LogType; // true) will reuse the result without considering the new preload directive std::unique_ptr StripeReaderBase::fetchStripe( uint32_t index, - bool& preload) { + bool& preload) const { auto& fileFooter = reader_->getFooter(); DWIO_ENSURE_LT(index, fileFooter.stripesSize(), "invalid stripe index"); auto stripe = fileFooter.stripes(index); @@ -101,7 +101,7 @@ void StripeReaderBase::loadEncryptionKeys( uint32_t index, const proto::StripeFooter& stripeFooter, encryption::DecryptionHandler& handler, - const StripeInformationWrapper& stripeInfo) { + const StripeInformationWrapper& stripeInfo) const { if (!handler.isEncrypted()) { return; } diff --git a/velox/dwio/dwrf/reader/StripeReaderBase.h b/velox/dwio/dwrf/reader/StripeReaderBase.h index 6dc499295bbf..7f7ee43bc6ec 100644 --- a/velox/dwio/dwrf/reader/StripeReaderBase.h +++ b/velox/dwio/dwrf/reader/StripeReaderBase.h @@ -71,7 +71,7 @@ class StripeReaderBase { std::unique_ptr fetchStripe( uint32_t index, - bool& preload); + bool& preload) const; private: const std::shared_ptr reader_; @@ -81,7 +81,7 @@ class StripeReaderBase { uint32_t index, const proto::StripeFooter& stripeFooter, encryption::DecryptionHandler& handler, - const StripeInformationWrapper& stripeInfo); + const StripeInformationWrapper& stripeInfo) const; friend class StripeLoadKeysTest; }; diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index a10c4680a1ae..0292c2c1cc0a 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -118,20 +118,6 @@ class TestReader : public testing::Test, public VectorTestBase { } }; -class TestRowReaderPrefetch : public testing::Test, public VectorTestBase { - protected: - static void SetUpTestCase() { - memory::MemoryManager::testingSetInstance({}); - } -}; - -class TestRowReaderPfetch : public testing::Test, public VectorTestBase { - protected: - static void SetUpTestCase() { - memory::MemoryManager::testingSetInstance({}); - } -}; - } // namespace TEST_F(TestReader, testWriterVersions) { @@ -150,42 +136,6 @@ std::unique_ptr createFileBufferedInput( std::make_shared(path), pool); } -// Prefetches the entire range of the reader and verifies correctness in -// PrefetchUnits() API. Does not do any actual reading of the file. -void verifyPrefetch( - DwrfRowReader* rowReader, - const std::vector& expectedPrefetchRowSizes = {}, - const std::vector& shouldTryPrefetch = {}) { - auto prefetchUnitsOpt = rowReader->prefetchUnits(); - ASSERT_TRUE(prefetchUnitsOpt.has_value()); - auto prefetchUnits = std::move(prefetchUnitsOpt.value()); - auto numFetches = prefetchUnits.size(); - auto expectedResultsSize = shouldTryPrefetch.size(); - auto expectedRowsSize = expectedPrefetchRowSizes.size(); - bool shouldCheckResults = expectedResultsSize != 0; - bool shouldCheckRowCount = expectedRowsSize != 0; - - // Empty vector will skip the check, but they should never been different than - // actual expected prefetchUnits vector - DWIO_ENSURE(expectedResultsSize == numFetches || !shouldCheckResults); - DWIO_ENSURE(expectedRowsSize == numFetches || !shouldCheckRowCount); - - for (int i = 0; i < numFetches; i++) { - if (shouldCheckRowCount) { - EXPECT_EQ(prefetchUnits[i].rowCount, expectedPrefetchRowSizes[i]); - } - if (shouldCheckResults && shouldTryPrefetch[i]) { - RowReader::FetchResult result = prefetchUnits[i].prefetch(); - EXPECT_EQ( - result, - // A prefetch request for the first stripe should be already fetched, - // because createDwrfRowReader calls startNextStripe() synchronously. - i == 0 ? RowReader::FetchResult::kAlreadyFetched - : RowReader::FetchResult::kFetched); - } - } -} - // This relies on schema and data inside of our fm_small and fm_large orc files, // and is not composeable with other schema/datas void verifyFlatMapReading( @@ -314,8 +264,6 @@ void verifyFlatMapReading( auto rowReaderOwner = reader->createRowReader(rowReaderOpts); auto rowReader = dynamic_cast(rowReaderOwner.get()); - // Prefetch the requested # of times - verifyPrefetch(rowReader, expectedPrefetchRowSizes, shouldTryPrefetch); verifyFlatMapReading(rowReader, seeks, expectedBatchSize, numBatches); } @@ -558,384 +506,6 @@ VELOX_INSTANTIATE_TEST_SUITE_P( TestFlatMapReader, Values(true, false)); -TEST_F(TestRowReaderPrefetch, testPartialPrefetch) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - const std::array expectedBatchSize{300, 300, 300, 100}; - verifyFlatMapReading( - pool(), - getFMSmallFile(), - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size(), - false, - {300, 300, 300, 100}, - /* file has 4 stripes, prefetch only some and verify whole read */ - {true, false, true, false}); -} - -TEST_F(TestRowReaderPrefetch, testPrefetchWholeFile) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - const std::array expectedBatchSize{300, 300, 300, 100}; - verifyFlatMapReading( - pool(), - getFMSmallFile(), - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size(), - false, - {300, 300, 300, 100}, - /* file has 4 stripes, issue prefetch for each one */ - {true, true, true, true}); -} - -TEST_F(TestRowReaderPfetch, testSeekBeforePrefetch) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - - dwio::common::ReaderOptions readerOpts{pool()}; - RowReaderOptions rowReaderOpts; - rowReaderOpts.select(std::make_shared(getFlatmapSchema())); - auto reader = DwrfReader::create( - createFileBufferedInput(getFMSmallFile(), readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - rowReader->seekToRow(100); - // First stripe has 300 rows, but expect 200 due to seeking past first 100 - const std::array expectedBatchSize{200, 300, 300, 100}; - auto prefetches = rowReader->prefetchUnits().value(); - for (auto& prefetch : prefetches) { - prefetch.prefetch(); - } - - verifyFlatMapReading( - rowReader, - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size()); -} - -// Synchronous interleaving -TEST_F(TestRowReaderPrefetch, testPrefetchAndStartNextStripeInterleaved) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - const std::array expectedBatchSize{300, 300, 300, 100}; - dwio::common::ReaderOptions readerOpts{pool()}; - readerOpts.setFilePreloadThreshold(0); - RowReaderOptions rowReaderOpts; - rowReaderOpts.select(std::make_shared(getFlatmapSchema())); - auto reader = DwrfReader::create( - createFileBufferedInput(getFMSmallFile(), readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - // startNextStripe just loads state for current row- it shouldn't prefetch - // ahead of its place - rowReader->startNextStripe(); - - // std::optional> units = - // rowReader->prefetchUnits(); - auto units = rowReader->prefetchUnits().value(); - EXPECT_EQ(units.size(), 4); - - // startNextStripe should not interfere with prefetch- it should just be - // continuously re-loading the stripe its row index is on (currently 0). - EXPECT_EQ(units[0].prefetch(), DwrfRowReader::FetchResult::kAlreadyFetched); - EXPECT_EQ(units[1].prefetch(), DwrfRowReader::FetchResult::kFetched); - EXPECT_EQ(units[1].prefetch(), DwrfRowReader::FetchResult::kAlreadyFetched); - rowReader->startNextStripe(); - rowReader->startNextStripe(); - EXPECT_EQ(units[1].prefetch(), DwrfRowReader::FetchResult::kAlreadyFetched); - - // Prefetch rest of stripe and call again (expecting no-op) - EXPECT_EQ(units[2].prefetch(), DwrfRowReader::FetchResult::kFetched); - EXPECT_EQ(units[3].prefetch(), DwrfRowReader::FetchResult::kFetched); - - // DwrfRowReader still should register having no prefetching to do - rowReader->startNextStripe(); - - // Verify reads are correct - verifyFlatMapReading( - rowReader, - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size()); -} - -TEST_F(TestRowReaderPrefetch, testReadLargePrefetch) { - // batch size is set as 1000 in reading - // 3000 per stripe - std::array seeks; - seeks.fill(0); - const std::array expectedBatchSize{ - 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000}; - verifyFlatMapReading( - pool(), - getFMLargeFile(), - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size(), - false, - {3000, 3000, 3000, 1000}, - {true, true, false, false}); -} - -TEST_F(TestRowReaderPrefetch, testParallelPrefetch) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - const std::array expectedBatchSize{300, 300, 300, 100}; - dwio::common::ReaderOptions readerOpts{pool()}; - RowReaderOptions rowReaderOpts; - rowReaderOpts.select(std::make_shared(getFlatmapSchema())); - auto reader = DwrfReader::create( - createFileBufferedInput(getFMSmallFile(), readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - auto units = rowReader->prefetchUnits().value(); - std::vector> prefetches; - prefetches.reserve(4); - for (int i = 0; i < 4; i++) { - prefetches.push_back(std::async(units[i].prefetch)); - } - - // Verify reads are correct - verifyFlatMapReading( - rowReader, - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size()); -} - -// Use large file and disable preload to test -TEST_F(TestRowReaderPrefetch, testParallelPrefetchNoPreload) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - const std::array expectedBatchSize{ - 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000}; - dwio::common::ReaderOptions readerOpts{pool()}; - // Explicitly disable so IO takes some time - readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); - RowReaderOptions rowReaderOpts; - rowReaderOpts.select(std::make_shared(getFlatmapSchema())); - auto reader = DwrfReader::create( - createFileBufferedInput(getFMLargeFile(), readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - auto units = rowReader->prefetchUnits().value(); - std::vector> prefetches; - prefetches.reserve(4); - for (int i = 0; i < 4; i++) { - prefetches.push_back(std::async(units[i].prefetch)); - } - - // Verify reads are correct - verifyFlatMapReading( - rowReader, - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size()); -} - -TEST_F(TestRowReaderPrefetch, prefetchWithCachedIndexStream) { - dwio::common::ReaderOptions readerOpts{pool()}; - readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); - RowReaderOptions rowReaderOpts; - - std::shared_ptr requestedType = std::dynamic_pointer_cast< - const RowType>(HiveTypeParser().parse( - "struct")); - rowReaderOpts.select(std::make_shared(requestedType)); - rowReaderOpts.setEagerFirstStripeLoad(false); - - auto reader = DwrfReader::create( - createFileBufferedInput( - getExampleFilePath("dict_encoded_strings.orc"), - readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - auto units = rowReader->prefetchUnits().value(); - std::vector prefetches; - - prefetches.reserve(1); - for (int i = 0; i < 3; i++) { - prefetches.emplace_back(units[i].prefetch()); - } - - for (auto& fetchResult : prefetches) { - ASSERT_EQ(DwrfRowReader::FetchResult::kFetched, fetchResult); - } - verifyCachedIndexStreamReads(rowReader, 0, 3); -} - -struct ByStripeInfo { - uint64_t offset; - uint64_t length; - uint32_t firstStripe; - uint32_t pastLastStripe; - - ByStripeInfo( - uint64_t offset, - uint64_t length, - uint32_t firstStripe, - uint32_t pastLastStripe) - : offset(offset), - length(length), - firstStripe(firstStripe), - pastLastStripe(pastLastStripe) {} -}; - -class TestRowReaderPrefetchByStripe : public TestWithParam, - public VectorTestBase { - protected: - static void SetUpTestCase() { - memory::MemoryManager::testingSetInstance({}); - } -}; - -// This test ensures that we only return the prefetch units for the stripes that -// we'll actually use, according to the range passed to the row reader. We -// don't need to be able to prefetch stripes that we won't use. That would -// confuse us, since we'd have to calculate, outside of the reader, which -// stripes we need to prefetch. -TEST_P(TestRowReaderPrefetchByStripe, prefetchWithCachedIndexStream) { - auto opt = GetParam(); - dwio::common::ReaderOptions readerOpts{pool()}; - readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); - RowReaderOptions rowReaderOpts; - rowReaderOpts.range(opt.offset, opt.length); - - std::shared_ptr requestedType = std::dynamic_pointer_cast< - const RowType>(HiveTypeParser().parse( - "struct")); - rowReaderOpts.select(std::make_shared(requestedType)); - rowReaderOpts.setEagerFirstStripeLoad(false); - - auto reader = DwrfReader::create( - createFileBufferedInput( - getExampleFilePath("dict_encoded_strings.orc"), - readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - auto units = rowReader->prefetchUnits().value(); - EXPECT_EQ(units.size(), (opt.pastLastStripe - opt.firstStripe)); - std::vector prefetches; - prefetches.reserve(opt.pastLastStripe - opt.firstStripe); - - for (auto& unit : units) { - prefetches.emplace_back(unit.prefetch()); - } - - ASSERT_EQ(prefetches.size(), opt.pastLastStripe - opt.firstStripe); - - for (auto& fetchResult : prefetches) { - ASSERT_EQ(DwrfRowReader::FetchResult::kFetched, fetchResult); - } - verifyCachedIndexStreamReads(rowReader, opt.firstStripe, opt.pastLastStripe); -} - -// Stripe | offset | length | rows -// 0 | 3 | 583 | 3 -// 1 | 586 | 508 | 100 -// 2 | 1094 | ? (1+) | 100 -VELOX_INSTANTIATE_TEST_SUITE_P( - TestRowReaderPrefetchByStripeSuite, - TestRowReaderPrefetchByStripe, - ValuesIn({ - ByStripeInfo(3, 1, 0, 1), // Stripes: 0 - ByStripeInfo(586, 1, 1, 2), // Stripes: 1 - ByStripeInfo(1094, 1, 2, 3), // Stripes: 2 - ByStripeInfo(3, 584, 0, 2), // Stripes: 0, 1 - ByStripeInfo(586, 509, 1, 3), // Stripes: 1, 2 - ByStripeInfo(3, 1092, 0, 3) // Stripes: 0, 1, 2 - })); - -// This test just verifies read correctness with the eager first stripe load -// config off for regression purposes. It does not ensure the first stripe is -// not loaded before we explicitly prefetch or start reading. -TEST_F(TestRowReaderPrefetch, testNoEagerFirstStripeLoad) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - const std::array expectedBatchSize{300, 300, 300, 100}; - dwio::common::ReaderOptions readerOpts{pool()}; - RowReaderOptions rowReaderOpts; - - // If we ever change default to false, let us fail this test so we can - // change tests in this file accordingly. - ASSERT_TRUE(rowReaderOpts.getEagerFirstStripeLoad()); - rowReaderOpts.setEagerFirstStripeLoad(false); - rowReaderOpts.select(std::make_shared(getFlatmapSchema())); - auto reader = DwrfReader::create( - createFileBufferedInput(getFMSmallFile(), readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - // Verify reads are correct - verifyFlatMapReading( - rowReader, - seeks.data(), - expectedBatchSize.data(), - expectedBatchSize.size()); -} - -// Other tests use default of eager loading, and test first stripe is -// preloaded after DwrfRowReader::create. This tests the case where eager -// loading is set to false. -TEST_F(TestRowReaderPrefetch, testFirstStripeNotLoadedWithEagerLoadingOff) { - // batch size is set as 1000 in reading - std::array seeks; - seeks.fill(0); - dwio::common::ReaderOptions readerOpts{pool()}; - RowReaderOptions rowReaderOpts; - rowReaderOpts.setEagerFirstStripeLoad(false); - rowReaderOpts.select(std::make_shared(getFlatmapSchema())); - auto reader = DwrfReader::create( - createFileBufferedInput(getFMSmallFile(), readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - - auto units = rowReader->prefetchUnits().value(); - ASSERT_EQ(units[0].prefetch(), DwrfRowReader::FetchResult::kFetched); -} - -// PrefetchUnits should return empty -TEST_F(TestRowReaderPrefetch, testEmptyRowRange) { - dwio::common::ReaderOptions readerOpts{pool()}; - RowReaderOptions rowReaderOpts; - // Set empty range in rowreader options - rowReaderOpts.range(0, 0); - auto reader = DwrfReader::create( - createFileBufferedInput(getFMSmallFile(), readerOpts.getMemoryPool()), - readerOpts); - auto rowReaderOwner = reader->createRowReader(rowReaderOpts); - auto rowReader = dynamic_cast(rowReaderOwner.get()); - auto units = rowReader->prefetchUnits().value(); - ASSERT_EQ(0, units.size()); -} - class TestFlatMapReaderFlatLayout : public TestWithParam>, public VectorTestBase { @@ -1147,7 +717,7 @@ TEST_F(TestReader, testBlockedIoCallbackFiredBlocking) { EXPECT_GE(metricToIncrement, 0); } -TEST_F(TestReader, testBlockedIoCallbackFiredNonBlocking) { +TEST_F(TestReader, DISABLED_testBlockedIoCallbackFiredNonBlocking) { RowReaderOptions rowReaderOpts; std::optional metricToIncrement; @@ -1191,7 +761,7 @@ TEST_F(TestReader, testBlockedIoCallbackFiredNonBlocking) { EXPECT_EQ(metricToIncrement, 0); } -TEST_F(TestReader, testBlockedIoCallbackFiredWithFirstStripeLoad) { +TEST_F(TestReader, DISABLED_testBlockedIoCallbackFiredWithFirstStripeLoad) { RowReaderOptions rowReaderOpts; std::optional metricToIncrement; @@ -1214,7 +784,7 @@ TEST_F(TestReader, testBlockedIoCallbackFiredWithFirstStripeLoad) { EXPECT_EQ(metricToIncrement, std::nullopt); auto rowReader = reader->createRowReader(rowReaderOpts); // Expect metric has now been populated, due to the initial blocking IO of - // startNextStripe() + // loadCurrentStripe() EXPECT_GE(metricToIncrement, 0); auto metricAfterFirstStripe = metricToIncrement; VectorPtr batch;