Skip to content

Added async subscription support #141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mqtt_jmeter/src/main/java/net/xmeter/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface Constants {
public static final String TIME_STAMP_SEP_FLAG = "ts_sep_flag";

public static final String DEBUG_RESPONSE = "mqtt.debug_response";

public static final String SUB_ASYNC = "mqtt.sub_async";

public static final int QOS_0 = 0;
public static final int QOS_1 = 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package net.xmeter.gui;

import net.xmeter.Constants;
import net.xmeter.samplers.AsyncResponseSampler;
import net.xmeter.samplers.DisConnectSampler;
import org.apache.jmeter.gui.util.VerticalPanel;
import org.apache.jmeter.samplers.gui.AbstractSamplerGui;
import org.apache.jmeter.testelement.TestElement;

import javax.swing.*;
import java.awt.*;

public class AsyncResponseSamplerUI extends AbstractSamplerGui implements Constants {
private static final long serialVersionUID = 1666890646673145131L;

public AsyncResponseSamplerUI() {
this.init();
}

private void init() {
setLayout(new BorderLayout());
setBorder(makeBorder());
add(makeTitlePanel(), BorderLayout.NORTH);
JPanel mainPanel = new VerticalPanel();
add(mainPanel, BorderLayout.CENTER);
}

@Override
public void configure(TestElement element) {
super.configure(element);
}

@Override
public TestElement createTestElement() {
AsyncResponseSampler sampler = new AsyncResponseSampler();
this.configureTestElement(sampler);
return sampler;
}

@Override
public String getLabelResource() {
throw new RuntimeException();
}

@Override
public String getStaticLabel() {
return "MQTT Async Response";
}

@Override
public void modifyTestElement(TestElement arg0) {
AsyncResponseSampler sampler = (AsyncResponseSampler) arg0;
this.configureTestElement(sampler);
}

@Override
public void clearGui() {
super.clearGui();
}

}
5 changes: 5 additions & 0 deletions mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class SubSamplerUI extends AbstractSamplerGui implements Constants, Chang

private JCheckBox debugResponse = new JCheckBox("Debug response");
private JCheckBox timestamp = new JCheckBox("Payload includes timestamp");
private JCheckBox async = new JCheckBox("Async");

public SubSamplerUI() {
this.init();
Expand Down Expand Up @@ -71,6 +72,7 @@ private JPanel createSubOption() {

JPanel optsPanel2 = new HorizontalPanel();
optsPanel2.add(debugResponse);
optsPanel2.add(async);
optsPanelCon.add(optsPanel2);

return optsPanelCon;
Expand Down Expand Up @@ -102,6 +104,7 @@ public void configure(TestElement element) {
this.topicNames.setText(sampler.getTopics());
this.timestamp.setSelected(sampler.isAddTimestamp());
this.debugResponse.setSelected(sampler.isDebugResponse());
this.async.setSelected(sampler.isAsync());
this.sampleOnCondition.setText(sampler.getSampleCondition());

if(SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) {
Expand Down Expand Up @@ -145,6 +148,7 @@ private void setupSamplerProperties(SubSampler sampler) {

sampler.setAddTimestamp(this.timestamp.isSelected());
sampler.setDebugResponse(this.debugResponse.isSelected());
sampler.setAsync(this.async.isSelected());
sampler.setSampleCondition(this.sampleOnCondition.getText());

if(SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) {
Expand All @@ -161,6 +165,7 @@ public void clearGui() {
this.qosChoice.setText(String.valueOf(QOS_0));
this.timestamp.setSelected(false);
this.debugResponse.setSelected(false);
this.async.setSelected(false);
this.sampleOnCondition.setText(SAMPLE_ON_CONDITION_OPTION1);
this.sampleConditionValue.setText(DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package net.xmeter.samplers;

import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.JMeterVariables;

import java.util.logging.Logger;

public class AsyncResponseSampler extends AbstractMQTTSampler {
private static final long serialVersionUID = 4360869021667126983L;

@Override
public SampleResult sample(Entry entry) {
SampleResult result = new SampleResult();
result.setSampleLabel(getName());

JMeterVariables vars = JMeterContextService.getContext().getVariables();
SubSampler subSampler = (SubSampler) vars.getObject("sub");

if (subSampler == null) {
result.sampleStart();
result.setSuccessful(false);
result.setResponseMessage("Sub sampler not found.");
result.setResponseData("Failed. Sub sampler not found.".getBytes());
result.setResponseCode("500");
result.sampleEnd(); // avoid endtime=0 exposed in trace log
return result;
}

// Could make clear responses configurable but cannot think of a good reason to not clear them which allows
// the sub sampler to be reused
return subSampler.produceAsyncResult(result, true);
}
}
64 changes: 49 additions & 15 deletions mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package net.xmeter.samplers;

import net.xmeter.SubBean;
import net.xmeter.samplers.mqtt.MQTTConnection;
import net.xmeter.samplers.mqtt.MQTTQoS;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.JMeterVariables;

import java.text.MessageFormat;
import java.util.HashSet;
import java.util.List;
Expand All @@ -9,15 +17,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.JMeterVariables;

import net.xmeter.SubBean;
import net.xmeter.samplers.mqtt.MQTTConnection;
import net.xmeter.samplers.mqtt.MQTTQoS;

@SuppressWarnings("deprecation")
public class SubSampler extends AbstractMQTTSampler {
private static final long serialVersionUID = 2979978053740194951L;
Expand All @@ -28,11 +27,12 @@ public class SubSampler extends AbstractMQTTSampler {
private boolean subFailed = false;

private boolean sampleByTime = true; // initial values
private int sampleElapsedTime = 1000;
private int sampleElapsedTime = 1000;
private int sampleCount = 1;

private transient ConcurrentLinkedQueue<SubBean> batches = new ConcurrentLinkedQueue<>();
private boolean printFlag = false;
private boolean asyncActive = false;

private transient Object dataLock = new Object();

Expand Down Expand Up @@ -92,6 +92,14 @@ public void setDebugResponse(boolean debugResponse) {
setProperty(DEBUG_RESPONSE, debugResponse);
}

public boolean isAsync() {
return getPropertyAsBoolean(SUB_ASYNC, false);
}

public void setAsync(boolean debugResponse) {
setProperty(SUB_ASYNC, debugResponse);
}

@Override
public SampleResult sample(Entry arg0) {
SampleResult result = new SampleResult();
Expand Down Expand Up @@ -142,38 +150,64 @@ public SampleResult sample(Entry arg0) {
if (subFailed) {
return fillFailedResult(result, "501", "Failed to subscribe to topic(s):" + topicsName);
}

if(sampleByTime) {

return doSample(result);
}

private SampleResult doSample(SampleResult result) {
JMeterVariables vars = JMeterContextService.getContext().getVariables();
final String topicsName= getTopics();

if (isAsync() && !asyncActive) {
asyncActive = true;
vars.putObject("sub", this); // save this sampler as thread local variable for response sampler
result.sampleStart();
return fillOKResult(result, 0, null, "");
} else if (sampleByTime) {
try {
TimeUnit.MILLISECONDS.sleep(sampleElapsedTime);
} catch (InterruptedException e) {
logger.log(Level.INFO, "Received exception when waiting for notification signal", e);
return fillFailedResult(result, "500", "Interrupted");
}
synchronized (dataLock) {
result.sampleStart();
return produceResult(result, topicsName);
}
} else {
synchronized (dataLock) {
int receivedCount1 = (batches.isEmpty() ? 0 : batches.element().getReceivedCount());;
int receivedCount1 = (batches.isEmpty() ? 0 : batches.element().getReceivedCount());
boolean needWait = false;
if(receivedCount1 < sampleCount) {
needWait = true;
}


logger.fine("Count = " + receivedCount1 + ", Expected = " + sampleCount + ", wait = " + needWait);

if(needWait) {
try {
dataLock.wait();
} catch (InterruptedException e) {
logger.log(Level.INFO, "Received exception when waiting for notification signal", e);
return fillFailedResult(result, "500", "Interrupted");
}
}
result.sampleStart();
return produceResult(result, topicsName);
}
}
}


protected SampleResult produceAsyncResult(SampleResult result, boolean clearResponses) {
synchronized (dataLock) {
SampleResult result1 = doSample(result);
if (result1.isSuccessful() && clearResponses) {
batches.clear();
}
return result1;
}
}

private SampleResult produceResult(SampleResult result, String topicName) {
SubBean bean = batches.poll();
if(bean == null) { // In "elapsed time" mode, return "dummy" when time is reached
Expand Down