Skip to content

Commit 165ff8d

Browse files
committed
Copy Iceberg files to connectors/lakehouse
1 parent 6b43d48 commit 165ff8d

28 files changed

+4477
-0
lines changed

velox/connectors/lakehouse/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414

1515
add_subdirectory(storage_adapters)
1616
add_subdirectory(common)
17+
add_subdirectory(iceberg)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright (c) Facebook, Inc. and its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Library built from the Iceberg sources in this directory: the equality and
# positional delete file readers, the split/split-reader implementation, the
# filter utilities and the data sink.
velox_add_library(
  velox_hive_iceberg_splitreader
  EqualityDeleteFileReader.cpp
  FilterUtil.cpp
  IcebergDeleteFile.cpp
  IcebergSplitReader.cpp
  IcebergSplit.cpp
  PositionalDeleteFileReader.cpp
  IcebergDataSink.cpp)

# Libraries the target above links against.
velox_link_libraries(
  velox_hive_iceberg_splitreader
  velox_connector
  velox_dwio_common
  Folly::folly)

# The tests subdirectory is only added when testing is enabled for the build.
if(${VELOX_BUILD_TESTING})
  add_subdirectory(tests)
endif()
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"
18+
19+
#include "velox/connectors/hive/HiveConnectorUtil.h"
20+
#include "velox/connectors/hive/iceberg/FilterUtil.h"
21+
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
22+
#include "velox/core/Expressions.h"
23+
#include "velox/dwio/common/ReaderFactory.h"
24+
25+
using namespace facebook::velox::common;
26+
using namespace facebook::velox::core;
27+
using namespace facebook::velox::exec;
28+
29+
namespace facebook::velox::connector::hive::iceberg {
30+
31+
// Number of rows requested from the delete file RowReader per next() call.
static constexpr int kMaxBatchRows{10'000};
32+
33+
// Prepares a RowReader over the equality delete file described by
// 'deleteFile'.
//
// 'deleteFile' must have content kind kEqualityDeletes; it is kept by
// reference. 'baseFileSchema' is the TypeWithId tree later used to translate
// equality field ids into base file column names. 'fileHandleFactory'
// supplies the file handle of the delete file. 'executor', 'ioStats' and
// 'fsStats' are forwarded to the buffered input. 'connectorQueryCtx' supplies
// the memory pool, the filesystem token provider and the session properties.
// 'hiveConfig' is used to configure the reader and row reader options.
// 'connectorId' is recorded in the split created for the delete file.
//
// When the delete file has no records the constructor returns before creating
// the split and the RowReader.
// NOTE(review): in that case deleteRowReader_ stays null and
// readDeleteValues() fails its VELOX_CHECK - confirm callers never call it
// for empty delete files.
EqualityDeleteFileReader::EqualityDeleteFileReader(
    const IcebergDeleteFile& deleteFile,
    std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
    FileHandleFactory* fileHandleFactory,
    folly::Executor* executor,
    const ConnectorQueryCtx* connectorQueryCtx,
    const std::shared_ptr<const HiveConfig>& hiveConfig,
    const std::shared_ptr<io::IoStatistics> ioStats,
    const std::shared_ptr<filesystems::File::IoStats>& fsStats,
    const std::string& connectorId)
    : deleteFile_(deleteFile),
      // The schema is received by value, so move it into the member instead
      // of paying for a second reference count increment.
      baseFileSchema_(std::move(baseFileSchema)),
      fileHandleFactory_(fileHandleFactory),
      pool_(connectorQueryCtx->memoryPool()),
      deleteSplit_(nullptr),
      deleteRowReader_(nullptr) {
  VELOX_CHECK_EQ(deleteFile_.content, FileContent::kEqualityDeletes);

  if (deleteFile_.recordCount == 0) {
    return;
  }

  // TODO: push down filter if previous delete file contains this one. E.g.
  // previous equality delete file has a=1, and this file also contains
  // columns a, then a!=1 can be pushed as a filter when reading this delete
  // file.

  // The split spans the delete file from offset 0 for its full size.
  deleteSplit_ = std::make_shared<HiveConnectorSplit>(
      connectorId,
      deleteFile_.filePath,
      deleteFile_.fileFormat,
      0,
      deleteFile_.fileSizeInBytes);

  // Create the Reader and RowReader for the equality delete file.
  dwio::common::ReaderOptions deleteReaderOpts(pool_);
  configureReaderOptions(
      hiveConfig,
      connectorQueryCtx,
      nullptr,
      deleteSplit_,
      {},
      deleteReaderOpts);

  const FileHandleKey fileHandleKey{
      .filename = deleteFile_.filePath,
      .tokenProvider = connectorQueryCtx->fsTokenProvider()};
  auto deleteFileHandleCachePtr = fileHandleFactory_->generate(fileHandleKey);
  auto deleteFileInput = createBufferedInput(
      *deleteFileHandleCachePtr,
      deleteReaderOpts,
      connectorQueryCtx,
      ioStats,
      fsStats,
      executor);

  auto deleteReader =
      dwio::common::getReaderFactory(deleteReaderOpts.fileFormat())
          ->createReader(std::move(deleteFileInput), deleteReaderOpts);

  // For now, we assume only the delete columns are written in the delete
  // file, so every column of the file is scanned.
  deleteFileRowType_ = deleteReader->rowType();
  auto scanSpec = std::make_shared<common::ScanSpec>("<root>");
  scanSpec->addAllChildFields(deleteFileRowType_->asRow());

  dwio::common::RowReaderOptions deleteRowReaderOpts;
  configureRowReaderOptions(
      {},
      scanSpec,
      nullptr,
      deleteFileRowType_,
      deleteSplit_,
      hiveConfig,
      connectorQueryCtx->sessionProperties(),
      deleteRowReaderOpts);

  // Assignment releases any previously held reader; no explicit reset needed.
  deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts);
}
113+
114+
// Reads the delete values and turns them into filters for the base file.
//
// A delete file with exactly one column produces a domain filter that is
// merged into 'subfieldFilters'. Any other column count produces typed
// expressions appended to 'expressionInputs'. Both the RowReader and the
// split must exist on entry; the split is released before returning.
void EqualityDeleteFileReader::readDeleteValues(
    SubfieldFilters& subfieldFilters,
    std::vector<core::TypedExprPtr>& expressionInputs) {
  VELOX_CHECK(deleteRowReader_);
  VELOX_CHECK(deleteSplit_);

  // The batch buffer is allocated on first use.
  if (deleteValuesOutput_ == nullptr) {
    deleteValuesOutput_ = BaseVector::create(deleteFileRowType_, 0, pool_);
  }

  // TODO: verify if the field is an Iceberg RowId. Velox currently doesn't
  // support pushing down filters to non-RowId types, i.e. sub-fields of Array
  // or Map
  const bool hasSingleDeleteColumn = deleteFileRowType_->size() == 1;
  if (!hasSingleDeleteColumn) {
    // Build the filter functions that will be evaluated after all base file
    // read is done.
    buildFilterFunctions(expressionInputs);
  } else {
    // Construct the IN list filter that can be pushed down to the base file
    // readers, then update the baseFileScanSpec.
    buildDomainFilter(subfieldFilters);
  }

  deleteSplit_.reset();
}
139+
140+
// Builds a NOT IN style filter for a single-column equality delete file and
// merges it into 'subfieldFilters'.
//
// Every non-empty batch read from the delete file yields one filter from
// createNotInFilter(); the batch filters are combined with mergeWith(). If
// 'subfieldFilters' already holds a filter for the column, the new filter is
// merged into it, otherwise it is inserted. Nothing is changed when the
// combined filter is still AlwaysTrue (no delete values were read).
//
// Fails if the delete file lists no equality field ids, or if the delete
// column is of type DOUBLE or REAL.
void EqualityDeleteFileReader::buildDomainFilter(
    SubfieldFilters& subfieldFilters) {
  // equalityFieldIds[0] is read below; fail cleanly instead of indexing an
  // empty container.
  VELOX_CHECK(
      !deleteFile_.equalityFieldIds.empty(),
      "Iceberg equality delete file should have at least one equality field id.");

  std::unique_ptr<Filter> filter = std::make_unique<AlwaysTrue>();
  auto name = baseFileSchema_->childByFieldId(deleteFile_.equalityFieldIds[0])
                  ->fullName();

  while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
    if (deleteValuesOutput_->size() == 0) {
      continue;
    }

    deleteValuesOutput_->loadedVector();
    auto vector =
        std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_)->childAt(0);

    auto typeKind = vector->type()->kind();
    VELOX_CHECK(
        typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL,
        "Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}",
        name,
        typeKind);

    auto notExistsFilter =
        createNotInFilter(vector, 0, deleteValuesOutput_->size(), typeKind);
    filter = filter->mergeWith(notExistsFilter.get());
  }

  if (filter->kind() == FilterKind::kAlwaysTrue) {
    return;
  }

  // Build the key once and reuse the lookup iterator, rather than
  // constructing the Subfield and hashing it for every access.
  common::Subfield subfield(name);
  auto existing = subfieldFilters.find(subfield);
  if (existing != subfieldFilters.end()) {
    existing->second = existing->second->mergeWith(filter.get());
  } else {
    subfieldFilters.emplace(std::move(subfield), std::move(filter));
  }
}
175+
176+
void EqualityDeleteFileReader::buildFilterFunctions(
177+
std::vector<core::TypedExprPtr>& expressionInputs) {
178+
auto numDeleteFields = deleteFileRowType_->size();
179+
VELOX_CHECK_GT(
180+
numDeleteFields,
181+
0,
182+
"Iceberg equality delete file should have at least one field.");
183+
184+
// TODO: logical expression simplifications
185+
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
186+
if (deleteValuesOutput_->size() == 0) {
187+
continue;
188+
}
189+
190+
deleteValuesOutput_->loadedVector();
191+
auto rowVector = std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_);
192+
auto numDeletedValues = rowVector->childAt(0)->size();
193+
194+
for (int i = 0; i < numDeletedValues; i++) {
195+
std::vector<core::TypedExprPtr> disconjunctInputs;
196+
197+
for (int j = 0; j < numDeleteFields; j++) {
198+
auto type = deleteFileRowType_->childAt(j);
199+
auto name =
200+
baseFileSchema_->childByFieldId(deleteFile_.equalityFieldIds[j])
201+
->fullName();
202+
auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j));
203+
204+
std::vector<core::TypedExprPtr> isNotEqualInputs;
205+
isNotEqualInputs.push_back(
206+
std::make_shared<FieldAccessTypedExpr>(type, name));
207+
isNotEqualInputs.push_back(std::make_shared<ConstantTypedExpr>(value));
208+
// TODO: generalize this to support different engines. Currently, only
209+
// Presto "neq" is supported. Spark does not register the "neq" function
210+
// but does support "not" and "equalto" functions.
211+
auto isNotEqualExpr =
212+
std::make_shared<CallTypedExpr>(BOOLEAN(), isNotEqualInputs, "neq");
213+
214+
disconjunctInputs.push_back(isNotEqualExpr);
215+
}
216+
217+
auto disconjunctNotEqualExpr =
218+
std::make_shared<CallTypedExpr>(BOOLEAN(), disconjunctInputs, "or");
219+
expressionInputs.push_back(disconjunctNotEqualExpr);
220+
}
221+
}
222+
}
223+
224+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/connectors/Connector.h"
20+
#include "velox/connectors/hive/FileHandle.h"
21+
#include "velox/connectors/hive/HiveConfig.h"
22+
#include "velox/connectors/hive/HiveConnectorSplit.h"
23+
#include "velox/dwio/common/Reader.h"
24+
#include "velox/expression/Expr.h"
25+
26+
namespace facebook::velox::connector::hive::iceberg {
27+
28+
class IcebergDeleteFile;
29+
30+
using SubfieldFilters =
31+
std::unordered_map<common::Subfield, std::unique_ptr<common::Filter>>;
32+
33+
class EqualityDeleteFileReader {
34+
public:
35+
EqualityDeleteFileReader(
36+
const IcebergDeleteFile& deleteFile,
37+
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
38+
FileHandleFactory* fileHandleFactory,
39+
folly::Executor* executor,
40+
const ConnectorQueryCtx* connectorQueryCtx,
41+
const std::shared_ptr<const HiveConfig>& hiveConfig,
42+
const std::shared_ptr<io::IoStatistics> ioStats,
43+
const std::shared_ptr<filesystems::File::IoStats>& fsStats,
44+
const std::string& connectorId);
45+
46+
/// Reads the delete values from the equality delete file, and interprets them
47+
/// as filters for the base file reader.
48+
///
49+
/// @subfieldFilters The built SubfieldFilter that can be pushed down to the
50+
/// base file RowReader, when the equality delete file only contains one
51+
/// single subfield of Iceberg RowId type.
52+
/// @typedExpressions The built TypedExpr that will be evaluated by the
53+
/// connector DataSource after rows are read from the base file RowReader.
54+
void readDeleteValues(
55+
SubfieldFilters& subfieldFilters,
56+
std::vector<core::TypedExprPtr>& typedExpressions);
57+
58+
private:
59+
void buildDomainFilter(SubfieldFilters& subfieldFilters);
60+
61+
void buildFilterFunctions(std::vector<core::TypedExprPtr>& expressionInputs);
62+
63+
// The equality delete file to read
64+
const IcebergDeleteFile& deleteFile_;
65+
// The schema of the base file in terms of TypeWithId tree. In addition to the
66+
// existing fields that were included in the base file ScanSpec, it also
67+
// contains the extra fields that are in the equality delete file but not
68+
// in the ScanSpec of the base file
69+
const std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema_;
70+
71+
// The cache factory of the file handles, which can be used to return the file
72+
// handle of the delete file.
73+
FileHandleFactory* const fileHandleFactory_;
74+
memory::MemoryPool* const pool_;
75+
76+
// The split of the equality delete file to be processed by the delete file
77+
// RowReader.
78+
std::shared_ptr<const HiveConnectorSplit> deleteSplit_;
79+
// The RowType of the equality delete file
80+
RowTypePtr deleteFileRowType_;
81+
// The RowReader to read the equality delete file
82+
std::unique_ptr<dwio::common::RowReader> deleteRowReader_;
83+
// The output vector to hold the delete values
84+
VectorPtr deleteValuesOutput_;
85+
};
86+
87+
} // namespace facebook::velox::connector::hive::iceberg

0 commit comments

Comments
 (0)