
Commit 54844dc

Merge pull request #12 from trocco-io/feature/remove-duplicate-code-with-superclass
Refactoring: remove duplicate code with superclass
2 parents 881c4f1 + 437f8f2 commit 54844dc

File tree

3 files changed: +18 -135 lines changed

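Both Java diffs below share the same shape: an override that previously duplicated superclass logic is reduced to a call to `super` with a normalized table identifier. A minimal sketch of the pattern, using illustrative stand-in classes rather than the plugin's real `JdbcOutputConnection`/`DatabricksOutputConnection` types:

```java
// Illustrative stand-ins only: the real classes are JdbcOutputConnection and
// DatabricksOutputConnection; "main"/"default" mimic the connection's defaults.
class Base {
  boolean tableExists(String catalog, String schema, String table) {
    return true; // stand-in for the real JDBC metadata lookup
  }
}

class Derived extends Base {
  private final String defaultCatalog = "main";
  private final String defaultSchema = "default";

  @Override
  boolean tableExists(String catalog, String schema, String table) {
    // Normalize-then-delegate: fill in nulls with the connection defaults,
    // then reuse the superclass implementation instead of copying it.
    return super.tableExists(
        catalog != null ? catalog : defaultCatalog,
        schema != null ? schema : defaultSchema,
        table);
  }
}
```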

README.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -78,4 +78,6 @@ $ ./gradlew gem # -t to watch change of files and rebuild continuously
 
 ## TEST
 
+```
 $ EMBULK_OUTPUT_DATABRICKS_TEST_CONFIG="example/test.yml" ./gradlew test # Create example/test.yml based on example/test.yml.example
+```
```

src/main/java/org/embulk/output/DatabricksOutputPlugin.java

Lines changed: 3 additions & 81 deletions
```diff
@@ -1,8 +1,6 @@
 package org.embulk.output;
 
 import java.io.IOException;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.*;
 import org.embulk.config.ConfigDiff;
@@ -171,86 +169,10 @@ protected void logConnectionProperties(String url, Properties props) {
     super.logConnectionProperties(url, maskedProps);
   }
 
-  // This is almost copy from AbstractJdbcOutputPlugin excepting validation of table exists in
-  // current schema
   public Optional<JdbcSchema> newJdbcSchemaFromTableIfExists(
       JdbcOutputConnection connection, TableIdentifier table) throws SQLException {
-    if (!connection.tableExists(table)) {
-      // DatabaseMetaData.getPrimaryKeys fails if table does not exist
-      return Optional.empty();
-    }
-
-    DatabricksOutputConnection conn = (DatabricksOutputConnection) connection;
-    DatabaseMetaData dbm = connection.getMetaData();
-    String escape = dbm.getSearchStringEscape();
-
-    ResultSet rs =
-        dbm.getPrimaryKeys(conn.getCatalogName(), table.getSchemaName(), table.getTableName());
-    final HashSet<String> primaryKeysBuilder = new HashSet<>();
-    try {
-      while (rs.next()) {
-        if (!((DatabricksOutputConnection) connection)
-            .isAvailableTableMetadataInConnection(rs, table)) {
-          continue;
-        }
-        primaryKeysBuilder.add(rs.getString("COLUMN_NAME"));
-      }
-    } finally {
-      rs.close();
-    }
-    final Set<String> primaryKeys = Collections.unmodifiableSet(primaryKeysBuilder);
-
-    final ArrayList<JdbcColumn> builder = new ArrayList<>();
-    // NOTE: Columns of TIMESTAMP_NTZ, INTERVAL are not included in getColumns result.
-    // This cause runtime sql exception when copy into.
-    // (probably because of unsupported in databricks jdbc)
-    // https://docs.databricks.com/en/sql/language-manual/data-types/interval-type.html
-    // https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html#notes
-    rs =
-        dbm.getColumns(
-            JdbcUtils.escapeSearchString(conn.getCatalogName(), escape),
-            JdbcUtils.escapeSearchString(table.getSchemaName(), escape),
-            JdbcUtils.escapeSearchString(table.getTableName(), escape),
-            null);
-    try {
-      while (rs.next()) {
-        if (!((DatabricksOutputConnection) connection)
-            .isAvailableTableMetadataInConnection(rs, table)) {
-          continue;
-        }
-        String columnName = rs.getString("COLUMN_NAME");
-        String simpleTypeName = rs.getString("TYPE_NAME").toUpperCase(Locale.ENGLISH);
-        boolean isUniqueKey = primaryKeys.contains(columnName);
-        int sqlType = rs.getInt("DATA_TYPE");
-        int colSize = rs.getInt("COLUMN_SIZE");
-        int decDigit = rs.getInt("DECIMAL_DIGITS");
-        if (rs.wasNull()) {
-          decDigit = -1;
-        }
-        int charOctetLength = rs.getInt("CHAR_OCTET_LENGTH");
-        boolean isNotNull = "NO".equals(rs.getString("IS_NULLABLE"));
-        // rs.getString("COLUMN_DEF") // or null // TODO
-        builder.add(
-            JdbcColumn.newGenericTypeColumn(
-                columnName,
-                sqlType,
-                simpleTypeName,
-                colSize,
-                decDigit,
-                charOctetLength,
-                isNotNull,
-                isUniqueKey));
-        // We can't get declared column name using JDBC API.
-        // Subclasses need to overwrite it.
-      }
-    } finally {
-      rs.close();
-    }
-    final List<JdbcColumn> columns = Collections.unmodifiableList(builder);
-    if (columns.isEmpty()) {
-      return Optional.empty();
-    } else {
-      return Optional.of(new JdbcSchema(columns));
-    }
+    return super.newJdbcSchemaFromTableIfExists(
+        connection,
+        ((DatabricksOutputConnection) connection).currentConnectionTableIdentifier(table));
   }
 }
```
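The deleted body above was, as its own comment said, almost a copy of `AbstractJdbcOutputPlugin`; the override now only rewrites the identifier and defers. For orientation, a simplified sketch of the kind of `DatabaseMetaData` lookup the superclass performs (plain JDBC, not the plugin's actual code; the real method also builds typed `JdbcColumn` objects):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class ColumnLookupSketch {
  // Returns the column names of a fully qualified table, or empty if none match.
  // With catalog and schema filled in (what currentConnectionTableIdentifier
  // guarantees), getColumns only scans the intended table.
  static Optional<List<String>> columnNames(
      Connection conn, String catalog, String schema, String table) throws SQLException {
    List<String> names = new ArrayList<>();
    try (ResultSet rs = conn.getMetaData().getColumns(catalog, schema, table, null)) {
      while (rs.next()) {
        names.add(rs.getString("COLUMN_NAME")); // standard JDBC metadata column
      }
    }
    return names.isEmpty() ? Optional.empty() : Optional.of(names);
  }
}
```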

src/main/java/org/embulk/output/databricks/DatabricksOutputConnection.java

Lines changed: 13 additions & 54 deletions
```diff
@@ -42,58 +42,21 @@ protected void executeUseStatement(String sql) throws SQLException {
   // This is almost copy from JdbcOutputConnection excepting validation of table exists in current
   // schema
   public boolean tableExists(TableIdentifier table) throws SQLException {
-    try (ResultSet rs =
-        connection
-            .getMetaData()
-            .getTables(catalogName, table.getSchemaName(), table.getTableName(), null)) {
-      while (rs.next()) {
-        if (isAvailableTableMetadataInConnection(rs, table)) {
-          return true;
-        }
-      }
-    }
-    return false;
+    return super.tableExists(currentConnectionTableIdentifier(table));
   }
 
-  public boolean isAvailableTableMetadataInConnection(ResultSet rs, TableIdentifier tableIdentifier)
-      throws SQLException {
-    // If unchecked, tables in other catalogs may appear to exist.
-    // This is because the base embulk jdbc plugin's tableIdentifier.getDatabase() is often returns
-    // null
-    // and one Databricks connection has multiple available catalogs (databases).
-
-    // NOTE: maybe this logic is not necessary anymore after this PR:
-    // https://github.com/trocco-io/embulk-output-databricks/pull/11
-    // But I'm not sure, so I'll keep it for now.
-
-    if (tableIdentifier.getDatabase() == null) {
-      logger.trace("tableIdentifier.getDatabase() == null, check by instance variable");
-      if (!rs.getString("TABLE_CAT").equalsIgnoreCase(catalogName)) {
-        return false;
-      }
-    }
-    if (tableIdentifier.getSchemaName() == null) {
-      logger.trace("tableIdentifier.getSchemaName() == null, check by instance variable");
-      if (!rs.getString("TABLE_SCHEM").equalsIgnoreCase(schemaName)) {
-        return false;
-      }
-    }
-
-    if (tableIdentifier.getDatabase() != null
-        && !tableIdentifier.getDatabase().equalsIgnoreCase(catalogName)) {
-      logger.error(
-          String.format(
-              "tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
-              tableIdentifier.getDatabase(), catalogName));
-    }
-    if (tableIdentifier.getSchemaName() != null
-        && !tableIdentifier.getSchemaName().equalsIgnoreCase(schemaName)) {
-      logger.error(
-          String.format(
-              "tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
-              tableIdentifier.getSchemaName(), schemaName));
-    }
-    return true;
+  public TableIdentifier currentConnectionTableIdentifier(TableIdentifier tableIdentifier) {
+    // Caution:
+    // JdbcOutputPlugin sometimes uses tableIdentifier whose database variable is null,
+    // which causes unexpected DatabaseMetaData behavior in AbstractJdbcOutputPlugin.
+    // For example, getTables and getColumns search in all catalogs,
+    // not just the one specified by the connection's default value,
+    // and can't search in schemas with multibyte name.
+    // So, if tableIdentifier database variable is null, it will set the connection's default value.
+    return new TableIdentifier(
+        tableIdentifier.getDatabase() != null ? tableIdentifier.getDatabase() : catalogName,
+        tableIdentifier.getSchemaName() != null ? tableIdentifier.getSchemaName() : schemaName,
+        tableIdentifier.getTableName());
   }
 
   @Override
@@ -299,8 +262,4 @@ private String buildColumns(JdbcSchema schema, String prefix) {
     }
     return sb.toString();
   }
-
-  public String getCatalogName() {
-    return catalogName;
-  }
 }
```
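The `Caution` comment above matches the JDBC contract: a `null` catalog argument to `DatabaseMetaData.getTables`/`getColumns` means the catalog is not used to narrow the search, so one multi-catalog Databricks connection can return matches from other catalogs. A hedged sketch of the difference (the JDBC URL and the `main`/`sales`/`orders` names are placeholders, not values from this repository):

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class CatalogScopeSketch {
  public static void main(String[] args) throws SQLException {
    // Placeholder URL; a real Databricks JDBC URL carries host, path, and auth.
    try (Connection c = DriverManager.getConnection("jdbc:databricks://...")) {
      DatabaseMetaData md = c.getMetaData();

      // catalog == null: per the JDBC contract the catalog does NOT narrow the
      // search, so "orders" may match in any catalog visible to the connection.
      printMatches(md.getTables(null, "sales", "orders", null));

      // Explicit catalog: scoped to one catalog, which is what tableExists()
      // now guarantees by normalizing the identifier first.
      printMatches(md.getTables("main", "sales", "orders", null));
    }
  }

  static void printMatches(ResultSet rs) throws SQLException {
    try (rs) {
      while (rs.next()) {
        System.out.println(rs.getString("TABLE_CAT") + "." + rs.getString("TABLE_SCHEM"));
      }
    }
  }
}
```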
