Skip to content

Commit 407f8f2

Browse files
committed
feat(iceberg): Serialize IcebergPartitionSpec
1 parent 3a2750c commit 407f8f2

File tree

5 files changed

+373
-2
lines changed

5 files changed

+373
-2
lines changed

velox/connectors/lakehouse/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ velox_add_library(
2121
IcebergSplitReader.cpp
2222
IcebergSplit.cpp
2323
IcebergTableHandle.cpp
24+
PartitionSpec.cpp
2425
PositionalDeleteFileReader.cpp
2526
IcebergDataSink.cpp)
2627

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/lakehouse/iceberg/PartitionSpec.h"
18+
19+
namespace facebook::velox::connector::lakehouse::iceberg {
20+
21+
namespace {
22+
/// Maps a TransformType enumerator to the lowercase string stored under the
/// "transformType" key by Field::serialize().
///
/// Fails with VELOX_UNREACHABLE if 'type' holds a value that is not one of the
/// enumerators handled below.
std::string transformTypeToString(TransformType type) {
  // Stays null only when 'type' matches none of the cases.
  const char* label = nullptr;
  switch (type) {
    case TransformType::kIdentity:
      label = "identity";
      break;
    case TransformType::kHour:
      label = "hour";
      break;
    case TransformType::kDay:
      label = "day";
      break;
    case TransformType::kMonth:
      label = "month";
      break;
    case TransformType::kYear:
      label = "year";
      break;
    case TransformType::kBucket:
      label = "bucket";
      break;
    case TransformType::kTruncate:
      label = "truncate";
      break;
  }
  if (label == nullptr) {
    VELOX_UNREACHABLE("Unknown TransformType");
  }
  return label;
}
41+
42+
/// Parses the string form written by transformTypeToString() back into a
/// TransformType. Matching is exact (case-sensitive).
///
/// Fails with VELOX_USER_FAIL when 'str' is not one of the known names.
TransformType transformTypeFromString(const std::string& str) {
  struct NamedTransform {
    const char* name;
    TransformType type;
  };
  static constexpr NamedTransform kKnownTransforms[] = {
      {"identity", TransformType::kIdentity},
      {"hour", TransformType::kHour},
      {"day", TransformType::kDay},
      {"month", TransformType::kMonth},
      {"year", TransformType::kYear},
      {"bucket", TransformType::kBucket},
      {"truncate", TransformType::kTruncate},
  };

  for (const auto& candidate : kKnownTransforms) {
    if (str == candidate.name) {
      return candidate.type;
    }
  }
  VELOX_USER_FAIL("Unknown TransformType: {}", str);
}
61+
} // anonymous namespace
62+
63+
/// Serializes this partition field into a dynamic object with the keys
/// "name" (always "Field", the same key registerSerDe() registers),
/// "fieldName", "transformType" and "parameter". "parameter" is written as
/// null when the optional is empty.
folly::dynamic IcebergPartitionSpec::Field::serialize() const {
  folly::dynamic result = folly::dynamic::object;
  result["name"] = "Field";
  result["fieldName"] = name;
  result["transformType"] = transformTypeToString(transformType);
  result["parameter"] = parameter.has_value() ? folly::dynamic(*parameter)
                                              : folly::dynamic(nullptr);
  return result;
}
75+
76+
std::shared_ptr<const ISerializable> IcebergPartitionSpec::Field::create(
77+
const folly::dynamic& obj,
78+
void* context) {
79+
VELOX_CHECK(obj.isObject(), "Field::create expects object");
80+
81+
const auto* fieldNamePtr = obj.get_ptr("fieldName");
82+
VELOX_CHECK(fieldNamePtr, "Field::create: missing 'fieldName'");
83+
VELOX_CHECK(
84+
fieldNamePtr->isString(), "Field::create: 'fieldName' must be string");
85+
auto fieldName = fieldNamePtr->asString();
86+
87+
const auto* transformTypePtr = obj.get_ptr("transformType");
88+
VELOX_CHECK(transformTypePtr, "Field::create: missing 'transformType'");
89+
VELOX_CHECK(
90+
transformTypePtr->isString(),
91+
"Field::create: 'transformType' must be string");
92+
auto transformType = transformTypeFromString(transformTypePtr->asString());
93+
94+
std::optional<int32_t> parameter = std::nullopt;
95+
const auto* parameterPtr = obj.get_ptr("parameter");
96+
if (parameterPtr && !parameterPtr->isNull()) {
97+
VELOX_CHECK(
98+
parameterPtr->isInt(),
99+
"Field::create: 'parameter' must be integer if present");
100+
parameter = static_cast<int32_t>(parameterPtr->asInt());
101+
}
102+
103+
return std::make_shared<const Field>(fieldName, transformType, parameter);
104+
}
105+
106+
void IcebergPartitionSpec::Field::registerSerDe() {
107+
auto& registry = DeserializationWithContextRegistryForSharedPtr();
108+
registry.Register("Field", Field::create);
109+
}
110+
111+
// IcebergPartitionSpec implementation
112+
113+
/// Serializes the partition spec into a dynamic object with the keys "name"
/// (always "IcebergPartitionSpec"), "specId", and "fields". "fields" is an
/// array holding each Field's own serialize() output, in the order of the
/// 'fields' vector.
folly::dynamic IcebergPartitionSpec::serialize() const {
  // Build the nested array first so it can be moved into the result object.
  folly::dynamic serializedFields = folly::dynamic::array;
  serializedFields.reserve(fields.size());
  for (const auto& partitionField : fields) {
    serializedFields.push_back(partitionField.serialize());
  }

  folly::dynamic result = folly::dynamic::object;
  result["name"] = "IcebergPartitionSpec";
  result["specId"] = specId;
  result["fields"] = std::move(serializedFields);
  return result;
}
126+
127+
std::shared_ptr<const ISerializable> IcebergPartitionSpec::create(
128+
const folly::dynamic& obj,
129+
void* context) {
130+
VELOX_CHECK(obj.isObject(), "IcebergPartitionSpec::create expects object");
131+
132+
const auto* specIdPtr = obj.get_ptr("specId");
133+
VELOX_CHECK(specIdPtr, "IcebergPartitionSpec::create: missing 'specId'");
134+
VELOX_CHECK(
135+
specIdPtr->isInt(),
136+
"IcebergPartitionSpec::create: 'specId' must be integer");
137+
auto specId = static_cast<int32_t>(specIdPtr->asInt());
138+
139+
const auto* fieldsPtr = obj.get_ptr("fields");
140+
VELOX_CHECK(fieldsPtr, "IcebergPartitionSpec::create: missing 'fields'");
141+
VELOX_CHECK(
142+
fieldsPtr->isArray(),
143+
"IcebergPartitionSpec::create: 'fields' must be array");
144+
145+
std::vector<Field> deserializedFields;
146+
deserializedFields.reserve(fieldsPtr->size());
147+
for (const auto& fieldObj : *fieldsPtr) {
148+
auto fieldPtr = Field::create(fieldObj, context);
149+
auto field = std::static_pointer_cast<const Field>(fieldPtr);
150+
deserializedFields.push_back(*field);
151+
}
152+
153+
return std::make_shared<const IcebergPartitionSpec>(
154+
specId, std::move(deserializedFields));
155+
}
156+
157+
void IcebergPartitionSpec::registerSerDe() {
158+
Field::registerSerDe();
159+
auto& registry = DeserializationWithContextRegistryForSharedPtr();
160+
registry.Register("IcebergPartitionSpec", IcebergPartitionSpec::create);
161+
}
162+
163+
} // namespace facebook::velox::connector::lakehouse::iceberg

velox/connectors/lakehouse/iceberg/PartitionSpec.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <optional>
2020
#include <string>
2121
#include <vector>
22+
#include "velox/common/serialization/Serializable.h"
2223

2324
namespace facebook::velox::connector::lakehouse::iceberg {
2425

@@ -32,8 +33,8 @@ enum class TransformType {
3233
kTruncate
3334
};
3435

35-
struct IcebergPartitionSpec {
36-
struct Field {
36+
struct IcebergPartitionSpec : public ISerializable {
37+
struct Field : public ISerializable {
3738
// The column name and type of this partition field as it appears in the
3839
// partition spec. The column can be a nested column in struct field.
3940
std::string name;
@@ -51,13 +52,29 @@ struct IcebergPartitionSpec {
5152
TransformType _transform,
5253
std::optional<int32_t> _parameter)
5354
: name(_name), transformType(_transform), parameter(_parameter) {}
55+
56+
folly::dynamic serialize() const override;
57+
58+
static std::shared_ptr<const ISerializable> create(
59+
const folly::dynamic& obj,
60+
void* context);
61+
62+
static void registerSerDe();
5463
};
5564

5665
const int32_t specId;
5766
const std::vector<Field> fields;
5867

5968
IcebergPartitionSpec(int32_t _specId, const std::vector<Field>& _fields)
6069
: specId(_specId), fields(_fields) {}
70+
71+
folly::dynamic serialize() const override;
72+
73+
static std::shared_ptr<const ISerializable> create(
74+
const folly::dynamic& obj,
75+
void* context);
76+
77+
static void registerSerDe();
6178
};
6279

6380
} // namespace facebook::velox::connector::lakehouse::iceberg

velox/connectors/lakehouse/iceberg/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ if(VELOX_BUILD_TESTING AND (NOT VELOX_DISABLE_GOOGLETEST))
4646
IcebergSplitReaderBenchmarkTest.cpp
4747
IcebergTableHandleTest.cpp
4848
IcebergTestBase.cpp
49+
PartitionSpecTest.cpp
4950
Main.cpp)
5051
add_test(velox_lakehouse_iceberg_test velox_lakehouse_iceberg_test)
5152

0 commit comments

Comments
 (0)