diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index e3e29aa92bd8..be4941ca010e 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1309,6 +1309,14 @@ public enum ConfVars { + " seqprefix: adds a 'N_' prefix to the table name to get a unique location (table,1_table,2_table,...)\n" + " prohibit: do not consider alternate locations; throw error if the default is not available\n" + " force: use the default location even in case the directory is already available"), + METASTORE_S4U_NOWAIT_MAX_RETRIES("metastore.s4u.nowait.max.retries", + "hive.metastore.s4u.nowait.max.retries", 20, + "Number of retries required to acquire a row lock immediately without waiting."), + METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL( + "metastore.s4u.nowait.retry.sleep.interval", + "hive.metastore.s4u.nowait.retry.sleep.interval", 300, TimeUnit.MILLISECONDS, + "Sleep interval between retries to acquire a row lock immediately described part of property " + + METASTORE_S4U_NOWAIT_MAX_RETRIES.name()), MULTITHREADED("javax.jdo.option.Multithreaded", "javax.jdo.option.Multithreaded", true, "Set this to true if multiple threads access metastore through JDO concurrently."), diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java index 2b012b9a9bba..686c1e9c3713 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java +++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java @@ -32,6 +32,7 @@ import java.util.stream.Stream; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -66,7 +67,9 @@ public class DatabaseProduct implements Configurable { */ private static final ReentrantLock derbyLock = new ReentrantLock(true); - public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED}; + public enum DbType { + DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED + }; static public DbType dbType; // Singleton instance @@ -75,6 +78,11 @@ public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED Configuration myConf; private String productName; + + private String dbVersion; + + private Pair versionNums; + /** * Protected constructor for singleton class */ @@ -92,7 +100,10 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool, Configuration conf) { try (Connection conn = connPool.getConnection()) { String s = conn.getMetaData().getDatabaseProductName(); - return determineDatabaseProduct(s, conf); + String version = conn.getMetaData().getDatabaseProductVersion(); + int majorVersion = conn.getMetaData().getDatabaseMajorVersion(); + int minorVersion = conn.getMetaData().getDatabaseMinorVersion(); + return determineDatabaseProduct(s, version, Pair.of(majorVersion, minorVersion), conf); } catch (SQLException e) { throw new IllegalStateException("Unable to get database product name", e); } @@ -103,8 +114,12 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool, * @param productName string to defer database connection * @return database product type */ - public static DatabaseProduct determineDatabaseProduct(String productName, - Configuration conf) 
{ + public static DatabaseProduct determineDatabaseProduct(String productName, Configuration configuration) { + return determineDatabaseProduct(productName, null, null, configuration); + } + + private static DatabaseProduct determineDatabaseProduct(String productName, + String version, Pair versionNums, Configuration conf) { DbType dbt; Preconditions.checkNotNull(conf, "Configuration is null"); @@ -117,6 +132,12 @@ public static DatabaseProduct determineDatabaseProduct(String productName, dbt = DbType.CUSTOM; } Preconditions.checkState(theDatabaseProduct.dbType == dbt); + if (theDatabaseProduct.dbVersion == null && version != null) { + theDatabaseProduct.dbVersion = version; + } + if (theDatabaseProduct.versionNums == null && versionNums != null) { + theDatabaseProduct.versionNums = versionNums; + } return theDatabaseProduct; } @@ -160,6 +181,12 @@ public static DatabaseProduct determineDatabaseProduct(String productName, theDatabaseProduct.dbType = dbt; theDatabaseProduct.productName = productName; + if (version != null) { + theDatabaseProduct.dbVersion = version; + } + if (versionNums != null) { + theDatabaseProduct.versionNums = versionNums; + } } } return theDatabaseProduct; @@ -424,24 +451,34 @@ public String isWithinCheckInterval(String expr, long intervalInSeconds) throws return condition; } - public String addForUpdateClause(String selectStatement) throws MetaException { + public String addForUpdateClause(String selectStatement, boolean noWait) throws MetaException { switch (dbType) { case DERBY: //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html //sadly in Derby, FOR UPDATE doesn't meant what it should return selectStatement; - case MYSQL: - //http://dev.mysql.com/doc/refman/5.7/en/select.html case ORACLE: //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html case POSTGRES: //http://www.postgresql.org/docs/9.0/static/sql-select.html case CUSTOM: // ANSI SQL + return selectStatement + " for update" + (noWait ? 
" NOWAIT" : "");
+    case MYSQL:
+      //http://dev.mysql.com/doc/refman/5.7/en/select.html
+      if (noWait) {
+        if (canMySQLSupportNoWait()) {
+          return selectStatement + " for update NOWAIT";
+        } else {
+          int selectLength = "select".length();
+          return selectStatement.trim().substring(0, selectLength) + " /*+ MAX_EXECUTION_TIME(300) */ " +
+              selectStatement.trim().substring(selectLength) + " for update";
+        }
+      }
       return selectStatement + " for update";
     case SQLSERVER:
       //https://msdn.microsoft.com/en-us/library/ms189499.aspx
       //https://msdn.microsoft.com/en-us/library/ms187373.aspx
-      String modifier = " with (updlock)";
+      String modifier = " with (updlock" + (noWait ? ",NOWAIT" : "") + ")";
       int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
       if (wherePos < 0) {
         return selectStatement + modifier;
@@ -455,6 +492,61 @@ public String addForUpdateClause(String selectStatement) throws MetaException {
     }
   }
 
+  /**
+   * Tells whether the backing MySQL-compatible database accepts
+   * {@code SELECT ... FOR UPDATE NOWAIT}. NOWAIT exists since MySQL 8.0.1 and MariaDB 10.3.0;
+   * when no version numbers were captured the answer is conservatively {@code false}.
+   *
+   * @return true if NOWAIT can be appended to a locking select
+   */
+  private boolean canMySQLSupportNoWait() {
+    if (versionNums == null) {
+      // Cannot determine the real version of back db
+      return false;
+    }
+    int major = versionNums.getLeft();
+    int minor = versionNums.getRight();
+    // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
+    String dbName = productName.replaceAll("\\s+", "").toLowerCase(java.util.Locale.ROOT);
+    boolean isMariaDB = dbName.contains(MARIADB_NAME) ||
+        (dbVersion != null && dbVersion.toLowerCase(java.util.Locale.ROOT).contains(MARIADB_NAME));
+    if (isMariaDB) {
+      // https://mariadb.com/docs/release-notes/community-server/old-releases/release-notes-mariadb-10-3-series/mariadb-1030-release-notes
+      // Compare (major, minor) as a pair so that 11.0, 11.1, 12.0, ... qualify as well.
+      return major > 10 || (major == 10 && minor >= 3);
+    }
+    // Prior to MySQL 8.0.1, the NOWAIT clause for row locking was not supported directly in the s4u syntax.
+    // The caller then falls back to MAX_EXECUTION_TIME to ensure the s4u does not run indefinitely.
+    // https://dev.mysql.com/blog-archive/mysql-8-0-1-using-skip-locked-and-nowait-to-handle-hot-rows/
+    if (major != 8 || minor != 0) {
+      return major > 8 || (major == 8 && minor > 0);
+    }
+    // 8.0.x: only major/minor are stored as numbers, so the patch level is read from the version string.
+    return patchLevelOf(dbVersion) >= 1;
+  }
+
+  /**
+   * Extracts the numeric third component of a dotted version string, e.g. 36 from "8.0.36-log".
+   *
+   * @param version product version string, may be null
+   * @return the patch level, or -1 if it cannot be determined
+   */
+  private static int patchLevelOf(String version) {
+    if (version == null) {
+      return -1;
+    }
+    String[] parts = version.trim().split("\\.", 3);
+    if (parts.length < 3) {
+      return -1;
+    }
+    int end = 0;
+    // Cap at 9 digits so Integer.parseInt cannot overflow on a malformed string.
+    while (end < parts[2].length() && end < 9 && Character.isDigit(parts[2].charAt(end))) {
+      end++;
+    }
+    return end == 0 ? -1 : Integer.parseInt(parts[2].substring(0, end));
+  }
+
   /**
    * Add a limit clause to a given query
    * @param numRows
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 965ff0edc169..cfa5af5a1f63 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -53,9 +53,10 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
+import java.util.function.Consumer;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -68,7 +69,6 @@
 import javax.jdo.datastore.JDOConnection;
 import javax.jdo.identity.IntIdentity;
 
-import com.google.common.util.concurrent.Striped;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -257,6 +257,7 @@
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.RetryingExecutor; import org.apache.thrift.TException; import org.datanucleus.ExecutionContext; import org.datanucleus.api.jdo.JDOPersistenceManager; @@ -347,8 +348,6 @@ private enum TXN_STATUS { private boolean areTxnStatsSupported = false; private PropertyStore propertyStore; - private static Striped tablelocks; - public ObjectStore() { } @@ -398,15 +397,6 @@ public void setConf(Configuration conf) { } else { LOG.debug("Initialized ObjectStore"); } - - if (tablelocks == null) { - synchronized (ObjectStore.class) { - if (tablelocks == null) { - int numTableLocks = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_NUM_STRIPED_TABLE_LOCKS); - tablelocks = Striped.lazyWeakLock(numTableLocks); - } - } - } } @SuppressWarnings("nls") @@ -9473,98 +9463,95 @@ private void writeMPartitionColumnStatistics(Table table, Partition partition, } } - /** - * Get table's column stats - * - * @return Map of column name and its stats - */ - private Map getPartitionColStats(Table table, List colNames, String engine) - throws MetaException { - Map statsMap = Maps.newHashMap(); - List stats = getMTableColumnStatistics(table, colNames, engine); - for (MTableColumnStatistics cStat : stats) { - statsMap.put(cStat.getColName(), cStat); - } - return statsMap; - } - @Override public Map updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean committed = false; - List statsObjs = colStats.getStatsObj(); ColumnStatisticsDesc statsDesc = colStats.getStatsDesc(); - - Lock tableLock = getTableLockFor(statsDesc.getDbName(), statsDesc.getTableName()); - tableLock.lock(); + long start = System.currentTimeMillis(); + String catName = statsDesc.isSetCatName() ? 
statsDesc.getCatName() : getDefaultCatalog(conf); try { openTransaction(); // DataNucleus objects get detached all over the place for no (real) reason. // So let's not use them anywhere unless absolutely necessary. - String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf); + int maxRetries = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES); + long sleepInterval = ThreadLocalRandom.current().nextLong(MetastoreConf.getTimeVar(conf, + ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS)) + 30; MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(), statsDesc.getTableName()); - Table table = convertToTable(mTable); - List colNames = new ArrayList<>(); - for (ColumnStatisticsObj statsObj : statsObjs) { - colNames.add(statsObj.getColName()); - } - - Map oldStats = getPartitionColStats(table, colNames, colStats.getEngine()); - - for (ColumnStatisticsObj statsObj : statsObjs) { - MTableColumnStatistics mStatsObj = StatObjectConverter.convertToMTableColumnStatistics( - mTable, statsDesc, - statsObj, colStats.getEngine()); - writeMTableColumnStatistics(table, mStatsObj, oldStats.get(statsObj.getColName())); - // There is no need to add colname again, otherwise we will get duplicate colNames. - } - - // TODO: (HIVE-20109) ideally the col stats stats should be in colstats, not in the table! - // Set the table properties - // No need to check again if it exists. 
- String dbname = table.getDbName(); - String name = table.getTableName(); - MTable oldt = mTable; - Map newParams = new HashMap<>(table.getParameters()); - StatsSetupConst.setColumnStatsState(newParams, colNames); - boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters()); - if (isTxn) { - if (!areTxnStatsSupported) { - StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); - } else { - String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name), - oldt.getParameters(), newParams, writeId, validWriteIds, true); - if (errorMsg != null) { - throw new MetaException(errorMsg); - } - if (!isCurrentStatsValidForTheQuery(oldt, validWriteIds, true)) { - // Make sure we set the flag to invalid regardless of the current value. + Map result = new RetryingExecutor<>(maxRetries, sleepInterval, () -> { + Ref exceptionRef = new Ref<>(); + String savePoint = "uts_" + ThreadLocalRandom.current().nextInt(10000) + "_" + System.nanoTime(); + setTransactionSavePoint(savePoint); + executePlainSQL( + sqlGenerator.addForUpdateNoWait("SELECT \"TBL_ID\" FROM \"TBLS\" WHERE \"TBL_ID\" = " + mTable.getId()), + exception -> { + rollbackTransactionToSavePoint(savePoint); + exceptionRef.t = exception; + }); + if (exceptionRef.t != null) { + throw new RetryingExecutor.RetryException(exceptionRef.t); + } + pm.refresh(mTable); + Table table = convertToTable(mTable); + List colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : statsObjs) { + colNames.add(statsObj.getColName()); + } + + Map oldStats = Maps.newHashMap(); + List stats = getMTableColumnStatistics(table, colNames, colStats.getEngine()); + for (MTableColumnStatistics cStat : stats) { + oldStats.put(cStat.getColName(), cStat); + } + + for (ColumnStatisticsObj statsObj : statsObjs) { + MTableColumnStatistics mStatsObj = StatObjectConverter.convertToMTableColumnStatistics(mTable, statsDesc, + statsObj, colStats.getEngine()); + writeMTableColumnStatistics(table, mStatsObj, 
oldStats.get(statsObj.getColName())); + // There is no need to add colname again, otherwise we will get duplicate colNames. + } + + // TODO: (HIVE-20109) ideally the col stats stats should be in colstats, not in the table! + // Set the table properties + // No need to check again if it exists. + String dbname = table.getDbName(); + String name = table.getTableName(); + MTable oldt = mTable; + Map newParams = new HashMap<>(table.getParameters()); + StatsSetupConst.setColumnStatsState(newParams, colNames); + boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters()); + if (isTxn) { + if (!areTxnStatsSupported) { StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " - + dbname + "." + name); + } else { + String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name), oldt.getParameters(), newParams, + writeId, validWriteIds, true); + if (errorMsg != null) { + throw new MetaException(errorMsg); + } + if (!isCurrentStatsValidForTheQuery(oldt, validWriteIds, true)) { + // Make sure we set the flag to invalid regardless of the current value. + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + dbname + "." + name); + } + oldt.setWriteId(writeId); } - oldt.setWriteId(writeId); } - } - oldt.setParameters(newParams); - + oldt.setParameters(newParams); + return newParams; + }).onRetry(RetryingExecutor.RetryException.class).commandName("updateTableColumnStatistics").run(); committed = commitTransaction(); // TODO: similar to update...Part, this used to do "return committed;"; makes little sense. - return committed ? newParams : null; + return committed ? 
result : null; } finally { - try { - rollbackAndCleanup(committed, null); - } finally { - tableLock.unlock(); - } + LOG.info("{} updateTableColumnStatistics took {}ms, success: {}", + new TableName(catName, statsDesc.getDbName(), statsDesc.getTableName()), + System.currentTimeMillis() - start, committed); + rollbackAndCleanup(committed, null); } } - private Lock getTableLockFor(String dbName, String tblName) { - return tablelocks.get(dbName + "." + tblName); - } - /** * Get partition's column stats * @@ -11435,93 +11422,50 @@ public List getAllWriteEventInfo(long txnId, String dbName, Stri return writeEventInfoList; } - private void prepareQuotes() throws SQLException { + private void executePlainSQL(String sql, Consumer exceptionConsumer) + throws SQLException { String s = dbType.getPrepareTxnStmt(); - if (s != null) { - assert pm.currentTransaction().isActive(); - JDOConnection jdoConn = pm.getDataStoreConnection(); - try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) { + assert pm.currentTransaction().isActive(); + JDOConnection jdoConn = pm.getDataStoreConnection(); + Connection conn = (Connection) jdoConn.getNativeConnection(); + try (Statement statement = conn.createStatement()) { + if (s != null) { statement.execute(s); - } finally { - jdoConn.close(); } + try { + statement.execute(sql); + } catch (SQLException e) { + if (exceptionConsumer != null) { + exceptionConsumer.accept(e); + } else { + throw e; + } + } + } finally { + jdoConn.close(); } } private void lockNotificationSequenceForUpdate() throws MetaException { + int maxRetries = + MetastoreConf.getIntVar(conf, ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES); + long sleepInterval = MetastoreConf.getTimeVar(conf, + ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS); if (sqlGenerator.getDbProduct().isDERBY() && directSql != null) { // Derby doesn't allow FOR UPDATE to lock the row being selected (See https://db.apache // 
.org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole table. Since there's // only one row in the table, this shouldn't cause any performance degradation. - new RetryingExecutor(conf, () -> { + new RetryingExecutor(maxRetries, sleepInterval, () -> { directSql.lockDbTable("NOTIFICATION_SEQUENCE"); - }).run(); + return null; + }).commandName("lockNotificationSequenceForUpdate").run(); } else { String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\""; String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery); - new RetryingExecutor(conf, () -> { - prepareQuotes(); - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", lockingQuery))) { - query.setUnique(true); - // only need to execute it to get db Lock - query.execute(); - } - }).run(); - } - } - - static class RetryingExecutor { - interface Command { - void process() throws Exception; - } - - private static Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class); - private final int maxRetries; - private final long sleepInterval; - private int currentRetries = 0; - private final Command command; - - RetryingExecutor(Configuration config, Command command) { - this.maxRetries = - MetastoreConf.getIntVar(config, ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES); - this.sleepInterval = MetastoreConf.getTimeVar(config, - ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS); - this.command = command; - } - - public void run() throws MetaException { - while (true) { - try { - command.process(); - break; - } catch (Exception e) { - LOG.info( - "Attempting to acquire the DB log notification lock: {} out of {}" + - " retries", currentRetries, maxRetries, e); - if (currentRetries >= maxRetries) { - String message = - "Couldn't acquire the DB log notification lock because we reached the maximum" - + " # of retries: " + maxRetries - + " retries. 
If this happens too often, then is recommended to " - + "increase the maximum number of retries on the" - + " hive.notification.sequence.lock.max.retries configuration"; - LOG.error(message, e); - throw new MetaException(message + " :: " + e.getMessage()); - } - currentRetries++; - try { - Thread.sleep(sleepInterval); - } catch (InterruptedException e1) { - String msg = "Couldn't acquire the DB notification log lock on " + currentRetries - + " retry, because the following error: "; - LOG.error(msg, e1); - throw new MetaException(msg + e1.getMessage()); - } - } - } - } - public long getSleepInterval() { - return sleepInterval; + new RetryingExecutor(maxRetries, sleepInterval, () -> { + executePlainSQL(lockingQuery, null); + return null; + }).commandName("lockNotificationSequenceForUpdate").run(); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index b793345dd945..7b3382265b29 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -127,7 +127,11 @@ private List createInsertValuesStmt(String tblColumns, List rows * construct. If the DB doesn't support, return original select. 
*/
   public String addForUpdateClause(String selectStatement) throws MetaException {
-    return dbProduct.addForUpdateClause(selectStatement);
+    return dbProduct.addForUpdateClause(selectStatement, false);
+  }
+
+  public String addForUpdateNoWait(String selectStatement) throws MetaException {
+    return dbProduct.addForUpdateClause(selectStatement, true);
   }
 
   /**
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java
new file mode 100644
index 000000000000..014e21b94f24
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs a command and retries it a bounded number of times, sleeping a fixed interval between attempts.
+ * By default every exception is retried; once {@link #onRetry(Class)} has registered at least one
+ * type, only exceptions that are instances of a registered type are retried and anything else aborts
+ * immediately. All failures surface to the caller as {@link MetaException}.
+ *
+ * @param <T> type of the value produced by the command
+ */
+public class RetryingExecutor<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
+
+  private final int maxRetries;
+  private final long sleepInterval;
+  private final Callable<T> command;
+  private final List<Class<? extends Exception>> retriableException = new ArrayList<>();
+  private String commandName;
+
+  /**
+   * @param maxRetries number of retries allowed after the first failed attempt
+   * @param sleepInterval pause between two attempts, in milliseconds
+   * @param command the work to execute
+   */
+  public RetryingExecutor(int maxRetries, long sleepInterval, Callable<T> command) {
+    this.maxRetries = maxRetries;
+    this.sleepInterval = sleepInterval;
+    this.command = command;
+  }
+
+  /** Registers an exception type that is allowed to trigger a retry. */
+  public RetryingExecutor<T> onRetry(Class<? extends Exception> ex) {
+    this.retriableException.add(ex);
+    return this;
+  }
+
+  /** Sets the name used for this command in log and error messages. */
+  public RetryingExecutor<T> commandName(String name) {
+    this.commandName = name;
+    return this;
+  }
+
+  /**
+   * Executes the command until it succeeds, fails with a non-retriable exception, or the retry budget
+   * is spent.
+   *
+   * @return the value returned by the command
+   * @throws MetaException if the command keeps failing, fails with a non-retriable exception, or the
+   *     thread is interrupted while sleeping; the underlying failure is attached as the cause
+   */
+  public T run() throws MetaException {
+    int currentRetries = 0;
+    while (true) {
+      try {
+        return command.call();
+      } catch (Exception e) {
+        checkException(e);
+        if (currentRetries >= maxRetries) {
+          String message = "Couldn't finish the command: " + commandName
+              + " because we reached the maximum of retries: " + maxRetries;
+          LOG.error(message, e);
+          throw withCause(new MetaException(message + " :: " + e.getMessage()), e);
+        }
+        currentRetries++;
+        LOG.info("Attempting to retry the command:{} in {} out of {} retries",
+            commandName, currentRetries, maxRetries, e);
+        try {
+          Thread.sleep(sleepInterval);
+        } catch (InterruptedException e1) {
+          // Restore the interrupt flag so code further up the stack can still observe it.
+          Thread.currentThread().interrupt();
+          String msg = "Couldn't run the command: " + commandName + " in " + currentRetries
+              + " retry, because the following error: ";
+          LOG.error(msg, e1);
+          throw withCause(new MetaException(msg + e1.getMessage()), e1);
+        }
+      }
+    }
+  }
+
+  /** Aborts with a MetaException when retry types are registered and {@code e} matches none of them. */
+  private void checkException(Exception e) throws MetaException {
+    if (!retriableException.isEmpty() &&
+        retriableException.stream().noneMatch(nex -> nex.isInstance(e))) {
+      String message = "See a non-retriable exception, avoid to retry the command:" + commandName;
+      LOG.info(message, e);
+      throw withCause(new MetaException(message + " :: " + e.getMessage()), e);
+    }
+  }
+
+  /** Attaches {@code cause} to {@code me} so the original stack trace is not lost. */
+  private static MetaException withCause(MetaException me, Throwable cause) {
+    me.initCause(cause);
+    return me;
+  }
+
+  /** Checked exception a command may throw to wrap a failure; register it via onRetry to retry on it. */
+  public static class RetryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public RetryException(Exception ex) {
+      super(ex);
+    }
+
+    public RetryException(String msg) {
+      super(msg);
+    }
+  }
+
+  public long getSleepInterval() {
+    return sleepInterval;
+  }
+}
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
index 444edd8812ff..aa2ba60934a9 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
@@ -74,7 +74,7 @@ public String isWithinCheckInterval(String expr, long intervalInSeconds) {
     return "DummyIsWithin";
   }
   @Override
-  public String addForUpdateClause(String selectStatement) {
+  public String addForUpdateClause(String selectStatement, boolean noWait) {
     return selectStatement + " for update";
   }
   @Override
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 4a2408b69e2a..ba91731e8653 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -21,7 +21,6 @@
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.hive.metastore.ObjectStore.RetryingExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; import org.apache.hadoop.hive.metastore.api.Catalog; @@ -83,6 +82,7 @@ import org.apache.hadoop.hive.metastore.model.MTable; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; +import org.apache.hadoop.hive.metastore.utils.RetryingExecutor; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -1300,7 +1300,11 @@ public void testQueryCloseOnError() throws Exception { @Test public void testRetryingExecutorSleep() throws Exception { - RetryingExecutor re = new ObjectStore.RetryingExecutor(MetastoreConf.newMetastoreConf(), null); + int maxRetries = + MetastoreConf.getIntVar(conf, ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES); + long sleepInterval = MetastoreConf.getTimeVar(conf, + ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS); + RetryingExecutor re = new RetryingExecutor<>(maxRetries, sleepInterval, null); Assert.assertTrue("invalid sleep value", re.getSleepInterval() >= 0); }