Skip to content

Commit

Permalink
OpenTelemetry Metrics Support For Live Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Benke Qu committed Jan 15, 2025
1 parent df17db0 commit 662a776
Show file tree
Hide file tree
Showing 19 changed files with 908 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ public MetricDataMapper(BiConsumer<AbstractTelemetryBuilder, Resource> telemetry
this.captureHttpServer4xxAsError = captureHttpServer4xxAsError;
}

public void mapMetrics(MetricData metricData, Consumer<TelemetryItem> consumer) {
MetricDataType type = metricData.getType();
if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) {

// DO NOT emit unstable metrics from the OpenTelemetry auto instrumentation libraries
// custom metrics are always emitted
if (OTEL_UNSTABLE_METRICS_TO_EXCLUDE.contains(metricData.getName())
&& metricData.getInstrumentationScopeInfo().getName().startsWith(OTEL_INSTRUMENTATION_NAME_PREFIX)) {
return;
}
List<TelemetryItem> stableOtelMetrics = convertOtelMetricToAzureMonitorMetric(metricData, false);
stableOtelMetrics.forEach(consumer::accept);
} else {
logger.warning("metric data type {} is not supported yet.", metricData.getType());
}
}

public void map(MetricData metricData, Consumer<TelemetryItem> consumer) {
MetricDataType type = metricData.getType();
if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpRequest;
import com.azure.monitor.opentelemetry.exporter.implementation.MetricDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.HostName;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
Expand All @@ -20,13 +21,15 @@

public class QuickPulse {

static final int QP_INVARIANT_VERSION = 1;
// 6 represents filtering support for Otel metrics only is enabled
static final int QP_INVARIANT_VERSION = 6;

private volatile QuickPulseDataCollector collector;

public static QuickPulse create(HttpPipeline httpPipeline, Supplier<URL> endpointUrl,
Supplier<String> instrumentationKey, @Nullable String roleName, @Nullable String roleInstance,
String sdkVersion) {
boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader,
MetricDataMapper metricDataMapper, String sdkVersion) {

QuickPulse quickPulse = new QuickPulse();

Expand All @@ -40,7 +43,8 @@ public static QuickPulse create(HttpPipeline httpPipeline, Supplier<URL> endpoin
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
quickPulse.initialize(httpPipeline, endpointUrl, instrumentationKey, roleName, roleInstance, sdkVersion);
quickPulse.initialize(httpPipeline, endpointUrl, instrumentationKey, roleName, roleInstance,
useNormalizedValueForNonNormalizedCpuPercentage, quickPulseMetricReader, metricDataMapper, sdkVersion);
});
// the condition below will always be false, but by referencing the executor it ensures the
// executor can't become unreachable in the middle of the execute() method execution above
Expand All @@ -64,12 +68,16 @@ public void add(TelemetryItem telemetryItem) {
}

private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Supplier<String> instrumentationKey,
@Nullable String roleName, @Nullable String roleInstance, String sdkVersion) {
@Nullable String roleName, @Nullable String roleInstance,
boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader,
MetricDataMapper metricDataMapper, String sdkVersion) {

String quickPulseId = UUID.randomUUID().toString().replace("-", "");
ArrayBlockingQueue<HttpRequest> sendQueue = new ArrayBlockingQueue<>(256, true);
QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration();

QuickPulseDataSender quickPulseDataSender = new QuickPulseDataSender(httpPipeline, sendQueue);
QuickPulseDataSender quickPulseDataSender
= new QuickPulseDataSender(httpPipeline, sendQueue, quickPulseConfiguration);

String instanceName = roleInstance;
String machineName = HostName.get();
Expand All @@ -81,12 +89,13 @@ private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Su
instanceName = "Unknown host";
}

QuickPulseDataCollector collector = new QuickPulseDataCollector();
QuickPulseDataCollector collector
= new QuickPulseDataCollector(useNormalizedValueForNonNormalizedCpuPercentage, quickPulseConfiguration);

QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(httpPipeline, endpointUrl,
instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion);
instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion, quickPulseConfiguration);
QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher(collector, sendQueue, endpointUrl,
instrumentationKey, roleName, instanceName, machineName, quickPulseId);
instrumentationKey, roleName, instanceName, machineName, quickPulseId, quickPulseConfiguration);

QuickPulseCoordinatorInitData coordinatorInitData
= new QuickPulseCoordinatorInitDataBuilder().withPingSender(quickPulsePingSender)
Expand All @@ -97,6 +106,14 @@ private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Su

QuickPulseCoordinator coordinator = new QuickPulseCoordinator(coordinatorInitData);

QuickPulseMetricReceiver quickPulseMetricReceiver
= new QuickPulseMetricReceiver(quickPulseMetricReader, metricDataMapper, collector);

Thread metricReceiverThread
= new Thread(quickPulseMetricReceiver, QuickPulseMetricReceiver.class.getSimpleName());
metricReceiverThread.setDaemon(true);
metricReceiverThread.start();

Thread senderThread = new Thread(quickPulseDataSender, QuickPulseDataSender.class.getSimpleName());
senderThread.setDaemon(true);
senderThread.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse;

import com.azure.core.http.HttpResponse;
import com.azure.core.util.logging.ClientLogger;
import com.azure.json.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class QuickPulseConfiguration {
private static final ClientLogger logger = new ClientLogger(QuickPulseDataFetcher.class);
private AtomicReference<String> etag = new AtomicReference<>();
private ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> derivedMetrics = new ConcurrentHashMap<>();

public synchronized String getEtag() {
return this.etag.get();
}

public synchronized void setEtag(String etag) {
this.etag.set(etag);
}

public synchronized ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> getDerivedMetrics() {
return this.derivedMetrics;
}

public synchronized void setDerivedMetrics(ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> metrics) {
this.derivedMetrics = metrics;
}

public synchronized void updateConfig(String etagValue,
ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> otelMetrics) {
if (!Objects.equals(this.getEtag(), etagValue)) {
this.setEtag(etagValue);
this.setDerivedMetrics(otelMetrics);
}

}

public ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> parseDerivedMetrics(HttpResponse response)
throws IOException {

ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> requestedMetrics = new ConcurrentHashMap<>();
try {

String responseBody = response.getBodyAsString().block();
if (responseBody == null || responseBody.isEmpty()) {
return new ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>>();
}

try (JsonReader jsonReader = JsonProviders.createReader(responseBody)) {
jsonReader.nextToken();
while (jsonReader.nextToken() != JsonToken.END_OBJECT) {
if ("Metrics".equals(jsonReader.getFieldName())) {
jsonReader.nextToken();

while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
DerivedMetricInfo metric = new DerivedMetricInfo();

while (jsonReader.nextToken() != JsonToken.END_OBJECT) {

String fieldName = jsonReader.getFieldName();
jsonReader.nextToken();

switch (fieldName) {
case "Id":
metric.setId(jsonReader.getString());
break;

case "Aggregation":
metric.setAggregation(jsonReader.getString());
break;

case "TelemetryType":
metric.setTelemetryType(jsonReader.getString());
break;

case "Projection":
metric.setProjection(jsonReader.getString());
break;

case "FilterGroups":
// Handle "FilterGroups" field
if (jsonReader.currentToken() == JsonToken.START_ARRAY) {
while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
if (jsonReader.currentToken() == JsonToken.START_OBJECT) {
while (jsonReader.nextToken() != JsonToken.END_OBJECT) {
if (jsonReader.currentToken() == JsonToken.FIELD_NAME
&& jsonReader.getFieldName().equals("Filters")) {
jsonReader.nextToken();
if (jsonReader.currentToken() == JsonToken.START_ARRAY) {
while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
if (jsonReader.currentToken()
== JsonToken.START_OBJECT) {
String innerFieldName = "";
String predicate = "";
String comparand = "";

while (jsonReader.nextToken()
!= JsonToken.END_OBJECT) {
String filterFieldName
= jsonReader.getFieldName();
jsonReader.nextToken();

switch (filterFieldName) {
case "FieldName":
innerFieldName
= jsonReader.getString();
if (innerFieldName.contains(".")) {
innerFieldName = innerFieldName
.split("\\.")[1];
}
break;

case "Predicate":
predicate = jsonReader.getString();
break;

case "Comparand":
comparand = jsonReader.getString();
break;
}
}

if (!innerFieldName.isEmpty()
&& !innerFieldName.equals("undefined")
&& !predicate.isEmpty()
&& !comparand.isEmpty()) {
metric.addFilterGroup(innerFieldName,
predicate, comparand);
}
}
}
}
}
}
}
}
}
break;

default:
jsonReader.skipChildren();
break;
}
}
requestedMetrics.computeIfAbsent(metric.getTelemetryType(), k -> new ArrayList<>())
.add(metric);
}
} else {
jsonReader.skipChildren();

}
}
}
return requestedMetrics;
} catch (Exception e) {
logger.verbose("Failed to parse metrics from response: %s", e.getMessage());
}
return new ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>>();
}

public class DerivedMetricInfo {
private String id;
private String projection;
private String telemetryType;
private String aggregation;
private ArrayList<FilterGroup> filterGroups = new ArrayList<FilterGroup>();

public String getId() {
return this.id;
}

public void setId(String id) {
this.id = id;
}

public String getProjection() {
return projection;
}

public void setTelemetryType(String telemetryType) {
this.telemetryType = telemetryType;
}

public String getTelemetryType() {
return this.telemetryType;
}

public void setProjection(String projection) {
this.projection = projection;
}

public String getAggregation() {
return this.aggregation;
}

public void setAggregation(String aggregation) {
this.aggregation = aggregation;
}

public ArrayList<FilterGroup> getFilterGroups() {
return this.filterGroups;
}

public void addFilterGroup(String fieldName, String predicate, String comparand) {
this.filterGroups.add(new FilterGroup(fieldName, predicate, comparand));
}
}

class FilterGroup {
private String fieldName;
private String operator;
private String comparand;

public FilterGroup(String fieldName, String predicate, String comparand) {
this.setFieldName(fieldName);
this.setOperator(predicate);
this.setComparand(comparand);
}

public String getFieldName() {
return this.fieldName;
}

private void setFieldName(String fieldName) {
this.fieldName = fieldName;
}

public String getOperator() {
return this.operator;
}

private void setOperator(String operator) {
this.operator = operator;
}

public String getComparand() {
return this.comparand;
}

public void setComparand(String comparand) {
this.comparand = comparand;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ private long sendData() {

case QP_IS_OFF:
pingMode = true;
collector.flushOtelMetrics();
QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo);
return qpsServicePollingIntervalHintMillis > 0
? qpsServicePollingIntervalHintMillis
: waitBetweenPingsInMillis;

case QP_IS_ON:
QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo);
return waitBetweenPostsInMillis;
}

Expand Down
Loading

0 comments on commit 662a776

Please sign in to comment.