diff --git a/Kurrent.Client.sln b/Kurrent.Client.sln
index 1ea14ce6f..00544398a 100644
--- a/Kurrent.Client.sln
+++ b/Kurrent.Client.sln
@@ -13,10 +13,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client", "src\Kurre
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.Common", "test\Kurrent.Client.Tests.Common\Kurrent.Client.Tests.Common.csproj", "{47BF715B-A0BF-4044-B335-717E56422550}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.NeverLoadedAssembly", "test\Kurrent.Client.Tests.NeverLoadedAssembly\Kurrent.Client.Tests.NeverLoadedAssembly.csproj", "{0AC8A7E9-6839-4B4C-B299-950C376DF71F}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.ExternalAssembly", "test\Kurrent.Client.Tests.ExternalAssembly\Kurrent.Client.Tests.ExternalAssembly.csproj", "{829AF806-1144-408A-85FE-763835775086}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
@@ -38,20 +34,10 @@ Global
{47BF715B-A0BF-4044-B335-717E56422550}.Debug|x64.Build.0 = Debug|Any CPU
{47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.ActiveCfg = Release|Any CPU
{47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.Build.0 = Release|Any CPU
- {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Debug|x64.ActiveCfg = Debug|Any CPU
- {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Debug|x64.Build.0 = Debug|Any CPU
- {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Release|x64.ActiveCfg = Release|Any CPU
- {0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Release|x64.Build.0 = Release|Any CPU
- {829AF806-1144-408A-85FE-763835775086}.Debug|x64.ActiveCfg = Debug|Any CPU
- {829AF806-1144-408A-85FE-763835775086}.Debug|x64.Build.0 = Debug|Any CPU
- {829AF806-1144-408A-85FE-763835775086}.Release|x64.ActiveCfg = Release|Any CPU
- {829AF806-1144-408A-85FE-763835775086}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{FC829F1B-43AD-4C96-9002-23D04BBA3AF3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{762EECAA-122E-4B0C-BC50-5AA4F72CA4E0} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
{47BF715B-A0BF-4044-B335-717E56422550} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
- {0AC8A7E9-6839-4B4C-B299-950C376DF71F} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
- {829AF806-1144-408A-85FE-763835775086} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
EndGlobalSection
EndGlobal
diff --git a/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs b/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs
deleted file mode 100644
index 294cdc169..000000000
--- a/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs
+++ /dev/null
@@ -1,360 +0,0 @@
-using System.Text.Json;
-using Kurrent.Client.Core.Serialization;
-
-namespace EventStore.Client;
-
-///
-/// Provides configuration options for messages serialization and deserialization in the KurrentDB client.
-///
-public class KurrentClientSerializationSettings {
- ///
- /// The serializer responsible for handling JSON-formatted data. This serializer is used both for
- /// serializing outgoing JSON messages and deserializing incoming JSON messages. If not specified,
- /// a default System.Text.Json serializer will be used with standard settings.
- ///
- /// That also allows you to bring your custom JSON serializer implementation (e.g. JSON.NET)
- ///
- public ISerializer? JsonSerializer { get; set; }
-
- ///
- /// The serializer responsible for handling binary data formats. This is used when working with
- /// binary-encoded messages rather than text-based formats (e.g. Protobuf or Avro). Required when storing
- /// or retrieving content with "application/octet-stream" content type
- ///
- public ISerializer? BytesSerializer { get; set; }
-
- ///
- /// Determines which serialization format (JSON or binary) is used by default when writing messages
- /// where the content type isn't explicitly specified. The default content type is "application/json"
- ///
- public ContentType DefaultContentType { get; set; } = ContentType.Json;
-
- ///
- /// Defines the custom strategy used to map between the type name stored in messages and .NET type names.
- /// If not provided the default will be used.
- /// It resolves the CLR type name to the format: "{stream category name}-{CLR Message Type}".
- /// You can provide your own implementation of
- /// and register it here to override the default behaviour
- ///
- public IMessageTypeNamingStrategy? MessageTypeNamingStrategy { get; set; }
-
- ///
- /// Allows to register mapping of CLR message types to their corresponding message type names used in serialized messages.
- ///
- public IDictionary MessageTypeMap { get; set; } = new Dictionary();
-
- ///
- /// Registers CLR message types that can be appended to the specific stream category.
- /// Types will have message type names resolved based on the used
- ///
- public IDictionary CategoryMessageTypesMap { get; set; } = new Dictionary();
-
- ///
- /// Specifies the CLR type that should be used when deserializing metadata for all events.
- /// When set, the client will attempt to deserialize event metadata into this type.
- /// If not provided, will be used.
- ///
- public Type? DefaultMetadataType { get; set; }
-
- ///
- /// Creates a new instance of serialization settings with either default values or custom configuration.
- /// This factory method is the recommended way to create serialization settings for the KurrentDB client.
- ///
- /// Optional callback to customize the settings. If null, default settings are used.
- /// A fully configured instance ready to be used with the KurrentDB client.
- ///
- ///
- /// var settings = KurrentClientSerializationSettings.Default(options => {
- /// options.RegisterMessageType<UserCreated>("user-created");
- /// options.RegisterMessageType<UserUpdated>("user-updated");
- /// options.RegisterMessageTypeForCategory<UserCreated>("user");
- /// });
- ///
- ///
- public static KurrentClientSerializationSettings Default(
- Action? configure = null
- ) {
- var settings = new KurrentClientSerializationSettings();
-
- configure?.Invoke(settings);
-
- return settings;
- }
-
- ///
- /// Configures the JSON serializer using custom options while inheriting from the default System.Text.Json settings.
- /// This allows fine-tuning serialization behavior such as case sensitivity, property naming, etc.
- ///
- /// A function that receives the default options and returns modified options.
- /// The current instance for method chaining.
- ///
- ///
- /// settings.UseJsonSettings(options => {
- /// options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
- /// options.WriteIndented = true;
- /// options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
- /// return options;
- /// });
- ///
- ///
- public KurrentClientSerializationSettings UseJsonSettings(
- Func configure
- ) {
- JsonSerializer = new SystemTextJsonSerializer(
- new SystemTextJsonSerializationSettings
- { Options = configure(SystemTextJsonSerializationSettings.DefaultJsonSerializerOptions) }
- );
-
- return this;
- }
-
- ///
- /// Configures JSON serialization using provided System.Text.Json serializer options.
- ///
- /// The JSON serializer options to use.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseJsonSettings(JsonSerializerOptions systemTextJsonSerializerOptions) {
- JsonSerializer = new SystemTextJsonSerializer(
- new SystemTextJsonSerializationSettings { Options = systemTextJsonSerializerOptions }
- );
-
- return this;
- }
-
- ///
- /// Configures JSON serialization using provided
- ///
- /// The SystemTextJson serialization settings to use.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseJsonSettings(
- SystemTextJsonSerializationSettings jsonSerializationSettings
- ) {
- JsonSerializer = new SystemTextJsonSerializer(jsonSerializationSettings);
-
- return this;
- }
-
- ///
- /// Sets a custom JSON serializer implementation.
- /// That also allows you to bring your custom JSON serializer implementation (e.g. JSON.NET)
- ///
- /// The serializer to use for JSON content.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseJsonSerializer(ISerializer serializer) {
- JsonSerializer = serializer;
-
- return this;
- }
-
- ///
- /// Sets a custom binary serializer implementation.
- /// That also allows you to bring your custom binary serializer implementation (e.g. Protobuf or Avro)
- ///
- /// The serializer to use for binary content.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseBytesSerializer(ISerializer serializer) {
- BytesSerializer = serializer;
-
- return this;
- }
-
- ///
- /// Configures a custom message type naming strategy.
- ///
- /// The type of naming strategy to use.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseMessageTypeNamingStrategy()
- where TCustomMessageTypeResolutionStrategy : IMessageTypeNamingStrategy, new() =>
- UseMessageTypeNamingStrategy(new TCustomMessageTypeResolutionStrategy());
-
- ///
- /// Configures a custom message type naming strategy.
- ///
- /// The naming strategy instance to use.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseMessageTypeNamingStrategy(
- IMessageTypeNamingStrategy messageTypeNamingStrategy
- ) {
- MessageTypeNamingStrategy = messageTypeNamingStrategy;
-
- return this;
- }
-
- ///
- /// Associates a message type with a specific stream category to enable automatic deserialization.
- /// In event sourcing, streams are often prefixed with a category (e.g., "user-123", "order-456").
- /// This method tells the client which message types can appear in streams of a given category.
- ///
- /// The event or message type that can appear in the category's streams.
- /// The category prefix (e.g., "user", "order", "account").
- /// The current instance for method chaining.
- ///
- ///
- /// // Register event types that can appear in user streams
- /// settings.RegisterMessageTypeForCategory<UserCreated>("user")
- /// .RegisterMessageTypeForCategory<UserUpdated>("user")
- /// .RegisterMessageTypeForCategory<UserDeleted>("user");
- ///
- ///
- public KurrentClientSerializationSettings RegisterMessageTypeForCategory(string categoryName) =>
- RegisterMessageTypeForCategory(categoryName, typeof(T));
-
- ///
- /// Registers multiple message types for a specific stream category.
- ///
- /// The category name to register the types with.
- /// The message types to register.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings RegisterMessageTypeForCategory(string categoryName, params Type[] types) {
- CategoryMessageTypesMap[categoryName] = CategoryMessageTypesMap.TryGetValue(categoryName, out var current)
- ? [..current, ..types]
- : types;
-
- return this;
- }
-
- ///
- /// Maps a .NET type to a specific message type name that will be stored in the message metadata.
- /// This mapping is used during automatic deserialization, as it tells the client which CLR type
- /// to instantiate when encountering a message with a particular type name in the database.
- ///
- /// The .NET type to register (typically a message class).
- /// The string identifier to use for this type in the database.
- /// The current instance for method chaining.
- ///
- /// The type name is often different from the .NET type name to support versioning and evolution
- /// of your domain model without breaking existing stored messages.
- ///
- ///
- ///
- /// // Register me types with their corresponding type identifiers
- /// settings.RegisterMessageType<UserCreated>("user-created-v1")
- /// .RegisterMessageType<OrderPlaced>("order-placed-v2");
- ///
- ///
- public KurrentClientSerializationSettings RegisterMessageType(string typeName) =>
- RegisterMessageType(typeof(T), typeName);
-
- ///
- /// Registers a message type with a specific type name.
- ///
- /// The message type to register.
- /// The type name to register for the message type.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings RegisterMessageType(Type type, string typeName) {
- MessageTypeMap[type] = typeName;
-
- return this;
- }
-
- ///
- /// Registers multiple message types with their corresponding type names.
- ///
- /// Dictionary mapping types to their type names.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings RegisterMessageTypes(IDictionary typeMap) {
- foreach (var map in typeMap) {
- MessageTypeMap[map.Key] = map.Value;
- }
-
- return this;
- }
-
- ///
- /// Configures a strongly-typed metadata class for all mes in the system.
- /// This enables accessing metadata properties in a type-safe manner rather than using dynamic objects.
- ///
- /// The metadata class type containing properties matching the expected metadata fields.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseMetadataType() =>
- UseMetadataType(typeof(T));
-
- ///
- /// Configures a strongly-typed metadata class for all mes in the system.
- /// This enables accessing metadata properties in a type-safe manner rather than using dynamic objects.
- ///
- /// The metadata class type containing properties matching the expected metadata fields.
- /// The current instance for method chaining.
- public KurrentClientSerializationSettings UseMetadataType(Type type) {
- DefaultMetadataType = type;
-
- return this;
- }
-
- ///
- /// Creates a deep copy of the current serialization settings.
- ///
- /// A new instance with copied settings.
- internal KurrentClientSerializationSettings Clone() {
- return new KurrentClientSerializationSettings {
- BytesSerializer = BytesSerializer,
- JsonSerializer = JsonSerializer,
- DefaultContentType = DefaultContentType,
- MessageTypeMap = new Dictionary(MessageTypeMap),
- CategoryMessageTypesMap = new Dictionary(CategoryMessageTypesMap),
- MessageTypeNamingStrategy = MessageTypeNamingStrategy
- };
- }
-}
-
-///
-/// Provides operation-specific serialization settings that override the global client configuration
-/// for individual operations like reading from or appending to streams. This allows fine-tuning
-/// serialization behavior on a per-operation basis without changing the client-wide settings.
-///
-public class OperationSerializationSettings {
- ///
- /// Controls whether mes should be automatically deserialized for this specific operation.
- /// When enabled (the default), messages will be converted to their appropriate CLR types.
- /// When disabled, messages will be returned in their raw serialized form.
- ///
- public AutomaticDeserialization AutomaticDeserialization { get; private set; } = AutomaticDeserialization.Enabled;
-
- ///
- /// A callback that allows customizing serialization settings for this specific operation.
- /// This can be used to override type mappings, serializers, or other settings just for
- /// the scope of a single operation without affecting other operations.
- ///
- public Action? ConfigureSettings { get; private set; }
-
- ///
- /// A pre-configured settings instance that disables automatic deserialization.
- /// Use this when you need to access raw message data in its serialized form.
- ///
- public static readonly OperationSerializationSettings Disabled = new OperationSerializationSettings {
- AutomaticDeserialization = AutomaticDeserialization.Disabled
- };
-
- ///
- /// Creates operation-specific serialization settings with custom configuration while keeping
- /// automatic deserialization enabled. This allows operation-specific type mappings or
- /// serializer settings without changing the global client configuration.
- ///
- /// A callback to customize serialization settings for this operation.
- /// A configured instance of with enabled deserialization.
- public static OperationSerializationSettings Configure(Action configure) =>
- new OperationSerializationSettings {
- AutomaticDeserialization = AutomaticDeserialization.Enabled,
- ConfigureSettings = configure
- };
-}
-
-///
-/// Controls whether the KurrentDB client should automatically deserialize message payloads
-/// into their corresponding CLR types based on the configured type mappings.
-///
-public enum AutomaticDeserialization {
- ///
- /// Disables automatic deserialization. Messages will be returned in their raw serialized form,
- /// requiring manual deserialization by the application. Use this when you need direct access to the raw data
- /// or when working with messages that don't have registered type mappings.
- ///
- Disabled = 0,
-
- ///
- /// Enables automatic deserialization. The client will attempt to convert messages into their appropriate
- /// CLR types using the configured serializers and type mappings. This simplifies working with strongly-typed
- /// domain messages but requires proper type registration.
- ///
- Enabled = 1
-}
diff --git a/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs b/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
index 57be0c950..c730b7b5a 100644
--- a/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
+++ b/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
@@ -14,20 +14,6 @@ public partial class KurrentClientSettings {
public static KurrentClientSettings Create(string connectionString) =>
ConnectionStringParser.Parse(connectionString);
- ///
- /// Creates client settings from a connection string with additional configuration
- ///
- ///
- /// allows you to make additional customization of client settings
- ///
- public static KurrentClientSettings Create(string connectionString, Action configure) {
- var settings = ConnectionStringParser.Parse(connectionString);
-
- configure(settings);
-
- return settings;
- }
-
private static class ConnectionStringParser {
private const string SchemeSeparator = "://";
private const string UserInfoSeparator = "@";
diff --git a/src/Kurrent.Client/Core/KurrentClientSettings.cs b/src/Kurrent.Client/Core/KurrentClientSettings.cs
index 71bfa446e..aed914074 100644
--- a/src/Kurrent.Client/Core/KurrentClientSettings.cs
+++ b/src/Kurrent.Client/Core/KurrentClientSettings.cs
@@ -57,11 +57,5 @@ public partial class KurrentClientSettings {
/// The default deadline for calls. Will not be applied to reads or subscriptions.
///
public TimeSpan? DefaultDeadline { get; set; } = TimeSpan.FromSeconds(10);
-
- ///
- /// Provides configuration options for messages serialization and deserialization in the KurrentDB client.
- /// If null, default settings are used.
- ///
- public KurrentClientSerializationSettings Serialization { get; set; } = KurrentClientSerializationSettings.Default();
}
}
diff --git a/src/Kurrent.Client/Core/ResolvedEvent.cs b/src/Kurrent.Client/Core/ResolvedEvent.cs
index 3b4209082..25ca13a78 100644
--- a/src/Kurrent.Client/Core/ResolvedEvent.cs
+++ b/src/Kurrent.Client/Core/ResolvedEvent.cs
@@ -1,5 +1,3 @@
-using Kurrent.Client.Core.Serialization;
-
namespace EventStore.Client {
///
/// A structure representing a single event or a resolved link event.
@@ -24,23 +22,6 @@ public readonly struct ResolvedEvent {
///
public EventRecord OriginalEvent => Link ?? Event;
- ///
- /// Returns the deserialized message
- /// It will be provided or equal to null, depending on the automatic deserialization settings you choose.
- /// If it's null, you can use to deserialize it manually.
- ///
- public readonly Message? Message;
-
- ///
- /// Returns the deserialized message data.
- ///
- public object? DeserializedData => Message?.Data;
-
- ///
- /// Returns the deserialized message metadata.
- ///
- public object? DeserializedMetadata => Message?.Metadata;
-
///
/// Position of the if available.
///
@@ -68,44 +49,12 @@ public readonly struct ResolvedEvent {
///
///
///
- public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) : this(
- @event,
- link,
- null,
- commitPosition
- ) { }
-
- ///
- /// Constructs a new .
- ///
- ///
- ///
- ///
- ///
- ResolvedEvent(
- EventRecord @event,
- EventRecord? link,
- Message? message,
- ulong? commitPosition
- ) {
- Event = @event;
- Link = link;
- Message = message;
+ public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) {
+ Event = @event;
+ Link = link;
OriginalPosition = commitPosition.HasValue
? new Position(commitPosition.Value, (link ?? @event).Position.PreparePosition)
: new Position?();
}
-
- internal static ResolvedEvent From(
- EventRecord @event,
- EventRecord? link,
- ulong? commitPosition,
- IMessageSerializer messageSerializer
- ) {
- var originalEvent = link ?? @event;
- return messageSerializer.TryDeserialize(originalEvent, out var message)
- ? new ResolvedEvent(@event, link, message, commitPosition)
- : new ResolvedEvent(@event, link, commitPosition);
- }
}
}
diff --git a/src/Kurrent.Client/Core/Serialization/ISerializer.cs b/src/Kurrent.Client/Core/Serialization/ISerializer.cs
deleted file mode 100644
index 93382428d..000000000
--- a/src/Kurrent.Client/Core/Serialization/ISerializer.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-namespace Kurrent.Client.Core.Serialization;
-
-///
-/// Defines the core serialization capabilities required by the KurrentDB client.
-/// Implementations of this interface handle the conversion between .NET objects and their
-/// binary representation for storage in and retrieval from the event store.
-///
-/// The client ships default System.Text.Json implementation, but custom implementations can be provided or other formats.
-///
-public interface ISerializer {
- ///
- /// Converts a .NET object to its binary representation for storage in the event store.
- ///
- /// The object to serialize. This could be an event, command, or metadata object.
- ///
- /// A binary representation of the object that can be stored in KurrentDB.
- ///
- public ReadOnlyMemory Serialize(object value);
-
- ///
- /// Reconstructs a .NET object from its binary representation retrieved from the event store.
- ///
- /// The binary data to deserialize, typically retrieved from a KurrentDB event.
- /// The target .NET type to deserialize the data into, determined from message type mappings.
- ///
- /// The deserialized object cast to the specified type, or null if the data cannot be deserialized.
- /// The returned object will be an instance of the specified type or a compatible subtype.
- ///
- public object? Deserialize(ReadOnlyMemory data, Type type);
-}
diff --git a/src/Kurrent.Client/Core/Serialization/Message.cs b/src/Kurrent.Client/Core/Serialization/Message.cs
deleted file mode 100644
index 6666831dd..000000000
--- a/src/Kurrent.Client/Core/Serialization/Message.cs
+++ /dev/null
@@ -1,62 +0,0 @@
-using EventStore.Client;
-
-namespace Kurrent.Client.Core.Serialization;
-
-///
-/// Represents a message wrapper in the KurrentDB system, containing both domain data and optional metadata.
-/// Messages can represent events, commands, or other domain objects along with their associated metadata.
-///
-/// The message domain data.
-/// Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information.
-/// Unique identifier for this specific message instance. When null, the system will auto-generate an ID.
-public record Message(object Data, object? Metadata, Uuid? MessageId = null) {
- ///
- /// Creates a new Message with the specified domain data and message ID, but without metadata.
- /// This factory method is a convenient shorthand when working with systems that don't require metadata.
- ///
- /// The message domain data.
- /// Unique identifier for this message instance. Must not be Uuid.Empty.
- /// A new immutable Message instance containing the provided data and ID with null metadata.
- ///
- ///
- /// // Create a message with a specific ID
- /// var userCreated = new UserCreated { Id = "123", Name = "Alice" };
- /// var messageId = Uuid.NewUuid();
- /// var message = Message.From(userCreated, messageId);
- ///
- ///
- public static Message From(object data, Uuid messageId) =>
- From(data, null, messageId);
-
- ///
- /// Creates a new Message with the specified domain data and message ID and metadata.
- ///
- /// The message domain data.
- /// Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information.
- /// Unique identifier for this specific message instance.
- /// A new immutable Message instance with the specified properties.
- /// Thrown when messageId is explicitly set to Uuid.Empty, which is an invalid identifier.
- ///
- ///
- /// // Create a message with data and metadata
- /// var orderPlaced = new OrderPlaced { OrderId = "ORD-123", Amount = 99.99m };
- /// var metadata = new EventMetadata {
- /// UserId = "user-456",
- /// Timestamp = DateTimeOffset.UtcNow,
- /// CorrelationId = correlationId
- /// };
- ///
- /// // Let the system assign an ID automatically
- /// var message = Message.From(orderPlaced, metadata);
- ///
- /// // Or specify a custom ID
- /// var messageWithId = Message.From(orderPlaced, metadata, Uuid.NewUuid());
- ///
- ///
- public static Message From(object data, object? metadata = null, Uuid? messageId = null) {
- if (messageId == Uuid.Empty)
- throw new ArgumentOutOfRangeException(nameof(messageId), "Message ID cannot be an empty UUID.");
-
- return new Message(data, metadata, messageId);
- }
-}
diff --git a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
deleted file mode 100644
index de66372b2..000000000
--- a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
+++ /dev/null
@@ -1,148 +0,0 @@
-using System.Diagnostics.CodeAnalysis;
-using EventStore.Client;
-
-namespace Kurrent.Client.Core.Serialization;
-
-using static ContentTypeExtensions;
-
-interface IMessageSerializer {
- public EventData Serialize(Message value, MessageSerializationContext context);
-
-#if NET48
- public bool TryDeserialize(EventRecord record, out Message? deserialized);
-#else
- public bool TryDeserialize(EventRecord record, [NotNullWhen(true)] out Message? deserialized);
-#endif
-}
-
-record MessageSerializationContext(
- string StreamName,
- ContentType ContentType
-) {
- public string CategoryName =>
- StreamName.Split('-').FirstOrDefault() ?? "no_stream_category";
-}
-
-static class MessageSerializerExtensions {
- public static EventData[] Serialize(
- this IMessageSerializer serializer,
- IEnumerable messages,
- MessageSerializationContext context
- ) {
- return messages.Select(m => serializer.Serialize(m, context)).ToArray();
- }
-
- public static IMessageSerializer With(
- this IMessageSerializer defaultMessageSerializer,
- KurrentClientSerializationSettings defaultSettings,
- OperationSerializationSettings? operationSettings
- ) {
- if (operationSettings == null)
- return defaultMessageSerializer;
-
- if (operationSettings.AutomaticDeserialization == AutomaticDeserialization.Disabled)
- return NullMessageSerializer.Instance;
-
- if (operationSettings.ConfigureSettings == null)
- return defaultMessageSerializer;
-
- var settings = defaultSettings.Clone();
- operationSettings.ConfigureSettings.Invoke(settings);
-
- return new MessageSerializer(SchemaRegistry.From(settings));
- }
-}
-
-class MessageSerializer(SchemaRegistry schemaRegistry) : IMessageSerializer {
- readonly ISerializer _jsonSerializer =
- schemaRegistry.GetSerializer(ContentType.Json);
-
- readonly IMessageTypeNamingStrategy _messageTypeNamingStrategy =
- schemaRegistry.MessageTypeNamingStrategy;
-
- public EventData Serialize(Message message, MessageSerializationContext serializationContext) {
- var (data, metadata, eventId) = message;
-
- var eventType = _messageTypeNamingStrategy
- .ResolveTypeName(
- message.Data.GetType(),
- new MessageTypeNamingResolutionContext(serializationContext.CategoryName)
- );
-
- var serializedData = schemaRegistry
- .GetSerializer(serializationContext.ContentType)
- .Serialize(data);
-
- var serializedMetadata = metadata != null
- ? _jsonSerializer.Serialize(metadata)
- : ReadOnlyMemory.Empty;
-
- return new EventData(
- eventId ?? Uuid.NewUuid(),
- eventType,
- serializedData,
- serializedMetadata,
- serializationContext.ContentType.ToMessageContentType()
- );
- }
-
-#if NET48
- public bool TryDeserialize(EventRecord record, out Message? deserialized) {
-#else
- public bool TryDeserialize(EventRecord record, [NotNullWhen(true)] out Message? deserialized) {
-#endif
- if (!TryResolveClrType(record, out var clrType)) {
- deserialized = null;
- return false;
- }
-
- var data = schemaRegistry
- .GetSerializer(FromMessageContentType(record.ContentType))
- .Deserialize(record.Data, clrType!);
-
- if (data == null) {
- deserialized = null;
- return false;
- }
-
- object? metadata = record.Metadata.Length > 0 && TryResolveClrMetadataType(record, out var clrMetadataType)
- ? _jsonSerializer.Deserialize(record.Metadata, clrMetadataType!)
- : null;
-
- deserialized = Message.From(data, metadata, record.EventId);
- return true;
- }
-
- public static MessageSerializer From(KurrentClientSerializationSettings? settings = null) {
- settings ??= KurrentClientSerializationSettings.Default();
-
- return new MessageSerializer(SchemaRegistry.From(settings));
- }
-
- bool TryResolveClrType(EventRecord record, out Type? clrType) =>
- schemaRegistry
- .MessageTypeNamingStrategy
- .TryResolveClrType(record.EventType, out clrType);
-
- bool TryResolveClrMetadataType(EventRecord record, out Type? clrMetadataType) =>
- schemaRegistry
- .MessageTypeNamingStrategy
- .TryResolveClrMetadataType(record.EventType, out clrMetadataType);
-}
-
-class NullMessageSerializer : IMessageSerializer {
- public static readonly NullMessageSerializer Instance = new NullMessageSerializer();
-
- public EventData Serialize(Message value, MessageSerializationContext context) {
- throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled");
- }
-
-#if NET48
- public bool TryDeserialize(EventRecord record, out Message? deserialized) {
-#else
- public bool TryDeserialize(EventRecord eventRecord, [NotNullWhen(true)] out Message? deserialized) {
-#endif
- deserialized = null;
- return false;
- }
-}
diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeRegistry.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeRegistry.cs
deleted file mode 100644
index 68752884a..000000000
--- a/src/Kurrent.Client/Core/Serialization/MessageTypeRegistry.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-using System.Collections.Concurrent;
-
-namespace Kurrent.Client.Core.Serialization;
-
-interface IMessageTypeRegistry {
- void Register(Type messageType, string messageTypeName);
- string? GetTypeName(Type messageType);
- string GetOrAddTypeName(Type clrType, Func getTypeName);
- Type? GetClrType(string messageTypeName);
- Type? GetOrAddClrType(string messageTypeName, Func getClrType);
-}
-
-class MessageTypeRegistry : IMessageTypeRegistry {
- readonly ConcurrentDictionary _typeMap = new();
- readonly ConcurrentDictionary _typeNameMap = new();
-
- public void Register(Type messageType, string messageTypeName) {
- _typeNameMap.AddOrUpdate(messageType, messageTypeName, (_, _) => messageTypeName);
- _typeMap.AddOrUpdate(messageTypeName, messageType, (_, type) => type);
- }
-
- public string? GetTypeName(Type messageType) =>
-#if NET48
- _typeNameMap.TryGetValue(messageType, out var typeName) ? typeName : null;
-#else
- _typeNameMap.GetValueOrDefault(messageType);
-#endif
-
- public string GetOrAddTypeName(Type clrType, Func getTypeName) =>
- _typeNameMap.GetOrAdd(
- clrType,
- _ => {
- var typeName = getTypeName(clrType);
-
- _typeMap.TryAdd(typeName, clrType);
-
- return typeName;
- }
- );
-
- public Type? GetClrType(string messageTypeName) =>
-#if NET48
- _typeMap.TryGetValue(messageTypeName, out var clrType) ? clrType : null;
-#else
- _typeMap.GetValueOrDefault(messageTypeName);
-#endif
-
- public Type? GetOrAddClrType(string messageTypeName, Func getClrType) =>
- _typeMap.GetOrAdd(
- messageTypeName,
- _ => {
- var clrType = getClrType(messageTypeName);
-
- if (clrType == null)
- return null;
-
- _typeNameMap.TryAdd(clrType, messageTypeName);
-
- return clrType;
- }
- );
-}
-
-static class MessageTypeRegistryExtensions {
- public static void Register(this IMessageTypeRegistry messageTypeRegistry, string messageTypeName) =>
- messageTypeRegistry.Register(typeof(T), messageTypeName);
-
- public static void Register(this IMessageTypeRegistry messageTypeRegistry, IDictionary typeMap) {
- foreach (var map in typeMap) {
- messageTypeRegistry.Register(map.Key, map.Value);
- }
- }
-
- public static string? GetTypeName(this IMessageTypeRegistry messageTypeRegistry) =>
- messageTypeRegistry.GetTypeName(typeof(TMessageType));
-}
diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs
deleted file mode 100644
index 12ff65f6b..000000000
--- a/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs
+++ /dev/null
@@ -1,100 +0,0 @@
-using System.Diagnostics.CodeAnalysis;
-using Kurrent.Diagnostics.Tracing;
-
-namespace Kurrent.Client.Core.Serialization;
-
-public interface IMessageTypeNamingStrategy {
- string ResolveTypeName(Type messageType, MessageTypeNamingResolutionContext resolutionContext);
-
-#if NET48
- bool TryResolveClrType(string messageTypeName, out Type? type);
-#else
- bool TryResolveClrType(string messageTypeName, [NotNullWhen(true)] out Type? type);
-#endif
-
-
-#if NET48
- bool TryResolveClrMetadataType(string messageTypeName, out Type? type);
-#else
- bool TryResolveClrMetadataType(string messageTypeName, [NotNullWhen(true)] out Type? type);
-#endif
-}
-
-public record MessageTypeNamingResolutionContext(string CategoryName);
-
-class MessageTypeNamingStrategyWrapper(
- IMessageTypeRegistry messageTypeRegistry,
- IMessageTypeNamingStrategy messageTypeNamingStrategy
-) : IMessageTypeNamingStrategy {
- public string ResolveTypeName(Type messageType, MessageTypeNamingResolutionContext resolutionContext) {
- return messageTypeRegistry.GetOrAddTypeName(
- messageType,
- _ => messageTypeNamingStrategy.ResolveTypeName(messageType, resolutionContext)
- );
- }
-
-#if NET48
- public bool TryResolveClrType(string messageTypeName, out Type? type) {
-#else
- public bool TryResolveClrType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
-#endif
- type = messageTypeRegistry.GetOrAddClrType(
- messageTypeName,
- _ => messageTypeNamingStrategy.TryResolveClrType(messageTypeName, out var resolvedType)
- ? resolvedType
- : null
- );
-
- return type != null;
- }
-
-#if NET48
- public bool TryResolveClrMetadataType(string messageTypeName, out Type? type) {
-#else
- public bool TryResolveClrMetadataType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
-#endif
- type = messageTypeRegistry.GetOrAddClrType(
- $"{messageTypeName}-metadata",
- _ => messageTypeNamingStrategy.TryResolveClrMetadataType(messageTypeName, out var resolvedType)
- ? resolvedType
- : null
- );
-
- return type != null;
- }
-}
-
-public class DefaultMessageTypeNamingStrategy(Type? defaultMetadataType) : IMessageTypeNamingStrategy {
- readonly Type _defaultMetadataType = defaultMetadataType ?? typeof(TracingMetadata);
-
- public string ResolveTypeName(Type messageType, MessageTypeNamingResolutionContext resolutionContext) =>
- $"{resolutionContext.CategoryName}-{messageType.FullName}";
-
-#if NET48
- public bool TryResolveClrType(string messageTypeName, out Type? type) {
-#else
- public bool TryResolveClrType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
-#endif
- var categorySeparatorIndex = messageTypeName.IndexOf('-');
-
- if (categorySeparatorIndex == -1 || categorySeparatorIndex == messageTypeName.Length - 1) {
- type = null;
- return false;
- }
-
- var clrTypeName = messageTypeName[(categorySeparatorIndex + 1)..];
-
- type = TypeProvider.GetTypeByFullName(clrTypeName);
-
- return type != null;
- }
-
-#if NET48
- public bool TryResolveClrMetadataType(string messageTypeName, out Type? type) {
-#else
- public bool TryResolveClrMetadataType(string messageTypeName, [NotNullWhen(true)] out Type? type) {
-#endif
- type = _defaultMetadataType;
- return true;
- }
-}
diff --git a/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs b/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs
deleted file mode 100644
index 9a5ad7515..000000000
--- a/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs
+++ /dev/null
@@ -1,91 +0,0 @@
-using EventStore.Client;
-
-namespace Kurrent.Client.Core.Serialization;
-
-using static Constants.Metadata.ContentTypes;
-
-public enum ContentType {
- Json = 1,
-
- // Protobuf = 2,
- // Avro = 3,
- Bytes = 4
-}
-
-static class ContentTypeExtensions {
- public static ContentType FromMessageContentType(string contentType) =>
- contentType == ApplicationJson
- ? ContentType.Json
- : ContentType.Bytes;
-
- public static string ToMessageContentType(this ContentType contentType) =>
- contentType switch {
- ContentType.Json => ApplicationJson,
- ContentType.Bytes => ApplicationOctetStream,
- _ => throw new ArgumentOutOfRangeException(nameof(contentType), contentType, null)
- };
-}
-
-class SchemaRegistry(
- IDictionary serializers,
- IMessageTypeNamingStrategy messageTypeNamingStrategy
-) {
- public IMessageTypeNamingStrategy MessageTypeNamingStrategy { get; } = messageTypeNamingStrategy;
-
- public ISerializer GetSerializer(ContentType schemaType) =>
- serializers[schemaType];
-
- public static SchemaRegistry From(KurrentClientSerializationSettings settings) {
- var messageTypeNamingStrategy =
- settings.MessageTypeNamingStrategy ?? new DefaultMessageTypeNamingStrategy(settings.DefaultMetadataType);
-
- var categoriesTypeMap = ResolveMessageTypeUsingNamingStrategy(
- settings.CategoryMessageTypesMap,
- messageTypeNamingStrategy
- );
-
- var messageTypeRegistry = new MessageTypeRegistry();
- messageTypeRegistry.Register(settings.MessageTypeMap);
- messageTypeRegistry.Register(categoriesTypeMap);
-
- var serializers = new Dictionary {
- {
- ContentType.Json,
- settings.JsonSerializer ?? new SystemTextJsonSerializer()
- }, {
- ContentType.Bytes,
- settings.BytesSerializer ?? new SystemTextJsonSerializer()
- }
- };
-
- return new SchemaRegistry(
- serializers,
- new MessageTypeNamingStrategyWrapper(
- messageTypeRegistry,
- settings.MessageTypeNamingStrategy ?? new DefaultMessageTypeNamingStrategy(settings.DefaultMetadataType)
- )
- );
- }
-
- static Dictionary ResolveMessageTypeUsingNamingStrategy(
- IDictionary categoryMessageTypesMap,
- IMessageTypeNamingStrategy messageTypeNamingStrategy
- ) =>
- categoryMessageTypesMap
- .SelectMany(
- categoryTypes => categoryTypes.Value.Select(
- type =>
- (
- Type: type,
- TypeName: messageTypeNamingStrategy.ResolveTypeName(
- type,
- new MessageTypeNamingResolutionContext(categoryTypes.Key)
- )
- )
- )
- )
- .ToDictionary(
- ks => ks.Type,
- vs => vs.TypeName
- );
-}
diff --git a/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs b/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs
deleted file mode 100644
index 4efa7be96..000000000
--- a/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs
+++ /dev/null
@@ -1,32 +0,0 @@
-using System.Text.Json;
-using System.Text.Json.Serialization;
-
-namespace Kurrent.Client.Core.Serialization;
-
-public class SystemTextJsonSerializationSettings {
- public static readonly JsonSerializerOptions DefaultJsonSerializerOptions =
- new JsonSerializerOptions(JsonSerializerOptions.Default) {
- PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
- DictionaryKeyPolicy = JsonNamingPolicy.CamelCase,
- PropertyNameCaseInsensitive = false,
- DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
- UnknownTypeHandling = JsonUnknownTypeHandling.JsonNode,
- UnmappedMemberHandling = JsonUnmappedMemberHandling.Skip,
- NumberHandling = JsonNumberHandling.AllowReadingFromString,
- Converters = {
- new JsonStringEnumConverter(JsonNamingPolicy.CamelCase),
- }
- };
-
- public JsonSerializerOptions Options { get; set; } = DefaultJsonSerializerOptions;
-}
-
-public class SystemTextJsonSerializer(SystemTextJsonSerializationSettings? options = null) : ISerializer {
- readonly JsonSerializerOptions _options = options?.Options ?? SystemTextJsonSerializationSettings.DefaultJsonSerializerOptions;
-
- public ReadOnlyMemory Serialize(object value) =>
- JsonSerializer.SerializeToUtf8Bytes(value, _options);
-
- public object? Deserialize(ReadOnlyMemory data, Type type) =>
- !data.IsEmpty ? JsonSerializer.Deserialize(data.Span, type, _options) : null;
-}
diff --git a/src/Kurrent.Client/Core/Serialization/TypeProvider.cs b/src/Kurrent.Client/Core/Serialization/TypeProvider.cs
deleted file mode 100644
index f05f23f87..000000000
--- a/src/Kurrent.Client/Core/Serialization/TypeProvider.cs
+++ /dev/null
@@ -1,15 +0,0 @@
-namespace Kurrent.Client.Core.Serialization;
-
-static class TypeProvider {
- public static Type? GetTypeByFullName(string fullName) =>
- Type.GetType(fullName) ?? GetFirstMatchingTypeFromCurrentDomainAssembly(fullName);
-
- static Type? GetFirstMatchingTypeFromCurrentDomainAssembly(string fullName) {
- var firstNamespacePart = fullName.Split('.')[0];
-
- return AppDomain.CurrentDomain.GetAssemblies()
- .OrderByDescending(assembly => assembly.FullName?.StartsWith(firstNamespacePart) == true)
- .Select(assembly => assembly.GetType(fullName))
- .FirstOrDefault(type => type != null);
- }
-}
diff --git a/src/Kurrent.Client/Kurrent.Client.csproj b/src/Kurrent.Client/Kurrent.Client.csproj
index 59ebc0861..e6652b773 100644
--- a/src/Kurrent.Client/Kurrent.Client.csproj
+++ b/src/Kurrent.Client/Kurrent.Client.csproj
@@ -101,7 +101,7 @@
-
+
diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs
index 1ed1b40a0..779413111 100644
--- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs
+++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs
@@ -2,65 +2,11 @@
using EventStore.Client.PersistentSubscriptions;
using EventStore.Client.Diagnostics;
using Grpc.Core;
-using Kurrent.Client.Core.Serialization;
+
using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions;
using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase;
namespace EventStore.Client {
- public class SubscribeToPersistentSubscriptionOptions {
- ///
- /// The size of the buffer.
- ///
- public int BufferSize { get; set; } = 10;
-
- ///
- /// The optional user credentials to perform operation with.
- ///
- public UserCredentials? UserCredentials { get; set; } = null;
-
- ///
- /// Allows to customize or disable the automatic deserialization
- ///
- public OperationSerializationSettings? SerializationSettings { get; set; }
- }
-
- public class PersistentSubscriptionListener {
- ///
- /// A handler called when a new event is received over the subscription.
- ///
-#if NET48
- public Func EventAppeared { get; set; } =
- null!;
-#else
- public required Func EventAppeared {
- get;
- set;
- }
-#endif
- ///
- /// A handler called if the subscription is dropped.
- ///
- public Action? SubscriptionDropped { get; set; }
-
- ///
- /// Returns the subscription listener with configured handlers
- ///
- /// Handler invoked when a new event is received over the subscription.
- /// A handler invoked if the subscription is dropped.
- /// A handler called when a checkpoint is reached.
- /// Set the checkpointInterval in subscription filter options to define how often this method is called.
- ///
- ///
- public static PersistentSubscriptionListener Handle(
- Func eventAppeared,
- Action? subscriptionDropped = null
- ) =>
- new PersistentSubscriptionListener {
- EventAppeared = eventAppeared,
- SubscriptionDropped = subscriptionDropped
- };
- }
-
partial class KurrentPersistentSubscriptionsClient {
///
/// Subscribes to a persistent subscription.
@@ -70,13 +16,10 @@ partial class KurrentPersistentSubscriptionsClient {
///
[Obsolete("SubscribeAsync is no longer supported. Use SubscribeToStream with manual acks instead.", false)]
public async Task SubscribeAsync(
- string streamName,
- string groupName,
+ string streamName, string groupName,
Func eventAppeared,
Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null,
- int bufferSize = 10,
- bool autoAck = true,
+ UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true,
CancellationToken cancellationToken = default
) {
if (autoAck) {
@@ -85,17 +28,16 @@ public async Task SubscribeAsync(
);
}
- return await SubscribeToStreamAsync(
- streamName,
- groupName,
- PersistentSubscriptionListener.Handle(eventAppeared, subscriptionDropped),
- new SubscribeToPersistentSubscriptionOptions {
- UserCredentials = userCredentials,
- BufferSize = bufferSize,
- SerializationSettings = OperationSerializationSettings.Disabled
- },
- cancellationToken
- );
+ return await PersistentSubscription
+ .Confirm(
+ SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken),
+ eventAppeared,
+ subscriptionDropped ?? delegate { },
+ _log,
+ userCredentials,
+ cancellationToken
+ )
+ .ConfigureAwait(false);
}
///
@@ -104,45 +46,20 @@ public async Task SubscribeAsync(
///
///
///
- public Task SubscribeToStreamAsync(
- string streamName,
- string groupName,
+ public async Task SubscribeToStreamAsync(
+ string streamName, string groupName,
Func eventAppeared,
Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null,
- int bufferSize = 10,
- CancellationToken cancellationToken = default
- ) =>
- SubscribeToStreamAsync(
- streamName,
- groupName,
- PersistentSubscriptionListener.Handle(eventAppeared, subscriptionDropped),
- new SubscribeToPersistentSubscriptionOptions {
- UserCredentials = userCredentials,
- BufferSize = bufferSize,
- SerializationSettings = OperationSerializationSettings.Disabled
- },
- cancellationToken
- );
-
- ///
- /// Subscribes to a persistent subscription. Messages must be manually acknowledged
- ///
- ///
- ///
- ///
- public async Task SubscribeToStreamAsync(
- string streamName,
- string groupName,
- PersistentSubscriptionListener listener,
- SubscribeToPersistentSubscriptionOptions options,
+ UserCredentials? userCredentials = null, int bufferSize = 10,
CancellationToken cancellationToken = default
) {
return await PersistentSubscription
.Confirm(
- SubscribeToStream(streamName, groupName, options, cancellationToken),
- listener,
+ SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken),
+ eventAppeared,
+ subscriptionDropped ?? delegate { },
_log,
+ userCredentials,
cancellationToken
)
.ConfigureAwait(false);
@@ -158,65 +75,8 @@ public async Task SubscribeToStreamAsync(
/// The optional .
///
public PersistentSubscriptionResult SubscribeToStream(
- string streamName,
- string groupName,
- int bufferSize,
- UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default
- ) {
- return SubscribeToStream(
- streamName,
- groupName,
- new SubscribeToPersistentSubscriptionOptions {
- BufferSize = bufferSize,
- UserCredentials = userCredentials,
- SerializationSettings = OperationSerializationSettings.Disabled
- },
- cancellationToken
- );
- }
-
-
-
- ///
- /// Subscribes to a persistent subscription. Messages must be manually acknowledged.
- ///
- /// The name of the stream to read events from.
- /// The name of the persistent subscription group.
- /// The size of the buffer.
- /// The optional user credentials to perform operation with.
- /// The optional .
- ///
- public PersistentSubscriptionResult SubscribeToStream(
- string streamName,
- string groupName,
- UserCredentials? userCredentials,
- CancellationToken cancellationToken = default
- ) {
- return SubscribeToStream(
- streamName,
- groupName,
- new SubscribeToPersistentSubscriptionOptions {
- UserCredentials = userCredentials,
- SerializationSettings = OperationSerializationSettings.Disabled
- },
- cancellationToken
- );
- }
-
- ///
- /// Subscribes to a persistent subscription. Messages must be manually acknowledged.
- ///
- /// The name of the stream to read events from.
- /// The name of the persistent subscription group.
- /// Optional settings to configure subscription
- /// The optional .
- ///
- public PersistentSubscriptionResult SubscribeToStream(
- string streamName,
- string groupName,
- SubscribeToPersistentSubscriptionOptions options,
- CancellationToken cancellationToken = default
+ string streamName, string groupName, int bufferSize = 10,
+ UserCredentials? userCredentials = null, CancellationToken cancellationToken = default
) {
if (streamName == null) {
throw new ArgumentNullException(nameof(streamName));
@@ -234,12 +94,12 @@ public PersistentSubscriptionResult SubscribeToStream(
throw new ArgumentException($"{nameof(groupName)} may not be empty.", nameof(groupName));
}
- if (options.BufferSize <= 0) {
- throw new ArgumentOutOfRangeException(nameof(options.BufferSize));
+ if (bufferSize <= 0) {
+ throw new ArgumentOutOfRangeException(nameof(bufferSize));
}
var readOptions = new ReadReq.Types.Options {
- BufferSize = options.BufferSize,
+ BufferSize = bufferSize,
GroupName = groupName,
UuidOption = new ReadReq.Types.Options.Types.UUIDOption { Structured = new Empty() }
};
@@ -267,8 +127,7 @@ public PersistentSubscriptionResult SubscribeToStream(
},
new() { Options = readOptions },
Settings,
- options.UserCredentials,
- _messageSerializer.With(Settings.Serialization, options.SerializationSettings),
+ userCredentials,
cancellationToken
);
}
@@ -276,41 +135,23 @@ public PersistentSubscriptionResult SubscribeToStream(
///
/// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
///
- public Task SubscribeToAllAsync(
+ public async Task SubscribeToAllAsync(
string groupName,
Func eventAppeared,
Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null,
- int bufferSize = 10,
+ UserCredentials? userCredentials = null, int bufferSize = 10,
CancellationToken cancellationToken = default
) =>
- SubscribeToAllAsync(
- groupName,
- PersistentSubscriptionListener.Handle(eventAppeared, subscriptionDropped),
- new SubscribeToPersistentSubscriptionOptions {
- BufferSize = bufferSize,
- UserCredentials = userCredentials,
- SerializationSettings = OperationSerializationSettings.Disabled
- },
- cancellationToken
- );
-
- ///
- /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
- ///
- public Task SubscribeToAllAsync(
- string groupName,
- PersistentSubscriptionListener listener,
- SubscribeToPersistentSubscriptionOptions options,
- CancellationToken cancellationToken = default
- ) =>
- SubscribeToStreamAsync(
- SystemStreams.AllStream,
- groupName,
- listener,
- options,
- cancellationToken
- );
+ await SubscribeToStreamAsync(
+ SystemStreams.AllStream,
+ groupName,
+ eventAppeared,
+ subscriptionDropped,
+ userCredentials,
+ bufferSize,
+ cancellationToken
+ )
+ .ConfigureAwait(false);
///
/// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
@@ -321,76 +162,22 @@ public Task SubscribeToAllAsync(
/// The optional .
///
public PersistentSubscriptionResult SubscribeToAll(
- string groupName,
- int bufferSize,
- UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default
+ string groupName, int bufferSize = 10,
+ UserCredentials? userCredentials = null, CancellationToken cancellationToken = default
) =>
- SubscribeToStream(
- SystemStreams.AllStream,
- groupName,
- new SubscribeToPersistentSubscriptionOptions {
- BufferSize = bufferSize,
- UserCredentials = userCredentials,
- SerializationSettings = OperationSerializationSettings.Disabled
- },
- cancellationToken
- );
-
-
- ///
- /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
- ///
- /// The name of the persistent subscription group.
- /// The size of the buffer.
- /// The optional user credentials to perform operation with.
- /// The optional .
- ///
- public PersistentSubscriptionResult SubscribeToAll(
- string groupName,
- UserCredentials? userCredentials,
- CancellationToken cancellationToken = default
- ) =>
- SubscribeToStream(
- SystemStreams.AllStream,
- groupName,
- new SubscribeToPersistentSubscriptionOptions {
- UserCredentials = userCredentials,
- SerializationSettings = OperationSerializationSettings.Disabled
- },
- cancellationToken
- );
-
- ///
- /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
- ///
- /// The name of the persistent subscription group.
- /// Optional settings to configure subscription
- /// The optional .
- ///
- public PersistentSubscriptionResult SubscribeToAll(
- string groupName,
- SubscribeToPersistentSubscriptionOptions options,
- CancellationToken cancellationToken = default
- ) =>
- SubscribeToStream(
- SystemStreams.AllStream,
- groupName,
- options,
- cancellationToken
- );
+ SubscribeToStream(SystemStreams.AllStream, groupName, bufferSize, userCredentials, cancellationToken);
///
public class PersistentSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable {
- const int MaxEventIdLength = 2000;
+ const int MaxEventIdLength = 2000;
+
+ readonly ReadReq _request;
+ readonly Channel _channel;
+ readonly CancellationTokenSource _cts;
+ readonly CallOptions _callOptions;
- readonly ReadReq _request;
- readonly Channel _channel;
- readonly CancellationTokenSource _cts;
- readonly CallOptions _callOptions;
-
- AsyncDuplexStreamingCall? _call;
- int _messagesEnumerated;
+ AsyncDuplexStreamingCall? _call;
+ int _messagesEnumerated;
///
/// The server-generated unique identifier for the subscription.
@@ -413,33 +200,30 @@ public class PersistentSubscriptionResult : IAsyncEnumerable, IAs
public IAsyncEnumerable Messages {
get {
if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1)
- throw new InvalidOperationException("Messages may only be enumerated once.");
+ throw new InvalidOperationException("Messages may only be enumerated once.");
return GetMessages();
async IAsyncEnumerable GetMessages() {
- try {
- await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token)) {
- if (message is PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId))
- SubscriptionId = subscriptionId;
-
- yield return message;
- }
- } finally {
- _cts.Cancel();
- }
+ try {
+ await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token)) {
+ if (message is PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId))
+ SubscriptionId = subscriptionId;
+
+ yield return message;
+ }
+ }
+ finally {
+ _cts.Cancel();
+ }
}
}
}
internal PersistentSubscriptionResult(
- string streamName,
- string groupName,
+ string streamName, string groupName,
Func> selectChannelInfo,
- ReadReq request,
- KurrentClientSettings settings,
- UserCredentials? userCredentials,
- IMessageSerializer messageSerializer,
+ ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
StreamName = streamName;
@@ -463,21 +247,20 @@ CancellationToken cancellationToken
async Task PumpMessages() {
try {
- var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false);
- var client = new PersistentSubscriptionsClient(channelInfo.CallInvoker);
+ var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false);
+ var client = new PersistentSubscriptionsClient(channelInfo.CallInvoker);
_call = client.Read(_callOptions);
await _call.RequestStream.WriteAsync(_request).ConfigureAwait(false);
- await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token)
- .ConfigureAwait(false)) {
+ await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token).ConfigureAwait(false)) {
PersistentSubscriptionMessage subscriptionMessage = response.ContentCase switch {
SubscriptionConfirmation => new PersistentSubscriptionMessage.SubscriptionConfirmation(
response.SubscriptionConfirmation.SubscriptionId
),
Event => new PersistentSubscriptionMessage.Event(
- ConvertToResolvedEvent(response, messageSerializer),
+ ConvertToResolvedEvent(response),
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount,
_ => null
@@ -509,18 +292,17 @@ async Task PumpMessages() {
// The absence of this header leads to an RpcException with the status code 'Cancelled' and the message "No grpc-status found on response".
// The switch statement below handles these specific exceptions and translates them into the appropriate
// PersistentSubscriptionDroppedByServerException exception.
- case RpcException { StatusCode: StatusCode.Unavailable } rex1
- when rex1.Status.Detail.Contains("WinHttpException: Error 12030"):
+ case RpcException { StatusCode: StatusCode.Unavailable } rex1 when rex1.Status.Detail.Contains("WinHttpException: Error 12030"):
case RpcException { StatusCode: StatusCode.Cancelled } rex2
- when rex2.Status.Detail.Contains("No grpc-status found on response"):
+ when rex2.Status.Detail.Contains("No grpc-status found on response"):
ex = new PersistentSubscriptionDroppedByServerException(StreamName, GroupName, ex);
break;
}
#endif
if (ex is PersistentSubscriptionNotFoundException) {
await _channel.Writer
- .WriteAsync(PersistentSubscriptionMessage.NotFound.Instance, cancellationToken)
- .ConfigureAwait(false);
+ .WriteAsync(PersistentSubscriptionMessage.NotFound.Instance, cancellationToken)
+ .ConfigureAwait(false);
_channel.Writer.TryComplete();
return;
@@ -578,26 +360,19 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par
/// A reason given.
/// The s to nak. There should not be more than 2000 to nak at a time.
/// The number of resolvedEvents exceeded the limit of 2000.
- public Task Nack(
- PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents
- ) =>
- Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId));
-
- static ResolvedEvent ConvertToResolvedEvent(
- ReadResp response,
- IMessageSerializer messageSerializer
- ) =>
- ResolvedEvent.From(
- ConvertToEventRecord(response.Event.Event)!,
- ConvertToEventRecord(response.Event.Link),
- response.Event.PositionCase switch {
- ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
- _ => null
- },
- messageSerializer
- );
+ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents) =>
+ Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId));
+
+ static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new(
+ ConvertToEventRecord(response.Event.Event)!,
+ ConvertToEventRecord(response.Event.Link),
+ response.Event.PositionCase switch {
+ ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
+ _ => null
+ }
+ );
- Task AckInternal(params Uuid[] eventIds) {
+ Task AckInternal(params Uuid[] eventIds) {
if (eventIds.Length > MaxEventIdLength) {
throw new ArgumentException(
$"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.",
@@ -618,7 +393,7 @@ Task AckInternal(params Uuid[] eventIds) {
);
}
- Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) {
+ Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) {
if (eventIds.Length > MaxEventIdLength) {
throw new ArgumentException(
$"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.",
@@ -647,7 +422,7 @@ Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action,
);
}
- static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) =>
+ static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) =>
e is null
? null
: new EventRecord(
@@ -690,94 +465,14 @@ public void Dispose() {
}
///
- public async IAsyncEnumerator GetAsyncEnumerator(
- CancellationToken cancellationToken = default
- ) {
+ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) {
await foreach (var message in Messages.WithCancellation(cancellationToken)) {
if (message is not PersistentSubscriptionMessage.Event(var resolvedEvent, _))
- continue;
+ continue;
yield return resolvedEvent;
}
}
}
}
-
- public static class KurrentClientPersistentSubscriptionsExtensions {
- ///
- /// Subscribes to a persistent subscription. Messages must be manually acknowledged
- ///
- ///
- ///
- ///
- public static Task SubscribeToStreamAsync(
- this KurrentPersistentSubscriptionsClient kurrentClient,
- string streamName,
- string groupName,
- PersistentSubscriptionListener listener,
- CancellationToken cancellationToken = default
- ) =>
- kurrentClient.SubscribeToStreamAsync(
- streamName,
- groupName,
- listener,
- new SubscribeToPersistentSubscriptionOptions(),
- cancellationToken
- );
-
- ///
- /// Subscribes to a persistent subscription. Messages must be manually acknowledged.
- ///
- ///
- /// The name of the stream to read events from.
- /// The name of the persistent subscription group.
- /// The optional .
- ///
- public static KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult SubscribeToStream(
- this KurrentPersistentSubscriptionsClient kurrentClient,
- string streamName,
- string groupName,
- CancellationToken cancellationToken = default
- ) =>
- kurrentClient.SubscribeToStream(
- streamName,
- groupName,
- new SubscribeToPersistentSubscriptionOptions(),
- cancellationToken
- );
-
- ///
- /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
- ///
- public static Task SubscribeToAllAsync(
- this KurrentPersistentSubscriptionsClient kurrentClient,
- string groupName,
- PersistentSubscriptionListener listener,
- CancellationToken cancellationToken = default
- ) =>
- kurrentClient.SubscribeToAllAsync(
- groupName,
- listener,
- new SubscribeToPersistentSubscriptionOptions(),
- cancellationToken
- );
-
- ///
- /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged.
- ///
- ///
- /// The name of the persistent subscription group.
- /// The optional .
- ///
- public static KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult SubscribeToAll(
- this KurrentPersistentSubscriptionsClient kurrentClient,
- string groupName,
- CancellationToken cancellationToken = default
- ) =>
- kurrentClient.SubscribeToAll(
- groupName,
- new SubscribeToPersistentSubscriptionOptions(),
- cancellationToken
- );
- }
}
diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs
index 0d251f59c..070f32698 100644
--- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs
+++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs
@@ -1,7 +1,6 @@
using System.Text.Encodings.Web;
using System.Threading.Channels;
using Grpc.Core;
-using Kurrent.Client.Core.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@@ -10,14 +9,13 @@ namespace EventStore.Client {
/// The client used to manage persistent subscriptions in the KurrentDB.
///
public sealed partial class KurrentPersistentSubscriptionsClient : KurrentClientBase {
- static BoundedChannelOptions ReadBoundedChannelOptions = new (1) {
+ private static BoundedChannelOptions ReadBoundedChannelOptions = new (1) {
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = true
};
- readonly ILogger _log;
- readonly IMessageSerializer _messageSerializer;
+ private readonly ILogger _log;
///
/// Constructs a new .
@@ -39,8 +37,6 @@ public KurrentPersistentSubscriptionsClient(KurrentClientSettings? settings) : b
}) {
_log = Settings.LoggerFactory?.CreateLogger()
?? new NullLogger();
-
- _messageSerializer = MessageSerializer.From(settings?.Serialization);
}
private static string UrlEncode(string s) {
diff --git a/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs b/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs
index 0674cb9ac..637f54e30 100644
--- a/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs
+++ b/src/Kurrent.Client/PersistentSubscriptions/PersistentSubscription.cs
@@ -7,9 +7,7 @@ namespace EventStore.Client {
/// Represents a persistent subscription connection.
///
public class PersistentSubscription : IDisposable {
- private readonly KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult
- _persistentSubscriptionResult;
-
+ private readonly KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult _persistentSubscriptionResult;
private readonly IAsyncEnumerator _enumerator;
private readonly Func _eventAppeared;
private readonly Action _subscriptionDropped;
@@ -25,10 +23,9 @@ private readonly KurrentPersistentSubscriptionsClient.PersistentSubscriptionResu
internal static async Task Confirm(
KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
- PersistentSubscriptionListener listener,
- ILogger log,
- CancellationToken cancellationToken = default
- ) {
+ Func eventAppeared,
+ Action subscriptionDropped,
+ ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default) {
var enumerator = persistentSubscriptionResult
.Messages
.GetAsyncEnumerator(cancellationToken);
@@ -37,19 +34,11 @@ internal static async Task Confirm(
return (result, enumerator.Current) switch {
(true, PersistentSubscriptionMessage.SubscriptionConfirmation (var subscriptionId)) =>
- new PersistentSubscription(
- persistentSubscriptionResult,
- enumerator,
- subscriptionId,
- listener,
- log,
- cancellationToken
- ),
+ new PersistentSubscription(persistentSubscriptionResult, enumerator, subscriptionId, eventAppeared,
+ subscriptionDropped, log, cancellationToken),
(true, PersistentSubscriptionMessage.NotFound) =>
- throw new PersistentSubscriptionNotFoundException(
- persistentSubscriptionResult.StreamName,
- persistentSubscriptionResult.GroupName
- ),
+ throw new PersistentSubscriptionNotFoundException(persistentSubscriptionResult.StreamName,
+ persistentSubscriptionResult.GroupName),
_ => throw new InvalidOperationException("Subscription could not be confirmed.")
};
}
@@ -57,19 +46,17 @@ internal static async Task Confirm(
// PersistentSubscription takes responsibility for disposing the call and the disposable
private PersistentSubscription(
KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
- IAsyncEnumerator enumerator,
- string subscriptionId,
- PersistentSubscriptionListener listener,
- ILogger log,
- CancellationToken cancellationToken
- ) {
+ IAsyncEnumerator enumerator, string subscriptionId,
+ Func eventAppeared,
+ Action subscriptionDropped, ILogger log,
+ CancellationToken cancellationToken) {
_persistentSubscriptionResult = persistentSubscriptionResult;
- _enumerator = enumerator;
- SubscriptionId = subscriptionId;
- _eventAppeared = listener.EventAppeared;
- _subscriptionDropped = listener.SubscriptionDropped ?? delegate { };
- _log = log;
- _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ _enumerator = enumerator;
+ SubscriptionId = subscriptionId;
+ _eventAppeared = eventAppeared;
+ _subscriptionDropped = subscriptionDropped;
+ _log = log;
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task.Run(Subscribe, _cts.Token);
}
@@ -104,6 +91,7 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) =>
public Task Ack(IEnumerable resolvedEvents) =>
Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId));
+
///
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
///
@@ -111,8 +99,7 @@ public Task Ack(IEnumerable resolvedEvents) =>
/// A reason given.
/// The of the s to nak. There should not be more than 2000 to nak at a time.
/// The number of eventIds exceeded the limit of 2000.
- public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) =>
- NackInternal(eventIds, action, reason);
+ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => NackInternal(eventIds, action, reason);
///
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
@@ -121,15 +108,10 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par
/// A reason given.
/// The s to nak. There should not be more than 2000 to nak at a time.
/// The number of resolvedEvents exceeded the limit of 2000.
- public Task Nack(
- PersistentSubscriptionNakEventAction action, string reason,
- params ResolvedEvent[] resolvedEvents
- ) =>
- Nack(
- action,
- reason,
- Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)
- );
+ public Task Nack(PersistentSubscriptionNakEventAction action, string reason,
+ params ResolvedEvent[] resolvedEvents) =>
+ Nack(action, reason,
+ Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId));
///
public void Dispose() => SubscriptionDropped(SubscriptionDroppedReason.Disposed);
@@ -139,8 +121,7 @@ private async Task Subscribe() {
try {
while (await _enumerator.MoveNextAsync(_cts.Token).ConfigureAwait(false)) {
- if (_enumerator.Current is not
- PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
+ if (_enumerator.Current is not PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
continue;
}
@@ -148,54 +129,39 @@ private async Task Subscribe() {
if (_subscriptionDroppedInvoked != 0) {
return;
}
-
- SubscriptionDropped(
- SubscriptionDroppedReason.ServerError,
+ SubscriptionDropped(SubscriptionDroppedReason.ServerError,
new PersistentSubscriptionNotFoundException(
- _persistentSubscriptionResult.StreamName,
- _persistentSubscriptionResult.GroupName
- )
- );
-
+ _persistentSubscriptionResult.StreamName, _persistentSubscriptionResult.GroupName));
return;
}
-
+
_log.LogTrace(
"Persistent Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}",
- SubscriptionId,
- resolvedEvent.OriginalEvent.EventStreamId,
- resolvedEvent.OriginalEvent.EventNumber,
- resolvedEvent.OriginalEvent.Position
- );
+ SubscriptionId, resolvedEvent.OriginalEvent.EventStreamId,
+ resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position);
try {
await _eventAppeared(
this,
resolvedEvent,
retryCount,
- _cts.Token
- ).ConfigureAwait(false);
+ _cts.Token).ConfigureAwait(false);
} catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) {
if (_subscriptionDroppedInvoked != 0) {
return;
}
- _log.LogWarning(
- ex,
+ _log.LogWarning(ex,
"Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
- SubscriptionId
- );
+ SubscriptionId);
SubscriptionDropped(SubscriptionDroppedReason.Disposed);
return;
} catch (Exception ex) {
- _log.LogError(
- ex,
+ _log.LogError(ex,
"Persistent Subscription {subscriptionId} was dropped because the subscriber made an error.",
- SubscriptionId
- );
-
+ SubscriptionId);
SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex);
return;
@@ -203,21 +169,16 @@ await _eventAppeared(
}
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
- _log.LogError(
- ex,
+ _log.LogError(ex,
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
- SubscriptionId
- );
-
+ SubscriptionId);
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
} finally {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(
"Persistent Subscription {subscriptionId} was unexpectedly terminated.",
- SubscriptionId
- );
-
+ SubscriptionId);
SubscriptionDropped(SubscriptionDroppedReason.ServerError);
}
}
diff --git a/src/Kurrent.Client/Streams/KurrentClient.Append.cs b/src/Kurrent.Client/Streams/KurrentClient.Append.cs
index 593d2ab40..39d6f3066 100644
--- a/src/Kurrent.Client/Streams/KurrentClient.Append.cs
+++ b/src/Kurrent.Client/Streams/KurrentClient.Append.cs
@@ -6,7 +6,6 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using EventStore.Client.Diagnostics;
-using Kurrent.Client.Core.Serialization;
using Kurrent.Diagnostics;
using Kurrent.Diagnostics.Telemetry;
using Kurrent.Diagnostics.Tracing;
@@ -15,48 +14,6 @@
namespace EventStore.Client {
public partial class KurrentClient {
- ///
- /// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through
- ///
- /// The name of the stream to append events to.
- /// Messages to append to the stream.
- /// Optional settings for the append operation, e.g. expected stream position for optimistic concurrency check
- /// The optional .
- ///
- public Task AppendToStreamAsync(
- string streamName,
- IEnumerable messages,
- AppendToStreamOptions options,
- CancellationToken cancellationToken = default
- ) {
- var serializationContext = new MessageSerializationContext(
- streamName,
- Settings.Serialization.DefaultContentType
- );
-
- var eventsData = _messageSerializer.Serialize(messages, serializationContext);
-
- return options.ExpectedStreamRevision.HasValue
- ? AppendToStreamAsync(
- streamName,
- options.ExpectedStreamRevision.Value,
- eventsData,
- options.ConfigureOperationOptions,
- options.Deadline,
- options.UserCredentials,
- cancellationToken
- )
- : AppendToStreamAsync(
- streamName,
- options.ExpectedStreamState ?? StreamState.Any,
- eventsData,
- options.ConfigureOperationOptions,
- options.Deadline,
- options.UserCredentials,
- cancellationToken
- );
- }
-
///
/// Appends events asynchronously to a stream.
///
@@ -157,28 +114,16 @@ ValueTask AppendToStreamInternal(
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
- .WithRequiredTag(
- TelemetryTags.Kurrent.Stream,
- header.Options.StreamIdentifier.StreamName.ToStringUtf8()
- )
+ .WithRequiredTag(TelemetryTags.Kurrent.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
- .WithOptionalTag(
- TelemetryTags.Database.User,
- userCredentials?.Username ?? Settings.DefaultCredentials?.Username
- );
+ .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);
- return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(
- Operation,
- TracingConstants.Operations.Append,
- tags
- );
+ return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);
async ValueTask Operation() {
using var call = new StreamsClient(channelInfo.CallInvoker)
- .Append(
- KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
- );
+ .Append(KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.RequestStream
.WriteAsync(header)
@@ -215,13 +160,11 @@ await call.RequestStream
}
IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
- var currentRevision = response.Success.CurrentRevisionOptionCase
- == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
+ var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision);
- var position = response.Success.PositionOptionCase
- == AppendResp.Types.Success.PositionOptionOneofCase.Position
+ var position = response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition)
: default;
@@ -238,8 +181,7 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
IWriteResult HandleWrongExpectedRevision(
AppendResp response, AppendReq header, KurrentClientOperationOptions operationOptions
) {
- var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase
- == CurrentRevisionOptionOneofCase.CurrentRevision
+ var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision
? new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
: StreamRevision.None;
@@ -251,8 +193,7 @@ IWriteResult HandleWrongExpectedRevision(
);
if (operationOptions.ThrowOnAppendFailure) {
- if (response.WrongExpectedVersion.ExpectedRevisionOptionCase
- == ExpectedRevisionOptionOneofCase.ExpectedRevision) {
+ if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
@@ -274,8 +215,7 @@ IWriteResult HandleWrongExpectedRevision(
);
}
- var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase
- == ExpectedRevisionOptionOneofCase.ExpectedRevision
+ var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision
? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision)
: StreamRevision.None;
@@ -287,7 +227,7 @@ IWriteResult HandleWrongExpectedRevision(
}
class StreamAppender : IDisposable {
- readonly KurrentClientSettings _settings;
+ readonly KurrentClientSettings _settings;
readonly CancellationToken _cancellationToken;
readonly Action _onException;
readonly Channel _channel;
@@ -362,7 +302,8 @@ async ValueTask Operation() {
try {
foreach (var appendRequest in GetRequests(events, options, correlationId))
await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
- } catch (ChannelClosedException ex) {
+ }
+ catch (ChannelClosedException ex) {
// channel is closed, our tcs won't necessarily get completed, don't wait for it.
throw ex.InnerException ?? ex;
}
@@ -392,7 +333,8 @@ async Task Duplex(ValueTask channelInfoTask) {
_ = Task.Run(Receive, _cancellationToken);
_isUsable.TrySetResult(true);
- } catch (Exception ex) {
+ }
+ catch (Exception ex) {
_isUsable.TrySetException(ex);
_onException(ex);
}
@@ -402,8 +344,7 @@ async Task Duplex(ValueTask channelInfoTask) {
async Task Send() {
if (_call is null) return;
- await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken)
- .ConfigureAwait(false))
+ await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
await _call.RequestStream.CompleteAsync().ConfigureAwait(false);
@@ -413,22 +354,20 @@ async Task Receive() {
if (_call is null) return;
try {
- await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken)
- .ConfigureAwait(false)) {
- if (!_pendingRequests.TryRemove(
- Uuid.FromDto(response.CorrelationId),
- out var writeResult
- )) {
+ await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) {
+ if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) {
continue; // TODO: Log?
}
try {
writeResult.TrySetResult(response.ToWriteResult());
- } catch (Exception ex) {
+ }
+ catch (Exception ex) {
writeResult.TrySetException(ex);
}
}
- } catch (Exception ex) {
+ }
+ catch (Exception ex) {
// signal that no tcs added to _pendingRequests after this point will necessarily complete
_channel.Writer.TryComplete(ex);
@@ -441,9 +380,7 @@ out var writeResult
}
}
- IEnumerable GetRequests(
- IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId
- ) {
+ IEnumerable GetRequests(IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId) {
var batchSize = 0;
var first = true;
var correlationIdDto = correlationId.ToDto();
@@ -490,153 +427,4 @@ public void Dispose() {
}
}
}
-
- public static class KurrentClientAppendToStreamExtensions {
- ///
- /// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through
- ///
- ///
- /// The name of the stream to append events to.
- /// Messages to append to the stream.
- /// The optional .
- ///
- public static Task AppendToStreamAsync(
- this KurrentClient client,
- string streamName,
- IEnumerable messages,
- CancellationToken cancellationToken = default
- )
- => client.AppendToStreamAsync(
- streamName,
- messages,
- new AppendToStreamOptions(),
- cancellationToken
- );
-
- ///
- /// Appends events asynchronously to a stream. Messages are serialized using default or custom serialization configured through
- ///
- ///
- /// The name of the stream to append events to.
- /// Messages to append to the stream.
- /// The optional .
- ///
- public static Task AppendToStreamAsync(
- this KurrentClient client,
- string streamName,
- IEnumerable