Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve complex array types #95

Merged
merged 4 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,55 +79,65 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean
}

public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
return rel(schema, typeFactory, false);
}

/** Converts Avro Schema to RelDataType.
* Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY"
* causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired.
*/
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) {
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
case RECORD:
return typeFactory.createStructType(schema.getFields()
.stream()
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory)))
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList()));
case INT:
return createRelType(typeFactory, SqlTypeName.INTEGER);
return createRelType(typeFactory, SqlTypeName.INTEGER, nullable);
case LONG:
return createRelType(typeFactory, SqlTypeName.BIGINT);
return createRelType(typeFactory, SqlTypeName.BIGINT, nullable);
case ENUM:
case FIXED:
case STRING:
return createRelType(typeFactory, SqlTypeName.VARCHAR);
return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable);
case BYTES:
return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable);
case FLOAT:
return createRelType(typeFactory, SqlTypeName.FLOAT);
return createRelType(typeFactory, SqlTypeName.FLOAT, nullable);
case DOUBLE:
return createRelType(typeFactory, SqlTypeName.DOUBLE);
return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable);
case BOOLEAN:
return createRelType(typeFactory, SqlTypeName.BOOLEAN);
return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable);
case ARRAY:
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1);
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable))
case UNION:
if (schema.isNullable() && schema.getTypes().size() == 2) {
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true);
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true);
} else {
// TODO support more elaborate union types
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
}
default:
return typeFactory.createUnknownType();
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), nullable);
}
}

public static RelDataType rel(Schema schema) {
return rel(schema, DataType.DEFAULT_TYPE_FACTORY);
}

private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) {
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
RelDataType rawType = typeFactory.createSqlType(typeName);
return typeFactory.createTypeWithNullability(rawType, false);
return typeFactory.createTypeWithNullability(rawType, nullable);
}

public static RelProtoDataType proto(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;


Expand All @@ -22,18 +24,18 @@ public void convertsNestedSchemas() {
Schema avroSchema1 = (new Schema.Parser()).parse(schemaString);
RelDataType rel1 = AvroConverter.rel(avroSchema1);
assertEquals(rel1.toString(), rel1.getFieldCount(), avroSchema1.getFields().size());
assertTrue(rel1.toString(), rel1.getField("h", false, false) != null);
assertNotNull(rel1.toString(), rel1.getField("h", false, false));
RelDataType rel2 = rel1.getField("h", false, false).getType();
assertTrue(rel2.toString(), rel2.isNullable());
Schema avroSchema2 = avroSchema1.getField("h").schema().getTypes().get(1);
assertEquals(rel2.toString(), rel2.getFieldCount(), avroSchema2.getFields().size());
assertTrue(rel2.toString(), rel2.getField("A", false, false) != null);
assertNotNull(rel2.toString(), rel2.getField("A", false, false));
RelDataType rel3 = rel2.getField("A", false, false).getType();
assertTrue(rel3.toString(), rel3.isNullable());
Schema avroSchema3 = avroSchema2.getField("A").schema().getTypes().get(1);
assertEquals(rel3.toString(), rel3.getFieldCount(), avroSchema3.getFields().size());
Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1);
assertTrue("!avroSchema4.isNullable()", !avroSchema4.isNullable());
assertFalse("!avroSchema4.isNullable()", avroSchema4.isNullable());
assertEquals(avroSchema4.toString(), avroSchema4.getFields().size(), rel1.getFieldCount());
Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2);
assertTrue("avroSchema5.isNullable()", avroSchema5.isNullable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import org.apache.calcite.rel.type.RelDataType;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;


public class DataTypeTest {
Expand All @@ -13,12 +15,12 @@ public void skipsNestedRows() {
DataType.Struct struct =
DataType.struct().with("one", DataType.VARCHAR).with("two", DataType.struct().with("three", DataType.VARCHAR));
RelDataType row1 = struct.rel();
assertTrue(row1.toString(), row1.getFieldCount() == 2);
assertTrue(row1.toString(), row1.getField("one", false, false) != null);
assertTrue(row1.toString(), row1.getField("two", false, false) != null);
assertEquals(row1.toString(), 2, row1.getFieldCount());
assertNotNull(row1.toString(), row1.getField("one", false, false));
assertNotNull(row1.toString(), row1.getField("two", false, false));
RelDataType row2 = struct.dropNestedRows().rel();
assertTrue(row2.toString(), row2.getFieldCount() == 1);
assertTrue(row2.toString(), row2.getField("one", false, false) != null);
assertTrue(row2.toString(), row2.getField("two", false, false) == null);
assertEquals(row2.toString(), 1, row2.getFieldCount());
assertNotNull(row2.toString(), row2.getField("one", false, false));
assertNull(row2.toString(), row2.getField("two", false, false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.time.Duration;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;

import io.kubernetes.client.apimachinery.GroupVersion;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
Expand All @@ -19,7 +21,6 @@
import io.kubernetes.client.util.KubeConfig;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
import org.apache.commons.lang3.StringUtils;


public class K8sContext {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.linkedin.hoptimator.k8s;

import java.util.Objects;
import javax.sql.DataSource;

import org.apache.calcite.adapter.jdbc.JdbcSchema;

import com.linkedin.hoptimator.Engine;
import com.linkedin.hoptimator.SqlDialect;

import java.util.Objects;

import org.apache.calcite.adapter.jdbc.JdbcSchema;


public class K8sEngine implements Engine {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.schema.Schema;

import io.kubernetes.client.openapi.models.V1ObjectMeta;
Expand All @@ -16,7 +14,6 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1Engine;
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineSpec;
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;


public class K8sEngineTable extends K8sTable<V1alpha1Engine, V1alpha1EngineList, K8sEngineTable.Row> {
Expand Down Expand Up @@ -56,8 +53,8 @@ public List<Engine> forDatabase(String database) {
public Row toRow(V1alpha1Engine obj) {
return new Row(obj.getMetadata().getName(), obj.getSpec().getUrl(),
Optional.ofNullable(obj.getSpec().getDialect()).map(x -> x.toString()).orElseGet(() -> null),
obj.getSpec().getDriver(), obj.getSpec().getDatabases() != null ?
obj.getSpec().getDatabases().toArray(new String[0]) : null);
obj.getSpec().getDriver(), obj.getSpec().getDatabases() != null
? obj.getSpec().getDatabases().toArray(new String[0]) : null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@

import java.sql.SQLException;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.linkedin.hoptimator.Job;
import com.linkedin.hoptimator.SqlDialect;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.util.ConfigService;
import com.linkedin.hoptimator.util.Template;


/** Specifies an abstract Job with concrete YAML by applying JobTemplates. */
class K8sJobDeployer extends K8sYamlDeployer<Job> {

private static final String FLINK_CONFIG = "flink.config";

private final K8sApi<V1alpha1JobTemplate, V1alpha1JobTemplateList> jobTemplateApi;

K8sJobDeployer(K8sContext context) {
Expand All @@ -25,6 +28,8 @@ class K8sJobDeployer extends K8sYamlDeployer<Job> {

@Override
public List<String> specify(Job job) throws SQLException {
Properties properties = ConfigService.config(null, false, FLINK_CONFIG);
properties.putAll(job.sink().options());
Function<SqlDialect, String> sql = job.sql();
String name = K8sUtils.canonicalizeName(job.sink().database(), job.name());
Template.Environment env = new Template.SimpleEnvironment()
Expand All @@ -34,6 +39,7 @@ public List<String> specify(Job job) throws SQLException {
.with("table", job.sink().table())
.with("sql", sql.apply(SqlDialect.ANSI))
.with("flinksql", sql.apply(SqlDialect.FLINK))
.with("flinkconfigs", properties)
.with(job.sink().options());
return jobTemplateApi.list()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.sql.SQLException;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import com.linkedin.hoptimator.Source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import java.util.Collection;
import java.util.Locale;
import java.util.stream.Stream;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.kubernetes.client.common.KubernetesType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Database metadata.
*/
@ApiModel(description = "Database metadata.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1Database implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* DatabaseList is a list of Database
*/
@ApiModel(description = "DatabaseList is a list of Database")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1DatabaseList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Database spec.
*/
@ApiModel(description = "Database spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1DatabaseSpec {
/**
* SQL dialect the driver expects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Engine metadata.
*/
@ApiModel(description = "Engine metadata.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1Engine implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* EngineList is a list of Engine
*/
@ApiModel(description = "EngineList is a list of Engine")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1EngineList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Engine spec.
*/
@ApiModel(description = "Engine spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1EngineSpec {
public static final String SERIALIZED_NAME_DATABASES = "databases";
@SerializedName(SERIALIZED_NAME_DATABASES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Template to apply to matching jobs.
*/
@ApiModel(description = "Template to apply to matching jobs.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1JobTemplate implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* JobTemplateList is a list of JobTemplate
*/
@ApiModel(description = "JobTemplateList is a list of JobTemplate")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1JobTemplateList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* TableTemplate spec.
*/
@ApiModel(description = "TableTemplate spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1JobTemplateSpec {
public static final String SERIALIZED_NAME_DATABASES = "databases";
@SerializedName(SERIALIZED_NAME_DATABASES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* A set of objects that work together to deliver data.
*/
@ApiModel(description = "A set of objects that work together to deliver data.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1Pipeline implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* PipelineList is a list of Pipeline
*/
@ApiModel(description = "PipelineList is a list of Pipeline")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1PipelineList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Pipeline spec.
*/
@ApiModel(description = "Pipeline spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1PipelineSpec {
public static final String SERIALIZED_NAME_SQL = "sql";
@SerializedName(SERIALIZED_NAME_SQL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Pipeline status.
*/
@ApiModel(description = "Pipeline status.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1PipelineStatus {
public static final String SERIALIZED_NAME_FAILED = "failed";
@SerializedName(SERIALIZED_NAME_FAILED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator generic SQL job
*/
@ApiModel(description = "Hoptimator generic SQL job")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
public class V1alpha1SqlJob implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Loading
Loading