added fix for #3393#3394
Conversation
Introduced EventStream<TPayload> for .NET 8+, providing a strongly-typed pub/sub event stream with a fixed-size rolling backlog using a ring buffer. Added multiple Subscribe overloads supporting thread options, strong/weak references, filtering, and backlog replay. Overrode publish and subscription logic for backlog management and thread safety. Included extensive unit tests covering all major behaviors and edge cases.
Renamed private fields in EventStream<TPayload> and Backlog<T> to use a leading underscore (_) for consistency with C# naming conventions. No functional changes were made.
Updated XML docs for EventStream<TPayload> to specify that backlogAction is always invoked synchronously on the publisher's thread, regardless of ThreadOption. Added remarks to distinguish thread marshalling behavior for backlogAction vs. action. Clarified lock usage during backlog snapshot and invocation timing.
There was a problem hiding this comment.
Pull request overview
Implements the new EventStream<TPayload> event type requested in issue #3393, adding a pub/sub event that retains a rolling backlog and can replay it to late subscribers.
Changes:
- Added
EventStream<TPayload>implementation inPrism.Eventswith backlog replay + ring-buffer eviction. - Added extensive unit tests covering backlog replay, filtering, thread options, unsubscribe semantics, and weak/strong reference behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
src/Prism.Events/EventStream.cs |
New event type with rolling backlog, backlog replay on subscribe, and integration with Prism’s existing subscription model. |
tests/Prism.Core.Tests/Events/EventStreamFixture.cs |
New test suite validating backlog behavior and parity with existing PubSubEvent semantics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// <param name="backlogAction"> | ||
| /// The callback invoked once for each event currently in the backlog at the time of | ||
| /// subscription. Always invoked synchronously on the caller's (publisher's) thread, | ||
| /// regardless of <see cref="ThreadOption"/>. May be <see langword="null"/> to skip | ||
| /// backlog replay. | ||
| /// </param> |
There was a problem hiding this comment.
The XML docs say backlogAction is invoked synchronously on the caller's “(publisher's) thread”. Backlog replay happens during Subscribe(), so the callback runs on the subscriber/caller thread, not necessarily the publish thread; this wording is misleading and should be corrected (and kept consistent across overloads).
| var ring = _recentEvents.CurrentState(out uint readPosition, out uint writePosition); | ||
| backlog = ArrayPool<TPayload>.Shared.Rent(ring.Length); | ||
| int index = 0; | ||
| TPayload item = default; | ||
| while (index < ring.Length && | ||
| Backlog<TPayload>.TryRead(ref ring, ref readPosition, writePosition, out item)) | ||
| { | ||
| backlog[index] = item; | ||
| index++; | ||
| } | ||
|
|
||
| backlogCount = index; |
There was a problem hiding this comment.
Backlog snapshotting work is performed even when backlogAction is null (array rent + copy under lock), which adds unnecessary overhead for callers that don't need replay (including the InternalSubscribe path). Consider skipping the snapshot entirely when backlogAction is null.
| var ring = _recentEvents.CurrentState(out uint readPosition, out uint writePosition); | |
| backlog = ArrayPool<TPayload>.Shared.Rent(ring.Length); | |
| int index = 0; | |
| TPayload item = default; | |
| while (index < ring.Length && | |
| Backlog<TPayload>.TryRead(ref ring, ref readPosition, writePosition, out item)) | |
| { | |
| backlog[index] = item; | |
| index++; | |
| } | |
| backlogCount = index; | |
| if (backlogAction != null) | |
| { | |
| var ring = _recentEvents.CurrentState(out uint readPosition, out uint writePosition); | |
| backlog = ArrayPool<TPayload>.Shared.Rent(ring.Length); | |
| int index = 0; | |
| TPayload item = default; | |
| while (index < ring.Length && | |
| Backlog<TPayload>.TryRead(ref ring, ref readPosition, writePosition, out item)) | |
| { | |
| backlog[index] = item; | |
| index++; | |
| } | |
| backlogCount = index; | |
| } |
| finally | ||
| { | ||
| if (backlog != null) | ||
| ArrayPool<TPayload>.Shared.Return(backlog, clearArray: true); | ||
| } |
There was a problem hiding this comment.
The pooled backlog array is always returned with clearArray: true. For value-type payloads this can add avoidable CPU cost; consider clearing only when needed (e.g., based on whether TPayload is a reference type / contains references).
| /// </summary> | ||
| public EventStream() | ||
| { | ||
| _recentEvents = new Backlog<TPayload>((int)BacklogSize()); |
There was a problem hiding this comment.
BacklogSize() can be overridden to return 0, which would construct a ring buffer with length 0 and cause a divide-by-zero in Backlog.Write (_writePosition % _size). Add validation to ensure the capacity is >= 1 (either when creating _recentEvents or inside Backlog's constructor).
| _recentEvents = new Backlog<TPayload>((int)BacklogSize()); | |
| int backlogSize = (int)BacklogSize(); | |
| if (backlogSize < 1) | |
| { | |
| throw new ArgumentOutOfRangeException(nameof(backlogSize), backlogSize, "Backlog size must be greater than or equal to 1."); | |
| } | |
| _recentEvents = new Backlog<TPayload>(backlogSize); |
| private uint _writePosition; | ||
| private uint _readPosition; | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new <see cref="Backlog{T}"/> with the specified capacity. | ||
| /// </summary> | ||
| /// <param name="capacity">Maximum number of items the ring buffer can hold.</param> | ||
| public Backlog(int capacity) | ||
| { | ||
| _size = capacity; | ||
| _ring = new T[capacity]; | ||
| _writePosition = 0; | ||
| _readPosition = 0; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Writes <paramref name="item"/> to the ring buffer. If the buffer is full, the | ||
| /// oldest unread item is evicted by advancing the read position before writing. | ||
| /// </summary> | ||
| /// <param name="item">The item to write.</param> | ||
| public void Write(T item) | ||
| { | ||
| bool isFull = _writePosition - _readPosition >= _ring.Length; | ||
|
|
||
|
|
||
| if (isFull) | ||
| { | ||
| _readPosition++; | ||
| } |
There was a problem hiding this comment.
The ring buffer cursors use uint arithmetic (_writePosition - _readPosition) to detect fullness. After enough publishes, these counters will overflow and wrap, which can break the fullness/ordering logic. Consider using ulong/long or tracking count separately to avoid overflow-related corruption in long-running apps.
| var subscription = Subscriptions.ElementAt(i); | ||
| var strategy = subscription.GetExecutionStrategy(); | ||
| if (strategy == null) | ||
| Subscriptions.Remove(subscription);// prune dead weak refs |
There was a problem hiding this comment.
When pruning dead subscriptions during publish, the code removes by value (Subscriptions.Remove(subscription)) even though the index is known. Since Subscriptions is backed by a List<IEventSubscription> (EventBase.cs:13), removing by index (when possible) avoids an extra linear search.
| Subscriptions.Remove(subscription);// prune dead weak refs | |
| Subscriptions.RemoveAt(i);// prune dead weak refs |
| #if NET8_0_OR_GREATER | ||
| using System.Buffers; | ||
| using Prism.Events.Properties; | ||
|
|
||
| namespace Prism.Events; | ||
|
|
There was a problem hiding this comment.
EventStream.cs uses Action, List<>, Predicate<>, LINQ (Cast/FirstOrDefault/ElementAt), and exceptions, but only imports System.Buffers and Prism.Events.Properties. This will not compile without adding the required using directives (e.g., System, System.Collections.Generic, System.Linq).
|
Not accepting this PR. Details in the related issue #3393 |
Description of Change
Fixed #3393