From 5cbba09794d894f1a0da9f2c69d9453ba5f53dda Mon Sep 17 00:00:00 2001 From: Guilherme Kunigami Date: Wed, 30 Oct 2024 17:27:10 -0700 Subject: [PATCH] add opaque type support to hive type serde (#11253) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11253 My understanding of opaque types in Velox is that Velox doesn't know about the underlying type of it, and treats them as a `shared_ptr`. For serializing data across processes, we need to somewhat break that assumption, because when we need to know how to deserialize this opaque data. One option is to have the underlying type as part of the serialized type signature, the other is to store this information with the serialized data itself. I'm adopting the first option here. We also need to introduce a layer of abstraction for opaque type index, by allowing aliasing opaque types. The reason we can't use opaque type index is the assumption that they're not stable across processes. So if you serialize a opaque type as string in process A and then deserialize in process B, even if running the same revision there's no guarantee the type ID is the same. With this change, callers are required to register an alias for opaque types before serializing/deserializing it via `HiveTypeSerializer` and `HiveTypeParser`. I put this registry in `Type.h` but if we want to keep this specific to `HiveTypeSerializer/HiveTypeParser` we could move it elsewhere. Reviewed By: pedroerp Differential Revision: D64358220 fbshipit-source-id: fb702e4366592c2f0e84c8ea11b7a2a9f5176854 --- velox/type/Type.cpp | 30 ++++++++++++++++ velox/type/Type.h | 28 +++++++++++++++ velox/type/fbhive/HiveTypeParser.cpp | 34 ++++++++++++++----- velox/type/fbhive/HiveTypeParser.h | 3 ++ velox/type/fbhive/HiveTypeSerializer.cpp | 4 +++ .../type/fbhive/tests/HiveTypeParserTests.cpp | 17 ++++++++++ .../fbhive/tests/HiveTypeSerializerTests.cpp | 24 +++++++++++-- 7 files changed, 130 insertions(+), 10 deletions(-) diff --git a/velox/type/Type.cpp b/velox/type/Type.cpp index 9f4f43c93870..fe1cf1f47ff5 100644 --- a/velox/type/Type.cpp +++ b/velox/type/Type.cpp @@ -885,6 +885,18 @@ typeFactories() { } // namespace +std::unordered_map& getTypeIndexByOpaqueAlias() { + static std::unordered_map + typeIndexByOpaqueAlias; + return typeIndexByOpaqueAlias; +} + +std::unordered_map& getOpaqueAliasByTypeIndex() { + static std::unordered_map + opaqueAliasByTypeIndexMap; + return opaqueAliasByTypeIndexMap; +} + bool registerCustomType( const std::string& name, std::unique_ptr factories) { @@ -1206,4 +1218,22 @@ TypePtr getType( return getCustomType(name); } +std::type_index getTypeIdForOpaqueTypeAlias(const std::string& name) { + auto it = getTypeIndexByOpaqueAlias().find(name); + VELOX_CHECK( + it != getTypeIndexByOpaqueAlias().end(), + "Could not find type '{}'. Did you call registerOpaqueType?", + name); + return it->second; +} + +std::string getOpaqueAliasForTypeId(std::type_index typeIndex) { + auto it = getOpaqueAliasByTypeIndex().find(typeIndex); + VELOX_CHECK( + it != getOpaqueAliasByTypeIndex().end(), + "Could not find type index '{}'. Did you call registerOpaqueType?", + typeIndex.name()); + return it->second; +} + } // namespace facebook::velox diff --git a/velox/type/Type.h b/velox/type/Type.h index 878ca2512851..771feb295963 100644 --- a/velox/type/Type.h +++ b/velox/type/Type.h @@ -2009,6 +2009,34 @@ bool registerCustomType( const std::string& name, std::unique_ptr factories); +// See registerOpaqueType() for documentation on type index and opaque type +// alias. +std::unordered_map& getTypeIndexByOpaqueAlias(); + +// Reverse of getTypeIndexByOpaqueAlias() when we need to look up the opaque +// alias by its type index. +std::unordered_map& getOpaqueAliasByTypeIndex(); + +std::type_index getTypeIdForOpaqueTypeAlias(const std::string& name); + +std::string getOpaqueAliasForTypeId(std::type_index typeIndex); + +/// OpaqueType represents a type that is not part of the Velox type system. +/// To identify the underlying type we use std::type_index which is stable +/// within the same process. However, it is not necessarily stable across +/// processes. +/// +/// So if we were to serialize an opaque type using its std::type_index, we +/// might not be able to deserialize it in another process. To solve this +/// problem, we require that both the serializing and deserializing processes +/// register the opaque type using registerOpaqueType() with the same alias. +template +bool registerOpaqueType(const std::string& alias) { + auto typeIndex = std::type_index(typeid(Class)); + return getTypeIndexByOpaqueAlias().emplace(alias, typeIndex).second && + getOpaqueAliasByTypeIndex().emplace(typeIndex, alias).second; +} + /// Return true if a custom type with the specified name exists. bool customTypeExists(const std::string& name); diff --git a/velox/type/fbhive/HiveTypeParser.cpp b/velox/type/fbhive/HiveTypeParser.cpp index 500c718093f8..0ab9e4007123 100644 --- a/velox/type/fbhive/HiveTypeParser.cpp +++ b/velox/type/fbhive/HiveTypeParser.cpp @@ -58,6 +58,7 @@ HiveTypeParser::HiveTypeParser() { setupMetadata( {"binary", "varbinary"}); setupMetadata("timestamp"); + setupMetadata("opaque"); setupMetadata("array"); setupMetadata("map"); setupMetadata({"struct", "row"}); @@ -87,7 +88,16 @@ std::shared_ptr HiveTypeParser::parse(const std::string& ser) { Result HiveTypeParser::parseType() { Token nt = nextToken(); VELOX_CHECK(!nt.isEOS(), "Unexpected end of stream parsing type!!!"); - if (nt.isValidType() && nt.isPrimitiveType()) { + + if (!nt.isValidType()) { + VELOX_FAIL( + "Unexpected token {} at {}. typeKind = {}", + nt.value.toString(), + remaining_.toString(), + nt.typeKind()); + } + + if (nt.isPrimitiveType()) { if (nt.metadata->tokenString[0] == "decimal") { eatToken(TokenType::LeftRoundBracket); Token precision = nextToken(); @@ -118,7 +128,16 @@ Result HiveTypeParser::parseType() { eatToken(TokenType::RightRoundBracket); } return Result{scalarType}; - } else if (nt.isValidType()) { + } else if (nt.isOpaqueType()) { + eatToken(TokenType::StartSubType); + folly::StringPiece innerTypeName = + eatToken(TokenType::Identifier, true).value; + eatToken(TokenType::EndSubType); + + auto typeIndex = getTypeIdForOpaqueTypeAlias(innerTypeName.str()); + auto instance = std::make_shared(typeIndex); + return Result{instance}; + } else { ResultList resultList = parseTypeList(TypeKind::ROW == nt.typeKind()); switch (nt.typeKind()) { case velox::TypeKind::ROW: @@ -138,13 +157,8 @@ Result HiveTypeParser::parseType() { return Result{velox::ARRAY(resultList.typelist.at(0))}; } default: - VELOX_FAIL("unsupported kind: " + std::to_string((int)nt.typeKind())); + VELOX_FAIL("unsupported kind: " + mapTypeKindToName(nt.typeKind())); } - } else { - VELOX_FAIL(fmt::format( - "Unexpected token {} at {}", - nt.value.toString(), - remaining_.toString())); } } @@ -250,6 +264,10 @@ bool Token::isEOS() const { return metadata->tokenType == TokenType::EndOfStream; } +bool Token::isOpaqueType() const { + return metadata->tokenType == TokenType::Opaque; +} + int8_t HiveTypeParser::makeTokenId(TokenType tokenType) const { return static_cast(tokenType); } diff --git a/velox/type/fbhive/HiveTypeParser.h b/velox/type/fbhive/HiveTypeParser.h index a35cea77fd3d..1469ee41502b 100644 --- a/velox/type/fbhive/HiveTypeParser.h +++ b/velox/type/fbhive/HiveTypeParser.h @@ -42,6 +42,7 @@ enum class TokenType { String, Binary, Timestamp, + Opaque, List, Map, Struct, @@ -88,6 +89,8 @@ struct Token { bool isValidType() const; bool isEOS() const; + + bool isOpaqueType() const; }; struct TokenAndRemaining : public Token { diff --git a/velox/type/fbhive/HiveTypeSerializer.cpp b/velox/type/fbhive/HiveTypeSerializer.cpp index c4b35c0b67ea..acf8e23e7624 100644 --- a/velox/type/fbhive/HiveTypeSerializer.cpp +++ b/velox/type/fbhive/HiveTypeSerializer.cpp @@ -58,6 +58,10 @@ std::string HiveTypeSerializer::visit(const Type& type) const { return "map<" + visitChildren(type.asMap()) + ">"; case TypeKind::ROW: return "struct<" + visitChildren(type.asRow()) + ">"; + case TypeKind::OPAQUE: { + auto typeAlias = getOpaqueAliasForTypeId(type.asOpaque().typeIndex()); + return "opaque<" + typeAlias + ">"; + } default: VELOX_UNSUPPORTED("unsupported type: " + type.toString()); } diff --git a/velox/type/fbhive/tests/HiveTypeParserTests.cpp b/velox/type/fbhive/tests/HiveTypeParserTests.cpp index 86fc9f3c5d7b..a658637e87b3 100644 --- a/velox/type/fbhive/tests/HiveTypeParserTests.cpp +++ b/velox/type/fbhive/tests/HiveTypeParserTests.cpp @@ -176,4 +176,21 @@ TEST(FbHive, parseSpecialChar) { ASSERT_EQ(t->toString(), "ROW<\"a$_#\":INTEGER>"); } +struct Foo {}; +TEST(FbHive, parseOpaque) { + // Use a custom name to highlight this is just an alias. + registerOpaqueType("bar"); + HiveTypeParser parser; + auto t = parser.parse("opaque"); + ASSERT_EQ(t->toString(), "OPAQUE"); +} + +TEST(FbHive, parseUnregisteredOpaque) { + // Use a custom name to highlight this is just an alias. + registerOpaqueType("bar"); + HiveTypeParser parser; + VELOX_ASSERT_THROW( + parser.parse("opaque"), + "Could not find type 'Foo'. Did you call registerOpaqueType?"); +} } // namespace facebook::velox::type::fbhive diff --git a/velox/type/fbhive/tests/HiveTypeSerializerTests.cpp b/velox/type/fbhive/tests/HiveTypeSerializerTests.cpp index cf941c2ee514..6712e61fef19 100644 --- a/velox/type/fbhive/tests/HiveTypeSerializerTests.cpp +++ b/velox/type/fbhive/tests/HiveTypeSerializerTests.cpp @@ -33,9 +33,29 @@ TEST(HiveTypeSerializer, primitive) { EXPECT_EQ(result, "bigint"); } +struct Foo {}; TEST(HiveTypeSerializer, opaque) { - std::shared_ptr type = velox::OPAQUE(); + // Use a custom name to highlight this is just an alias. + registerOpaqueType("bar"); + + std::shared_ptr type = OPAQUE(); + auto result = HiveTypeSerializer::serialize(type); + EXPECT_EQ(result, "opaque"); +} + +TEST(HiveTypeSerializer, unregisteredOpaque) { + // did not call registerOpaqueType("Foo") + std::shared_ptr type = OPAQUE(); + VELOX_ASSERT_THROW( + HiveTypeSerializer::serialize(type), + fmt::format( + "Could not find type index '{}'. Did you call registerOpaqueType?", + type->asOpaque().typeIndex().name())); +} + +TEST(HiveTypeSerializer, unsupported) { + std::shared_ptr type = UNKNOWN(); VELOX_ASSERT_THROW( - HiveTypeSerializer::serialize(type), "unsupported type: OPAQUE"); + HiveTypeSerializer::serialize(type), "unsupported type: UNKNOWN"); } } // namespace facebook::velox::type::fbhive