Skip to content

Commit

Permalink
Implement and use a Session data storage (#721)
Browse files Browse the repository at this point in the history
Introduce the concept of SessionsRepository to store session's data that are not subscriptions or queues; those are already persisted with their repository instances.
This is a step to move to MQTT5 which benefit also the MQTT3 implementation.

introduce ISessionRepository interface and its H2 implementation
the session data are stored in a new data class named SessionData
implements serializitaion/deserialization of SessionData for H2


* Capped the infinite session expire interval to 100 years (as seconds instead of UINT max value)
* Replaces session semi-final fields in Session live object with SessionData
  • Loading branch information
andsel authored Mar 4, 2023
1 parent ba139db commit 399c503
Show file tree
Hide file tree
Showing 16 changed files with 295 additions and 35 deletions.
6 changes: 6 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@
<version>${h2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
96 changes: 96 additions & 0 deletions broker/src/main/java/io/moquette/broker/ISessionsRepository.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.moquette.broker;

import io.netty.handler.codec.mqtt.MqttVersion;

import java.time.Instant;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;

/**
* Used to store data about persisted sessions like MQTT version, session's properties.
* */
public interface ISessionsRepository {

// Data class
final class SessionData {
private final String clientId;
private Instant expireAt = null;
final MqttVersion version;
private final int expiryInterval;

/**
* Construct a new SessionData without expiration set yet.
* */
public SessionData(String clientId, MqttVersion version, int expiryInterval) {
this.clientId = clientId;
this.version = version;
this.expiryInterval = expiryInterval;
}

/**
* Construct SessionData with an expiration instant, created by loading from the storage.
* */
public SessionData(String clientId, Instant expireAt, MqttVersion version, int expiryInterval) {
this.expiryInterval = expiryInterval;
Objects.requireNonNull(expireAt, "An expiration time is requested");
this.clientId = clientId;
this.expireAt = expireAt;
this.version = version;
}

public String clientId() {
return clientId;
}

public MqttVersion protocolVersion() {
return version;
}

public Optional<Instant> expireAt() {
return Optional.ofNullable(expireAt);
}

public Optional<Long> expiryInstant() {
return expireAt()
.map(Instant::toEpochMilli);
}

public int expiryInterval() {
return expiryInterval;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SessionData that = (SessionData) o;
return clientId.equals(that.clientId);
}

@Override
public int hashCode() {
return Objects.hash(clientId);
}

@Override
public String toString() {
return "SessionData{" +
"clientId='" + clientId + '\'' +
", expireAt=" + expireAt +
", version=" + version +
", expiryInterval=" + expiryInterval +
'}';
}
}

/**
* @return the full list of persisted sessions data.
* */
Collection<SessionData> list();

/**
* Save data composing a session, es MQTT version, creation date and properties but not queues or subscriptions.
* */
void saveSession(SessionData session);
}
6 changes: 5 additions & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.moquette.broker.unsafequeues.QueueException;
import io.moquette.interception.InterceptHandler;
import io.moquette.persistence.H2Builder;
import io.moquette.persistence.MemorySessionsRepository;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
Expand Down Expand Up @@ -192,6 +193,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
authenticator = initializeAuthenticator(authenticator, config);
authorizatorPolicy = initializeAuthorizatorPolicy(authorizatorPolicy, config);

final ISessionsRepository sessionsRepository;
final ISubscriptionsRepository subscriptionsRepository;
final IQueueRepository queueRepository;
final IRetainedRepository retainedRepository;
Expand Down Expand Up @@ -225,17 +227,19 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
LOG.trace("Configuring H2 subscriptions repository");
subscriptionsRepository = h2Builder.subscriptionsRepository();
retainedRepository = h2Builder.retainedRepository();
sessionsRepository = h2Builder.sessionsRepository();
} else {
LOG.trace("Configuring in-memory subscriptions store");
subscriptionsRepository = new MemorySubscriptionsRepository();
queueRepository = new MemoryQueueRepository();
retainedRepository = new MemoryRetainedRepository();
sessionsRepository = new MemorySessionsRepository();
}

ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory();
subscriptions.init(subscriptionsRepository);
final Authorizator authorizator = new Authorizator(authorizatorPolicy);
sessions = new SessionRegistry(subscriptions, queueRepository, authorizator);
sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator);
final int sessionQueueSize = config.intProp(BrokerConstants.SESSION_QUEUE_SIZE, 1024);
dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, interceptor, authorizator,
sessionQueueSize);
Expand Down
35 changes: 19 additions & 16 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.time.Instant;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -46,6 +51,9 @@
class Session {

private static final Logger LOG = LoggerFactory.getLogger(Session.class);
// By specification session expiry value of 0xFFFFFFFF (UINT_MAX) (seconds) means
// session that doesn't expire, it's ~136 years, we can set a cap at 100 year
static final int INFINITE_EXPIRY = (int) Duration.ofDays(80 * 365).toMillis() / 1000;

static class InFlightPacket implements Delayed {

Expand Down Expand Up @@ -95,7 +103,6 @@ static final class Will {
}
}

private final String clientId;
private boolean clean;
private Will will;
private final SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue;
Expand All @@ -106,28 +113,24 @@ static final class Will {
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private final Map<Integer, MqttPublishMessage> qos2Receiving = new HashMap<>();
private final AtomicInteger inflightSlots = new AtomicInteger(INFLIGHT_WINDOW_SIZE); // this should be configurable
private final Instant created;
private final int expiryInterval;
private final ISessionsRepository.SessionData data;

Session(String clientId, boolean clean, Will will, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
this(clientId, clean, sessionQueue);
Session(ISessionsRepository.SessionData data, boolean clean, Will will, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
this(data, clean, sessionQueue);
this.will = will;
}

Session(String clientId, boolean clean, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
if (sessionQueue == null) {
throw new IllegalArgumentException("sessionQueue parameter can't be null");
}
this.clientId = clientId;
this.data = data;
this.clean = clean;
this.sessionQueue = sessionQueue;
this.created = Instant.now();
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
expiryInterval = clean ? 0 : 0xFFFFFFFF;
}

public boolean expireImmediately() {
return expiryInterval == 0;
return data.expiryInterval() == 0;
}

void update(boolean clean, Will will) {
Expand Down Expand Up @@ -160,7 +163,7 @@ public boolean connected() {
}

public String getClientID() {
return clientId;
return data.clientId();
}

public List<Subscription> getSubscriptions() {
Expand All @@ -172,7 +175,7 @@ public void addSubscriptions(List<Subscription> newSubscriptions) {
}

public void removeSubscription(Topic topic) {
subscriptions.remove(new Subscription(clientId, topic, MqttQoS.EXACTLY_ONCE));
subscriptions.remove(new Subscription(data.clientId(), topic, MqttQoS.EXACTLY_ONCE));
}

public boolean hasWill() {
Expand Down Expand Up @@ -502,7 +505,7 @@ public void cleanUp() {
@Override
public String toString() {
return "Session{" +
"clientId='" + clientId + '\'' +
"clientId='" + data.clientId() + '\'' +
", clean=" + clean +
", status=" + status +
", inflightSlots=" + inflightSlots +
Expand Down
27 changes: 19 additions & 8 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +36,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import static io.moquette.broker.Session.INFINITE_EXPIRY;

public class SessionRegistry {

public abstract static class EnqueuedMessage {
Expand Down Expand Up @@ -114,27 +117,30 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr

private final ConcurrentMap<String, Session> pool = new ConcurrentHashMap<>();
private final ISubscriptionsDirectory subscriptionsDirectory;
private final ISessionsRepository sessionsRepository;
private final IQueueRepository queueRepository;
private final Authorizator authorizator;

SessionRegistry(ISubscriptionsDirectory subscriptionsDirectory,
ISessionsRepository sessionsRepository,
IQueueRepository queueRepository,
Authorizator authorizator) {
this.subscriptionsDirectory = subscriptionsDirectory;
this.sessionsRepository = sessionsRepository;
this.queueRepository = queueRepository;
this.authorizator = authorizator;
recreateSessionPool();
}

private void recreateSessionPool() {
final Set<String> queues = queueRepository.listQueueNames();
for (String clientId : subscriptionsDirectory.listAllSessionIds()) {
for (ISessionsRepository.SessionData session : sessionsRepository.list()) {
// if the subscriptions are present is obviously false
if (queueRepository.containsQueue(clientId)) {
final SessionMessageQueue<EnqueuedMessage> persistentQueue = queueRepository.getOrCreateQueue(clientId);
queues.remove(clientId);
Session rehydrated = new Session(clientId, false, persistentQueue);
pool.put(clientId, rehydrated);
if (queueRepository.containsQueue(session.clientId())) {
final SessionMessageQueue<EnqueuedMessage> persistentQueue = queueRepository.getOrCreateQueue(session.clientId());
queues.remove(session.clientId());
Session rehydrated = new Session(session, false, persistentQueue);
pool.put(session.clientId(), rehydrated);
}
}
if (!queues.isEmpty()) {
Expand Down Expand Up @@ -228,14 +234,19 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
} else {
queue = new InMemoryQueue();
}
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
final int expiryInterval = clean ? 0 : INFINITE_EXPIRY;
final ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(clientId,
MqttVersion.MQTT_3_1_1, expiryInterval);
if (msg.variableHeader().isWillFlag()) {
final Session.Will will = createWill(msg);
newSession = new Session(clientId, clean, will, queue);
newSession = new Session(sessionData, clean, will, queue);
} else {
newSession = new Session(clientId, clean, queue);
newSession = new Session(sessionData, clean, queue);
}

newSession.markConnecting();
sessionsRepository.saveSession(sessionData);
return newSession;
}

Expand Down
5 changes: 5 additions & 0 deletions broker/src/main/java/io/moquette/persistence/H2Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.moquette.broker.IQueueRepository;
import io.moquette.broker.IRetainedRepository;
import io.moquette.broker.ISessionsRepository;
import io.moquette.broker.ISubscriptionsRepository;
import org.h2.mvstore.MVStore;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,4 +73,8 @@ public IQueueRepository queueRepository() {
public IRetainedRepository retainedRepository() {
return new H2RetainedRepository(mvStore);
}

public ISessionsRepository sessionsRepository() {
return new H2SessionsRepository(mvStore);
}
}
Loading

0 comments on commit 399c503

Please sign in to comment.