From 3423c49b1400d0a4c3e1196b8bea93e64918b1de Mon Sep 17 00:00:00 2001 From: Rich Turner <7072278+richturner@users.noreply.github.com> Date: Tue, 30 May 2023 14:54:42 +0100 Subject: [PATCH 1/2] Added async subscription support --- .../src/main/java/net/xmeter/Constants.java | 2 + .../xmeter/gui/AsyncResponseSamplerUI.java | 61 +++++++++++++++++++ .../java/net/xmeter/gui/SubSamplerUI.java | 5 ++ .../xmeter/samplers/AsyncResponseSampler.java | 34 +++++++++++ .../java/net/xmeter/samplers/SubSampler.java | 60 +++++++++++++----- 5 files changed, 147 insertions(+), 15 deletions(-) create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/gui/AsyncResponseSamplerUI.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java diff --git a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java index e207efb..ce4c47f 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java @@ -42,6 +42,8 @@ public interface Constants { public static final String TIME_STAMP_SEP_FLAG = "ts_sep_flag"; public static final String DEBUG_RESPONSE = "mqtt.debug_response"; + + public static final String SUB_ASYNC = "mqtt.sub_async"; public static final int QOS_0 = 0; public static final int QOS_1 = 1; diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/AsyncResponseSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/AsyncResponseSamplerUI.java new file mode 100644 index 0000000..9e4539d --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/AsyncResponseSamplerUI.java @@ -0,0 +1,61 @@ +package net.xmeter.gui; + +import net.xmeter.Constants; +import net.xmeter.samplers.AsyncResponseSampler; +import net.xmeter.samplers.DisConnectSampler; +import org.apache.jmeter.gui.util.VerticalPanel; +import org.apache.jmeter.samplers.gui.AbstractSamplerGui; +import org.apache.jmeter.testelement.TestElement; + +import javax.swing.*; +import java.awt.*; + +public class AsyncResponseSamplerUI extends AbstractSamplerGui implements Constants { + private static final long serialVersionUID = 1666890646673145131L; + + public AsyncResponseSamplerUI() { + this.init(); + } + + private void init() { + setLayout(new BorderLayout()); + setBorder(makeBorder()); + add(makeTitlePanel(), BorderLayout.NORTH); + JPanel mainPanel = new VerticalPanel(); + add(mainPanel, BorderLayout.CENTER); + } + + @Override + public void configure(TestElement element) { + super.configure(element); + } + + @Override + public TestElement createTestElement() { + AsyncResponseSampler sampler = new AsyncResponseSampler(); + this.configureTestElement(sampler); + return sampler; + } + + @Override + public String getLabelResource() { + throw new RuntimeException(); + } + + @Override + public String getStaticLabel() { + return "MQTT Async Response"; + } + + @Override + public void modifyTestElement(TestElement arg0) { + AsyncResponseSampler sampler = (AsyncResponseSampler) arg0; + this.configureTestElement(sampler); + } + + @Override + public void clearGui() { + super.clearGui(); + } + +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java index c7e7625..2765d49 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java @@ -31,6 +31,7 @@ public class SubSamplerUI extends AbstractSamplerGui implements Constants, Chang private JCheckBox debugResponse = new JCheckBox("Debug response"); private JCheckBox timestamp = new JCheckBox("Payload includes timestamp"); + private JCheckBox async = new JCheckBox("Async"); public SubSamplerUI() { this.init(); @@ -71,6 +72,7 @@ private JPanel createSubOption() { JPanel optsPanel2 = new HorizontalPanel(); optsPanel2.add(debugResponse); + optsPanel2.add(async); optsPanelCon.add(optsPanel2); return optsPanelCon; @@ -102,6 +104,7 @@ public void configure(TestElement element) { this.topicNames.setText(sampler.getTopics()); this.timestamp.setSelected(sampler.isAddTimestamp()); this.debugResponse.setSelected(sampler.isDebugResponse()); + this.async.setSelected(sampler.isAsync()); this.sampleOnCondition.setText(sampler.getSampleCondition()); if(SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) { @@ -145,6 +148,7 @@ private void setupSamplerProperties(SubSampler sampler) { sampler.setAddTimestamp(this.timestamp.isSelected()); sampler.setDebugResponse(this.debugResponse.isSelected()); + sampler.setAsync(this.async.isSelected()); sampler.setSampleCondition(this.sampleOnCondition.getText()); if(SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) { @@ -161,6 +165,7 @@ public void clearGui() { this.qosChoice.setText(String.valueOf(QOS_0)); this.timestamp.setSelected(false); this.debugResponse.setSelected(false); + this.async.setSelected(false); this.sampleOnCondition.setText(SAMPLE_ON_CONDITION_OPTION1); this.sampleConditionValue.setText(DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC); } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java new file mode 100644 index 0000000..8fb567f --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java @@ -0,0 +1,34 @@ +package net.xmeter.samplers; + +import org.apache.jmeter.samplers.Entry; +import org.apache.jmeter.samplers.SampleResult; +import org.apache.jmeter.threads.JMeterContextService; +import org.apache.jmeter.threads.JMeterVariables; + +import java.util.logging.Logger; + +public class AsyncResponseSampler extends AbstractMQTTSampler { + private static final long serialVersionUID = 4360869021667126983L; + + @Override + public SampleResult sample(Entry entry) { + SampleResult result = new SampleResult(); + result.setSampleLabel(getName()); + + JMeterVariables vars = JMeterContextService.getContext().getVariables(); + SubSampler subSampler = (SubSampler) vars.getObject("sub"); + + if (subSampler == null) { + result.sampleStart(); + result.setSuccessful(false); + result.setResponseMessage("Sub sampler not found."); + result.setResponseData("Failed. Sub sampler not found.".getBytes()); + result.setResponseCode("500"); + result.sampleEnd(); // avoid endtime=0 exposed in trace log + return result; + } + + vars.remove("sub"); // clean up thread local var as well + return subSampler.produceAsyncResult(result); + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java index 3b336d6..5d7d5a3 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java @@ -1,5 +1,13 @@ package net.xmeter.samplers; +import net.xmeter.SubBean; +import net.xmeter.samplers.mqtt.MQTTConnection; +import net.xmeter.samplers.mqtt.MQTTQoS; +import org.apache.jmeter.samplers.Entry; +import org.apache.jmeter.samplers.SampleResult; +import org.apache.jmeter.threads.JMeterContextService; +import org.apache.jmeter.threads.JMeterVariables; + import java.text.MessageFormat; import java.util.HashSet; import java.util.List; @@ -9,15 +17,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.jmeter.samplers.Entry; -import org.apache.jmeter.samplers.SampleResult; -import org.apache.jmeter.threads.JMeterContextService; -import org.apache.jmeter.threads.JMeterVariables; - -import net.xmeter.SubBean; -import net.xmeter.samplers.mqtt.MQTTConnection; -import net.xmeter.samplers.mqtt.MQTTQoS; - @SuppressWarnings("deprecation") public class SubSampler extends AbstractMQTTSampler { private static final long serialVersionUID = 2979978053740194951L; @@ -28,11 +27,12 @@ public class SubSampler extends AbstractMQTTSampler { private boolean subFailed = false; private boolean sampleByTime = true; // initial values - private int sampleElapsedTime = 1000; + private int sampleElapsedTime = 1000; private int sampleCount = 1; private transient ConcurrentLinkedQueue batches = new ConcurrentLinkedQueue<>(); private boolean printFlag = false; + private boolean asyncActive = false; private transient Object dataLock = new Object(); @@ -92,6 +92,14 @@ public void setDebugResponse(boolean debugResponse) { setProperty(DEBUG_RESPONSE, debugResponse); } + public boolean isAsync() { + return getPropertyAsBoolean(SUB_ASYNC, false); + } + + public void setAsync(boolean debugResponse) { + setProperty(SUB_ASYNC, debugResponse); + } + @Override public SampleResult sample(Entry arg0) { SampleResult result = new SampleResult(); @@ -142,12 +150,25 @@ public SampleResult sample(Entry arg0) { if (subFailed) { return fillFailedResult(result, "501", "Failed to subscribe to topic(s):" + topicsName); } - - if(sampleByTime) { + + return doSample(result); + } + + private SampleResult doSample(SampleResult result) { + JMeterVariables vars = JMeterContextService.getContext().getVariables(); + final String topicsName= getTopics(); + + if (isAsync() && !asyncActive) { + asyncActive = true; + vars.putObject("sub", this); // save this sampler as thread local variable for response sampler + result.sampleStart(); + return fillOKResult(result, 0, null, ""); + } else if (sampleByTime) { try { TimeUnit.MILLISECONDS.sleep(sampleElapsedTime); } catch (InterruptedException e) { logger.log(Level.INFO, "Received exception when waiting for notification signal", e); + return fillFailedResult(result, "500", "Interrupted"); } synchronized (dataLock) { result.sampleStart(); @@ -155,17 +176,20 @@ public SampleResult sample(Entry arg0) { } } else { synchronized (dataLock) { - int receivedCount1 = (batches.isEmpty() ? 0 : batches.element().getReceivedCount());; + int receivedCount1 = (batches.isEmpty() ? 0 : batches.element().getReceivedCount()); boolean needWait = false; if(receivedCount1 < sampleCount) { needWait = true; } - + + logger.fine("Count = " + receivedCount1 + ", Expected = " + sampleCount + ", wait = " + needWait); + if(needWait) { try { dataLock.wait(); } catch (InterruptedException e) { logger.log(Level.INFO, "Received exception when waiting for notification signal", e); + return fillFailedResult(result, "500", "Interrupted"); } } result.sampleStart(); @@ -173,7 +197,13 @@ public SampleResult sample(Entry arg0) { } } } - + + protected SampleResult produceAsyncResult(SampleResult result) { + synchronized (dataLock) { + return doSample(result); + } + } + private SampleResult produceResult(SampleResult result, String topicName) { SubBean bean = batches.poll(); if(bean == null) { // In "elapsed time" mode, return "dummy" when time is reached From f856841a0c145a6adf2513514fe86bea8f0c423b Mon Sep 17 00:00:00 2001 From: Rich Turner <7072278+richturner@users.noreply.github.com> Date: Wed, 31 May 2023 12:02:12 +0100 Subject: [PATCH 2/2] Make subscription reusable --- .../java/net/xmeter/samplers/AsyncResponseSampler.java | 5 +++-- .../src/main/java/net/xmeter/samplers/SubSampler.java | 8 ++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java index 8fb567f..b0e2713 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AsyncResponseSampler.java @@ -28,7 +28,8 @@ public SampleResult sample(Entry entry) { return result; } - vars.remove("sub"); // clean up thread local var as well - return subSampler.produceAsyncResult(result); + // Could make clear responses configurable but cannot think of a good reason to not clear them which allows + // the sub sampler to be reused + return subSampler.produceAsyncResult(result, true); } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java index 5d7d5a3..a60746e 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java @@ -198,9 +198,13 @@ private SampleResult doSample(SampleResult result) { } } - protected SampleResult produceAsyncResult(SampleResult result) { + protected SampleResult produceAsyncResult(SampleResult result, boolean clearResponses) { synchronized (dataLock) { - return doSample(result); + SampleResult result1 = doSample(result); + if (result1.isSuccessful() && clearResponses) { + batches.clear(); + } + return result1; } }