Skip to content

Commit

Permalink
Merge branch 'release' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
NTaylorMullen committed May 10, 2013
2 parents 322bcad + 5d70038 commit 2f93ca4
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 15 deletions.
5 changes: 5 additions & 0 deletions src/Microsoft.AspNet.SignalR.Core/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public Message(string source, string key, ArraySegment<byte> value)
/// </summary>
public Encoding Encoding { get; private set; }

/// <summary>
/// The scaleout mapping id. Only used in scaleout scenarios
/// </summary>
public ulong MappingId { get; set; }

public bool IsCommand
{
get
Expand Down
22 changes: 20 additions & 2 deletions src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ public virtual IDisposable Subscribe(ISubscriber subscriber, string cursor, Func
subscriber.EventKeyRemoved += _removeEvent;
subscriber.WriteCursor = subscription.WriteCursor;

var disposable = new DisposableAction(_disposeSubscription, subscriber);
var subscriptionState = new SubscriptionState(subscriber);
var disposable = new DisposableAction(_disposeSubscription, subscriptionState);

// When the subscription itself is disposed then dispose it
subscription.Disposable = disposable;
Expand All @@ -254,6 +255,8 @@ public virtual IDisposable Subscribe(ISubscriber subscriber, string cursor, Func
topic.AddSubscription(subscription);
}

subscriptionState.Initialized.Set();

// If there's a cursor then schedule work for this subscription
if (!String.IsNullOrEmpty(cursor))
{
Expand Down Expand Up @@ -491,7 +494,8 @@ private void RemoveEvent(ISubscriber subscriber, string eventKey)
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Failure to invoke the callback should be ignored")]
private void DisposeSubscription(object state)
{
var subscriber = (ISubscriber)state;
var subscriptionState = (SubscriptionState)state;
var subscriber = subscriptionState.Subscriber;

// This will stop work from continuting to happen
subscriber.Subscription.Dispose();
Expand All @@ -507,6 +511,8 @@ private void DisposeSubscription(object state)
// so the terminal message isn't required.
}

subscriptionState.Initialized.Wait();

subscriber.EventKeyAdded -= _addEvent;
subscriber.EventKeyRemoved -= _removeEvent;
subscriber.WriteCursor = null;
Expand All @@ -517,5 +523,17 @@ private void DisposeSubscription(object state)
RemoveEvent(subscriber, eventKey);
}
}

private class SubscriptionState
{
public ISubscriber Subscriber { get; private set; }
public ManualResetEventSlim Initialized { get; private set; }

public SubscriptionState(ISubscriber subscriber)
{
Initialized = new ManualResetEventSlim();
Subscriber = subscriber;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void OnReceivedCore(int streamIndex, ulong id, ScaleoutMessage scaleoutM
for (var i = 0; i < scaleoutMessage.Messages.Count; ++i)
{
Message message = scaleoutMessage.Messages[i];

message.MappingId = id;

IList<LocalEventKeyInfo> keyInfo;
if (!localMapping.TryGetValue(message.Key, out keyInfo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public override void WriteCursor(TextWriter textWriter)
protected override void PerformWork(IList<ArraySegment<Message>> items, out int totalCount, out object state)
{
// The list of cursors represent (streamid, payloadid)
var nextCursors = new ScaleoutMapping[_streams.Count];
var nextCursors = new ulong?[_streams.Count];
totalCount = 0;

// Get the enumerator so that we can extract messages for this subscription
Expand All @@ -83,10 +83,17 @@ protected override void PerformWork(IList<ArraySegment<Message>> items, out int
ScaleoutMapping mapping = enumerator.Current.Item1;
int streamIndex = enumerator.Current.Item2;

ExtractMessages(mapping, items, ref totalCount);
ulong mappingId = ExtractMessages(mapping, items, ref totalCount);

// Update the cursor id
nextCursors[streamIndex] = mapping;
nextCursors[streamIndex] = mappingId;

// If the mapping id of the message we received is bigger than our current mapping id
// it means we missed messages and we need to jump ahead.
if (mappingId > mapping.Id)
{
break;
}
}

state = nextCursors;
Expand All @@ -95,17 +102,17 @@ protected override void PerformWork(IList<ArraySegment<Message>> items, out int
protected override void BeforeInvoke(object state)
{
// Update the list of cursors before invoking anything
var nextCursors = (ScaleoutMapping[])state;
var nextCursors = (ulong?[])state;
for (int i = 0; i < _cursors.Count; i++)
{
// Only update non-null entries
ScaleoutMapping nextMapping = nextCursors[i];
ulong? nextCursor = nextCursors[i];

if (nextMapping != null)
if (nextCursor.HasValue)
{
Cursor cursor = _cursors[i];

cursor.Id = nextMapping.Id;
cursor.Id = nextCursor.Value;
}
}
}
Expand Down Expand Up @@ -160,7 +167,7 @@ private IEnumerable<Tuple<ScaleoutMapping, int>> GetMappings()
}
}

private void ExtractMessages(ScaleoutMapping mapping, IList<ArraySegment<Message>> items, ref int totalCount)
private ulong ExtractMessages(ScaleoutMapping mapping, IList<ArraySegment<Message>> items, ref int totalCount)
{
// For each of the event keys we care about, extract all of the messages
// from the payload
Expand All @@ -181,11 +188,20 @@ private void ExtractMessages(ScaleoutMapping mapping, IList<ArraySegment<Message
{
items.Add(storeResult.Messages);
totalCount += storeResult.Messages.Count;

ulong mappingId = storeResult.Messages.Array[storeResult.Messages.Offset].MappingId;

if (mappingId > mapping.Id)
{
return mappingId;
}
}
}
}
}
}

return mapping.Id;
}

private class CachedStreamEnumerator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ internal class OrderedCountDownRange<T>
{
private readonly IEnumerator<T> _enumerator;
private T _value;
private readonly ManualResetEventSlim _wh = new ManualResetEventSlim(false);
private readonly ManualResetEventSlim _wh = new ManualResetEventSlim();
private bool _result;

public OrderedCountDownRange(IEnumerable<T> range)
{
_enumerator = range.GetEnumerator();
_enumerator.MoveNext();
_value = _enumerator.Current;
_result = true;
}

public bool Expect(T item)
{
bool result = Object.Equals(_value, item);
_result = Object.Equals(_value, item) && _result;

if (result)
if (_result)
{
if (_enumerator.MoveNext())
{
Expand All @@ -33,7 +35,7 @@ public bool Expect(T item)
}
}

return result;
return _result;
}

public bool Wait(TimeSpan timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.SignalR.Configuration;
using Microsoft.AspNet.SignalR.Messaging;
using Microsoft.AspNet.SignalR.Tests.Infrastructure;
using Xunit;
Expand Down Expand Up @@ -173,6 +174,54 @@ public void SubscriptionPullFromMultipleStreamsInFairOrder()
}
}

[Fact]
public void SubscriptionGetsNewMessagesWhenTopicStoreOverrun()
{
var dr = new DefaultDependencyResolver();
dr.Resolve<IConfigurationManager>().DefaultMessageBufferSize = 10;

using (var bus = new TestScaleoutBus(dr))
{
var subscriber = new TestSubscriber(new[] { "key" });
IDisposable subscription = null;
// 16-49 is the valid range
var cd = new OrderedCountDownRange<int>(Enumerable.Range(16, 33));
var results = new List<bool>();

for (int i = 0; i < 50; i++)
{
bus.Publish(0, (ulong)i, new[] {
new Message("test", "key", i.ToString())
});
}

try
{
subscription = bus.Subscribe(subscriber, "0,1", (result, state) =>
{
foreach (var m in result.GetMessages())
{
int n = Int32.Parse(m.GetString());

cd.Expect(n);
}

return TaskAsyncHelper.True;

}, 10, null);

Assert.True(cd.Wait(TimeSpan.FromSeconds(5)));
}
finally
{
if (subscription != null)
{
subscription.Dispose();
}
}
}
}

[Fact]
public void SubscriptionPublishingAfter()
{
Expand Down

0 comments on commit 2f93ca4

Please sign in to comment.