Skip to content

Commit 7058fc3

Browse files
updating common schema and validating through import query type.
1 parent cda7cd3 commit 7058fc3

File tree

8 files changed

+133
-88
lines changed

8 files changed

+133
-88
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.google.common.collect.ImmutableSet;
2020
import com.google.common.collect.Lists;
2121
import io.cdap.cdap.api.data.schema.Schema;
22-
import io.cdap.plugin.common.db.DBUtils;
2322
import io.cdap.plugin.db.CommonSchemaReader;
2423
import org.slf4j.Logger;
2524
import org.slf4j.LoggerFactory;
@@ -56,33 +55,12 @@ public RedshiftSchemaReader(String sessionID) {
5655
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5756
String typeName = metadata.getColumnTypeName(index);
5857
int columnType = metadata.getColumnType(index);
58+
int precision = metadata.getPrecision(index);
59+
String columnName = metadata.getColumnName(index);
60+
int scale = metadata.getScale(index);
61+
boolean isSigned = metadata.isSigned(index);
5962

60-
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
61-
return Schema.of(Schema.Type.STRING);
62-
}
63-
if (typeName.equalsIgnoreCase("INT")) {
64-
return Schema.of(Schema.Type.INT);
65-
}
66-
if (typeName.equalsIgnoreCase("BIGINT")) {
67-
return Schema.of(Schema.Type.LONG);
68-
}
69-
70-
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
71-
if (Types.NUMERIC == columnType) {
72-
int precision = metadata.getPrecision(index);
73-
if (precision == 0) {
74-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
75-
+ "converting into STRING type to avoid any precision loss.",
76-
metadata.getColumnName(index),
77-
metadata.getColumnTypeName(index)));
78-
return Schema.of(Schema.Type.STRING);
79-
}
80-
}
81-
82-
if (typeName.equalsIgnoreCase("timestamp")) {
83-
return Schema.of(Schema.LogicalType.DATETIME);
84-
}
85-
return super.getSchema(metadata, index);
63+
return getSchema(typeName, columnType, precision, scale, columnName, isSigned, true);
8664
}
8765

8866
@Override
@@ -143,6 +121,6 @@ public Schema getSchema(String typeName, int columnType, int precision, int scal
143121
if ("timestamp".equalsIgnoreCase(typeName)) {
144122
return Schema.of(Schema.LogicalType.DATETIME);
145123
}
146-
return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
124+
return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
147125
}
148126
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,33 +75,64 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7575
}
7676
return;
7777
}
78-
if ((!sourceConfig.containsMacro(IMPORT_QUERY) && !sourceConfig.containsMacro(TABLE_NAME))
79-
&& (Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
80-
(Strings.isNullOrEmpty(sourceConfig.getImportQuery())))) {
78+
String importQueryType = sourceConfig.getImportQueryType();
79+
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
80+
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
81+
82+
if (isTable) {
83+
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
84+
collector.addFailure(
85+
"'tableName' must be specified when importQueryType is 'table'.",
86+
"Provide a value for 'tableName'."
87+
).withConfigProperty(TABLE_NAME);
88+
}
89+
} else if (isImportQuery) {
90+
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
8191
collector.addFailure(
82-
"Either 'tableName' or 'importQuery' must be specified.",
83-
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
84-
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
92+
"'importQuery' must be specified when importQueryType is 'importQuery'.",
93+
"Provide a value for 'importQuery'."
94+
).withConfigProperty(IMPORT_QUERY);
8595
}
96+
} else {
97+
collector.addFailure(
98+
"Both import query and table name are empty'.",
99+
"Either import query or tableName should be given. Both can not be null together."
100+
);
101+
}
102+
collector.getOrThrowException();
86103
super.configurePipeline(pipelineConfigurer);
87104
}
88105

89106
@Override
90107
public void prepareRun(BatchSourceContext context) throws Exception {
91108
FailureCollector collector = context.getFailureCollector();
109+
String importQueryType = sourceConfig.getImportQueryType();
110+
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
111+
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
92112

93-
if (!sourceConfig.containsMacro(sourceConfig.getImportQuery()) &&
94-
!sourceConfig.containsMacro(sourceConfig.getTableName()) &&
95-
Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
96-
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
113+
if (isTable) {
114+
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
115+
collector.addFailure(
116+
"'tableName' must be specified when importQueryType is 'table'.",
117+
"Provide a value for 'tableName'."
118+
).withConfigProperty(TABLE_NAME);
119+
}
120+
} else if (isImportQuery) {
121+
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
122+
collector.addFailure(
123+
"'importQuery' must be specified when importQueryType is 'importQuery'.",
124+
"Provide a value for 'importQuery'."
125+
).withConfigProperty(IMPORT_QUERY);
126+
}
127+
} else {
97128
collector.addFailure(
98-
"Either 'tableName' or 'importQuery' must be specified.",
99-
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
100-
).withConfigProperty("tableName")
101-
.withConfigProperty("importQuery");
129+
"Both import query and table name are empty'.",
130+
"Either import query or tableName should be given. Both can not be null together."
131+
);
102132
}
103-
super.prepareRun(context);
133+
104134
collector.getOrThrowException();
135+
super.prepareRun(context);
105136
}
106137

107138
@Override

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,6 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
123123
*/
124124
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName ,
125125
boolean isSigned, boolean handleAsDecimal) {
126-
return null;
126+
return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
127127
}
128128
}

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,16 @@ public String getTableName() {
119119
return tableName;
120120
}
121121

122+
public String getImportQueryType() {
123+
if (!Strings.isNullOrEmpty(tableName)) {
124+
return TABLE_NAME;
125+
} else if (!Strings.isNullOrEmpty(importQuery)) {
126+
return IMPORT_QUERY;
127+
} else {
128+
return null;
129+
}
130+
}
131+
122132
public String getBoundingQuery() {
123133
return cleanQuery(boundingQuery);
124134
}

database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,14 @@ public interface DatabaseSourceConfig extends DatabaseConnectionConfig {
9191
*/
9292
Integer getFetchSize();
9393

94+
/**
95+
* Returns the name of the table from which data will be imported.
96+
*/
9497
String getTableName();
98+
99+
/**
100+
* @return a {@link String} indicating the import query type,
101+
* typically "tableName" (for table name) or "importQuery" (for custom import query)
102+
*/
103+
String getImportQueryType();
95104
}

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,9 @@ public Schema getSchema() throws SQLException {
164164
try (Connection connection = getConnection()) {
165165
executeInitQueries(connection, sourceConfig.getInitQueries());
166166
String query = sourceConfig.getImportQuery();
167-
if (!Strings.isNullOrEmpty(query)) {
167+
if ("importQuery".equalsIgnoreCase(sourceConfig.getImportQueryType())) {
168168
return loadSchemaFromDBwithQuery(connection, query);
169-
} else if (!Strings.isNullOrEmpty(sourceConfig.getTableName())) {
169+
} else if ("tableName".equalsIgnoreCase(sourceConfig.getImportQueryType())) {
170170
List<Schema.Field> fields = getSchemaReader().getSchemaFields(connection, sourceConfig.getTableName());
171171
return Schema.recordOf("schema", fields);
172172
} else {
@@ -210,7 +210,7 @@ private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
210210
executeInitQueries(connection, sourceConfig.getInitQueries());
211211
String importQuery = sourceConfig.getImportQuery();
212212
String tableName = sourceConfig.getTableName();
213-
if (!Strings.isNullOrEmpty(importQuery)) {
213+
if ("importQuery".equalsIgnoreCase(sourceConfig.getImportQueryType())) {
214214
return loadSchemaFromDBwithQuery(connection, importQuery);
215215
} else {
216216
return loadSchemaFromDBwithTableName(connection, tableName);
@@ -516,6 +516,10 @@ public String getTableName() {
516516
return getTableName();
517517
}
518518

519+
public String getImportQueryType() {
520+
return " ";
521+
}
522+
519523
public String getBoundingQuery() {
520524
return cleanQuery(boundingQuery);
521525
}

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.google.common.collect.ImmutableSet;
2020
import io.cdap.cdap.api.data.schema.Schema;
21-
import io.cdap.plugin.common.db.DBUtils;
2221
import io.cdap.plugin.db.CommonSchemaReader;
2322
import org.slf4j.Logger;
2423
import org.slf4j.LoggerFactory;
@@ -57,27 +56,12 @@ public PostgresSchemaReader(String sessionID) {
5756
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5857
String typeName = metadata.getColumnTypeName(index);
5958
int columnType = metadata.getColumnType(index);
59+
int precision = metadata.getPrecision(index);
60+
String columnName = metadata.getColumnName(index);
61+
int scale = metadata.getScale(index);
62+
boolean isSigned = metadata.isSigned(index);
6063

61-
if (STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(typeName) || STRING_MAPPED_POSTGRES_TYPES.contains(columnType)) {
62-
return Schema.of(Schema.Type.STRING);
63-
}
64-
65-
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
66-
if (Types.NUMERIC == columnType) {
67-
int precision = metadata.getPrecision(index);
68-
if (precision == 0) {
69-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
70-
+ "converting into STRING type to avoid any precision loss.",
71-
metadata.getColumnName(index),
72-
metadata.getColumnTypeName(index)));
73-
return Schema.of(Schema.Type.STRING);
74-
}
75-
}
76-
if (typeName.equalsIgnoreCase("timestamp")) {
77-
return Schema.of(Schema.LogicalType.DATETIME);
78-
}
79-
80-
return super.getSchema(metadata, index);
64+
return getSchema(typeName, columnType, precision, scale, columnName, isSigned, true);
8165
}
8266

8367
@Override
@@ -134,7 +118,6 @@ public Schema getSchema(String typeName, int columnType, int precision, int scal
134118
if ("timestamp".equalsIgnoreCase(typeName)) {
135119
return Schema.of(Schema.LogicalType.DATETIME);
136120
}
137-
138-
return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
121+
return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
139122
}
140123
}

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,37 +73,67 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7373
}
7474
return;
7575
}
76-
if ((!sourceConfig.containsMacro(IMPORT_QUERY) &&
77-
!sourceConfig.containsMacro(TABLE_NAME)) &&
78-
(Strings.isNullOrEmpty(sourceConfig.getTableName()))
79-
&& (Strings.isNullOrEmpty(sourceConfig.getImportQuery()))) {
76+
String importQueryType = sourceConfig.getImportQueryType();
77+
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
78+
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
79+
80+
if (isTable) {
81+
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
82+
collector.addFailure(
83+
"'tableName' must be specified when importQueryType is 'table'.",
84+
"Provide a value for 'tableName'."
85+
).withConfigProperty(TABLE_NAME);
86+
}
87+
} else if (isImportQuery) {
88+
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
8089
collector.addFailure(
81-
"Either 'tableName' or 'importQuery' must be specified.",
82-
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
83-
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
90+
"'importQuery' must be specified when importQueryType is 'importQuery'.",
91+
"Provide a value for 'importQuery'."
92+
).withConfigProperty(IMPORT_QUERY);
93+
}
94+
} else {
95+
collector.addFailure(
96+
"Both import query and table name are empty'.",
97+
"Either import query or tableName should be given. Both can not be null together."
98+
);
8499
}
100+
collector.getOrThrowException();
85101
super.configurePipeline(pipelineConfigurer);
86102
}
87103

88104
@Override
89105
public void prepareRun(BatchSourceContext context) throws Exception {
90106
FailureCollector collector = context.getFailureCollector();
91-
if (sourceConfig.containsMacro("tableName") || sourceConfig.containsMacro("importQuery")) {
92-
// At runtime, user-provided schema should be used (already handled in configurePipeline)
107+
if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
93108
return;
94109
}
95-
if (sourceConfig.containsMacro(IMPORT_QUERY) &&
96-
sourceConfig.containsMacro(TABLE_NAME) &&
97-
!Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
98-
!Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
110+
111+
String importQueryType = sourceConfig.getImportQueryType();
112+
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
113+
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
114+
115+
if (isTable) {
116+
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
117+
collector.addFailure(
118+
"'tableName' must be specified when importQueryType is 'table'.",
119+
"Provide a value for 'tableName'."
120+
).withConfigProperty(TABLE_NAME);
121+
}
122+
} else if (isImportQuery) {
123+
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
124+
collector.addFailure(
125+
"'importQuery' must be specified when importQueryType is 'importQuery'.",
126+
"Provide a value for 'importQuery'."
127+
).withConfigProperty(IMPORT_QUERY);
128+
}
129+
} else {
99130
collector.addFailure(
100-
"Either 'tableName' or 'importQuery' must be specified.",
101-
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
102-
).withConfigProperty("tableName")
103-
.withConfigProperty("importQuery");
131+
"Both import query and table name are empty'.",
132+
"Either import query or tableName should be given. Both can not be null together."
133+
);
104134
}
105-
super.prepareRun(context);
106135
collector.getOrThrowException();
136+
super.prepareRun(context);
107137
}
108138

109139
@Override

0 commit comments

Comments
 (0)