-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRedisMemberStreamService.cs
86 lines (75 loc) · 3.12 KB
/
RedisMemberStreamService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
using System.Text.Json.Nodes;
using MediatR;
using PlayOfferService.Domain.Events;
using PlayOfferService.Domain.Events.Member;
using PlayOfferService.Domain.Repositories;
using StackExchange.Redis;
namespace PlayOfferService.Application;
public class RedisMemberStreamService : BackgroundService
{
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly CancellationToken _cancellationToken;
private readonly IDatabase _db;
private const string StreamName = "club_service_events.public.DomainEvent";
private const string GroupName = "pos.member.events.group";
public RedisMemberStreamService(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
var tokenSource = new CancellationTokenSource();
_cancellationToken = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("redis");
_db = muxer.GetDatabase();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
if (!(await _db.KeyExistsAsync(StreamName)) ||
(await _db.StreamGroupInfoAsync(StreamName)).All(x=>x.Name!=GroupName))
{
await _db.StreamCreateConsumerGroupAsync(StreamName, GroupName, "0-0", true);
}
var id = string.Empty;
while (!_cancellationToken.IsCancellationRequested)
{
if (!string.IsNullOrEmpty(id))
{
await _db.StreamAcknowledgeAsync(StreamName, GroupName, id);
id = string.Empty;
}
var result = await _db.StreamReadGroupAsync(StreamName, GroupName, "pos-member", ">", 1);
if (result.Any())
{
var streamEntry = result.First();
id = streamEntry.Id;
var parsedEvent = ParseEvent(streamEntry);
if (parsedEvent == null)
continue;
await mediator.Send(parsedEvent, _cancellationToken);
}
await Task.Delay(1000);
}
}
private TechnicalMemberEvent? ParseEvent(StreamEntry value)
{
var dict = value.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
var jsonContent = JsonNode.Parse(dict.Values.First());
var eventInfo = jsonContent["payload"]["after"];
var eventType = eventInfo["eventType"].GetValue<string>();
var entityType = eventInfo["entityType"].GetValue<string>();
if (
(eventType != "MEMBER_REGISTERED"
&& eventType != "MEMBER_DELETED"
&& eventType != "MEMBER_LOCKED"
&& eventType != "MEMBER_UNLOCKED"
&& eventType != "MEMBER_EMAIL_CHANGED"
&& eventType != "MEMBER_FULL_NAME_CHANGED"
)
|| entityType != "MEMBER"
)
{
return null;
}
return EventParser.ParseEvent<TechnicalMemberEvent>(eventInfo);
}
}