Skip to content

Commit 7467cb2

Browse files
authored
[Improvement-18019][task-sql] Support SQL from resource file and parameter placeholders (#18020)
1 parent d2b0abd commit 7467cb2

File tree

9 files changed

+409
-27
lines changed

9 files changed

+409
-27
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.task.api.enums;
19+
20+
public enum SqlSourceType {
21+
/**
22+
* SCRIPT: inline sql text
23+
* FILE: sql from resource center file
24+
*/
25+
SCRIPT, FILE
26+
}

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext;
2222
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
2323
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
24+
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlSourceType;
2425
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
2526
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
2627
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
@@ -58,6 +59,16 @@ public class SqlParameters extends AbstractParameters {
5859
*/
5960
private String sql;
6061

62+
/**
63+
* sql source
64+
*/
65+
private SqlSourceType sqlSource;
66+
67+
/**
68+
* sql resource file path in resource center
69+
*/
70+
private String sqlResource;
71+
6172
/**
6273
* sql type
6374
* 0 query
@@ -139,6 +150,22 @@ public void setSql(String sql) {
139150
this.sql = sql;
140151
}
141152

153+
public SqlSourceType getSqlSource() {
154+
return sqlSource;
155+
}
156+
157+
public void setSqlSource(SqlSourceType sqlSource) {
158+
this.sqlSource = sqlSource;
159+
}
160+
161+
public String getSqlResource() {
162+
return sqlResource;
163+
}
164+
165+
public void setSqlResource(String sqlResource) {
166+
this.sqlResource = sqlResource;
167+
}
168+
142169
public int getSqlType() {
143170
return sqlType;
144171
}
@@ -213,12 +240,24 @@ public void setGroupId(int groupId) {
213240

214241
@Override
215242
public boolean checkParameters() {
216-
return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql);
243+
if (datasource == 0 || StringUtils.isEmpty(type)) {
244+
return false;
245+
}
246+
if (StringUtils.isNotEmpty(sql)) {
247+
return true;
248+
}
249+
return StringUtils.isNotEmpty(sqlResource);
217250
}
218251

219252
@Override
220253
public List<ResourceInfo> getResourceFilesList() {
221-
return new ArrayList<>();
254+
List<ResourceInfo> resourceFiles = new ArrayList<>();
255+
if (StringUtils.isNotEmpty(sqlResource)) {
256+
ResourceInfo resourceInfo = new ResourceInfo();
257+
resourceInfo.setResourceName(sqlResource);
258+
resourceFiles.add(resourceInfo);
259+
}
260+
return resourceFiles;
222261
}
223262

224263
public void dealOutParam(String result) {
@@ -272,6 +311,8 @@ public String toString() {
272311
+ "type='" + type + '\''
273312
+ ", datasource=" + datasource
274313
+ ", sql='" + sql + '\''
314+
+ ", sqlSource='" + sqlSource + '\''
315+
+ ", sqlResource='" + sqlResource + '\''
275316
+ ", sqlType=" + sqlType
276317
+ ", sendEmail=" + sendEmail
277318
+ ", displayRows=" + displayRows

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.api.parameters;
1919

20+
import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext;
2021
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
2122
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
23+
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
2224
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
25+
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
26+
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
27+
import org.apache.dolphinscheduler.spi.enums.DbType;
2328

2429
import org.apache.commons.collections4.CollectionUtils;
2530

@@ -87,5 +92,57 @@ public void testSqlParameters() {
8792
sqlParameters.setLocalParams(properties);
8893
sqlParameters.dealOutParam(sqlResult);
8994
Assertions.assertNotNull(sqlParameters.getVarPool().get(0));
95+
96+
// resource files list should contain sqlResource when it is set
97+
sqlParameters.setSql(null);
98+
sqlParameters.setSqlResource("/sql/demo.sql");
99+
Assertions.assertFalse(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList()));
100+
}
101+
102+
@Test
103+
public void testCheckParameters_variants() {
104+
SqlParameters p = new SqlParameters();
105+
106+
// datasource/type invalid
107+
p.setDatasource(0);
108+
p.setType("");
109+
p.setSql("select 1");
110+
p.setSqlResource(null);
111+
Assertions.assertFalse(p.checkParameters());
112+
113+
// sql present -> true
114+
p.setDatasource(1);
115+
p.setType("MYSQL");
116+
p.setSql("select 1");
117+
p.setSqlResource(null);
118+
Assertions.assertTrue(p.checkParameters());
119+
120+
// sql absent, sqlResource present -> true
121+
p.setSql(null);
122+
p.setSqlResource("/sql/demo.sql");
123+
Assertions.assertTrue(p.checkParameters());
124+
125+
// both absent -> false
126+
p.setSql(null);
127+
p.setSqlResource(null);
128+
Assertions.assertFalse(p.checkParameters());
129+
}
130+
131+
@Test
132+
public void testGenerateExtendedContext_setsConnectionParams() {
133+
SqlParameters p = new SqlParameters();
134+
p.setDatasource(1);
135+
136+
DataSourceParameters dataSourceParameters = new DataSourceParameters();
137+
dataSourceParameters.setType(DbType.MYSQL);
138+
dataSourceParameters.setResourceType(ResourceType.DATASOURCE.name());
139+
dataSourceParameters.setConnectionParams("conn_params");
140+
141+
ResourceParametersHelper helper = new ResourceParametersHelper();
142+
helper.put(ResourceType.DATASOURCE, 1, dataSourceParameters);
143+
144+
SQLTaskExecutionContext ctx = p.generateExtendedContext(helper);
145+
Assertions.assertNotNull(ctx);
146+
Assertions.assertEquals("conn_params", ctx.getConnectionParams());
90147
}
91148
}

dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,25 @@
3535
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
3636
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
3737
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
38+
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
3839
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
3940
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
4041
import org.apache.dolphinscheduler.spi.enums.DbType;
4142

4243
import org.apache.commons.lang3.StringUtils;
4344

45+
import java.io.IOException;
46+
import java.nio.charset.StandardCharsets;
47+
import java.nio.file.Files;
48+
import java.nio.file.Paths;
4449
import java.sql.Connection;
4550
import java.sql.PreparedStatement;
4651
import java.sql.ResultSet;
4752
import java.sql.ResultSetMetaData;
4853
import java.sql.SQLException;
4954
import java.sql.Statement;
5055
import java.util.ArrayList;
56+
import java.util.Collections;
5157
import java.util.HashMap;
5258
import java.util.HashSet;
5359
import java.util.List;
@@ -128,6 +134,8 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
128134
sqlParameters.getLimit());
129135
try {
130136

137+
ensureSqlContent();
138+
131139
// get datasource
132140
baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
133141
sqlTaskExecutionContext.getConnectionParams());
@@ -405,6 +413,32 @@ private void printReplacedSql(String content, String formatSql, String rgex, Map
405413
log.info("Sql Params are {}", logPrint);
406414
}
407415

416+
private void ensureSqlContent() {
417+
if (StringUtils.isNotEmpty(sqlParameters.getSql())) {
418+
return;
419+
}
420+
if (StringUtils.isEmpty(sqlParameters.getSqlResource())) {
421+
return;
422+
}
423+
String resourcePathInStorage = sqlParameters.getSqlResource();
424+
try {
425+
ResourceContext resourceContext = taskExecutionContext.getResourceContext();
426+
ResourceContext.ResourceItem resourceItem =
427+
resourceContext.getResourceItem(resourcePathInStorage);
428+
String localPath = resourceItem.getResourceAbsolutePathInLocal();
429+
log.info("Load sql content from resource file: {}", resourcePathInStorage);
430+
String sqlContent = new String(
431+
Files.readAllBytes(Paths.get(localPath)),
432+
StandardCharsets.UTF_8);
433+
sqlParameters.setSql(sqlContent);
434+
} catch (IOException e) {
435+
log.error("Read sql content from resource file {} error", resourcePathInStorage, e);
436+
throw new TaskException(
437+
String.format("Read sql content from resource file %s error", resourcePathInStorage),
438+
e);
439+
}
440+
}
441+
408442
/**
409443
* ready to execute SQL and parameter entity Map
410444
*
@@ -420,19 +454,22 @@ private SqlBinds getSqlAndSqlParamsMap(String sql) {
420454

421455
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
422456

423-
// spell SQL according to the final user-defined variable
424-
if (paramsMap == null) {
425-
sqlBuilder.append(sql);
426-
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
427-
}
457+
Map<String, String> placeholderParamsMap = paramsMap == null
458+
? Collections.emptyMap()
459+
: ParameterUtils.convert(paramsMap);
428460

429461
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
430-
String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
431-
ParameterUtils.convert(paramsMap));
462+
String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), placeholderParamsMap);
432463
log.info("SQL title : {}", title);
433464
sqlParameters.setTitle(title);
434465
}
435466

467+
// spell SQL according to the final user-defined variable
468+
if (paramsMap == null) {
469+
sqlBuilder.append(sql);
470+
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
471+
}
472+
436473
// special characters need to be escaped, ${} needs to be escaped
437474
setSqlParamsMap(sql, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
438475
// Replace the original value in sql !{...} ,Does not participate in precompilation

0 commit comments

Comments
 (0)