Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,14 @@ public class InfluxDBRawBackendListenerClient implements BackendListenerClient {
DEFAULT_ARGS.put("influxdbUrl", "http://host_to_change:8086/write?db=jmeter");
DEFAULT_ARGS.put("influxdbToken", "");
DEFAULT_ARGS.put("measurement", DEFAULT_MEASUREMENT);
DEFAULT_ARGS.put("enabled", "${__P(InfluxdbBackendListener.enabled, true)}");
}

private InfluxdbMetricsSender influxDBMetricsManager;
private String measurement;

private boolean listenerIsEnabled;

public InfluxDBRawBackendListenerClient() {
// default constructor
}
Expand All @@ -89,45 +92,49 @@ InfluxdbMetricsSender getInfluxDBMetricsManager() {

@Override
public void setupTest(BackendListenerContext context) throws Exception {
initInfluxDBMetricsManager(context);
measurement = context.getParameter("measurement", DEFAULT_MEASUREMENT);
}
listenerIsEnabled = Boolean.parseBoolean(context.getParameter("enabled", "true"));
log.info("{} will send metrics: {}", this.getClass().getSimpleName(), listenerIsEnabled);

private void initInfluxDBMetricsManager(BackendListenerContext context) throws Exception {
influxDBMetricsManager = Class
.forName(context.getParameter("influxdbMetricsSender"))
.asSubclass(InfluxdbMetricsSender.class)
.getDeclaredConstructor()
.newInstance();
if (listenerIsEnabled) {
influxDBMetricsManager = Class
.forName(context.getParameter("influxdbMetricsSender"))
.asSubclass(InfluxdbMetricsSender.class)
.getDeclaredConstructor()
.newInstance();

influxDBMetricsManager.setup(
context.getParameter("influxdbUrl"),
context.getParameter("influxdbToken"));
influxDBMetricsManager.setup(
context.getParameter("influxdbUrl"),
context.getParameter("influxdbToken"));

measurement = context.getParameter("measurement", DEFAULT_MEASUREMENT);
}
}


@Override
public void teardownTest(BackendListenerContext context) {
influxDBMetricsManager.destroy();
if (influxDBMetricsManager != null) {
influxDBMetricsManager.destroy();
}
}

@Override
public void handleSampleResults(
List<SampleResult> sampleResults, BackendListenerContext context) {
log.debug("Handling {} sample results", sampleResults.size());
synchronized (LOCK) {
for (SampleResult sampleResult : sampleResults) {
addMetricFromSampleResult(sampleResult);
public void handleSampleResults(List<SampleResult> sampleResults, BackendListenerContext context) {
if (listenerIsEnabled) {
log.debug("Handling {} sample results", sampleResults.size());
synchronized (LOCK) {
for (SampleResult sampleResult : sampleResults) {
addMetricFromSampleResult(sampleResult);
}
influxDBMetricsManager.writeAndSendMetrics();
}
influxDBMetricsManager.writeAndSendMetrics();
}
}

private void addMetricFromSampleResult(SampleResult sampleResult) {
String tags = "," + createTags(sampleResult);
String fields = createFields(sampleResult);
long timestamp = sampleResult.getTimeStamp();

influxDBMetricsManager.addMetric(measurement, tags, fields, timestamp);
influxDBMetricsManager.addMetric(measurement, tags, fields, sampleResult.getTimeStamp());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class InfluxdbBackendListenerClient extends AbstractBackendListenerClient
DEFAULT_ARGS.put("percentiles", "99;95;90");
DEFAULT_ARGS.put("testTitle", "Test name");
DEFAULT_ARGS.put("eventTags", "");
DEFAULT_ARGS.put("enabled", "${__P(InfluxdbBackendListener.enabled, true)}");
}

private boolean summaryOnly;
Expand All @@ -125,22 +126,25 @@ public class InfluxdbBackendListenerClient extends AbstractBackendListenerClient
private String testTags;
private String applicationName = "";
private String userTag = "";
private InfluxdbMetricsSender influxdbMetricsManager;
private InfluxdbMetricsSender influxDBMetricsManager;

private ScheduledExecutorService scheduler;
private ScheduledFuture<?> timerHandle;

private boolean listenerIsEnabled;

public InfluxdbBackendListenerClient() {
super();
}

@Override
public void run() {
sendMetrics();
if (listenerIsEnabled) {
sendMetrics();
}
}

private void sendMetrics() {

synchronized (LOCK) {
for (Map.Entry<String, SamplerMetric> entry : metricsPerSampler.entrySet()) {
SamplerMetric metric = entry.getValue();
Expand All @@ -167,9 +171,9 @@ private void sendMetrics() {
field.append(METRIC_STARTED_THREADS).append(userMetrics.getStartedThreads()).append(',');
field.append(METRIC_ENDED_THREADS).append(userMetrics.getFinishedThreads());

influxdbMetricsManager.addMetric(measurement, tag.toString(), field.toString());
influxDBMetricsManager.addMetric(measurement, tag.toString(), field.toString());

influxdbMetricsManager.writeAndSendMetrics();
influxDBMetricsManager.writeAndSendMetrics();
}

@FunctionalInterface
Expand Down Expand Up @@ -207,14 +211,14 @@ private void addErrorMetric(String transaction, ErrorMetric err, long count) {

StringBuilder field = new StringBuilder(30);
field.append(METRIC_COUNT).append(count);
influxdbMetricsManager.addMetric(measurement, tag.toString(), field.toString());
influxDBMetricsManager.addMetric(measurement, tag.toString(), field.toString());
}

private void addMetric(String transaction, int count,
Long sentBytes, Long receivedBytes,
String status, double mean, double minTime, double maxTime,
int hits,
Collection<Float> pcts, PercentileProvider percentileProvider) {
Long sentBytes, Long receivedBytes,
String status, double mean, double minTime, double maxTime,
int hits,
Collection<Float> pcts, PercentileProvider percentileProvider) {
if (count <= 0) {
return;
}
Expand Down Expand Up @@ -246,7 +250,7 @@ private void addMetric(String transaction, int count,
field.append(',').append(METRIC_PCT_PREFIX).append(pct).append('=').append(
percentileProvider.getPercentileValue(pct));
}
influxdbMetricsManager.addMetric(measurement, tag.toString(), field.toString());
influxDBMetricsManager.addMetric(measurement, tag.toString(), field.toString());
}

private void addCumulatedMetrics(SamplerMetric metric) {
Expand Down Expand Up @@ -281,7 +285,7 @@ private void addCumulatedMetrics(SamplerMetric metric) {
for (Float pct : pcts) {
field.append(',').append(METRIC_PCT_PREFIX).append(pct).append('=').append(Double.toString(metric.getAllPercentile(pct)));
}
influxdbMetricsManager.addMetric(measurement, tag.toString(), field.toString());
influxDBMetricsManager.addMetric(measurement, tag.toString(), field.toString());
}

public String getSamplersRegex() {
Expand All @@ -297,17 +301,20 @@ public void setSamplersList(String samplersList) {

@Override
public void handleSampleResults(List<SampleResult> sampleResults, BackendListenerContext context) {
synchronized (LOCK) {
UserMetric userMetrics = getUserMetrics();
for (SampleResult sampleResult : sampleResults) {
userMetrics.add(sampleResult);
Matcher matcher = samplersToFilter.matcher(sampleResult.getSampleLabel());
if (!summaryOnly && matcher.find()) {
SamplerMetric samplerMetric = getSamplerMetricInfluxdb(sampleResult.getSampleLabel());
samplerMetric.add(sampleResult);
if (listenerIsEnabled) {
log.debug("Handling {} sample results", sampleResults.size());
synchronized (LOCK) {
UserMetric userMetrics = getUserMetrics();
for (SampleResult sampleResult : sampleResults) {
userMetrics.add(sampleResult);
Matcher matcher = samplersToFilter.matcher(sampleResult.getSampleLabel());
if (!summaryOnly && matcher.find()) {
SamplerMetric samplerMetric = getSamplerMetricInfluxdb(sampleResult.getSampleLabel());
samplerMetric.add(sampleResult);
}
SamplerMetric cumulatedMetrics = getSamplerMetricInfluxdb(CUMULATED_METRICS);
cumulatedMetrics.addCumulated(sampleResult);
}
SamplerMetric cumulatedMetrics = getSamplerMetricInfluxdb(CUMULATED_METRICS);
cumulatedMetrics.addCumulated(sampleResult);
}
}
}
Expand All @@ -324,25 +331,30 @@ public void setupTest(BackendListenerContext context) throws Exception {
testTags = AbstractInfluxdbMetricsSender.tagToStringValue(
context.getParameter("eventTags", ""));

initPercentiles(context);
initUserTags(context);
initInfluxdbMetricsManager(context);
listenerIsEnabled = Boolean.parseBoolean(context.getParameter("enabled", "true"));
log.info("{} will send metrics: {}", this.getClass().getSimpleName(), listenerIsEnabled);

if (listenerIsEnabled) {
initPercentiles(context);
initUserTags(context);
initinfluxDBMetricsManager(context);

samplersToFilter = Pattern.compile(samplersRegex);
addAnnotation(true);
samplersToFilter = Pattern.compile(samplersRegex);
addAnnotation(true);

scheduler = Executors.newScheduledThreadPool(MAX_POOL_SIZE);
// Start immediately the scheduler and put the pooling ( 5 seconds by default )
this.timerHandle = scheduler.scheduleAtFixedRate(this, 0, SEND_INTERVAL, TimeUnit.SECONDS);
scheduler = Executors.newScheduledThreadPool(MAX_POOL_SIZE);
// Start immediately the scheduler and put the pooling ( 5 seconds by default )
this.timerHandle = scheduler.scheduleAtFixedRate(this, 0, SEND_INTERVAL, TimeUnit.SECONDS);
}
}

private void initInfluxdbMetricsManager(BackendListenerContext context) throws Exception {
private void initinfluxDBMetricsManager(BackendListenerContext context) throws Exception {
Class<?> clazz = Class.forName(context.getParameter("influxdbMetricsSender"));
influxdbMetricsManager = (InfluxdbMetricsSender) clazz.getDeclaredConstructor().newInstance();
influxDBMetricsManager = (InfluxdbMetricsSender) clazz.getDeclaredConstructor().newInstance();

String influxdbUrl = context.getParameter("influxdbUrl");
String influxdbToken = context.getParameter("influxdbToken");
influxdbMetricsManager.setup(influxdbUrl, influxdbToken);
influxDBMetricsManager.setup(influxdbUrl, influxdbToken);
}

private void initUserTags(BackendListenerContext context) {
Expand Down Expand Up @@ -405,23 +417,26 @@ private SamplerMetric getSamplerMetricInfluxdb(String sampleLabel) {

@Override
public void teardownTest(BackendListenerContext context) throws Exception {
boolean cancelState = timerHandle.cancel(false);
log.debug("Canceled state: {}", cancelState);
scheduler.shutdown();
try {
scheduler.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Error waiting for end of scheduler");
Thread.currentThread().interrupt();
}

addAnnotation(false);
if (listenerIsEnabled) {
boolean cancelState = timerHandle.cancel(false);
log.debug("Canceled state: {}", cancelState);
scheduler.shutdown();
try {
scheduler.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Error waiting for end of scheduler");
Thread.currentThread().interrupt();
}

// Send last set of data before ending
log.info("Sending last metrics to InfluxDB");
sendMetrics();
addAnnotation(false);

influxdbMetricsManager.destroy();
// Send last set of data before ending
log.info("Sending last metrics to InfluxDB");
sendMetrics();
}
if (influxDBMetricsManager != null) {
influxDBMetricsManager.destroy();
}
super.teardownTest(context);
}

Expand All @@ -443,7 +458,7 @@ private void addAnnotation(boolean isStartOfTest) {
AbstractInfluxdbMetricsSender.fieldToStringValue(
testTitle + (isStartOfTest ? " started" : " ended")) + "\"";

influxdbMetricsManager.addMetric(EVENTS_FOR_ANNOTATION, tags, field);
influxDBMetricsManager.addMetric(EVENTS_FOR_ANNOTATION, tags, field);
}

@Override
Expand Down
Loading