Skip to content

Commit

Permalink
Minor, fixed imports
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Sep 30, 2024
1 parent 01cd874 commit 5bdceb5
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 6 deletions.
6 changes: 5 additions & 1 deletion broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,13 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
}

private Quota retrieveSendQuota(MqttConnectMessage msg) {
if (!isProtocolVersion(msg, MqttVersion.MQTT_5)) {
if (isProtocolVersion(msg, MqttVersion.MQTT_3_1) || isProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
// for protocol versions that didn't define explicit
// the receiver maximum and without specification of flow control
// define one by the default.
return createQuota(BrokerConstants.INFLIGHT_WINDOW_SIZE);
}

MqttProperties.IntegerProperty receiveMaximumProperty = (MqttProperties.IntegerProperty) msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value());
Expand Down
1 change: 0 additions & 1 deletion broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ enum SessionStatus {
// used only in MQTT3 where resends are done on timeout of ACKs.
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private final Map<Integer, MqttPublishMessage> qos2Receiving = new HashMap<>();
// private final AtomicInteger inflightSlots;
private ISessionsRepository.SessionData data;

Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public abstract class AbstractServerIntegrationTest extends AbstractServerIntegrationWithoutClientFixture {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ private void verifyNotSet(MqttPropertyType propertyType, MqttProperties props, S
public void testAckResponseProperties() {
final MqttProperties ackProps = connAck.variableHeader().properties();
verifyProperty(MqttPropertyType.SESSION_EXPIRY_INTERVAL, ackProps, BrokerConstants.INFINITE_SESSION_EXPIRY, "Session expiry is infinite");
// verifyProperty(MqttPropertyType.RECEIVE_MAXIMUM, ackProps, INFLIGHT_WINDOW_SIZE, "Receive maximum property must equals flight window size");
verifyNotSet(MqttPropertyType.MAXIMUM_QOS, ackProps, "Maximum QoS is not set => QoS 2 ready");
verifyProperty(MqttPropertyType.RETAIN_AVAILABLE, ackProps, 1, "Retain feature is available");
verifyNotSet(MqttPropertyType.MAXIMUM_PACKET_SIZE, ackProps, "Maximum packet size is the one defined by specs");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import io.moquette.broker.config.FluentConfig;
import io.moquette.broker.config.IConfig;
import io.moquette.testclient.Client;
Expand All @@ -36,8 +35,9 @@

import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FlowControlTest extends AbstractServerIntegrationTest {

Expand Down

0 comments on commit 5bdceb5

Please sign in to comment.