From 8a84af87e17f40cf1bd21991fb3a80691a4d623c Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 8 May 2013 16:39:57 -0700 Subject: [PATCH 1/2] Fixed duplicate messages when topic store overrun. - Added unit tests #1973 --- .../Messaging/Message.cs | 5 ++ .../Messaging/ScaleoutMessageBus.cs | 2 +- .../Messaging/ScaleoutSubscription.cs | 32 +++++++++--- .../Infrastructure/OrderedCountDownRange.cs | 10 ++-- .../Server/ScaleOutMessageBusFacts.cs | 49 +++++++++++++++++++ 5 files changed, 85 insertions(+), 13 deletions(-) diff --git a/src/Microsoft.AspNet.SignalR.Core/Messaging/Message.cs b/src/Microsoft.AspNet.SignalR.Core/Messaging/Message.cs index 96165cc599..79f24d6c72 100644 --- a/src/Microsoft.AspNet.SignalR.Core/Messaging/Message.cs +++ b/src/Microsoft.AspNet.SignalR.Core/Messaging/Message.cs @@ -93,6 +93,11 @@ public Message(string source, string key, ArraySegment value) /// public Encoding Encoding { get; private set; } + /// + /// The scaleout mapping id. Only used in scaleout scenarios + /// + public ulong MappingId { get; set; } + public bool IsCommand { get diff --git a/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutMessageBus.cs b/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutMessageBus.cs index 290a2b18fd..3e5ced0121 100644 --- a/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutMessageBus.cs +++ b/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutMessageBus.cs @@ -178,7 +178,7 @@ private void OnReceivedCore(int streamIndex, ulong id, ScaleoutMessage scaleoutM for (var i = 0; i < scaleoutMessage.Messages.Count; ++i) { Message message = scaleoutMessage.Messages[i]; - + message.MappingId = id; IList keyInfo; if (!localMapping.TryGetValue(message.Key, out keyInfo)) diff --git a/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutSubscription.cs b/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutSubscription.cs index 4c8b8c8b86..5655225f69 100644 --- a/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutSubscription.cs +++ b/src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutSubscription.cs @@ -72,7 +72,7 @@ public override void WriteCursor(TextWriter textWriter) protected override void PerformWork(IList> items, out int totalCount, out object state) { // The list of cursors represent (streamid, payloadid) - var nextCursors = new ScaleoutMapping[_streams.Count]; + var nextCursors = new ulong?[_streams.Count]; totalCount = 0; // Get the enumerator so that we can extract messages for this subscription @@ -83,10 +83,17 @@ protected override void PerformWork(IList> items, out int ScaleoutMapping mapping = enumerator.Current.Item1; int streamIndex = enumerator.Current.Item2; - ExtractMessages(mapping, items, ref totalCount); + ulong mappingId = ExtractMessages(mapping, items, ref totalCount); // Update the cursor id - nextCursors[streamIndex] = mapping; + nextCursors[streamIndex] = mappingId; + + // If the mapping id of the message we received is bigger than our current mapping id + // it means we missed messages and we need to jump ahead. + if (mappingId > mapping.Id) + { + break; + } } state = nextCursors; @@ -95,17 +102,17 @@ protected override void PerformWork(IList> items, out int protected override void BeforeInvoke(object state) { // Update the list of cursors before invoking anything - var nextCursors = (ScaleoutMapping[])state; + var nextCursors = (ulong?[])state; for (int i = 0; i < _cursors.Count; i++) { // Only update non-null entries - ScaleoutMapping nextMapping = nextCursors[i]; + ulong? nextCursor = nextCursors[i]; - if (nextMapping != null) + if (nextCursor.HasValue) { Cursor cursor = _cursors[i]; - cursor.Id = nextMapping.Id; + cursor.Id = nextCursor.Value; } } } @@ -160,7 +167,7 @@ private IEnumerable> GetMappings() } } - private void ExtractMessages(ScaleoutMapping mapping, IList> items, ref int totalCount) + private ulong ExtractMessages(ScaleoutMapping mapping, IList> items, ref int totalCount) { // For each of the event keys we care about, extract all of the messages // from the payload @@ -181,11 +188,20 @@ private void ExtractMessages(ScaleoutMapping mapping, IList mapping.Id) + { + return mappingId; + } } } } } } + + return mapping.Id; } private class CachedStreamEnumerator diff --git a/tests/Microsoft.AspNet.SignalR.Tests.Common/Infrastructure/OrderedCountDownRange.cs b/tests/Microsoft.AspNet.SignalR.Tests.Common/Infrastructure/OrderedCountDownRange.cs index 1af8ef46ed..743e2e695d 100644 --- a/tests/Microsoft.AspNet.SignalR.Tests.Common/Infrastructure/OrderedCountDownRange.cs +++ b/tests/Microsoft.AspNet.SignalR.Tests.Common/Infrastructure/OrderedCountDownRange.cs @@ -8,20 +8,22 @@ internal class OrderedCountDownRange { private readonly IEnumerator _enumerator; private T _value; - private readonly ManualResetEventSlim _wh = new ManualResetEventSlim(false); + private readonly ManualResetEventSlim _wh = new ManualResetEventSlim(); + private bool _result; public OrderedCountDownRange(IEnumerable range) { _enumerator = range.GetEnumerator(); _enumerator.MoveNext(); _value = _enumerator.Current; + _result = true; } public bool Expect(T item) { - bool result = Object.Equals(_value, item); + _result = Object.Equals(_value, item) && _result; - if (result) + if (_result) { if (_enumerator.MoveNext()) { @@ -33,7 +35,7 @@ public bool Expect(T item) } } - return result; + return _result; } public bool Wait(TimeSpan timeout) diff --git a/tests/Microsoft.AspNet.SignalR.Tests/Server/ScaleOutMessageBusFacts.cs b/tests/Microsoft.AspNet.SignalR.Tests/Server/ScaleOutMessageBusFacts.cs index 0888e40deb..f857f4b7da 100644 --- a/tests/Microsoft.AspNet.SignalR.Tests/Server/ScaleOutMessageBusFacts.cs +++ b/tests/Microsoft.AspNet.SignalR.Tests/Server/ScaleOutMessageBusFacts.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNet.SignalR.Configuration; using Microsoft.AspNet.SignalR.Messaging; using Microsoft.AspNet.SignalR.Tests.Infrastructure; using Xunit; @@ -173,6 +174,54 @@ public void SubscriptionPullFromMultipleStreamsInFairOrder() } } + [Fact] + public void SubscriptionGetsNewMessagesWhenTopicStoreOverrun() + { + var dr = new DefaultDependencyResolver(); + dr.Resolve().DefaultMessageBufferSize = 10; + + using (var bus = new TestScaleoutBus(dr)) + { + var subscriber = new TestSubscriber(new[] { "key" }); + IDisposable subscription = null; + // 16-49 is the valid range + var cd = new OrderedCountDownRange(Enumerable.Range(16, 33)); + var results = new List(); + + for (int i = 0; i < 50; i++) + { + bus.Publish(0, (ulong)i, new[] { + new Message("test", "key", i.ToString()) + }); + } + + try + { + subscription = bus.Subscribe(subscriber, "0,1", (result, state) => + { + foreach (var m in result.GetMessages()) + { + int n = Int32.Parse(m.GetString()); + + cd.Expect(n); + } + + return TaskAsyncHelper.True; + + }, 10, null); + + Assert.True(cd.Wait(TimeSpan.FromSeconds(5))); + } + finally + { + if (subscription != null) + { + subscription.Dispose(); + } + } + } + } + [Fact] public void SubscriptionPublishingAfter() { From 5d700389a95064031d1c4c2044351b164a008918 Mon Sep 17 00:00:00 2001 From: NTaylorMullen Date: Thu, 9 May 2013 17:10:34 -0700 Subject: [PATCH 2/2] Synchronized dispose and subscription initialization within the MessageBus to avoid memory leaks #1993 --- .../Messaging/MessageBus.cs | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBus.cs b/src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBus.cs index 08fb011a50..72eb9c7ffa 100644 --- a/src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBus.cs +++ b/src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBus.cs @@ -241,7 +241,8 @@ public virtual IDisposable Subscribe(ISubscriber subscriber, string cursor, Func subscriber.EventKeyRemoved += _removeEvent; subscriber.WriteCursor = subscription.WriteCursor; - var disposable = new DisposableAction(_disposeSubscription, subscriber); + var subscriptionState = new SubscriptionState(subscriber); + var disposable = new DisposableAction(_disposeSubscription, subscriptionState); // When the subscription itself is disposed then dispose it subscription.Disposable = disposable; @@ -254,6 +255,8 @@ public virtual IDisposable Subscribe(ISubscriber subscriber, string cursor, Func topic.AddSubscription(subscription); } + subscriptionState.Initialized.Set(); + // If there's a cursor then schedule work for this subscription if (!String.IsNullOrEmpty(cursor)) { @@ -491,7 +494,8 @@ private void RemoveEvent(ISubscriber subscriber, string eventKey) [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Failure to invoke the callback should be ignored")] private void DisposeSubscription(object state) { - var subscriber = (ISubscriber)state; + var subscriptionState = (SubscriptionState)state; + var subscriber = subscriptionState.Subscriber; // This will stop work from continuting to happen subscriber.Subscription.Dispose(); @@ -507,6 +511,8 @@ private void DisposeSubscription(object state) // so the terminal message isn't required. } + subscriptionState.Initialized.Wait(); + subscriber.EventKeyAdded -= _addEvent; subscriber.EventKeyRemoved -= _removeEvent; subscriber.WriteCursor = null; @@ -517,5 +523,17 @@ private void DisposeSubscription(object state) RemoveEvent(subscriber, eventKey); } } + + private class SubscriptionState + { + public ISubscriber Subscriber { get; private set; } + public ManualResetEventSlim Initialized { get; private set; } + + public SubscriptionState(ISubscriber subscriber) + { + Initialized = new ManualResetEventSlim(); + Subscriber = subscriber; + } + } } }