diff --git a/.gitignore b/.gitignore index c8d0aeb..4c13908 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ target/ .metadata/ .DS_Store +.idea/ +Download/ diff --git a/Download/v1.0.1/mqtt-xmeter-jar-with-dependencies.jar b/Download/v1.0.1/mqtt-xmeter-jar-with-dependencies.jar deleted file mode 100644 index 12506b6..0000000 Binary files a/Download/v1.0.1/mqtt-xmeter-jar-with-dependencies.jar and /dev/null differ diff --git a/Download/v1.13.0/mqtt-xmeter-1.13-jar-with-dependencies.jar b/Download/v1.13.0/mqtt-xmeter-1.13-jar-with-dependencies.jar deleted file mode 100644 index f476018..0000000 Binary files a/Download/v1.13.0/mqtt-xmeter-1.13-jar-with-dependencies.jar and /dev/null differ diff --git a/Download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar b/Download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar deleted file mode 100644 index 65e54ce..0000000 Binary files a/Download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar and /dev/null differ diff --git a/Download/v2.0.2/mqtt-xmeter-fuse-2.0.2-jar-with-dependencies.jar b/Download/v2.0.2/mqtt-xmeter-fuse-2.0.2-jar-with-dependencies.jar deleted file mode 100644 index 1913626..0000000 Binary files a/Download/v2.0.2/mqtt-xmeter-fuse-2.0.2-jar-with-dependencies.jar and /dev/null differ diff --git a/README.md b/README.md index 08ba374..024f053 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,23 @@ # mqtt-jmeter Overview -MQTT JMeter Plugin extends your JMeter's capability to test against MQTT protocol, just as easy as ordinary HTTP protocal. +MQTT JMeter Plugin extends your JMeter's capability to test against MQTT protocol, just as easy as ordinary HTTP protocol. It has been used to benchmark EMQ server performance, and here is the [report link](https://github.com/emqx/emq-xmeter-cn). -This plugin is developed and maintained by [XMeter](https://www.xmeter.net). XMeter is a professional performance testing service provider. +The original plugin is developed by [XMeter](https://www.xmeter.net). This version of the plugin is developed by Expleo for internal usage only. # Install instruction -The plugin is a standard JMeter plugin. You can download the latest version of mqtt-jmeter from [here](https://github.com/emqx/mqtt-jmeter/releases), and then copy the downloaded JAR files into $JMETER_HOME/lib/ext folder. After restarting the JMeter, you can see "MQTT samplers" provided by this plugin. +The plugin is a standard JMeter plugin. To use it, clone the project and run 'mvn install'. Maven will download some JMeter dependencies, so build time may depend on your network connection. Find the resulting JAR file (usually ending with '\[...\]-with-dependencies') in the target and copy it into $JMETER_HOME/lib/ext folder. After restarting JMeter, you can see "MQTT samplers" provided by this plugin. To use this plugin, we recommend you to install JMeter 3.2 or above. -## Build from source code - -If you'd like to build binary by yourself, please clone the project and run 'mvn install'. Maven will download some JMeter dependency binary files, so the build elapsed time will up to your network status. - # How to use The plugin includes 4 samplers: -- Connect sampler: Initiate MQTT server connection on behalf of a device. In addition to normal connection setup, this sampler can be used to simulate massive "background" connections(no data transimission but regular hearbeat signals) to the designated MQTT server or cluster system. +- Connect sampler: Initiate MQTT server connection on behalf of a device. In addition to normal connection setup, this sampler can be used to simulate massive "background" connections (no data transmission but regular heartbeat signals) to the designated MQTT server or cluster system. - Pub sampler: publish various messages to the target MQTT server. -- Sub sampler: subscribe message(s) from target MQTT server. +- Sub sampler: subscribe to topics on target MQTT server and receive messages from the server. - DisConnect sampler: Reset the connection to target MQTT server. @@ -29,7 +25,6 @@ If MQTT JMeter plugin is successfully installed, you can find these MQTT sampler ![mqtt_jmeter_plugin](screenshots/mqtt_jmeter_plugin.png) - ## Connect Sampler ![conn_sampler](screenshots/conn_sampler.png) @@ -39,13 +34,12 @@ This section includes basic connection settings. - **Server name or IP**: The MQTT target to be tested. It can be either IP address or server name. The default value is 127.0.0.1. **DO NOT** add protocol (e.g. tcp:// or ssl://) before server name or IP address! -- **Port number**: The port opened by MQTT server. Typically 1883 is for TCP protocol, and 8883 for SSL protocol. +- **Port number**: The port opened by MQTT server. Typically, 1883 is for TCP protocol, and 8883 for SSL protocol. -- **MQTT version**: The MQTT version, default is 3.1, and another option is 3.1.1. Sometimes we found version 3.1.1 is required to establish connection to [Azure IoTHub](https://github.com/emqx/mqtt-jmeter/issues/21). +- **MQTT version**: The MQTT version, default is 3.1, and another option is 3.1.1. We found that sometimes version 3.1.1 is required to establish connection to [Azure IoTHub](https://github.com/emqx/mqtt-jmeter/issues/21). - **Timeout(s)**: The connection timeout seconds while connecting to MQTT server. The default is 10 seconds. - ### MQTT Protocol The sampler supports 4 protocols, TCP, SSL, WS, WSS. @@ -57,9 +51,9 @@ If **'Dual SSL authentication'** is checked, please follow 'Certification files ### User authentication -User can configure MQTT server with user name & password authentication, refer to [EMQ user name and password authentication guide](http://emqtt.com/docs/v2/guide.html#id3). +User can configure MQTT server with username & password authentication, refer to [EMQ user name and password authentication guide](http://emqtt.com/docs/v2/guide.html#id3). -- **User name**: If MQTT server is configured with user name, then specify user name here. +- **Username**: If MQTT server is configured with username, then specify username here. - **Password**: If MQTT server is configured with password, then specify password here. @@ -67,8 +61,8 @@ User can configure MQTT server with user name & password authentication, refer t - **ClientId**: Identification of the client, i.e. virtual user or JMeter thread. Default value is 'conn_'. If 'Add random client id suffix' is selected, JMeter plugin will append generated uuid as suffix to represent the client, otherwise, the text of 'ClientId' will be passed as 'clientId' of current connection. - **Keep alive(s)**: Ping packet send interval in seconds. Default value is 300, which means each connection sends a ping packet to MQTT server every 5 minutes. -- **Connect attampt max**: The maximum number of reconnect attempts before an error is reported back to the client on the first attempt by the client to connect to a server. Set to -1 to use unlimited attempts. Defaults to 0. -- **Reconnect attampt max**: The maximum number of reconnect attempts before an error is reported back to the client after a server connection had previously been established. Set to -1 to use unlimited attempts. Defaults to 0. +- **Connect attempt max**: The maximum number of reconnect attempts before an error is reported back to the client on the first attempt by the client to connect to a server. Set to -1 to use unlimited attempts. Defaults to 0. +- **Reconnect attempt max**: The maximum number of reconnect attempts before an error is reported back to the client after a server connection had previously been established. Set to -1 to use unlimited attempts. Defaults to 0. - **Clean session**: If you want to maintain state information between sessions, set it to false; otherwise, set it to true. @@ -108,7 +102,7 @@ Sub sampler reuses previously established connection (by Connect sampler) to sub - **Topic name(s)**: A list of topic names (comma-separated) that will be subscribed to. -- **Payload includes timestamp**: If the checkbox is enabled, then it means the payload includes timestamp. It can be used to calcuate the message latency time. +- **Payload includes timestamp**: If the checkbox is enabled, then it means the payload includes timestamp. It can be used to calculate the message latency time. ``` message_latency = timestamp_in_sub_when_receive_msg - timestamp_in_payload (timestamp in pub machine when sending out message) @@ -116,7 +110,7 @@ message_latency = timestamp_in_sub_when_receive_msg - timestamp_in_payload (time Please notice, if the machine publish message is not the same as subscriber, then the calculated message latency time is not accurate. It's because the time is almost not the same in different machines. So the latency time calculated by sub sampler could be only be a reference. ``` -- **Sample on**: It controls how to sample. The default value is '**elapsed with specified time(ms)**', which means a sub sampler will occur every specified milli-seconds (default is 1000ms). During the 1000 ms, multiple messages could be received, and result in report is the summarized data during 1000 ms. If the value is set to 2000, then means summarized report during 2000 ms. Another option is '**number of received messages**', which means a sub sampler will occur after receiving these specified number of messages (default is 1). +- **Sample on**: It controls how to sample. The default value is '**elapsed with specified time(ms)**', which means a sub sampler will occur every specified milli-seconds (default is 1000ms). During the 1000 ms, multiple messages could be received, and result in report is the summarized data during 1000 ms. If the value is set to 2000, then means summarized report during 2000 ms. Another option is '**number of received messages**', which means a sub sampler will occur after receiving these specified number of messages (default is 1). - **Debug response**: If checked, the received message will be print in response. It's recommended to enable this option when you debug your script. @@ -124,8 +118,7 @@ It's because the time is almost not the same in different machines. So the laten ## DisConnect Sampler ![disconn_sampler](screenshots/disconn_sampler.png) -This sampler is very simple, it just clear the previous created connection. Therefore, next time you run Connect sampler, it will initiate a new MQTT server connection for you. As you can imagine, Disconnect sample will fail immediately if no connection is detected at this moment. - +This sampler is very simple, it just clears the previous created connection. Therefore, next time you run Connect sampler, it will initiate a new MQTT server connection for you. As you can imagine, Disconnect sample will fail immediately if no connection is detected at this moment. ## Example JMeter Scripts As a reference, you can check out some example scripts in SampleScripts folder. @@ -133,7 +126,7 @@ As a reference, you can check out some example scripts in SampleScripts folder. Simulate massive background MQTT connections to server. You can optionally subscribe to a topic when connecting. (Please modify "xmeter_runtime_vars" UDV to fit your needs.) 2) pubsub_unidirection.jmx: -Demonstrate how sub sampler can get messages from corresponding pub sampler, with two JMeter user groups and delay between opertions. +Demonstrate how sub sampler can get messages from corresponding pub sampler, with two JMeter user groups and delay between operations. 3) pubsub_bidirection.jmx: Demonstrate how a set of Devices and Mobiles exchange messages in both directions. @@ -153,7 +146,7 @@ After deploying emqtt server, you get the following OOTB (out of the box) SSL/TL [Note:] The above server and client certifications are both issued by the self-signed CA. If you would like to use official certifications for your EMQTT deployment, please check out relevant document to configure it. -We will use the OOTB test certfications (as an example) to show you how to prepare the required certification files for this EMQTT JMeter plugin. +We will use the OOTB test certifications (as an example) to show you how to prepare the required certification files for this EMQTT JMeter plugin. ``` export PATH=$PATH:/bin diff --git a/mqtt_jmeter/pom.xml b/mqtt_jmeter/pom.xml index fdab02d..f4c3cb8 100644 --- a/mqtt_jmeter/pom.xml +++ b/mqtt_jmeter/pom.xml @@ -3,10 +3,10 @@ 4.0.0 net.xmeter mqtt-jmeter - 2.0.2 + 2.0.3 - 5.0 + 5.4.3 @@ -27,19 +27,25 @@ org.fusesource.mqtt-client mqtt-client - 1.14 + 1.16 com.hivemq hivemq-mqtt-client - 1.1.3 + 1.3.0 + + + + javax.xml.bind + jaxb-api + 2.4.0-b180830.0359 - mqtt-xmeter-${project.version} + plugin-xmeter-${project.version} install @@ -47,8 +53,8 @@ maven-compiler-plugin 3.8.0 - 1.8 - 1.8 + 9 + 9 diff --git a/mqtt_jmeter/src/main/java/net/xmeter/AcceptAllTrustManagerFactory.java b/mqtt_jmeter/src/main/java/net/xmeter/AcceptAllTrustManagerFactory.java index 0b9abac..08fad34 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/AcceptAllTrustManagerFactory.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/AcceptAllTrustManagerFactory.java @@ -14,7 +14,7 @@ public class AcceptAllTrustManagerFactory extends TrustManagerFactory { - private static final Provider PROVIDER = new Provider("", 0.0, "") { + private static final Provider PROVIDER = new Provider("", "0.0", "") { private static final long serialVersionUID = -2226165055935321223L; }; @@ -22,13 +22,13 @@ private AcceptAllTrustManagerFactory() { super(AcceptAllTrustManagerFactorySpi.getInstance(), PROVIDER, ""); } - public static final TrustManagerFactory getInstance() { + public static TrustManagerFactory getInstance() { return new AcceptAllTrustManagerFactory(); } static final class AcceptAllTrustManagerFactorySpi extends TrustManagerFactorySpi { - public static final AcceptAllTrustManagerFactorySpi getInstance() { + public static AcceptAllTrustManagerFactorySpi getInstance() { return new AcceptAllTrustManagerFactorySpi(); } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java index e207efb..0035bec 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java @@ -1,96 +1,100 @@ package net.xmeter; public interface Constants { - public static final String SERVER = "mqtt.server"; - public static final String MQTT_VERSION = "mqtt.version"; - public static final String PORT = "mqtt.port"; - public static final String CONN_TIMEOUT = "mqtt.conn_timeout"; - public static final String MQTT_CLIENT_NAME = "mqtt.client_name"; + String SERVER = "mqtt.server"; + String MQTT_VERSION = "mqtt.version"; + String PORT = "mqtt.port"; + String CONN_TIMEOUT = "mqtt.conn_timeout"; + String MQTT_CLIENT_NAME = "mqtt.client_name"; + String MQTT_CONN_NAME = "mqtt.connect_name"; - public static final String PROTOCOL = "mqtt.protocol"; - public static final String WS_PATH = "mqtt.ws_path"; - public static final String DUAL_AUTH = "mqtt.dual_ssl_authentication"; - public static final String CERT_FILE_PATH1 = "mqtt.keystore_file_path"; - public static final String CERT_FILE_PATH2 = "mqtt.clientcert_file_path"; - public static final String KEY_FILE_PWD1 = "mqtt.keystore_password"; - public static final String KEY_FILE_PWD2 = "mqtt.clientcert_password"; + String PROTOCOL = "mqtt.protocol"; + String WS_PATH = "mqtt.ws_path"; + String DUAL_AUTH = "mqtt.dual_ssl_authentication"; + String CERT_FILE_PATH1 = "mqtt.keystore_file_path"; + String CERT_FILE_PATH2 = "mqtt.clientcert_file_path"; + String KEY_FILE_PWD1 = "mqtt.keystore_password"; + String KEY_FILE_PWD2 = "mqtt.clientcert_password"; - public static final String USER_NAME_AUTH = "mqtt.user_name"; - public static final String PASSWORD_AUTH = "mqtt.password"; + String USER_NAME_AUTH = "mqtt.user_name"; + String PASSWORD_AUTH = "mqtt.password"; - public static final String CONN_CLIENT_ID_PREFIX = "mqtt.client_id_prefix"; - public static final String CONN_CLIENT_ID_SUFFIX = "mqtt.client_id_suffix"; + String CONN_CLIENT_ID_PREFIX = "mqtt.client_id_prefix"; + String CONN_CLIENT_ID_SUFFIX = "mqtt.client_id_suffix"; - public static final String CONN_KEEP_ALIVE = "mqtt.conn_keep_alive"; - public static final String CONN_ATTAMPT_MAX = "mqtt.conn_attampt_max"; - public static final String CONN_RECONN_ATTAMPT_MAX = "mqtt.reconn_attampt_max"; + String CONN_KEEP_ALIVE = "mqtt.conn_keep_alive"; + String CONN_ATTEMPT_MAX = "mqtt.conn_attempt_max"; + String CONN_RECONN_ATTEMPT_MAX = "mqtt.reconn_attempt_max"; - public static final String CONN_CLEAN_SESSION = "mqtt.conn_clean_session"; + String CONN_CLEAN_SESSION = "mqtt.conn_clean_session"; - public static final String MESSAGE_TYPE = "mqtt.message_type"; - public static final String MESSAGE_FIX_LENGTH = "mqtt.message_type_fixed_length"; - public static final String MESSAGE_TO_BE_SENT = "mqtt.message_to_sent"; + String MESSAGE_TYPE = "mqtt.message_type"; + String MESSAGE_FIX_LENGTH = "mqtt.message_type_fixed_length"; + String MESSAGE_TO_BE_SENT = "mqtt.message_to_sent"; - public static final String TOPIC_NAME = "mqtt.topic_name"; - public static final String QOS_LEVEL = "mqtt.qos_level"; - public static final String ADD_TIMESTAMP = "mqtt.add_timestamp"; - public static final String RETAINED_MESSAGE = "mqtt.retained_message"; + String TOPIC_NAME = "mqtt.topic_name"; + String QOS_LEVEL = "mqtt.qos_level"; + String ADD_TIMESTAMP = "mqtt.add_timestamp"; + String RETAINED_MESSAGE = "mqtt.retained_message"; - public static final String SAMPLE_CONDITION_VALUE = "mqtt.sample_condition_value"; - public static final String SAMPLE_CONDITION = "mqtt.sample_condition"; + String SAMPLE_CONDITION_VALUE = "mqtt.sample_condition_value"; + String SAMPLE_CONDITION_VALUE_OPT = "mqtt.sample_condition_value_opt"; + String SAMPLE_CONDITION = "mqtt.sample_condition"; - public static final String TIME_STAMP_SEP_FLAG = "ts_sep_flag"; + String TIME_STAMP_SEP_FLAG = "ts_sep_flag"; - public static final String DEBUG_RESPONSE = "mqtt.debug_response"; + String DEBUG_RESPONSE = "mqtt.debug_response"; - public static final int QOS_0 = 0; - public static final int QOS_1 = 1; - public static final int QOS_2 = 2; + int QOS_0 = 0; + int QOS_1 = 1; + int QOS_2 = 2; - public static final String MESSAGE_TYPE_RANDOM_STR_WITH_FIX_LEN = "Random string with fixed length"; - public static final String MESSAGE_TYPE_HEX_STRING = "Hex string"; - public static final String MESSAGE_TYPE_STRING = "String"; + String MESSAGE_TYPE_RANDOM_STR_WITH_FIX_LEN = "Random string with fixed length"; + String MESSAGE_TYPE_HEX_STRING = "Hex string"; + String MESSAGE_TYPE_STRING = "String"; - public static final String MQTT_VERSION_3_1_1 = "3.1.1"; - public static final String MQTT_VERSION_3_1 = "3.1"; + String MQTT_VERSION_3_1_1 = "3.1.1"; + String MQTT_VERSION_3_1 = "3.1"; - public static final String SAMPLE_ON_CONDITION_OPTION1 = "specified elapsed time (ms)"; - public static final String SAMPLE_ON_CONDITION_OPTION2 = "number of received messages"; + String SAMPLE_ON_CONDITION_OPTION1 = "specified elapsed time (ms)"; + String SAMPLE_ON_CONDITION_OPTION2 = "number of received messages"; - public static final int MAX_CLIENT_ID_LENGTH = 23; + int MAX_CLIENT_ID_LENGTH = 23; - public static final String DEFAULT_SERVER = "127.0.0.1"; - public static final String DEFAULT_MQTT_VERSION = "3.1"; - public static final String DEFAULT_PORT = "1883"; - public static final String DEFAULT_CONN_TIME_OUT = "10"; - public static final String TCP_PROTOCOL = "TCP"; - public static final String SSL_PROTOCOL = "SSL"; - public static final String WS_PROTOCOL = "WS"; - public static final String WSS_PROTOCOL = "WSS"; - public static final String DEFAULT_PROTOCOL = TCP_PROTOCOL; - public static final String FUSESOURCE_MQTT_CLIENT_NAME = "fusesource"; - public static final String HIVEMQ_MQTT_CLIENT_NAME = "hivemq"; -// public static final String DEFAULT_MQTT_CLIENT_NAME = FUSESOURCE_MQTT_CLIENT_NAME; - public static final String DEFAULT_MQTT_CLIENT_NAME = HIVEMQ_MQTT_CLIENT_NAME; + String DEFAULT_SERVER = "127.0.0.1"; + String DEFAULT_MQTT_VERSION = "3.1"; + String DEFAULT_PORT = "1883"; + String DEFAULT_CONN_TIME_OUT = "10"; + String DEFAULT_MQTT_CONN_NAME = "mqttconn"; + String TCP_PROTOCOL = "TCP"; + String SSL_PROTOCOL = "SSL"; + String WS_PROTOCOL = "WS"; + String WSS_PROTOCOL = "WSS"; + String DEFAULT_PROTOCOL = TCP_PROTOCOL; + String FUSESOURCE_MQTT_CLIENT_NAME = "fusesource"; + String HIVEMQ_MQTT_CLIENT_NAME = "hivemq"; +// String DEFAULT_MQTT_CLIENT_NAME = FUSESOURCE_MQTT_CLIENT_NAME; + String DEFAULT_MQTT_CLIENT_NAME = HIVEMQ_MQTT_CLIENT_NAME; - public static final String JMETER_VARIABLE_PREFIX = "${"; + String JMETER_VARIABLE_PREFIX = "${"; - public static final String DEFAULT_TOPIC_NAME = "test_topic"; + String DEFAULT_TOPIC_NAME = "test_topic"; - public static final String DEFAULT_CONN_PREFIX_FOR_CONN = "conn_"; + String DEFAULT_CONN_PREFIX_FOR_CONN = "conn_"; - public static final String DEFAULT_CONN_KEEP_ALIVE = "300"; - public static final String DEFAULT_CONN_ATTAMPT_MAX = "0"; - public static final String DEFAULT_CONN_RECONN_ATTAMPT_MAX = "0"; + String DEFAULT_CONN_KEEP_ALIVE = "300"; + String DEFAULT_CONN_ATTEMPT_MAX = "0"; + String DEFAULT_CONN_RECONN_ATTEMPT_MAX = "0"; - public static final String DEFAULT_SAMPLE_VALUE_COUNT = "1"; - public static final String DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC = "1000"; + String DEFAULT_SAMPLE_VALUE_COUNT = "1"; + String DEFAULT_SAMPLE_VALUE_COUNT_TIMEOUT = "5000"; + String DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC = "1000"; - public static final String DEFAULT_MESSAGE_FIX_LENGTH = "1024"; + String DEFAULT_MESSAGE_FIX_LENGTH = "1024"; - public static final boolean DEFAULT_ADD_CLIENT_ID_SUFFIX = true; + boolean DEFAULT_ADD_CLIENT_ID_SUFFIX = true; - public static final int SUB_FAIL_PENALTY = 1000; // force to delay 1s if sub fails for whatever reason + int SUB_FAIL_PENALTY = 1000; // force to delay 1s if sub fails for whatever reason - public static final boolean DEFAULT_SUBSCRIBE_WHEN_CONNECTED = false; + boolean DEFAULT_SUBSCRIBE_WHEN_CONNECTED = false; } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/SubBean.java b/mqtt_jmeter/src/main/java/net/xmeter/SubBean.java index e6e791e..359d398 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/SubBean.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/SubBean.java @@ -8,7 +8,7 @@ public class SubBean { private int receivedCount = 0; private double avgElapsedTime = 0f; - private List contents = new ArrayList(); + private final List contents = new ArrayList<>(); public int getReceivedMessageSize() { return receivedMessageSize; @@ -37,8 +37,4 @@ public void setAvgElapsedTime(double avgElapsedTime) { public List getContents() { return contents; } - - public void setContents(List contents) { - this.contents = contents; - } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/Util.java b/mqtt_jmeter/src/main/java/net/xmeter/Util.java index 8fca1aa..5b2c2f4 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/Util.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/Util.java @@ -1,8 +1,8 @@ package net.xmeter; import java.io.File; -import java.io.FileInputStream; import java.io.InputStream; +import java.nio.file.Files; import java.security.KeyStore; import java.security.SecureRandom; import java.util.UUID; @@ -18,8 +18,8 @@ public class Util implements Constants { - private static SecureRandom random = new SecureRandom(); - private static char[] seeds = "abcdefghijklmnopqrstuvwxmy0123456789".toCharArray(); + private static final SecureRandom random = new SecureRandom(); + private static final char[] seeds = "abcdefghijklmnopqrstuvwxmy0123456789".toCharArray(); private static final Logger logger = Logger.getLogger(Util.class.getCanonicalName()); public static String generateClientId(String prefix) { @@ -49,7 +49,7 @@ public static SSLContext getContext(AbstractMQTTSampler sampler) throws Exceptio // File theFile1 = getKeyStoreFile(sampler); File theFile2 = getClientCertFile(sampler); - try(/*InputStream is_cacert = new FileInputStream(theFile1);*/InputStream is_client = new FileInputStream(theFile2)) { + try(/*InputStream is_cacert = Files.newInputStream(theFile1.toPath());*/InputStream is_client = Files.newInputStream(theFile2.toPath())) { // KeyStore tks = KeyStore.getInstance(KeyStore.getDefaultType()); // jks // tks.load(is_cacert, KEYSTORE_PASS.toCharArray()); @@ -68,9 +68,9 @@ public static SSLContext getContext(AbstractMQTTSampler sampler) throws Exceptio } } - public static File getKeyStoreFile(AbstractMQTTSampler sampler) { - return getFilePath(sampler.getKeyStoreFilePath()); - } +// public static File getKeyStoreFile(AbstractMQTTSampler sampler) { +// return getFilePath(sampler.getKeyStoreFilePath()); +// } public static File getClientCertFile(AbstractMQTTSampler sampler) { return getFilePath(sampler.getClientCertFilePath()); @@ -94,7 +94,7 @@ private static File getFilePath(String filePath) { } public static String generatePayload(int size) { - StringBuffer res = new StringBuffer(); + StringBuilder res = new StringBuilder(); for(int i = 0; i < size; i++) { res.append(seeds[random.nextInt(seeds.length - 1)]); } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java index 8759352..b842f35 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java @@ -33,17 +33,19 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{ private final JLabeledTextField serverAddr = new JLabeledTextField("Server name or IP:"); private final JLabeledTextField serverPort = new JLabeledTextField("Port number:", 5); - private JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", new String[] { MQTT_VERSION_3_1, MQTT_VERSION_3_1_1 }, false, false);; + private final JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", + new String[] { MQTT_VERSION_3_1, MQTT_VERSION_3_1_1 }, false, false); private final JLabeledTextField timeout = new JLabeledTextField("Timeout(s):", 5); - + private final JLabeledTextField connName = new JLabeledTextField("MQTT Conn Name:"); + private final JLabeledTextField userNameAuth = new JLabeledTextField("User name:"); private final JLabeledTextField passwordAuth = new JLabeledTextField("Password:"); private JLabeledChoice protocols; // private JLabeledChoice clientNames; - private JCheckBox dualAuth = new JCheckBox("Dual SSL authentication"); - private JLabeledTextField wsPath = new JLabeledTextField("WS Path: ", 10); + private final JCheckBox dualAuth = new JCheckBox("Dual SSL authentication"); + private final JLabeledTextField wsPath = new JLabeledTextField("WS Path: ", 10); // private final JLabeledTextField tksFilePath = new JLabeledTextField("Trust Key Store(*.jks): ", 25); private final JLabeledTextField ccFilePath = new JLabeledTextField("Client Certification(*.p12):", 25); @@ -57,12 +59,12 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{ private static final String CC_BROWSE = "cc_browse"; public final JLabeledTextField connNamePrefix = new JLabeledTextField("ClientId:", 8); - private JCheckBox connNameSuffix = new JCheckBox("Add random suffix for ClientId"); + private final JCheckBox connNameSuffix = new JCheckBox("Add random suffix for ClientId"); private final JLabeledTextField connKeepAlive = new JLabeledTextField("Keep alive(s):", 3); - private final JLabeledTextField connAttmptMax = new JLabeledTextField("Connect attampt max:", 3); - private final JLabeledTextField reconnAttmptMax = new JLabeledTextField("Reconnect attampt max:", 3); + private final JLabeledTextField connAttemptMax = new JLabeledTextField("Connect attempt max:", 3); + private final JLabeledTextField reconnAttemptMax = new JLabeledTextField("Reconnect attempt max:", 3); private final JLabeledTextField connCleanSession = new JLabeledTextField("Clean session:", 3); @@ -88,9 +90,11 @@ public JPanel createConnPanel() { public JPanel createConnOptions() { JPanel optsPanelCon = new VerticalPanel(); - optsPanelCon.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Connection options")); + optsPanelCon.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), + "Connection options")); JPanel optsPanel0 = new HorizontalPanel(); + optsPanel0.add(connName); optsPanel0.add(connNamePrefix); optsPanel0.add(connNameSuffix); connNameSuffix.setSelected(true); @@ -100,8 +104,8 @@ public JPanel createConnOptions() { optsPanel1.add(connKeepAlive); optsPanelCon.add(optsPanel1); - optsPanel1.add(connAttmptMax); - optsPanel1.add(reconnAttmptMax); + optsPanel1.add(connAttemptMax); + optsPanel1.add(reconnAttemptMax); optsPanel1.add(connCleanSession); optsPanelCon.add(optsPanel1); @@ -254,6 +258,7 @@ public void stateChanged(ChangeEvent e) { } public void configure(AbstractMQTTSampler sampler) { + connName.setText(sampler.getConnName()); serverAddr.setText(sampler.getServer()); serverPort.setText(sampler.getPort()); if(sampler.getMqttVersion().equals(MQTT_VERSION_3_1)) { @@ -270,7 +275,7 @@ public void configure(AbstractMQTTSampler sampler) { // clientNames.setText(sampler.getMqttClientName()); // } - if(sampler.getProtocol().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1) { + if(!sampler.getProtocol().trim().contains(JMETER_VARIABLE_PREFIX)) { List items = Arrays.asList(protocols.getItems()); int index = items.indexOf(sampler.getProtocol()); protocols.setSelectedIndex(index); @@ -296,17 +301,13 @@ public void configure(AbstractMQTTSampler sampler) { passwordAuth.setText(sampler.getPasswordAuth()); connNamePrefix.setText(sampler.getConnPrefix()); - if(sampler.isClientIdSuffix()) { - connNameSuffix.setSelected(true); - } else { - connNameSuffix.setSelected(false); - } + connNameSuffix.setSelected(sampler.isClientIdSuffix()); connKeepAlive.setText(sampler.getConnKeepAlive()); - connAttmptMax.setText(sampler.getConnAttamptMax()); - reconnAttmptMax.setText(sampler.getConnReconnAttamptMax()); + connAttemptMax.setText(sampler.getConnAttemptMax()); + reconnAttemptMax.setText(sampler.getConnReconnAttemptMax()); - connCleanSession.setText(sampler.getConnCleanSession().toString()); + connCleanSession.setText(sampler.getConnCleanSession()); } @@ -315,6 +316,7 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) { sampler.setPort(serverPort.getText()); sampler.setMqttVersion(mqttVersion.getText()); sampler.setConnTimeout(timeout.getText()); + sampler.setConnName(connName.getText()); // sampler.setMqttClientName(clientNames.getText()); sampler.setProtocol(protocols.getText()); @@ -332,19 +334,12 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) { sampler.setClientIdSuffix(connNameSuffix.isSelected()); sampler.setConnKeepAlive(connKeepAlive.getText()); - sampler.setConnAttamptMax(connAttmptMax.getText()); - sampler.setConnReconnAttamptMax(reconnAttmptMax.getText()); + sampler.setConnAttemptMax(connAttemptMax.getText()); + sampler.setConnReconnAttemptMax(reconnAttemptMax.getText()); sampler.setConnCleanSession(connCleanSession.getText()); } - public static int parseInt(String value) { - if(value == null || "".equals(value.trim())) { - return 0; - } - return Integer.parseInt(value); - } - public void clearUI() { serverAddr.setText(DEFAULT_SERVER); serverPort.setText(DEFAULT_PORT); @@ -365,12 +360,13 @@ public void clearUI() { userNameAuth.setText(""); passwordAuth.setText(""); + connName.setText(DEFAULT_MQTT_CONN_NAME); connNamePrefix.setText(DEFAULT_CONN_PREFIX_FOR_CONN); connNameSuffix.setSelected(true); - connAttmptMax.setText(DEFAULT_CONN_ATTAMPT_MAX); + connAttemptMax.setText(DEFAULT_CONN_ATTEMPT_MAX); connKeepAlive.setText(DEFAULT_CONN_KEEP_ALIVE); - reconnAttmptMax.setText(DEFAULT_CONN_RECONN_ATTAMPT_MAX); + reconnAttemptMax.setText(DEFAULT_CONN_RECONN_ATTEMPT_MAX); connCleanSession.setText("true"); } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/ConnectSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/ConnectSamplerUI.java index 39ba9c3..f9c4413 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/ConnectSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/ConnectSamplerUI.java @@ -12,7 +12,7 @@ import net.xmeter.samplers.ConnectSampler; public class ConnectSamplerUI extends AbstractSamplerGui implements Constants { - private CommonConnUI connUI = new CommonConnUI(); + private final CommonConnUI connUI = new CommonConnUI(); private static final long serialVersionUID = 1666890646673145131L; public ConnectSamplerUI() { diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/DisConnectSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/DisConnectSamplerUI.java index 978cedd..b59cf6e 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/DisConnectSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/DisConnectSamplerUI.java @@ -1,19 +1,21 @@ package net.xmeter.gui; import java.awt.BorderLayout; +import javax.swing.*; -import javax.swing.JPanel; +import net.xmeter.Constants; +import net.xmeter.samplers.DisConnectSampler; +import org.apache.jmeter.gui.util.HorizontalPanel; import org.apache.jmeter.gui.util.VerticalPanel; import org.apache.jmeter.samplers.gui.AbstractSamplerGui; import org.apache.jmeter.testelement.TestElement; - -import net.xmeter.Constants; -import net.xmeter.samplers.DisConnectSampler; +import org.apache.jorphan.gui.JLabeledTextField; public class DisConnectSamplerUI extends AbstractSamplerGui implements Constants { private static final long serialVersionUID = 1666890646673145131L; - + private final JLabeledTextField connName = new JLabeledTextField("MQTT Conn Name:"); + public DisConnectSamplerUI() { this.init(); } @@ -23,12 +25,17 @@ private void init() { setBorder(makeBorder()); add(makeTitlePanel(), BorderLayout.NORTH); JPanel mainPanel = new VerticalPanel(); + JPanel optsPanel = new HorizontalPanel(); + mainPanel.add(optsPanel); + mainPanel.add(createConnOptions()); add(mainPanel, BorderLayout.CENTER); } @Override public void configure(TestElement element) { super.configure(element); + DisConnectSampler sampler = (DisConnectSampler) element; + this.connName.setText(sampler.getConnName()); } @Override @@ -43,6 +50,17 @@ public String getLabelResource() { throw new RuntimeException(); } + public JPanel createConnOptions() { + JPanel optsPanelCon = new VerticalPanel(); + optsPanelCon.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Connection options")); + + JPanel optsPanel0 = new HorizontalPanel(); + optsPanel0.add(connName); + optsPanelCon.add(optsPanel0); + + return optsPanelCon; + } + @Override public String getStaticLabel() { return "MQTT DisConnect"; @@ -52,11 +70,13 @@ public String getStaticLabel() { public void modifyTestElement(TestElement arg0) { DisConnectSampler sampler = (DisConnectSampler)arg0; this.configureTestElement(sampler); + sampler.setConnName(this.connName.getText()); } @Override public void clearGui() { super.clearGui(); + this.connName.setText(DEFAULT_MQTT_CONN_NAME); } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java index da72e2a..abbd5a7 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java @@ -21,9 +21,9 @@ public class EfficientConnectSamplerUI extends AbstractSamplerGui implements Con private static final long serialVersionUID = 1666890646673145131L; private static final Logger logger = Logger.getLogger(SubSamplerUI.class.getCanonicalName()); - private CommonConnUI connUI = new CommonConnUI(); - private JCheckBox shouldSub = new JCheckBox("Subscribe when connected"); - private JLabeledTextField connCapacity = new JLabeledTextField("Connection capacity:");; + private final CommonConnUI connUI = new CommonConnUI(); + private final JCheckBox shouldSub = new JCheckBox("Subscribe when connected"); + private final JLabeledTextField connCapacity = new JLabeledTextField("Connection capacity:"); private JLabeledChoice qosChoice; private final JLabeledTextField topicNames = new JLabeledTextField("Topic name(s):"); @@ -83,7 +83,7 @@ public void configure(TestElement element) { connUI.configure(sampler); // shouldSub.setSelected(sampler.isSubWhenConnected()); - if(sampler.getQOS().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1){ + if(!sampler.getQOS().trim().contains(JMETER_VARIABLE_PREFIX)){ this.qosChoice.setSelectedIndex(Integer.parseInt(sampler.getQOS())); } else { this.qosChoice.setText(sampler.getQOS()); @@ -104,7 +104,7 @@ public TestElement createTestElement() { private void setupSamplerProperties(EfficientConnectSampler sampler) { sampler.setSubWhenConnected(shouldSub.isSelected()); - if(this.qosChoice.getText().indexOf(JMETER_VARIABLE_PREFIX) == -1) { + if(!this.qosChoice.getText().contains(JMETER_VARIABLE_PREFIX)) { int qos = QOS_0; try { qos = Integer.parseInt(this.qosChoice.getText()); @@ -114,7 +114,6 @@ private void setupSamplerProperties(EfficientConnectSampler sampler) { } } catch (Exception ex) { logger.info("Invalid QoS value, set to default QoS value 0."); - qos = QOS_0; } sampler.setQOS(String.valueOf(qos)); } else { diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java index fa3e714..a0a7253 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java @@ -4,11 +4,15 @@ import java.util.logging.Logger; import javax.swing.BorderFactory; +import javax.swing.JLabel; import javax.swing.JCheckBox; import javax.swing.JPanel; import javax.swing.event.ChangeEvent; import javax.swing.event.ChangeListener; +import net.xmeter.Constants; +import net.xmeter.samplers.PubSampler; + import org.apache.jmeter.gui.util.HorizontalPanel; import org.apache.jmeter.gui.util.JSyntaxTextArea; import org.apache.jmeter.gui.util.JTextScrollPane; @@ -18,22 +22,21 @@ import org.apache.jorphan.gui.JLabeledChoice; import org.apache.jorphan.gui.JLabeledTextField; -import net.xmeter.Constants; -import net.xmeter.samplers.PubSampler; - public class PubSamplerUI extends AbstractSamplerGui implements Constants, ChangeListener { private static final long serialVersionUID = 2479085966683186422L; private static final Logger logger = Logger.getLogger(PubSamplerUI.class.getCanonicalName()); + private static final JLabel qosLabel = new JLabel("QOS Level:"); + private final JLabeledTextField connName = new JLabeledTextField("MQTT Conn Name:"); private JLabeledChoice qosChoice; private final JLabeledTextField retainedMsg = new JLabeledTextField("Retained messages:", 1); private final JLabeledTextField topicName = new JLabeledTextField("Topic name:"); - private JCheckBox timestamp = new JCheckBox("Add timestamp in payload"); + private final JCheckBox timestamp = new JCheckBox("Add timestamp in payload"); private JLabeledChoice messageTypes; private final JSyntaxTextArea sendMessage = JSyntaxTextArea.getInstance(10, 50); private final JTextScrollPane messagePanel = JTextScrollPane.getInstance(sendMessage); - private JLabeledTextField stringLength = new JLabeledTextField("Length:"); + private final JLabeledTextField stringLength = new JLabeledTextField("Length:"); public PubSamplerUI() { init(); @@ -46,9 +49,9 @@ private void init() { add(makeTitlePanel(), BorderLayout.NORTH); JPanel mainPanel = new VerticalPanel(); add(mainPanel, BorderLayout.CENTER); - mainPanel.add(createPubOption()); mainPanel.add(createPayload()); + mainPanel.add(createConnOptions()); } private JPanel createPubOption() { @@ -59,6 +62,7 @@ private JPanel createPubOption() { qosChoice.addChangeListener(this); JPanel optsPanel = new HorizontalPanel(); + optsPanel.add(qosLabel); optsPanel.add(qosChoice); optsPanel.add(retainedMsg); optsPanel.add(topicName); @@ -91,6 +95,17 @@ private JPanel createPayload() { return optsPanelCon; } + public JPanel createConnOptions() { + JPanel optsPanelCon = new VerticalPanel(); + optsPanelCon.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Connection options")); + + JPanel optsPanel0 = new HorizontalPanel(); + optsPanel0.add(connName); + optsPanelCon.add(optsPanel0); + + return optsPanelCon; + } + @Override public String getStaticLabel() { return "MQTT Pub Sampler"; @@ -128,8 +143,9 @@ public TestElement createTestElement() { public void configure(TestElement element) { super.configure(element); PubSampler sampler = (PubSampler) element; - - if(sampler.getQOS().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1){ + + this.connName.setText(sampler.getConnName()); + if(!sampler.getQOS().trim().contains(JMETER_VARIABLE_PREFIX)){ this.qosChoice.setSelectedIndex(Integer.parseInt(sampler.getQOS())); } else { this.qosChoice.setText(sampler.getQOS()); @@ -159,9 +175,10 @@ public void modifyTestElement(TestElement arg0) { private void setupSamplerProperties(PubSampler sampler) { this.configureTestElement(sampler); + sampler.setConnName(this.connName.getText()); sampler.setTopic(this.topicName.getText()); - if(this.qosChoice.getText().indexOf(JMETER_VARIABLE_PREFIX) == -1) { + if(!this.qosChoice.getText().contains(JMETER_VARIABLE_PREFIX)) { int qos = QOS_0; try { qos = Integer.parseInt(this.qosChoice.getText()); @@ -171,7 +188,6 @@ private void setupSamplerProperties(PubSampler sampler) { } } catch (Exception ex) { logger.info("Invalid QoS value, set to default QoS value 0."); - qos = QOS_0; } sampler.setQOS(String.valueOf(qos)); } else { @@ -188,12 +204,13 @@ private void setupSamplerProperties(PubSampler sampler) { @Override public void clearGui() { super.clearGui(); + this.connName.setText(DEFAULT_MQTT_CONN_NAME); this.topicName.setText(DEFAULT_TOPIC_NAME); this.qosChoice.setText(String.valueOf(QOS_0)); this.timestamp.setSelected(false); this.messageTypes.setSelectedIndex(0); - this.stringLength.setText(String.valueOf(DEFAULT_MESSAGE_FIX_LENGTH)); + this.stringLength.setText(DEFAULT_MESSAGE_FIX_LENGTH); this.sendMessage.setText(""); } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java index c7e7625..a7d4f72 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/SubSamplerUI.java @@ -2,10 +2,10 @@ import java.awt.BorderLayout; import java.util.logging.Logger; - import javax.swing.BorderFactory; import javax.swing.JCheckBox; import javax.swing.JPanel; +import javax.swing.JLabel; import javax.swing.event.ChangeEvent; import javax.swing.event.ChangeListener; @@ -22,15 +22,18 @@ public class SubSamplerUI extends AbstractSamplerGui implements Constants, ChangeListener{ private static final long serialVersionUID = 1715399546099472610L; private static final Logger logger = Logger.getLogger(SubSamplerUI.class.getCanonicalName()); - + private static final JLabel qosLabel = new JLabel("QoS Level:"); + private final JLabeledTextField connName = new JLabeledTextField("MQTT Conn Name:"); private JLabeledChoice qosChoice; + private static final JLabel sampleOnLabel = new JLabel("Sample on:"); private JLabeledChoice sampleOnCondition; private final JLabeledTextField sampleConditionValue = new JLabeledTextField(""); + private final JLabeledTextField sampleConditionValue2 = new JLabeledTextField("Timeout (ms):"); private final JLabeledTextField topicNames = new JLabeledTextField("Topic name(s):"); - private JCheckBox debugResponse = new JCheckBox("Debug response"); - private JCheckBox timestamp = new JCheckBox("Payload includes timestamp"); + private final JCheckBox debugResponse = new JCheckBox("Debug response"); + private final JCheckBox timestamp = new JCheckBox("Payload includes timestamp"); public SubSamplerUI() { this.init(); @@ -45,6 +48,7 @@ private void init() { add(mainPanel, BorderLayout.CENTER); mainPanel.add(createSubOption()); + mainPanel.add(createConnOptions()); } private JPanel createSubOption() { @@ -55,6 +59,7 @@ private JPanel createSubOption() { sampleOnCondition = new JLabeledChoice("Sample on:", new String[] {SAMPLE_ON_CONDITION_OPTION1, SAMPLE_ON_CONDITION_OPTION2}); JPanel optsPanel1 = new HorizontalPanel(); + optsPanel1.add(qosLabel); optsPanel1.add(qosChoice); optsPanel1.add(topicNames); topicNames.setToolTipText("A list of topics to be subscribed to, comma-separated."); @@ -63,10 +68,14 @@ private JPanel createSubOption() { JPanel optsPanel3 = new HorizontalPanel(); sampleOnCondition.addChangeListener(this); + optsPanel3.add(sampleOnLabel); optsPanel3.add(sampleOnCondition); optsPanel3.add(sampleConditionValue); + optsPanel3.add(sampleConditionValue2); sampleOnCondition.setToolTipText("When sub sampler should report out."); sampleConditionValue.setToolTipText("Please specify an integer value great than 0, other values will be ignored."); + sampleConditionValue2.setToolTipText("Timeout in sec"); + sampleConditionValue2.setEnabled(false); optsPanelCon.add(optsPanel3); JPanel optsPanel2 = new HorizontalPanel(); @@ -75,7 +84,16 @@ private JPanel createSubOption() { return optsPanelCon; } - + public JPanel createConnOptions() { + JPanel optsPanelCon = new VerticalPanel(); + optsPanelCon.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Connection options")); + + JPanel optsPanel0 = new HorizontalPanel(); + optsPanel0.add(connName); + optsPanelCon.add(optsPanel0); + + return optsPanelCon; + } @Override public String getStaticLabel() { return "MQTT Sub Sampler"; @@ -93,21 +111,25 @@ public void configure(TestElement element) { super.configure(element); SubSampler sampler = (SubSampler) element; - if(sampler.getQOS().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1){ + this.connName.setText(sampler.getConnName()); + if(!sampler.getQOS().trim().contains(JMETER_VARIABLE_PREFIX)){ this.qosChoice.setSelectedIndex(Integer.parseInt(sampler.getQOS())); } else { this.qosChoice.setText(sampler.getQOS()); } - this.topicNames.setText(sampler.getTopics()); + this.topicNames.setText(sampler.getTopic()); this.timestamp.setSelected(sampler.isAddTimestamp()); this.debugResponse.setSelected(sampler.isDebugResponse()); this.sampleOnCondition.setText(sampler.getSampleCondition()); - if(SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) { + if (SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) { this.sampleConditionValue.setText(sampler.getSampleElapsedTime()); - } else if(SAMPLE_ON_CONDITION_OPTION2.equalsIgnoreCase(sampleOnCondition.getText())) { + this.sampleConditionValue2.setEnabled(false); + } else if (SAMPLE_ON_CONDITION_OPTION2.equalsIgnoreCase(sampleOnCondition.getText())) { this.sampleConditionValue.setText(sampler.getSampleCount()); + this.sampleConditionValue2.setEnabled(true); + this.sampleConditionValue2.setText(sampler.getSampleCountTimeout()); } } @@ -124,9 +146,10 @@ public void modifyTestElement(TestElement arg0) { private void setupSamplerProperties(SubSampler sampler) { this.configureTestElement(sampler); - sampler.setTopics(this.topicNames.getText()); + sampler.setConnName(this.connName.getText()); + sampler.setTopic(this.topicNames.getText()); - if(this.qosChoice.getText().indexOf(JMETER_VARIABLE_PREFIX) == -1) { + if(!this.qosChoice.getText().contains(JMETER_VARIABLE_PREFIX)) { int qos = QOS_0; try { qos = Integer.parseInt(this.qosChoice.getText()); @@ -136,7 +159,6 @@ private void setupSamplerProperties(SubSampler sampler) { } } catch (Exception ex) { logger.info("Invalid QoS value, set to default QoS value 0."); - qos = QOS_0; } sampler.setQOS(String.valueOf(qos)); } else { @@ -151,29 +173,35 @@ private void setupSamplerProperties(SubSampler sampler) { sampler.setSampleElapsedTime(this.sampleConditionValue.getText()); } else if(SAMPLE_ON_CONDITION_OPTION2.equalsIgnoreCase(sampleOnCondition.getText())) { sampler.setSampleCount(this.sampleConditionValue.getText()); + sampler.setSampleCountTimeout(this.sampleConditionValue2.getText()); } } @Override public void clearGui() { super.clearGui(); + this.connName.setText(DEFAULT_MQTT_CONN_NAME); this.topicNames.setText(DEFAULT_TOPIC_NAME); this.qosChoice.setText(String.valueOf(QOS_0)); this.timestamp.setSelected(false); this.debugResponse.setSelected(false); this.sampleOnCondition.setText(SAMPLE_ON_CONDITION_OPTION1); this.sampleConditionValue.setText(DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC); + this.sampleConditionValue2.setEnabled(false); + this.sampleConditionValue2.setText(DEFAULT_SAMPLE_VALUE_COUNT_TIMEOUT); } @Override public void stateChanged(ChangeEvent e) { if(this.sampleOnCondition == e.getSource()) { - if(SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) { + if (SAMPLE_ON_CONDITION_OPTION1.equalsIgnoreCase(sampleOnCondition.getText())) { sampleConditionValue.setText(DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC); - } else if(SAMPLE_ON_CONDITION_OPTION2.equalsIgnoreCase(sampleOnCondition.getText())) { + sampleConditionValue2.setEnabled(false); + } else if (SAMPLE_ON_CONDITION_OPTION2.equalsIgnoreCase(sampleOnCondition.getText())) { sampleConditionValue.setText(DEFAULT_SAMPLE_VALUE_COUNT); + sampleConditionValue2.setText(DEFAULT_SAMPLE_VALUE_COUNT_TIMEOUT); + sampleConditionValue2.setEnabled(true); } } - } - + } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java index afbc3b7..39dc6d3 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java @@ -4,9 +4,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.jmeter.samplers.AbstractSampler; - import net.xmeter.Constants; +import org.apache.jmeter.samplers.AbstractSampler; public abstract class AbstractMQTTSampler extends AbstractSampler implements Constants { /** @@ -17,12 +16,20 @@ public abstract class AbstractMQTTSampler extends AbstractSampler implements Con // protected static final String LABEL_PREFIX = "xmeter-mqtt-batch-con-mode-"; // protected boolean useEfficientCon = Boolean.parseBoolean(System.getProperty("batchCon")); - protected boolean useEfficientCon = true; +// protected boolean useEfficientCon = true; // protected static int conCapacity = 1; // - protected static Map> topicSubscribed = new ConcurrentHashMap<>(); + protected static Map> topicsSubscribed = new ConcurrentHashMap<>(); + + public String getConnName() { + return getPropertyAsString(MQTT_CONN_NAME, DEFAULT_MQTT_CONN_NAME); + } + + public void setConnName(String connName) { + setProperty(MQTT_CONN_NAME, connName); + } public String getServer() { return getPropertyAsString(SERVER, DEFAULT_SERVER); @@ -112,7 +119,6 @@ public void setClientCertPassword(String clientCertPassword) { this.setProperty(KEY_FILE_PWD2, clientCertPassword); } - public String getConnPrefix() { return getPropertyAsString(CONN_CLIENT_ID_PREFIX, DEFAULT_CONN_PREFIX_FOR_CONN); } @@ -137,20 +143,20 @@ public void setClientIdSuffix(boolean clientIdSuffix) { setProperty(CONN_CLIENT_ID_SUFFIX, clientIdSuffix); } - public String getConnAttamptMax() { - return getPropertyAsString(CONN_ATTAMPT_MAX, DEFAULT_CONN_ATTAMPT_MAX); + public String getConnAttemptMax() { + return getPropertyAsString(CONN_ATTEMPT_MAX, DEFAULT_CONN_ATTEMPT_MAX); } - public void setConnAttamptMax(String connAttamptMax) { - setProperty(CONN_ATTAMPT_MAX, connAttamptMax); + public void setConnAttemptMax(String connAttemptMax) { + setProperty(CONN_ATTEMPT_MAX, connAttemptMax); } - public String getConnReconnAttamptMax() { - return getPropertyAsString(CONN_RECONN_ATTAMPT_MAX, DEFAULT_CONN_RECONN_ATTAMPT_MAX); + public String getConnReconnAttemptMax() { + return getPropertyAsString(CONN_RECONN_ATTEMPT_MAX, DEFAULT_CONN_RECONN_ATTEMPT_MAX); } - public void setConnReconnAttamptMax(String connReconnAttamptMax) { - setProperty(CONN_RECONN_ATTAMPT_MAX, connReconnAttamptMax); + public void setConnReconnAttemptMax(String connReconnAttemptMax) { + setProperty(CONN_RECONN_ATTEMPT_MAX, connReconnAttemptMax); } public String getUserNameAuth() { @@ -178,11 +184,11 @@ public String getConnCleanSession() { } public void setTopicSubscribed(String clientId, Set topics) { - topicSubscribed.put(clientId, topics); + topicsSubscribed.put(clientId, topics); } public void removeTopicSubscribed(String clientId) { - topicSubscribed.remove(clientId); + topicsSubscribed.remove(clientId); } public String getLabelPrefix() { @@ -197,9 +203,9 @@ public String getMqttClientName() { return getPropertyAsString(MQTT_CLIENT_NAME, DEFAULT_MQTT_CLIENT_NAME); } - public void setMqttClientName(String mqttClientName) { - setProperty(MQTT_CLIENT_NAME, mqttClientName); - } +// public void setMqttClientName(String mqttClientName) { +// setProperty(MQTT_CLIENT_NAME, mqttClientName); +// } // public int getConCapacity() { // return conCapacity; diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java index 1b36afd..861e6ba 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java @@ -22,23 +22,25 @@ public class ConnectSampler extends AbstractMQTTSampler { private static final Logger logger = Logger.getLogger(ConnectSampler.class.getCanonicalName()); private transient MQTTClient client; - private transient MQTTConnection connection; @Override public SampleResult sample(Entry entry) { SampleResult result = new SampleResult(); result.setSampleLabel(getName()); - + JMeterVariables vars = JMeterContextService.getContext().getVariables(); - connection = (MQTTConnection) vars.getObject("conn"); + MQTTConnection connection = (MQTTConnection) vars.getObject(getConnName()); + String clientId = (String) vars.getObject(getConnName()+"_clientId"); if (connection != null) { - result.sampleStart(); - result.setSuccessful(false); - result.setResponseMessage(MessageFormat.format("Connection {0} is already established.", connection)); - result.setResponseData("Failed. Connection is already established.".getBytes()); - result.setResponseCode("500"); - result.sampleEnd(); // avoid endtime=0 exposed in trace log - return result; + try { + logger.info(MessageFormat.format("Disconnect connection {0} ({1}).", connection, getConnName())); + connection.disconnect(); + } catch (Exception e) { + logger.log(Level.SEVERE, MessageFormat.format("Failed to disconnect connection {0} ({1}).", connection, getConnName()), e); + } finally { + vars.remove(getConnName()); // clean up thread local var as well + removeTopicSubscribed(clientId); + } } ConnectionParameters parameters = new ConnectionParameters(); @@ -52,7 +54,6 @@ public SampleResult sample(Entry entry) { parameters.setPath(getWsPath()); } - String clientId; if(isClientIdSuffix()) { clientId = Util.generateClientId(getConnPrefix()); } else { @@ -60,8 +61,8 @@ public SampleResult sample(Entry entry) { } parameters.setClientId(clientId); - parameters.setConnectMaxAttempts(Integer.parseInt(getConnAttamptMax())); - parameters.setReconnectMaxAttempts(Integer.parseInt(getConnReconnAttamptMax())); + parameters.setConnectMaxAttempts(Integer.parseInt(getConnAttemptMax())); + parameters.setReconnectMaxAttempts(Integer.parseInt(getConnReconnAttemptMax())); if (!"".equals(getUserNameAuth().trim())) { parameters.setUsername(getUserNameAuth()); @@ -76,7 +77,7 @@ public SampleResult sample(Entry entry) { parameters.setSsl(ssl); } } catch (Exception e) { - logger.log(Level.SEVERE, "Failed to establish Connection " + connection , e); + logger.log(Level.SEVERE, "Failed to establish Connection " + connection, e); result.setSuccessful(false); result.setResponseMessage(MessageFormat.format("Failed to establish Connection {0}. Please check SSL authentication info.", connection)); result.setResponseData("Failed to establish Connection. Please check SSL authentication info.".getBytes()); @@ -92,9 +93,9 @@ public SampleResult sample(Entry entry) { result.sampleEnd(); if (connection.isConnectionSucc()) { - vars.putObject("conn", connection); // save connection object as thread local variable !! - vars.putObject("clientId", client.getClientId()); //save client id as thread local variable - topicSubscribed.put(client.getClientId(), new HashSet<>()); + vars.putObject(getConnName(), connection); // save connection object as thread local variable !! + vars.putObject(getConnName()+"_clientId", client.getClientId()); //save client id as thread local variable + setTopicSubscribed(client.getClientId(), new HashSet<>()); result.setSuccessful(true); result.setResponseData("Successful.".getBytes()); result.setResponseMessage(MessageFormat.format("Connection {0} established successfully.", connection)); @@ -103,11 +104,11 @@ public SampleResult sample(Entry entry) { result.setSuccessful(false); result.setResponseMessage(MessageFormat.format("Failed to establish Connection {0}.", connection)); result.setResponseData(MessageFormat.format("Client [{0}] failed. Couldn't establish connection.", - client.getClientId()).getBytes()); + client.getClientId()).getBytes()); result.setResponseCode("501"); } } catch (Exception e) { - logger.log(Level.SEVERE, "Failed to establish Connection " + connection , e); + logger.log(Level.SEVERE, "Failed to establish Connection " + connection, e); if (result.getEndTime() == 0) result.sampleEnd(); //avoid re-enter sampleEnd() result.setSuccessful(false); result.setResponseMessage(MessageFormat.format("Failed to establish Connection {0}.", connection)); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/DisConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/DisConnectSampler.java index fc55d8d..e0e09db 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/DisConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/DisConnectSampler.java @@ -4,27 +4,24 @@ import java.util.logging.Level; import java.util.logging.Logger; +import net.xmeter.samplers.mqtt.MQTTConnection; import org.apache.jmeter.samplers.Entry; import org.apache.jmeter.samplers.SampleResult; import org.apache.jmeter.threads.JMeterContextService; import org.apache.jmeter.threads.JMeterVariables; -import net.xmeter.samplers.mqtt.MQTTConnection; - public class DisConnectSampler extends AbstractMQTTSampler { private static final long serialVersionUID = 4360869021667126983L; private static final Logger logger = Logger.getLogger(DisConnectSampler.class.getCanonicalName()); - private transient MQTTConnection connection = null; - @Override public SampleResult sample(Entry entry) { SampleResult result = new SampleResult(); result.setSampleLabel(getName()); JMeterVariables vars = JMeterContextService.getContext().getVariables(); - connection = (MQTTConnection) vars.getObject("conn"); - String clientId = (String) vars.getObject("clientId"); + MQTTConnection connection = (MQTTConnection) vars.getObject(getConnName()); + String clientId = (String) vars.getObject(getConnName()+"_clientId"); if (connection == null) { result.sampleStart(); result.setSuccessful(false); @@ -37,25 +34,27 @@ public SampleResult sample(Entry entry) { try { result.sampleStart(); - - if (connection != null) { - logger.info(MessageFormat.format("Disconnect connection {0}.", connection)); + + logger.info(MessageFormat.format("Disconnect connection {0} ({1}).", connection, getConnName())); + try { connection.disconnect(); - vars.remove("conn"); // clean up thread local var as well - topicSubscribed.remove(clientId); + } catch (Exception e) { + logger.log(Level.SEVERE, MessageFormat.format("Failed to disconnect connection {0} ({1}).", connection, getConnName()), e); + } finally { + vars.remove(getConnName()); // clean up thread local var as well + topicsSubscribed.remove(clientId); } - + result.sampleEnd(); - result.setSuccessful(true); result.setResponseData("Successful.".getBytes()); result.setResponseMessage(MessageFormat.format("Connection {0} disconnected.", connection)); result.setResponseCodeOK(); } catch (Exception e) { - logger.log(Level.SEVERE, "Failed to disconnect Connection" + connection, e); + logger.log(Level.SEVERE, MessageFormat.format("Failed to disconnect Connection {0} ({1}).", connection, getConnName()), e); if (result.getEndTime() == 0) result.sampleEnd(); //avoid re-enter sampleEnd() result.setSuccessful(false); - result.setResponseMessage(MessageFormat.format("Failed to disconnect Connection {0}.", connection)); + result.setResponseMessage(MessageFormat.format("Failed to disconnect Connection {0} ({1}).", connection, getConnName())); result.setResponseData(MessageFormat.format("Client [{0}] failed. Couldn't disconnect connection.", (clientId == null ? "null" : clientId)).getBytes()); result.setResponseCode("501"); } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java index 556d6b2..b072794 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java @@ -5,12 +5,12 @@ import java.util.logging.Level; import java.util.logging.Logger; +import net.xmeter.Util; import org.apache.jmeter.samplers.Entry; import org.apache.jmeter.samplers.SampleResult; import org.apache.jmeter.threads.JMeterContextService; import org.apache.jmeter.threads.JMeterVariables; -import net.xmeter.Util; import net.xmeter.samplers.mqtt.ConnectionParameters; import net.xmeter.samplers.mqtt.MQTT; import net.xmeter.samplers.mqtt.MQTTClient; @@ -151,8 +151,8 @@ private MQTTClient createMqttInstance(String clientId) throws Exception { parameters.setVersion(getMqttVersion()); parameters.setKeepAlive((short) Integer.parseInt(getConnKeepAlive())); - parameters.setConnectMaxAttempts(Integer.parseInt(getConnAttamptMax())); - parameters.setReconnectMaxAttempts(Integer.parseInt(getConnReconnAttamptMax())); + parameters.setConnectMaxAttempts(Integer.parseInt(getConnAttemptMax())); + parameters.setReconnectMaxAttempts(Integer.parseInt(getConnReconnAttemptMax())); // System.out.println("!!max reconnect:" + mqtt.getReconnectAttemptsMax()); if (!"".equals(getUserNameAuth().trim())) { diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java index e33502d..d6a54b7 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java @@ -5,13 +5,12 @@ import java.util.logging.Level; import java.util.logging.Logger; +import net.xmeter.samplers.mqtt.MQTTConnection; import org.apache.jmeter.samplers.Entry; import org.apache.jmeter.samplers.SampleResult; import org.apache.jmeter.threads.JMeterContextService; import org.apache.jmeter.threads.JMeterVariables; -import net.xmeter.samplers.mqtt.MQTTConnection; - public class EfficientDisConnectSampler extends AbstractMQTTSampler { private static final long serialVersionUID = 4360869021667126983L; private static final Logger logger = Logger.getLogger(EfficientDisConnectSampler.class.getCanonicalName()); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubCallback.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubCallback.java index cc0e4fa..4b7b99e 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubCallback.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubCallback.java @@ -10,14 +10,12 @@ public class PubCallback implements Callback{ private static final Logger logger = Logger.getLogger(PubCallback.class.getCanonicalName()); private boolean successful = false; - private Object pubLock; + private final Object pubLock; private String errorMessage = ""; - private QoS qos; - + public PubCallback(Object pubLock, QoS qos) { this.pubLock = pubLock; - this.qos = qos; - if(this.qos == QoS.AT_MOST_ONCE) { + if(qos == QoS.AT_MOST_ONCE) { this.successful = true; } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java index fb86b04..a696a2d 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java @@ -6,25 +6,22 @@ import javax.xml.bind.DatatypeConverter; -import org.apache.jmeter.samplers.Entry; -import org.apache.jmeter.samplers.SampleResult; -import org.apache.jmeter.threads.JMeterContextService; -import org.apache.jmeter.threads.JMeterVariables; - import net.xmeter.Util; import net.xmeter.samplers.mqtt.MQTTConnection; import net.xmeter.samplers.mqtt.MQTTPubResult; import net.xmeter.samplers.mqtt.MQTTQoS; +import org.apache.jmeter.samplers.Entry; +import org.apache.jmeter.samplers.SampleResult; +import org.apache.jmeter.threads.JMeterContextService; +import org.apache.jmeter.threads.JMeterVariables; public class PubSampler extends AbstractMQTTSampler { private static final long serialVersionUID = 4312341622759500786L; private static final Logger logger = Logger.getLogger(PubSampler.class.getCanonicalName()); - - private transient MQTTConnection connection = null; + private String payload = null; private MQTTQoS qos_enum = MQTTQoS.AT_MOST_ONCE; private String topicName = ""; - private boolean retainedMsg = false; public String getQOS() { return getPropertyAsString(QOS_LEVEL, String.valueOf(QOS_0)); @@ -92,8 +89,8 @@ public SampleResult sample(Entry arg0) { result.setSampleLabel(getName()); JMeterVariables vars = JMeterContextService.getContext().getVariables(); - connection = (MQTTConnection) vars.getObject("conn"); - String clientId = (String) vars.getObject("clientId"); + MQTTConnection connection = (MQTTConnection) vars.getObject(getConnName()); + String clientId = (String) vars.getObject(getConnName()+"_clientId"); if (connection == null) { result.sampleStart(); result.setSuccessful(false); @@ -103,6 +100,15 @@ public SampleResult sample(Entry arg0) { result.sampleEnd(); // avoid endtime=0 exposed in trace log return result; } + if (!connection.isConnectionSucc()) { + result.sampleStart(); + result.setSuccessful(false); + result.setResponseMessage("Publish: Connection is broken."); + result.setResponseData("Publish failed because connection is broken and cannot be used.".getBytes()); + result.setResponseCode("500"); + result.sampleEnd(); + return result; + } byte[] toSend = new byte[]{}; try { @@ -136,7 +142,7 @@ public SampleResult sample(Entry arg0) { } topicName = getTopic(); - retainedMsg = getRetainedMessage(); + boolean retainedMsg = getRetainedMessage(); if (isAddTimestamp()) { byte[] timePrefix = (System.currentTimeMillis() + TIME_STAMP_SEP_FLAG).getBytes(); toSend = new byte[timePrefix.length + tmp.length]; diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java index 3b336d6..7e58c3d 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java @@ -9,15 +9,14 @@ import java.util.logging.Level; import java.util.logging.Logger; +import net.xmeter.SubBean; +import net.xmeter.samplers.mqtt.MQTTConnection; +import net.xmeter.samplers.mqtt.MQTTQoS; import org.apache.jmeter.samplers.Entry; import org.apache.jmeter.samplers.SampleResult; import org.apache.jmeter.threads.JMeterContextService; import org.apache.jmeter.threads.JMeterVariables; -import net.xmeter.SubBean; -import net.xmeter.samplers.mqtt.MQTTConnection; -import net.xmeter.samplers.mqtt.MQTTQoS; - @SuppressWarnings("deprecation") public class SubSampler extends AbstractMQTTSampler { private static final long serialVersionUID = 2979978053740194951L; @@ -26,15 +25,16 @@ public class SubSampler extends AbstractMQTTSampler { private transient MQTTConnection connection = null; private transient String clientId; private boolean subFailed = false; - - private boolean sampleByTime = true; // initial values - private int sampleElapsedTime = 1000; + + private int sampleElapsedTime = 1000; private int sampleCount = 1; + private int sampleCountTimeout = 5000; - private transient ConcurrentLinkedQueue batches = new ConcurrentLinkedQueue<>(); + private final transient ConcurrentLinkedQueue batches = new ConcurrentLinkedQueue<>(); private boolean printFlag = false; - private transient Object dataLock = new Object(); + private final transient Object dataLock = new Object(); + private boolean lockReleased = false; public String getQOS() { return getPropertyAsString(QOS_LEVEL, String.valueOf(QOS_0)); @@ -44,11 +44,9 @@ public void setQOS(String qos) { setProperty(QOS_LEVEL, qos); } - public String getTopics() { - return getPropertyAsString(TOPIC_NAME, DEFAULT_TOPIC_NAME); - } + public String getTopic() { return getPropertyAsString(TOPIC_NAME, DEFAULT_TOPIC_NAME); } - public void setTopics(String topicsName) { + public void setTopic(String topicsName) { setProperty(TOPIC_NAME, topicsName); } @@ -67,7 +65,15 @@ public String getSampleCount() { public void setSampleCount(String count) { setProperty(SAMPLE_CONDITION_VALUE, count); } - + + public String getSampleCountTimeout() { + return getPropertyAsString(SAMPLE_CONDITION_VALUE_OPT, DEFAULT_SAMPLE_VALUE_COUNT_TIMEOUT); + } + + public void setSampleCountTimeout(String timeout) { + setProperty(SAMPLE_CONDITION_VALUE_OPT, timeout); + } + public String getSampleElapsedTime() { return getPropertyAsString(SAMPLE_CONDITION_VALUE, DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC); } @@ -98,18 +104,20 @@ public SampleResult sample(Entry arg0) { result.setSampleLabel(getName()); JMeterVariables vars = JMeterContextService.getContext().getVariables(); - connection = (MQTTConnection) vars.getObject("conn"); - clientId = (String) vars.getObject("clientId"); + connection = (MQTTConnection) vars.getObject(getConnName()); + clientId = (String) vars.getObject(getConnName()+"_clientId"); if (connection == null) { return fillFailedResult(result, "500", "Subscribe failed because connection is not established."); } - - sampleByTime = SAMPLE_ON_CONDITION_OPTION1.equals(getSampleCondition()); + + // initial values + boolean sampleByTime = SAMPLE_ON_CONDITION_OPTION1.equals(getSampleCondition()); try { if (sampleByTime) { - sampleElapsedTime = Integer.parseInt(getSampleElapsedTime()); + sampleElapsedTime = Integer.parseUnsignedInt(getSampleElapsedTime()); } else { - sampleCount = Integer.parseInt(getSampleCount()); + sampleCount = Integer.parseUnsignedInt(getSampleCount()); + sampleCountTimeout = Integer.parseUnsignedInt(getSampleCountTimeout()); } } catch (NumberFormatException e) { return fillFailedResult(result, "510", "Unrecognized value for sample elapsed time or message count."); @@ -118,24 +126,24 @@ public SampleResult sample(Entry arg0) { if (sampleByTime && sampleElapsedTime <=0 ) { return fillFailedResult(result, "511", "Sample on elapsed time: must be greater than 0 ms."); } else if (sampleCount < 1) { - return fillFailedResult(result, "512", "Sample on message count: must be greater than 1."); + return fillFailedResult(result, "512", "Sample on message count: must be greater than 0."); } - final String topicsName= getTopics(); + String topicsName = getTopic(); setListener(sampleByTime, sampleCount); - Set topics = topicSubscribed.get(clientId); + Set topics = topicsSubscribed.get(clientId); if (topics == null) { logger.severe("subscribed topics haven't been initiated. [clientId: " + (clientId == null ? "null" : clientId) + "]"); topics = new HashSet<>(); topics.add(topicsName); - topicSubscribed.put(clientId, topics); - listenToTopics(topicsName); // TODO: run once or multiple times ? + topicsSubscribed.put(clientId, topics); + listenToTopics(topicsName); } else { if (!topics.contains(topicsName)) { topics.add(topicsName); - topicSubscribed.put(clientId, topics); - logger.fine("Listen to topics: " + topicsName); - listenToTopics(topicsName); // TODO: run once or multiple times ? + topicsSubscribed.put(clientId, topics); + logger.fine("Listen to topic: " + topicsName); + listenToTopics(topicsName); } } @@ -155,19 +163,33 @@ public SampleResult sample(Entry arg0) { } } else { synchronized (dataLock) { - int receivedCount1 = (batches.isEmpty() ? 0 : batches.element().getReceivedCount());; - boolean needWait = false; - if(receivedCount1 < sampleCount) { - needWait = true; - } - - if(needWait) { + int receivedCount = (batches.isEmpty() ? 0 : batches.element().getReceivedCount()); + if (receivedCount < sampleCount) { try { - dataLock.wait(); + if (sampleCountTimeout > 0) { + // handle spurious wakeups (https://docs.oracle.com/en/java/javase/18/docs/api/java.base/java/lang/Object.html#wait(long,int)) + lockReleased = false; + long endtime = System.currentTimeMillis() + sampleCountTimeout; + long currenttime = 0; + while (!lockReleased && currenttime < endtime) { + dataLock.wait(sampleCountTimeout); + currenttime = System.currentTimeMillis(); + } + } else { + // handle spurious wakeups (https://docs.oracle.com/en/java/javase/18/docs/api/java.base/java/lang/Object.html#wait(long,int)) + lockReleased = false; + while (lockReleased == false) { + dataLock.wait(); + } + } } catch (InterruptedException e) { logger.log(Level.INFO, "Received exception when waiting for notification signal", e); } } + receivedCount = (batches.isEmpty() ? 0 : batches.element().getReceivedCount()); + if (receivedCount < sampleCount) { + return fillFailedResult(result, "502", "Failed: No message received on topic: " + topicsName + " (Timeout after " + sampleCountTimeout + "ms)"); + } result.sampleStart(); return produceResult(result, topicsName); } @@ -182,15 +204,15 @@ private SampleResult produceResult(SampleResult result, String topicName) { int receivedCount = bean.getReceivedCount(); List contents = bean.getContents(); String message = MessageFormat.format("Received {0} of message.", receivedCount); - StringBuffer content = new StringBuffer(""); + StringBuilder content = new StringBuilder(); if (isDebugResponse()) { - for (int i = 0; i < contents.size(); i++) { - content.append(contents.get(i) + "\n"); + for (String s : contents) { + content.append(s).append("\n"); } } result = fillOKResult(result, bean.getReceivedMessageSize(), message, content.toString()); if (logger.isLoggable(Level.FINE)) { - logger.fine("sub [topic]: " + topicName + ", [payload]: " + content.toString()); + logger.fine("sub [topic]: " + topicName + ", [payload]: " + content); } if(receivedCount == 0) { @@ -212,23 +234,24 @@ private void listenToTopics(final String topicsName) { int qos; try { qos = Integer.parseInt(getQOS()); - } catch(Exception ex) { - logger.log(Level.SEVERE, ex, () -> MessageFormat.format("Specified invalid QoS value {0}, set to default QoS value {1}!", ex.getMessage(), QOS_0)); + } catch(Exception e) { + logger.log(Level.SEVERE, e, () -> MessageFormat.format("Specified invalid QoS value {0}, set to default QoS value {1}!", e.getMessage(), QOS_0)); qos = QOS_0; } - final String[] paraTopics = topicsName.split(","); + final String[] topicNames = topicsName.split(","); if(qos < 0 || qos > 2) { logger.severe("Specified invalid QoS value, set to default QoS value " + qos); qos = QOS_0; } - connection.subscribe(paraTopics, MQTTQoS.fromValue(qos), () -> { - logger.fine(() -> "sub successful, topic length is " + paraTopics.length); - }, error -> { - logger.log(Level.INFO, "subscribe failed", error); - subFailed = true; - }); + connection.subscribe(topicNames, + MQTTQoS.fromValue(qos), + () -> logger.fine(() -> "successful subscribed topics: " + String.join(", ", topicNames)), + error -> { + logger.log(Level.INFO, "subscribe failed", error); + subFailed = true; + }); } private void setListener(final boolean sampleByTime, final int sampleCount) { @@ -243,6 +266,7 @@ private void setListener(final boolean sampleByTime, final int sampleCount) { synchronized (dataLock) { SubBean bean = handleSubBean(sampleByTime, message, sampleCount); if(bean.getReceivedCount() == sampleCount) { + lockReleased = true; dataLock.notify(); } } @@ -251,7 +275,7 @@ private void setListener(final boolean sampleByTime, final int sampleCount) { } private SubBean handleSubBean(boolean sampleByTime, String msg, int sampleCount) { - SubBean bean = null; + SubBean bean; if(batches.isEmpty()) { bean = new SubBean(); batches.add(bean); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTClient.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTClient.java index d76588a..c99b78f 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTClient.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTClient.java @@ -6,12 +6,12 @@ import java.util.logging.Level; import java.util.logging.Logger; +import net.xmeter.samplers.mqtt.ConnectionParameters; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Tracer; -import net.xmeter.samplers.mqtt.ConnectionParameters; import net.xmeter.samplers.mqtt.MQTTClient; import net.xmeter.samplers.mqtt.MQTTClientException; import net.xmeter.samplers.mqtt.MQTTConnection; diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseUtil.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseUtil.java index 18b2c10..0bff0d8 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseUtil.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseUtil.java @@ -3,10 +3,9 @@ import java.util.ArrayList; import java.util.List; -import org.fusesource.mqtt.client.QoS; - import net.xmeter.Constants; import net.xmeter.samplers.mqtt.MQTTQoS; +import org.fusesource.mqtt.client.QoS; class FuseUtil { static final List ALLOWED_PROTOCOLS; diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java index 1e09a23..3eb52cc 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java @@ -1,9 +1,9 @@ package net.xmeter.samplers.mqtt.hivemq; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.function.Consumer; import java.util.logging.Logger; @@ -18,7 +18,6 @@ import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription; import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode; -import net.xmeter.samplers.mqtt.MQTTClientException; import net.xmeter.samplers.mqtt.MQTTConnection; import net.xmeter.samplers.mqtt.MQTTPubResult; import net.xmeter.samplers.mqtt.MQTTQoS; @@ -27,8 +26,8 @@ class HiveMQTTConnection implements MQTTConnection { private static final Logger logger = Logger.getLogger(HiveMQTTConnection.class.getCanonicalName()); - private static final Charset charset = Charset.forName("UTF-8"); - private static final CharsetDecoder decoder = charset.newDecoder(); + private static final Charset charset = StandardCharsets.UTF_8; + private static final ThreadLocal decoder = ThreadLocal.withInitial(charset::newDecoder); private final Mqtt3BlockingClient client; private final String clientId; @@ -105,17 +104,19 @@ public void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Cons } private void handlePublishReceived(Mqtt3Publish received) { + System.out.println("Topic name (before decoding): " + received.getTopic()); String topic = decode(received.getTopic().toByteBuffer()); String payload = received.getPayload().map(this::decode).orElse(""); this.listener.accept(topic, payload, () -> {}); } private String decode(ByteBuffer value) { - try { - return decoder.decode(value).toString(); - } catch (CharacterCodingException e) { - throw new RuntimeException(new MQTTClientException("Failed to decode", e)); - } +// try { +// return decoder.get().decode(value).toString(); +// } catch (CharacterCodingException e) { +// throw new RuntimeException(new MQTTClientException("Failed to decode", e)); +// } + return StandardCharsets.UTF_8.decode(value).toString(); } @Override @@ -125,8 +126,6 @@ public void setSubListener(MQTTSubListener listener) { @Override public String toString() { - return "HiveMQTTConnection{" + - "clientId='" + clientId + '\'' + - '}'; + return "HiveMQTTConnection{clientId='" + clientId + "'}"; } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java index d514035..be02fa3 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java @@ -1,7 +1,5 @@ package net.xmeter.samplers.mqtt.hivemq; -import static net.xmeter.Constants.HIVEMQ_MQTT_CLIENT_NAME; - import java.io.File; import java.util.Collections; import java.util.List; @@ -14,6 +12,7 @@ import com.hivemq.client.util.KeyStoreUtil; import net.xmeter.AcceptAllTrustManagerFactory; +import net.xmeter.Constants; import net.xmeter.Util; import net.xmeter.samplers.AbstractMQTTSampler; import net.xmeter.samplers.mqtt.ConnectionParameters; @@ -26,7 +25,7 @@ class HiveMQTTFactory implements MQTTFactory { @Override public String getName() { - return HIVEMQ_MQTT_CLIENT_NAME; + return Constants.HIVEMQ_MQTT_CLIENT_NAME; } @Override diff --git a/mqtt_jmeter/src/test/resources/conn.jmx b/mqtt_jmeter/src/test/resources/conn.jmx index b1cdb28..7fbb6cf 100644 --- a/mqtt_jmeter/src/test/resources/conn.jmx +++ b/mqtt_jmeter/src/test/resources/conn.jmx @@ -63,23 +63,23 @@ - - - ${keep_alive} - 0 - ${conn_keep} - conn_ - 0 - 10 - false - - - ${port} - ${protocol} - ${server} - - - true + + + ${keep_alive} + 0 + ${conn_keep} + conn_ + 0 + 10 + false + + + ${port} + ${protocol} + ${server} + + + true diff --git a/mqtt_jmeter/src/test/resources/pub.jmx b/mqtt_jmeter/src/test/resources/pub.jmx index ecdbc25..9b6d67d 100644 --- a/mqtt_jmeter/src/test/resources/pub.jmx +++ b/mqtt_jmeter/src/test/resources/pub.jmx @@ -80,31 +80,31 @@ true - - - - ${keep_alive} - 0 - 1800 - pub_ - 0 - 10 - false - - - ${port} - ${protocol} - ${server} - - - ${topic_name} - ${QoS} - true - Random string with fixed length - ${msg_length} - - true - + + + + ${keep_alive} + 0 + 1800 + pub_ + 0 + 10 + false + + + ${port} + ${protocol} + ${server} + + + ${topic_name} + ${QoS} + true + Random string with fixed length + ${msg_length} + + true + ${send_interval} diff --git a/mqtt_jmeter/src/test/resources/sub.jmx b/mqtt_jmeter/src/test/resources/sub.jmx index 5d4d8a5..3cc8ced 100644 --- a/mqtt_jmeter/src/test/resources/sub.jmx +++ b/mqtt_jmeter/src/test/resources/sub.jmx @@ -74,31 +74,31 @@ true - - - - ${keep_alive} - 0 - 1800 - sub_ - 0 - 10 - false - - - ${port} - ${protocol} - ${server} - - - ${topic_name} - ${QoS} - true - false - true - specified elapsed time (ms) - 1000 - + + + + ${keep_alive} + 0 + 1800 + sub_ + 0 + 10 + false + + + ${port} + ${protocol} + ${server} + + + ${topic_name} + ${QoS} + true + false + true + specified elapsed time (ms) + 1000 + ${recv_interval}