Skip to content

Commit 65ac4a0

Browse files
committed
HIVE-28578: Concurrency issue in updateTableColumnStatistics
1 parent 4669cd5 commit 65ac4a0

File tree

7 files changed

+270
-169
lines changed

7 files changed

+270
-169
lines changed

standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,6 +1309,14 @@ public enum ConfVars {
13091309
+ " seqprefix: adds a 'N_' prefix to the table name to get a unique location (table,1_table,2_table,...)\n"
13101310
+ " prohibit: do not consider alternate locations; throw error if the default is not available\n"
13111311
+ " force: use the default location even in case the directory is already available"),
1312+
METASTORE_S4U_NOWAIT_MAX_RETRIES("metastore.s4u.nowait.max.retries",
1313+
"hive.metastore.s4u.nowait.max.retries", 20,
1314+
"Number of retries required to acquire a row lock immediately without waiting."),
1315+
METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL(
1316+
"metastore.s4u.nowait.retry.sleep.interval",
1317+
"hive.metastore.s4u.nowait.retry.sleep.interval", 300, TimeUnit.MILLISECONDS,
1318+
"Sleep interval between retries to acquire a row lock immediately described part of property "
1319+
+ METASTORE_S4U_NOWAIT_MAX_RETRIES.name()),
13121320

13131321
MULTITHREADED("javax.jdo.option.Multithreaded", "javax.jdo.option.Multithreaded", true,
13141322
"Set this to true if multiple threads access metastore through JDO concurrently."),

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ public class DatabaseProduct implements Configurable {
6666
*/
6767
private static final ReentrantLock derbyLock = new ReentrantLock(true);
6868

69-
public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED};
69+
public enum DbType {
70+
DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED
71+
};
7072
static public DbType dbType;
7173

7274
// Singleton instance
@@ -75,6 +77,9 @@ public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED
7577
Configuration myConf;
7678

7779
private String productName;
80+
81+
private String dbVersion;
82+
7883
/**
7984
* Protected constructor for singleton class
8085
*/
@@ -92,7 +97,8 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool,
9297
Configuration conf) {
9398
try (Connection conn = connPool.getConnection()) {
9499
String s = conn.getMetaData().getDatabaseProductName();
95-
return determineDatabaseProduct(s, conf);
100+
String version = conn.getMetaData().getDatabaseProductVersion();
101+
return determineDatabaseProduct(s, version, conf);
96102
} catch (SQLException e) {
97103
throw new IllegalStateException("Unable to get database product name", e);
98104
}
@@ -103,8 +109,12 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool,
103109
* @param productName string to defer database connection
104110
* @return database product type
105111
*/
106-
public static DatabaseProduct determineDatabaseProduct(String productName,
107-
Configuration conf) {
112+
public static DatabaseProduct determineDatabaseProduct(String productName, Configuration configuration) {
113+
return determineDatabaseProduct(productName, null, configuration);
114+
}
115+
116+
private static DatabaseProduct determineDatabaseProduct(String productName,
117+
String version, Configuration conf) {
108118
DbType dbt;
109119

110120
Preconditions.checkNotNull(conf, "Configuration is null");
@@ -117,6 +127,9 @@ public static DatabaseProduct determineDatabaseProduct(String productName,
117127
dbt = DbType.CUSTOM;
118128
}
119129
Preconditions.checkState(theDatabaseProduct.dbType == dbt);
130+
if (theDatabaseProduct.dbVersion == null && version != null) {
131+
theDatabaseProduct.dbVersion = version;
132+
}
120133
return theDatabaseProduct;
121134
}
122135

@@ -160,6 +173,9 @@ public static DatabaseProduct determineDatabaseProduct(String productName,
160173

161174
theDatabaseProduct.dbType = dbt;
162175
theDatabaseProduct.productName = productName;
176+
if (version != null) {
177+
theDatabaseProduct.dbVersion = version;
178+
}
163179
}
164180
}
165181
return theDatabaseProduct;
@@ -424,24 +440,43 @@ public String isWithinCheckInterval(String expr, long intervalInSeconds) throws
424440
return condition;
425441
}
426442

427-
public String addForUpdateClause(String selectStatement) throws MetaException {
443+
public String addForUpdateClause(String selectStatement, boolean noWait) throws MetaException {
428444
switch (dbType) {
429445
case DERBY:
430446
//https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
431447
//sadly in Derby, FOR UPDATE doesn't meant what it should
432448
return selectStatement;
433-
case MYSQL:
434-
//http://dev.mysql.com/doc/refman/5.7/en/select.html
435449
case ORACLE:
436450
//https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html
437451
case POSTGRES:
438452
//http://www.postgresql.org/docs/9.0/static/sql-select.html
439453
case CUSTOM: // ANSI SQL
454+
return selectStatement + " for update" + (noWait ? " NOWAIT" : "");
455+
case MYSQL:
456+
//http://dev.mysql.com/doc/refman/5.7/en/select.html
457+
if (noWait) {
458+
// Prior to MySQL 8.0.1, the NOWAIT clause for row locking was not supported directly in the s4u syntax.
459+
// Use the MAX_EXECUTION_TIME to ensure the s4u does not run indefinitely.
460+
// https://dev.mysql.com/blog-archive/mysql-8-0-1-using-skip-locked-and-nowait-to-handle-hot-rows/
461+
String dbName = productName.replaceAll("\\s+", "").toLowerCase();
462+
boolean addNoWait = dbName.contains(MYSQL_NAME) &&
463+
dbVersion != null && dbVersion.compareToIgnoreCase("8.0.1") >= 0;
464+
// https://mariadb.com/docs/release-notes/community-server/old-releases/release-notes-mariadb-10-3-series/mariadb-1030-release-notes
465+
addNoWait |= productName.contains(MARIADB_NAME) &&
466+
dbVersion != null && dbVersion.compareToIgnoreCase("10.3") >= 0;
467+
if (addNoWait) {
468+
return selectStatement + " for update NOWAIT";
469+
} else {
470+
int selectLength = "select".length();
471+
return selectStatement.trim().substring(0, selectLength) + " /*+ MAX_EXECUTION_TIME(300) */ " +
472+
selectStatement.trim().substring(selectLength) + " for update";
473+
}
474+
}
440475
return selectStatement + " for update";
441476
case SQLSERVER:
442477
//https://msdn.microsoft.com/en-us/library/ms189499.aspx
443478
//https://msdn.microsoft.com/en-us/library/ms187373.aspx
444-
String modifier = " with (updlock)";
479+
String modifier = " with (updlock" + (noWait ? ",NOWAIT" : "") + ")";
445480
int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
446481
if (wherePos < 0) {
447482
return selectStatement + modifier;

0 commit comments

Comments
 (0)