-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(server): at-least-once messages received during subscription must… #1988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
using MQTTnet.Internal; | ||
using MQTTnet.Packets; | ||
using MQTTnet.Protocol; | ||
using static MQTTnet.Server.MqttClientSubscriptionsManager; | ||
|
||
namespace MQTTnet.Server | ||
{ | ||
|
@@ -172,11 +173,17 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket | |
throw new ArgumentNullException(nameof(subscribePacket)); | ||
} | ||
|
||
var retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false); | ||
var result = new SubscribeResult(subscribePacket.TopicFilters.Count); | ||
|
||
var addedSubscriptions = new List<string>(); | ||
var finalTopicFilters = new List<MqttTopicFilter>(); | ||
var atLeastOnceSubscriptionResults = new List<CreateSubscriptionResult>(); | ||
|
||
IList<MqttApplicationMessage> retainedApplicationMessages = null; | ||
if (subscribePacket.TopicFilters.Any(f => f.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this only relevant for at least once QoS? Isn't this also beneficial for QoS 0 and 2? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. The project wherein we discovered this bug uses solely There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @chkr1011 Is this PR sufficient as-is or are you requesting that I make changes? |
||
{ | ||
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false); | ||
} | ||
|
||
// The topic filters are order by its QoS so that the higher QoS will win over a | ||
// lower one. | ||
|
@@ -208,6 +215,24 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket | |
finalTopicFilters.Add(topicFilter); | ||
|
||
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); | ||
if (createSubscriptionResult.Subscription.GrantedQualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce) | ||
{ | ||
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); | ||
} | ||
else | ||
{ | ||
atLeastOnceSubscriptionResults.Add(createSubscriptionResult); | ||
} | ||
} | ||
|
||
if (atLeastOnceSubscriptionResults.Count != 0) | ||
{ | ||
// In order to satisfy at least once, we must query for retained messages after creating the subscription. | ||
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false); | ||
foreach (var createSubscriptionResult in atLeastOnceSubscriptionResults) | ||
{ | ||
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); | ||
} | ||
} | ||
|
||
// This call will add the new subscription to the internal storage. | ||
|
Uh oh!
There was an error while loading. Please reload this page.