diff --git a/ClientPool/.editorconfig b/ClientPool/.editorconfig new file mode 100644 index 0000000..9734e1f --- /dev/null +++ b/ClientPool/.editorconfig @@ -0,0 +1,65 @@ + +[*] +charset = utf-8 +end_of_line = lf +trim_trailing_whitespace = false +insert_final_newline = false +indent_style = space +indent_size = 4 +max_line_length = 4000 + +# Microsoft .NET properties +csharp_new_line_before_members_in_object_initializers = false +csharp_new_line_before_open_brace = none +csharp_new_line_before_else = false; +csharp_preferred_modifier_order = public, private, protected, internal, new, abstract, virtual, sealed, override, static, readonly, extern, unsafe, volatile, async:suggestion +csharp_style_var_elsewhere = true:suggestion +csharp_style_var_for_built_in_types = true:suggestion +csharp_style_var_when_type_is_apparent = true:suggestion +dotnet_style_parentheses_in_arithmetic_binary_operators = never_if_unnecessary:none +dotnet_style_parentheses_in_other_binary_operators = never_if_unnecessary:none +dotnet_style_parentheses_in_relational_binary_operators = never_if_unnecessary:none +dotnet_style_predefined_type_for_locals_parameters_members = true:suggestion +dotnet_style_predefined_type_for_member_access = true:suggestion +dotnet_style_qualification_for_event = false:suggestion +dotnet_style_qualification_for_field = false:suggestion +dotnet_style_qualification_for_method = false:suggestion +dotnet_style_qualification_for_property = false:suggestion +dotnet_style_require_accessibility_modifiers = for_non_interface_members:suggestion +dotnet_separate_import_directive_groups = true +dotnet_sort_system_directives_first = true +csharp_using_directive_placement = inside_namespace:error + +# ReSharper properties +resharper_autodetect_indent_settings = true +resharper_csharp_empty_block_style = together +resharper_use_indent_from_vs = false + +# ReSharper inspection severities +resharper_arrange_redundant_parentheses_highlighting = hint +resharper_arrange_this_qualifier_highlighting = hint +resharper_arrange_type_member_modifiers_highlighting = hint +resharper_arrange_type_modifiers_highlighting = hint +resharper_built_in_type_reference_style_for_member_access_highlighting = hint +resharper_built_in_type_reference_style_highlighting = hint +resharper_check_namespace_highlighting = none +resharper_redundant_base_qualifier_highlighting = warning +resharper_suggest_var_or_type_built_in_types_highlighting = hint +resharper_suggest_var_or_type_elsewhere_highlighting = hint +resharper_suggest_var_or_type_simple_types_highlighting = hint +resharper_web_config_module_not_resolved_highlighting = warning +resharper_web_config_type_not_resolved_highlighting = warning +resharper_web_config_wrong_module_highlighting = warning + +[{*.har,*.inputactions,*.jsb2,*.jsb3,*.json,.babelrc,.eslintrc,.stylelintrc,bowerrc,jest.config}] +indent_style = space +indent_size = 2 + +[*.js.map] +indent_style = space +indent_size = 2 + +[*.{appxmanifest,asax,ascx,aspx,axaml,build,cg,cginc,compute,cs,cshtml,dtd,fs,fsi,fsscript,fsx,hlsl,hlsli,hlslinc,master,ml,mli,nuspec,paml,razor,resw,resx,shader,skin,usf,ush,vb,xaml,xamlx,xoml,xsd}] +indent_style = space +indent_size = 4 +tab_width = 4 \ No newline at end of file diff --git a/ClientPool/ClientPool.sln b/ClientPool/ClientPool.sln new file mode 100644 index 0000000..9856c8c --- /dev/null +++ b/ClientPool/ClientPool.sln @@ -0,0 +1,22 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.30114.105 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClientPool", "ClientPool\ClientPool.csproj", "{91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/ClientPool/ClientPool/ClientPool.csproj b/ClientPool/ClientPool/ClientPool.csproj new file mode 100644 index 0000000..fa6dbb8 --- /dev/null +++ b/ClientPool/ClientPool/ClientPool.csproj @@ -0,0 +1,14 @@ + + + + net6.0 + enable + enable + + + + + + + + diff --git a/ClientPool/ClientPool/EventStoreClientPool.cs b/ClientPool/ClientPool/EventStoreClientPool.cs new file mode 100644 index 0000000..7ee1ec2 --- /dev/null +++ b/ClientPool/ClientPool/EventStoreClientPool.cs @@ -0,0 +1,135 @@ +namespace ClientPool { + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + + using EventStore.Client; + + using Microsoft.Extensions.Logging; + using Microsoft.Extensions.Options; + + using ILogger = Microsoft.Extensions.Logging.ILogger; + + public class EventStoreClientPool { + private SemaphoreSlim _readSemaphor; + private SemaphoreSlim _writeSemaphor; + private ReadNodeInformation _leader; + private LinkedList _readers; + private LinkedList.Enumerator _readersEnumerator; + private EventStoreClientPoolOptions _options; + private ILogger _log; + + public EventStoreClientPool(IOptions options, ILoggerFactory loggerFactory) { + _options = options.Value; + _log = loggerFactory.CreateLogger(); + } + + public Task ConnectAsync() { + BuildLeaderConnection(_options.LeaderUri); + + var readNodeUris = _options.ReadNodeUris.Union(new[] { _options.LeaderUri }).ToArray(); + _readers = new LinkedList(readNodeUris.Select(href => { + var index = Array.IndexOf(readNodeUris, href); + var settings = EventStoreClientSettings.Create($"{href}"); + settings.ConnectivitySettings.NodePreference = NodePreference.Random; + settings.DefaultCredentials = _options.DefaultCredentials; + var client = new EventStoreClient(settings); + + return new ReadNodeInformation(index, href, client); + }).ToArray()); + + var leaderSettings = EventStoreClientSettings.Create($"{_options.LeaderUri}"); + leaderSettings.ConnectivitySettings.NodePreference = NodePreference.Leader; + leaderSettings.DefaultCredentials = _options.DefaultCredentials; + _leader = new ReadNodeInformation(-1, _options.LeaderUri, new EventStoreClient(leaderSettings)); + + _readSemaphor = new SemaphoreSlim(1); + _readSemaphor.Release(1); + + _writeSemaphor = new SemaphoreSlim(0, _options.MaximumWriterThreads); + _writeSemaphor.Release(_options.MaximumWriterThreads); + _log.LogDebug("Writes will be done on Leader node: {@LeaderNodeUri}", _options.LeaderUri); + + return Task.CompletedTask; + } + + public async Task> ReadStreamAsync(Direction direction, string streamName, StreamPosition revision, long maxCount = long.MaxValue, bool resolveLinkTos = false, TimeSpan? deadline = null, UserCredentials? userCredentials = null, CancellationToken token = default(CancellationToken)) { + ResolvedEvent evt = default; + List events = new(); + int numberOfRetries = 0; + bool retriedLeaderRead = false; + + var startingNode = NextNode(); + var node = startingNode; + + // considered as bad ju-ju in certain circumstances, this is probably the clearest/cleanest way to express a retry. + RETRY_READ: + + try { + _log.LogTrace("Reading from: Node-{@ReadNode} - {@NodeUri}", node.ReadNodeIndex, node.ServerUri); + var slice = node.Client.ReadStreamAsync(direction, streamName, revision, maxCount, resolveLinkTos, deadline, userCredentials, token); + events.AddRange(await slice.ToListAsync(token)); + } catch (Exception exc) { + numberOfRetries++; + if (numberOfRetries == _readers.Count) throw; + _log.LogWarning(exc, "Read failed for current node. Retrying on next node."); + node = NextNode(); + goto RETRY_READ; + } + + RETRY_LEADER_READ: + try { + if (evt.Event != null) { + var slice = _leader.Client.ReadStreamAsync(direction, streamName, evt.OriginalEventNumber, maxCount, resolveLinkTos, deadline, userCredentials, token); + events.AddRange(await slice.ToListAsync()); + } + } catch (NullReferenceException) { + // squelch. + } catch (NotLeaderException exc) { + if (retriedLeaderRead) throw; + retriedLeaderRead = true; + BuildLeaderConnection(exc); + goto RETRY_LEADER_READ; + } + + return events; + } + private ReadNodeInformation NextNode() { + _readSemaphor.Wait(); + + try { + if (_readersEnumerator.Current == null) { + _readersEnumerator = _readers.GetEnumerator(); + _readersEnumerator.MoveNext(); + return _readersEnumerator.Current; + } + + if (_readersEnumerator.MoveNext()) return _readersEnumerator.Current; + + _readersEnumerator = _readers.GetEnumerator(); + _readersEnumerator.MoveNext(); + return _readersEnumerator.Current; + } finally { + _readSemaphor.Release(); + } + } + + private void BuildLeaderConnection(NotLeaderException exc) { + _log.LogWarning(exc, "Re-configuring leader to resolved node."); + var llNode = _readers.First(x => x.ServerUri.Host == exc.LeaderEndpoint.Host && x.ServerUri.Port == exc.LeaderEndpoint.Port); + if (llNode == null) throw exc; + + BuildLeaderConnection(llNode.ServerUri); + } + + private void BuildLeaderConnection(Uri leaderUri) { + _log.LogInformation("Setting leader connection to {@leaderUri}.", leaderUri); + var grpcClientSettings = EventStoreClientSettings.Create($"{_options.LeaderUri}"); + grpcClientSettings.DefaultCredentials = _options.DefaultCredentials; + grpcClientSettings.ConnectivitySettings.NodePreference = Client.NodePreference.Leader; + _leader = new ReadNodeInformation(-1, leaderUri, new EventStoreClient(grpcClientSettings)); + _log.LogDebug("Lead node connection created."); + } + } +} \ No newline at end of file diff --git a/ClientPool/ClientPool/EventStoreClientPoolOptions.cs b/ClientPool/ClientPool/EventStoreClientPoolOptions.cs new file mode 100644 index 0000000..3161144 --- /dev/null +++ b/ClientPool/ClientPool/EventStoreClientPoolOptions.cs @@ -0,0 +1,11 @@ +namespace ClientPool { + using EventStore.Client; + + public class EventStoreClientPoolOptions { + public int MaximumReaderThreads { get; set; } = 3; + public int MaximumWriterThreads { get; set; } = 1; + public Uri LeaderUri { get; set; } + public Uri[] ReadNodeUris { get; set; } + public UserCredentials DefaultCredentials { get; set; } + } +} diff --git a/ClientPool/ClientPool/ReadNodeInformation.cs b/ClientPool/ClientPool/ReadNodeInformation.cs new file mode 100644 index 0000000..0a633ba --- /dev/null +++ b/ClientPool/ClientPool/ReadNodeInformation.cs @@ -0,0 +1,15 @@ +namespace ClientPool { + using EventStore.Client; + + internal record ReadNodeInformation { + public int ReadNodeIndex { get; } + public Uri ServerUri { get; } + public EventStoreClient Client { get; } + + public ReadNodeInformation(int readNodeIndex, Uri serverUri, EventStoreClient client) { + ReadNodeIndex = readNodeIndex; + ServerUri = serverUri; + Client = client; + } + } +} \ No newline at end of file