diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractCDCBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractCDCBuilder.java
index d45d6f91f9..16eca6ff3b 100644
--- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractCDCBuilder.java
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractCDCBuilder.java
@@ -81,16 +81,11 @@ public String getSchemaFieldName() {
         return "schema";
     }
 
-    public Map<String, Map<String, String>> parseMetaDataConfigs() {
-        Map<String, Map<String, String>> allConfigMap = new HashMap<>();
-        for (String schema : getSchemaList()) {
-            String url = generateUrl(schema);
-            allConfigMap.put(schema, parseMetaDataSingleConfig(url));
-        }
-        return allConfigMap;
+    public Map<String, String> generateMetaDataConfig(String schema) {
+        return parseMetaDataSingleConfig(generateUrl(schema));
     }
 
-    public Map<String, String> parseMetaDataSingleConfig(String url) {
+    protected Map<String, String> parseMetaDataSingleConfig(String url) {
         Map<String, String> configMap = new HashMap<>();
         configMap.put(ClientConstant.METADATA_NAME, url);
         configMap.put(ClientConstant.METADATA_URL, url);
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilder.java
index f850f8348f..8594a91eeb 100644
--- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilder.java
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilder.java
@@ -19,7 +19,6 @@
 
 package org.dinky.cdc;
 
-import org.dinky.data.exception.SplitTableException;
 import org.dinky.data.model.FlinkCDCConfig;
 
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -40,11 +39,7 @@
 
     List<String> getTableList();
 
-    Map<String, Map<String, String>> parseMetaDataConfigs();
+    Map<String, String> generateMetaDataConfig(String schema);
 
     String getSchemaFieldName();
-
-    default Map<String, String> parseMetaDataConfig() {
-        throw new SplitTableException("此数据源并未实现分库分表");
-    }
 }
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilderFactory.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilderFactory.java
index d223a86109..ba3fb22eab 100644
--- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilderFactory.java
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/CDCBuilderFactory.java
@@ -20,6 +20,7 @@
 package org.dinky.cdc;
 
 import org.dinky.assertion.Asserts;
+import org.dinky.cdc.kafka.KafkaSourceBuilder;
 import org.dinky.cdc.mysql.MysqlCDCBuilder;
 import org.dinky.cdc.oracle.OracleCDCBuilder;
 import org.dinky.cdc.postgres.PostgresCDCBuilder;
@@ -41,6 +42,7 @@ private CDCBuilderFactory() {}
             .put(OracleCDCBuilder.KEY_WORD, OracleCDCBuilder::new)
             .put(PostgresCDCBuilder.KEY_WORD, PostgresCDCBuilder::new)
             .put(SqlServerCDCBuilder.KEY_WORD, SqlServerCDCBuilder::new)
+            .put(KafkaSourceBuilder.KEY_WORD, KafkaSourceBuilder::new)
             .build();
 
     public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSourceBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSourceBuilder.java
new file mode 100644
index 0000000000..c571e07c3a
--- /dev/null
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSourceBuilder.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.cdc.kafka;
+
+import org.dinky.assertion.Asserts;
+import org.dinky.cdc.AbstractCDCBuilder;
+import org.dinky.cdc.CDCBuilder;
+import org.dinky.data.model.FlinkCDCConfig;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Map;
+
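+/*
+ * Hypothetical usage sketch (illustrative only): the option keys are the ones read in
+ * build() below, and 'scan.startup.mode' is assumed to be the key behind
+ * config.getStartupMode().
+ *
+ * EXECUTE CDCSOURCE kafka_demo WITH (
+ *   'connector' = 'kafka',
+ *   'properties.bootstrap.servers' = 'localhost:9092',
+ *   'topic' = 'demo_topic',
+ *   'properties.group.id' = 'dinky',
+ *   'scan.startup.mode' = 'earliest-offset',
+ *   ...
+ * )
+ */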
+public class KafkaSourceBuilder extends AbstractCDCBuilder {
+
+    public static final String KEY_WORD = "kafka";
+
+    public KafkaSourceBuilder() {}
+
+    public KafkaSourceBuilder(FlinkCDCConfig config) {
+        super(config);
+    }
+
+    @Override
+    public String getHandle() {
+        return KEY_WORD;
+    }
+
+    @Override
+    public CDCBuilder create(FlinkCDCConfig config) {
+        return new KafkaSourceBuilder(config);
+    }
+
+    @Override
+    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
+        Map<String, String> source = config.getSource();
+        String brokers = source.get("properties.bootstrap.servers");
+        String topic = source.get("topic");
+        String groupId = source.get("properties.group.id");
+        String scanBoundedSpecificOffsets = source.get("scan.bounded.specific-offsets");
+        String scanBoundedTimestampMillis = source.get("scan.bounded.timestamp-millis");
+
+        final org.apache.flink.connector.kafka.source.KafkaSourceBuilder<String> sourceBuilder =
+                KafkaSource.<String>builder()
+                        .setBootstrapServers(brokers)
+                        .setValueOnlyDeserializer(new SimpleStringSchema());
+
+        if (Asserts.isNotNullString(topic)) {
+            sourceBuilder.setTopics(topic);
+        }
+
+        if (Asserts.isNotNullString(groupId)) {
+            sourceBuilder.setGroupId(groupId);
+        }
+
+        if (Asserts.isNotNullString(config.getStartupMode())) {
+            switch (config.getStartupMode().toLowerCase()) {
+                case "earliest-offset":
+                    sourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
+                    break;
+                case "latest-offset":
+                    sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
+                    break;
+                case "group-offsets":
+                    sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
+                    break;
+                /*
+                 * If "specific-offset" is specified, another config option scan.bounded.specific-offsets
+                 * is required to specify bounded offsets for each partition, e.g. an option value
+                 * partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and
+                 * offset 300 for partition 1. If an offset for a partition is not provided, it will not
+                 * consume from that partition.
+                 */
+                /*case "specific-offset":
+                    if (Asserts.isNotNullString(scanBoundedSpecificOffsets)) {
+                        sourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(scanBoundedSpecificOffsets));
+                    } else {
+                        throw new RuntimeException("No specific offset parameter specified.");
+                    }
+                    break;*/
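+                /*
+                 * A possible (untested) sketch for the commented-out case above:
+                 * OffsetsInitializer.offsets(...) takes a Map<TopicPartition, Long>, so the option
+                 * string would first have to be parsed, e.g. (assuming imports of
+                 * org.apache.kafka.common.TopicPartition and java.util.HashMap):
+                 *
+                 * Map<TopicPartition, Long> offsets = new HashMap<>();
+                 * for (String part : scanBoundedSpecificOffsets.split(";")) {
+                 *     String[] kv = part.split(",");
+                 *     int partition = Integer.parseInt(kv[0].split(":")[1]);
+                 *     long offset = Long.parseLong(kv[1].split(":")[1]);
+                 *     offsets.put(new TopicPartition(topic, partition), offset);
+                 * }
+                 * sourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(offsets));
+                 */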
+                case "timestamp":
+                    if (Asserts.isNotNullString(scanBoundedTimestampMillis)) {
+                        sourceBuilder.setStartingOffsets(
+                                OffsetsInitializer.timestamp(Long.valueOf(scanBoundedTimestampMillis)));
+                    } else {
+                        throw new RuntimeException("No timestamp parameter specified.");
+                    }
+                    break;
+                default:
+            }
+        } else {
+            sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
+        }
+
+        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "Kafka Source");
+    }
+
+    @Override
+    public String getSchemaFieldName() {
+        return "db";
+    }
+
+    @Override
+    public String getSchema() {
+        return config.getDatabase();
+    }
+
+    @Override
+    protected String getMetadataType() {
+        return null;
+    }
+
+    @Override
+    protected String generateUrl(String schema) {
+        return null;
+    }
+}
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java
index f9b8d2ff4a..f1d1c4d213 100644
--- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java
@@ -213,14 +213,6 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
         return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
     }
 
-    @Override
-    public Map<String, String> parseMetaDataConfig() {
-        String url = String.format(
-                "jdbc:mysql://%s:%d/%s",
-                config.getHostname(), config.getPort(), composeJdbcProperties(config.getJdbc()));
-        return parseMetaDataSingleConfig(url);
-    }
-
     @Override
     public String getSchemaFieldName() {
         return "db";
diff --git a/dinky-common/src/main/java/org/dinky/data/enums/TableType.java b/dinky-common/src/main/java/org/dinky/data/enums/TableType.java
index 710b59c7f0..1944ff986c 100644
--- a/dinky-common/src/main/java/org/dinky/data/enums/TableType.java
+++ b/dinky-common/src/main/java/org/dinky/data/enums/TableType.java
@@ -19,17 +19,16 @@
 
 package org.dinky.data.enums;
 
-/** 分库分表的类型 */
+/** Types of database and table splitting */
 public enum TableType {
 
-    /** 分库分表 */
+    /** Split databases and split tables */
    SPLIT_DATABASE_AND_TABLE,
 
-    /** 分表单库 */
+    /** Split databases, each with a single table */
     SPLIT_DATABASE_AND_SINGLE_TABLE,
 
-    /** 单库分表 */
-    SINGLE_DATABASE_AND_SPLIT_TABLE
-    /** 单库单表 */
-    ,
+    /** A single database with split tables */
+    SINGLE_DATABASE_AND_SPLIT_TABLE,
+    /** A single database with a single table */
     SINGLE_DATABASE_AND_TABLE;
 
     public static TableType type(boolean splitDatabase, boolean splitTable) {
diff --git a/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java b/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java
index 406954b6bb..7404c6b47e 100644
--- a/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java
+++ b/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java
@@ -19,6 +19,8 @@
 
 package org.dinky.data.model;
 
+import org.dinky.assertion.Asserts;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -176,6 +178,10 @@ public String getSinkConfigurationString() {
                 .collect(Collectors.joining(",\n"));
     }
 
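+    /** True when the sink option FlinkCDCConfig.AUTO_CREATE is "true" (case-insensitive). */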
+    public boolean isAutoCreateSchemaAndTables() {
+        return Asserts.isEqualsIgnoreCase(sink.get(FlinkCDCConfig.AUTO_CREATE), "true");
+    }
+
     public String getType() {
         return type;
     }
diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java
index 4b49b7c9ef..0c247a7717 100644
--- a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java
+++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java
@@ -89,10 +89,8 @@ public TableResult execute(Executor executor) {
         config.setMockTest(executor.isMockTest());
         try {
             CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
-            Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
             config.setSchemaFieldName(cdcBuilder.getSchemaFieldName());
             SinkBuilder sinkBuilder = SinkBuilderFactory.buildSinkBuilder(config);
-            final List<String> schemaNameList = cdcBuilder.getSchemaList();
             final List<String> tableRegList = cdcBuilder.getTableList();
             final List<Schema> schemaList = new LinkedList<>();
@@ -100,7 +98,7 @@ public TableResult execute(Executor executor) {
             // Scenario of dividing databases and tables
             if (SplitUtil.isEnabled(cdcSource.getSplit())) {
                 logger.info("Split table or database mode is enabled...");
-                Map<String, String> confMap = cdcBuilder.parseMetaDataConfig();
+                Map<String, String> confMap = cdcBuilder.generateMetaDataConfig("");
                 Driver driver =
                         Driver.buildWithOutPool(confMap.get("name"), confMap.get("type"), JsonUtils.toMap(confMap));
@@ -109,8 +107,7 @@ public TableResult execute(Executor executor) {
                         .map(x -> x.replaceFirst("\\\\.", "."))
                         .collect(Collectors.toList()));
 
-                Driver sinkDriver = checkAndCreateSinkSchema(config, schemaTableNameList.get(0));
-
+                // target tables (merged tables)
                 Set<Table> tables = driver.getSplitTables(tableRegList, cdcSource.getSplit());
 
                 for (Table table : tables) {
@@ -119,8 +116,6 @@ public TableResult execute(Executor executor) {
                         continue;
                     }
                     String schemaName = table.getSchema();
-                    Schema schema = Schema.build(schemaName);
-                    schema.setTables(Collections.singletonList(table));
                     // The structure of all tables in a database or table is the same, just take out the first table
                     // name from the list
                     String schemaTableName = table.getSchemaTableNameList().get(0);
@@ -128,8 +123,26 @@ public TableResult execute(Executor executor) {
                     String realSchemaName = schemaTableName.split("\\.")[0];
                     String tableName = schemaTableName.split("\\.")[1];
                     table.setColumns(driver.listColumnsSortByPK(realSchemaName, tableName));
-                    schemaList.add(schema);
 
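+                    // Group the merged tables by logical schema: append to the Schema entry
+                    // already created for schemaName, or create one when it first appears.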
+                    boolean isExist = false;
+                    for (Schema schemaItem : schemaList) {
+                        if (schemaItem.getName().equals(schemaName)) {
+                            schemaItem.getTables().add(table);
+                            isExist = true;
+                            break;
+                        }
+                    }
+                    if (!isExist) {
+                        Schema schema = Schema.build(schemaName);
+                        // a mutable list, so later tables of this schema can be appended above
+                        schema.setTables(new LinkedList<>(Collections.singletonList(table)));
+                        schemaList.add(schema);
+                    }
+
+                    // auto create schema and tables.
+                    if (!config.isAutoCreateSchemaAndTables()) {
+                        continue;
+                    }
+                    checkAndCreateSinkSchema(config, schemaName);
+                    Driver sinkDriver = buildSinkDriver(config, schemaName);
                     if (null != sinkDriver) {
                         Table sinkTable = (Table) table.clone();
                         sinkTable.setSchema(sinkBuilder.getSinkSchemaName(table));
@@ -138,37 +151,42 @@ public TableResult execute(Executor executor) {
                     }
                 }
             } else {
-                for (String schemaName : schemaNameList) {
-                    Schema schema = Schema.build(schemaName);
-                    if (!allConfigMap.containsKey(schemaName)) {
+                for (String schemaName : cdcBuilder.getSchemaList()) {
+                    if (Asserts.isNullString(schemaName)) {
                         continue;
                     }
-
-                    Driver sinkDriver = checkAndCreateSinkSchema(config, schemaName);
-                    Map<String, String> confMap = allConfigMap.get(schemaName);
-                    Driver driver = Driver.build(confMap.get("name"), confMap.get("type"), JsonUtils.toMap(confMap));
-
-                    final List<Table> tables = driver.listTables(schemaName);
-                    for (Table table : tables) {
-                        if (!Asserts.isEquals(table.getType(), "VIEW")) {
-                            if (Asserts.isNotNullCollection(tableRegList)) {
-                                for (String tableReg : tableRegList) {
-                                    if (table.getSchemaTableName().matches(tableReg.trim())
-                                            && !schema.getTables().contains(Table.build(table.getName()))) {
-                                        table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
-                                        schema.getTables().add(table);
-                                        schemaTableNameList.add(table.getSchemaTableName());
-                                        break;
-                                    }
+                    Schema schema = Schema.build(schemaName);
+                    Map<String, String> confMap = cdcBuilder.generateMetaDataConfig(schemaName);
+                    Driver sourceDriver =
+                            Driver.buildWithOutPool(confMap.get("name"), confMap.get("type"), JsonUtils.toMap(confMap));
+                    for (Table table : sourceDriver.listTables(schemaName)) {
+                        if ("VIEW".equals(table.getType())) {
+                            continue;
+                        }
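+                        // tableRegList entries are regular expressions matched against the
+                        // "schema.table" name; only the first matching pattern is applied.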
+                        if (Asserts.isNotNullCollection(tableRegList)) {
+                            for (String tableReg : tableRegList) {
+                                if (table.getSchemaTableName().matches(tableReg.trim())
+                                        && !schemaTableNameList.contains(table.getSchemaTableName())) {
+                                    schemaTableNameList.add(table.getSchemaTableName());
+                                    table.setColumns(sourceDriver.listColumnsSortByPK(schemaName, table.getName()));
+                                    schema.getTables().add(table);
+                                    break;
                                 }
-                            } else {
-                                table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
-                                schemaTableNameList.add(table.getSchemaTableName());
-                                schema.getTables().add(table);
                             }
+                        } else {
+                            schemaTableNameList.add(table.getSchemaTableName());
+                            table.setColumns(sourceDriver.listColumnsSortByPK(schemaName, table.getName()));
+                            schema.getTables().add(table);
                         }
                     }
+                    schemaList.add(schema);
+
+                    // auto create schema and tables.
+                    if (!config.isAutoCreateSchemaAndTables()) {
+                        continue;
+                    }
+                    checkAndCreateSinkSchema(config, schemaName);
+                    Driver sinkDriver = buildSinkDriver(config, schemaName);
                     if (null != sinkDriver) {
                         for (Table table : schema.getTables()) {
                             Table sinkTable = (Table) table.clone();
@@ -177,7 +195,6 @@ public TableResult execute(Executor executor) {
                             checkAndCreateSinkTable(sinkDriver, sinkTable);
                         }
                     }
-                    schemaList.add(schema);
                 }
             }
@@ -219,30 +236,38 @@ public TableResult execute(Executor executor) {
             return tableResultBuilder.build();
     }
 
-    private Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception {
-        Map<String, String> sink = config.getSink();
-        String autoCreate = sink.get(FlinkCDCConfig.AUTO_CREATE);
-        if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) {
-            return null;
+    private void checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception {
+        Map<String, String> sinkConfMap = config.getSink();
+        String url = sinkConfMap.get("url");
+        if (url.contains("#{schemaName}")) {
+            url = SqlUtil.replaceAllParam(url, "schemaName", "");
+        }
+        Driver sinkDriver = Driver.build(
+                sinkConfMap.get("connector"), url, sinkConfMap.get("username"), sinkConfMap.get("password"));
+        if (null == sinkDriver) {
+            return;
         }
-        String url = sink.get("url");
-        String schema = SqlUtil.replaceAllParam(sink.get(FlinkCDCConfig.SINK_DB), "schemaName", schemaName);
-        Driver driver = Driver.build(sink.get("connector"), url, sink.get("username"), sink.get("password"));
-        if (null != driver && !driver.existSchema(schema)) {
-            driver.createSchema(schema);
+        String schema = SqlUtil.replaceAllParam(sinkConfMap.get(FlinkCDCConfig.SINK_DB), "schemaName", schemaName);
+        if (!sinkDriver.existSchema(schema)) {
+            sinkDriver.createSchema(schema);
         }
-        sink.put(FlinkCDCConfig.SINK_DB, schema);
-        // todo: There is a bug that can cause the problem of URL duplicate concatenation of schema, for example: jdbc:
-        // mysql://localhost:3306/test?useSSL=false/test -1
-        if (!url.contains(schema)) {
-            sink.put("url", url + "/" + schema);
+        sinkConfMap.put(FlinkCDCConfig.SINK_DB, schema);
+    }
+
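+    // Builds a Driver for the sink, resolving an optional "#{schemaName}" placeholder in the
+    // sink url; e.g. (hypothetical config) a url of "jdbc:mysql://host:3306/#{schemaName}"
+    // would map each source schema to its own target database.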
+    private Driver buildSinkDriver(FlinkCDCConfig config, String schemaName) throws Exception {
+        Map<String, String> sinkConfMap = config.getSink();
+        String url = sinkConfMap.get("url");
+        String schema = SqlUtil.replaceAllParam(sinkConfMap.get(FlinkCDCConfig.SINK_DB), "schemaName", schemaName);
+        if (url.contains("#{schemaName}")) {
+            url = SqlUtil.replaceAllParam(url, "schemaName", schema);
         }
-        return driver;
+        return Driver.build(
+                sinkConfMap.get("connector"), url, sinkConfMap.get("username"), sinkConfMap.get("password"));
     }
 
     void checkAndCreateSinkTable(Driver driver, Table table) throws Exception {
         if (null != driver && !driver.existTable(table)) {
-            driver.generateCreateTable(table);
+            driver.createTable(table);
         }
     }
 }
diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java
index 3f5fa3e4c8..efc9eef890 100644
--- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java
+++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java
@@ -462,16 +462,6 @@ public boolean createTable(Table table) throws Exception {
         }
     }
 
-    @Override
-    public boolean generateCreateTable(Table table) throws Exception {
-        String sql = generateCreateTableSql(table).replaceAll("\r\n", " ");
-        if (Asserts.isNotNull(sql)) {
-            return execute(sql);
-        } else {
-            return false;
-        }
-    }
-
     @Override
     public boolean dropTable(Table table) throws Exception {
         String sql = getDropTableSql(table).replaceAll("\r\n", " ");
@@ -546,12 +536,6 @@ public String getTruncateTableSql(Table table) {
         return sb.toString();
     }
 
-    // todu impl by subclass
-    @Override
-    public String generateCreateTableSql(Table table) {
-        return "";
-    }
-
     @Override
     public boolean execute(String sql) throws Exception {
         Asserts.checkNullString(sql, "Sql 语句为空");
@@ -799,7 +783,6 @@ public Map<String, String> getFlinkColumnTypeConversion() {
     public List<Map<String, String>> getSplitSchemaList() {
         PreparedStatement preparedStatement = null;
         ResultSet results = null;
-        IDBQuery dbQuery = getDBQuery();
         String sql = "select DATA_LENGTH,TABLE_NAME AS `NAME`,TABLE_SCHEMA AS `Database`,TABLE_COMMENT"
                 + " AS COMMENT,TABLE_CATALOG AS `CATALOG`,TABLE_TYPE AS `TYPE`,ENGINE AS"
                 + " `ENGINE`,CREATE_OPTIONS AS `OPTIONS`,TABLE_ROWS AS"
@@ -831,6 +814,7 @@
         return schemas;
     }
 
+    // get target tables (merged tables)
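+    // e.g. physical shards db_0.table_0 and db_1.table_1 may be merged into one logical
+    // db.table, depending on the split configuration (illustrative example only).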
     @Override
     public Set<Table> getSplitTables(List<String> tableRegList, Map<String, String> splitConfig) {
         Set<Table> set = new HashSet<>();
diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java
index 30f3664680..b4776a830e 100644
--- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java
+++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java
@@ -196,8 +196,6 @@ static Driver build(String connector, String url, String username, String passwo
 
     boolean createTable(Table table) throws Exception;
 
-    boolean generateCreateTable(Table table) throws Exception;
-
     boolean dropTable(Table table) throws Exception;
 
     boolean truncateTable(Table table) throws Exception;
@@ -210,8 +208,6 @@ static Driver build(String connector, String url, String username, String passwo
 
     String getTruncateTableSql(Table table);
 
-    String generateCreateTableSql(Table table);
-
     /*
      * boolean insert(Table table, JsonNode data);
      *
diff --git a/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java b/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java
index f9361ebc03..0a1b9381c1 100644
--- a/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java
+++ b/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java
@@ -188,11 +188,6 @@ public boolean createTable(Table table) throws Exception {
             return false;
         }
 
-        @Override
-        public boolean generateCreateTable(Table table) throws Exception {
-            return false;
-        }
-
         @Override
         public boolean dropTable(Table table) throws Exception {
             return false;
@@ -218,11 +213,6 @@ public String getTruncateTableSql(Table table) {
             return null;
         }
 
-        @Override
-        public String generateCreateTableSql(Table table) {
-            return null;
-        }
-
         @Override
         public boolean execute(String sql) throws Exception {
             return false;
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java
index a9f3ee5b5f..0364dee659 100644
--- a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java
@@ -133,13 +133,6 @@ public Map<String, String> getFlinkColumnTypeConversion() {
         return map;
     }
 
-    @Override
-    public String generateCreateTableSql(Table table) {
-        String genTableSql = genTable(table);
-        log.info("Auto generateCreateTableSql {}", genTableSql);
-        return genTableSql;
-    }
-
     @Override
     public String getCreateTableSql(Table table) {
         return genTable(table);
diff --git a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java
index f7f805c1ef..bac9fdc4f1 100644
--- a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java
+++ b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java
@@ -83,13 +83,6 @@ public Map<String, String> getFlinkColumnTypeConversion() {
         return map;
     }
 
-    @Override
-    public String generateCreateTableSql(Table table) {
-        String genTableSql = genTable(table);
-        log.info("Auto generateCreateTableSql {}", genTableSql);
-        return genTableSql;
-    }
-
     @Override
     public String getCreateTableSql(Table table) {
         return genTable(table);
diff --git a/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/driver/PaimonDriver.java b/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/driver/PaimonDriver.java
index 2889a73419..1567bbe5dd 100644
--- a/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/driver/PaimonDriver.java
+++ b/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/driver/PaimonDriver.java
@@ -261,11 +261,6 @@ public boolean createTable(Table table) throws Exception {
         return false;
     }
 
-    @Override
-    public boolean generateCreateTable(Table table) throws Exception {
-        return false;
-    }
-
     @Override
     public boolean dropTable(Table table) throws Exception {
         return false;
@@ -291,11 +286,6 @@ public String getTruncateTableSql(Table table) {
         return null;
     }
 
-    @Override
-    public String generateCreateTableSql(Table table) {
-        return null;
-    }
-
     @Override
     public boolean execute(String sql) throws Exception {
         return false;
diff --git a/dinky-web/pom.xml b/dinky-web/pom.xml
index e81a054878..6635b17191 100644
--- a/dinky-web/pom.xml
+++ b/dinky-web/pom.xml
[The XML markup of this diff was lost in extraction. Both hunks (@@ -51,8 +51,8 @@ and @@ -61,24 +61,24 @@) are formatting-only: two tag lines near the version properties (10.5.0, 9.15.4) and the frontend-maven-plugin executions (the install-node-and-pnpm goal, a pnpm "install" execution with arguments "install --registry ${npm-registry-repo}", and a pnpm "build" execution with arguments "run build") are re-indented with no change to their text content.]