Skip to content

Commit c729ea1

Browse files
committed
retry policy
1 parent ca9817e commit c729ea1

File tree

3 files changed

+56
-25
lines changed

3 files changed

+56
-25
lines changed

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9167,11 +9167,11 @@ public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats
91679167
openTransaction();
91689168
// DataNucleus objects get detached all over the place for no (real) reason.
91699169
// So let's not use them anywhere unless absolutely necessary.
9170-
int maxRetries = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
9171-
long sleepInterval = ThreadLocalRandom.current().nextLong(MetastoreConf.getTimeVar(conf,
9172-
ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS)) + 30;
91739170
MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(), statsDesc.getTableName());
9174-
Map<String, String> result = new RetryingExecutor<>(maxRetries, sleepInterval, () -> {
9171+
int maxRetries = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
9172+
long sleepInterval = MetastoreConf.getTimeVar(conf,
9173+
ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
9174+
Map<String, String> result = new RetryingExecutor<>(maxRetries, () -> {
91759175
Ref<Exception> exceptionRef = new Ref<>();
91769176
String savePoint = "uts_" + ThreadLocalRandom.current().nextInt(10000) + "_" + System.nanoTime();
91779177
setTransactionSavePoint(savePoint);
@@ -9232,7 +9232,9 @@ public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats
92329232
}
92339233
oldt.setParameters(newParams);
92349234
return newParams;
9235-
}).onRetry(RetryingExecutor.RetryException.class).commandName("updateTableColumnStatistics").run();
9235+
}).onRetry(e -> e instanceof RetryingExecutor.RetryException)
9236+
.commandName("updateTableColumnStatistics").sleepInterval(sleepInterval, interval ->
9237+
ThreadLocalRandom.current().nextLong(sleepInterval) + 30).run();
92369238
committed = commitTransaction();
92379239
// TODO: similar to update...Part, this used to do "return committed;"; makes little sense.
92389240
return committed ? result : null;
@@ -11111,17 +11113,17 @@ private void lockNotificationSequenceForUpdate() throws MetaException {
1111111113
// Derby doesn't allow FOR UPDATE to lock the row being selected (See https://db.apache
1111211114
// .org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole table. Since there's
1111311115
// only one row in the table, this shouldn't cause any performance degradation.
11114-
new RetryingExecutor<Void>(maxRetries, sleepInterval, () -> {
11116+
new RetryingExecutor<Void>(maxRetries, () -> {
1111511117
directSql.lockDbTable("NOTIFICATION_SEQUENCE");
1111611118
return null;
11117-
}).commandName("lockNotificationSequenceForUpdate").run();
11119+
}).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run();
1111811120
} else {
1111911121
String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\"";
1112011122
String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery);
11121-
new RetryingExecutor<Void>(maxRetries, sleepInterval, () -> {
11123+
new RetryingExecutor<Void>(maxRetries, () -> {
1112211124
executePlainSQL(lockingQuery, null);
1112311125
return null;
11124-
}).commandName("lockNotificationSequenceForUpdate").run();
11126+
}).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run();
1112511127
}
1112611128
}
1112711129

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,41 @@
1818

1919
package org.apache.hadoop.hive.metastore.utils;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
21+
import java.lang.reflect.InvocationTargetException;
22+
import java.lang.reflect.UndeclaredThrowableException;
2323
import java.util.concurrent.Callable;
24+
import java.util.function.Function;
25+
import java.util.function.Predicate;
2426

27+
import org.apache.commons.lang3.exception.ExceptionUtils;
2528
import org.apache.hadoop.hive.metastore.api.MetaException;
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
2831

2932
public class RetryingExecutor<T> {
30-
private static Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
33+
private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
3134

3235
private final int maxRetries;
33-
private final long sleepInterval;
36+
private long sleepInterval = 1000;
3437
private final Callable<T> command;
35-
private final List<Class<? extends Exception>> retriableException = new ArrayList<>();
38+
private Predicate<Exception> retryPolicy;
3639
private int currentRetries = 0;
3740
private String commandName;
41+
private Function<Long, Long> sleepIntervalFunc;
3842

39-
public RetryingExecutor(int maxRetries, long sleepInterval, Callable<T> command) {
43+
/**
 * Creates an executor for {@code command}.
 *
 * <p>The sleep interval is no longer a constructor argument; it keeps its field default
 * unless one of the {@code sleepInterval(...)} setters is called.
 *
 * @param maxRetries retry budget stored for {@code run()}
 * @param command    the work to execute
 */
public RetryingExecutor(int maxRetries, Callable<T> command) {
4044
this.maxRetries = maxRetries;
41-
this.sleepInterval = sleepInterval;
4245
this.command = command;
46+
// Default commandName unless specified: walk the current stack, skip the first frame
// (this constructor) and take the method name of the next frame, i.e. the method that
// invoked "new RetryingExecutor(...)".
// NOTE(review): Optional.get() throws NoSuchElementException if no such frame exists.
47+
this.commandName = StackWalker.getInstance()
48+
.walk(frames -> frames
49+
.skip(1)
50+
.findFirst()
51+
.map(StackWalker.StackFrame::getMethodName)).get();
4352
}
4453

45-
public RetryingExecutor<T> onRetry(Class<? extends Exception> ex) {
46-
this.retriableException.add(ex);
54+
/**
 * Sets the retry policy consulted by {@code checkException}.
 *
 * <p>When the predicate returns {@code false} for an exception, {@code checkException}
 * throws a {@code MetaException}. When no policy is set (the field stays {@code null}),
 * {@code checkException} rejects nothing. Calling this again replaces the previous
 * predicate rather than adding to it.
 *
 * @param retryPolicy predicate returning {@code true} for exceptions that may be retried
 * @return this executor, for call chaining
 */
public RetryingExecutor<T> onRetry(Predicate<Exception> retryPolicy) {
55+
this.retryPolicy = retryPolicy;
4756
return this;
4857
}
4958

@@ -52,6 +61,17 @@ public RetryingExecutor<T> commandName(String name) {
5261
return this;
5362
}
5463

64+
/**
 * Sets a fixed sleep interval and clears any interval function, because this delegates
 * to the two-argument overload with {@code null}.
 *
 * @param sleepInterval value later passed to {@code Thread.sleep}, i.e. milliseconds
 * @return this executor, for call chaining
 */
public RetryingExecutor<T> sleepInterval(long sleepInterval) {
65+
return sleepInterval(sleepInterval, null);
66+
}
67+
68+
/**
 * Sets the sleep interval and an optional function used to recompute it.
 *
 * @param sleepInterval     initial interval; the result of {@code getSleepInterval()} is
 *                          passed to {@code Thread.sleep}, i.e. milliseconds
 * @param sleepIntervalFunc if non-null, {@code getSleepInterval()} applies it to the
 *                          currently stored interval and stores the result; {@code null}
 *                          keeps the interval fixed
 * @return this executor, for call chaining
 */
public RetryingExecutor<T> sleepInterval(long sleepInterval,
69+
Function<Long, Long> sleepIntervalFunc) {
70+
this.sleepInterval = sleepInterval;
71+
this.sleepIntervalFunc = sleepIntervalFunc;
72+
return this;
73+
}
74+
5575
public T run() throws MetaException {
5676
while (true) {
5777
try {
@@ -68,7 +88,7 @@ public T run() throws MetaException {
6888
}
6989
currentRetries++;
7090
try {
71-
Thread.sleep(sleepInterval);
91+
Thread.sleep(getSleepInterval());
7292
} catch (InterruptedException e1) {
7393
String msg = "Couldn't run the command: " + commandName + " in " + currentRetries +
7494
" retry, because the following error: ";
@@ -80,11 +100,16 @@ public T run() throws MetaException {
80100
}
81101

82102
/**
 * Rejects exceptions that the retry policy does not accept.
 *
 * <p>In this diff rendering, the line after an {@code NN-} gutter is the removed
 * implementation and the line after an {@code NN+} gutter is the added one. The added
 * version does nothing when no policy is set or the policy accepts {@code e}. Otherwise
 * it logs and throws a {@code MetaException} whose text carries the command name, the
 * exception message and the root cause.
 *
 * @param e the exception to test against the retry policy
 * @throws MetaException if a retry policy is set and it returns {@code false} for {@code e}
 */
private void checkException(Exception e) throws MetaException {
83-
if (!retriableException.isEmpty() &&
84-
retriableException.stream().noneMatch(nex -> nex.isInstance(e))) {
85-
String message = "See a non-retriable exception, avoid to retry the command:" + commandName;
103+
if (retryPolicy != null && !retryPolicy.test(e)) {
104+
String message = "See a fatal exception, avoid to retry the command:" + commandName;
86105
// NOTE(review): a non-retriable failure is logged at INFO; confirm the level is intended.
LOG.info(message, e);
87-
throw new MetaException(message + " :: " + e.getMessage());
106+
String errorMessage = ExceptionUtils.getMessage(e);
107+
// For these two wrapper types, report the wrapped cause's message instead of the wrapper's.
if (e instanceof InvocationTargetException || e instanceof UndeclaredThrowableException) {
108+
errorMessage = ExceptionUtils.getMessage(e.getCause());
109+
}
110+
Throwable rootCause = ExceptionUtils.getRootCause(e);
111+
errorMessage += (rootCause == null ? "" : ("\nRoot cause: " + rootCause));
112+
// NOTE(review): only text is passed on; e is not attached as the cause of the MetaException.
throw new MetaException(message + " :: " + errorMessage);
88113
}
89114
}
90115

@@ -101,6 +126,9 @@ public RetryException(String msg) {
101126
}
102127

103128
/**
 * Returns the interval to sleep before the next retry.
 *
 * <p>This is not a pure getter. When an interval function is set, each call applies it
 * to the currently stored interval and stores the result, so successive calls compound
 * if the function depends on its argument. {@code run()} passes the returned value to
 * {@code Thread.sleep}.
 *
 * @return the current interval, recomputed first when an interval function is set
 */
public long getSleepInterval() {
129+
if (sleepIntervalFunc != null) {
130+
// NOTE(review): overwrites the configured base interval; the original value is lost.
this.sleepInterval = sleepIntervalFunc.apply(sleepInterval);
131+
}
104132
return sleepInterval;
105133
}
106-
}
134+
}

standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1277,7 +1277,8 @@ public void testRetryingExecutorSleep() throws Exception {
12771277
MetastoreConf.getIntVar(conf, ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES);
12781278
long sleepInterval = MetastoreConf.getTimeVar(conf,
12791279
ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
1280-
RetryingExecutor<Void> re = new RetryingExecutor<>(maxRetries, sleepInterval, null);
1280+
RetryingExecutor<Void> re = new RetryingExecutor<Void>(maxRetries, null)
1281+
.sleepInterval(sleepInterval);
12811282
Assert.assertTrue("invalid sleep value", re.getSleepInterval() >= 0);
12821283
}
12831284

0 commit comments

Comments
 (0)