Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement response-information property for request-response flow #840

Merged
merged 7 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Implement response-information property for request-response flow. (#840)
[fix] Optimised page file opening for disk-based queues. (#837)
[feature] Manage payload format indicator property, when set verify payload format. (#826)
[refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827)
Expand Down
31 changes: 29 additions & 2 deletions broker/src/main/java/io/moquette/broker/Authorizator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import static io.moquette.broker.Utils.messageId;
Expand All @@ -37,6 +39,11 @@ final class Authorizator {

private final IAuthorizatorPolicy policy;

// Contains the list of topic-client that has read access forced on reply topic.
private ConcurrentMap<Utils.Couple<Topic, String>, Boolean> responseTopicForcedReads = new ConcurrentHashMap<>();
// Contains the list of requesters' reply topics that need write access by all the other (responders).
private ConcurrentMap<Topic, Boolean> responseTopicForcedWrites = new ConcurrentHashMap<>();

Authorizator(IAuthorizatorPolicy policy) {
this.policy = policy;
}
Expand Down Expand Up @@ -113,10 +120,30 @@ private MqttQoS getQoSCheckingAlsoPermissionsOnTopic(String clientID, String use
* @return true if the user from client can publish data on topic.
*/
boolean canWrite(Topic topic, String user, String client) {
return policy.canWrite(topic, user, client);
boolean policyResult = policy.canWrite(topic, user, client);
if (!policyResult && responseTopicForcedWrites.containsKey(topic)) {
LOG.warn("Found write discord by policy and response information topic configured. The policy prohibit " +
"while the response topic should be accessible for all to write. topic: {}", topic);
return true;
}
return policyResult;
}

boolean canRead(Topic topic, String user, String client) {
return policy.canRead(topic, user, client);
boolean policyResult = policy.canRead(topic, user, client);
if (!policyResult && responseTopicForcedReads.containsKey(Utils.Couple.of(topic, client))) {
LOG.warn("Found read discord by policy and response information topic configured. The policy prohibit " +
"while the response topic should be accessible by read from client{}. topic: {}", client, topic);
return true;
}
return policyResult;
}

void forceReadAccess(Topic topic, String client) {
responseTopicForcedReads.putIfAbsent(Utils.Couple.of(topic, client), true);
}

public void forceWriteToAll(Topic topic) {
responseTopicForcedWrites.putIfAbsent(topic, true);
}
}
22 changes: 17 additions & 5 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
.sessionPresent(isSessionAlreadyPresent);
if (isProtocolVersion(msg, MqttVersion.MQTT_5)) {
// set properties for MQTT 5
final MqttProperties ackProperties = prepareConnAckProperties(serverGeneratedClientId, clientId);
ConnAckPropertiesBuilder connAckPropertiesBuilder = prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId);
if (isNeedResponseInformation(msg)) {
// the responder and requested access to the topic are already configured during session creation
// in SessionRegistry
connAckPropertiesBuilder.responseInformation("/reqresp/response/" + clientId);
}
final MqttProperties ackProperties = connAckPropertiesBuilder.build();
connAckBuilder.properties(ackProperties);
}
final MqttConnAckMessage ackMessage = connAckBuilder.build();
Expand Down Expand Up @@ -328,6 +334,16 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

/**
* @return true iff message contains property REQUEST_RESPONSE_INFORMATION and is positive.
* */
static boolean isNeedResponseInformation(MqttConnectMessage msg) {
MqttProperties.IntegerProperty requestRespInfo = (MqttProperties.IntegerProperty) msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.REQUEST_RESPONSE_INFORMATION.value());
return requestRespInfo != null && requestRespInfo.value() >= 1;
}

/**
* @return the value of the Payload Format Indicator property from Will specification.
* */
Expand All @@ -352,10 +368,6 @@ private static boolean checkUTF8Validity(byte[] rawBytes) {
return true;
}

private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) {
return prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId).build();
}

private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverGeneratedClientId, String clientId) {
final ConnAckPropertiesBuilder builder = new ConnAckPropertiesBuilder();
// default maximumQos is 2, [MQTT-3.2.2-10]
Expand Down
6 changes: 6 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
newSession = new Session(sessionData, clean, queue);
newSession.markConnecting();
sessionsRepository.saveSession(sessionData);
if (MQTTConnection.isNeedResponseInformation(msg)) {
// the responder client must have write access to this topic
// the requester client must have read access on this topic
authorizator.forceReadAccess(Topic.asTopic("/reqresp/response/" + clientId), clientId);
authorizator.forceWriteToAll(Topic.asTopic("/reqresp/response/" + clientId));
}
return newSession;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,150 +1,37 @@
package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.integration.IntegrationUtils;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static io.moquette.integration.mqtt5.ConnectTest.assertConnectionAccepted;
import static org.junit.jupiter.api.Assertions.*;

public abstract class AbstractServerIntegrationTest {
Server broker;
IConfig config;

@TempDir
Path tempFolder;
protected String dbPath;
public abstract class AbstractServerIntegrationTest extends AbstractServerIntegrationWithoutClientFixture {

Client lowLevelClient;

@NotNull
static Mqtt5BlockingClient createSubscriberClient(String clientId) {
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, client.connect().getReasonCode(), clientId + " connected");
return client;
}

@NotNull
static Mqtt5BlockingClient createPublisherClient() {
return AbstractSubscriptionIntegrationTest.createClientWithStartFlagAndClientId(true, "publisher");
}

protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer<Void> action, Duration timeout, String message) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Optional<Mqtt5Publish> publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS);

// verify no published will in 10 seconds
assertFalse(publishedMessage.isPresent(), message);
}
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer<Void> action, MqttQos expectedQos,
String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
}
}

static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) {
assertEquals(mqttMessageType, received.fixedHeader().messageType());
}

static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
}
}

@NotNull
Mqtt5BlockingClient createSubscriberClient() {
String clientId = clientName();
return createSubscriberClient(clientId);
return createHiveBlockingClient(clientId);
}

public abstract String clientName();

protected void startServer(String dbPath) throws IOException {
broker = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
config = new MemoryConfig(configProps);
broker.startServer(config);
}

@BeforeAll
public static void beforeTests() {
Awaitility.setDefaultTimeout(Durations.ONE_SECOND);
}

@BeforeEach
public void setUp() throws Exception {
dbPath = IntegrationUtils.tempH2Path(tempFolder);
startServer(dbPath);
super.setUp();

lowLevelClient = new Client("localhost").clientId(clientName());
}

@AfterEach
public void tearDown() throws Exception {
lowLevelClient.shutdownConnection();
stopServer();
}

protected void stopServer() {
broker.stopServer();
}

void restartServerWithSuspension(Duration timeout) throws InterruptedException, IOException {
stopServer();
Thread.sleep(timeout.toMillis());
startServer(dbPath);
super.tearDown();
}

void connectLowLevel() {
Expand Down
Loading
Loading