Skip to content

Commit 6784871

Browse files
committed
Merge AvroConverters
1 parent aeb8b20 commit 6784871

File tree

8 files changed

+60
-202
lines changed

8 files changed

+60
-202
lines changed

hoptimator-avro/build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ dependencies {
77
implementation project(':hoptimator-api')
88
implementation libs.avro
99
implementation libs.calcite.core
10+
11+
testImplementation libs.junit
12+
testImplementation libs.assertj
1013
}
1114

1215
publishing {

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java

+50-35
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
1515
import org.apache.calcite.sql.type.SqlTypeName;
1616

17-
1817
/** Converts between Avro and Calcite's RelDataType */
1918
public final class AvroConverter {
2019

@@ -23,13 +22,11 @@ private AvroConverter() {
2322

2423
public static Schema avro(String namespace, String name, RelDataType dataType) {
2524
if (dataType.isStruct()) {
26-
List<Schema.Field> fields = dataType.getFieldList()
27-
.stream()
28-
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x),
29-
null))
25+
List<Schema.Field> fields = dataType.getFieldList().stream()
26+
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
3027
.collect(Collectors.toList());
31-
return createAvroSchemaWithNullability(
32-
Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), dataType.isNullable());
28+
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields),
29+
dataType.isNullable());
3330
} else {
3431
switch (dataType.getSqlTypeName()) {
3532
case INTEGER:
@@ -51,10 +48,9 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
5148
case ARRAY:
5249
return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
5350
dataType.isNullable());
54-
// TODO support map types
55-
// Appears to require a Calcite version bump
56-
// case MAP:
57-
// return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
51+
case MAP:
52+
return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())),
53+
dataType.isNullable());
5854
case UNKNOWN:
5955
case NULL:
6056
return Schema.createUnion(Schema.create(Schema.Type.NULL));
@@ -82,55 +78,74 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean
8278
}
8379

8480
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
81+
return rel(schema, typeFactory, false);
82+
}
83+
84+
/** Converts Avro Schema to RelDataType.
85+
* Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY"
86+
* causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired.
87+
*/
88+
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) {
8589
RelDataType unknown = typeFactory.createUnknownType();
8690
switch (schema.getType()) {
8791
case RECORD:
88-
return typeFactory.createStructType(schema.getFields()
89-
.stream()
90-
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory)))
92+
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream()
93+
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))
9194
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
9295
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
93-
.collect(Collectors.toList()));
96+
.collect(Collectors.toList())), nullable);
9497
case INT:
95-
return createRelType(typeFactory, SqlTypeName.INTEGER);
98+
return createRelType(typeFactory, SqlTypeName.INTEGER, nullable);
9699
case LONG:
97-
return createRelType(typeFactory, SqlTypeName.BIGINT);
100+
return createRelType(typeFactory, SqlTypeName.BIGINT, nullable);
98101
case ENUM:
99-
case FIXED:
100102
case STRING:
101-
return createRelType(typeFactory, SqlTypeName.VARCHAR);
103+
return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable);
104+
case FIXED:
105+
return createRelType(typeFactory, SqlTypeName.VARBINARY, schema.getFixedSize(), nullable);
106+
case BYTES:
107+
return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable);
102108
case FLOAT:
103-
return createRelType(typeFactory, SqlTypeName.FLOAT);
109+
return createRelType(typeFactory, SqlTypeName.FLOAT, nullable);
104110
case DOUBLE:
105-
return createRelType(typeFactory, SqlTypeName.DOUBLE);
111+
return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable);
106112
case BOOLEAN:
107-
return createRelType(typeFactory, SqlTypeName.BOOLEAN);
113+
return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable);
108114
case ARRAY:
109-
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
110-
// TODO support map types
111-
// Appears to require a Calcite version bump
112-
// case MAP:
113-
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
115+
return typeFactory.createTypeWithNullability(
116+
typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable);
117+
case MAP:
118+
return typeFactory.createTypeWithNullability(
119+
typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable);
114120
case UNION:
121+
boolean isNullable = schema.isNullable();
115122
if (schema.isNullable() && schema.getTypes().size() == 2) {
116123
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
117-
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true);
118-
} else {
119-
// TODO support more elaborate union types
120-
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
124+
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true);
121125
}
126+
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream()
127+
.filter(x -> x.getType() != Schema.Type.NULL)
128+
.map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable)))
129+
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
130+
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
131+
.collect(Collectors.toList())), isNullable);
122132
default:
123-
return typeFactory.createUnknownType();
133+
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
124134
}
125135
}
126136

127137
public static RelDataType rel(Schema schema) {
128138
return rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
129139
}
130140

131-
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) {
132-
RelDataType rawType = typeFactory.createSqlType(typeName);
133-
return typeFactory.createTypeWithNullability(rawType, false);
141+
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
142+
return createRelType(typeFactory, typeName, RelDataType.PRECISION_NOT_SPECIFIED, nullable);
143+
}
144+
145+
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName,
146+
int precision, boolean nullable) {
147+
RelDataType rawType = typeFactory.createSqlType(typeName, precision);
148+
return typeFactory.createTypeWithNullability(rawType, nullable);
134149
}
135150

136151
public static RelProtoDataType proto(Schema schema) {

hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java hoptimator-avro/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.linkedin.hoptimator.catalog;
1+
package com.linkedin.hoptimator.avro;
22

33
import org.apache.avro.Schema;
44
import org.apache.calcite.plan.RelOptUtil;

hoptimator-catalog/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ plugins {
55
}
66

77
dependencies {
8+
implementation project(':hoptimator-avro')
89
implementation project(':hoptimator-util')
910
implementation libs.avro
1011
implementation libs.calcite.core

hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java

-163
This file was deleted.

hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.hoptimator.catalog;
22

3+
import com.linkedin.hoptimator.avro.AvroConverter;
34
import org.apache.avro.Schema;
45
import org.apache.calcite.rel.type.RelDataType;
56
import org.apache.calcite.sql.SqlWriter;

hoptimator-operator/build.gradle

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,20 @@ plugins {
55
}
66

77
dependencies {
8+
implementation project(':hoptimator-avro')
89
implementation project(':hoptimator-planner')
910
implementation project(':hoptimator-catalog') // <-- marked for deletion
1011
implementation project(':hoptimator-util')
1112
implementation project(':hoptimator-k8s')
1213
implementation project(':hoptimator-models') // <-- marked for deletion
13-
14+
1415
implementation libs.calcite.core
1516
implementation libs.kubernetes.client
1617
implementation libs.kubernetes.extended.client
1718
implementation libs.slf4j.api
1819
implementation libs.commons.cli
1920
implementation libs.avro
20-
21+
2122
testImplementation libs.junit
2223
testImplementation libs.assertj
2324
}

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.linkedin.hoptimator.operator.subscription;
22

3-
import com.linkedin.hoptimator.catalog.AvroConverter;
3+
import com.linkedin.hoptimator.avro.AvroConverter;
44
import com.linkedin.hoptimator.catalog.Resource;
55
import com.linkedin.hoptimator.planner.Pipeline;
66

0 commit comments

Comments
 (0)