Skip to content

Commit cf27e51

Browse files
authored
Emit metrics about recovery operations from server processes (#6081)
1 parent 1f62db4 commit cf27e51

File tree

8 files changed

+100
-8
lines changed

8 files changed

+100
-8
lines changed

core/src/main/java/org/apache/accumulo/core/metrics/Metric.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,17 @@ public enum Metric {
306306
MetricDocSection.MANAGER),
307307
MANAGER_GOAL_STATE("accumulo.manager.goal.state", MetricType.GAUGE,
308308
"Manager goal state: -1=unknown, 0=CLEAN_STOP, 1=SAFE_MODE, 2=NORMAL.",
309-
MetricDocSection.MANAGER);
309+
MetricDocSection.MANAGER),
310+
311+
// Recovery Metrics
312+
RECOVERIES_IN_PROGRESS("accumulo.recoveries.in.progress", MetricType.GAUGE,
313+
"The number of recoveries in progress.", MetricDocSection.GENERAL_SERVER),
314+
RECOVERIES_LONGEST_RUNTIME("accumulo.recoveries.runtime.longest", MetricType.GAUGE,
315+
"The time (in milliseconds) of the longest running recovery.",
316+
MetricDocSection.GENERAL_SERVER),
317+
RECOVERIES_AVG_PROGRESS("accumulo.recoveries.avg.progress", MetricType.GAUGE,
318+
"The average percentage (0.0 - 99.9) of the in progress recoveries.",
319+
MetricDocSection.GENERAL_SERVER);
310320

311321
private final String name;
312322
private final MetricType type;

server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,8 @@ public void run() {
819819

820820
MetricsInfo metricsInfo = getContext().getMetricsInfo();
821821

822-
metricsInfo.addMetricsProducers(this, pausedMetrics);
822+
final LogSorter logSorter = new LogSorter(this);
823+
metricsInfo.addMetricsProducers(this, pausedMetrics, logSorter);
823824
metricsInfo.init(getServiceTags(clientAddress));
824825

825826
var watcher = new CompactionWatcher(getConfiguration());
@@ -831,7 +832,6 @@ public void run() {
831832
LOG.info("Compactor started, waiting for work");
832833

833834
final AtomicReference<Throwable> err = new AtomicReference<>();
834-
final LogSorter logSorter = new LogSorter(this);
835835
long nextSortLogsCheckTime = System.currentTimeMillis();
836836

837837
while (!isShutdownRequested()) {

server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,16 +377,14 @@ public void run() {
377377
resourceManager.getDataCache(), resourceManager.getSummaryCache());
378378

379379
metricsInfo.addMetricsProducers(this, scanMetrics, scanServerMetrics, blockCacheMetrics);
380-
metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
381-
getAdvertiseAddress(), getResourceGroup()));
382-
// We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
383380

384381
ServiceLock lock = announceExistence();
385382
this.getContext().setServiceLock(lock);
386383

387384
int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
388385
if (threadPoolSize > 0) {
389386
final LogSorter logSorter = new LogSorter(this);
387+
metricsInfo.addMetricsProducers(logSorter);
390388
try {
391389
// Attempt to process all existing log sorting work and start a background
392390
// thread to look for log sorting work in the future
@@ -400,6 +398,9 @@ public void run() {
400398
"Log sorting for tablet recovery is disabled, SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
401399
}
402400

401+
metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
402+
getAdvertiseAddress(), getResourceGroup()));
403+
403404
while (!isShutdownRequested()) {
404405
if (Thread.currentThread().isInterrupted()) {
405406
LOG.info("Server process thread has been interrupted, shutting down");

server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ public void run() {
558558
this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache());
559559

560560
metricsInfo.addMetricsProducers(this, metrics, updateMetrics, scanMetrics, mincMetrics,
561-
pausedMetrics, blockCacheMetrics);
561+
pausedMetrics, blockCacheMetrics, logSorter);
562562
metricsInfo.init(MetricsInfo.serviceTags(context.getInstanceName(), getApplicationName(),
563563
getAdvertiseAddress(), getResourceGroup()));
564564

server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package org.apache.accumulo.tserver.log;
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import static org.apache.accumulo.core.metrics.Metric.RECOVERIES_AVG_PROGRESS;
23+
import static org.apache.accumulo.core.metrics.Metric.RECOVERIES_IN_PROGRESS;
24+
import static org.apache.accumulo.core.metrics.Metric.RECOVERIES_LONGEST_RUNTIME;
2225
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_SORT_CONCURRENT_POOL;
2326

2427
import java.io.DataInputStream;
@@ -32,6 +35,8 @@
3235
import java.util.Map.Entry;
3336
import java.util.TreeMap;
3437
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicLong;
3540

3641
import org.apache.accumulo.core.Constants;
3742
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -43,6 +48,7 @@
4348
import org.apache.accumulo.core.file.FileOperations;
4449
import org.apache.accumulo.core.manager.thrift.RecoveryStatus;
4550
import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
51+
import org.apache.accumulo.core.metrics.MetricsProducer;
4652
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
4753
import org.apache.accumulo.core.spi.crypto.CryptoService;
4854
import org.apache.accumulo.core.util.Pair;
@@ -64,9 +70,13 @@
6470
import org.slf4j.LoggerFactory;
6571

6672
import com.google.common.annotations.VisibleForTesting;
73+
import com.google.common.util.concurrent.AtomicDouble;
6774
import com.google.common.util.concurrent.MoreExecutors;
6875

69-
public class LogSorter {
76+
import io.micrometer.core.instrument.Gauge;
77+
import io.micrometer.core.instrument.MeterRegistry;
78+
79+
public class LogSorter implements MetricsProducer {
7080

7181
private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
7282

@@ -229,6 +239,9 @@ synchronized long getBytesCopied() throws IOException {
229239
private final double walBlockSize;
230240
private final CryptoService cryptoService;
231241
private final AccumuloConfiguration sortedLogConf;
242+
private final AtomicLong recoveriesInProgress = new AtomicLong(0);
243+
private final AtomicLong recoveryRuntime = new AtomicLong(0);
244+
private final AtomicDouble recoveryAvgProgress = new AtomicDouble(0.0D);
232245

233246
public LogSorter(AbstractServer server) {
234247
this.server = server;
@@ -239,6 +252,8 @@ public LogSorter(AbstractServer server) {
239252
CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY);
240253
this.cryptoService =
241254
context.getCryptoFactory().getService(env, this.conf.getAllCryptoProperties());
255+
ThreadPools.watchNonCriticalScheduledTask(context.getScheduledExecutor()
256+
.scheduleWithFixedDelay(() -> updateMetrics(), 60, 60, TimeUnit.SECONDS));
242257
}
243258

244259
/**
@@ -335,4 +350,41 @@ public List<RecoveryStatus> getLogSorts() {
335350
return result;
336351
}
337352
}
353+
354+
private void updateMetrics() {
355+
synchronized (currentWork) {
356+
recoveriesInProgress.set(currentWork.size());
357+
if (recoveriesInProgress.get() == 0) {
358+
recoveryRuntime.set(0);
359+
recoveryAvgProgress.set(0.0D);
360+
} else {
361+
long runtime = 0;
362+
long progress = 0;
363+
for (LogProcessor processor : currentWork.values()) {
364+
long start = processor.getSortTime();
365+
if (start > 0) { // may not have started yet
366+
runtime = Math.max(start, runtime);
367+
}
368+
try {
369+
progress += processor.getBytesCopied();
370+
} catch (IOException e) {
371+
log.warn("Error getting bytes read");
372+
}
373+
}
374+
recoveryRuntime.set(runtime);
375+
recoveryAvgProgress
376+
.set(Math.min(progress / (walBlockSize * recoveriesInProgress.get()), 99.9));
377+
}
378+
}
379+
}
380+
381+
@Override
382+
public void registerMetrics(MeterRegistry registry) {
383+
Gauge.builder(RECOVERIES_IN_PROGRESS.getName(), recoveriesInProgress, AtomicLong::get)
384+
.description(RECOVERIES_IN_PROGRESS.getDescription()).register(registry);
385+
Gauge.builder(RECOVERIES_LONGEST_RUNTIME.getName(), recoveryRuntime, AtomicLong::get)
386+
.description(RECOVERIES_LONGEST_RUNTIME.getDescription()).register(registry);
387+
Gauge.builder(RECOVERIES_AVG_PROGRESS.getName(), recoveryAvgProgress, AtomicDouble::get)
388+
.description(RECOVERIES_AVG_PROGRESS.getDescription()).register(registry);
389+
}
338390
}

server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Objects;
3838
import java.util.TreeMap;
3939
import java.util.UUID;
40+
import java.util.concurrent.ScheduledThreadPoolExecutor;
4041

4142
import org.apache.accumulo.core.conf.DefaultConfiguration;
4243
import org.apache.accumulo.core.data.TableId;
@@ -54,6 +55,7 @@
5455
import org.apache.accumulo.tserver.logger.LogFileValue;
5556
import org.apache.hadoop.fs.FileSystem;
5657
import org.apache.hadoop.fs.Path;
58+
import org.junit.jupiter.api.AfterAll;
5759
import org.junit.jupiter.api.AfterEach;
5860
import org.junit.jupiter.api.BeforeEach;
5961
import org.junit.jupiter.api.Test;
@@ -64,6 +66,7 @@
6466
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
6567
public class RecoveryLogsIteratorTest extends WithTestNames {
6668

69+
private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1);
6770
private VolumeManager fs;
6871
private java.nio.file.Path workDir;
6972
static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
@@ -85,6 +88,7 @@ public void setUp() throws Exception {
8588
expect(context.getCryptoFactory()).andReturn(new GenericCryptoServiceFactory()).anyTimes();
8689
expect(context.getVolumeManager()).andReturn(fs).anyTimes();
8790
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
91+
expect(context.getScheduledExecutor()).andReturn(EXECUTOR).anyTimes();
8892
replay(server, context);
8993

9094
logSorter = new LogSorter(server);
@@ -96,6 +100,11 @@ public void tearDown() throws Exception {
96100
verify(server, context);
97101
}
98102

103+
/**
 * Stops the class-wide {@code EXECUTOR} once every test in this class has finished. The executor
 * is the one returned by the mocked {@code context.getScheduledExecutor()} in {@code setUp}.
 */
@AfterAll
104+
public static void shutdown() {
105+
EXECUTOR.shutdownNow();
106+
}
107+
99108
static class KeyValue implements Comparable<KeyValue> {
100109
public final LogFileKey key;
101110
public final LogFileValue value;

server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.Set;
4545
import java.util.TreeMap;
4646
import java.util.UUID;
47+
import java.util.concurrent.ScheduledThreadPoolExecutor;
4748

4849
import org.apache.accumulo.core.conf.ConfigurationCopy;
4950
import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -80,6 +81,7 @@
8081
import org.apache.hadoop.fs.Path;
8182
import org.apache.hadoop.io.Text;
8283
import org.easymock.EasyMock;
84+
import org.junit.jupiter.api.AfterAll;
8385
import org.junit.jupiter.api.BeforeEach;
8486
import org.junit.jupiter.api.Test;
8587
import org.junit.jupiter.api.io.TempDir;
@@ -92,6 +94,7 @@
9294
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
9395
public class SortedLogRecoveryTest extends WithTestNames {
9496

97+
private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1);
9598
static final int bufferSize = 5;
9699
static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
97100
static final Text cf = new Text("cf");
@@ -135,6 +138,11 @@ public int compareTo(KeyValue o) {
135138
}
136139
}
137140

141+
/**
 * Stops the class-wide {@code EXECUTOR} once every test in this class has finished. The executor
 * is the one returned by the mocked {@code context.getScheduledExecutor()} in these tests.
 */
@AfterAll
142+
public static void shutdown() {
143+
EXECUTOR.shutdownNow();
144+
}
145+
138146
private static KeyValue createKeyValue(LogEvents type, long seq, int tid,
139147
Object fileExtentMutation) {
140148
KeyValue result = new KeyValue();
@@ -193,6 +201,7 @@ private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, K
193201
expect(context.getVolumeManager()).andReturn(fs).anyTimes();
194202
expect(context.getCryptoFactory()).andReturn(cryptoFactory).anyTimes();
195203
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
204+
expect(context.getScheduledExecutor()).andReturn(EXECUTOR).anyTimes();
196205
replay(server, context);
197206
logSorter = new LogSorter(server);
198207

@@ -1133,6 +1142,7 @@ public void testLogSortedProperties() throws Exception {
11331142
expect(context.getCryptoFactory()).andReturn(cryptoFactory).anyTimes();
11341143
expect(context.getVolumeManager()).andReturn(vm).anyTimes();
11351144
expect(context.getConfiguration()).andReturn(testConfig).anyTimes();
1145+
expect(context.getScheduledExecutor()).andReturn(EXECUTOR).anyTimes();
11361146
replay(server, context);
11371147
LogSorter sorter = new LogSorter(server);
11381148

server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.io.InputStream;
3232
import java.io.OutputStream;
3333
import java.nio.file.Files;
34+
import java.util.concurrent.ScheduledThreadPoolExecutor;
3435

3536
import org.apache.accumulo.core.conf.DefaultConfiguration;
3637
import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
@@ -40,6 +41,7 @@
4041
import org.apache.accumulo.tserver.TabletServer;
4142
import org.apache.accumulo.tserver.WithTestNames;
4243
import org.apache.hadoop.fs.Path;
44+
import org.junit.jupiter.api.AfterAll;
4345
import org.junit.jupiter.api.AfterEach;
4446
import org.junit.jupiter.api.BeforeEach;
4547
import org.junit.jupiter.api.Test;
@@ -58,6 +60,8 @@ public class TestUpgradePathForWALogs extends WithTestNames {
5860
// logs from 2.0 were changed for improved crypto
5961
private static final String WALOG_FROM_20 = "walog-from-20.walog";
6062

63+
private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1);
64+
6165
private ServerContext context;
6266
private TabletServer server;
6367

@@ -85,6 +89,7 @@ public void setUp() throws Exception {
8589
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
8690
expect(context.getCryptoFactory()).andReturn(new GenericCryptoServiceFactory()).anyTimes();
8791
expect(context.getVolumeManager()).andReturn(fs).anyTimes();
92+
expect(context.getScheduledExecutor()).andReturn(EXECUTOR).anyTimes();
8893
replay(server, context);
8994
}
9095

@@ -93,6 +98,11 @@ public void tearDown() {
9398
verify(server, context);
9499
}
95100

101+
/**
 * Stops the class-wide {@code EXECUTOR} once every test in this class has finished. The executor
 * is the one returned by the mocked {@code context.getScheduledExecutor()} in {@code setUp}.
 */
@AfterAll
102+
public static void shutdown() {
103+
EXECUTOR.shutdownNow();
104+
}
105+
96106
/**
97107
* Since 2.0 this version of WAL is no longer compatible.
98108
*/

0 commit comments

Comments
 (0)