Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for complex union types and maps #98

Merged
merged 3 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[libraries]
assertj = "org.assertj:assertj-core:3.12.0"
avro = "org.apache.avro:avro:1.10.2"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.23.0"
calcite-core = "org.apache.calcite:calcite-core:1.34.0"
calcite-server = "org.apache.calcite:calcite-server:1.34.0"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.25.0"
calcite-core = "org.apache.calcite:calcite-core:1.38.0"
calcite-server = "org.apache.calcite:calcite-server:1.38.0"
commons-cli = "commons-cli:commons-cli:1.4"
flink-clients = "org.apache.flink:flink-clients:1.18.1"
flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
Expand All @@ -19,16 +19,15 @@ flink-table-common = "org.apache.flink:flink-table-common:1.18.1"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.18.1"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.18.1"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
jackson = "com.fasterxml.jackson.core:jackson-core:2.15.0"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0"
javax-annotation-api = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
kafka-clients = "org.apache.kafka:kafka-clients:3.2.0"
kubernetes-client = "io.kubernetes:client-java:16.0.2"
kubernetes-extended-client = "io.kubernetes:client-java-extended:16.0.2"
kubernetes-client = "io.kubernetes:client-java:18.0.0"
kubernetes-extended-client = "io.kubernetes:client-java-extended:18.0.0"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
slf4j-api = "org.slf4j:slf4j-api:1.7.30"
snakeyaml = "org.yaml:snakeyaml:1.33"
sqlline = "sqlline:sqlline:1.12.0"
quidem = "net.hydromatic:quidem:0.11"
venice = "com.linkedin.venice:venice-common:0.4.376"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
switch (dataType.getSqlTypeName()) {
case INTEGER:
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
case SMALLINT:
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
case BIGINT:
return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
case VARCHAR:
Expand All @@ -48,10 +50,9 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
case ARRAY:
return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
dataType.isNullable());
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
case MAP:
return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())),
dataType.isNullable());
case UNKNOWN:
case NULL:
return Schema.createUnion(Schema.create(Schema.Type.NULL));
Expand Down Expand Up @@ -90,12 +91,11 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boo
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
case RECORD:
return typeFactory.createStructType(schema.getFields()
.stream()
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream()
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList()));
.collect(Collectors.toList())), nullable);
case INT:
return createRelType(typeFactory, SqlTypeName.INTEGER, nullable);
case LONG:
Expand All @@ -113,21 +113,25 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boo
case BOOLEAN:
return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable);
case ARRAY:
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1);
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable))
return typeFactory.createTypeWithNullability(
typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable);
case MAP:
return typeFactory.createTypeWithNullability(
typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable);
case UNION:
boolean isNullable = schema.isNullable();
if (schema.isNullable() && schema.getTypes().size() == 2) {
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true);
} else {
// TODO support more elaborate union types
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
}
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream()
.filter(x -> x.getType() != Schema.Type.NULL)
.map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable)))
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList())), isNullable);
default:
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), nullable);
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,23 @@ public void convertsNestedSchemas() {
RelDataType rel4 = AvroConverter.rel(avroSchema4);
assertTrue("types match", RelOptUtil.eq("rel4", rel4, "rel1", rel1, Litmus.THROW));
}

@Test
public void convertsNestedUnionSchemas() {
  // Round-trip a record whose "event" field is a union of two record types.
  // A non-nullable complex union should surface in Calcite as a struct with
  // one field per union member, named after the member record.
  String json = "{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"event\",\"type\":[{\"type\":\"record\",\"name\":\"record_event1\",\"fields\":[{\"name\":\"strField\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"record_event2\",\"fields\":[{\"name\":\"strField\",\"type\":\"string\"}]}]}]}";
  Schema unionRecordSchema = new Schema.Parser().parse(json);

  // Avro -> Calcite: top-level record keeps one field per Avro field.
  RelDataType rowType = AvroConverter.rel(unionRecordSchema);
  assertEquals(rowType.toString(), rowType.getFieldCount(), unionRecordSchema.getFields().size());
  assertNotNull(rowType.toString(), rowType.getField("event", false, false));

  // The union field becomes a struct with one entry per non-null member.
  RelDataType eventType = rowType.getField("event", false, false).getType();
  assertTrue(eventType.isStruct());
  Schema eventSchema = unionRecordSchema.getField("event").schema();
  assertEquals(eventType.toString(), eventType.getFieldCount(), eventSchema.getTypes().size());

  // Each member struct mirrors the fields of its Avro record.
  RelDataType firstMemberType = eventType.getField("record_event1", false, false).getType();
  Schema firstMemberSchema = eventSchema.getTypes().get(0);
  assertEquals(firstMemberType.toString(), firstMemberType.getFieldCount(), firstMemberSchema.getFields().size());

  // Calcite -> Avro: nullability and field count survive the round trip.
  Schema roundTripped = AvroConverter.avro("NS", "R", rowType);
  assertFalse("!avroSchema4.isNullable()", roundTripped.isNullable());
  assertEquals(roundTripped.toString(), roundTripped.getFields().size(), rowType.getFieldCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

public final class DataTypeUtils {

private static final String MAP_KEY_TYPE = "keyType";
private static final String MAP_VALUE_TYPE = "valueType";

private DataTypeUtils() {
}

Expand Down Expand Up @@ -46,13 +49,17 @@ private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType data
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
Stream.of(field.getName())).collect(Collectors.toList()));
}
} else if (!dataType.isStruct()) {
builder.add(String.join("$", path), dataType);
} else {
} else if (dataType.isStruct()) {
for (RelDataTypeField field : dataType.getFieldList()) {
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
Stream.of(field.getName())).collect(Collectors.toList()));
}
} else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
builder.add(String.join("$", path) + "$" + MAP_KEY_TYPE, dataType.getKeyType());
flattenInto(typeFactory, dataType.getValueType(), builder, Stream.concat(path.stream(),
Stream.of(MAP_VALUE_TYPE)).collect(Collectors.toList()));
} else {
builder.add(String.join("$", path), dataType);
}
}

Expand Down Expand Up @@ -86,6 +93,13 @@ private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory
return node.dataType;
}
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
// Placeholders to handle MAP type
if (node.children.size() == 2
&& node.children.containsKey(MAP_KEY_TYPE) && node.children.containsKey(MAP_VALUE_TYPE)) {
RelDataType keyType = buildRecord(node.children.get(MAP_KEY_TYPE), typeFactory);
RelDataType valueType = buildRecord(node.children.get(MAP_VALUE_TYPE), typeFactory);
return typeFactory.createMapType(keyType, valueType);
}
for (Map.Entry<String, Node> child : node.children.entrySet()) {
builder.add(child.getKey(), buildRecord(child.getValue(), typeFactory));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.Pair;
import org.apache.calcite.runtime.ImmutablePairList;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import com.linkedin.hoptimator.Deployable;
import com.linkedin.hoptimator.Job;
Expand Down Expand Up @@ -49,15 +48,15 @@ public interface PipelineRel extends RelNode {

/** Implements a deployable Pipeline. */
class Implementor {
private final ImmutableList<Pair<Integer, String>> targetFields;
private final ImmutablePairList<Integer, String> targetFields;
private final Map<Source, RelDataType> sources = new LinkedHashMap<>();
private RelNode query;
private String sinkDatabase = "pipeline";
private List<String> sinkPath = Arrays.asList(new String[]{"PIPELINE", "SINK"});
private RelDataType sinkRowType = null;
private Map<String, String> sinkOptions = Collections.emptyMap();

public Implementor(ImmutableList<Pair<Integer, String>> targetFields) {
public Implementor(ImmutablePairList<Integer, String> targetFields) {
this.targetFields = targetFields;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
Expand All @@ -20,13 +19,15 @@
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.ImmutablePairList;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlMapTypeNameSpec;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
Expand All @@ -42,11 +43,8 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;

import com.google.common.collect.ImmutableList;


/**
* An abstract way to write SQL scripts.
Expand Down Expand Up @@ -103,7 +101,7 @@ default ScriptImplementor database(String database) {
}

/** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */
default ScriptImplementor insert(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
default ScriptImplementor insert(String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
return with(new InsertImplementor(schema, table, relNode, targetFields));
}

Expand Down Expand Up @@ -262,9 +260,9 @@ class InsertImplementor implements ScriptImplementor {
private final String schema;
private final String table;
private final RelNode relNode;
private final ImmutableList<Pair<Integer, String>> targetFields;
private final ImmutablePairList<Integer, String> targetFields;

public InsertImplementor(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
public InsertImplementor(String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
this.schema = schema;
this.table = table;
this.relNode = relNode;
Expand All @@ -283,10 +281,10 @@ public void implement(SqlWriter w) {

// Drops NULL fields
// Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ...
private static RelNode dropFields(RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
List<Integer> cols = new ArrayList<>();
int i = 0;
Set<String> targetFieldNames = targetFields.stream().map(x -> x.right).collect(Collectors.toSet());
Set<String> targetFieldNames = targetFields.stream().map(Map.Entry::getValue).collect(Collectors.toSet());
for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
if (targetFieldNames.contains(field.getName())
&& !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
Expand Down Expand Up @@ -463,6 +461,9 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
.map(RelDataType::getSqlTypeName)
.orElseThrow(() -> new IllegalArgumentException("not a collection?")), SqlParserPos.ZERO),
dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
} else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlMapTypeNameSpec(
toSpec(dataType.getKeyType()), toSpec(dataType.getValueType()), SqlParserPos.ZERO), SqlParserPos.ZERO));
} else {
return maybeNullable(dataType,
new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,46 @@ public void flattenUnflattenNestedArrays() {
+ "`CAR` FLOAT ARRAY) WITH ();", unflattenedConnector,
"Flattened-unflattened connector should be correct");
}

@Test
public void flattenUnflattenComplexMap() {
  RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

  // Build ROW(FOO MAP<ROW(QUX VARCHAR), ROW(QIZ ROW(BAR VARCHAR, CAR INTEGER))>).
  RelDataTypeFactory.Builder innerRow = new RelDataTypeFactory.Builder(typeFactory);
  innerRow.add("BAR", SqlTypeName.VARCHAR);
  innerRow.add("CAR", SqlTypeName.INTEGER);

  RelDataTypeFactory.Builder mapKeyRow = new RelDataTypeFactory.Builder(typeFactory);
  mapKeyRow.add("QUX", SqlTypeName.VARCHAR);

  RelDataTypeFactory.Builder mapValueRow = new RelDataTypeFactory.Builder(typeFactory);
  mapValueRow.add("QIZ", innerRow.build());

  RelDataTypeFactory.Builder rowBuilder = new RelDataTypeFactory.Builder(typeFactory);
  rowBuilder.add("FOO", typeFactory.createMapType(mapKeyRow.build(), mapValueRow.build()));
  RelDataType rowType = rowBuilder.build();
  Assertions.assertEquals(1, rowType.getFieldList().size());

  // Flattening expands the map into keyType plus one column per leaf of valueType.
  RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory);
  Assertions.assertEquals(3, flattenedType.getFieldList().size());
  List<String> flattenedNames = flattenedType.getFieldList().stream()
      .map(RelDataTypeField::getName)
      .collect(Collectors.toList());
  Assertions.assertIterableEquals(
      Arrays.asList("FOO$keyType", "FOO$valueType$QIZ$BAR", "FOO$valueType$QIZ$CAR"),
      flattenedNames);
  String flattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
      flattenedType, Collections.emptyMap()).sql();
  Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
      + "`FOO_keyType` ROW(`QUX` VARCHAR), "
      + "`FOO_valueType_QIZ_BAR` VARCHAR, "
      + "`FOO_valueType_QIZ_CAR` INTEGER) WITH ();", flattenedConnector,
      "Flattened connector should have simplified map");

  // Unflattening must reconstruct the original map type exactly.
  RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);
  RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW);
  String originalConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
      rowType, Collections.emptyMap()).sql();
  String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
      unflattenedType, Collections.emptyMap()).sql();
  Assertions.assertEquals(originalConnector, unflattenedConnector,
      "Flattening and unflattening data types should have no impact on connector");
  Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
      + "`FOO` MAP< ROW(`QUX` VARCHAR), ROW(`QIZ` ROW(`BAR` VARCHAR, `CAR` INTEGER)) >) WITH ();",
      unflattenedConnector,
      "Flattened-unflattened connector should be correct");
}
}