22
22
import com .google .common .collect .Maps ;
23
23
import com .netflix .metacat .common .QualifiedName ;
24
24
import com .netflix .metacat .common .server .connectors .ConnectorContext ;
25
+ import com .netflix .metacat .common .server .connectors .exception .ConnectorException ;
25
26
import com .netflix .metacat .common .server .connectors .exception .InvalidMetaException ;
26
27
import com .netflix .metacat .common .server .connectors .exception .TableNotFoundException ;
27
28
import com .netflix .metacat .connector .hive .monitoring .HiveMetrics ;
28
29
import com .netflix .metacat .connector .hive .util .HiveConnectorFastServiceMetric ;
29
30
import com .netflix .spectator .api .Registry ;
30
31
import lombok .extern .slf4j .Slf4j ;
32
+ import org .springframework .dao .DataAccessException ;
31
33
import org .springframework .dao .EmptyResultDataAccessException ;
32
34
import org .springframework .jdbc .core .JdbcTemplate ;
33
35
import org .springframework .jdbc .core .ResultSetExtractor ;
@@ -60,23 +62,27 @@ public class DirectSqlTable {
60
62
private final JdbcTemplate jdbcTemplate ;
61
63
private final HiveConnectorFastServiceMetric fastServiceMetric ;
62
64
private final String catalogName ;
65
+ private final DirectSqlSavePartition directSqlSavePartition ;
63
66
64
67
/**
65
68
* Constructor.
66
69
*
67
- * @param connectorContext server context
68
- * @param jdbcTemplate JDBC template
69
- * @param fastServiceMetric fast service metric
70
+ * @param connectorContext server context
71
+ * @param jdbcTemplate JDBC template
72
+ * @param fastServiceMetric fast service metric
73
+ * @param directSqlSavePartition direct sql partition service
70
74
*/
71
75
public DirectSqlTable (
72
76
final ConnectorContext connectorContext ,
73
77
final JdbcTemplate jdbcTemplate ,
74
- final HiveConnectorFastServiceMetric fastServiceMetric
78
+ final HiveConnectorFastServiceMetric fastServiceMetric ,
79
+ final DirectSqlSavePartition directSqlSavePartition
75
80
) {
76
81
this .catalogName = connectorContext .getCatalogName ();
77
82
this .registry = connectorContext .getRegistry ();
78
83
this .jdbcTemplate = jdbcTemplate ;
79
84
this .fastServiceMetric = fastServiceMetric ;
85
+ this .directSqlSavePartition = directSqlSavePartition ;
80
86
}
81
87
82
88
/**
@@ -222,6 +228,54 @@ public Long getTableId(final QualifiedName tableName) {
222
228
}
223
229
}
224
230
231
+ /**
232
+ * Deletes all the table related information from the store.
233
+ * @param tableName table name
234
+ */
235
+ public void delete (final QualifiedName tableName ) {
236
+ try {
237
+ final TableSequenceIds ids = getSequenceIds (tableName );
238
+ directSqlSavePartition .delete (tableName );
239
+ jdbcTemplate .update (SQL .UPDATE_SDS_CD , new SqlParameterValue (Types .BIGINT , null ),
240
+ new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
241
+ jdbcTemplate .update (SQL .UPDATE_SDS_SERDE , new SqlParameterValue (Types .BIGINT , null ),
242
+ new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
243
+ jdbcTemplate .update (SQL .DELETE_COLUMNS_V2 , new SqlParameterValue (Types .BIGINT , ids .getCdId ()));
244
+ jdbcTemplate .update (SQL .DELETE_CDS , new SqlParameterValue (Types .BIGINT , ids .getCdId ()));
245
+ jdbcTemplate .update (SQL .DELETE_PARTITION_KEYS , new SqlParameterValue (Types .BIGINT , ids .getTableId ()));
246
+ jdbcTemplate .update (SQL .DELETE_TABLE_PARAMS , new SqlParameterValue (Types .BIGINT , ids .getTableId ()));
247
+ jdbcTemplate .update (SQL .DELETE_TAB_COL_STATS , new SqlParameterValue (Types .BIGINT , ids .getTableId ()));
248
+ jdbcTemplate .update (SQL .UPDATE_TABLE_SD , new SqlParameterValue (Types .BIGINT , null ),
249
+ new SqlParameterValue (Types .BIGINT , ids .getTableId ()));
250
+ jdbcTemplate .update (SQL .DELETE_SKEWED_COL_NAMES , new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
251
+ jdbcTemplate .update (SQL .DELETE_BUCKETING_COLS , new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
252
+ jdbcTemplate .update (SQL .DELETE_SORT_COLS , new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
253
+ jdbcTemplate .update (SQL .DELETE_SD_PARAMS , new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
254
+ jdbcTemplate .update (SQL .DELETE_SKEWED_COL_VALUE_LOC_MAP ,
255
+ new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
256
+ jdbcTemplate .update (SQL .DELETE_SKEWED_VALUES , new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
257
+ jdbcTemplate .update (SQL .DELETE_SERDE_PARAMS , new SqlParameterValue (Types .BIGINT , ids .getSerdeId ()));
258
+ jdbcTemplate .update (SQL .DELETE_SERDES , new SqlParameterValue (Types .BIGINT , ids .getSerdeId ()));
259
+ jdbcTemplate .update (SQL .DELETE_SDS , new SqlParameterValue (Types .BIGINT , ids .getSdsId ()));
260
+ jdbcTemplate .update (SQL .DELETE_TBLS , new SqlParameterValue (Types .BIGINT , ids .getTableId ()));
261
+ } catch (DataAccessException e ) {
262
+ throw new ConnectorException (String .format ("Failed delete hive table %s" , tableName ), e );
263
+ }
264
+ }
265
+
266
+ private TableSequenceIds getSequenceIds (final QualifiedName tableName ) {
267
+ try {
268
+ return jdbcTemplate .queryForObject (
269
+ SQL .TABLE_SEQUENCE_IDS ,
270
+ new Object []{tableName .getDatabaseName (), tableName .getTableName ()},
271
+ new int []{Types .VARCHAR , Types .VARCHAR },
272
+ (rs , rowNum ) -> new TableSequenceIds (rs .getLong ("tbl_id" ), rs .getLong ("cd_id" ),
273
+ rs .getLong ("sd_id" ), rs .getLong ("serde_id" )));
274
+ } catch (EmptyResultDataAccessException e ) {
275
+ throw new TableNotFoundException (tableName );
276
+ }
277
+ }
278
+
225
279
@ VisibleForTesting
226
280
private static class SQL {
227
281
static final String GET_TABLE_NAMES_BY_URI =
@@ -237,5 +291,26 @@ private static class SQL {
237
291
"update TABLE_PARAMS set param_value=? WHERE tbl_id=? and param_key=?" ;
238
292
static final String INSERT_TABLE_PARAMS =
239
293
"insert into TABLE_PARAMS(tbl_id,param_key,param_value) values (?,?,?)" ;
294
+ static final String UPDATE_SDS_CD = "UPDATE SDS SET CD_ID=? WHERE SD_ID=?" ;
295
+ static final String DELETE_COLUMNS_V2 = "DELETE FROM COLUMNS_V2 WHERE CD_ID=?" ;
296
+ static final String DELETE_CDS = "DELETE FROM CDS WHERE CD_ID=?" ;
297
+ static final String DELETE_PARTITION_KEYS = "DELETE FROM PARTITION_KEYS WHERE TBL_ID=?" ;
298
+ static final String DELETE_TABLE_PARAMS = "DELETE FROM TABLE_PARAMS WHERE TBL_ID=?" ;
299
+ static final String DELETE_TAB_COL_STATS = "DELETE FROM TAB_COL_STATS WHERE TBL_ID=?" ;
300
+ static final String UPDATE_TABLE_SD = "UPDATE TBLS SET SD_ID=? WHERE TBL_ID=?" ;
301
+ static final String DELETE_SKEWED_COL_NAMES = "DELETE FROM SKEWED_COL_NAMES WHERE SD_ID=?" ;
302
+ static final String DELETE_BUCKETING_COLS = "DELETE FROM BUCKETING_COLS WHERE SD_ID=?" ;
303
+ static final String DELETE_SORT_COLS = "DELETE FROM SORT_COLS WHERE SD_ID=?" ;
304
+ static final String DELETE_SD_PARAMS = "DELETE FROM SD_PARAMS WHERE SD_ID=?" ;
305
+ static final String DELETE_SKEWED_COL_VALUE_LOC_MAP = "DELETE FROM SKEWED_COL_VALUE_LOC_MAP WHERE SD_ID=?" ;
306
+ static final String DELETE_SKEWED_VALUES = "DELETE FROM SKEWED_VALUES WHERE SD_ID_OID=?" ;
307
+ static final String UPDATE_SDS_SERDE = "UPDATE SDS SET SERDE_ID=? WHERE SD_ID=?" ;
308
+ static final String DELETE_SERDE_PARAMS = "DELETE FROM SERDE_PARAMS WHERE SERDE_ID=?" ;
309
+ static final String DELETE_SERDES = "DELETE FROM SERDES WHERE SERDE_ID=?" ;
310
+ static final String DELETE_SDS = "DELETE FROM SDS WHERE SD_ID=?" ;
311
+ static final String DELETE_TBLS = "DELETE FROM TBLS WHERE TBL_ID=?" ;
312
+ static final String TABLE_SEQUENCE_IDS = "select t.tbl_id, s.sd_id, s.cd_id, s.serde_id"
313
+ + " from DBS d join TBLS t on d.db_id=t.db_id join SDS s on t.sd_id=s.sd_id"
314
+ + " where d.name=? and t.tbl_name=?" ;
240
315
}
241
316
}
0 commit comments