Publish queue latency metrics from tracked thread pools #120488

Open
nicktindall wants to merge 29 commits into main from ES-10531_add_thread_pool_queue_latency_metric

Commits (29)
9918299
Publish queue latency metrics from tracked thread pools
nicktindall Jan 21, 2025
b1d8df4
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Jan 21, 2025
dfef676
Fix metric name
nicktindall Jan 21, 2025
2ceb965
Temporary hack to fix metric name
nicktindall Jan 21, 2025
dbed27f
Propose solution to composite thread-pool names
nicktindall Jan 22, 2025
c04f2ea
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Jan 22, 2025
c95f625
Fix fixed thread pool names
nicktindall Jan 22, 2025
f850bc9
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Jan 23, 2025
1b450f0
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Mar 7, 2025
e7f5bb6
POC using HandlingTimeTracker to track queue latency
nicktindall Mar 11, 2025
d269195
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Mar 11, 2025
80b8b3f
Tidy
nicktindall Mar 11, 2025
9811299
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Mar 12, 2025
598d0a8
Fix metric name
nicktindall Mar 13, 2025
4153d27
Generalise HandlingTimeTracker
nicktindall Mar 25, 2025
d4b0818
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Mar 25, 2025
3e3d9fc
Tidy and fix document/test
nicktindall Mar 25, 2025
be4b96c
Restore field name
nicktindall Mar 26, 2025
8a313ab
Update docs/changelog/120488.yaml
nicktindall Mar 26, 2025
8fa7dc6
Fix changelog area value
nicktindall Mar 26, 2025
2c36b97
Update server/src/main/java/org/elasticsearch/common/metrics/Exponent…
nicktindall Mar 27, 2025
420739e
Setup metrics separately to constructor
nicktindall Mar 28, 2025
3208423
Remove unnecessary change
nicktindall Mar 28, 2025
9dd0457
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Mar 28, 2025
516a4a9
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Apr 2, 2025
d6e44ed
Add 99th percentile
nicktindall Apr 6, 2025
dfbd9ac
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Apr 7, 2025
2b294bd
Record queue latency metrics in beforeExecute
nicktindall Apr 7, 2025
9e227ed
Handle p100 percentile when last bucket is populated
nicktindall Apr 7, 2025
5 changes: 5 additions & 0 deletions docs/changelog/120488.yaml
@@ -0,0 +1,5 @@
pr: 120488
summary: Publish queue latency metrics from tracked thread pools
area: "Infra/Metrics"
type: enhancement
issues: []
server/src/main/java/org/elasticsearch/common/metrics/ExponentialBucketHistogram.java (new file)
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.metrics;

import java.util.Arrays;
import java.util.concurrent.atomic.LongAdder;

/**
* A histogram with a fixed number of buckets of exponentially increasing width.
* <p>
* The bucket boundaries are defined by increasing powers of two, e.g.
* <code>
* (-&infin;, 1), [1, 2), [2, 4), [4, 8), ..., [2^({@link #BUCKET_COUNT}-2), &infin;)
* </code>
* There are {@link #BUCKET_COUNT} buckets.
*/
public class ExponentialBucketHistogram {

public static int[] getBucketUpperBounds() {
int[] bounds = new int[17];
for (int i = 0; i < bounds.length; i++) {
bounds[i] = 1 << i;
}
return bounds;
}

private static int getBucket(long observedValue) {
if (observedValue <= 0) {
// Values of zero or less fall into the first bucket, (-infinity, 1)
return 0;
} else if (LAST_BUCKET_LOWER_BOUND <= observedValue) {
// Values at or above the largest upper bound fall into the open-ended last bucket
return BUCKET_COUNT - 1;
} else {
// Otherwise the bucket index is the bit length of the value: bucket i covers [2^(i-1), 2^i)
return Long.SIZE - Long.numberOfLeadingZeros(observedValue);
}
}

public static final int BUCKET_COUNT = getBucketUpperBounds().length + 1;

private static final long LAST_BUCKET_LOWER_BOUND = getBucketUpperBounds()[BUCKET_COUNT - 2];

private final LongAdder[] buckets;

public ExponentialBucketHistogram() {
buckets = new LongAdder[BUCKET_COUNT];
for (int i = 0; i < BUCKET_COUNT; i++) {
buckets[i] = new LongAdder();
}
}

public void addObservation(long observedValue) {
buckets[getBucket(observedValue)].increment();
}

/**
* @return An array of frequencies of the observed values in buckets with upper bounds as returned by {@link #getBucketUpperBounds()},
*     plus an extra bucket for values larger than the largest upper bound.
*/
public long[] getHistogram() {
final long[] histogram = new long[BUCKET_COUNT];
for (int i = 0; i < BUCKET_COUNT; i++) {
histogram[i] = buckets[i].longValue();
}
return histogram;
}

/**
* Calculate the value at the Nth percentile
*
* @param percentile The percentile as a fraction (in [0, 1.0])
* @return A value greater than the specified fraction of values in the histogram: the upper bound of the bucket containing
*     the percentile, or {@link Long#MAX_VALUE} when it falls in the unbounded last bucket
*/
public long getPercentile(float percentile) {
assert percentile >= 0 && percentile <= 1;
final long[] snapshot = getHistogram();
final long totalCount = Arrays.stream(snapshot).sum();
// The rank (1-based) of the observation that sits at the requested percentile
long percentileIndex = (long) Math.ceil(totalCount * percentile);
// Walk the buckets until the cumulative count reaches that rank
for (int i = 0; i < BUCKET_COUNT; i++) {
percentileIndex -= snapshot[i];
if (percentileIndex <= 0) {
if (i == snapshot.length - 1) {
// The last bucket has no upper bound
return Long.MAX_VALUE;
} else {
return getBucketUpperBounds()[i];
}
}
}
assert false : "We shouldn't ever get here";
return Long.MAX_VALUE;
}

/**
* Clear all values in the histogram (non-atomic)
*/
public void clear() {
for (int i = 0; i < BUCKET_COUNT; i++) {
buckets[i].reset();
}
}
}
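For reference, here is a standalone usage sketch (not part of this PR's diff) showing how observations map to buckets and how getPercentile reports the upper bound of the bucket that contains the requested percentile; the expected values follow from the bucket scheme above.

import org.elasticsearch.common.metrics.ExponentialBucketHistogram;

public class ExponentialBucketHistogramExample {
    public static void main(String[] args) {
        ExponentialBucketHistogram histogram = new ExponentialBucketHistogram();
        histogram.addObservation(0);         // lands in the first bucket, (-infinity, 1)
        histogram.addObservation(3);         // lands in [2, 4)
        histogram.addObservation(100);       // lands in [64, 128)
        histogram.addObservation(1_000_000); // >= 65536, lands in the open-ended last bucket

        // Percentiles are reported as the upper bound of the bucket that contains them
        long p50 = histogram.getPercentile(0.5f);  // 4, the upper bound of [2, 4)
        long p99 = histogram.getPercentile(0.99f); // Long.MAX_VALUE, because the last bucket has no upper bound
        System.out.println("p50=" + p50 + ", p99=" + p99);
    }
}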

server/src/main/java/org/elasticsearch/common/network/HandlingTimeTracker.java: This file was deleted (replaced by ExponentialBucketHistogram).

server/src/main/java/org/elasticsearch/common/network/NetworkService.java
@@ -9,6 +9,7 @@

package org.elasticsearch.common.network;

import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -85,14 +86,14 @@ public interface CustomNameResolver {
}

private final List<CustomNameResolver> customNameResolvers;
private final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker();
private final ExponentialBucketHistogram handlingTimeTracker = new ExponentialBucketHistogram();
private final ThreadWatchdog threadWatchdog = new ThreadWatchdog();

public NetworkService(List<CustomNameResolver> customNameResolvers) {
this.customNameResolvers = Objects.requireNonNull(customNameResolvers, "customNameResolvers must be non null");
}

public HandlingTimeTracker getHandlingTimeTracker() {
public ExponentialBucketHistogram getHandlingTimeTracker() {
return handlingTimeTracker;
}

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
@@ -10,9 +10,17 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
import org.elasticsearch.core.TimeValue;

import org.elasticsearch.telemetry.metric.Instrument;
import org.elasticsearch.telemetry.metric.LongGauge;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,12 +35,16 @@
*/
public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {

private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 };
public static final String THREAD_POOL_METRIC_NAME_QUEUE_TIME = ".threads.queue.latency.histogram";

private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final LongAdder totalExecutionTime = new LongAdder();
private final boolean trackOngoingTasks;
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
private final ExponentialBucketHistogram queueLatencyHistogram = new ExponentialBucketHistogram();

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
@@ -53,6 +65,27 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
}

public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
final LongGauge queueLatencyGauge = meterRegistry.registerLongsGauge(
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_QUEUE_TIME,
"Time tasks spent in the queue for the " + threadPoolName + " thread pool",
"milliseconds",
() -> {
List<LongWithAttributes> metricValues = Arrays.stream(LATENCY_PERCENTILES_TO_REPORT)
.mapToObj(
percentile -> new LongWithAttributes(
queueLatencyHistogram.getPercentile(percentile / 100f),
Map.of("percentile", String.valueOf(percentile))
)
)
.toList();
queueLatencyHistogram.clear();
return metricValues;
}
);
[Inline review thread on the gauge registration above]

nicktindall (Contributor, Author): I wonder if, rather than publishing a time series per percentile (using the percentile attribute), we should publish a metric per percentile. The metric makes no sense if you don't filter by a percentile label.

Member: Is it easier to plot different percentiles on the same graph with labels (and group by) compared to two different time series?

nicktindall (Contributor, Author): I don't think that makes a difference, but I'm not sure.

nicktindall (Contributor, Author): Yes, having a look at Kibana just now, it would be much easier to plot as a single metric grouped by the percentiles. As separate metrics we'd need to add them as distinct time series.
return List.of(queueLatencyGauge);
}

@Override
protected Runnable wrapRunnable(Runnable command) {
return super.wrapRunnable(this.runnableWrapper.apply(command));
@@ -94,6 +127,11 @@ protected void beforeExecute(Thread t, Runnable r) {
if (trackOngoingTasks) {
ongoingTasks.put(r, System.nanoTime());
}
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
timedRunnable.beforeExecute();
final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
queueLatencyHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency));
}

@Override
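To illustrate the read-and-clear behaviour of the gauge callback above (each collection reports percentiles for the interval since the previous collection, because the histogram is cleared after it is read), here is a simplified sketch that uses only the histogram class from this PR and none of the MeterRegistry wiring; the class and method names are illustrative only.

import org.elasticsearch.common.metrics.ExponentialBucketHistogram;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class QueueLatencyGaugeSketch {
    private static final int[] PERCENTILES = { 50, 90, 99 };
    private final ExponentialBucketHistogram queueLatencyHistogram = new ExponentialBucketHistogram();

    // Mirrors beforeExecute: called each time a task is de-queued
    void recordQueueLatencyMillis(long millis) {
        queueLatencyHistogram.addObservation(millis);
    }

    // Mirrors the gauge callback: called on each metrics collection
    Map<String, Long> collect() {
        Map<String, Long> values = Arrays.stream(PERCENTILES)
            .boxed()
            .collect(Collectors.toMap(p -> "p" + p, p -> queueLatencyHistogram.getPercentile(p / 100f)));
        queueLatencyHistogram.clear(); // the next collection starts from an empty histogram
        return values;
    }
}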
server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
@@ -18,6 +18,7 @@
class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
private final Runnable original;
private final long creationTimeNanos;
private long beforeExecuteTime = -1;
private long startTimeNanos;
private long finishTimeNanos = -1;
private boolean failedOrRejected = false;
@@ -58,6 +59,19 @@ public boolean isForceExecution() {
return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();
}

/**
* Returns the time in nanoseconds the task spent in the queue, i.e. between its creation (enqueue) and the call to
* {@link #beforeExecute()} (de-queue)
*
* @return The queue time in nanoseconds, or -1 if the task was never de-queued
*/
long getQueueTimeNanos() {
if (beforeExecuteTime == -1) {
assert false : "beforeExecute must be called before getQueueTimeNanos";
return -1;
}
return beforeExecuteTime - creationTimeNanos;
}

/**
* Return the time this task spent being run.
* If the task is still running or has not yet been run, returns -1.
@@ -70,6 +84,13 @@ long getTotalExecutionNanos() {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}

/**
* Called when the task has reached the front of the queue and is about to be executed
*/
public void beforeExecute() {
beforeExecuteTime = System.nanoTime();
}

/**
* If the task was failed or rejected, return true.
* Otherwise, false.
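A hypothetical sketch (not part of the PR) of the queue-time lifecycle added to TimedRunnable above; it assumes the package-private constructor takes the wrapped Runnable, which is not shown in this diff.

package org.elasticsearch.common.util.concurrent;

import java.util.concurrent.TimeUnit;

public class TimedRunnableQueueTimeSketch {
    public static void main(String[] args) throws InterruptedException {
        TimedRunnable task = new TimedRunnable(() -> {}); // creationTimeNanos captured here (enqueue)
        Thread.sleep(5);                                  // simulate time spent waiting in the executor's queue
        task.beforeExecute();                             // beforeExecuteTime captured here (de-queue)
        long queueMillis = TimeUnit.NANOSECONDS.toMillis(task.getQueueTimeNanos()); // roughly 5ms
        System.out.println("queued for ~" + queueMillis + "ms");
    }
}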
server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java
@@ -460,7 +460,7 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt
handleIncomingRequest(httpRequest, trackingChannel, httpRequest.getInboundException());
} finally {
final long took = threadPool.rawRelativeTimeInMillis() - startTime;
networkService.getHandlingTimeTracker().addHandlingTime(took);
networkService.getHandlingTimeTracker().addObservation(took);
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0 && took > logThreshold) {
logger.warn(
server/src/main/java/org/elasticsearch/http/HttpRouteStats.java
@@ -12,7 +12,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.network.HandlingTimeTracker;
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
@@ -36,7 +36,7 @@
* @param totalResponseSize the total body size (bytes) of responses produced by the HTTP route
* @param responseSizeHistogram similar to {@code requestSizeHistogram} but for response size
* @param responseTimeHistogram an array of frequencies of response time (millis) in buckets with upper bounds
* as returned by {@link HandlingTimeTracker#getBucketUpperBounds()}, plus
* as returned by {@link ExponentialBucketHistogram#getBucketUpperBounds()}, plus
* an extra bucket for handling response time larger than the longest upper bound (currently 65536ms).
*/
public record HttpRouteStats(
@@ -89,7 +89,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
"millis",
TimeValue::timeValueMillis,
responseTimeHistogram,
HandlingTimeTracker.getBucketUpperBounds()
ExponentialBucketHistogram.getBucketUpperBounds()
);
builder.endObject();

server/src/main/java/org/elasticsearch/http/HttpRouteStatsTracker.java
@@ -9,7 +9,7 @@

package org.elasticsearch.http;

import org.elasticsearch.common.network.HandlingTimeTracker;
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;

import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.LongAdder;
@@ -80,7 +80,7 @@ private static int bucket(int contentLength) {

private final StatsTracker requestStats = new StatsTracker();
private final StatsTracker responseStats = new StatsTracker();
private final HandlingTimeTracker responseTimeTracker = new HandlingTimeTracker();
private final ExponentialBucketHistogram responseTimeTracker = new ExponentialBucketHistogram();

public void addRequestStats(int contentLength) {
requestStats.addStats(contentLength);
@@ -91,7 +91,7 @@ public void addResponseStats(long contentLength) {
}

public void addResponseTime(long timeMillis) {
responseTimeTracker.addHandlingTime(timeMillis);
responseTimeTracker.addObservation(timeMillis);
}

public HttpRouteStats getStats() {