Skip to content

Commit cda7cd3

Browse files
psainicsAbhishekKumar9984
authored and committed
Adding an import query type property to the DB plugin, which will only be reflected in the Redshift and PostgreSQL plugins.
1 parent e5c5794 commit cda7cd3

File tree

22 files changed

+711
-38
lines changed

22 files changed

+711
-38
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
111111
}
112112
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
113113
getTableQuery(path.getDatabase(), schema, table));
114+
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.PROPERTY_IMPORT_QUERY_TYPE,
115+
RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY);
114116
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
115117
}
116118

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import com.google.common.collect.ImmutableSet;
2020
import com.google.common.collect.Lists;
2121
import io.cdap.cdap.api.data.schema.Schema;
22+
import io.cdap.plugin.common.db.DBUtils;
2223
import io.cdap.plugin.db.CommonSchemaReader;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
25-
2626
import java.sql.ResultSet;
2727
import java.sql.ResultSetMetaData;
2828
import java.sql.SQLException;
@@ -72,17 +72,16 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
7272
int precision = metadata.getPrecision(index);
7373
if (precision == 0) {
7474
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
75-
+ "converting into STRING type to avoid any precision loss.",
76-
metadata.getColumnName(index),
77-
metadata.getColumnTypeName(index)));
75+
+ "converting into STRING type to avoid any precision loss.",
76+
metadata.getColumnName(index),
77+
metadata.getColumnTypeName(index)));
7878
return Schema.of(Schema.Type.STRING);
7979
}
8080
}
8181

8282
if (typeName.equalsIgnoreCase("timestamp")) {
8383
return Schema.of(Schema.LogicalType.DATETIME);
8484
}
85-
8685
return super.getSchema(metadata, index);
8786
}
8887

@@ -113,5 +112,37 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
113112
}
114113
return schemaFields;
115114
}
116-
115+
/**
116+
* Maps database column type information to a corresponding {@link Schema}.
117+
*
118+
* @param typeName the SQL type name
119+
* @param columnType the JDBC type code
120+
* @param precision the column precision
121+
* @param scale the column scale
122+
* @param columnName the column name
123+
* @return the mapped {@link Schema} type
124+
*/
125+
@Override
126+
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
127+
boolean isSigned, boolean handleAsDecimal) {
128+
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
129+
return Schema.of(Schema.Type.STRING);
130+
}
131+
if ("INT".equalsIgnoreCase(typeName)) {
132+
return Schema.of(Schema.Type.INT);
133+
}
134+
if ("BIGINT".equalsIgnoreCase(typeName)) {
135+
return Schema.of(Schema.Type.LONG);
136+
}
137+
if (Types.NUMERIC == columnType && precision == 0) {
138+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale," +
139+
" converting into STRING type to avoid any precision loss.",
140+
columnName, typeName));
141+
return Schema.of(Schema.Type.STRING);
142+
}
143+
if ("timestamp".equalsIgnoreCase(typeName)) {
144+
return Schema.of(Schema.LogicalType.DATETIME);
145+
}
146+
return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
147+
}
117148
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package io.cdap.plugin.amazon.redshift;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Metadata;
2324
import io.cdap.cdap.api.annotation.MetadataProperty;
2425
import io.cdap.cdap.api.annotation.Name;
2526
import io.cdap.cdap.api.annotation.Plugin;
2627
import io.cdap.cdap.etl.api.FailureCollector;
28+
import io.cdap.cdap.etl.api.PipelineConfigurer;
29+
import io.cdap.cdap.etl.api.StageConfigurer;
2730
import io.cdap.cdap.etl.api.batch.BatchSource;
2831
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2932
import io.cdap.cdap.etl.api.connector.Connector;
@@ -40,6 +43,9 @@
4043
import java.util.Map;
4144
import javax.annotation.Nullable;
4245

46+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
47+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
48+
4349
/**
4450
* Batch source to read from an Amazon Redshift database.
4551
*/
@@ -59,6 +65,45 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
5965
this.redshiftSourceConfig = redshiftSourceConfig;
6066
}
6167

68+
@Override
69+
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
70+
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
71+
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
72+
if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
73+
if (sourceConfig.getSchema() != null) {
74+
stageConfigurer.setOutputSchema(sourceConfig.getSchema());
75+
}
76+
return;
77+
}
78+
if ((!sourceConfig.containsMacro(IMPORT_QUERY) && !sourceConfig.containsMacro(TABLE_NAME))
79+
&& (Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
80+
(Strings.isNullOrEmpty(sourceConfig.getImportQuery())))) {
81+
collector.addFailure(
82+
"Either 'tableName' or 'importQuery' must be specified.",
83+
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
84+
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
85+
}
86+
super.configurePipeline(pipelineConfigurer);
87+
}
88+
89+
@Override
90+
public void prepareRun(BatchSourceContext context) throws Exception {
91+
FailureCollector collector = context.getFailureCollector();
92+
93+
if (!sourceConfig.containsMacro(sourceConfig.getImportQuery()) &&
94+
!sourceConfig.containsMacro(sourceConfig.getTableName()) &&
95+
Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
96+
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
97+
collector.addFailure(
98+
"Either 'tableName' or 'importQuery' must be specified.",
99+
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
100+
).withConfigProperty("tableName")
101+
.withConfigProperty("importQuery");
102+
}
103+
super.prepareRun(context);
104+
collector.getOrThrowException();
105+
}
106+
62107
@Override
63108
protected SchemaReader getSchemaReader() {
64109
return new RedshiftSchemaReader();

amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.Test;
2121

2222
import java.io.IOException;
23+
import static org.junit.Assert.assertTrue;
2324

2425
public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest {
2526
private static final String JDBC_DRIVER_CLASS_NAME = "com.amazon.redshift.Driver";
@@ -28,11 +29,23 @@ public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest
2829
public void test() throws ClassNotFoundException, IOException {
2930

3031
RedshiftConnector connector = new RedshiftConnector(
31-
new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
32+
new RedshiftConnectorConfig("username", "password", "jdbc", "",
33+
"localhost", "db", 5432));
3234

33-
super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string: " +
34-
"jdbc:redshift://localhost:5432/db and arguments: " +
35-
"{user=username}. Error: ConnectException: Connection refused " +
36-
"(Connection refused).");
35+
String expectedPrefix = "Failed to create connection to database via connection string: " +
36+
"jdbc:redshift://localhost:5432/db and arguments: {user=username}. Error:";
37+
try {
38+
super.test(JDBC_DRIVER_CLASS_NAME, connector, expectedPrefix + " ConnectException: Connection " +
39+
"refused (Connection refused).");
40+
} catch (AssertionError e) {
41+
// Accept either ConnectException or SunCertPathBuilderException
42+
String message = e.getMessage();
43+
assertTrue(
44+
"Expected either ConnectException or SunCertPathBuilderException, but got: " + message,
45+
message.contains("ConnectException: Connection refused") ||
46+
message.contains("SunCertPathBuilderException: unable to find valid certification " +
47+
"path to requested target")
48+
);
49+
}
3750
}
3851
}

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@
108108
{
109109
"label": "SQL Query",
110110
"properties": [
111+
{
112+
"widget-type": "radio-group",
113+
"label": "Import Query Type",
114+
"name": "importQueryType",
115+
"widget-attributes": {
116+
"layout": "inline",
117+
"default": "importQuery",
118+
"options": [
119+
{
120+
"id": "importQuery",
121+
"label": "Native Query"
122+
},
123+
{
124+
"id": "tableName",
125+
"label": "Named Table"
126+
}
127+
]
128+
}
129+
},
130+
{
131+
"widget-type": "textbox",
132+
"label": "Table Name",
133+
"name": "tableName"
134+
},
111135
{
112136
"widget-type": "textarea",
113137
"label": "Import Query",
@@ -229,6 +253,30 @@
229253
}
230254
]
231255
},
256+
{
257+
"name": "ImportQuery",
258+
"condition": {
259+
"expression": "importQueryType != 'tableName'"
260+
},
261+
"show": [
262+
{
263+
"type": "property",
264+
"name": "importQuery"
265+
}
266+
]
267+
},
268+
{
269+
"name": "NativeTableName",
270+
"condition": {
271+
"expression": "importQueryType == 'tableName'"
272+
},
273+
"show": [
274+
{
275+
"type": "property",
276+
"name": "tableName"
277+
}
278+
]
279+
}
232280
],
233281
"jump-config": {
234282
"datasets": [

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.cdap.cdap.api.data.schema.Schema;
2121
import io.cdap.plugin.common.db.DBUtils;
2222

23+
import java.sql.Connection;
24+
import java.sql.DatabaseMetaData;
2325
import java.sql.ResultSet;
2426
import java.sql.ResultSetMetaData;
2527
import java.sql.SQLException;
@@ -29,7 +31,6 @@
2931
* Common schema reader for mapping non specific DB types.
3032
*/
3133
public class CommonSchemaReader implements SchemaReader {
32-
3334
@Override
3435
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
3536
List<Schema.Field> schemaFields = Lists.newArrayList();
@@ -61,4 +62,67 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
6162
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
6263
return false;
6364
}
65+
66+
/**
67+
* Returns the schema fields for the specified table using JDBC metadata.
68+
* Supports schema-qualified table names (e.g. "schema.table").
69+
* Throws SQLException if the table has no columns.
70+
*
71+
* @param connection JDBC connection
72+
* @param tableName table name, optionally schema-qualified
73+
* @return list of schema fields
74+
* @throws SQLException if no columns found or on database error
75+
*/
76+
@Override
77+
public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
78+
DatabaseMetaData dbMetaData = connection.getMetaData();
79+
String schema = null;
80+
String table = tableName;
81+
// Support schema-qualified table names like "schema.table"
82+
if (tableName != null && tableName.contains(".")) {
83+
String[] parts = tableName.split("\\.", 2);
84+
schema = parts[0];
85+
table = parts[1];
86+
}
87+
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
88+
List<Schema.Field> schemaFields = Lists.newArrayList();
89+
while (columns.next()) {
90+
String columnName = columns.getString("COLUMN_NAME");
91+
String typeName = columns.getString("TYPE_NAME");
92+
int columnType = columns.getInt("DATA_TYPE");
93+
int precision = columns.getInt("COLUMN_SIZE");
94+
int scale = columns.getInt("DECIMAL_DIGITS");
95+
int nullable = columns.getInt("NULLABLE");
96+
97+
Schema columnSchema = this.getSchema(typeName, columnType, precision, scale, columnName, true, true);
98+
if (nullable == DatabaseMetaData.columnNullable) {
99+
columnSchema = Schema.nullableOf(columnSchema);
100+
}
101+
Schema.Field field = Schema.Field.of(columnName, columnSchema);
102+
schemaFields.add(field);
103+
}
104+
if (schemaFields.isEmpty()) {
105+
throw new SQLException("No columns found for table: " +
106+
(schema != null ? schema + "." : "") + table);
107+
}
108+
return schemaFields;
109+
}
110+
}
111+
112+
/**
113+
* Returns the CDAP schema for the given SQL column type.
114+
*
115+
* @param typeName SQL type name
116+
* @param columnType JDBC type code
117+
* @param precision Numeric precision
118+
* @param scale Numeric scale
119+
* @param columnName Column name
120+
* @param isSigned Whether the column is signed
121+
* @param handleAsDecimal Whether to treat as decimal
122+
* @return Corresponding {@link Schema}, or null if not implemented
123+
*/
124+
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName ,
125+
boolean isSigned, boolean handleAsDecimal) {
126+
return null;
127+
}
64128
}

0 commit comments

Comments
 (0)