From 8bdd01bed5c0c77c7636b7f5494e54511aed0dba Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Tue, 21 Jan 2025 17:22:14 -0500 Subject: [PATCH 1/4] Add configs to status --- .../hoptimator/k8s/K8sJobDeployer.java | 8 +++- .../k8s/models/V1alpha1Database.java | 2 +- .../k8s/models/V1alpha1DatabaseList.java | 2 +- .../k8s/models/V1alpha1DatabaseSpec.java | 2 +- .../hoptimator/k8s/models/V1alpha1Engine.java | 2 +- .../k8s/models/V1alpha1EngineList.java | 2 +- .../k8s/models/V1alpha1EngineSpec.java | 2 +- .../k8s/models/V1alpha1JobTemplate.java | 2 +- .../k8s/models/V1alpha1JobTemplateList.java | 2 +- .../k8s/models/V1alpha1JobTemplateSpec.java | 2 +- .../k8s/models/V1alpha1Pipeline.java | 2 +- .../k8s/models/V1alpha1PipelineList.java | 2 +- .../k8s/models/V1alpha1PipelineSpec.java | 2 +- .../k8s/models/V1alpha1PipelineStatus.java | 2 +- .../hoptimator/k8s/models/V1alpha1SqlJob.java | 2 +- .../k8s/models/V1alpha1SqlJobList.java | 2 +- .../k8s/models/V1alpha1SqlJobSpec.java | 2 +- .../k8s/models/V1alpha1SqlJobStatus.java | 46 +++++++++++++++++-- .../k8s/models/V1alpha1Subscription.java | 2 +- .../k8s/models/V1alpha1SubscriptionList.java | 2 +- .../k8s/models/V1alpha1SubscriptionSpec.java | 2 +- .../models/V1alpha1SubscriptionStatus.java | 2 +- .../k8s/models/V1alpha1TableTemplate.java | 2 +- .../k8s/models/V1alpha1TableTemplateList.java | 2 +- .../k8s/models/V1alpha1TableTemplateSpec.java | 2 +- .../hoptimator/k8s/models/V1alpha1View.java | 2 +- .../k8s/models/V1alpha1ViewList.java | 2 +- .../k8s/models/V1alpha1ViewSpec.java | 2 +- .../src/main/resources/sqljobs.crd.yaml | 5 ++ .../hoptimator/models/V1alpha1Acl.java | 2 +- .../hoptimator/models/V1alpha1AclList.java | 2 +- .../hoptimator/models/V1alpha1AclSpec.java | 2 +- .../models/V1alpha1AclSpecResource.java | 2 +- .../hoptimator/models/V1alpha1AclStatus.java | 2 +- .../hoptimator/models/V1alpha1KafkaTopic.java | 2 +- .../models/V1alpha1KafkaTopicList.java | 2 +- .../models/V1alpha1KafkaTopicSpec.java | 2 +- .../V1alpha1KafkaTopicSpecClientConfigs.java | 2 +- .../V1alpha1KafkaTopicSpecConfigMapRef.java | 2 +- .../models/V1alpha1KafkaTopicStatus.java | 2 +- .../models/V1alpha1Subscription.java | 2 +- .../models/V1alpha1SubscriptionList.java | 2 +- .../models/V1alpha1SubscriptionSpec.java | 2 +- .../models/V1alpha1SubscriptionStatus.java | 2 +- .../hoptimator/util/ConfigService.java | 15 ++++-- .../linkedin/hoptimator/util/Template.java | 29 ++++++++++++ 46 files changed, 136 insertions(+), 49 deletions(-) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java index bdf1943..234f597 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java @@ -2,7 +2,7 @@ import java.sql.SQLException; import java.util.List; -import java.util.Locale; +import java.util.Properties; import java.util.function.Function; import java.util.stream.Collectors; @@ -10,12 +10,15 @@ import com.linkedin.hoptimator.SqlDialect; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList; +import com.linkedin.hoptimator.util.ConfigService; import com.linkedin.hoptimator.util.Template; /** Specifies an abstract Job with concrete YAML by applying JobTemplates. */ class K8sJobDeployer extends K8sYamlDeployer { + private static final String FLINK_CONFIG = "flink.config"; + private final K8sApi jobTemplateApi; K8sJobDeployer(K8sContext context) { @@ -25,6 +28,8 @@ class K8sJobDeployer extends K8sYamlDeployer { @Override public List specify(Job job) throws SQLException { + Properties properties = ConfigService.config(null, false, FLINK_CONFIG); + properties.putAll(job.sink().options()); Function sql = job.sql(); String name = K8sUtils.canonicalizeName(job.sink().database(), job.name()); Template.Environment env = new Template.SimpleEnvironment() @@ -34,6 +39,7 @@ public List specify(Job job) throws SQLException { .with("table", job.sink().table()) .with("sql", sql.apply(SqlDialect.ANSI)) .with("flinksql", sql.apply(SqlDialect.FLINK)) + .with("flinkconfigs", properties) .with(job.sink().options()); return jobTemplateApi.list() .stream() diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java index 449c0e1..9785249 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java @@ -30,7 +30,7 @@ * Database metadata. */ @ApiModel(description = "Database metadata.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1Database implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java index 371449a..31e8258 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java @@ -32,7 +32,7 @@ * DatabaseList is a list of Database */ @ApiModel(description = "DatabaseList is a list of Database") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1DatabaseList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java index 17354ad..5ee36b1 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java @@ -28,7 +28,7 @@ * Database spec. */ @ApiModel(description = "Database spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1DatabaseSpec { /** * SQL dialect the driver expects. diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Engine.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Engine.java index 0ff507d..30903ad 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Engine.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Engine.java @@ -30,7 +30,7 @@ * Engine metadata. */ @ApiModel(description = "Engine metadata.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1Engine implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineList.java index ad41fb7..29265c1 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineList.java @@ -32,7 +32,7 @@ * EngineList is a list of Engine */ @ApiModel(description = "EngineList is a list of Engine") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1EngineList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineSpec.java index e8435bb..ffdf273 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1EngineSpec.java @@ -30,7 +30,7 @@ * Engine spec. */ @ApiModel(description = "Engine spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1EngineSpec { public static final String SERIALIZED_NAME_DATABASES = "databases"; @SerializedName(SERIALIZED_NAME_DATABASES) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java index adce7d3..bf3e7e2 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java @@ -30,7 +30,7 @@ * Template to apply to matching jobs. */ @ApiModel(description = "Template to apply to matching jobs.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1JobTemplate implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java index 6fdc4b1..746281b 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java @@ -32,7 +32,7 @@ * JobTemplateList is a list of JobTemplate */ @ApiModel(description = "JobTemplateList is a list of JobTemplate") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1JobTemplateList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java index 99319bb..280de65 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java @@ -30,7 +30,7 @@ * TableTemplate spec. */ @ApiModel(description = "TableTemplate spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1JobTemplateSpec { public static final String SERIALIZED_NAME_DATABASES = "databases"; @SerializedName(SERIALIZED_NAME_DATABASES) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java index 45f4fd5..9032958 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java @@ -31,7 +31,7 @@ * A set of objects that work together to deliver data. */ @ApiModel(description = "A set of objects that work together to deliver data.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1Pipeline implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java index 9f424e2..404128e 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java @@ -32,7 +32,7 @@ * PipelineList is a list of Pipeline */ @ApiModel(description = "PipelineList is a list of Pipeline") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1PipelineList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java index bfee923..c4468ca 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java @@ -28,7 +28,7 @@ * Pipeline spec. */ @ApiModel(description = "Pipeline spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1PipelineSpec { public static final String SERIALIZED_NAME_SQL = "sql"; @SerializedName(SERIALIZED_NAME_SQL) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java index 1c99663..00b55d2 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java @@ -28,7 +28,7 @@ * Pipeline status. */ @ApiModel(description = "Pipeline status.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1PipelineStatus { public static final String SERIALIZED_NAME_FAILED = "failed"; @SerializedName(SERIALIZED_NAME_FAILED) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJob.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJob.java index 810de8d..e582584 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJob.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJob.java @@ -31,7 +31,7 @@ * Hoptimator generic SQL job */ @ApiModel(description = "Hoptimator generic SQL job") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1SqlJob implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobList.java index f46e7c3..dfc4c64 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobList.java @@ -32,7 +32,7 @@ * SqlJobList is a list of SqlJob */ @ApiModel(description = "SqlJobList is a list of SqlJob") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1SqlJobList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java index 141d1f4..4c6373a 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java @@ -32,7 +32,7 @@ * SQL job spec */ @ApiModel(description = "SQL job spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1SqlJobSpec { public static final String SERIALIZED_NAME_CONFIGS = "configs"; @SerializedName(SERIALIZED_NAME_CONFIGS) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobStatus.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobStatus.java index a0f8239..77517ef 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobStatus.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobStatus.java @@ -23,13 +23,20 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1SqlJobStatus { + public static final String SERIALIZED_NAME_CONFIGS = "configs"; + @SerializedName(SERIALIZED_NAME_CONFIGS) + private Map configs = null; + public static final String SERIALIZED_NAME_FAILED = "failed"; @SerializedName(SERIALIZED_NAME_FAILED) private Boolean failed; @@ -47,6 +54,37 @@ public class V1alpha1SqlJobStatus { private String sql; + public V1alpha1SqlJobStatus configs(Map configs) { + + this.configs = configs; + return this; + } + + public V1alpha1SqlJobStatus putConfigsItem(String key, String configsItem) { + if (this.configs == null) { + this.configs = new HashMap<>(); + } + this.configs.put(key, configsItem); + return this; + } + + /** + * The SQL configurations used by this SqlJob. + * @return configs + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "The SQL configurations used by this SqlJob.") + + public Map getConfigs() { + return configs; + } + + + public void setConfigs(Map configs) { + this.configs = configs; + } + + public V1alpha1SqlJobStatus failed(Boolean failed) { this.failed = failed; @@ -148,7 +186,8 @@ public boolean equals(Object o) { return false; } V1alpha1SqlJobStatus v1alpha1SqlJobStatus = (V1alpha1SqlJobStatus) o; - return Objects.equals(this.failed, v1alpha1SqlJobStatus.failed) && + return Objects.equals(this.configs, v1alpha1SqlJobStatus.configs) && + Objects.equals(this.failed, v1alpha1SqlJobStatus.failed) && Objects.equals(this.message, v1alpha1SqlJobStatus.message) && Objects.equals(this.ready, v1alpha1SqlJobStatus.ready) && Objects.equals(this.sql, v1alpha1SqlJobStatus.sql); @@ -156,7 +195,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(failed, message, ready, sql); + return Objects.hash(configs, failed, message, ready, sql); } @@ -164,6 +203,7 @@ public int hashCode() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("class V1alpha1SqlJobStatus {\n"); + sb.append(" configs: ").append(toIndentedString(configs)).append("\n"); sb.append(" failed: ").append(toIndentedString(failed)).append("\n"); sb.append(" message: ").append(toIndentedString(message)).append("\n"); sb.append(" ready: ").append(toIndentedString(ready)).append("\n"); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java index e0b2cc2..a18a8e6 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java @@ -31,7 +31,7 @@ * Hoptimator Subscription */ @ApiModel(description = "Hoptimator Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java index a69eaa0..a6d3696 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java @@ -32,7 +32,7 @@ * SubscriptionList is a list of Subscription */ @ApiModel(description = "SubscriptionList is a list of Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java index 15faad3..08d0473 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java @@ -31,7 +31,7 @@ * Subscription spec */ @ApiModel(description = "Subscription spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1SubscriptionSpec { public static final String SERIALIZED_NAME_DATABASE = "database"; @SerializedName(SERIALIZED_NAME_DATABASE) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java index 4ad6fec..65be497 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java @@ -32,7 +32,7 @@ * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1SubscriptionStatus { public static final String SERIALIZED_NAME_ATTRIBUTES = "attributes"; @SerializedName(SERIALIZED_NAME_ATTRIBUTES) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java index c993c29..5a4a79a 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java @@ -30,7 +30,7 @@ * Template to apply to matching tables. */ @ApiModel(description = "Template to apply to matching tables.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1TableTemplate implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java index 43065f1..123ae7f 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java @@ -32,7 +32,7 @@ * TableTemplateList is a list of TableTemplate */ @ApiModel(description = "TableTemplateList is a list of TableTemplate") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1TableTemplateList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java index f782df7..fbed787 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java @@ -30,7 +30,7 @@ * TableTemplate spec. */ @ApiModel(description = "TableTemplate spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1TableTemplateSpec { public static final String SERIALIZED_NAME_CONNECTOR = "connector"; @SerializedName(SERIALIZED_NAME_CONNECTOR) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java index 59f57e8..8e52ef1 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java @@ -30,7 +30,7 @@ * A SQL view. */ @ApiModel(description = "A SQL view.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1View implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java index fcc39cb..9ad5d6b 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java @@ -32,7 +32,7 @@ * ViewList is a list of View */ @ApiModel(description = "ViewList is a list of View") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1ViewList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java index fe36e7b..1e4f989 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java @@ -28,7 +28,7 @@ * View spec. */ @ApiModel(description = "View spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]") public class V1alpha1ViewSpec { public static final String SERIALIZED_NAME_MATERIALIZED = "materialized"; @SerializedName(SERIALIZED_NAME_MATERIALIZED) diff --git a/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml b/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml index cf87f2b..e12af30 100644 --- a/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml +++ b/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml @@ -74,6 +74,11 @@ spec: sql: description: The SQL being implemented by this SqlJob. type: string + configs: + description: The SQL configurations used by this SqlJob. + type: object + additionalProperties: + type: string subresources: status: {} additionalPrinterColumns: diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java index 31cc58e..96caf5c 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java @@ -31,7 +31,7 @@ * Access control rule (colloquially, an Acl) */ @ApiModel(description = "Access control rule (colloquially, an Acl)") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java index 4cc5ce9..ddca2a2 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java @@ -32,7 +32,7 @@ * AclList is a list of Acl */ @ApiModel(description = "AclList is a list of Acl") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java index a72b1de..04b2124 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java @@ -29,7 +29,7 @@ * A set of related ACL rules. */ @ApiModel(description = "A set of related ACL rules.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1AclSpec { /** * The resource access method. diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java index 840522c..66a8ba5 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java @@ -28,7 +28,7 @@ * The resource being controlled. */ @ApiModel(description = "The resource being controlled.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1AclSpecResource { public static final String SERIALIZED_NAME_KIND = "kind"; @SerializedName(SERIALIZED_NAME_KIND) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java index 8aaa66e..7049ce7 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java @@ -28,7 +28,7 @@ * Status, as set by the operator. */ @ApiModel(description = "Status, as set by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1AclStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java index 39912e0..e24c449 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java @@ -31,7 +31,7 @@ * Kafka Topic */ @ApiModel(description = "Kafka Topic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java index b0c3193..6958f1a 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java @@ -32,7 +32,7 @@ * KafkaTopicList is a list of KafkaTopic */ @ApiModel(description = "KafkaTopicList is a list of KafkaTopic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java index b853e5c..7fddbbf 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java @@ -33,7 +33,7 @@ * Desired Kafka topic configuration. */ @ApiModel(description = "Desired Kafka topic configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1KafkaTopicSpec { public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs"; @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java index 4a96f4e..5682d7a 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java @@ -28,7 +28,7 @@ /** * V1alpha1KafkaTopicSpecClientConfigs */ -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecClientConfigs { public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef"; @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java index 2a98510..bba8847 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java @@ -28,7 +28,7 @@ * Reference to a ConfigMap to use for AdminClient configuration. */ @ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecConfigMapRef { public static final String SERIALIZED_NAME_NAME = "name"; @SerializedName(SERIALIZED_NAME_NAME) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java index 4cb9711..b5bfd3e 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java @@ -28,7 +28,7 @@ * Current state of the topic. */ @ApiModel(description = "Current state of the topic.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1KafkaTopicStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java index 6e012d2..5e9dd1d 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java @@ -31,7 +31,7 @@ * Hoptimator Subscription */ @ApiModel(description = "Hoptimator Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java index 7d94837..43c4ce0 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java @@ -32,7 +32,7 @@ * SubscriptionList is a list of Subscription */ @ApiModel(description = "SubscriptionList is a list of Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java index 81f177c..1cf05de 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java @@ -31,7 +31,7 @@ * Subscription spec */ @ApiModel(description = "Subscription spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1SubscriptionSpec { public static final String SERIALIZED_NAME_DATABASE = "database"; @SerializedName(SERIALIZED_NAME_DATABASE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java index 2281572..54696ca 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java @@ -32,7 +32,7 @@ * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:19:48.284Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:15:02.597Z[Etc/UTC]") public class V1alpha1SubscriptionStatus { public static final String SERIALIZED_NAME_ATTRIBUTES = "attributes"; @SerializedName(SERIALIZED_NAME_ATTRIBUTES) diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConfigService.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConfigService.java index df6e902..eae5bb6 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConfigService.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConfigService.java @@ -18,18 +18,21 @@ private ConfigService() { } // Null namespace will default to current namespace, may not be used by some ConfigProviders. - // Loads top level configs and expands input fields as file-like properties + // loadTopLevelConfigs=true loads top level configs and expands input fields as file-like properties + // loadTopLevelConfigs=false will only expand input fields as file-like properties // Ex: // log.properties: | // level=INFO - public static Properties config(@Nullable String namespace, String... expansionFields) { + public static Properties config(@Nullable String namespace, boolean loadTopLevelConfigs, String... expansionFields) { ServiceLoader loader = ServiceLoader.load(ConfigProvider.class); Properties properties = new Properties(); for (ConfigProvider provider : loader) { try { Properties loadedProperties = provider.loadConfig(namespace); - log.debug("Loaded properties={} from provider={}", loadedProperties, provider); - properties.putAll(loadedProperties); + if (loadTopLevelConfigs) { + log.debug("Loaded properties={} from provider={}", loadedProperties, provider); + properties.putAll(loadedProperties); + } for (String expansionField : expansionFields) { if (loadedProperties == null || !loadedProperties.containsKey(expansionField)) { log.warn("provider={} does not contain field={}", provider, expansionField); @@ -43,4 +46,8 @@ public static Properties config(@Nullable String namespace, String... expansionF } return properties; } + + public static Properties config(@Nullable String namespace, String... expansionFields) { + return config(namespace, true, expansionFields); + } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java index 6a0bddf..d671703 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java @@ -3,6 +3,7 @@ import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; +import java.util.Properties; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -65,6 +66,18 @@ public SimpleEnvironment with(Map values) { }}; } + public SimpleEnvironment with(String key, Map values) { + return new SimpleEnvironment(vars) {{ + export(key, formatMapAsString(values)); + }}; + } + + public SimpleEnvironment with(String key, Properties values) { + return new SimpleEnvironment(vars) {{ + export(key, formatPropertiesAsString(values)); + }}; + } + public SimpleEnvironment with(String key, Supplier supplier) { return new SimpleEnvironment(vars) {{ export(key, supplier); @@ -82,6 +95,22 @@ public String getOrDefault(String key, Supplier f) { } return vars.get(key).get(); } + + private String formatMapAsString(Map configMap) { + StringBuilder stringBuilder = new StringBuilder(); + for (Map.Entry entry : configMap.entrySet()) { + stringBuilder.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); + } + return stringBuilder.toString(); + } + + private String formatPropertiesAsString(Properties props) { + StringBuilder stringBuilder = new StringBuilder(); + for (String key : props.stringPropertyNames()) { + stringBuilder.append(key).append(": ").append(props.getProperty(key)).append("\n"); + } + return stringBuilder.toString(); + } } /** Returns "{{key}}" for any key without a default */ From 6434d0de2e73bc57934703d164e9bf264bb770bd Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Thu, 23 Jan 2025 01:02:03 -0500 Subject: [PATCH 2/4] Fix various checkstyle complaints --- .../hoptimator/catalog/AvroConverterTest.java | 8 +++-- .../hoptimator/catalog/DataTypeTest.java | 16 +++++---- .../linkedin/hoptimator/k8s/K8sContext.java | 3 +- .../linkedin/hoptimator/k8s/K8sEngine.java | 7 ++-- .../hoptimator/k8s/K8sEngineTable.java | 7 ++-- .../hoptimator/k8s/K8sSourceDeployer.java | 1 - .../com/linkedin/hoptimator/k8s/K8sUtils.java | 2 +- .../operator/pipeline/PipelineReconciler.java | 3 +- .../hoptimator/planner/PipelineRules.java | 4 ++- .../hoptimator/util/DelegatingConnection.java | 4 +-- .../linkedin/hoptimator/util/Template.java | 2 +- .../hoptimator/util/planner/EngineRules.java | 36 +------------------ .../util/planner/PipelineRules.java | 4 ++- .../util/planner/RemoteConvention.java | 5 ++- .../hoptimator/util/planner/RemoteRel.java | 1 - .../planner/RemoteToEnumerableConverter.java | 35 ++++++++---------- 16 files changed, 49 insertions(+), 89 deletions(-) diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java index dce93d6..8558bac 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java @@ -7,6 +7,8 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -22,18 +24,18 @@ public void convertsNestedSchemas() { Schema avroSchema1 = (new Schema.Parser()).parse(schemaString); RelDataType rel1 = AvroConverter.rel(avroSchema1); assertEquals(rel1.toString(), rel1.getFieldCount(), avroSchema1.getFields().size()); - assertTrue(rel1.toString(), rel1.getField("h", false, false) != null); + assertNotNull(rel1.toString(), rel1.getField("h", false, false)); RelDataType rel2 = rel1.getField("h", false, false).getType(); assertTrue(rel2.toString(), rel2.isNullable()); Schema avroSchema2 = avroSchema1.getField("h").schema().getTypes().get(1); assertEquals(rel2.toString(), rel2.getFieldCount(), avroSchema2.getFields().size()); - assertTrue(rel2.toString(), rel2.getField("A", false, false) != null); + assertNotNull(rel2.toString(), rel2.getField("A", false, false)); RelDataType rel3 = rel2.getField("A", false, false).getType(); assertTrue(rel3.toString(), rel3.isNullable()); Schema avroSchema3 = avroSchema2.getField("A").schema().getTypes().get(1); assertEquals(rel3.toString(), rel3.getFieldCount(), avroSchema3.getFields().size()); Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1); - assertTrue("!avroSchema4.isNullable()", !avroSchema4.isNullable()); + assertFalse("!avroSchema4.isNullable()", avroSchema4.isNullable()); assertEquals(avroSchema4.toString(), avroSchema4.getFields().size(), rel1.getFieldCount()); Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2); assertTrue("avroSchema5.isNullable()", avroSchema5.isNullable()); diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java index c42b558..35482e7 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java @@ -3,7 +3,9 @@ import org.apache.calcite.rel.type.RelDataType; import org.junit.Test; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; public class DataTypeTest { @@ -13,12 +15,12 @@ public void skipsNestedRows() { DataType.Struct struct = DataType.struct().with("one", DataType.VARCHAR).with("two", DataType.struct().with("three", DataType.VARCHAR)); RelDataType row1 = struct.rel(); - assertTrue(row1.toString(), row1.getFieldCount() == 2); - assertTrue(row1.toString(), row1.getField("one", false, false) != null); - assertTrue(row1.toString(), row1.getField("two", false, false) != null); + assertEquals(row1.toString(), 2, row1.getFieldCount()); + assertNotNull(row1.toString(), row1.getField("one", false, false)); + assertNotNull(row1.toString(), row1.getField("two", false, false)); RelDataType row2 = struct.dropNestedRows().rel(); - assertTrue(row2.toString(), row2.getFieldCount() == 1); - assertTrue(row2.toString(), row2.getField("one", false, false) != null); - assertTrue(row2.toString(), row2.getField("two", false, false) == null); + assertEquals(row2.toString(), 1, row2.getFieldCount()); + assertNotNull(row2.toString(), row2.getField("one", false, false)); + assertNull(row2.toString(), row2.getField("two", false, false)); } } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java index 00aa361..a3d6830 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java @@ -9,6 +9,8 @@ import java.time.Duration; import java.util.Optional; +import org.apache.commons.lang3.StringUtils; + import io.kubernetes.client.apimachinery.GroupVersion; import io.kubernetes.client.common.KubernetesListObject; import io.kubernetes.client.common.KubernetesObject; @@ -19,7 +21,6 @@ import io.kubernetes.client.util.KubeConfig; import io.kubernetes.client.util.generic.GenericKubernetesApi; import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; -import org.apache.commons.lang3.StringUtils; public class K8sContext { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java index 5ba3df6..9bb86ba 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java @@ -1,14 +1,13 @@ package com.linkedin.hoptimator.k8s; +import java.util.Objects; import javax.sql.DataSource; +import org.apache.calcite.adapter.jdbc.JdbcSchema; + import com.linkedin.hoptimator.Engine; import com.linkedin.hoptimator.SqlDialect; -import java.util.Objects; - -import org.apache.calcite.adapter.jdbc.JdbcSchema; - public class K8sEngine implements Engine { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java index fb82d38..dfcfcab 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java @@ -2,11 +2,9 @@ import java.util.Arrays; import java.util.List; -import java.util.Locale; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.calcite.adapter.jdbc.JdbcSchema; import org.apache.calcite.schema.Schema; import io.kubernetes.client.openapi.models.V1ObjectMeta; @@ -16,7 +14,6 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1Engine; import com.linkedin.hoptimator.k8s.models.V1alpha1EngineList; import com.linkedin.hoptimator.k8s.models.V1alpha1EngineSpec; -import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema; public class K8sEngineTable extends K8sTable { @@ -56,8 +53,8 @@ public List forDatabase(String database) { public Row toRow(V1alpha1Engine obj) { return new Row(obj.getMetadata().getName(), obj.getSpec().getUrl(), Optional.ofNullable(obj.getSpec().getDialect()).map(x -> x.toString()).orElseGet(() -> null), - obj.getSpec().getDriver(), obj.getSpec().getDatabases() != null ? - obj.getSpec().getDatabases().toArray(new String[0]) : null); + obj.getSpec().getDriver(), obj.getSpec().getDatabases() != null + ? obj.getSpec().getDatabases().toArray(new String[0]) : null); } @Override diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java index 26b2cd9..c055cca 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java @@ -2,7 +2,6 @@ import java.sql.SQLException; import java.util.List; -import java.util.Locale; import java.util.stream.Collectors; import com.linkedin.hoptimator.Source; diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sUtils.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sUtils.java index a618b3e..86dd90c 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sUtils.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sUtils.java @@ -2,8 +2,8 @@ import java.util.Collection; import java.util.Locale; -import java.util.stream.Stream; import java.util.stream.Collectors; +import java.util.stream.Stream; import io.kubernetes.client.common.KubernetesType; diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java index a559285..5122179 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java @@ -117,8 +117,7 @@ private boolean isReady(String yaml) { String kind = obj.getKind(); try { KubernetesApiResponse existing = - context.dynamic(obj.getApiVersion(), K8sUtils.guessPlural(obj)) - .get(obj.getMetadata().getNamespace(), obj.getMetadata().getName()); + context.dynamic(obj.getApiVersion(), K8sUtils.guessPlural(obj)).get(namespace, name); existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage())); if (!existing.isSuccess()) { return false; diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java index 2fb5f2d..51c93d0 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java @@ -31,6 +31,8 @@ import org.apache.calcite.rex.RexProgram; import org.apache.calcite.schema.Table; +import com.google.common.collect.ImmutableSet; + import com.linkedin.hoptimator.catalog.HopRel; import com.linkedin.hoptimator.catalog.HopTable; import com.linkedin.hoptimator.catalog.HopTableScan; @@ -152,7 +154,7 @@ static class PipelineProject extends Project implements PipelineRel { PipelineProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, List projects, RelDataType rowType) { - super(cluster, traitSet, Collections.emptyList(), input, projects, rowType); + super(cluster, traitSet, Collections.emptyList(), input, projects, rowType, ImmutableSet.of()); assert getConvention() == PipelineRel.CONVENTION; assert input.getConvention() == PipelineRel.CONVENTION; } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java index 886b219..77a46b0 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java @@ -1,6 +1,5 @@ package com.linkedin.hoptimator.util; -import java.util.Properties; import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -18,6 +17,7 @@ import java.sql.Statement; import java.sql.Struct; import java.util.Map; +import java.util.Properties; import java.util.concurrent.Executor; class DelegatingConnection implements Connection { @@ -77,7 +77,7 @@ public DatabaseMetaData getMetaData() throws SQLException { public void setTransactionIsolation(int level) throws SQLException { // nop } - + @Override public int getTransactionIsolation() throws SQLException { return Connection.TRANSACTION_NONE; diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java index d671703..d312cb9 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java @@ -147,7 +147,7 @@ public String getOrDefault(String key, Supplier f) { /** * Replaces `{{var}}` in a template file with the corresponding variable. * - * Default values can supplied with `{{var:default}}`. + * Default values can be supplied with `{{var:default}}`. * * Built-in transformations can be applied to variables, including: * diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java index 6aed614..db948fc 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java @@ -2,15 +2,9 @@ import java.util.Collections; -import org.apache.calcite.adapter.jdbc.JdbcConvention; -import org.apache.calcite.adapter.jdbc.JdbcImplementor; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.InvalidRelException; @@ -20,38 +14,10 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexProgram; -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; -import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.calcite.sql.dialect.MysqlSqlDialect; -import org.apache.calcite.sql.type.SqlTypeFactoryImpl; - -import org.apache.calcite.adapter.enumerable.EnumerableConvention; -import org.apache.calcite.adapter.enumerable.EnumerableRel; -import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; -import org.apache.calcite.adapter.enumerable.JavaRowFormat; -import org.apache.calcite.adapter.enumerable.PhysType; -import org.apache.calcite.adapter.enumerable.PhysTypeImpl; -import org.apache.calcite.linq4j.tree.BlockBuilder; -import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.linq4j.tree.Expressions; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterImpl; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.runtime.Hook; -import org.apache.calcite.util.BuiltInMethod; import com.linkedin.hoptimator.Engine; @@ -66,7 +32,7 @@ public EngineRules(Engine engine) { } public void register(HoptimatorJdbcConvention inTrait, RelOptPlanner planner) { - RemoteConvention remote = inTrait.remoteConventionForEngine(engine); + RemoteConvention remote = inTrait.remoteConventionForEngine(engine); planner.addRule(RemoteToEnumerableConverterRule.create(remote)); planner.addRule(RemoteJoinRule.Config.INSTANCE .withConversion(PipelineRules.PipelineJoin.class, PipelineRel.CONVENTION, remote, "RemoteJoinRule") diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java index 1f9269f..012766a 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java @@ -39,6 +39,8 @@ import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import com.google.common.collect.ImmutableSet; + import com.linkedin.hoptimator.util.DataTypeUtils; @@ -216,7 +218,7 @@ static class PipelineProject extends Project implements PipelineRel { PipelineProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, List projects, RelDataType rowType) { - super(cluster, traitSet, Collections.emptyList(), input, projects, rowType); + super(cluster, traitSet, Collections.emptyList(), input, projects, rowType, ImmutableSet.of()); assert getConvention() == PipelineRel.CONVENTION; assert input.getConvention() == PipelineRel.CONVENTION; } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java index 4de6955..1a3a070 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java @@ -1,10 +1,9 @@ package com.linkedin.hoptimator.util.planner; -import com.linkedin.hoptimator.Engine; - -import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.plan.Convention; +import com.linkedin.hoptimator.Engine; + class RemoteConvention extends Convention.Impl { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java index b55968b..973df42 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java @@ -1,7 +1,6 @@ package com.linkedin.hoptimator.util.planner; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.plan.Convention; public interface RemoteRel extends RelNode { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java index d48d964..f207eca 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java @@ -18,8 +18,15 @@ */ package com.linkedin.hoptimator.util.planner; -import com.linkedin.hoptimator.util.DelegatingDataSource; -import com.linkedin.hoptimator.util.DeploymentService; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; +import java.util.stream.Collectors; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.enumerable.EnumerableRel; @@ -28,9 +35,6 @@ import org.apache.calcite.adapter.enumerable.PhysType; import org.apache.calcite.adapter.enumerable.PhysTypeImpl; import org.apache.calcite.adapter.enumerable.RexImpTable; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.adapter.jdbc.JdbcConvention; -import org.apache.calcite.adapter.jdbc.JdbcRel; import org.apache.calcite.config.CalciteSystemProperty; import org.apache.calcite.linq4j.tree.BlockBuilder; import org.apache.calcite.linq4j.tree.ConstantExpression; @@ -51,31 +55,19 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.runtime.Hook; import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.schema.Schemas; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.dialect.MysqlSqlDialect; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.SqlString; import org.apache.calcite.util.BuiltInMethod; - import org.checkerframework.checker.nullness.qual.Nullable; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import java.util.TimeZone; -import java.util.stream.Collectors; -import javax.sql.DataSource; - -import static org.apache.calcite.linq4j.Nullness.castNonNull; +import com.linkedin.hoptimator.util.DelegatingDataSource; +import com.linkedin.hoptimator.util.DeploymentService; import static java.util.Objects.requireNonNull; +import static org.apache.calcite.linq4j.Nullness.castNonNull; /** * Relational expression representing a scan of a table in a JDBC data source. @@ -199,7 +191,7 @@ private SqlString generateSql(SqlDialect dialect) { builder0.add( Expressions.statement( Expressions.call(dataSource, "setUrl", Expressions.constant(dataSourceUrl)))); - + final Expression enumerable; if (sqlString.getDynamicParameters() != null @@ -320,6 +312,7 @@ private static void generateGet(EnumerableRelImplementor implementor, source = Expressions.call(resultSet_, jdbcGetMethod(primitive), Expressions.constant(i + 1)); + break; } builder.add( Expressions.statement( From d37321eb275f8c27443452f8dbbd1598e023f60d Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Thu, 23 Jan 2025 10:26:23 -0500 Subject: [PATCH 3/4] Resolve complex array types --- .../hoptimator/catalog/AvroConverter.java | 36 ++++++++++++------- .../hoptimator/util/DataTypeUtils.java | 31 +++++++++++----- .../hoptimator/util/planner/PipelineRel.java | 10 ++---- .../util/planner/ScriptImplementor.java | 29 ++++++++++++--- .../hoptimator/util/TestDataTypeUtils.java | 36 ++++++++++++++----- .../util/planner/TestPipelineRel.java | 8 ++--- .../hoptimator/venice/VeniceStore.java | 14 ++++++-- 7 files changed, 116 insertions(+), 48 deletions(-) diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java index dd7d1b3..feec012 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java @@ -79,45 +79,55 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean } public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { + return rel(schema, typeFactory, false); + } + + /** Converts Avro Schema to RelDataType. + * Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY" + * causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired. + */ + public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) { RelDataType unknown = typeFactory.createUnknownType(); switch (schema.getType()) { case RECORD: return typeFactory.createStructType(schema.getFields() .stream() - .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory))) + .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable))) .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) .collect(Collectors.toList())); case INT: - return createRelType(typeFactory, SqlTypeName.INTEGER); + return createRelType(typeFactory, SqlTypeName.INTEGER, nullable); case LONG: - return createRelType(typeFactory, SqlTypeName.BIGINT); + return createRelType(typeFactory, SqlTypeName.BIGINT, nullable); case ENUM: case FIXED: case STRING: - return createRelType(typeFactory, SqlTypeName.VARCHAR); + return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable); + case BYTES: + return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable); case FLOAT: - return createRelType(typeFactory, SqlTypeName.FLOAT); + return createRelType(typeFactory, SqlTypeName.FLOAT, nullable); case DOUBLE: - return createRelType(typeFactory, SqlTypeName.DOUBLE); + return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable); case BOOLEAN: - return createRelType(typeFactory, SqlTypeName.BOOLEAN); + return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable); case ARRAY: - return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1); + return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1); // TODO support map types // Appears to require a Calcite version bump // case MAP: -// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory)); +// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)) case UNION: if (schema.isNullable() && schema.getTypes().size() == 2) { Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get(); - return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true); + return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true); } else { // TODO support more elaborate union types return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true); } default: - return typeFactory.createUnknownType(); + return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), nullable); } } @@ -125,9 +135,9 @@ public static RelDataType rel(Schema schema) { return rel(schema, DataType.DEFAULT_TYPE_FACTORY); } - private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) { + private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) { RelDataType rawType = typeFactory.createSqlType(typeName); - return typeFactory.createTypeWithNullability(rawType, false); + return typeFactory.createTypeWithNullability(rawType, nullable); } public static RelProtoDataType proto(Schema schema) { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java index 7885ab1..f0d9bb9 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java @@ -24,9 +24,9 @@ private DataTypeUtils() { * Nested structs like `FOO Row(BAR Row(QUX VARCHAR))` are promoted to * top-level fields like `FOO$BAR$QUX VARCHAR`. * - * Complex arrays are demoted to just `ANY ARRAY`. Primitive arrays are - * unchanged. - * + * Complex arrays like `FOO Row(BAR Row(QUX VARCHAR)) ARRAY` are promoted to + * top-level fields like `FOO ANY ARRAY` and `FOO$BAR$QUX VARCHAR`. + * Primitive arrays are unchanged. */ public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) { if (!dataType.isStruct()) { @@ -40,11 +40,14 @@ public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeF private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType dataType, RelDataTypeFactory.Builder builder, List path) { if (dataType.getComponentType() != null && dataType.getComponentType().isStruct()) { - // demote complex arrays to just `ANY ARRAY` - builder.add(path.stream().collect(Collectors.joining("$")), typeFactory.createArrayType( + builder.add(String.join("$", path), typeFactory.createArrayType( typeFactory.createSqlType(SqlTypeName.ANY), -1)); + for (RelDataTypeField field : dataType.getComponentType().getFieldList()) { + flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(), + Stream.of(field.getName())).collect(Collectors.toList())); + } } else if (!dataType.isStruct()) { - builder.add(path.stream().collect(Collectors.joining("$")), dataType); + builder.add(String.join("$", path), dataType); } else { for (RelDataTypeField field : dataType.getFieldList()) { flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(), @@ -53,7 +56,10 @@ private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType data } } - /** Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)` */ + /** Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)` + * The combination of fields `FOO ANY ARRAY` and `FOO$BAR$QUX VARCHAR` is reconstructed + * into `FOO Row(BAR Row(QUX VARCHAR)) ARRAY` + */ public static RelDataType unflatten(RelDataType dataType, RelDataTypeFactory typeFactory) { if (!dataType.isStruct()) { throw new IllegalArgumentException("Can only unflatten a struct type."); @@ -76,16 +82,25 @@ private static void buildNodes(Node pos, String name, RelDataType dataType) { } private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory) { - if (node.dataType != null) { + if (node.dataType != null && !isComplexArray(node.dataType)) { return node.dataType; } RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); for (Map.Entry child : node.children.entrySet()) { builder.add(child.getKey(), buildRecord(child.getValue(), typeFactory)); } + if (isComplexArray(node.dataType)) { + return typeFactory.createArrayType(builder.build(), -1); + } return builder.build(); } + + private static boolean isComplexArray(RelDataType dataType) { + return dataType != null && dataType.getComponentType() != null + && (dataType.getComponentType().getSqlTypeName().equals(SqlTypeName.ANY)); + } + private static class Node { RelDataType dataType; LinkedHashMap children = new LinkedHashMap<>(); diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index 5d633f9..ab47ef3 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -14,8 +14,7 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.util.Litmus; import org.apache.calcite.util.Pair; @@ -29,7 +28,6 @@ import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.SqlDialect; import com.linkedin.hoptimator.util.ConnectionService; -import com.linkedin.hoptimator.util.DataTypeUtils; import com.linkedin.hoptimator.util.DeploymentService; @@ -102,15 +100,13 @@ public void setSink(String database, List path, RelDataType rowType, Map static Map addKeysAsOption(Map options, RelDataType rowType) { Map newOptions = new LinkedHashMap<>(options); - RelDataType flattened = DataTypeUtils.flatten(rowType, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)); - // If the keys are already set, don't overwrite them if (newOptions.containsKey(KEY_OPTION)) { return newOptions; } - String keyString = flattened.getFieldList().stream() - .map(x -> x.getName().replaceAll("\\$", "_")) + String keyString = rowType.getFieldList().stream() + .map(RelDataTypeField::getName) .filter(name -> name.startsWith(KEY_PREFIX)) .collect(Collectors.joining(";")); if (!keyString.isEmpty()) { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 0bd150d..118ca50 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -429,17 +429,38 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) { if (dataType.isStruct()) { List fieldNames = dataType.getFieldList() .stream() - .map(x -> x.getName()) + .map(RelDataTypeField::getName) .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO)) .collect(Collectors.toList()); - List fieldTypes = - dataType.getFieldList().stream().map(x -> x.getType()).map(x -> toSpec(x)).collect(Collectors.toList()); + List fieldTypes = dataType.getFieldList() + .stream() + .map(RelDataTypeField::getType) + .map(RowTypeSpecImplementor::toSpec) + .collect(Collectors.toList()); return maybeNullable(dataType, new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO)); } else if (dataType.getComponentType() != null) { + // To handle ROW ARRAY types + if (dataType.getComponentType().isStruct()) { + List fieldNames = dataType.getComponentType().getFieldList() + .stream() + .map(RelDataTypeField::getName) + .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO)) + .collect(Collectors.toList()); + List fieldTypes = dataType.getComponentType().getFieldList() + .stream() + .map(RelDataTypeField::getType) + .map(RowTypeSpecImplementor::toSpec) + .collect(Collectors.toList()); + return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec( + new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), + dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO)); + } + + // To handle primitive ARRAY types, e.g. `FLOAT ARRAY`. return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec( Optional.ofNullable(dataType.getComponentType()) - .map(x -> x.getSqlTypeName()) + .map(RelDataType::getSqlTypeName) .orElseThrow(() -> new IllegalArgumentException("not a collection?")), SqlParserPos.ZERO), dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO)); } else { diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java index 8504bfe..e8d78de 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java @@ -8,6 +8,7 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; @@ -35,9 +36,9 @@ public void flattenUnflatten() { Assertions.assertEquals(2, rowType.getFieldList().size()); RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory); Assertions.assertEquals(3, flattenedType.getFieldList().size()); - List flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName()) + List flattenedNames = flattenedType.getFieldList().stream().map(RelDataTypeField::getName) .collect(Collectors.toList()); - Assertions.assertIterableEquals(Arrays.asList(new String[]{"FOO$QUX", "FOO$QIZ", "BAR$BAZ"}), + Assertions.assertIterableEquals(Arrays.asList("FOO$QUX", "FOO$QIZ", "BAR$BAZ"), flattenedNames); RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory); RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW); @@ -53,11 +54,12 @@ public void flattenUnflatten() { } @Test - public void flattenNestedArrays() { + public void flattenUnflattenNestedArrays() { RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); RelDataTypeFactory.Builder builder1 = new RelDataTypeFactory.Builder(typeFactory); builder1.add("QUX", SqlTypeName.VARCHAR); - builder1.add("QIZ", SqlTypeName.VARCHAR); + builder1.add("QIZ", typeFactory.createArrayType( + typeFactory.createSqlType(SqlTypeName.VARCHAR), -1)); RelDataTypeFactory.Builder builder2 = new RelDataTypeFactory.Builder(typeFactory); builder2.add("BAZ", SqlTypeName.VARCHAR); RelDataTypeFactory.Builder builder3 = new RelDataTypeFactory.Builder(typeFactory); @@ -68,15 +70,31 @@ public void flattenNestedArrays() { RelDataType rowType = builder3.build(); Assertions.assertEquals(3, rowType.getFieldList().size()); RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory); - Assertions.assertEquals(3, flattenedType.getFieldList().size()); - List flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName()) + Assertions.assertEquals(6, flattenedType.getFieldList().size()); + List flattenedNames = flattenedType.getFieldList().stream().map(RelDataTypeField::getName) .collect(Collectors.toList()); - Assertions.assertIterableEquals(Arrays.asList(new String[]{"FOO", "BAR", "CAR"}), + Assertions.assertIterableEquals(Arrays.asList("FOO", "FOO$QUX", "FOO$QIZ", "BAR", "BAR$BAZ", "CAR"), flattenedNames); String flattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1", flattenedType, Collections.emptyMap()).sql(); - Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` (`FOO` ANY ARRAY, " - + "`BAR` ANY ARRAY, `CAR` FLOAT ARRAY) WITH ();", flattenedConnector, + Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` (" + + "`FOO` ANY ARRAY, `FOO_QUX` VARCHAR, `FOO_QIZ` VARCHAR ARRAY, " + + "`BAR` ANY ARRAY, `BAR_BAZ` VARCHAR, " + + "`CAR` FLOAT ARRAY) WITH ();", flattenedConnector, "Flattened connector should have simplified arrays"); + + RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory); + RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW); + String originalConnector = new ScriptImplementor.ConnectorImplementor("S", "T1", + rowType, Collections.emptyMap()).sql(); + String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1", + unflattenedType, Collections.emptyMap()).sql(); + Assertions.assertEquals(originalConnector, unflattenedConnector, + "Flattening and unflattening data types should have no impact on connector"); + Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` (" + + "`FOO` ROW(`QUX` VARCHAR, `QIZ` VARCHAR ARRAY) ARRAY, " + + "`BAR` ROW(`BAZ` VARCHAR) ARRAY, " + + "`CAR` FLOAT ARRAY) WITH ();", unflattenedConnector, + "Flattened-unflattened connector should be correct"); } } diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/TestPipelineRel.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/TestPipelineRel.java index bdbaa6d..a292999 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/TestPipelineRel.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/TestPipelineRel.java @@ -24,15 +24,13 @@ public void testKeyOptions() { Map keyOptions = addKeysAsOption(new HashMap<>(), primitiveKeyBuilder.build()); assertTrue(keyOptions.isEmpty()); - RelDataTypeFactory.Builder keyBuilder = new RelDataTypeFactory.Builder(typeFactory); - keyBuilder.add("keyInt", SqlTypeName.INTEGER); - keyBuilder.add("keyString", SqlTypeName.VARCHAR); RelDataTypeFactory.Builder recordBuilder = new RelDataTypeFactory.Builder(typeFactory); + recordBuilder.add("KEY_int", SqlTypeName.INTEGER); + recordBuilder.add("KEY_string", SqlTypeName.VARCHAR); recordBuilder.add("intField", SqlTypeName.INTEGER); - recordBuilder.add("KEY", keyBuilder.build()); keyOptions = addKeysAsOption(new HashMap<>(), recordBuilder.build()); assertEquals(3, keyOptions.size()); - assertEquals("KEY_keyInt;KEY_keyString", keyOptions.get("keys")); + assertEquals("KEY_int;KEY_string", keyOptions.get("keys")); assertEquals("KEY_", keyOptions.get("keyPrefix")); assertEquals("RECORD", keyOptions.get("keyType")); } diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java index e4d97cf..ef059ea 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java @@ -3,6 +3,7 @@ import org.apache.avro.Schema; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.impl.AbstractTable; import com.linkedin.hoptimator.avro.AvroConverter; @@ -13,6 +14,8 @@ /** A batch of records from a Venice store. */ public class VeniceStore extends AbstractTable { + private static final String KEY_PREFIX = "KEY_"; + private final StoreSchemaFetcher storeSchemaFetcher; public VeniceStore(StoreSchemaFetcher storeSchemaFetcher) { @@ -25,12 +28,19 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { Schema valueSchema = storeSchemaFetcher.getLatestValueSchema(); // Venice contains both a key schema and a value schema. Since we need to pass back one joint schema, - // and to avoid name collisions, all key fields are structured as "KEY$foo". + // and to avoid name collisions, all key fields are flattened as "KEY_foo". + // A primitive key will be a single field with name "KEY". RelDataType key = rel(keySchema, typeFactory); RelDataType value = rel(valueSchema, typeFactory); RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); builder.addAll(value.getFieldList()); - builder.add("KEY", key); + if (key.isStruct()) { + for (RelDataTypeField field: key.getFieldList()) { + builder.add(KEY_PREFIX + field.getName(), field.getType()); + } + } else { + builder.add("KEY", key); + } RelDataType combinedSchema = builder.build(); return DataTypeUtils.flatten(combinedSchema, typeFactory); } From e3569acee356fa8e40f45ff6f389ab49a80f86f2 Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Fri, 24 Jan 2025 13:05:47 -0500 Subject: [PATCH 4/4] Fix integration tests --- hoptimator-venice/src/test/resources/venice-ddl-insert-all.id | 2 +- .../src/test/resources/venice-ddl-insert-partial.id | 4 ++-- hoptimator-venice/src/test/resources/venice-ddl-select.id | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id index 428a1bc..5af869e 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id @@ -13,7 +13,7 @@ spec: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store` diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id index ca3c590..bb89c80 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id @@ -1,7 +1,7 @@ !set outputformat mysql !use k8s -insert into "VENICE-CLUSTER0"."test-store-1" ("KEY$id", "intField") select "KEY$id", "stringField" from "VENICE-CLUSTER0"."test-store"; +insert into "VENICE-CLUSTER0"."test-store-1" ("KEY_id", "intField") select "KEY_id", "stringField" from "VENICE-CLUSTER0"."test-store"; apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: @@ -13,7 +13,7 @@ spec: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store` diff --git a/hoptimator-venice/src/test/resources/venice-ddl-select.id b/hoptimator-venice/src/test/resources/venice-ddl-select.id index 8d9cbf7..eb0e38b 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-select.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-select.id @@ -13,7 +13,7 @@ spec: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH () - CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH () - INSERT INTO `PIPELINE`.`SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`