diff --git a/Kurrent.Client.sln b/Kurrent.Client.sln
index 00544398a..1ea14ce6f 100644
--- a/Kurrent.Client.sln
+++ b/Kurrent.Client.sln
@@ -13,6 +13,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client", "src\Kurre
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.Common", "test\Kurrent.Client.Tests.Common\Kurrent.Client.Tests.Common.csproj", "{47BF715B-A0BF-4044-B335-717E56422550}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.NeverLoadedAssembly", "test\Kurrent.Client.Tests.NeverLoadedAssembly\Kurrent.Client.Tests.NeverLoadedAssembly.csproj", "{0AC8A7E9-6839-4B4C-B299-950C376DF71F}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.ExternalAssembly", "test\Kurrent.Client.Tests.ExternalAssembly\Kurrent.Client.Tests.ExternalAssembly.csproj", "{829AF806-1144-408A-85FE-763835775086}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
@@ -34,10 +38,20 @@ Global
{47BF715B-A0BF-4044-B335-717E56422550}.Debug|x64.Build.0 = Debug|Any CPU
{47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.ActiveCfg = Release|Any CPU
{47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.Build.0 = Release|Any CPU
+ {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Debug|x64.Build.0 = Debug|Any CPU
+ {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Release|x64.ActiveCfg = Release|Any CPU
+ {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Release|x64.Build.0 = Release|Any CPU
+ {829AF806-1144-408A-85FE-763835775086}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {829AF806-1144-408A-85FE-763835775086}.Debug|x64.Build.0 = Debug|Any CPU
+ {829AF806-1144-408A-85FE-763835775086}.Release|x64.ActiveCfg = Release|Any CPU
+ {829AF806-1144-408A-85FE-763835775086}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{FC829F1B-43AD-4C96-9002-23D04BBA3AF3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{762EECAA-122E-4B0C-BC50-5AA4F72CA4E0} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
{47BF715B-A0BF-4044-B335-717E56422550} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
+ {0AC8A7E9-6839-4B4C-B299-950C376DF71F} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
+ {829AF806-1144-408A-85FE-763835775086} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
EndGlobalSection
EndGlobal
diff --git a/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs b/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs
new file mode 100644
index 000000000..294cdc169
--- /dev/null
+++ b/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs
@@ -0,0 +1,360 @@
+using System.Text.Json;
+using Kurrent.Client.Core.Serialization;
+
+namespace EventStore.Client;
+
+///
+/// Provides configuration options for messages serialization and deserialization in the KurrentDB client.
+///
+public class KurrentClientSerializationSettings {
+ ///
+ /// The serializer responsible for handling JSON-formatted data. This serializer is used both for
+ /// serializing outgoing JSON messages and deserializing incoming JSON messages. If not specified,
+ /// a default System.Text.Json serializer will be used with standard settings.
+ ///
+ /// That also allows you to bring your custom JSON serializer implementation (e.g. JSON.NET)
+ ///
+ public ISerializer? JsonSerializer { get; set; }
+
+ ///
+ /// The serializer responsible for handling binary data formats. This is used when working with
+ /// binary-encoded messages rather than text-based formats (e.g. Protobuf or Avro). Required when storing
+ /// or retrieving content with "application/octet-stream" content type
+ ///
+ public ISerializer? BytesSerializer { get; set; }
+
+ ///
+ /// Determines which serialization format (JSON or binary) is used by default when writing messages
+ /// where the content type isn't explicitly specified. The default content type is "application/json"
+ ///
+ public ContentType DefaultContentType { get; set; } = ContentType.Json;
+
+ ///
+ /// Defines the custom strategy used to map between the type name stored in messages and .NET type names.
+ /// If not provided the default will be used.
+ /// It resolves the CLR type name to the format: "{stream category name}-{CLR Message Type}".
+ /// You can provide your own implementation of
+ /// and register it here to override the default behaviour
+ ///
+ public IMessageTypeNamingStrategy? MessageTypeNamingStrategy { get; set; }
+
+ ///
+ /// Allows to register mapping of CLR message types to their corresponding message type names used in serialized messages.
+ ///
+ public IDictionary MessageTypeMap { get; set; } = new Dictionary();
+
+ ///
+ /// Registers CLR message types that can be appended to the specific stream category.
+ /// Types will have message type names resolved based on the used
+ ///
+ public IDictionary CategoryMessageTypesMap { get; set; } = new Dictionary();
+
+ ///
+ /// Specifies the CLR type that should be used when deserializing metadata for all events.
+ /// When set, the client will attempt to deserialize event metadata into this type.
+ /// If not provided, will be used.
+ ///
+ public Type? DefaultMetadataType { get; set; }
+
+ ///
+ /// Creates a new instance of serialization settings with either default values or custom configuration.
+ /// This factory method is the recommended way to create serialization settings for the KurrentDB client.
+ ///
+ /// Optional callback to customize the settings. If null, default settings are used.
+ /// A fully configured instance ready to be used with the KurrentDB client.
+ ///
+ ///
+ /// var settings = KurrentClientSerializationSettings.Default(options => {
+ /// options.RegisterMessageType<UserCreated>("user-created");
+ /// options.RegisterMessageType<UserUpdated>("user-updated");
+ /// options.RegisterMessageTypeForCategory<UserCreated>("user");
+ /// });
+ ///
+ ///
+ public static KurrentClientSerializationSettings Default(
+ Action? configure = null
+ ) {
+ var settings = new KurrentClientSerializationSettings();
+
+ configure?.Invoke(settings);
+
+ return settings;
+ }
+
+ ///
+ /// Configures the JSON serializer using custom options while inheriting from the default System.Text.Json settings.
+ /// This allows fine-tuning serialization behavior such as case sensitivity, property naming, etc.
+ ///
+ /// A function that receives the default options and returns modified options.
+ /// The current instance for method chaining.
+ ///
+ ///
+ /// settings.UseJsonSettings(options => {
+ /// options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
+ /// options.WriteIndented = true;
+ /// options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
+ /// return options;
+ /// });
+ ///
+ ///
+ public KurrentClientSerializationSettings UseJsonSettings(
+ Func configure
+ ) {
+ JsonSerializer = new SystemTextJsonSerializer(
+ new SystemTextJsonSerializationSettings
+ { Options = configure(SystemTextJsonSerializationSettings.DefaultJsonSerializerOptions) }
+ );
+
+ return this;
+ }
+
+ ///
+ /// Configures JSON serialization using provided System.Text.Json serializer options.
+ ///
+ /// The JSON serializer options to use.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseJsonSettings(JsonSerializerOptions systemTextJsonSerializerOptions) {
+ JsonSerializer = new SystemTextJsonSerializer(
+ new SystemTextJsonSerializationSettings { Options = systemTextJsonSerializerOptions }
+ );
+
+ return this;
+ }
+
+ ///
+ /// Configures JSON serialization using provided
+ ///
+ /// The SystemTextJson serialization settings to use.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseJsonSettings(
+ SystemTextJsonSerializationSettings jsonSerializationSettings
+ ) {
+ JsonSerializer = new SystemTextJsonSerializer(jsonSerializationSettings);
+
+ return this;
+ }
+
+ ///
+ /// Sets a custom JSON serializer implementation.
+ /// That also allows you to bring your custom JSON serializer implementation (e.g. JSON.NET)
+ ///
+ /// The serializer to use for JSON content.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseJsonSerializer(ISerializer serializer) {
+ JsonSerializer = serializer;
+
+ return this;
+ }
+
+ ///
+ /// Sets a custom binary serializer implementation.
+ /// That also allows you to bring your custom binary serializer implementation (e.g. Protobuf or Avro)
+ ///
+ /// The serializer to use for binary content.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseBytesSerializer(ISerializer serializer) {
+ BytesSerializer = serializer;
+
+ return this;
+ }
+
+ ///
+ /// Configures a custom message type naming strategy.
+ ///
+ /// The type of naming strategy to use.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseMessageTypeNamingStrategy()
+ where TCustomMessageTypeResolutionStrategy : IMessageTypeNamingStrategy, new() =>
+ UseMessageTypeNamingStrategy(new TCustomMessageTypeResolutionStrategy());
+
+ ///
+ /// Configures a custom message type naming strategy.
+ ///
+ /// The naming strategy instance to use.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseMessageTypeNamingStrategy(
+ IMessageTypeNamingStrategy messageTypeNamingStrategy
+ ) {
+ MessageTypeNamingStrategy = messageTypeNamingStrategy;
+
+ return this;
+ }
+
+ ///
+ /// Associates a message type with a specific stream category to enable automatic deserialization.
+ /// In event sourcing, streams are often prefixed with a category (e.g., "user-123", "order-456").
+ /// This method tells the client which message types can appear in streams of a given category.
+ ///
+ /// The event or message type that can appear in the category's streams.
+ /// The category prefix (e.g., "user", "order", "account").
+ /// The current instance for method chaining.
+ ///
+ ///
+ /// // Register event types that can appear in user streams
+ /// settings.RegisterMessageTypeForCategory<UserCreated>("user")
+ /// .RegisterMessageTypeForCategory<UserUpdated>("user")
+ /// .RegisterMessageTypeForCategory<UserDeleted>("user");
+ ///
+ ///
+ public KurrentClientSerializationSettings RegisterMessageTypeForCategory(string categoryName) =>
+ RegisterMessageTypeForCategory(categoryName, typeof(T));
+
+ ///
+ /// Registers multiple message types for a specific stream category.
+ ///
+ /// The category name to register the types with.
+ /// The message types to register.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings RegisterMessageTypeForCategory(string categoryName, params Type[] types) {
+ CategoryMessageTypesMap[categoryName] = CategoryMessageTypesMap.TryGetValue(categoryName, out var current)
+ ? [..current, ..types]
+ : types;
+
+ return this;
+ }
+
+ ///
+ /// Maps a .NET type to a specific message type name that will be stored in the message metadata.
+ /// This mapping is used during automatic deserialization, as it tells the client which CLR type
+ /// to instantiate when encountering a message with a particular type name in the database.
+ ///
+ /// The .NET type to register (typically a message class).
+ /// The string identifier to use for this type in the database.
+ /// The current instance for method chaining.
+ ///
+ /// The type name is often different from the .NET type name to support versioning and evolution
+ /// of your domain model without breaking existing stored messages.
+ ///
+ ///
+ ///
+ /// // Register me types with their corresponding type identifiers
+ /// settings.RegisterMessageType<UserCreated>("user-created-v1")
+ /// .RegisterMessageType<OrderPlaced>("order-placed-v2");
+ ///
+ ///
+ public KurrentClientSerializationSettings RegisterMessageType(string typeName) =>
+ RegisterMessageType(typeof(T), typeName);
+
+ ///
+ /// Registers a message type with a specific type name.
+ ///
+ /// The message type to register.
+ /// The type name to register for the message type.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings RegisterMessageType(Type type, string typeName) {
+ MessageTypeMap[type] = typeName;
+
+ return this;
+ }
+
+ ///
+ /// Registers multiple message types with their corresponding type names.
+ ///
+ /// Dictionary mapping types to their type names.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings RegisterMessageTypes(IDictionary typeMap) {
+ foreach (var map in typeMap) {
+ MessageTypeMap[map.Key] = map.Value;
+ }
+
+ return this;
+ }
+
+ ///
+ /// Configures a strongly-typed metadata class for all mes in the system.
+ /// This enables accessing metadata properties in a type-safe manner rather than using dynamic objects.
+ ///
+ /// The metadata class type containing properties matching the expected metadata fields.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseMetadataType() =>
+ UseMetadataType(typeof(T));
+
+ ///
+ /// Configures a strongly-typed metadata class for all mes in the system.
+ /// This enables accessing metadata properties in a type-safe manner rather than using dynamic objects.
+ ///
+ /// The metadata class type containing properties matching the expected metadata fields.
+ /// The current instance for method chaining.
+ public KurrentClientSerializationSettings UseMetadataType(Type type) {
+ DefaultMetadataType = type;
+
+ return this;
+ }
+
+ ///
+ /// Creates a deep copy of the current serialization settings.
+ ///
+ /// A new instance with copied settings.
+ internal KurrentClientSerializationSettings Clone() {
+ return new KurrentClientSerializationSettings {
+ BytesSerializer = BytesSerializer,
+ JsonSerializer = JsonSerializer,
+ DefaultContentType = DefaultContentType,
+ MessageTypeMap = new Dictionary(MessageTypeMap),
+ CategoryMessageTypesMap = new Dictionary(CategoryMessageTypesMap),
+ MessageTypeNamingStrategy = MessageTypeNamingStrategy
+ };
+ }
+}
+
+///
+/// Provides operation-specific serialization settings that override the global client configuration
+/// for individual operations like reading from or appending to streams. This allows fine-tuning
+/// serialization behavior on a per-operation basis without changing the client-wide settings.
+///
+public class OperationSerializationSettings {
+ ///
+ /// Controls whether mes should be automatically deserialized for this specific operation.
+ /// When enabled (the default), messages will be converted to their appropriate CLR types.
+ /// When disabled, messages will be returned in their raw serialized form.
+ ///
+ public AutomaticDeserialization AutomaticDeserialization { get; private set; } = AutomaticDeserialization.Enabled;
+
+ ///
+ /// A callback that allows customizing serialization settings for this specific operation.
+ /// This can be used to override type mappings, serializers, or other settings just for
+ /// the scope of a single operation without affecting other operations.
+ ///
+ public Action? ConfigureSettings { get; private set; }
+
+ ///
+ /// A pre-configured settings instance that disables automatic deserialization.
+ /// Use this when you need to access raw message data in its serialized form.
+ ///
+ public static readonly OperationSerializationSettings Disabled = new OperationSerializationSettings {
+ AutomaticDeserialization = AutomaticDeserialization.Disabled
+ };
+
+ ///
+ /// Creates operation-specific serialization settings with custom configuration while keeping
+ /// automatic deserialization enabled. This allows operation-specific type mappings or
+ /// serializer settings without changing the global client configuration.
+ ///
+ /// A callback to customize serialization settings for this operation.
+ /// A configured instance of with enabled deserialization.
+ public static OperationSerializationSettings Configure(Action configure) =>
+ new OperationSerializationSettings {
+ AutomaticDeserialization = AutomaticDeserialization.Enabled,
+ ConfigureSettings = configure
+ };
+}
+
+///
+/// Controls whether the KurrentDB client should automatically deserialize message payloads
+/// into their corresponding CLR types based on the configured type mappings.
+///
+public enum AutomaticDeserialization {
+ ///
+ /// Disables automatic deserialization. Messages will be returned in their raw serialized form,
+ /// requiring manual deserialization by the application. Use this when you need direct access to the raw data
+ /// or when working with messages that don't have registered type mappings.
+ ///
+ Disabled = 0,
+
+ ///
+ /// Enables automatic deserialization. The client will attempt to convert messages into their appropriate
+ /// CLR types using the configured serializers and type mappings. This simplifies working with strongly-typed
+ /// domain messages but requires proper type registration.
+ ///
+ Enabled = 1
+}
diff --git a/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs b/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
index c730b7b5a..57be0c950 100644
--- a/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
+++ b/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
@@ -14,6 +14,20 @@ public partial class KurrentClientSettings {
public static KurrentClientSettings Create(string connectionString) =>
ConnectionStringParser.Parse(connectionString);
+ ///
+ /// Creates client settings from a connection string with additional configuration
+ ///
+ ///
+ /// allows you to make additional customization of client settings
+ ///
+ public static KurrentClientSettings Create(string connectionString, Action configure) {
+ var settings = ConnectionStringParser.Parse(connectionString);
+
+ configure(settings);
+
+ return settings;
+ }
+
private static class ConnectionStringParser {
private const string SchemeSeparator = "://";
private const string UserInfoSeparator = "@";
diff --git a/src/Kurrent.Client/Core/KurrentClientSettings.cs b/src/Kurrent.Client/Core/KurrentClientSettings.cs
index aed914074..71bfa446e 100644
--- a/src/Kurrent.Client/Core/KurrentClientSettings.cs
+++ b/src/Kurrent.Client/Core/KurrentClientSettings.cs
@@ -57,5 +57,11 @@ public partial class KurrentClientSettings {
/// The default deadline for calls. Will not be applied to reads or subscriptions.
///
public TimeSpan? DefaultDeadline { get; set; } = TimeSpan.FromSeconds(10);
+
+ ///
+ /// Provides configuration options for messages serialization and deserialization in the KurrentDB client.
+ /// If null, default settings are used.
+ ///
+ public KurrentClientSerializationSettings Serialization { get; set; } = KurrentClientSerializationSettings.Default();
}
}
diff --git a/src/Kurrent.Client/Core/ResolvedEvent.cs b/src/Kurrent.Client/Core/ResolvedEvent.cs
index 25ca13a78..3b4209082 100644
--- a/src/Kurrent.Client/Core/ResolvedEvent.cs
+++ b/src/Kurrent.Client/Core/ResolvedEvent.cs
@@ -1,3 +1,5 @@
+using Kurrent.Client.Core.Serialization;
+
namespace EventStore.Client {
///
/// A structure representing a single event or a resolved link event.
@@ -22,6 +24,23 @@ public readonly struct ResolvedEvent {
///
public EventRecord OriginalEvent => Link ?? Event;
+ ///
+ /// Returns the deserialized message
+ /// It will be provided or equal to null, depending on the automatic deserialization settings you choose.
+ /// If it's null, you can use to deserialize it manually.
+ ///
+ public readonly Message? Message;
+
+ ///
+ /// Returns the deserialized message data.
+ ///
+ public object? DeserializedData => Message?.Data;
+
+ ///
+ /// Returns the deserialized message metadata.
+ ///
+ public object? DeserializedMetadata => Message?.Metadata;
+
///
/// Position of the if available.
///
@@ -49,12 +68,44 @@ public readonly struct ResolvedEvent {
///
///
///
- public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) {
- Event = @event;
- Link = link;
+ public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) : this(
+ @event,
+ link,
+ null,
+ commitPosition
+ ) { }
+
+ ///
+ /// Constructs a new .
+ ///
+ ///
+ ///
+ ///
+ ///
+ ResolvedEvent(
+ EventRecord @event,
+ EventRecord? link,
+ Message? message,
+ ulong? commitPosition
+ ) {
+ Event = @event;
+ Link = link;
+ Message = message;
OriginalPosition = commitPosition.HasValue
? new Position(commitPosition.Value, (link ?? @event).Position.PreparePosition)
: new Position?();
}
+
+ internal static ResolvedEvent From(
+ EventRecord @event,
+ EventRecord? link,
+ ulong? commitPosition,
+ IMessageSerializer messageSerializer
+ ) {
+ var originalEvent = link ?? @event;
+ return messageSerializer.TryDeserialize(originalEvent, out var message)
+ ? new ResolvedEvent(@event, link, message, commitPosition)
+ : new ResolvedEvent(@event, link, commitPosition);
+ }
}
}
diff --git a/src/Kurrent.Client/Core/Serialization/ISerializer.cs b/src/Kurrent.Client/Core/Serialization/ISerializer.cs
new file mode 100644
index 000000000..93382428d
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/ISerializer.cs
@@ -0,0 +1,30 @@
+namespace Kurrent.Client.Core.Serialization;
+
+///
+/// Defines the core serialization capabilities required by the KurrentDB client.
+/// Implementations of this interface handle the conversion between .NET objects and their
+/// binary representation for storage in and retrieval from the event store.
+///
+/// The client ships default System.Text.Json implementation, but custom implementations can be provided or other formats.
+///
+public interface ISerializer {
+ ///
+ /// Converts a .NET object to its binary representation for storage in the event store.
+ ///
+ /// The object to serialize. This could be an event, command, or metadata object.
+ ///
+ /// A binary representation of the object that can be stored in KurrentDB.
+ ///
+ public ReadOnlyMemory Serialize(object value);
+
+ ///
+ /// Reconstructs a .NET object from its binary representation retrieved from the event store.
+ ///
+ /// The binary data to deserialize, typically retrieved from a KurrentDB event.
+ /// The target .NET type to deserialize the data into, determined from message type mappings.
+ ///
+ /// The deserialized object cast to the specified type, or null if the data cannot be deserialized.
+ /// The returned object will be an instance of the specified type or a compatible subtype.
+ ///
+ public object? Deserialize(ReadOnlyMemory data, Type type);
+}
diff --git a/src/Kurrent.Client/Core/Serialization/Message.cs b/src/Kurrent.Client/Core/Serialization/Message.cs
new file mode 100644
index 000000000..6666831dd
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/Message.cs
@@ -0,0 +1,62 @@
+using EventStore.Client;
+
+namespace Kurrent.Client.Core.Serialization;
+
+///
+/// Represents a message wrapper in the KurrentDB system, containing both domain data and optional metadata.
+/// Messages can represent events, commands, or other domain objects along with their associated metadata.
+///
+/// The message domain data.
+/// Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information.
+/// Unique identifier for this specific message instance. When null, the system will auto-generate an ID.
+public record Message(object Data, object? Metadata, Uuid? MessageId = null) {
+ ///
+ /// Creates a new Message with the specified domain data and message ID, but without metadata.
+ /// This factory method is a convenient shorthand when working with systems that don't require metadata.
+ ///
+ /// The message domain data.
+ /// Unique identifier for this message instance. Must not be Uuid.Empty.
+ /// A new immutable Message instance containing the provided data and ID with null metadata.
+ ///
+ ///
+ /// // Create a message with a specific ID
+ /// var userCreated = new UserCreated { Id = "123", Name = "Alice" };
+ /// var messageId = Uuid.NewUuid();
+ /// var message = Message.From(userCreated, messageId);
+ ///
+ ///
+ public static Message From(object data, Uuid messageId) =>
+ From(data, null, messageId);
+
+ ///
+ /// Creates a new Message with the specified domain data and message ID and metadata.
+ ///
+ /// The message domain data.
+ /// Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information.
+ /// Unique identifier for this specific message instance.
+ /// A new immutable Message instance with the specified properties.
+ /// Thrown when messageId is explicitly set to Uuid.Empty, which is an invalid identifier.
+ ///
+ ///
+ /// // Create a message with data and metadata
+ /// var orderPlaced = new OrderPlaced { OrderId = "ORD-123", Amount = 99.99m };
+ /// var metadata = new EventMetadata {
+ /// UserId = "user-456",
+ /// Timestamp = DateTimeOffset.UtcNow,
+ /// CorrelationId = correlationId
+ /// };
+ ///
+ /// // Let the system assign an ID automatically
+ /// var message = Message.From(orderPlaced, metadata);
+ ///
+ /// // Or specify a custom ID
+ /// var messageWithId = Message.From(orderPlaced, metadata, Uuid.NewUuid());
+ ///
+ ///
+ public static Message From(object data, object? metadata = null, Uuid? messageId = null) {
+ if (messageId == Uuid.Empty)
+ throw new ArgumentOutOfRangeException(nameof(messageId), "Message ID cannot be an empty UUID.");
+
+ return new Message(data, metadata, messageId);
+ }
+}
diff --git a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
new file mode 100644
index 000000000..de66372b2
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
@@ -0,0 +1,148 @@
+using System.Diagnostics.CodeAnalysis;
+using EventStore.Client;
+
+namespace Kurrent.Client.Core.Serialization;
+
+using static ContentTypeExtensions;
+
+interface IMessageSerializer {
+ public EventData Serialize(Message value, MessageSerializationContext context);
+
+#if NET48
+ public bool TryDeserialize(EventRecord record, out Message? deserialized);
+#else
+ public bool TryDeserialize(EventRecord record, [NotNullWhen(true)] out Message? deserialized);
+#endif
+}
+
+record MessageSerializationContext(
+ string StreamName,
+ ContentType ContentType
+) {
+ public string CategoryName =>
+ StreamName.Split('-').FirstOrDefault() ?? "no_stream_category";
+}
+
+static class MessageSerializerExtensions {
+ public static EventData[] Serialize(
+ this IMessageSerializer serializer,
+ IEnumerable messages,
+ MessageSerializationContext context
+ ) {
+ return messages.Select(m => serializer.Serialize(m, context)).ToArray();
+ }
+
+ public static IMessageSerializer With(
+ this IMessageSerializer defaultMessageSerializer,
+ KurrentClientSerializationSettings defaultSettings,
+ OperationSerializationSettings? operationSettings
+ ) {
+ if (operationSettings == null)
+ return defaultMessageSerializer;
+
+ if (operationSettings.AutomaticDeserialization == AutomaticDeserialization.Disabled)
+ return NullMessageSerializer.Instance;
+
+ if (operationSettings.ConfigureSettings == null)
+ return defaultMessageSerializer;
+
+ var settings = defaultSettings.Clone();
+ operationSettings.ConfigureSettings.Invoke(settings);
+
+ return new MessageSerializer(SchemaRegistry.From(settings));
+ }
+}
+
+class MessageSerializer(SchemaRegistry schemaRegistry) : IMessageSerializer {
+ readonly ISerializer _jsonSerializer =
+ schemaRegistry.GetSerializer(ContentType.Json);
+
+ readonly IMessageTypeNamingStrategy _messageTypeNamingStrategy =
+ schemaRegistry.MessageTypeNamingStrategy;
+
+ public EventData Serialize(Message message, MessageSerializationContext serializationContext) {
+ var (data, metadata, eventId) = message;
+
+ var eventType = _messageTypeNamingStrategy
+ .ResolveTypeName(
+ message.Data.GetType(),
+ new MessageTypeNamingResolutionContext(serializationContext.CategoryName)
+ );
+
+ var serializedData = schemaRegistry
+ .GetSerializer(serializationContext.ContentType)
+ .Serialize(data);
+
+ var serializedMetadata = metadata != null
+ ? _jsonSerializer.Serialize(metadata)
+ : ReadOnlyMemory.Empty;
+
+ return new EventData(
+ eventId ?? Uuid.NewUuid(),
+ eventType,
+ serializedData,
+ serializedMetadata,
+ serializationContext.ContentType.ToMessageContentType()
+ );
+ }
+
+#if NET48
+ public bool TryDeserialize(EventRecord record, out Message? deserialized) {
+#else
+ public bool TryDeserialize(EventRecord record, [NotNullWhen(true)] out Message? deserialized) {
+#endif
+ if (!TryResolveClrType(record, out var clrType)) {
+ deserialized = null;
+ return false;
+ }
+
+ var data = schemaRegistry
+ .GetSerializer(FromMessageContentType(record.ContentType))
+ .Deserialize(record.Data, clrType!);
+
+ if (data == null) {
+ deserialized = null;
+ return false;
+ }
+
+ object? metadata = record.Metadata.Length > 0 && TryResolveClrMetadataType(record, out var clrMetadataType)
+ ? _jsonSerializer.Deserialize(record.Metadata, clrMetadataType!)
+ : null;
+
+ deserialized = Message.From(data, metadata, record.EventId);
+ return true;
+ }
+
+ public static MessageSerializer From(KurrentClientSerializationSettings? settings = null) {
+ settings ??= KurrentClientSerializationSettings.Default();
+
+ return new MessageSerializer(SchemaRegistry.From(settings));
+ }
+
+ bool TryResolveClrType(EventRecord record, out Type? clrType) =>
+ schemaRegistry
+ .MessageTypeNamingStrategy
+ .TryResolveClrType(record.EventType, out clrType);
+
+ bool TryResolveClrMetadataType(EventRecord record, out Type? clrMetadataType) =>
+ schemaRegistry
+ .MessageTypeNamingStrategy
+ .TryResolveClrMetadataType(record.EventType, out clrMetadataType);
+}
+
+class NullMessageSerializer : IMessageSerializer {
+ public static readonly NullMessageSerializer Instance = new NullMessageSerializer();
+
+ public EventData Serialize(Message value, MessageSerializationContext context) {
+ throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled");
+ }
+
+#if NET48
+ public bool TryDeserialize(EventRecord record, out Message? deserialized) {
+#else
+ public bool TryDeserialize(EventRecord eventRecord, [NotNullWhen(true)] out Message? deserialized) {
+#endif
+ deserialized = null;
+ return false;
+ }
+}
diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeRegistry.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeRegistry.cs
new file mode 100644
index 000000000..68752884a
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/MessageTypeRegistry.cs
@@ -0,0 +1,76 @@
+using System.Collections.Concurrent;
+
+namespace Kurrent.Client.Core.Serialization;
+
+interface IMessageTypeRegistry {
+ void Register(Type messageType, string messageTypeName);
+ string? GetTypeName(Type messageType);
+ string GetOrAddTypeName(Type clrType, Func getTypeName);
+ Type? GetClrType(string messageTypeName);
+ Type? GetOrAddClrType(string messageTypeName, Func getClrType);
+}
+
+class MessageTypeRegistry : IMessageTypeRegistry {
+ readonly ConcurrentDictionary _typeMap = new();
+ readonly ConcurrentDictionary _typeNameMap = new();
+
+ public void Register(Type messageType, string messageTypeName) {
+ _typeNameMap.AddOrUpdate(messageType, messageTypeName, (_, _) => messageTypeName);
+ _typeMap.AddOrUpdate(messageTypeName, messageType, (_, type) => type);
+ }
+
+ public string? GetTypeName(Type messageType) =>
+#if NET48
+ _typeNameMap.TryGetValue(messageType, out var typeName) ? typeName : null;
+#else
+ _typeNameMap.GetValueOrDefault(messageType);
+#endif
+
+ public string GetOrAddTypeName(Type clrType, Func getTypeName) =>
+ _typeNameMap.GetOrAdd(
+ clrType,
+ _ => {
+ var typeName = getTypeName(clrType);
+
+ _typeMap.TryAdd(typeName, clrType);
+
+ return typeName;
+ }
+ );
+
+ public Type? GetClrType(string messageTypeName) =>
+#if NET48
+ _typeMap.TryGetValue(messageTypeName, out var clrType) ? clrType : null;
+#else
+ _typeMap.GetValueOrDefault(messageTypeName);
+#endif
+
+ public Type? GetOrAddClrType(string messageTypeName, Func getClrType) =>
+ _typeMap.GetOrAdd(
+ messageTypeName,
+ _ => {
+ var clrType = getClrType(messageTypeName);
+
+ if (clrType == null)
+ return null;
+
+ _typeNameMap.TryAdd(clrType, messageTypeName);
+
+ return clrType;
+ }
+ );
+}
+
+static class MessageTypeRegistryExtensions {
+ public static void Register(this IMessageTypeRegistry messageTypeRegistry, string messageTypeName) =>
+ messageTypeRegistry.Register(typeof(T), messageTypeName);
+
+ public static void Register(this IMessageTypeRegistry messageTypeRegistry, IDictionary typeMap) {
+ foreach (var map in typeMap) {
+ messageTypeRegistry.Register(map.Key, map.Value);
+ }
+ }
+
+ public static string? GetTypeName(this IMessageTypeRegistry messageTypeRegistry) =>
+ messageTypeRegistry.GetTypeName(typeof(TMessageType));
+}
diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs
new file mode 100644
index 000000000..12ff65f6b
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs
@@ -0,0 +1,100 @@
+using System.Diagnostics.CodeAnalysis;
+using Kurrent.Diagnostics.Tracing;
+
+namespace Kurrent.Client.Core.Serialization;
+
+public interface IMessageTypeNamingStrategy {
+ string ResolveTypeName(Type messageType, MessageTypeNamingResolutionContext resolutionContext);
+
+#if NET48
+ bool TryResolveClrType(string messageTypeName, out Type? type);
+#else
+ bool TryResolveClrType(string messageTypeName, [NotNullWhen(true)] out Type? type);
+#endif
+
+
+#if NET48
+ bool TryResolveClrMetadataType(string messageTypeName, out Type? type);
+#else
+ bool TryResolveClrMetadataType(string messageTypeName, [NotNullWhen(true)] out Type? type);
+#endif
+}
+
+public record MessageTypeNamingResolutionContext(string CategoryName);
+
+class MessageTypeNamingStrategyWrapper(
+ IMessageTypeRegistry messageTypeRegistry,
+ IMessageTypeNamingStrategy messageTypeNamingStrategy
+) : IMessageTypeNamingStrategy {
+ public string ResolveTypeName(Type messageType, MessageTypeNamingResolutionContext resolutionContext) {
+ return messageTypeRegistry.GetOrAddTypeName(
+ messageType,
+ _ => messageTypeNamingStrategy.ResolveTypeName(messageType, resolutionContext)
+ );
+ }
+
+#if NET48
+ public bool TryResolveClrType(string messageTypeName, out Type? type) {
+#else
+ public bool TryResolveClrType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
+#endif
+ type = messageTypeRegistry.GetOrAddClrType(
+ messageTypeName,
+ _ => messageTypeNamingStrategy.TryResolveClrType(messageTypeName, out var resolvedType)
+ ? resolvedType
+ : null
+ );
+
+ return type != null;
+ }
+
+#if NET48
+ public bool TryResolveClrMetadataType(string messageTypeName, out Type? type) {
+#else
+ public bool TryResolveClrMetadataType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
+#endif
+ type = messageTypeRegistry.GetOrAddClrType(
+ $"{messageTypeName}-metadata",
+ _ => messageTypeNamingStrategy.TryResolveClrMetadataType(messageTypeName, out var resolvedType)
+ ? resolvedType
+ : null
+ );
+
+ return type != null;
+ }
+}
+
+public class DefaultMessageTypeNamingStrategy(Type? defaultMetadataType) : IMessageTypeNamingStrategy {
+ readonly Type _defaultMetadataType = defaultMetadataType ?? typeof(TracingMetadata);
+
+ public string ResolveTypeName(Type messageType, MessageTypeNamingResolutionContext resolutionContext) =>
+ $"{resolutionContext.CategoryName}-{messageType.FullName}";
+
+#if NET48
+ public bool TryResolveClrType(string messageTypeName, out Type? type) {
+#else
+ public bool TryResolveClrType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
+#endif
+ var categorySeparatorIndex = messageTypeName.IndexOf('-');
+
+ if (categorySeparatorIndex == -1 || categorySeparatorIndex == messageTypeName.Length - 1) {
+ type = null;
+ return false;
+ }
+
+ var clrTypeName = messageTypeName[(categorySeparatorIndex + 1)..];
+
+ type = TypeProvider.GetTypeByFullName(clrTypeName);
+
+ return type != null;
+ }
+
+#if NET48
+ public bool TryResolveClrMetadataType(string messageTypeName, out Type? type) {
+#else
+ public bool TryResolveClrMetadataType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
+#endif
+ type = _defaultMetadataType;
+ return true;
+ }
+}
diff --git a/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs b/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs
new file mode 100644
index 000000000..9a5ad7515
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs
@@ -0,0 +1,91 @@
+using EventStore.Client;
+
+namespace Kurrent.Client.Core.Serialization;
+
+using static Constants.Metadata.ContentTypes;
+
+public enum ContentType {
+ Json = 1,
+
+ // Protobuf = 2,
+ // Avro = 3,
+ Bytes = 4
+}
+
+static class ContentTypeExtensions {
+ public static ContentType FromMessageContentType(string contentType) =>
+ contentType == ApplicationJson
+ ? ContentType.Json
+ : ContentType.Bytes;
+
+ public static string ToMessageContentType(this ContentType contentType) =>
+ contentType switch {
+ ContentType.Json => ApplicationJson,
+ ContentType.Bytes => ApplicationOctetStream,
+ _ => throw new ArgumentOutOfRangeException(nameof(contentType), contentType, null)
+ };
+}
+
+class SchemaRegistry(
+ IDictionary serializers,
+ IMessageTypeNamingStrategy messageTypeNamingStrategy
+) {
+ public IMessageTypeNamingStrategy MessageTypeNamingStrategy { get; } = messageTypeNamingStrategy;
+
+ public ISerializer GetSerializer(ContentType schemaType) =>
+ serializers[schemaType];
+
+ public static SchemaRegistry From(KurrentClientSerializationSettings settings) {
+ var messageTypeNamingStrategy =
+ settings.MessageTypeNamingStrategy ?? new DefaultMessageTypeNamingStrategy(settings.DefaultMetadataType);
+
+ var categoriesTypeMap = ResolveMessageTypeUsingNamingStrategy(
+ settings.CategoryMessageTypesMap,
+ messageTypeNamingStrategy
+ );
+
+ var messageTypeRegistry = new MessageTypeRegistry();
+ messageTypeRegistry.Register(settings.MessageTypeMap);
+ messageTypeRegistry.Register(categoriesTypeMap);
+
+ var serializers = new Dictionary {
+ {
+ ContentType.Json,
+ settings.JsonSerializer ?? new SystemTextJsonSerializer()
+ }, {
+ ContentType.Bytes,
+ settings.BytesSerializer ?? new SystemTextJsonSerializer()
+ }
+ };
+
+ return new SchemaRegistry(
+ serializers,
+ new MessageTypeNamingStrategyWrapper(
+ messageTypeRegistry,
+ settings.MessageTypeNamingStrategy ?? new DefaultMessageTypeNamingStrategy(settings.DefaultMetadataType)
+ )
+ );
+ }
+
+ static Dictionary ResolveMessageTypeUsingNamingStrategy(
+ IDictionary categoryMessageTypesMap,
+ IMessageTypeNamingStrategy messageTypeNamingStrategy
+ ) =>
+ categoryMessageTypesMap
+ .SelectMany(
+ categoryTypes => categoryTypes.Value.Select(
+ type =>
+ (
+ Type: type,
+ TypeName: messageTypeNamingStrategy.ResolveTypeName(
+ type,
+ new MessageTypeNamingResolutionContext(categoryTypes.Key)
+ )
+ )
+ )
+ )
+ .ToDictionary(
+ ks => ks.Type,
+ vs => vs.TypeName
+ );
+}
diff --git a/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs b/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs
new file mode 100644
index 000000000..4efa7be96
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs
@@ -0,0 +1,32 @@
+using System.Text.Json;
+using System.Text.Json.Serialization;
+
+namespace Kurrent.Client.Core.Serialization;
+
+public class SystemTextJsonSerializationSettings {
+ public static readonly JsonSerializerOptions DefaultJsonSerializerOptions =
+ new JsonSerializerOptions(JsonSerializerOptions.Default) {
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
+ DictionaryKeyPolicy = JsonNamingPolicy.CamelCase,
+ PropertyNameCaseInsensitive = false,
+ DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
+ UnknownTypeHandling = JsonUnknownTypeHandling.JsonNode,
+ UnmappedMemberHandling = JsonUnmappedMemberHandling.Skip,
+ NumberHandling = JsonNumberHandling.AllowReadingFromString,
+ Converters = {
+ new JsonStringEnumConverter(JsonNamingPolicy.CamelCase),
+ }
+ };
+
+ public JsonSerializerOptions Options { get; set; } = DefaultJsonSerializerOptions;
+}
+
+public class SystemTextJsonSerializer(SystemTextJsonSerializationSettings? options = null) : ISerializer {
+ readonly JsonSerializerOptions _options = options?.Options ?? SystemTextJsonSerializationSettings.DefaultJsonSerializerOptions;
+
+ public ReadOnlyMemory Serialize(object value) =>
+ JsonSerializer.SerializeToUtf8Bytes(value, _options);
+
+ public object? Deserialize(ReadOnlyMemory data, Type type) =>
+ !data.IsEmpty ? JsonSerializer.Deserialize(data.Span, type, _options) : null;
+}
diff --git a/src/Kurrent.Client/Core/Serialization/TypeProvider.cs b/src/Kurrent.Client/Core/Serialization/TypeProvider.cs
new file mode 100644
index 000000000..f05f23f87
--- /dev/null
+++ b/src/Kurrent.Client/Core/Serialization/TypeProvider.cs
@@ -0,0 +1,15 @@
+namespace Kurrent.Client.Core.Serialization;
+
+static class TypeProvider {
+ public static Type? GetTypeByFullName(string fullName) =>
+ Type.GetType(fullName) ?? GetFirstMatchingTypeFromCurrentDomainAssembly(fullName);
+
+ static Type? GetFirstMatchingTypeFromCurrentDomainAssembly(string fullName) {
+ var firstNamespacePart = fullName.Split('.')[0];
+
+ return AppDomain.CurrentDomain.GetAssemblies()
+ .OrderByDescending(assembly => assembly.FullName?.StartsWith(firstNamespacePart) == true)
+ .Select(assembly => assembly.GetType(fullName))
+ .FirstOrDefault(type => type != null);
+ }
+}
diff --git a/src/Kurrent.Client/Kurrent.Client.csproj b/src/Kurrent.Client/Kurrent.Client.csproj
index e6652b773..59ebc0861 100644
--- a/src/Kurrent.Client/Kurrent.Client.csproj
+++ b/src/Kurrent.Client/Kurrent.Client.csproj
@@ -101,7 +101,7 @@
-
+
diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs
index 779413111..1ed1b40a0 100644
--- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs
+++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs
@@ -2,11 +2,65 @@
using EventStore.Client.PersistentSubscriptions;
using EventStore.Client.Diagnostics;
using Grpc.Core;
-
+using Kurrent.Client.Core.Serialization;
using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions;
using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase;
namespace EventStore.Client {
+ public class SubscribeToPersistentSubscriptionOptions {
+ ///
+ /// The size of the buffer.
+ ///
+ public int BufferSize { get; set; } = 10;
+
+ ///
+ /// The optional user credentials to perform operation with.
+ ///
+ public UserCredentials? UserCredentials { get; set; } = null;
+
+ ///
+ /// Allows to customize or disable the automatic deserialization
+ ///
+ public OperationSerializationSettings? SerializationSettings { get; set; }
+ }
+
+ public class PersistentSubscriptionListener {
+ ///
+ /// A handler called when a new event is received over the subscription.
+ ///
+#if NET48
+ public Func EventAppeared { get; set; } =
+ null!;
+#else
+ public required Func EventAppeared {
+ get;
+ set;
+ }
+#endif
+ ///
+ /// A handler called if the subscription is dropped.
+ ///
+ public Action? SubscriptionDropped { get; set; }
+
+ ///
+ /// Returns the subscription listener with configured handlers
+ ///
+ /// Handler invoked when a new event is received over the subscription.
+ /// A handler invoked if the subscription is dropped.
+ /// A handler called when a checkpoint is reached.
+ /// Set the checkpointInterval in subscription filter options to define how often this method is called.
+ ///
+ ///
+ public static PersistentSubscriptionListener Handle(
+ Func eventAppeared,
+ Action? subscriptionDropped = null
+ ) =>
+ new PersistentSubscriptionListener {
+ EventAppeared = eventAppeared,
+ SubscriptionDropped = subscriptionDropped
+ };
+ }
+
partial class KurrentPersistentSubscriptionsClient {
///
/// Subscribes to a persistent subscription.
@@ -16,10 +70,13 @@ partial class KurrentPersistentSubscriptionsClient {
///
[Obsolete("SubscribeAsync is no longer supported. Use SubscribeToStream with manual acks instead.", false)]
public async Task SubscribeAsync(
- string streamName, string groupName,
+ string streamName,
+ string groupName,
Func eventAppeared,
Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true,
+ UserCredentials? userCredentials = null,
+ int bufferSize = 10,
+ bool autoAck = true,
CancellationToken cancellationToken = default
) {
if (autoAck) {
@@ -28,16 +85,17 @@ public async Task SubscribeAsync(
);
}
- return await PersistentSubscription
- .Confirm(
- SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken),
- eventAppeared,
- subscriptionDropped ?? delegate { },
- _log,
- userCredentials,
- cancellationToken
- )
- .ConfigureAwait(false);
+ return await SubscribeToStreamAsync(
+ streamName,
+ groupName,
+ PersistentSubscriptionListener.Handle(eventAppeared, subscriptionDropped),
+ new SubscribeToPersistentSubscriptionOptions {
+ UserCredentials = userCredentials,
+ BufferSize = bufferSize,
+ SerializationSettings = OperationSerializationSettings.Disabled
+ },
+ cancellationToken
+ );
}
///
@@ -46,20 +104,45 @@ public async Task SubscribeAsync(
///
///
///
- public async Task SubscribeToStreamAsync(
- string streamName, string groupName,
+ public Task SubscribeToStreamAsync(
+ string streamName,
+ string groupName,
Func eventAppeared,
Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null, int bufferSize = 10,
+ UserCredentials? userCredentials = null,
+ int bufferSize = 10,
+ CancellationToken cancellationToken = default
+ ) =>
+ SubscribeToStreamAsync(
+ streamName,
+ groupName,
+ PersistentSubscriptionListener.Handle(eventAppeared, subscriptionDropped),
+ new SubscribeToPersistentSubscriptionOptions {
+ UserCredentials = userCredentials,
+ BufferSize = bufferSize,
+ SerializationSettings = OperationSerializationSettings.Disabled
+ },
+ cancellationToken
+ );
+
+ ///
+ /// Subscribes to a persistent subscription. Messages must be manually acknowledged
+ ///
+ ///
+ ///
+ ///
+ public async Task SubscribeToStreamAsync(
+ string streamName,
+ string groupName,
+ PersistentSubscriptionListener listener,
+ SubscribeToPersistentSubscriptionOptions options,
CancellationToken cancellationToken = default
) {
return await PersistentSubscription
.Confirm(
- SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken),
- eventAppeared,
- subscriptionDropped ?? delegate { },
+ SubscribeToStream(streamName, groupName, options, cancellationToken),
+ listener,
_log,
- userCredentials,
cancellationToken
)
.ConfigureAwait(false);
@@ -75,8 +158,65 @@ public async Task SubscribeToStreamAsync(
/// The optional .
///
public PersistentSubscriptionResult SubscribeToStream(
- string streamName, string groupName, int bufferSize = 10,
- UserCredentials? userCredentials = null, CancellationToken cancellationToken = default
+ string streamName,
+ string groupName,
+ int bufferSize,
+ UserCredentials? userCredentials = null,
+ CancellationToken cancellationToken = default
+ ) {
+ return SubscribeToStream(
+ streamName,
+ groupName,
+ new SubscribeToPersistentSubscriptionOptions {
+ BufferSize = bufferSize,
+ UserCredentials = userCredentials,
+ SerializationSettings = OperationSerializationSettings.Disabled
+ },
+ cancellationToken
+ );
+ }
+
+
+
+ ///
+ /// Subscribes to a persistent subscription. Messages must be manually acknowledged.
+ ///
+ /// The name of the stream to read events from.
+ /// The name of the persistent subscription group.
+ /// The size of the buffer.
+ /// The optional user credentials to perform operation with.
+ /// The optional .
+ ///
+ public PersistentSubscriptionResult SubscribeToStream(
+ string streamName,
+ string groupName,
+ UserCredentials? userCredentials,
+ CancellationToken cancellationToken = default
+ ) {
+ return SubscribeToStream(
+ streamName,
+ groupName,
+ new SubscribeToPersistentSubscriptionOptions {
+ UserCredentials = userCredentials,
+ SerializationSettings = OperationSerializationSettings.Disabled
+ },
+ cancellationToken
+ );
+ }
+
+ ///
+ /// Subscribes to a persistent subscription. Messages must be manually acknowledged.
+ ///
+ /// The name of the stream to read events from.
+ /// The name of the persistent subscription group.
+ /// Optional settings to configure subscription
+ /// The optional .
+ ///
+ public PersistentSubscriptionResult SubscribeToStream(
+ string streamName,
+ string groupName,
+ SubscribeToPersistentSubscriptionOptions options,
+ CancellationToken cancellationToken = default
) {
if (streamName == null) {
throw new ArgumentNullException(nameof(streamName));
@@ -94,12 +234,12 @@ public PersistentSubscriptionResult SubscribeToStream(
throw new ArgumentException($"{nameof(groupName)} may not be empty.", nameof(groupName));
}
- if (bufferSize <= 0) {
- throw new ArgumentOutOfRangeException(nameof(bufferSize));
+ if (options.BufferSize <= 0) {
+ throw new ArgumentOutOfRangeException(nameof(options.BufferSize));
}
var readOptions = new ReadReq.Types.Options {
- BufferSize = bufferSize,
+ BufferSize = options.BufferSize,
GroupName = groupName,
UuidOption = new ReadReq.Types.Options.Types.UUIDOption { Structured = new Empty() }
};
@@ -127,7 +267,8 @@ public PersistentSubscriptionResult SubscribeToStream(
},
new() { Options = readOptions },
Settings,
- userCredentials,
+ options.UserCredentials,
+ _messageSerializer.With(Settings.Serialization, options.SerializationSettings),
cancellationToken
);
}
@@ -135,23 +276,41 @@ public PersistentSubscriptionResult SubscribeToStream(
///
/// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
///
- public async Task SubscribeToAllAsync(
+ public Task SubscribeToAllAsync(
string groupName,
Func eventAppeared,
Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null, int bufferSize = 10,
+ UserCredentials? userCredentials = null,
+ int bufferSize = 10,
CancellationToken cancellationToken = default
) =>
- await SubscribeToStreamAsync(
- SystemStreams.AllStream,
- groupName,
- eventAppeared,
- subscriptionDropped,
- userCredentials,
- bufferSize,
- cancellationToken
- )
- .ConfigureAwait(false);
+ SubscribeToAllAsync(
+ groupName,
+ PersistentSubscriptionListener.Handle(eventAppeared, subscriptionDropped),
+ new SubscribeToPersistentSubscriptionOptions {
+ BufferSize = bufferSize,
+ UserCredentials = userCredentials,
+ SerializationSettings = OperationSerializationSettings.Disabled
+ },
+ cancellationToken
+ );
+
+ ///
+ /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
+ ///
+ public Task SubscribeToAllAsync(
+ string groupName,
+ PersistentSubscriptionListener listener,
+ SubscribeToPersistentSubscriptionOptions options,
+ CancellationToken cancellationToken = default
+ ) =>
+ SubscribeToStreamAsync(
+ SystemStreams.AllStream,
+ groupName,
+ listener,
+ options,
+ cancellationToken
+ );
///
/// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
@@ -162,22 +321,76 @@ await SubscribeToStreamAsync(
/// The optional .
///
public PersistentSubscriptionResult SubscribeToAll(
- string groupName, int bufferSize = 10,
- UserCredentials? userCredentials = null, CancellationToken cancellationToken = default
+ string groupName,
+ int bufferSize,
+ UserCredentials? userCredentials = null,
+ CancellationToken cancellationToken = default
) =>
- SubscribeToStream(SystemStreams.AllStream, groupName, bufferSize, userCredentials, cancellationToken);
+ SubscribeToStream(
+ SystemStreams.AllStream,
+ groupName,
+ new SubscribeToPersistentSubscriptionOptions {
+ BufferSize = bufferSize,
+ UserCredentials = userCredentials,
+ SerializationSettings = OperationSerializationSettings.Disabled
+ },
+ cancellationToken
+ );
+
+
+ ///
+ /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
+ ///
+ /// The name of the persistent subscription group.
+ /// The size of the buffer.
+ /// The optional user credentials to perform operation with.
+ /// The optional .
+ ///
+ public PersistentSubscriptionResult SubscribeToAll(
+ string groupName,
+ UserCredentials? userCredentials,
+ CancellationToken cancellationToken = default
+ ) =>
+ SubscribeToStream(
+ SystemStreams.AllStream,
+ groupName,
+ new SubscribeToPersistentSubscriptionOptions {
+ UserCredentials = userCredentials,
+ SerializationSettings = OperationSerializationSettings.Disabled
+ },
+ cancellationToken
+ );
+
+ ///
+ /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
+ ///
+ /// The name of the persistent subscription group.
+ /// Optional settings to configure subscription
+ /// The optional .
+ ///
+ public PersistentSubscriptionResult SubscribeToAll(
+ string groupName,
+ SubscribeToPersistentSubscriptionOptions options,
+ CancellationToken cancellationToken = default
+ ) =>
+ SubscribeToStream(
+ SystemStreams.AllStream,
+ groupName,
+ options,
+ cancellationToken
+ );
///
public class PersistentSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable {
- const int MaxEventIdLength = 2000;
-
- readonly ReadReq _request;
- readonly Channel _channel;
- readonly CancellationTokenSource _cts;
- readonly CallOptions _callOptions;
+ const int MaxEventIdLength = 2000;
- AsyncDuplexStreamingCall? _call;
- int _messagesEnumerated;
+ readonly ReadReq _request;
+ readonly Channel _channel;
+ readonly CancellationTokenSource _cts;
+ readonly CallOptions _callOptions;
+
+ AsyncDuplexStreamingCall? _call;
+ int _messagesEnumerated;
///
/// The server-generated unique identifier for the subscription.
@@ -200,30 +413,33 @@ public class PersistentSubscriptionResult : IAsyncEnumerable, IAs
public IAsyncEnumerable Messages {
get {
if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1)
- throw new InvalidOperationException("Messages may only be enumerated once.");
+ throw new InvalidOperationException("Messages may only be enumerated once.");
return GetMessages();
async IAsyncEnumerable GetMessages() {
- try {
- await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token)) {
- if (message is PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId))
- SubscriptionId = subscriptionId;
-
- yield return message;
- }
- }
- finally {
- _cts.Cancel();
- }
+ try {
+ await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token)) {
+ if (message is PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId))
+ SubscriptionId = subscriptionId;
+
+ yield return message;
+ }
+ } finally {
+ _cts.Cancel();
+ }
}
}
}
internal PersistentSubscriptionResult(
- string streamName, string groupName,
+ string streamName,
+ string groupName,
Func> selectChannelInfo,
- ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials,
+ ReadReq request,
+ KurrentClientSettings settings,
+ UserCredentials? userCredentials,
+ IMessageSerializer messageSerializer,
CancellationToken cancellationToken
) {
StreamName = streamName;
@@ -247,20 +463,21 @@ CancellationToken cancellationToken
async Task PumpMessages() {
try {
- var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false);
- var client = new PersistentSubscriptionsClient(channelInfo.CallInvoker);
+ var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false);
+ var client = new PersistentSubscriptionsClient(channelInfo.CallInvoker);
_call = client.Read(_callOptions);
await _call.RequestStream.WriteAsync(_request).ConfigureAwait(false);
- await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token).ConfigureAwait(false)) {
+ await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token)
+ .ConfigureAwait(false)) {
PersistentSubscriptionMessage subscriptionMessage = response.ContentCase switch {
SubscriptionConfirmation => new PersistentSubscriptionMessage.SubscriptionConfirmation(
response.SubscriptionConfirmation.SubscriptionId
),
Event => new PersistentSubscriptionMessage.Event(
- ConvertToResolvedEvent(response),
+ ConvertToResolvedEvent(response, messageSerializer),
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount,
_ => null
@@ -292,17 +509,18 @@ async Task PumpMessages() {
// The absence of this header leads to an RpcException with the status code 'Cancelled' and the message "No grpc-status found on response".
// The switch statement below handles these specific exceptions and translates them into the appropriate
// PersistentSubscriptionDroppedByServerException exception.
- case RpcException { StatusCode: StatusCode.Unavailable } rex1 when rex1.Status.Detail.Contains("WinHttpException: Error 12030"):
+ case RpcException { StatusCode: StatusCode.Unavailable } rex1
+ when rex1.Status.Detail.Contains("WinHttpException: Error 12030"):
case RpcException { StatusCode: StatusCode.Cancelled } rex2
- when rex2.Status.Detail.Contains("No grpc-status found on response"):
+ when rex2.Status.Detail.Contains("No grpc-status found on response"):
ex = new PersistentSubscriptionDroppedByServerException(StreamName, GroupName, ex);
break;
}
#endif
if (ex is PersistentSubscriptionNotFoundException) {
await _channel.Writer
- .WriteAsync(PersistentSubscriptionMessage.NotFound.Instance, cancellationToken)
- .ConfigureAwait(false);
+ .WriteAsync(PersistentSubscriptionMessage.NotFound.Instance, cancellationToken)
+ .ConfigureAwait(false);
_channel.Writer.TryComplete();
return;
@@ -360,19 +578,26 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par
/// A reason given.
/// The s to nak. There should not be more than 2000 to nak at a time.
/// The number of resolvedEvents exceeded the limit of 2000.
- public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents) =>
- Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId));
-
- static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new(
- ConvertToEventRecord(response.Event.Event)!,
- ConvertToEventRecord(response.Event.Link),
- response.Event.PositionCase switch {
- ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
- _ => null
- }
- );
+ public Task Nack(
+ PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents
+ ) =>
+ Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId));
+
+ static ResolvedEvent ConvertToResolvedEvent(
+ ReadResp response,
+ IMessageSerializer messageSerializer
+ ) =>
+ ResolvedEvent.From(
+ ConvertToEventRecord(response.Event.Event)!,
+ ConvertToEventRecord(response.Event.Link),
+ response.Event.PositionCase switch {
+ ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
+ _ => null
+ },
+ messageSerializer
+ );
- Task AckInternal(params Uuid[] eventIds) {
+ Task AckInternal(params Uuid[] eventIds) {
if (eventIds.Length > MaxEventIdLength) {
throw new ArgumentException(
$"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.",
@@ -393,7 +618,7 @@ Task AckInternal(params Uuid[] eventIds) {
);
}
- Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) {
+ Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) {
if (eventIds.Length > MaxEventIdLength) {
throw new ArgumentException(
$"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.",
@@ -422,7 +647,7 @@ Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action,
);
}
- static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) =>
+ static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) =>
e is null
? null
: new EventRecord(
@@ -465,14 +690,94 @@ public void Dispose() {
}
///
- public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) {
+ public async IAsyncEnumerator GetAsyncEnumerator(
+ CancellationToken cancellationToken = default
+ ) {
await foreach (var message in Messages.WithCancellation(cancellationToken)) {
if (message is not PersistentSubscriptionMessage.Event(var resolvedEvent, _))
- continue;
+ continue;
yield return resolvedEvent;
}
}
}
}
+
+ public static class KurrentClientPersistentSubscriptionsExtensions {
+ ///
+ /// Subscribes to a persistent subscription. Messages must be manually acknowledged
+ ///
+ ///
+ ///
+ ///
+ public static Task SubscribeToStreamAsync(
+ this KurrentPersistentSubscriptionsClient kurrentClient,
+ string streamName,
+ string groupName,
+ PersistentSubscriptionListener listener,
+ CancellationToken cancellationToken = default
+ ) =>
+ kurrentClient.SubscribeToStreamAsync(
+ streamName,
+ groupName,
+ listener,
+ new SubscribeToPersistentSubscriptionOptions(),
+ cancellationToken
+ );
+
+ ///
+ /// Subscribes to a persistent subscription. Messages must be manually acknowledged.
+ ///
+ ///
+ /// The name of the stream to read events from.
+ /// The name of the persistent subscription group.
+ /// The optional .
+ ///
+ public static KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult SubscribeToStream(
+ this KurrentPersistentSubscriptionsClient kurrentClient,
+ string streamName,
+ string groupName,
+ CancellationToken cancellationToken = default
+ ) =>
+ kurrentClient.SubscribeToStream(
+ streamName,
+ groupName,
+ new SubscribeToPersistentSubscriptionOptions(),
+ cancellationToken
+ );
+
+ ///
+ /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
+ ///
+ public static Task SubscribeToAllAsync(
+ this KurrentPersistentSubscriptionsClient kurrentClient,
+ string groupName,
+ PersistentSubscriptionListener listener,
+ CancellationToken cancellationToken = default
+ ) =>
+ kurrentClient.SubscribeToAllAsync(
+ groupName,
+ listener,
+ new SubscribeToPersistentSubscriptionOptions(),
+ cancellationToken
+ );
+
+ ///
+ /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
+ ///
+ ///
+ /// The name of the persistent subscription group.
+ /// The optional .
+ ///
+ public static KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult SubscribeToAll(
+ this KurrentPersistentSubscriptionsClient kurrentClient,
+ string groupName,
+ CancellationToken cancellationToken = default
+ ) =>
+ kurrentClient.SubscribeToAll(
+ groupName,
+ new SubscribeToPersistentSubscriptionOptions(),
+ cancellationToken
+ );
+ }
}
diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs
index 070f32698..0d251f59c 100644
--- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs
+++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs
@@ -1,6 +1,7 @@
using System.Text.Encodings.Web;
using System.Threading.Channels;
using Grpc.Core;
+using Kurrent.Client.Core.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@@ -9,13 +10,14 @@ namespace EventStore.Client {
/// The client used to manage persistent subscriptions in the KurrentDB.
///
public sealed partial class KurrentPersistentSubscriptionsClient : KurrentClientBase {
- private static BoundedChannelOptions ReadBoundedChannelOptions = new (1) {
+ static BoundedChannelOptions ReadBoundedChannelOptions = new (1) {
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = true
};
- private readonly ILogger _log;
+ readonly ILogger _log;
+ readonly IMessageSerializer _messageSerializer;
///
/// Constructs a new .
@@ -37,6 +39,8 @@ public KurrentPersistentSubscriptionsClient(KurrentClientSettings? settings) : b
}) {
_log = Settings.LoggerFactory?.CreateLogger()
?? new NullLogger();
+
+ _messageSerializer = MessageSerializer.From(settings?.Serialization);
}
private static string UrlEncode(string s) {
diff --git a/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs b/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs
index 637f54e30..0674cb9ac 100644
--- a/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs
+++ b/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs
@@ -7,7 +7,9 @@ namespace EventStore.Client {
/// Represents a persistent subscription connection.
///
public class PersistentSubscription : IDisposable {
- private readonly KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult _persistentSubscriptionResult;
+ private readonly KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult
+ _persistentSubscriptionResult;
+
private readonly IAsyncEnumerator _enumerator;
private readonly Func _eventAppeared;
private readonly Action _subscriptionDropped;
@@ -23,9 +25,10 @@ public class PersistentSubscription : IDisposable {
internal static async Task Confirm(
KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
- Func eventAppeared,
- Action subscriptionDropped,
- ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default) {
+ PersistentSubscriptionListener listener,
+ ILogger log,
+ CancellationToken cancellationToken = default
+ ) {
var enumerator = persistentSubscriptionResult
.Messages
.GetAsyncEnumerator(cancellationToken);
@@ -34,11 +37,19 @@ internal static async Task Confirm(
return (result, enumerator.Current) switch {
(true, PersistentSubscriptionMessage.SubscriptionConfirmation (var subscriptionId)) =>
- new PersistentSubscription(persistentSubscriptionResult, enumerator, subscriptionId, eventAppeared,
- subscriptionDropped, log, cancellationToken),
+ new PersistentSubscription(
+ persistentSubscriptionResult,
+ enumerator,
+ subscriptionId,
+ listener,
+ log,
+ cancellationToken
+ ),
(true, PersistentSubscriptionMessage.NotFound) =>
- throw new PersistentSubscriptionNotFoundException(persistentSubscriptionResult.StreamName,
- persistentSubscriptionResult.GroupName),
+ throw new PersistentSubscriptionNotFoundException(
+ persistentSubscriptionResult.StreamName,
+ persistentSubscriptionResult.GroupName
+ ),
_ => throw new InvalidOperationException("Subscription could not be confirmed.")
};
}
@@ -46,17 +57,19 @@ internal static async Task Confirm(
// PersistentSubscription takes responsibility for disposing the call and the disposable
private PersistentSubscription(
KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
- IAsyncEnumerator enumerator, string subscriptionId,
- Func eventAppeared,
- Action subscriptionDropped, ILogger log,
- CancellationToken cancellationToken) {
+ IAsyncEnumerator enumerator,
+ string subscriptionId,
+ PersistentSubscriptionListener listener,
+ ILogger log,
+ CancellationToken cancellationToken
+ ) {
_persistentSubscriptionResult = persistentSubscriptionResult;
- _enumerator = enumerator;
- SubscriptionId = subscriptionId;
- _eventAppeared = eventAppeared;
- _subscriptionDropped = subscriptionDropped;
- _log = log;
- _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ _enumerator = enumerator;
+ SubscriptionId = subscriptionId;
+ _eventAppeared = listener.EventAppeared;
+ _subscriptionDropped = listener.SubscriptionDropped ?? delegate { };
+ _log = log;
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task.Run(Subscribe, _cts.Token);
}
@@ -91,7 +104,6 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) =>
public Task Ack(IEnumerable resolvedEvents) =>
Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId));
-
///
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
///
@@ -99,7 +111,8 @@ public Task Ack(IEnumerable resolvedEvents) =>
/// A reason given.
/// The of the s to nak. There should not be more than 2000 to nak at a time.
/// The number of eventIds exceeded the limit of 2000.
- public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => NackInternal(eventIds, action, reason);
+ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) =>
+ NackInternal(eventIds, action, reason);
///
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
@@ -108,10 +121,15 @@ public Task Ack(IEnumerable resolvedEvents) =>
/// A reason given.
/// The s to nak. There should not be more than 2000 to nak at a time.
/// The number of resolvedEvents exceeded the limit of 2000.
- public Task Nack(PersistentSubscriptionNakEventAction action, string reason,
- params ResolvedEvent[] resolvedEvents) =>
- Nack(action, reason,
- Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId));
+ public Task Nack(
+ PersistentSubscriptionNakEventAction action, string reason,
+ params ResolvedEvent[] resolvedEvents
+ ) =>
+ Nack(
+ action,
+ reason,
+ Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)
+ );
///
public void Dispose() => SubscriptionDropped(SubscriptionDroppedReason.Disposed);
@@ -121,7 +139,8 @@ private async Task Subscribe() {
try {
while (await _enumerator.MoveNextAsync(_cts.Token).ConfigureAwait(false)) {
- if (_enumerator.Current is not PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
+ if (_enumerator.Current is not
+ PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
continue;
}
@@ -129,39 +148,54 @@ private async Task Subscribe() {
if (_subscriptionDroppedInvoked != 0) {
return;
}
- SubscriptionDropped(SubscriptionDroppedReason.ServerError,
+
+ SubscriptionDropped(
+ SubscriptionDroppedReason.ServerError,
new PersistentSubscriptionNotFoundException(
- _persistentSubscriptionResult.StreamName, _persistentSubscriptionResult.GroupName));
+ _persistentSubscriptionResult.StreamName,
+ _persistentSubscriptionResult.GroupName
+ )
+ );
+
return;
}
-
+
_log.LogTrace(
"Persistent Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}",
- SubscriptionId, resolvedEvent.OriginalEvent.EventStreamId,
- resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position);
+ SubscriptionId,
+ resolvedEvent.OriginalEvent.EventStreamId,
+ resolvedEvent.OriginalEvent.EventNumber,
+ resolvedEvent.OriginalEvent.Position
+ );
try {
await _eventAppeared(
this,
resolvedEvent,
retryCount,
- _cts.Token).ConfigureAwait(false);
+ _cts.Token
+ ).ConfigureAwait(false);
} catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) {
if (_subscriptionDroppedInvoked != 0) {
return;
}
- _log.LogWarning(ex,
+ _log.LogWarning(
+ ex,
"Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
- SubscriptionId);
+ SubscriptionId
+ );
SubscriptionDropped(SubscriptionDroppedReason.Disposed);
return;
} catch (Exception ex) {
- _log.LogError(ex,
+ _log.LogError(
+ ex,
"Persistent Subscription {subscriptionId} was dropped because the subscriber made an error.",
- SubscriptionId);
+ SubscriptionId
+ );
+
SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex);
return;
@@ -169,16 +203,21 @@ await _eventAppeared(
}
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
- _log.LogError(ex,
+ _log.LogError(
+ ex,
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
- SubscriptionId);
+ SubscriptionId
+ );
+
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
} finally {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(
"Persistent Subscription {subscriptionId} was unexpectedly terminated.",
- SubscriptionId);
+ SubscriptionId
+ );
+
SubscriptionDropped(SubscriptionDroppedReason.ServerError);
}
}
diff --git a/src/Kurrent.Client/Streams/KurrentClient.Append.cs b/src/Kurrent.Client/Streams/KurrentClient.Append.cs
index 39d6f3066..593d2ab40 100644
--- a/src/Kurrent.Client/Streams/KurrentClient.Append.cs
+++ b/src/Kurrent.Client/Streams/KurrentClient.Append.cs
@@ -6,6 +6,7 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using EventStore.Client.Diagnostics;
+using Kurrent.Client.Core.Serialization;
using Kurrent.Diagnostics;
using Kurrent.Diagnostics.Telemetry;
using Kurrent.Diagnostics.Tracing;
@@ -14,6 +15,48 @@
namespace EventStore.Client {
public partial class KurrentClient {
+ ///
+ /// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through
+ ///
+ /// The name of the stream to append events to.
+ /// Messages to append to the stream.
+ /// Optional settings for the append operation, e.g. expected stream position for optimistic concurrency check
+ /// The optional .
+ ///
+ public Task AppendToStreamAsync(
+ string streamName,
+ IEnumerable messages,
+ AppendToStreamOptions options,
+ CancellationToken cancellationToken = default
+ ) {
+ var serializationContext = new MessageSerializationContext(
+ streamName,
+ Settings.Serialization.DefaultContentType
+ );
+
+ var eventsData = _messageSerializer.Serialize(messages, serializationContext);
+
+ return options.ExpectedStreamRevision.HasValue
+ ? AppendToStreamAsync(
+ streamName,
+ options.ExpectedStreamRevision.Value,
+ eventsData,
+ options.ConfigureOperationOptions,
+ options.Deadline,
+ options.UserCredentials,
+ cancellationToken
+ )
+ : AppendToStreamAsync(
+ streamName,
+ options.ExpectedStreamState ?? StreamState.Any,
+ eventsData,
+ options.ConfigureOperationOptions,
+ options.Deadline,
+ options.UserCredentials,
+ cancellationToken
+ );
+ }
+
///
/// Appends events asynchronously to a stream.
///
@@ -114,16 +157,28 @@ ValueTask AppendToStreamInternal(
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
- .WithRequiredTag(TelemetryTags.Kurrent.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
+ .WithRequiredTag(
+ TelemetryTags.Kurrent.Stream,
+ header.Options.StreamIdentifier.StreamName.ToStringUtf8()
+ )
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
- .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);
+ .WithOptionalTag(
+ TelemetryTags.Database.User,
+ userCredentials?.Username ?? Settings.DefaultCredentials?.Username
+ );
- return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);
+ return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(
+ Operation,
+ TracingConstants.Operations.Append,
+ tags
+ );
async ValueTask Operation() {
using var call = new StreamsClient(channelInfo.CallInvoker)
- .Append(KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
+ .Append(
+ KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
+ );
await call.RequestStream
.WriteAsync(header)
@@ -160,11 +215,13 @@ await call.RequestStream
}
IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
- var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
+ var currentRevision = response.Success.CurrentRevisionOptionCase
+ == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision);
- var position = response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
+ var position = response.Success.PositionOptionCase
+ == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition)
: default;
@@ -181,7 +238,8 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
IWriteResult HandleWrongExpectedRevision(
AppendResp response, AppendReq header, KurrentClientOperationOptions operationOptions
) {
- var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision
+ var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase
+ == CurrentRevisionOptionOneofCase.CurrentRevision
? new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
: StreamRevision.None;
@@ -193,7 +251,8 @@ IWriteResult HandleWrongExpectedRevision(
);
if (operationOptions.ThrowOnAppendFailure) {
- if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision) {
+ if (response.WrongExpectedVersion.ExpectedRevisionOptionCase
+ == ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
@@ -215,7 +274,8 @@ IWriteResult HandleWrongExpectedRevision(
);
}
- var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision
+ var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase
+ == ExpectedRevisionOptionOneofCase.ExpectedRevision
? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision)
: StreamRevision.None;
@@ -227,7 +287,7 @@ IWriteResult HandleWrongExpectedRevision(
}
class StreamAppender : IDisposable {
- readonly KurrentClientSettings _settings;
+ readonly KurrentClientSettings _settings;
readonly CancellationToken _cancellationToken;
readonly Action _onException;
readonly Channel _channel;
@@ -302,8 +362,7 @@ async ValueTask Operation() {
try {
foreach (var appendRequest in GetRequests(events, options, correlationId))
await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
- }
- catch (ChannelClosedException ex) {
+ } catch (ChannelClosedException ex) {
// channel is closed, our tcs won't necessarily get completed, don't wait for it.
throw ex.InnerException ?? ex;
}
@@ -333,8 +392,7 @@ async Task Duplex(ValueTask channelInfoTask) {
_ = Task.Run(Receive, _cancellationToken);
_isUsable.TrySetResult(true);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
_isUsable.TrySetException(ex);
_onException(ex);
}
@@ -344,7 +402,8 @@ async Task Duplex(ValueTask channelInfoTask) {
async Task Send() {
if (_call is null) return;
- await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
+ await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken)
+ .ConfigureAwait(false))
await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
await _call.RequestStream.CompleteAsync().ConfigureAwait(false);
@@ -354,20 +413,22 @@ async Task Receive() {
if (_call is null) return;
try {
- await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) {
- if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) {
+ await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken)
+ .ConfigureAwait(false)) {
+ if (!_pendingRequests.TryRemove(
+ Uuid.FromDto(response.CorrelationId),
+ out var writeResult
+ )) {
continue; // TODO: Log?
}
try {
writeResult.TrySetResult(response.ToWriteResult());
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
writeResult.TrySetException(ex);
}
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
// signal that no tcs added to _pendingRequests after this point will necessarily complete
_channel.Writer.TryComplete(ex);
@@ -380,7 +441,9 @@ async Task Receive() {
}
}
- IEnumerable GetRequests(IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId) {
+ IEnumerable GetRequests(
+ IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId
+ ) {
var batchSize = 0;
var first = true;
var correlationIdDto = correlationId.ToDto();
@@ -427,4 +490,153 @@ public void Dispose() {
}
}
}
+
+ public static class KurrentClientAppendToStreamExtensions {
+ ///
+ /// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through
+ ///
+ ///
+ /// The name of the stream to append events to.
+ /// Messages to append to the stream.
+ /// The optional .
+ ///
+ public static Task AppendToStreamAsync(
+ this KurrentClient client,
+ string streamName,
+ IEnumerable messages,
+ CancellationToken cancellationToken = default
+ )
+ => client.AppendToStreamAsync(
+ streamName,
+ messages,
+ new AppendToStreamOptions(),
+ cancellationToken
+ );
+
+ ///
+ /// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through
+ ///
+ ///
+ /// The name of the stream to append events to.
+ /// Messages to append to the stream.
+ /// The optional .
+ ///
+ public static Task AppendToStreamAsync(
+ this KurrentClient client,
+ string streamName,
+ IEnumerable