Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,14 @@ public enum ConfVars {
+ " seqprefix: adds a 'N_' prefix to the table name to get a unique location (table,1_table,2_table,...)\n"
+ " prohibit: do not consider alternate locations; throw error if the default is not available\n"
+ " force: use the default location even in case the directory is already available"),
METASTORE_S4U_NOWAIT_MAX_RETRIES("metastore.s4u.nowait.max.retries",
"hive.metastore.s4u.nowait.max.retries", 20,
"Number of retries required to acquire a row lock immediately without waiting."),
METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL(
"metastore.s4u.nowait.retry.sleep.interval",
"hive.metastore.s4u.nowait.retry.sleep.interval", 300, TimeUnit.MILLISECONDS,
"Sleep interval between retries to acquire a row lock immediately described part of property "
+ METASTORE_S4U_NOWAIT_MAX_RETRIES.name()),

MULTITHREADED("javax.jdo.option.Multithreaded", "javax.jdo.option.Multithreaded", true,
"Set this to true if multiple threads access metastore through JDO concurrently."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public class DatabaseProduct implements Configurable {
*/
private static final ReentrantLock derbyLock = new ReentrantLock(true);

public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED};
public enum DbType {
DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED
};
static public DbType dbType;

// Singleton instance
Expand All @@ -75,6 +77,9 @@ public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED
Configuration myConf;

private String productName;

private String dbVersion;

/**
* Protected constructor for singleton class
*/
Expand All @@ -92,7 +97,8 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool,
Configuration conf) {
try (Connection conn = connPool.getConnection()) {
String s = conn.getMetaData().getDatabaseProductName();
return determineDatabaseProduct(s, conf);
String version = conn.getMetaData().getDatabaseProductVersion();
return determineDatabaseProduct(s, version, conf);
} catch (SQLException e) {
throw new IllegalStateException("Unable to get database product name", e);
}
Expand All @@ -103,8 +109,12 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool,
* @param productName string to defer database connection
* @return database product type
*/
public static DatabaseProduct determineDatabaseProduct(String productName,
Configuration conf) {
public static DatabaseProduct determineDatabaseProduct(String productName, Configuration configuration) {
return determineDatabaseProduct(productName, null, configuration);
}

private static DatabaseProduct determineDatabaseProduct(String productName,
String version, Configuration conf) {
DbType dbt;

Preconditions.checkNotNull(conf, "Configuration is null");
Expand All @@ -117,6 +127,9 @@ public static DatabaseProduct determineDatabaseProduct(String productName,
dbt = DbType.CUSTOM;
}
Preconditions.checkState(theDatabaseProduct.dbType == dbt);
if (theDatabaseProduct.dbVersion == null && version != null) {
theDatabaseProduct.dbVersion = version;
}
return theDatabaseProduct;
}

Expand Down Expand Up @@ -160,6 +173,9 @@ public static DatabaseProduct determineDatabaseProduct(String productName,

theDatabaseProduct.dbType = dbt;
theDatabaseProduct.productName = productName;
if (version != null) {
theDatabaseProduct.dbVersion = version;
}
}
}
return theDatabaseProduct;
Expand Down Expand Up @@ -424,24 +440,43 @@ public String isWithinCheckInterval(String expr, long intervalInSeconds) throws
return condition;
}

public String addForUpdateClause(String selectStatement) throws MetaException {
public String addForUpdateClause(String selectStatement, boolean noWait) throws MetaException {
switch (dbType) {
case DERBY:
//https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
//sadly in Derby, FOR UPDATE doesn't meant what it should
return selectStatement;
case MYSQL:
//http://dev.mysql.com/doc/refman/5.7/en/select.html
case ORACLE:
//https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html
case POSTGRES:
//http://www.postgresql.org/docs/9.0/static/sql-select.html
case CUSTOM: // ANSI SQL
return selectStatement + " for update" + (noWait ? " NOWAIT" : "");
case MYSQL:
//http://dev.mysql.com/doc/refman/5.7/en/select.html
if (noWait) {
// Prior to MySQL 8.0.1, the NOWAIT clause for row locking was not supported directly in the s4u syntax.
// Use the MAX_EXECUTION_TIME to ensure the s4u does not run indefinitely.
// https://dev.mysql.com/blog-archive/mysql-8-0-1-using-skip-locked-and-nowait-to-handle-hot-rows/
String dbName = productName.replaceAll("\\s+", "").toLowerCase();
boolean addNoWait = dbName.contains(MYSQL_NAME) &&
dbVersion != null && dbVersion.compareToIgnoreCase("8.0.1") >= 0;
// https://mariadb.com/docs/release-notes/community-server/old-releases/release-notes-mariadb-10-3-series/mariadb-1030-release-notes
addNoWait |= productName.contains(MARIADB_NAME) &&
dbVersion != null && dbVersion.compareToIgnoreCase("10.3") >= 0;
if (addNoWait) {
return selectStatement + " for update NOWAIT";
} else {
int selectLength = "select".length();
return selectStatement.trim().substring(0, selectLength) + " /*+ MAX_EXECUTION_TIME(300) */ " +
selectStatement.trim().substring(selectLength) + " for update";
}
}
return selectStatement + " for update";
case SQLSERVER:
//https://msdn.microsoft.com/en-us/library/ms189499.aspx
//https://msdn.microsoft.com/en-us/library/ms187373.aspx
String modifier = " with (updlock)";
String modifier = " with (updlock" + (noWait ? ",NOWAIT" : "") + ")";
int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
if (wherePos < 0) {
return selectStatement + modifier;
Expand Down
Loading