add opaque type support to hive type serde (facebookincubator#11253)
Summary:
Pull Request resolved: facebookincubator#11253

My understanding of opaque types in Velox is that Velox doesn't know their underlying type and treats them as a `shared_ptr<void>`. To serialize data across processes we need to relax that assumption somewhat, because the receiving process needs to know how to deserialize the opaque data.

One option is to make the underlying type part of the serialized type signature; the other is to store this information with the serialized data itself. I'm adopting the first option here.

We also need to introduce a layer of abstraction over the opaque type index by allowing opaque types to be aliased.

We can't use the opaque type index directly because it is assumed not to be stable across processes. If you serialize an opaque type as a string in process A and then deserialize it in process B, there is no guarantee that the type ID is the same, even if both processes run the same revision.

With this change, callers are required to register an alias for each opaque type before serializing or deserializing it via `HiveTypeSerializer` and `HiveTypeParser`. I put this registry in `Type.h`, but if we want to keep it specific to `HiveTypeSerializer`/`HiveTypeParser` we could move it elsewhere.
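
The alias scheme described above can be sketched without any Velox dependencies. What follows is a minimal illustration, assuming a hypothetical `AliasRegistry` struct (not a Velox API): each process keeps its own registry, and only the alias string crosses the process boundary.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>

// Hypothetical stand-in for the registry; these names are not Velox APIs.
struct AliasRegistry {
  std::unordered_map<std::string, std::type_index> indexByAlias;
  std::unordered_map<std::type_index, std::string> aliasByIndex;

  // Associates a caller-chosen alias with T in both directions.
  template <typename T>
  void add(const std::string& alias) {
    const std::type_index index(typeid(T));
    indexByAlias.emplace(alias, index);
    aliasByIndex.emplace(index, alias);
  }

  // Serializing side: the alias string is what leaves the process.
  const std::string& aliasFor(std::type_index index) const {
    auto it = aliasByIndex.find(index);
    if (it == aliasByIndex.end()) {
      throw std::runtime_error("type was never registered");
    }
    return it->second;
  }

  // Deserializing side: map the alias back to this process's type_index.
  std::type_index indexFor(const std::string& alias) const {
    auto it = indexByAlias.find(alias);
    if (it == indexByAlias.end()) {
      throw std::runtime_error("alias was never registered: " + alias);
    }
    return it->second;
  }
};

struct Foo {};
```

Two `AliasRegistry` instances can stand in for two processes: if both call `add<Foo>("bar")`, the string returned by `aliasFor` in one can be resolved by `indexFor` in the other, while an alias that was never registered raises an error.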

Reviewed By: pedroerp

Differential Revision: D64358220

fbshipit-source-id: fb702e4366592c2f0e84c8ea11b7a2a9f5176854
Guilherme Kunigami authored and facebook-github-bot committed Oct 31, 2024
1 parent 0dc6eb8 commit 5cbba09
Showing 7 changed files with 130 additions and 10 deletions.
30 changes: 30 additions & 0 deletions velox/type/Type.cpp
@@ -885,6 +885,18 @@ typeFactories() {

} // namespace

+std::unordered_map<std::string, std::type_index>& getTypeIndexByOpaqueAlias() {
+static std::unordered_map<std::string, std::type_index>
+typeIndexByOpaqueAlias;
+return typeIndexByOpaqueAlias;
+}
+
+std::unordered_map<std::type_index, std::string>& getOpaqueAliasByTypeIndex() {
+static std::unordered_map<std::type_index, std::string>
+opaqueAliasByTypeIndexMap;
+return opaqueAliasByTypeIndexMap;
+}
+
bool registerCustomType(
const std::string& name,
std::unique_ptr<const CustomTypeFactories> factories) {
@@ -1206,4 +1218,22 @@ TypePtr getType(
return getCustomType(name);
}

+std::type_index getTypeIdForOpaqueTypeAlias(const std::string& name) {
+auto it = getTypeIndexByOpaqueAlias().find(name);
+VELOX_CHECK(
+it != getTypeIndexByOpaqueAlias().end(),
+"Could not find type '{}'. Did you call registerOpaqueType?",
+name);
+return it->second;
+}
+
+std::string getOpaqueAliasForTypeId(std::type_index typeIndex) {
+auto it = getOpaqueAliasByTypeIndex().find(typeIndex);
+VELOX_CHECK(
+it != getOpaqueAliasByTypeIndex().end(),
+"Could not find type index '{}'. Did you call registerOpaqueType?",
+typeIndex.name());
+return it->second;
+}
+
} // namespace facebook::velox
28 changes: 28 additions & 0 deletions velox/type/Type.h
@@ -2009,6 +2009,34 @@ bool registerCustomType(
const std::string& name,
std::unique_ptr<const CustomTypeFactories> factories);

+// See registerOpaqueType() for documentation on type index and opaque type
+// alias.
+std::unordered_map<std::string, std::type_index>& getTypeIndexByOpaqueAlias();
+
+// Reverse of getTypeIndexByOpaqueAlias() when we need to look up the opaque
+// alias by its type index.
+std::unordered_map<std::type_index, std::string>& getOpaqueAliasByTypeIndex();
+
+std::type_index getTypeIdForOpaqueTypeAlias(const std::string& name);
+
+std::string getOpaqueAliasForTypeId(std::type_index typeIndex);
+
+/// OpaqueType represents a type that is not part of the Velox type system.
+/// To identify the underlying type we use std::type_index which is stable
+/// within the same process. However, it is not necessarily stable across
+/// processes.
+///
+/// So if we were to serialize an opaque type using its std::type_index, we
+/// might not be able to deserialize it in another process. To solve this
+/// problem, we require that both the serializing and deserializing processes
+/// register the opaque type using registerOpaqueType() with the same alias.
+template <typename Class>
+bool registerOpaqueType(const std::string& alias) {
+auto typeIndex = std::type_index(typeid(Class));
+return getTypeIndexByOpaqueAlias().emplace(alias, typeIndex).second &&
+getOpaqueAliasByTypeIndex().emplace(typeIndex, alias).second;
+}
+
/// Return true if a custom type with the specified name exists.
bool customTypeExists(const std::string& name);

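One detail a reader might note in `registerOpaqueType()`: the two `emplace` calls are joined by `&&`, so if the alias is new but `Class` is already registered under a different alias, the first insertion takes effect, the second does not, and the function returns false with the two maps out of sync. The sketch below shows one possible all-or-nothing variant. It is not what the commit does, and `registerAliasChecked`, `forwardMap`, and `reverseMap` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>

// Hypothetical stand-ins for the two registry maps; not Velox APIs.
inline std::unordered_map<std::string, std::type_index>& forwardMap() {
  static std::unordered_map<std::string, std::type_index> map;
  return map;
}

inline std::unordered_map<std::type_index, std::string>& reverseMap() {
  static std::unordered_map<std::type_index, std::string> map;
  return map;
}

// Inserts into both maps, or into neither if the alias or the type is
// already registered.
template <typename Class>
bool registerAliasChecked(const std::string& alias) {
  const std::type_index index(typeid(Class));
  if (forwardMap().count(alias) != 0 || reverseMap().count(index) != 0) {
    return false; // Leave both maps untouched on any conflict.
  }
  forwardMap().emplace(alias, index);
  reverseMap().emplace(index, alias);
  return true;
}

struct Foo {};
struct Baz {};
```

With this variant, a second registration of `Foo` under a new alias, or of a new type under an existing alias, is rejected without leaving a one-sided entry behind. Whether a repeated registration should fail silently, throw, or be accepted when it matches the existing entry is a design choice this sketch does not settle.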
34 changes: 26 additions & 8 deletions velox/type/fbhive/HiveTypeParser.cpp
@@ -58,6 +58,7 @@ HiveTypeParser::HiveTypeParser() {
setupMetadata<TokenType::Binary, TypeKind::VARBINARY>(
{"binary", "varbinary"});
setupMetadata<TokenType::Timestamp, TypeKind::TIMESTAMP>("timestamp");
+setupMetadata<TokenType::Opaque, TypeKind::OPAQUE>("opaque");
setupMetadata<TokenType::List, TypeKind::ARRAY>("array");
setupMetadata<TokenType::Map, TypeKind::MAP>("map");
setupMetadata<TokenType::Struct, TypeKind::ROW>({"struct", "row"});
@@ -87,7 +88,16 @@ std::shared_ptr<const Type> HiveTypeParser::parse(const std::string& ser) {
Result HiveTypeParser::parseType() {
Token nt = nextToken();
VELOX_CHECK(!nt.isEOS(), "Unexpected end of stream parsing type!!!");
-if (nt.isValidType() && nt.isPrimitiveType()) {
+
+if (!nt.isValidType()) {
+VELOX_FAIL(
+"Unexpected token {} at {}. typeKind = {}",
+nt.value.toString(),
+remaining_.toString(),
+nt.typeKind());
+}
+
+if (nt.isPrimitiveType()) {
if (nt.metadata->tokenString[0] == "decimal") {
eatToken(TokenType::LeftRoundBracket);
Token precision = nextToken();
@@ -118,7 +128,16 @@ Result HiveTypeParser::parseType() {
eatToken(TokenType::RightRoundBracket);
}
return Result{scalarType};
-} else if (nt.isValidType()) {
+} else if (nt.isOpaqueType()) {
+eatToken(TokenType::StartSubType);
+folly::StringPiece innerTypeName =
+eatToken(TokenType::Identifier, true).value;
+eatToken(TokenType::EndSubType);
+
+auto typeIndex = getTypeIdForOpaqueTypeAlias(innerTypeName.str());
+auto instance = std::make_shared<const OpaqueType>(typeIndex);
+return Result{instance};
+} else {
ResultList resultList = parseTypeList(TypeKind::ROW == nt.typeKind());
switch (nt.typeKind()) {
case velox::TypeKind::ROW:
@@ -138,13 +157,8 @@ Result HiveTypeParser::parseType() {
return Result{velox::ARRAY(resultList.typelist.at(0))};
}
default:
-VELOX_FAIL("unsupported kind: " + std::to_string((int)nt.typeKind()));
+VELOX_FAIL("unsupported kind: " + mapTypeKindToName(nt.typeKind()));
}
-} else {
-VELOX_FAIL(fmt::format(
-"Unexpected token {} at {}",
-nt.value.toString(),
-remaining_.toString()));
}
}

@@ -250,6 +264,10 @@ bool Token::isEOS() const {
return metadata->tokenType == TokenType::EndOfStream;
}

+bool Token::isOpaqueType() const {
+return metadata->tokenType == TokenType::Opaque;
+}
+
int8_t HiveTypeParser::makeTokenId(TokenType tokenType) const {
return static_cast<int8_t>(tokenType);
}
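
The new `opaque` branch in `parseType()` consumes a start-of-subtype token, an identifier, and an end-of-subtype token. As a rough illustration of that shape only, here is a hand-rolled helper that extracts the alias from a string of the exact form `opaque<alias>`. `parseOpaqueAlias` is a hypothetical name, it does not use the `HiveTypeParser` tokenizer, and its rule for what counts as a valid alias (letters, digits, underscore) is an assumption that may be stricter or looser than the real parser's identifier rule.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper, not part of HiveTypeParser: extracts the alias from a
// string of the exact form "opaque<alias>". Returns std::nullopt otherwise.
inline std::optional<std::string> parseOpaqueAlias(std::string_view text) {
  constexpr std::string_view kPrefix = "opaque<";
  // Need the prefix, at least one alias character, and the closing '>'.
  if (text.size() <= kPrefix.size() + 1 ||
      text.substr(0, kPrefix.size()) != kPrefix || text.back() != '>') {
    return std::nullopt;
  }
  // Strip the prefix and the closing '>'.
  const std::string_view alias =
      text.substr(kPrefix.size(), text.size() - kPrefix.size() - 1);
  // Assumption: an alias is a run of ASCII letters, digits, and '_'.
  for (const char c : alias) {
    const bool ok = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
        (c >= '0' && c <= '9') || c == '_';
    if (!ok) {
      return std::nullopt;
    }
  }
  return std::string(alias);
}
```

In the commit itself the extracted identifier is then passed to `getTypeIdForOpaqueTypeAlias()`, which fails if the alias was never registered; the helper above stops before that lookup.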
3 changes: 3 additions & 0 deletions velox/type/fbhive/HiveTypeParser.h
@@ -42,6 +42,7 @@ enum class TokenType {
String,
Binary,
Timestamp,
+Opaque,
List,
Map,
Struct,
@@ -88,6 +89,8 @@ struct Token {
bool isValidType() const;

bool isEOS() const;
+
+bool isOpaqueType() const;
};

struct TokenAndRemaining : public Token {
4 changes: 4 additions & 0 deletions velox/type/fbhive/HiveTypeSerializer.cpp
@@ -58,6 +58,10 @@ std::string HiveTypeSerializer::visit(const Type& type) const {
return "map<" + visitChildren(type.asMap()) + ">";
case TypeKind::ROW:
return "struct<" + visitChildren(type.asRow()) + ">";
+case TypeKind::OPAQUE: {
+auto typeAlias = getOpaqueAliasForTypeId(type.asOpaque().typeIndex());
+return "opaque<" + typeAlias + ">";
+}
default:
VELOX_UNSUPPORTED("unsupported type: " + type.toString());
}
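
To show how an `opaque<alias>` case sits alongside the container cases in a recursive serializer, here is a toy sketch. `ToyType` and `serializeToy` are hypothetical and unrelated to Velox's `Type` hierarchy, and the comma used to separate children is an assumption about the output format rather than something visible in this diff.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy type tree, not Velox's Type hierarchy. `alias` is only used by kOpaque.
struct ToyType {
  enum class Kind { kBigint, kArray, kMap, kOpaque };
  Kind kind;
  std::string alias;
  std::vector<ToyType> children;
};

// Emits "name<child,...>" strings by recursing into children.
inline std::string serializeToy(const ToyType& type) {
  switch (type.kind) {
    case ToyType::Kind::kBigint:
      return "bigint";
    case ToyType::Kind::kOpaque:
      // The alias takes the place a child type would occupy.
      return "opaque<" + type.alias + ">";
    case ToyType::Kind::kArray:
    case ToyType::Kind::kMap: {
      std::string out =
          type.kind == ToyType::Kind::kArray ? "array<" : "map<";
      for (std::size_t i = 0; i < type.children.size(); ++i) {
        if (i > 0) {
          out += ","; // Assumed separator between children.
        }
        out += serializeToy(type.children[i]);
      }
      return out + ">";
    }
  }
  return ""; // Not reached for valid Kind values.
}
```

The opaque case is a leaf: it never recurses, and the only information it contributes to the string is the registered alias.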
17 changes: 17 additions & 0 deletions velox/type/fbhive/tests/HiveTypeParserTests.cpp
@@ -176,4 +176,21 @@ TEST(FbHive, parseSpecialChar) {
ASSERT_EQ(t->toString(), "ROW<\"a$_#\":INTEGER>");
}

+struct Foo {};
+TEST(FbHive, parseOpaque) {
+// Use a custom name to highlight this is just an alias.
+registerOpaqueType<Foo>("bar");
+HiveTypeParser parser;
+auto t = parser.parse("opaque<bar>");
+ASSERT_EQ(t->toString(), "OPAQUE<facebook::velox::type::fbhive::Foo>");
+}
+
+TEST(FbHive, parseUnregisteredOpaque) {
+// Use a custom name to highlight this is just an alias.
+registerOpaqueType<Foo>("bar");
+HiveTypeParser parser;
+VELOX_ASSERT_THROW(
+parser.parse("opaque<Foo>"),
+"Could not find type 'Foo'. Did you call registerOpaqueType?");
+}
} // namespace facebook::velox::type::fbhive
24 changes: 22 additions & 2 deletions velox/type/fbhive/tests/HiveTypeSerializerTests.cpp
@@ -33,9 +33,29 @@ TEST(HiveTypeSerializer, primitive) {
EXPECT_EQ(result, "bigint");
}

+struct Foo {};
TEST(HiveTypeSerializer, opaque) {
-std::shared_ptr<const velox::Type> type = velox::OPAQUE<bool>();
+// Use a custom name to highlight this is just an alias.
+registerOpaqueType<Foo>("bar");
+
+std::shared_ptr<const Type> type = OPAQUE<Foo>();
+auto result = HiveTypeSerializer::serialize(type);
+EXPECT_EQ(result, "opaque<bar>");
+}
+
+TEST(HiveTypeSerializer, unregisteredOpaque) {
+// did not call registerOpaqueType<Foo>("Foo")
+std::shared_ptr<const Type> type = OPAQUE<Foo>();
+VELOX_ASSERT_THROW(
+HiveTypeSerializer::serialize(type),
+fmt::format(
+"Could not find type index '{}'. Did you call registerOpaqueType?",
+type->asOpaque().typeIndex().name()));
+}
+
+TEST(HiveTypeSerializer, unsupported) {
+std::shared_ptr<const Type> type = UNKNOWN();
VELOX_ASSERT_THROW(
-HiveTypeSerializer::serialize(type), "unsupported type: OPAQUE<bool>");
+HiveTypeSerializer::serialize(type), "unsupported type: UNKNOWN");
}
} // namespace facebook::velox::type::fbhive
