Skip to content

Example code for distributing read requests across EventStore DB nodes. #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions ClientPool/.editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@

[*]
charset = utf-8
end_of_line = lf
trim_trailing_whitespace = false
insert_final_newline = false
indent_style = space
indent_size = 4
max_line_length = 4000

# Microsoft .NET properties
csharp_new_line_before_members_in_object_initializers = false
csharp_new_line_before_open_brace = none
csharp_new_line_before_else = false;
csharp_preferred_modifier_order = public, private, protected, internal, new, abstract, virtual, sealed, override, static, readonly, extern, unsafe, volatile, async:suggestion
csharp_style_var_elsewhere = true:suggestion
csharp_style_var_for_built_in_types = true:suggestion
csharp_style_var_when_type_is_apparent = true:suggestion
dotnet_style_parentheses_in_arithmetic_binary_operators = never_if_unnecessary:none
dotnet_style_parentheses_in_other_binary_operators = never_if_unnecessary:none
dotnet_style_parentheses_in_relational_binary_operators = never_if_unnecessary:none
dotnet_style_predefined_type_for_locals_parameters_members = true:suggestion
dotnet_style_predefined_type_for_member_access = true:suggestion
dotnet_style_qualification_for_event = false:suggestion
dotnet_style_qualification_for_field = false:suggestion
dotnet_style_qualification_for_method = false:suggestion
dotnet_style_qualification_for_property = false:suggestion
dotnet_style_require_accessibility_modifiers = for_non_interface_members:suggestion
dotnet_separate_import_directive_groups = true
dotnet_sort_system_directives_first = true
csharp_using_directive_placement = inside_namespace:error

# ReSharper properties
resharper_autodetect_indent_settings = true
resharper_csharp_empty_block_style = together
resharper_use_indent_from_vs = false

# ReSharper inspection severities
resharper_arrange_redundant_parentheses_highlighting = hint
resharper_arrange_this_qualifier_highlighting = hint
resharper_arrange_type_member_modifiers_highlighting = hint
resharper_arrange_type_modifiers_highlighting = hint
resharper_built_in_type_reference_style_for_member_access_highlighting = hint
resharper_built_in_type_reference_style_highlighting = hint
resharper_check_namespace_highlighting = none
resharper_redundant_base_qualifier_highlighting = warning
resharper_suggest_var_or_type_built_in_types_highlighting = hint
resharper_suggest_var_or_type_elsewhere_highlighting = hint
resharper_suggest_var_or_type_simple_types_highlighting = hint
resharper_web_config_module_not_resolved_highlighting = warning
resharper_web_config_type_not_resolved_highlighting = warning
resharper_web_config_wrong_module_highlighting = warning

[{*.har,*.inputactions,*.jsb2,*.jsb3,*.json,.babelrc,.eslintrc,.stylelintrc,bowerrc,jest.config}]
indent_style = space
indent_size = 2

[*.js.map]
indent_style = space
indent_size = 2

[*.{appxmanifest,asax,ascx,aspx,axaml,build,cg,cginc,compute,cs,cshtml,dtd,fs,fsi,fsscript,fsx,hlsl,hlsli,hlslinc,master,ml,mli,nuspec,paml,razor,resw,resx,shader,skin,usf,ush,vb,xaml,xamlx,xoml,xsd}]
indent_style = space
indent_size = 4
tab_width = 4
22 changes: 22 additions & 0 deletions ClientPool/ClientPool.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.30114.105
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClientPool", "ClientPool\ClientPool.csproj", "{91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{91E7BA2D-A313-45E2-BFA3-87BB01A5D48F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
14 changes: 14 additions & 0 deletions ClientPool/ClientPool/ClientPool.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="22.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
</ItemGroup>

</Project>
135 changes: 135 additions & 0 deletions ClientPool/ClientPool/EventStoreClientPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
namespace ClientPool {
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using EventStore.Client;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using ILogger = Microsoft.Extensions.Logging.ILogger;

public class EventStoreClientPool {
private SemaphoreSlim _readSemaphor;
private SemaphoreSlim _writeSemaphor;
private ReadNodeInformation _leader;
private LinkedList<ReadNodeInformation> _readers;
private LinkedList<ReadNodeInformation>.Enumerator _readersEnumerator;
private EventStoreClientPoolOptions _options;
private ILogger _log;

public EventStoreClientPool(IOptions<EventStoreClientPoolOptions> options, ILoggerFactory loggerFactory) {
_options = options.Value;
_log = loggerFactory.CreateLogger<EventStoreClientPool>();
}

public Task ConnectAsync() {
BuildLeaderConnection(_options.LeaderUri);

var readNodeUris = _options.ReadNodeUris.Union(new[] { _options.LeaderUri }).ToArray();
_readers = new LinkedList<ReadNodeInformation>(readNodeUris.Select(href => {
var index = Array.IndexOf(readNodeUris, href);
var settings = EventStoreClientSettings.Create($"{href}");
settings.ConnectivitySettings.NodePreference = NodePreference.Random;
settings.DefaultCredentials = _options.DefaultCredentials;
var client = new EventStoreClient(settings);

return new ReadNodeInformation(index, href, client);
}).ToArray());

var leaderSettings = EventStoreClientSettings.Create($"{_options.LeaderUri}");
leaderSettings.ConnectivitySettings.NodePreference = NodePreference.Leader;
leaderSettings.DefaultCredentials = _options.DefaultCredentials;
_leader = new ReadNodeInformation(-1, _options.LeaderUri, new EventStoreClient(leaderSettings));

_readSemaphor = new SemaphoreSlim(1);
_readSemaphor.Release(1);

_writeSemaphor = new SemaphoreSlim(0, _options.MaximumWriterThreads);
_writeSemaphor.Release(_options.MaximumWriterThreads);
_log.LogDebug("Writes will be done on Leader node: {@LeaderNodeUri}", _options.LeaderUri);

return Task.CompletedTask;
}

public async Task<IEnumerable<ResolvedEvent>> ReadStreamAsync(Direction direction, string streamName, StreamPosition revision, long maxCount = long.MaxValue, bool resolveLinkTos = false, TimeSpan? deadline = null, UserCredentials? userCredentials = null, CancellationToken token = default(CancellationToken)) {
ResolvedEvent evt = default;
List<ResolvedEvent> events = new();
int numberOfRetries = 0;
bool retriedLeaderRead = false;

var startingNode = NextNode();
var node = startingNode;

// considered as bad ju-ju in certain circumstances, this is probably the clearest/cleanest way to express a retry.
RETRY_READ:

try {
_log.LogTrace("Reading from: Node-{@ReadNode} - {@NodeUri}", node.ReadNodeIndex, node.ServerUri);
var slice = node.Client.ReadStreamAsync(direction, streamName, revision, maxCount, resolveLinkTos, deadline, userCredentials, token);
events.AddRange(await slice.ToListAsync(token));
} catch (Exception exc) {
numberOfRetries++;
if (numberOfRetries == _readers.Count) throw;
_log.LogWarning(exc, "Read failed for current node. Retrying on next node.");
node = NextNode();
goto RETRY_READ;
}

RETRY_LEADER_READ:
try {
if (evt.Event != null) {
var slice = _leader.Client.ReadStreamAsync(direction, streamName, evt.OriginalEventNumber, maxCount, resolveLinkTos, deadline, userCredentials, token);
events.AddRange(await slice.ToListAsync());
}
} catch (NullReferenceException) {
// squelch.
} catch (NotLeaderException exc) {
if (retriedLeaderRead) throw;
retriedLeaderRead = true;
BuildLeaderConnection(exc);
goto RETRY_LEADER_READ;
}

return events;
}
private ReadNodeInformation NextNode() {
_readSemaphor.Wait();

try {
if (_readersEnumerator.Current == null) {
_readersEnumerator = _readers.GetEnumerator();
_readersEnumerator.MoveNext();
return _readersEnumerator.Current;
}

if (_readersEnumerator.MoveNext()) return _readersEnumerator.Current;

_readersEnumerator = _readers.GetEnumerator();
_readersEnumerator.MoveNext();
return _readersEnumerator.Current;
} finally {
_readSemaphor.Release();
}
}

private void BuildLeaderConnection(NotLeaderException exc) {
_log.LogWarning(exc, "Re-configuring leader to resolved node.");
var llNode = _readers.First(x => x.ServerUri.Host == exc.LeaderEndpoint.Host && x.ServerUri.Port == exc.LeaderEndpoint.Port);
if (llNode == null) throw exc;

BuildLeaderConnection(llNode.ServerUri);
}

private void BuildLeaderConnection(Uri leaderUri) {
_log.LogInformation("Setting leader connection to {@leaderUri}.", leaderUri);
var grpcClientSettings = EventStoreClientSettings.Create($"{_options.LeaderUri}");
grpcClientSettings.DefaultCredentials = _options.DefaultCredentials;
grpcClientSettings.ConnectivitySettings.NodePreference = Client.NodePreference.Leader;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
grpcClientSettings.ConnectivitySettings.NodePreference = Client.NodePreference.Leader;
grpcClientSettings.ConnectivitySettings.NodePreference = NodePreference.Leader;

Otherwise it does not build

_leader = new ReadNodeInformation(-1, leaderUri, new EventStoreClient(grpcClientSettings));
_log.LogDebug("Lead node connection created.");
}
}
}
11 changes: 11 additions & 0 deletions ClientPool/ClientPool/EventStoreClientPoolOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace ClientPool {
using EventStore.Client;

public class EventStoreClientPoolOptions {
public int MaximumReaderThreads { get; set; } = 3;
public int MaximumWriterThreads { get; set; } = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't expose the write operation , remove ?

public Uri LeaderUri { get; set; }
public Uri[] ReadNodeUris { get; set; }
public UserCredentials DefaultCredentials { get; set; }
}
}
15 changes: 15 additions & 0 deletions ClientPool/ClientPool/ReadNodeInformation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace ClientPool {
using EventStore.Client;

internal record ReadNodeInformation {
public int ReadNodeIndex { get; }
public Uri ServerUri { get; }
public EventStoreClient Client { get; }

public ReadNodeInformation(int readNodeIndex, Uri serverUri, EventStoreClient client) {
ReadNodeIndex = readNodeIndex;
ServerUri = serverUri;
Client = client;
}
}
}