Skip to content

Commit 94dc05e

Browse files
committed
Expose ThreadTracked/ThreadUntracked events to easier react to threads becoming relevant for local user
1 parent 8bfaabc commit 94dc05e

9 files changed

Lines changed: 424 additions & 0 deletions

File tree

Assets/Plugins/StreamChat/Core/IStreamChatClient.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,32 @@ public interface IStreamChatClient : IDisposable, IStreamChatClientEventsListene
7070
/// </summary>
7171
event ChannelMemberRemovedHandler RemovedFromChannelAsMember;
7272

73+
/// <summary>
74+
/// Raised when an <see cref="IStreamThread"/> becomes available locally. Use this to bind
75+
/// per-thread UI and to subscribe to the thread's own events such as
76+
/// <see cref="IStreamThread.Updated"/>, <see cref="IStreamThread.ReplyReceived"/> and
77+
/// <see cref="IStreamThread.ReadStateChanged"/>.
78+
///
79+
/// Fires when:
80+
/// - A channel watch (<see cref="GetOrCreateChannelWithIdAsync"/>, <see cref="QueryChannelsAsync"/>)
81+
/// returns a channel that contains threads.
82+
/// - You call <see cref="GetThreadAsync"/> or <see cref="QueryThreadsAsync"/>.
83+
///
84+
/// A thread started by another user in a channel you are watching but where you are NOT a
85+
/// thread participant will not raise this event - the server delivers only the reply, with
86+
/// no thread payload. To learn about such threads call <see cref="QueryThreadsAsync"/>.
87+
/// </summary>
88+
event StreamThreadChangeHandler ThreadTracked;
89+
90+
/// <summary>
91+
/// Raised when an <see cref="IStreamThread"/> is no longer available locally. Use this to
92+
/// tear down per-thread UI.
93+
///
94+
/// Fires when the thread's parent message is hard-deleted - the thread is destroyed and
95+
/// will no longer appear in <see cref="QueryThreadsAsync"/> results.
96+
/// </summary>
97+
event StreamThreadChangeHandler ThreadUntracked;
98+
7399
/// <summary>
74100
/// Current connection state
75101
/// </summary>

Assets/Plugins/StreamChat/Core/State/Caches/Cache.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public Cache(StreamChatClient stateClient, ISerializer serializer, ILogs logs)
5151

5252
Threads.RegisterDtoIdMapping<StreamThread, ThreadStateResponseInternalDTO>(dto => dto.ParentMessageId);
5353
Threads.RegisterDtoIdMapping2<StreamThread, ThreadResponseInternalDTO>(dto => dto.ParentMessageId);
54+
Threads.RegisterDtoIdMapping3<StreamThread, ThreadStateInternalDTO>(dto => dto.ParentMessageId);
5455
}
5556

5657
public ICacheRepository<StreamChannel> Channels { get; }

Assets/Plugins/StreamChat/Core/State/Caches/CacheRepository.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ namespace StreamChat.Core.State.Caches
1111
internal sealed class CacheRepository<TStatefulModel> : ICacheRepository<TStatefulModel>
1212
where TStatefulModel : class, IStreamStatefulModel
1313
{
14+
public event Action<TStatefulModel> Tracked;
15+
public event Action<TStatefulModel> Untracked;
16+
1417
public IReadOnlyList<TStatefulModel> AllItems => _statefulModels;
1518

1619
public bool TryGet(string uniqueId, out TStatefulModel trackedObject)
@@ -71,6 +74,7 @@ public TType CreateOrUpdate<TType, TDto>(TDto dto, out bool wasCreated)
7174
{
7275
var typedStatefulModel = GetOrCreateStatefulModel<TType, TDto>(dto, out wasCreated);
7376
typedStatefulModel.UpdateFromDto(dto, _cache);
77+
RaiseTrackedIfCreated(typedStatefulModel, wasCreated);
7478
return typedStatefulModel;
7579
}
7680

@@ -79,6 +83,7 @@ public TType CreateOrUpdate2<TType, TDto>(TDto dto, out bool wasCreated)
7983
{
8084
var typedStatefulModel = GetOrCreateStatefulModel<TType, TDto>(dto, out wasCreated);
8185
typedStatefulModel.UpdateFromDto(dto, _cache);
86+
RaiseTrackedIfCreated(typedStatefulModel, wasCreated);
8287
return typedStatefulModel;
8388
}
8489

@@ -87,6 +92,7 @@ public TType CreateOrUpdate3<TType, TDto>(TDto dto, out bool wasCreated)
8792
{
8893
var typedStatefulModel = GetOrCreateStatefulModel<TType, TDto>(dto, out wasCreated);
8994
typedStatefulModel.UpdateFromDto(dto, _cache);
95+
RaiseTrackedIfCreated(typedStatefulModel, wasCreated);
9096
return typedStatefulModel;
9197
}
9298

@@ -95,6 +101,7 @@ public TType CreateOrUpdate4<TType, TDto>(TDto dto, out bool wasCreated)
95101
{
96102
var typedStatefulModel = GetOrCreateStatefulModel<TType, TDto>(dto, out wasCreated);
97103
typedStatefulModel.UpdateFromDto(dto, _cache);
104+
RaiseTrackedIfCreated(typedStatefulModel, wasCreated);
98105
return typedStatefulModel;
99106
}
100107

@@ -103,9 +110,21 @@ public TType CreateOrUpdate5<TType, TDto>(TDto dto, out bool wasCreated)
103110
{
104111
var typedStatefulModel = GetOrCreateStatefulModel<TType, TDto>(dto, out wasCreated);
105112
typedStatefulModel.UpdateFromDto(dto, _cache);
113+
RaiseTrackedIfCreated(typedStatefulModel, wasCreated);
106114
return typedStatefulModel;
107115
}
108116

117+
// Defer Tracked emission until AFTER the first UpdateFromDto so subscribers always observe
118+
// a fully-hydrated object. Track() itself runs from the StreamStatefulModelBase constructor,
119+
// before any DTO is applied, so emitting from there would surface a blank instance.
120+
private void RaiseTrackedIfCreated(TStatefulModel trackedObject, bool wasCreated)
121+
{
122+
if (wasCreated)
123+
{
124+
Tracked?.Invoke(trackedObject);
125+
}
126+
}
127+
109128
/// <summary>
110129
/// This is called from <see cref="IStreamStatefulModel"/> constructor
111130
/// </summary>
@@ -137,6 +156,8 @@ public void Remove(TStatefulModel trackedObject)
137156

138157
_statefulModels.Remove(trackedObject);
139158
_statefulModelById.Remove(trackedObject.UniqueId);
159+
160+
Untracked?.Invoke(trackedObject);
140161
}
141162

142163
internal delegate TStatefulModel ConstructorHandler(string uniqueId);

Assets/Plugins/StreamChat/Core/State/Caches/ICacheExt.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,5 +85,8 @@ public static StreamThread TryCreateOrUpdate(this ICache cache, ThreadStateRespo
8585

8686
public static StreamThread TryCreateOrUpdate(this ICache cache, ThreadResponseInternalDTO dto)
8787
=> dto == null ? null : cache.Threads.CreateOrUpdate2<StreamThread, ThreadResponseInternalDTO>(dto, out _);
88+
89+
public static StreamThread TryCreateOrUpdate(this ICache cache, ThreadStateInternalDTO dto)
90+
=> dto == null ? null : cache.Threads.CreateOrUpdate3<StreamThread, ThreadStateInternalDTO>(dto, out _);
8891
}
8992
}

Assets/Plugins/StreamChat/Core/State/Caches/ICacheRepository.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ namespace StreamChat.Core.State.Caches
1010
internal interface ICacheRepository<TTrackedObject>
1111
where TTrackedObject : class, IStreamStatefulModel
1212
{
13+
/// <summary>
14+
/// Raised after a tracked object is first hydrated by a CreateOrUpdate call (the first
15+
/// time a given DTO id is observed). Fires after the initial UpdateFromDto completes,
16+
/// so subscribers always observe a populated object - never a blank instance.
17+
/// </summary>
18+
event Action<TTrackedObject> Tracked;
19+
20+
/// <summary>
21+
/// Raised after a tracked object is removed from the repository.
22+
/// </summary>
23+
event Action<TTrackedObject> Untracked;
24+
1325
IReadOnlyList<TTrackedObject> AllItems { get; }
1426

1527
bool TryGet(string uniqueId, out TTrackedObject trackedObject);

Assets/Plugins/StreamChat/Core/StatefulModels/StreamChannel.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ void IUpdateableFrom<ChannelStateResponseInternalDTO, StreamChannel>.UpdateFromD
684684
_pendingMessages.TryReplaceRegularObjectsFromDto(dto.PendingMessages, cache);
685685
_pinnedMessages.TryReplaceTrackedObjects2(dto.PinnedMessages, cache.Messages);
686686
_read.TryReplaceRegularObjectsFromDto2(dto.Read, cache);
687+
SeedThreadsFromDto(dto.Threads, cache);
687688
WatcherCount = GetOrDefault(dto.WatcherCount, WatcherCount);
688689
_watchers.TryAppendUniqueTrackedObjects2(dto.Watchers, cache.Users);
689690

@@ -709,6 +710,7 @@ void IUpdateableFrom3<ChannelStateResponseFieldsInternalDTO, StreamChannel>.Upda
709710
_pendingMessages.TryReplaceRegularObjectsFromDto(dto.PendingMessages, cache);
710711
_pinnedMessages.TryReplaceTrackedObjects2(dto.PinnedMessages, cache.Messages);
711712
_read.TryReplaceRegularObjectsFromDto2(dto.Read, cache);
713+
SeedThreadsFromDto(dto.Threads, cache);
712714
WatcherCount = GetOrDefault(dto.WatcherCount, WatcherCount);
713715
_watchers.TryAppendUniqueTrackedObjects2(dto.Watchers, cache.Users);
714716

@@ -1151,6 +1153,25 @@ private Task InternalBanUserAsync(IStreamUser user, bool isShadowBan = false, st
11511153

11521154
private void SortMessagesByCreatedAt() => _messages.Sort(MessageCreatedAtComparer.Instance);
11531155

1156+
// Seed the global Cache.Threads from the channel-state threads carried by the watch
1157+
// response. Without this the WS thread handlers (thread.updated, notification.thread_message_new,
1158+
// mark read/unread) would early-return on an unknown thread id and silently drop every
1159+
// mutation on threads the watcher has not explicitly fetched.
1160+
// Each first-time insertion fans out as IStreamChatClient.ThreadTracked through the
1161+
// CacheRepository.Tracked subscription wired up in StreamChatClient.
1162+
private static void SeedThreadsFromDto(List<ThreadStateInternalDTO> dtos, ICache cache)
1163+
{
1164+
if (dtos == null)
1165+
{
1166+
return;
1167+
}
1168+
1169+
foreach (var threadDto in dtos)
1170+
{
1171+
cache.TryCreateOrUpdate(threadDto);
1172+
}
1173+
}
1174+
11541175
private UpdateChannelRequestInternalDTO GetUpdateRequestWithCurrentData()
11551176
=> new UpdateChannelRequestInternalDTO
11561177
{

Assets/Plugins/StreamChat/Core/StatefulModels/StreamThread.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ namespace StreamChat.Core.StatefulModels
1616
internal sealed class StreamThread : StreamStatefulModelBase<StreamThread>,
1717
IUpdateableFrom<ThreadStateResponseInternalDTO, StreamThread>,
1818
IUpdateableFrom2<ThreadResponseInternalDTO, StreamThread>,
19+
IUpdateableFrom3<ThreadStateInternalDTO, StreamThread>,
1920
IStreamThread
2021
{
2122
public event StreamThreadChangeHandler Updated;
@@ -222,6 +223,66 @@ void IUpdateableFrom<ThreadStateResponseInternalDTO, StreamThread>.UpdateFromDto
222223
Updated?.Invoke(this);
223224
}
224225

226+
// ChannelStateResponse[Fields]InternalDTO carries threads as ThreadStateInternalDTO
227+
// (the embedded variant). It differs from ThreadStateResponseInternalDTO mainly in:
228+
// - Channel is the lightweight ChannelInternalDTO (no nested config / messages / read).
229+
// Resolve via cache by CID instead of constructing a partial channel from this payload.
230+
// - Read is List<ReadInternalDTO> (no last_read_message_id), uses the v1 IStateLoadableFrom path.
231+
// Replies / parent / participants / custom are otherwise applied with the same semantics
232+
// as the response variant so that thread events that arrive after a channel watch can
233+
// mutate the now-cached thread instead of being silently dropped.
234+
void IUpdateableFrom3<ThreadStateInternalDTO, StreamThread>.UpdateFromDto(
235+
ThreadStateInternalDTO dto, ICache cache)
236+
{
237+
ActiveParticipantCount = GetOrDefault(dto.ActiveParticipantCount, ActiveParticipantCount);
238+
239+
var cid = dto.Channel?.Cid ?? dto.ChannelCid;
240+
if (!string.IsNullOrEmpty(cid) && cache.Channels.TryGet(cid, out var existingChannel))
241+
{
242+
Channel = existingChannel;
243+
}
244+
245+
ChannelCid = GetOrDefault(cid, ChannelCid);
246+
CreatedAt = dto.CreatedAt;
247+
248+
if (dto.CreatedBy != null)
249+
{
250+
CreatedBy = cache.TryCreateOrUpdate(dto.CreatedBy);
251+
}
252+
253+
DeletedAt = dto.DeletedAt;
254+
LastMessageAt = dto.LastMessageAt;
255+
256+
if (dto.LatestReplies != null)
257+
{
258+
_latestReplies.TryReplaceTrackedObjects(dto.LatestReplies, cache.Messages);
259+
SortLatestRepliesByCreatedAt();
260+
}
261+
262+
if (dto.ParentMessage != null)
263+
{
264+
ParentMessage = cache.TryCreateOrUpdate(dto.ParentMessage);
265+
}
266+
267+
ParentMessageId = GetOrDefault(dto.ParentMessageId, ParentMessageId);
268+
ParticipantCount = GetOrDefault(dto.ParticipantCount, ParticipantCount);
269+
ReplyCount = dto.ReplyCount;
270+
271+
_read.TryReplaceRegularObjectsFromDto(dto.Read, cache);
272+
273+
if (dto.ThreadParticipants != null)
274+
{
275+
_threadParticipants.TryReplaceRegularObjectsFromDto(dto.ThreadParticipants, cache);
276+
}
277+
278+
Title = GetOrDefault(dto.Title, Title);
279+
UpdatedAt = dto.UpdatedAt;
280+
281+
LoadAdditionalCustom(dto.Custom);
282+
283+
Updated?.Invoke(this);
284+
}
285+
225286
void IUpdateableFrom2<ThreadResponseInternalDTO, StreamThread>.UpdateFromDto(
226287
ThreadResponseInternalDTO dto, ICache cache)
227288
{

Assets/Plugins/StreamChat/Core/StreamChatClient.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ public sealed class StreamChatClient : IStreamChatClient
8686
public event ChannelMemberAddedHandler AddedToChannelAsMember;
8787
public event ChannelMemberRemovedHandler RemovedFromChannelAsMember;
8888

89+
public event StreamThreadChangeHandler ThreadTracked;
90+
public event StreamThreadChangeHandler ThreadUntracked;
91+
8992
public const int QueryUsersLimitMaxValue = 30;
9093
public const int QueryUsersOffsetMaxValue = 1000;
9194

@@ -619,6 +622,12 @@ public void Dispose()
619622
InternalLowLevelClient.Dispose();
620623
}
621624

625+
if (_cache?.Threads != null)
626+
{
627+
_cache.Threads.Tracked -= OnThreadEnteredCache;
628+
_cache.Threads.Untracked -= OnThreadLeftCache;
629+
}
630+
622631
_isDisposed = true;
623632
Disposed?.Invoke();
624633
}
@@ -754,9 +763,16 @@ private StreamChatClient(IWebsocketClient websocketClient, IHttpClient httpClien
754763
_cache = new Cache(this, serializer, _logs);
755764
_pollsApi = new StreamPollsApi(InternalLowLevelClient, _cache);
756765

766+
_cache.Threads.Tracked += OnThreadEnteredCache;
767+
_cache.Threads.Untracked += OnThreadLeftCache;
768+
757769
SubscribeTo(InternalLowLevelClient);
758770
}
759771

772+
private void OnThreadEnteredCache(StreamThread thread) => ThreadTracked?.Invoke(thread);
773+
774+
private void OnThreadLeftCache(StreamThread thread) => ThreadUntracked?.Invoke(thread);
775+
760776
private void InternalDeleteChannel(StreamChannel channel)
761777
{
762778
//StreamTodo: mark StreamChannel object as deleted + probably silent clear all internal data?

0 commit comments

Comments
 (0)