From 5104ce34c0108b15115457c33f24a932afed06fc Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Fri, 14 Mar 2025 10:17:03 +0100 Subject: [PATCH 1/8] [DEVEX-250] Plugged the initial code for serialization settings --- .../dbclient/ConnectionSettingsBuilder.java | 15 +++++++- .../kurrent/dbclient/KurrentDBClientBase.java | 5 +++ .../dbclient/KurrentDBClientSettings.java | 12 ++++++- .../KurrentDBClientSerializationSettings.java | 34 +++++++++++++++++++ .../serialization/MessageSerializer.java | 5 +++ .../MessageSerializerBuilder.java | 7 ++++ .../serialization/MessageSerializerImpl.java | 4 +++ 7 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java b/db-client-java/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java index df4cfe4e..dd02657b 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java @@ -2,6 +2,7 @@ import io.grpc.ClientInterceptor; +import io.kurrent.dbclient.serialization.KurrentDBClientSerializationSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ public class ConnectionSettingsBuilder { private List _interceptors = new ArrayList<>(); private String _tlsCaFile = null; private Set _features = new HashSet<>(); + private KurrentDBClientSerializationSettings _serializationSettings; ConnectionSettingsBuilder() {} @@ -60,7 +62,9 @@ public KurrentDBClientSettings buildConnectionSettings() { _defaultDeadline, _interceptors, _tlsCaFile, - _features); + _features, + _serializationSettings + ); } /** @@ -241,6 +245,15 @@ public ConnectionSettingsBuilder feature(String feature) { return this; } + /** + * Provides configuration options for messages serialization and deserialization in the KurrentDB client. + * If null, default settings are used. + */ + public ConnectionSettingsBuilder serialization(KurrentDBClientSerializationSettings serializationSettings) { + this._serializationSettings = serializationSettings; + return this; + } + void parseGossipSeed(String host) { String[] hostParts = host.split(":"); diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java index c552c3e7..f6ac464d 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java @@ -1,5 +1,7 @@ package io.kurrent.dbclient; +import io.kurrent.dbclient.serialization.MessageSerializer; +import io.kurrent.dbclient.serialization.MessageSerializerBuilder; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,6 +16,7 @@ public class KurrentDBClientBase { final Logger logger = LoggerFactory.getLogger(KurrentDBClientBase.class); final private GrpcClient client; + final private MessageSerializer serializer; KurrentDBClientBase(KurrentDBClientSettings settings) { Discovery discovery; @@ -30,6 +33,8 @@ public class KurrentDBClientBase { this.client = service.getHandle(); CompletableFuture.runAsync(service, createConnectionLoopExecutor()); + + serializer = MessageSerializerBuilder.get(); } private Executor createConnectionLoopExecutor() { return Executors.newSingleThreadExecutor(r -> { diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java index 8b0ae2a9..e34ded7f 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java @@ -1,6 +1,7 @@ package io.kurrent.dbclient; import io.grpc.ClientInterceptor; +import io.kurrent.dbclient.serialization.KurrentDBClientSerializationSettings; import java.net.InetSocketAddress; import java.util.List; @@ -41,6 +42,7 @@ public class KurrentDBClientSettings { private final List interceptors; private final String tlsCaFile; private final Set features; + private final KurrentDBClientSerializationSettings serializationSettings; /** * If the dns discovery is enabled. @@ -167,6 +169,12 @@ public String getTlsCaFile() { */ public Set getFeatures() { return features; } + /** + * Provides configuration options for messages serialization and deserialization in the KurrentDB client. + * If null, default settings are used. + */ + public KurrentDBClientSerializationSettings getSerializationSettings() { return serializationSettings; } + KurrentDBClientSettings( boolean dnsDiscover, int maxDiscoverAttempts, @@ -183,7 +191,8 @@ public String getTlsCaFile() { Long defaultDeadline, List interceptors, String tlsCaFile, - Set features + Set features, + KurrentDBClientSerializationSettings serializationSettings ) { this.dnsDiscover = dnsDiscover; this.maxDiscoverAttempts = maxDiscoverAttempts; @@ -201,6 +210,7 @@ public String getTlsCaFile() { this.interceptors = interceptors; this.tlsCaFile = tlsCaFile; this.features = features; + this.serializationSettings = serializationSettings; } /** diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java new file mode 100644 index 00000000..e9915c17 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java @@ -0,0 +1,34 @@ +package io.kurrent.dbclient.serialization; + +import java.util.function.Consumer; + +/** + * Provides configuration options for messages serialization and deserialization in the KurrentDB client. + */ +public class KurrentDBClientSerializationSettings { + /** + * Creates a new instance of serialization settings with either default values or custom configuration. + * This factory method is the recommended way to create serialization settings for the KurrentDB client. + * @param configure Optional callback to customize the settings. If null, default settings are used. + * @return A fully configured instance ready to be used with the KurrentDB client. + *
{@code
+     * var settings = KurrentDBClientSerializationSettings.get(options -> {
+     *     options.RegisterMessageType("user-created");
+     *     options.RegisterMessageType("user-role-assigned");
+     * });
+     * }
+ */ + public static KurrentDBClientSerializationSettings get( + Consumer configure + ) { + KurrentDBClientSerializationSettings settings = get(); + + configure.accept(settings); + + return settings; + } + + public static KurrentDBClientSerializationSettings get() { + return new KurrentDBClientSerializationSettings(); + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java new file mode 100644 index 00000000..61ea3f29 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java @@ -0,0 +1,5 @@ +package io.kurrent.dbclient.serialization; + +public interface MessageSerializer { +} + diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java new file mode 100644 index 00000000..5c52a1e6 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java @@ -0,0 +1,7 @@ +package io.kurrent.dbclient.serialization; + +public final class MessageSerializerBuilder { + public static MessageSerializer get() { + return new MessageSerializerImpl(); + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java new file mode 100644 index 00000000..7ec15af2 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java @@ -0,0 +1,4 @@ +package io.kurrent.dbclient.serialization; + +class MessageSerializerImpl implements MessageSerializer { +} From 316bc7aa02fc2b36b50419195e58c74340dc62a7 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Fri, 14 Mar 2025 14:38:58 +0100 Subject: [PATCH 2/8] [DEVEX-250] Added default structure for message serializer and naming resolution --- .../dbclient/AppendToStreamOptions.java | 24 ++++ .../io/kurrent/dbclient/KurrentDBClient.java | 135 ++++++++++++++---- .../kurrent/dbclient/KurrentDBClientBase.java | 2 +- .../dbclient/serialization/ContentType.java | 16 +++ .../MessageSerializationContext.java | 13 ++ .../serialization/MessageSerializer.java | 3 + .../serialization/MessageSerializerImpl.java | 9 ++ .../MessageTypeNamingResolutionContext.java | 21 +++ .../OperationSerializationSettings.java | 4 + .../io/kurrent/dbclient/StreamsTests.java | 2 + .../kurrent/dbclient/streams/AppendTests.java | 3 + .../serialization/SerializationTests.java | 105 ++++++++++++++ 12 files changed, 307 insertions(+), 30 deletions(-) create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/ContentType.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingResolutionContext.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java create mode 100644 db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStreamOptions.java b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStreamOptions.java index 3dd3f2be..e8c78775 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStreamOptions.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStreamOptions.java @@ -1,12 +1,36 @@ package io.kurrent.dbclient; +import io.kurrent.dbclient.serialization.OperationSerializationSettings; + +import java.util.Optional; + /** * Options of the append stream request. */ public class AppendToStreamOptions extends OptionsWithStreamStateBase { + private OperationSerializationSettings serializationSettings = null; + private AppendToStreamOptions() { } + /** + * Returns optional serialization settings + */ + public Optional serializationSettings() { + return Optional.ofNullable(this.serializationSettings); + } + + /** + * Allows to customize or disable the automatic deserialization + * + * @param serializationSettings - expected revision. + * @return updated options. + */ + public AppendToStreamOptions serializationSettings(OperationSerializationSettings serializationSettings) { + this.serializationSettings = serializationSettings; + return this; + } + /** * Returns options with default values. */ diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index 22cbe548..6537d2ae 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.json.JsonMapper; +import io.kurrent.dbclient.serialization.MessageSerializationContext; import org.reactivestreams.Publisher; import java.util.Arrays; @@ -9,6 +10,10 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.kurrent.dbclient.serialization.MessageTypeNamingResolutionContext.fromStreamName; /** * Represents EventStoreDB client for stream operations. A client instance maintains a two-way communication to EventStoreDB. @@ -28,10 +33,11 @@ public static KurrentDBClient create(KurrentDBClientSettings settings) { /** * Appends events to a given stream. + * * @param streamName stream's name. - * @param events events to send. - * @see WriteResult + * @param events events to send. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, EventData... events) { return this.appendToStream(streamName, Arrays.stream(events).iterator()); @@ -39,10 +45,11 @@ public CompletableFuture appendToStream(String streamName, EventDat /** * Appends events to a given stream. + * * @param streamName stream's name. - * @param events events to send. - * @see WriteResult + * @param events events to send. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, Iterator events) { return this.appendToStream(streamName, AppendToStreamOptions.get(), events); @@ -50,11 +57,12 @@ public CompletableFuture appendToStream(String streamName, Iterator /** * Appends events to a given stream. + * * @param streamName stream's name. - * @param options append stream request's options. - * @param events events to send. - * @see WriteResult + * @param options append stream request's options. + * @param events events to send. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, EventData... events) { return this.appendToStream(streamName, options, Arrays.stream(events).iterator()); @@ -62,11 +70,12 @@ public CompletableFuture appendToStream(String streamName, AppendTo /** * Appends events to a given stream. + * * @param streamName stream's name. - * @param options append stream request's options. - * @param events events to send. - * @see WriteResult + * @param options append stream request's options. + * @param events events to send. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Iterator events) { if (options == null) @@ -76,11 +85,63 @@ public CompletableFuture appendToStream(String streamName, AppendTo } /** - * Sets a stream's metadata. + * Appends events to a given stream. + * + * @param streamName stream's name. + * @param events events to send. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream(String streamName, List events) { + // TODO: Sort type erasure issue with Iterator + return this.appendToStream(streamName, AppendToStreamOptions.get(), events); + } + + /** + * Appends events to a given stream. + * + * @param streamName stream's name. + * @param options append stream request's options. + * @param events events to send. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Object... events) { + return this.appendToStream(streamName, options, Stream.of(events).collect(Collectors.toList())); + } + + /** + * Appends events to a given stream. + * * @param streamName stream's name. - * @param metadata stream's metadata + * @param options append stream request's options. + * @param events events to send. + * @return a write result if successful. * @see WriteResult + */ + public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, List events) { + // TODO: Sort type erasure issue with Iterator + if (options == null) + options = AppendToStreamOptions.get(); + + MessageSerializationContext serializationContext = new MessageSerializationContext(fromStreamName(streamName)); + + options.serializationSettings() + .map(serializer::with) + .orElse(serializer) + .serialize(); + + throw new RuntimeException("Not Implemented!"); + //return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); + } + + /** + * Sets a stream's metadata. + * + * @param streamName stream's name. + * @param metadata stream's metadata * @return a write result if successful. + * @see WriteResult */ public CompletableFuture setStreamMetadata(String streamName, StreamMetadata metadata) { return setStreamMetadata(streamName, null, metadata); @@ -88,11 +149,12 @@ public CompletableFuture setStreamMetadata(String streamName, Strea /** * Sets a stream's metadata. + * * @param streamName stream's name. - * @param options append stream request's options. - * @param metadata stream's metadata - * @see WriteResult + * @param options append stream request's options. + * @param metadata stream's metadata * @return a write result if successful. + * @see WriteResult */ public CompletableFuture setStreamMetadata(String streamName, AppendToStreamOptions options, StreamMetadata metadata) { JsonMapper mapper = new JsonMapper(); @@ -103,12 +165,13 @@ public CompletableFuture setStreamMetadata(String streamName, Appen } catch (JsonProcessingException e) { throw new RuntimeException(e); } - } + } /** * Reads events from a given stream. The reading can be done forwards and backwards. + * * @param streamName stream's name. - * @param options read request's operations. + * @param options read request's operations. */ public CompletableFuture readStream(String streamName, ReadStreamOptions options) { return readEventsFromPublisher(readStreamReactive(streamName, options)); @@ -116,6 +179,7 @@ public CompletableFuture readStream(String streamName, ReadStreamOpt /** * Reads events from a given stream. The reading can be done forwards and backwards. + * * @param streamName stream's name. */ public Publisher readStreamReactive(String streamName) { @@ -125,8 +189,9 @@ public Publisher readStreamReactive(String streamName) { /** * Reads events from a given stream. The reading can be done forwards and backwards. + * * @param streamName stream's name. - * @param options read request's operations. + * @param options read request's operations. */ public Publisher readStreamReactive(String streamName, ReadStreamOptions options) { if (options == null) @@ -137,6 +202,7 @@ public Publisher readStreamReactive(String streamName, ReadStreamOp /** * Reads stream's metadata. + * * @param streamName stream's name. * @see StreamMetadata */ @@ -146,8 +212,9 @@ public CompletableFuture getStreamMetadata(String streamName) { /** * Reads stream's metadata. + * * @param streamName stream's name. - * @param options read request's operations. + * @param options read request's operations. * @see StreamMetadata */ public CompletableFuture getStreamMetadata(String streamName, ReadStreamOptions options) { @@ -183,6 +250,7 @@ public CompletableFuture readAll() { /** * Reads events from the $all stream. The reading can be done forwards and backwards. + * * @param options options of the read $all request. */ public CompletableFuture readAll(ReadAllOptions options) { @@ -195,6 +263,7 @@ public Publisher readAllReactive() { /** * Reads events from the $all stream. The reading can be done forwards and backwards. + * * @param options options of the read $all request. */ public Publisher readAllReactive(ReadAllOptions options) { @@ -210,8 +279,9 @@ public Publisher readAllReactive(ReadAllOptions options) { * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param streamName stream's name. - * @param listener consumes a subscription's events. + * @param listener consumes a subscription's events. * @return a subscription handle. */ public CompletableFuture subscribeToStream(String streamName, SubscriptionListener listener) { @@ -224,9 +294,10 @@ public CompletableFuture subscribeToStream(String streamName, Subs * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param streamName stream's name. - * @param listener consumes a subscription's events. - * @param options a subscription request's options. + * @param listener consumes a subscription's events. + * @param options a subscription request's options. * @return a subscription handle. */ public CompletableFuture subscribeToStream(String streamName, SubscriptionListener listener, SubscribeToStreamOptions options) { @@ -242,6 +313,7 @@ public CompletableFuture subscribeToStream(String streamName, Subs * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param listener consumes a subscription's events. * @return a subscription handle. */ @@ -255,8 +327,9 @@ public CompletableFuture subscribeToAll(SubscriptionListener liste * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param listener consumes a subscription's events. - * @param options subscription to $all request's options. + * @param options subscription to $all request's options. * @return a subscription handle. */ public CompletableFuture subscribeToAll(SubscriptionListener listener, SubscribeToAllOptions options) { @@ -274,9 +347,10 @@ public CompletableFuture subscribeToAll(SubscriptionListener liste * deleting the stream, you are able to write to it again, continuing from where it left off. *

* Note: Deletion is reversible until the scavenging process runs. + * * @param streamName stream's name - * @see DeleteResult * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture deleteStream(String streamName) { return this.deleteStream(streamName, DeleteStreamOptions.get()); @@ -290,10 +364,11 @@ public CompletableFuture deleteStream(String streamName) { * deleting the stream, you are able to write to it again, continuing from where it left off. *

* Note: soft deletion is reversible until the scavenging process runs. + * * @param streamName stream's name - * @param options delete stream request's options. - * @see DeleteResult + * @param options delete stream request's options. * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture deleteStream(String streamName, DeleteStreamOptions options) { if (options == null) @@ -309,9 +384,10 @@ public CompletableFuture deleteStream(String streamName, DeleteStr * written to again. Tombstone events are written with the event's type '$streamDeleted'. When a tombstoned stream * is read, the read will return a StreamDeleted error. *

+ * * @param streamName a stream's name. - * @see DeleteResult * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture tombstoneStream(String streamName) { return this.tombstoneStream(streamName, DeleteStreamOptions.get()); @@ -324,10 +400,11 @@ public CompletableFuture tombstoneStream(String streamName) { * written to again. Tombstone events are written with the event's type '$streamDeleted'. When a tombstoned stream * is read, the read will return a StreamDeleted error. *

+ * * @param streamName a stream's name. - * @param options delete stream request's options. - * @see DeleteResult + * @param options delete stream request's options. * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture tombstoneStream(String streamName, DeleteStreamOptions options) { if (options == null) diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java index f6ac464d..41dbcb3d 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java @@ -16,7 +16,7 @@ public class KurrentDBClientBase { final Logger logger = LoggerFactory.getLogger(KurrentDBClientBase.class); final private GrpcClient client; - final private MessageSerializer serializer; + final protected MessageSerializer serializer; KurrentDBClientBase(KurrentDBClientSettings settings) { Discovery discovery; diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/ContentType.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/ContentType.java new file mode 100644 index 00000000..617902bb --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/ContentType.java @@ -0,0 +1,16 @@ +package io.kurrent.dbclient.serialization; + +public enum ContentType { + JSON(1), + // PROTBUF(2), + // AVRO(3), + BYTES(4); + + private final int value; + + ContentType(final int newValue) { + value = newValue; + } + + public int getValue() { return value; } +} \ No newline at end of file diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java new file mode 100644 index 00000000..e6eeb283 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java @@ -0,0 +1,13 @@ +package io.kurrent.dbclient.serialization; + +public class MessageSerializationContext { + private final MessageTypeNamingResolutionContext namingResolution; + + public MessageSerializationContext(MessageTypeNamingResolutionContext namingResolution) { + this.namingResolution = namingResolution; + } + + public MessageTypeNamingResolutionContext getNamingResolution() { + return namingResolution; + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java index 61ea3f29..b4b9f9f4 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java @@ -1,5 +1,8 @@ package io.kurrent.dbclient.serialization; public interface MessageSerializer { + MessageSerializer with(OperationSerializationSettings serializationSettings); + + void serialize(); } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java index 7ec15af2..bcd5688e 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java @@ -1,4 +1,13 @@ package io.kurrent.dbclient.serialization; class MessageSerializerImpl implements MessageSerializer { + @Override + public MessageSerializer with(OperationSerializationSettings serializationSettings) { + return null; + } + + @Override + public void serialize() { + + } } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingResolutionContext.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingResolutionContext.java new file mode 100644 index 00000000..7342ddce --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingResolutionContext.java @@ -0,0 +1,21 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Arrays; + +public class MessageTypeNamingResolutionContext { + private final String categoryName; + + public MessageTypeNamingResolutionContext(String streamName) { + this.categoryName = streamName; + } + + public String getCategoryName() { + return categoryName; + } + + public static MessageTypeNamingResolutionContext fromStreamName(String streamName) { + return new MessageTypeNamingResolutionContext( + Arrays.stream(streamName.split("-")).findFirst().orElse("no_stream_category") + ); + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java new file mode 100644 index 00000000..4646ce58 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java @@ -0,0 +1,4 @@ +package io.kurrent.dbclient.serialization; + +public class OperationSerializationSettings { +} diff --git a/db-client-java/src/test/java/io/kurrent/dbclient/StreamsTests.java b/db-client-java/src/test/java/io/kurrent/dbclient/StreamsTests.java index 571ef250..dafd3ebf 100644 --- a/db-client-java/src/test/java/io/kurrent/dbclient/StreamsTests.java +++ b/db-client-java/src/test/java/io/kurrent/dbclient/StreamsTests.java @@ -3,6 +3,7 @@ import io.kurrent.dbclient.streams.*; import io.kurrent.dbclient.streams.ReadStreamTests; +import io.kurrent.dbclient.streams.serialization.SerializationTests; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; @@ -16,6 +17,7 @@ public class StreamsTests implements DeadlineTests, InterceptorTests, MetadataTests, + SerializationTests, ClientLifecycleTests { static private Database database; static private Logger logger; diff --git a/db-client-java/src/test/java/io/kurrent/dbclient/streams/AppendTests.java b/db-client-java/src/test/java/io/kurrent/dbclient/streams/AppendTests.java index 72701fd0..a17f76f1 100644 --- a/db-client-java/src/test/java/io/kurrent/dbclient/streams/AppendTests.java +++ b/db-client-java/src/test/java/io/kurrent/dbclient/streams/AppendTests.java @@ -5,7 +5,10 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public interface AppendTests extends ConnectionAware { @Test diff --git a/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java new file mode 100644 index 00000000..6e4fdeb0 --- /dev/null +++ b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java @@ -0,0 +1,105 @@ +package io.kurrent.dbclient.streams.serialization; + +import io.kurrent.dbclient.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.UUID; +import java.util.stream.IntStream; +import java.util.stream.Collectors; + +public interface SerializationTests extends ConnectionAware { + @Test + default void testPlainJavaObjectsAreSerializedAndDeserializedUsingAutoSerialization() throws Throwable { + KurrentDBClient client = getDatabase().defaultClient(); + + // Given + final String streamName = generateName(); + final List expected = generateMessages(2); + + + // When + AppendToStreamOptions appendOptions = AppendToStreamOptions.get() + .expectedRevision(ExpectedRevision.noStream()); + + WriteResult appendResult = client.appendToStream(streamName, appendOptions, expected.iterator()) + .get(); + + Assertions.assertEquals(ExpectedRevision.expectedRevision(0), appendResult.getNextExpectedRevision()); + + ReadStreamOptions readStreamOptions = ReadStreamOptions.get() + .fromEnd() + .backwards() + .maxCount(1); + + // Ensure appended event is readable + ReadResult result = client.readStream(streamName, readStreamOptions) + .get(); + + Assertions.assertEquals(2, result.getEvents().size()); + } + + static List generateMessages(int count){ + return IntStream.range(0, count) + .mapToObj(x -> + new UserRegistered( + UUID.randomUUID(), + new Address(UUID.randomUUID().toString(), UUID.randomUUID().hashCode()) + ) + ) + .collect(Collectors.toList()); + } + + class Address{ + String street; + int number; + + public Address(String street, int number) { + this.street = street; + this.number = number; + } + + public String getStreet() { + return street; + } + + public void setStreet(String street) { + this.street = street; + } + + public int getNumber() { + return number; + } + + public void setNumber(int number) { + this.number = number; + } + } + + class UserRegistered{ + UUID userId; + Address address; + + public UserRegistered(UUID userId, Address address) { + this.userId = userId; + this.address = address; + } + + public UUID getUserId() { + return userId; + } + + public void setUserId(UUID userId) { + this.userId = userId; + } + + public Address getAddress() { + return address; + } + + public void setAddress(Address address) { + this.address = address; + } + } +} From 4dc914d6494a94bb5d8f883a9054395e62acd1a7 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Fri, 14 Mar 2025 16:33:30 +0100 Subject: [PATCH 3/8] [DEVEX-250] Added MessageData as a future replacement for EventData to make clearer responsibility on raw and auto-serialized events --- .../io/kurrent/dbclient/AppendToStream.java | 20 +-- .../io/kurrent/dbclient/ClientTelemetry.java | 18 +-- .../java/io/kurrent/dbclient/EventData.java | 5 + .../io/kurrent/dbclient/KurrentDBClient.java | 56 ++++--- .../java/io/kurrent/dbclient/Message.java | 150 +++++++++++++++++ .../java/io/kurrent/dbclient/MessageData.java | 106 ++++++++++++ .../kurrent/dbclient/MessageDataBuilder.java | 153 ++++++++++++++++++ .../serialization/MessageSerializer.java | 9 +- .../serialization/MessageSerializerImpl.java | 14 +- 9 files changed, 490 insertions(+), 41 deletions(-) create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/Message.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java index 11ef0060..0f066ea5 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java @@ -17,10 +17,10 @@ class AppendToStream { private final GrpcClient client; private final String streamName; - private final List events; + private final List events; private final AppendToStreamOptions options; - public AppendToStream(GrpcClient client, String streamName, Iterator events, AppendToStreamOptions options) { + public AppendToStream(GrpcClient client, String streamName, Iterator events, AppendToStreamOptions options) { this.client = client; this.streamName = streamName; this.events = new ArrayList<>(); @@ -40,7 +40,7 @@ public CompletableFuture execute() { this.options.getCredentials())); } - private CompletableFuture append(ManagedChannel channel, List events) { + private CompletableFuture append(ManagedChannel channel, List events) { CompletableFuture result = new CompletableFuture<>(); StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder() .setStreamIdentifier(Shared.StreamIdentifier.newBuilder() @@ -93,18 +93,18 @@ private CompletableFuture append(ManagedChannel channel, List tryInjectTracingContext(Span span, List events) { - List injectedEvents = new ArrayList<>(); - for (EventData event : events) { + private static List tryInjectTracingContext(Span span, List events) { + List injectedEvents = new ArrayList<>(); + for (MessageData event : events) { boolean isJsonEvent = Objects.equals(event.getContentType(), ContentType.JSON); - injectedEvents.add(EventDataBuilder - .binary(event.getEventId(), event.getEventType(), event.getEventData(), isJsonEvent) - .metadataAsBytes(tryInjectTracingContext(span, event.getUserMetadata())) - .build()); + injectedEvents.add( + MessageDataBuilder + .with(event.getMessageType(), event.getMessageData(), tryInjectTracingContext(span, event.getMessageData()), event.getMessageId(), isJsonEvent) + .build()); } return injectedEvents; } @@ -85,9 +85,9 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) { } static CompletableFuture traceAppend( - BiFunction, CompletableFuture> appendOperation, + BiFunction, CompletableFuture> appendOperation, ManagedChannel channel, - List events, String streamId, KurrentDBClientSettings settings, + List events, String streamId, KurrentDBClientSettings settings, UserCredentials optionalCallCredentials) { Span span = createSpan( ClientTelemetryConstants.Operations.APPEND, diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/EventData.java b/db-client-java/src/main/java/io/kurrent/dbclient/EventData.java index 3cd5ba4c..e0efcca1 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/EventData.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/EventData.java @@ -75,6 +75,7 @@ public static EventDataBuilder builderAsJson(String eventType, A eventData) * @return an event data builder. * @param a type that can be serialized in JSON. */ + @Deprecated public static EventDataBuilder builderAsJson(UUID eventId, String eventType, A eventData) { return EventDataBuilder.json(eventId, eventType, eventData); } @@ -120,5 +121,9 @@ public static EventDataBuilder builderAsBinary(String eventType, byte[] eventDat public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, byte[] eventData) { return EventDataBuilder.binary(eventId, eventType, eventData); } + + public MessageData toMessageData() { + return new MessageData(eventType, eventData, userMetadata, eventId, contentType); + } } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index 6537d2ae..ca708d77 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -5,13 +5,11 @@ import io.kurrent.dbclient.serialization.MessageSerializationContext; import org.reactivestreams.Publisher; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static io.kurrent.dbclient.serialization.MessageTypeNamingResolutionContext.fromStreamName; @@ -81,58 +79,76 @@ public CompletableFuture appendToStream(String streamName, AppendTo if (options == null) options = AppendToStreamOptions.get(); - return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); + Iterator messageData = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(events, Spliterator.ORDERED), false) + .map(EventData::toMessageData) + .iterator(); + + return new AppendToStream(this.getGrpcClient(), streamName, messageData, options).execute(); } /** - * Appends events to a given stream. + * Appends messages to a given stream. * * @param streamName stream's name. - * @param events events to send. + * @param messages messages to send. * @return a write result if successful. * @see WriteResult */ - public CompletableFuture appendToStream(String streamName, List events) { + public CompletableFuture appendToStream(String streamName, List messages) { // TODO: Sort type erasure issue with Iterator - return this.appendToStream(streamName, AppendToStreamOptions.get(), events); + return this.appendToStream(streamName, AppendToStreamOptions.get(), messages); } /** - * Appends events to a given stream. + * Appends messages to a given stream. * * @param streamName stream's name. * @param options append stream request's options. - * @param events events to send. + * @param messages messages to send. * @return a write result if successful. * @see WriteResult */ - public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Object... events) { - return this.appendToStream(streamName, options, Stream.of(events).collect(Collectors.toList())); + public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Message... messages) { + return this.appendToStream(streamName, options, Stream.of(messages).collect(Collectors.toList())); } /** - * Appends events to a given stream. + * Appends messages to a given stream. * * @param streamName stream's name. * @param options append stream request's options. - * @param events events to send. + * @param messages messages to send. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Object... messages) { + return this.appendToStream(streamName, options, Stream.of(messages).map(Message::from).collect(Collectors.toList())); + } + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param options append stream request's options. + * @param messages messages to send. * @return a write result if successful. * @see WriteResult */ - public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, List events) { + public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, List messages) { // TODO: Sort type erasure issue with Iterator if (options == null) options = AppendToStreamOptions.get(); MessageSerializationContext serializationContext = new MessageSerializationContext(fromStreamName(streamName)); - options.serializationSettings() + Iterator messageData = options.serializationSettings() .map(serializer::with) .orElse(serializer) - .serialize(); + .serialize(messages, serializationContext) + .iterator(); - throw new RuntimeException("Not Implemented!"); - //return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); + return new AppendToStream(this.getGrpcClient(), streamName, messageData, options).execute(); } /** diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/Message.java b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java new file mode 100644 index 00000000..2792798b --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java @@ -0,0 +1,150 @@ +package io.kurrent.dbclient; + +import java.util.Objects; +import java.util.UUID; + +/** + * Represents a message wrapper in the KurrentDB system, containing both domain data and optional metadata. + * Messages can represent events, commands, or other domain objects along with their associated metadata. + */ +public final class Message { + private final Object data; + private final Object metadata; + private final UUID messageId; + + /** + * Creates a new Message with the specified properties. + * + * @param data The message domain data. + * @param metadata Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information. + * @param messageId Unique identifier for this specific message instance. + */ + public Message(Object data, Object metadata, UUID messageId) { + this.data = data; + this.metadata = metadata; + this.messageId = messageId; + } + + /** + * Creates a new Message with the specified domain data and random ID, but without metadata. + * This factory method is a convenient shorthand when working with systems that don't require metadata. + * + * @param data The message domain data. + * @return A new immutable Message instance containing the provided data and ID with null metadata. + * + *

Example: + *

+     * // Create a message with a specific ID
+     * UserRegistered userRegistered = new UserRegistered("123", "Alice");
+     * Message message = Message.from(userRegistered);
+     * 
+ */ + public static Message from(Object data) { + return from(data, null); + } + + /** + * Creates a new Message with the specified domain data and message ID, but without metadata. + * This factory method is a convenient shorthand when working with systems that don't require metadata. + * + * @param data The message domain data. + * @param messageId Unique identifier for this message instance. Must not be a nil UUID. + * @return A new immutable Message instance containing the provided data and ID with null metadata. + * + *

Example: + *

+     * // Create a message with a specific ID
+     * UserRegistered userRegistered = new UserRegistered("123", "Alice");
+     * UUID messageId = UUID.randomUUID();
+     * Message message = Message.from(userRegistered, messageId);
+     * 
+ */ + public static Message from(Object data, UUID messageId) { + return from(data, null, messageId); + } + + /** + * Creates a new Message with the specified domain data and message ID and metadata. + * + * @param data The message domain data. + * @param metadata Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information. + * @param messageId Unique identifier for this specific message instance. If null, a random UUID will be generated. + * @return A new immutable Message instance with the specified properties. + * @throws IllegalArgumentException Thrown when messageId is explicitly set to a nil UUID, which is an invalid identifier. + * + *

Example: + *

+     * // Create a message with data and metadata
+     * OrderPlaced orderPlaced = new OrderPlaced("ORD-123", 99.99);
+     * EventMetadata metadata = new EventMetadata(
+     *     "user-456", 
+     *     Instant.now(),
+     *     correlationId
+     * );
+     *
+     * // Let the system assign an ID automatically
+     * Message message = Message.from(orderPlaced, metadata);
+     *
+     * // Or specify a custom ID
+     * Message messageWithId = Message.from(orderPlaced, metadata, UUID.randomUUID());
+     * 
+ */ + public static Message from(Object data, Object metadata, UUID messageId) { + if (messageId != null && messageId.equals(new UUID(0, 0))) { + throw new IllegalArgumentException("Message ID cannot be a nil UUID."); + } + + return new Message(data, metadata, messageId != null ? messageId : UUID.randomUUID()); + } + + /** + * Gets the message domain data. + * + * @return The message domain data. + */ + public Object getData() { + return data; + } + + /** + * Gets the message metadata. + * + * @return The message metadata, may be null. + */ + public Object getMetadata() { + return metadata; + } + + /** + * Gets the unique identifier for this message. + * + * @return The message ID. + */ + public UUID getMessageId() { + return messageId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Message message = (Message) o; + return Objects.equals(data, message.data) && + Objects.equals(metadata, message.metadata) && + Objects.equals(messageId, message.messageId); + } + + @Override + public int hashCode() { + return Objects.hash(data, metadata, messageId); + } + + @Override + public String toString() { + return "Message{" + + "data=" + data + + ", metadata=" + metadata + + ", messageId=" + messageId + + '}'; + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java b/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java new file mode 100644 index 00000000..f4994f67 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java @@ -0,0 +1,106 @@ +package io.kurrent.dbclient; + +import java.util.UUID; + +/** + * Represents a message that will be sent to KurrentDB. + */ +public final class MessageData { + private final UUID messageId; + private final String messageType; + private final String contentType; + private final byte[] messageData; + private final byte[] messageMetadata; + + MessageData(String messageType, byte[] messageData) { + this(messageType, messageData, null, UUID.randomUUID(), ContentType.JSON); + } + + MessageData(String messageType, byte[] messageData, byte[] userMetadata) { + this(messageType, messageData, userMetadata, UUID.randomUUID(), ContentType.JSON); + } + + MessageData(String messageType, byte[] messageData, byte[] userMetadata, UUID messageId, String contentType) { + this.messageId = messageId; + this.messageType = messageType; + this.contentType = contentType; + this.messageData = messageData; + this.messageMetadata = userMetadata; + } + + /** + * Returns message's unique identifier + */ + public UUID getMessageId() { + return messageId; + } + + /** + * Returns message's type. + */ + public String getMessageType() { + return messageType; + } + + /** + * Returns message's content's type + */ + public String getContentType() { + return contentType; + } + + /** + * Returns message's payload data + */ + public byte[] getMessageData() { + return messageData; + } + + /** + * Returns message's custom user metadata. + */ + public byte[] getMessageMetadata() { + return messageMetadata; + } + + /** + * Configures a message data builder to host a JSON payload. + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsJson(String messageType, byte[] messageData) { + return MessageDataBuilder.json(messageType, messageData); + } + + /** + * Configures a message data builder to host a JSON payload. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsJson(String messageType, byte[] messageData, byte[] messageMetadata) { + return MessageDataBuilder.json(messageType, messageData, messageMetadata); + } + + /** + * Configures a message data builder to host a binary payload. + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsBinary(String messageType, byte[] messageData) { + return MessageDataBuilder.binary(messageType, messageData); + } + + /** + * Configures a message data builder to host a binary payload. + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsBinary(String messageType, byte[] messageData, byte[] messageMetadata) { + return MessageDataBuilder.binary(messageType, messageMetadata); + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java b/db-client-java/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java new file mode 100644 index 00000000..7b0c7730 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java @@ -0,0 +1,153 @@ +package io.kurrent.dbclient; + +import java.util.UUID; + +/** + * Utility class to help building an MessageData. + */ +public class MessageDataBuilder { + private String messageType; + private byte[] messageData; + private byte[] messageMetadata; + private UUID messageId; + private String contentType; + + MessageDataBuilder() { + } + + /** + * Configures a message data builder to host a JSON payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder json(String messageType, byte[] messageData) { + return json(messageType, messageData, null, null); + } + + /** + * Configures a message data builder to host a JSON payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder json(String messageType, byte[] messageData, byte[] messageMetadata) { + return json(messageType, messageData, messageMetadata, null); + } + + /** + * Configures a message data builder to host a JSON payload. + * + * @param messageId message's id. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder json(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId) { + MessageDataBuilder self = new MessageDataBuilder(); + + self.messageType = messageType; + self.messageData = messageData; + self.messageMetadata = messageMetadata; + self.messageId = messageId; + self.contentType = ContentType.JSON; + + return self; + } + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder binary(String messageType, byte[] messageData) { + return binary(messageType, messageData, null, null); + } + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder binary(String messageType, byte[] messageData, byte[] messageMetadata) { + return binary(messageType, messageData, messageMetadata, null); + } + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageId message's id. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder binary(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId) { + MessageDataBuilder self = new MessageDataBuilder(); + + self.messageType = messageType; + self.messageData = messageData; + self.messageId = messageId; + self.messageMetadata = messageMetadata; + self.contentType = ContentType.BYTES; + + return self; + } + + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageId message's id. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder with(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId, boolean isJson) { + MessageDataBuilder self = new MessageDataBuilder(); + + self.messageType = messageType; + self.messageData = messageData; + self.messageId = messageId; + self.messageMetadata = messageMetadata; + self.contentType = isJson ? ContentType.JSON : ContentType.BYTES; + + return self; + } + + /** + * Sets message's unique identifier. + */ + public MessageDataBuilder messageId(UUID messageId) { + this.messageId = messageId; + return this; + } + + /** + * Sets message's custom user metadata. + */ + public MessageDataBuilder messageMetadata(byte[] value) { + this.messageMetadata = value; + return this; + } + + /** + * Builds a message ready to be sent to KurrentDB. + * + * @see MessageData + */ + public MessageData build() { + UUID messageId = this.messageId == null ? UUID.randomUUID() : this.messageId; + return new MessageData(this.messageType, this.messageData, this.messageMetadata, messageId, this.contentType); + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java index b4b9f9f4..b5e47bf0 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java @@ -1,8 +1,15 @@ package io.kurrent.dbclient.serialization; +import io.kurrent.dbclient.Message; +import io.kurrent.dbclient.MessageData; + +import java.util.List; + public interface MessageSerializer { MessageSerializer with(OperationSerializationSettings serializationSettings); - void serialize(); + MessageData serialize(Message value, MessageSerializationContext context); + + List serialize(List messages, MessageSerializationContext serializationContext); } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java index bcd5688e..bfebad3c 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java @@ -1,5 +1,11 @@ package io.kurrent.dbclient.serialization; +import io.kurrent.dbclient.Message; +import io.kurrent.dbclient.MessageData; + +import java.util.List; +import java.util.stream.Collectors; + class MessageSerializerImpl implements MessageSerializer { @Override public MessageSerializer with(OperationSerializationSettings serializationSettings) { @@ -7,7 +13,13 @@ public MessageSerializer with(OperationSerializationSettings serializationSettin } @Override - public void serialize() { + public MessageData serialize(Message value, MessageSerializationContext context) { + return null; + } + @Override + public List serialize(List messages, MessageSerializationContext serializationContext) { + return messages.stream().map(m -> serialize(m, serializationContext)).collect(Collectors.toList()); } + } From 3a861b70ef5de8b146c3b46ee24766c7f8042faa Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Tue, 18 Mar 2025 14:46:25 +0100 Subject: [PATCH 4/8] [DEVEX-250] Added the initial implementation of Jackson Serializer --- db-client-java/build.gradle | 4 +- .../java/io/kurrent/dbclient/MessageData.java | 8 +-- .../serialization/JacksonSerializer.java | 56 +++++++++++++++++++ .../serialization/MessageSerializerImpl.java | 8 ++- .../dbclient/serialization/Serializer.java | 32 +++++++++++ .../serialization/SerializationTests.java | 16 ++---- 6 files changed, 105 insertions(+), 19 deletions(-) create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/Serializer.java diff --git a/db-client-java/build.gradle b/db-client-java/build.gradle index 7ff66926..0a9efe31 100644 --- a/db-client-java/build.gradle +++ b/db-client-java/build.gradle @@ -46,10 +46,11 @@ dependencies { implementation "io.grpc:grpc-stub:${grpcVersion}" implementation "io.grpc:grpc-protobuf:${grpcVersion}" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' implementation "org.slf4j:slf4j-api:2.0.17" implementation "org.bouncycastle:bcprov-jdk18on:1.80" implementation "org.bouncycastle:bcpkix-jdk18on:1.80" - + implementation platform("io.opentelemetry:opentelemetry-bom:${openTelemetryVersion}") implementation "io.opentelemetry:opentelemetry-api" implementation "io.opentelemetry.semconv:opentelemetry-semconv:${openTelemetrySemConvVersion}" @@ -64,7 +65,6 @@ dependencies { testImplementation "org.reactivestreams:reactive-streams-tck:${reactiveStreamsApiVersion}" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation platform("com.fasterxml.jackson:jackson-bom:${jacksonVersion}") - testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' testImplementation "com.github.javafaker:javafaker:1.0.2" testImplementation 'org.slf4j:slf4j-simple:2.0.17' testImplementation "io.opentelemetry:opentelemetry-sdk" diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java b/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java index f4994f67..acd06a72 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java @@ -12,15 +12,15 @@ public final class MessageData { private final byte[] messageData; private final byte[] messageMetadata; - MessageData(String messageType, byte[] messageData) { + public MessageData(String messageType, byte[] messageData) { this(messageType, messageData, null, UUID.randomUUID(), ContentType.JSON); } - MessageData(String messageType, byte[] messageData, byte[] userMetadata) { + public MessageData(String messageType, byte[] messageData, byte[] userMetadata) { this(messageType, messageData, userMetadata, UUID.randomUUID(), ContentType.JSON); } - - MessageData(String messageType, byte[] messageData, byte[] userMetadata, UUID messageId, String contentType) { + + public MessageData(String messageType, byte[] messageData, byte[] userMetadata, UUID messageId, String contentType) { this.messageId = messageId; this.messageType = messageType; this.contentType = contentType; diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java new file mode 100644 index 00000000..10207374 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java @@ -0,0 +1,56 @@ +package io.kurrent.dbclient.serialization; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Optional; + +public class JacksonSerializer implements Serializer { + public static final JsonMapper.Builder defaultBuilder = JsonMapper.builder() + .addModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .propertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE) + .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) + .serializationInclusion(JsonInclude.Include.NON_NULL) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + private static final Logger logger = LoggerFactory.getLogger(JacksonSerializer.class); + + private final JsonMapper jsonMapper; + + public JacksonSerializer(){ + this(defaultBuilder); + } + + public JacksonSerializer(JsonMapper.Builder builder){ + jsonMapper = builder != null ? builder.build() : defaultBuilder.build(); + } + + @Override + public byte[] serialize(Object value) { + try { + return jsonMapper.writeValueAsBytes(value); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public Optional deserialize(Class eventClass, byte[] data) { + try { + MessageType result = jsonMapper.readValue(data, eventClass); + + return Optional.ofNullable(result); + } catch (IOException e) { + logger.warn("Error deserializing event {}", eventClass.getName(), e); + return Optional.empty(); + } + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java index bfebad3c..e256be26 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java @@ -7,14 +7,18 @@ import java.util.stream.Collectors; class MessageSerializerImpl implements MessageSerializer { + Serializer serializer = new JacksonSerializer(); + @Override public MessageSerializer with(OperationSerializationSettings serializationSettings) { - return null; + return this; } @Override public MessageData serialize(Message value, MessageSerializationContext context) { - return null; + return MessageData + .builderAsJson(value.getData().getClass().getTypeName(), serializer.serialize(value)) + .build(); } @Override diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/Serializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/Serializer.java new file mode 100644 index 00000000..7b87331f --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/Serializer.java @@ -0,0 +1,32 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +/// +/// Defines the core serialization capabilities required by the KurrentDB client. +/// Implementations of this interface handle the conversion between Java objects and their +/// binary representation for storage in and retrieval from the event store. +///
+/// The client ships default Jackson implementation, but custom implementations can be provided or other formats. +///
+public interface Serializer { + /// + /// Converts a Java object to its binary representation for storage in the event store. + /// + /// The object to serialize. This could be an event, command, or metadata object. + /// + /// A binary representation of the object that can be stored in KurrentDB. + /// + byte[] serialize(Object value); + + /// + /// Reconstructs a Java object from its binary representation retrieved from the event store. + /// + /// The binary data to deserialize, typically retrieved from a KurrentDB event. + /// The target Java type to deserialize the data into, determined from message type mappings. + /// + /// The deserialized object cast to the specified type, or null if the data cannot be deserialized. + /// The returned object will be an instance of the specified type or a compatible subtype. + /// + Optional deserialize(Class eventClass, byte[] data); +} diff --git a/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java index 6e4fdeb0..f07915a9 100644 --- a/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java +++ b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java @@ -16,25 +16,19 @@ default void testPlainJavaObjectsAreSerializedAndDeserializedUsingAutoSerializat // Given final String streamName = generateName(); - final List expected = generateMessages(2); + final Object[] expected = generateMessages(2).toArray(); - // When AppendToStreamOptions appendOptions = AppendToStreamOptions.get() .expectedRevision(ExpectedRevision.noStream()); - WriteResult appendResult = client.appendToStream(streamName, appendOptions, expected.iterator()) + WriteResult appendResult = client.appendToStream(streamName, appendOptions, expected) .get(); - Assertions.assertEquals(ExpectedRevision.expectedRevision(0), appendResult.getNextExpectedRevision()); - - ReadStreamOptions readStreamOptions = ReadStreamOptions.get() - .fromEnd() - .backwards() - .maxCount(1); - + Assertions.assertEquals(ExpectedRevision.expectedRevision(1), appendResult.getNextExpectedRevision()); + // Ensure appended event is readable - ReadResult result = client.readStream(streamName, readStreamOptions) + ReadResult result = client.readStream(streamName, ReadStreamOptions.get()) .get(); Assertions.assertEquals(2, result.getEvents().size()); From 7f05d10e2eb7c33a121c533d85de1970daabe84b Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Wed, 19 Mar 2025 13:16:58 +0100 Subject: [PATCH 5/8] [DEVEX-250] Added JacksonSerializer implementation together with Schema Regsitry, naming strategy and related settingns --- .../AutomaticDeserialization.java | 31 ++ .../dbclient/serialization/ClassProvider.java | 21 ++ .../DefaultMessageTypeNamingStrategy.java | 41 +++ .../serialization/JacksonSerializer.java | 49 ++- .../KurrentDBClientSerializationSettings.java | 320 +++++++++++++++++- .../MessageTypeNamingStrategy.java | 79 +++++ .../serialization/MessageTypeRegistry.java | 65 ++++ .../OperationSerializationSettings.java | 54 +++ .../serialization/SchemaRegistry.java | 89 +++++ .../serialization/TracingMetadata.java | 5 + 10 files changed, 730 insertions(+), 24 deletions(-) create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/AutomaticDeserialization.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/ClassProvider.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/DefaultMessageTypeNamingStrategy.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingStrategy.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeRegistry.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/TracingMetadata.java diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/AutomaticDeserialization.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/AutomaticDeserialization.java new file mode 100644 index 00000000..715b26cb --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/AutomaticDeserialization.java @@ -0,0 +1,31 @@ +package io.kurrent.dbclient.serialization; + +/** + * Controls whether the KurrentDB client should automatically deserialize message payloads + * into their corresponding Java types based on the configured type mappings. + */ +public enum AutomaticDeserialization { + /** + * Disables automatic deserialization. Messages will be returned in their raw serialized form, + * requiring manual deserialization by the application. Use this when you need direct access to the raw data + * or when working with messages that don't have registered type mappings. + */ + DISABLED(0), + + /** + * Enables automatic deserialization. The client will attempt to convert messages into their appropriate + * Java types using the configured serializers and type mappings. This simplifies working with strongly-typed + * domain messages but requires proper type registration. + */ + ENABLED(1); + + private final int value; + + AutomaticDeserialization(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/ClassProvider.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/ClassProvider.java new file mode 100644 index 00000000..54629982 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/ClassProvider.java @@ -0,0 +1,21 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +public class ClassProvider { + public static Optional> getClassByName(String fullName) { + try { + return Optional.of(Class.forName(fullName)); + } catch (ClassNotFoundException e) { + ClassLoader contextLoader = Thread.currentThread().getContextClassLoader(); + if (contextLoader == null) + return Optional.empty(); + + try { + return Optional.ofNullable(contextLoader.loadClass(fullName)); + } catch (ClassNotFoundException ignored) { + return Optional.empty(); + } + } + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/DefaultMessageTypeNamingStrategy.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/DefaultMessageTypeNamingStrategy.java new file mode 100644 index 00000000..157c1778 --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/DefaultMessageTypeNamingStrategy.java @@ -0,0 +1,41 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +/** + * Default implementation of MessageTypeNamingStrategy. + */ +public class DefaultMessageTypeNamingStrategy implements MessageTypeNamingStrategy { + private final Class defaultMetadataType; + + /** + * Creates a new strategy with the specified default metadata type. + * + * @param defaultMetadataType The default metadata type + */ + public DefaultMessageTypeNamingStrategy(Class defaultMetadataType) { + this.defaultMetadataType = defaultMetadataType != null ? defaultMetadataType : TracingMetadata.class; + } + + @Override + public String resolveTypeName(Class messageType, MessageTypeNamingResolutionContext resolutionContext) { + return resolutionContext.getCategoryName() + "-" + messageType.getName(); + } + + @Override + public Optional> tryResolveJavaClass(String messageTypeName) { + int categorySeparatorIndex = messageTypeName.indexOf('-'); + + if (categorySeparatorIndex == -1 || categorySeparatorIndex == messageTypeName.length() - 1) { + return Optional.empty(); + } + + String clrTypeName = messageTypeName.substring(categorySeparatorIndex + 1); + return ClassProvider.getClassByName(clrTypeName); + } + + @Override + public Optional> tryResolveMetadataJavaClass(String messageTypeName) { + return Optional.of(defaultMetadataType); + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java index 10207374..d6c73875 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java @@ -2,7 +2,10 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.slf4j.Logger; @@ -12,27 +15,39 @@ import java.util.Optional; public class JacksonSerializer implements Serializer { - public static final JsonMapper.Builder defaultBuilder = JsonMapper.builder() - .addModule(new JavaTimeModule()) - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) - .propertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE) - .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) - .serializationInclusion(JsonInclude.Include.NON_NULL) - .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - + public static class Settings { + public static final JsonMapper.Builder defaultBuilder = JsonMapper.builder() + .addModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .propertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE) + .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) + .serializationInclusion(JsonInclude.Include.NON_NULL) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + private JsonMapper.Builder jsonMapperBuilder = defaultBuilder; + + public JsonMapper.Builder jsonMapperBuilder() { + return jsonMapperBuilder; + } + + public void jsonMapperBuilder(JsonMapper.Builder jsonMapperBuilder) { + this.jsonMapperBuilder = jsonMapperBuilder; + } + } + private static final Logger logger = LoggerFactory.getLogger(JacksonSerializer.class); - + private final JsonMapper jsonMapper; - public JacksonSerializer(){ - this(defaultBuilder); + public JacksonSerializer() { + this(new Settings()); } - - public JacksonSerializer(JsonMapper.Builder builder){ - jsonMapper = builder != null ? builder.build() : defaultBuilder.build(); + + public JacksonSerializer(Settings settings) { + jsonMapper = settings.jsonMapperBuilder().build(); } - + @Override public byte[] serialize(Object value) { try { diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java index e9915c17..599cd70f 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java @@ -1,22 +1,84 @@ package io.kurrent.dbclient.serialization; +import com.fasterxml.jackson.databind.json.JsonMapper; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.function.Consumer; +import java.util.function.Function; /** * Provides configuration options for messages serialization and deserialization in the KurrentDB client. */ -public class KurrentDBClientSerializationSettings { +public class KurrentDBClientSerializationSettings implements Cloneable { + /** + * The serializer responsible for handling JSON-formatted data. This serializer is used both for + * serializing outgoing JSON messages and deserializing incoming JSON messages. If not specified, + * a default System.Text.Json serializer will be used with standard settings. + *

+ * That also allows you to bring your custom JSON serializer implementation (e.g. JSON.NET) + */ + private Serializer jsonSerializer; + + + /** + * The serializer responsible for handling binary data formats. This is used when working with + * binary-encoded messages rather than text-based formats (e.g. Protobuf or Avro). Required when storing + * or retrieving content with "application/octet-stream" content type + */ + private Serializer bytesSerializer; + + /** + * Determines which serialization format (JSON or binary) is used by default when writing messages + * where the content type isn't explicitly specified. The default content type is "application/json" + */ + private ContentType defaultContentType = ContentType.JSON; + + /** + * Defines the custom strategy used to map between the type name stored in messages and Java type names. + * If not provided the default {@link io.kurrent.dbclient.serialization.DefaultMessageTypeNamingStrategy} will be used. + * It resolves the class name to the format: "{stream category name}-{Class Message Type}". + * You can provide your own implementation of {@link io.kurrent.dbclient.serialization.MessageTypeNamingStrategy} + * and register it here to override the default behavior + */ + private MessageTypeNamingStrategy messageTypeNamingStrategy; + + /** + * Allows to register mapping of Java message types to their corresponding message type names used in serialized messages. + */ + private Map, String> messageTypeMap = new HashMap<>(); + + /** + * Registers Java message types that can be appended to the specific stream category. + * Types will have message type names resolved based on the used {@link io.kurrent.dbclient.serialization.MessageTypeNamingStrategy} + */ + private Map[]> categoryMessageTypesMap = new HashMap<>(); + + /** + * Specifies the Java type that should be used when deserializing metadata for all events. + * When set, the client will attempt to deserialize event metadata into this type. + * If not provided, {@link io.kurrent.dbclient.serialization.TracingMetadata} will be used. + */ + private Class defaultMetadataType; + /** * Creates a new instance of serialization settings with either default values or custom configuration. * This factory method is the recommended way to create serialization settings for the KurrentDB client. + * * @param configure Optional callback to customize the settings. If null, default settings are used. * @return A fully configured instance ready to be used with the KurrentDB client. - *

{@code
-     * var settings = KurrentDBClientSerializationSettings.get(options -> {
-     *     options.RegisterMessageType("user-created");
-     *     options.RegisterMessageType("user-role-assigned");
+     * 
+     * {@code
+     * KurrentDBClientSerializationSettings settings = KurrentDBClientSerializationSettings.get(options -> {
+     *     options.registerMessageType(UserRegistered.class, "user-registered");
+     *     options.registerMessageType(UserRoleAssigned.class, "user-role-assigned");
+     *     options.registerMessageTypeForCategory(UserRegistered.class, "user-registered");
      * });
-     * }
+ * } + *
*/ public static KurrentDBClientSerializationSettings get( Consumer configure @@ -27,8 +89,252 @@ public static KurrentDBClientSerializationSettings get( return settings; } - + + /** + * Creates a new instance of serialization settings with either default values. + * This factory method is the recommended way to create serialization settings for the KurrentDB client. + * + * @return A fully configured default instance ready to be used with the KurrentDB client. + */ public static KurrentDBClientSerializationSettings get() { return new KurrentDBClientSerializationSettings(); } + + /** + * Configures the JSON serializer using custom options while inheriting from the default System.Text.Json settings. + * This allows fine-tuning serialization behavior such as case sensitivity, property naming, etc. + * + * @param configure A function that receives the default options and returns modified options. + * @return The current instance for method chaining. + * @example + *
+     * {@code
+     * settings.useJsonSettings(builder -> 
+     *     builder.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true)
+     *             .propertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE)
+     * );
+     * }
+     * 
+ */ + public KurrentDBClientSerializationSettings useJsonSettings( + Function configure + ) { + JacksonSerializer.Settings settings = new JacksonSerializer.Settings(); + settings.jsonMapperBuilder(configure.apply(settings.jsonMapperBuilder())); + + jsonSerializer = new JacksonSerializer(settings); + + return this; + } + + /** + * Sets a custom JSON serializer implementation. + * That also allows you to bring your custom JSON serializer implementation (e.g. Jackson) + * + * @param serializer The serializer to use for JSON content. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useJsonSerializer(Serializer serializer) { + jsonSerializer = serializer; + + return this; + } + + /** + * Sets a custom binary serializer implementation. + * That also allows you to bring your custom binary serializer implementation (e.g. Protobuf or Avro) + * + * @param serializer The serializer to use for binary content. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useBytesSerializer(Serializer serializer) { + bytesSerializer = serializer; + + return this; + } + + /** + * Configures a custom message type naming strategy. + * + * @param The type of naming strategy to use. + * @param strategyClass The class of the naming strategy to instantiate. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useMessageTypeNamingStrategy( + Class strategyClass + ) { + try { + return useMessageTypeNamingStrategy(strategyClass.getDeclaredConstructor().newInstance()); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new RuntimeException("Failed to instantiate message type naming strategy", e); + } + } + + /** + * Configures a custom message type naming strategy. + * + * @param messageTypeNamingStrategy The naming strategy instance to use. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useMessageTypeNamingStrategy( + MessageTypeNamingStrategy messageTypeNamingStrategy + ) { + this.messageTypeNamingStrategy = messageTypeNamingStrategy; + + return this; + } + + /** + * Associates a message type with a specific stream category to enable automatic deserialization. + * In event sourcing, streams are often prefixed with a category (e.g., "user-123", "order-456"). + * This method tells the client which message types can appear in streams of a given category. + * + * @param The event or message type that can appear in the category's streams. + * @param categoryName The category prefix (e.g., "user", "order", "account"). + * @param clazz The class representing the event type. + * @return The current instance for method chaining. + * @example + *
+     * {@code
+     * // Register event types that can appear in user streams
+     * settings.registerMessageTypeForCategory(UserCreated.class, "user")
+     *        .registerMessageTypeForCategory(UserUpdated.class, "user")
+     *        .registerMessageTypeForCategory(UserDeleted.class, "user");
+     * }
+     * 
+ */ + public KurrentDBClientSerializationSettings registerMessageTypeForCategory(Class clazz, String categoryName) { + return registerMessageTypeForCategory(categoryName, clazz); + } + + /** + * Registers multiple message types for a specific stream category. + * + * @param categoryName The category name to register the types with. + * @param types The message types to register. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings registerMessageTypeForCategory(String categoryName, Class... types) { + if (categoryMessageTypesMap.containsKey(categoryName)) { + Class[] current = categoryMessageTypesMap.get(categoryName); + Class[] combined = Arrays.copyOf(current, current.length + types.length); + System.arraycopy(types, 0, combined, current.length, types.length); + categoryMessageTypesMap.put(categoryName, combined); + } else { + categoryMessageTypesMap.put(categoryName, types); + } + + return this; + } + + /** + * Maps a Java type to a specific message type name that will be stored in the message metadata. + * This mapping is used during automatic deserialization, as it tells the client which Java class + * to instantiate when encountering a message with a particular type name in the database. + * + * @param The Java type to register (typically a message class). + * @param clazz The class representing the message type. + * @param typeName The string identifier to use for this type in the database. + * @return The current instance for method chaining. + * @remarks The type name is often different from the Java type name to support versioning and evolution + * of your domain model without breaking existing stored messages. + * @example + *
+     * {@code
+     * // Register me types with their corresponding type identifiers
+     * settings.registerMessageType(UserCreated.class, "user-created-v1")
+     *        .registerMessageType(OrderPlaced.class, "order-placed-v2");
+     * }
+     * 
+ */ + public KurrentDBClientSerializationSettings registerMessageType(Class clazz, String typeName) { + messageTypeMap.put(clazz, typeName); + + return this; + } + + /** + * Registers multiple message types with their corresponding type names. + * + * @param typeMap Map mapping types to their type names. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings registerMessageTypes(Map, String> typeMap) { + messageTypeMap.putAll(typeMap); + + return this; + } + + /** + * Configures a strongly-typed metadata class for all messages in the system. + * This enables accessing metadata properties in a type-safe manner rather than using dynamic objects. + * + * @param The metadata class type containing properties matching the expected metadata fields. + * @param clazz The class representing the metadata type. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useMetadataType(Class clazz) { + defaultMetadataType = clazz; + + return this; + } + + /** + * Configures which serialization format (JSON or binary) is used by default when writing messages + * where the content type isn't explicitly specified. The default content type is "application/json" + * + * @param contentType The serialization format content type + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useContentType(ContentType contentType) { + defaultContentType = contentType; + + return this; + } + + @Override + public KurrentDBClientSerializationSettings clone() { + try { + KurrentDBClientSerializationSettings clone = (KurrentDBClientSerializationSettings) super.clone(); + clone.bytesSerializer = this.bytesSerializer; + clone.jsonSerializer = this.jsonSerializer; + clone.defaultContentType = this.defaultContentType; + clone.messageTypeMap = new HashMap<>(this.messageTypeMap); + clone.categoryMessageTypesMap = new HashMap<>(this.categoryMessageTypesMap); + clone.messageTypeNamingStrategy = this.messageTypeNamingStrategy; + clone.defaultMetadataType = this.defaultMetadataType; + return clone; + } catch (CloneNotSupportedException e) { + throw new InternalError(e); + } + } + + // Getters + public Optional jsonSerializer() { + return Optional.ofNullable(jsonSerializer); + } + + public Optional bytesSerializer() { + return Optional.ofNullable(bytesSerializer); + } + + public ContentType defaultContentType() { + return defaultContentType; + } + + public Optional messageTypeNamingStrategy() { + return Optional.ofNullable(messageTypeNamingStrategy); + } + + public Map, String> messageTypeMap() { + return messageTypeMap; + } + + public Map[]> categoryMessageTypesMap() { + return categoryMessageTypesMap; + } + + public Class defaultMetadataType() { + return defaultMetadataType; + } } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingStrategy.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingStrategy.java new file mode 100644 index 00000000..b2f3744e --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingStrategy.java @@ -0,0 +1,79 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +/** + * Strategy for naming message types. + */ +public interface MessageTypeNamingStrategy { + /** + * Resolves a type name for the given message class. + * + * @param messageClass The message class to resolve a name for + * @param resolutionContext The context for resolution + * @return The resolved type name + */ + String resolveTypeName(Class messageClass, MessageTypeNamingResolutionContext resolutionContext); + + /** + * Tries to resolve a Java class from a message type name. + * + * @param messageTypeName Message type name to resolve + * @return Optional with resolved class, or empty if class wasn't found + */ + Optional> tryResolveJavaClass(String messageTypeName); + + /** + * Tries to resolve a Java metadata class from a message type name. + * + * @param messageTypeName Message type name to resolve + * @return Optional with resolved class, or empty if class wasn't found + */ + Optional> tryResolveMetadataJavaClass(String messageTypeName); +} + +/** + * Wrapper for message type naming strategies. + */ +class MessageTypeNamingStrategyWrapper implements MessageTypeNamingStrategy { + private final MessageTypeRegistry messageTypeRegistry; + private final MessageTypeNamingStrategy messageTypeNamingStrategy; + + /** + * Creates a new wrapper with the specified registry and strategy. + * + * @param messageTypeRegistry The message type registry + * @param messageTypeNamingStrategy The strategy to wrap + */ + public MessageTypeNamingStrategyWrapper( + MessageTypeRegistry messageTypeRegistry, + MessageTypeNamingStrategy messageTypeNamingStrategy) { + this.messageTypeRegistry = messageTypeRegistry; + this.messageTypeNamingStrategy = messageTypeNamingStrategy; + } + + @Override + public String resolveTypeName(Class messageType, MessageTypeNamingResolutionContext resolutionContext) { + return messageTypeRegistry.getOrAddTypeName( + messageType, + type -> messageTypeNamingStrategy.resolveTypeName(messageType, resolutionContext) + ); + } + + @Override + public Optional> tryResolveJavaClass(String messageTypeName) { + return messageTypeRegistry.getOrAddJavaClass( + messageTypeName, + name -> messageTypeNamingStrategy.tryResolveMetadataJavaClass(messageTypeName) + ); + } + + @Override + public Optional> tryResolveMetadataJavaClass(String messageTypeName) { + return messageTypeRegistry.getOrAddJavaClass( + messageTypeName + "-metadata", + name -> messageTypeNamingStrategy.tryResolveMetadataJavaClass(messageTypeName) + ); + } +} + diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeRegistry.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeRegistry.java new file mode 100644 index 00000000..4aa26b7b --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageTypeRegistry.java @@ -0,0 +1,65 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +interface MessageTypeRegistry { + void register(Map, String> messageTypeMap); + + void register(Class messageType, String messageTypeName); + + Optional getTypeName(Class messageType); + + String getOrAddTypeName(Class javaClass, Function, String> getTypeName); + + Optional> getJavaClass(String messageTypeName); + + Optional> getOrAddJavaClass(String messageTypeName, Function>> getJavaClass); +} + +class MessageTypeRegistryImpl implements MessageTypeRegistry { + private final ConcurrentHashMap> classMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, String> typeNameMap = new ConcurrentHashMap<>(); + + + @Override + public void register(Map, String> messageTypeMap) { + for (Map.Entry, String> entry : messageTypeMap.entrySet()) { + register(entry.getKey(), entry.getValue()); + } + } + + @Override + public void register(Class messageType, String messageTypeName) { + classMap.put(messageTypeName, messageType); + typeNameMap.put(messageType, messageTypeName); + } + + @Override + public Optional getTypeName(Class messageType) { + return Optional.ofNullable(typeNameMap.getOrDefault(messageType, null)); + } + + @Override + public String getOrAddTypeName(Class javaClass, Function, String> getTypeName) { + return typeNameMap.computeIfAbsent( + javaClass, + c -> getTypeName.apply(javaClass) + ); + } + + @Override + public Optional> getJavaClass(String messageTypeName) { + return Optional.ofNullable(classMap.getOrDefault(messageTypeName, null)); + } + + @Override + public Optional> getOrAddJavaClass(String messageTypeName, Function>> getJavaClass) { + return Optional.ofNullable(classMap.computeIfAbsent( + messageTypeName, + c -> getJavaClass.apply(messageTypeName).orElse(null) + )); + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java index 4646ce58..c60cfe6e 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java @@ -1,4 +1,58 @@ package io.kurrent.dbclient.serialization; +import java.util.function.Consumer; + +/** + * Provides operation-specific serialization settings that override the global client configuration + * for individual operations like reading from or appending to streams. This allows fine-tuning + * serialization behavior on a per-operation basis without changing the client-wide settings. + */ public class OperationSerializationSettings { + /** + * Controls whether messages should be automatically deserialized for this specific operation. + * When enabled (the default), messages will be converted to their appropriate Java types. + * When disabled, messages will be returned in their raw serialized form. + */ + private AutomaticDeserialization automaticDeserialization = AutomaticDeserialization.ENABLED; + + /** + * A callback that allows customizing serialization settings for this specific operation. + * This can be used to override type mappings, serializers, or other settings just for + * the scope of a single operation without affecting other operations. + */ + private Consumer configureSettings; + + /** + * A pre-configured settings instance that disables automatic deserialization. + * Use this when you need to access raw message data in its serialized form. + */ + public static final OperationSerializationSettings DISABLED = new OperationSerializationSettings(); + + static { + DISABLED.automaticDeserialization = AutomaticDeserialization.DISABLED; + } + + /** + * Creates operation-specific serialization settings with custom configuration while keeping + * automatic deserialization enabled. This allows operation-specific type mappings or + * serializer settings without changing the global client configuration. + * + * @param configure A callback to customize serialization settings for this operation. + * @return A configured instance of {@link OperationSerializationSettings} with enabled deserialization. + */ + public static OperationSerializationSettings configure(Consumer configure) { + OperationSerializationSettings settings = new OperationSerializationSettings(); + settings.automaticDeserialization = AutomaticDeserialization.ENABLED; + settings.configureSettings = configure; + return settings; + } + + // Getters + public AutomaticDeserialization automaticDeserialization() { + return automaticDeserialization; + } + + public Consumer configureSettings() { + return configureSettings; + } } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java new file mode 100644 index 00000000..42f9f9ee --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java @@ -0,0 +1,89 @@ +package io.kurrent.dbclient.serialization; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class SchemaRegistry { + private final Map serializers; + private final MessageTypeNamingStrategy messageTypeNamingStrategy; + + public SchemaRegistry( + Map serializers, + MessageTypeNamingStrategy messageTypeNamingStrategy) { + this.serializers = serializers; + this.messageTypeNamingStrategy = messageTypeNamingStrategy; + } + + public Serializer serializer(ContentType schemaType) { + return serializers.get(schemaType); + } + + public String resolveTypeName(Class messageClass, MessageTypeNamingResolutionContext resolutionContext) { + return messageTypeNamingStrategy.resolveTypeName(messageClass, resolutionContext); + } + + public Optional> tryResolveDataJavaClass(String messageTypeName) { + return messageTypeNamingStrategy.tryResolveJavaClass(messageTypeName); + } + + public Optional> tryResolveMetadataJavaClass(String messageTypeName) { + return messageTypeNamingStrategy.tryResolveMetadataJavaClass(messageTypeName); + } + + public static SchemaRegistry from(KurrentDBClientSerializationSettings settings) { + MessageTypeNamingStrategy messageTypeNamingStrategy = + settings.messageTypeNamingStrategy() + .orElse(new DefaultMessageTypeNamingStrategy(settings.defaultMetadataType())); + + Map, String> categoriesTypeMap = resolveMessageTypeUsingNamingStrategy( + settings.categoryMessageTypesMap(), + messageTypeNamingStrategy + ); + + MessageTypeRegistry messageTypeRegistry = new MessageTypeRegistryImpl(); + messageTypeRegistry.register(settings.messageTypeMap()); + messageTypeRegistry.register(categoriesTypeMap); + + Map serializers = new HashMap<>(); + + serializers.put( + ContentType.JSON, + settings.jsonSerializer().orElse(new JacksonSerializer()) + ); + + serializers.put( + ContentType.BYTES, + settings.bytesSerializer().orElse(new JacksonSerializer()) + ); + + return new SchemaRegistry( + serializers, + new MessageTypeNamingStrategyWrapper( + messageTypeRegistry, + settings.messageTypeNamingStrategy() + .orElse(new DefaultMessageTypeNamingStrategy(settings.defaultMetadataType())) + ) + ); + } + + private static Map, String> resolveMessageTypeUsingNamingStrategy( + Map[]> categoryMessageTypesMap, + MessageTypeNamingStrategy messageTypeNamingStrategy + ) { + Map, String> result = new HashMap<>(); + + for (Map.Entry[]> entry : categoryMessageTypesMap.entrySet()) { + String category = entry.getKey(); + for (Class type : entry.getValue()) { + String typeName = messageTypeNamingStrategy.resolveTypeName( + type, + new MessageTypeNamingResolutionContext(category) + ); + result.put(type, typeName); + } + } + + return result; + } +} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/TracingMetadata.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/TracingMetadata.java new file mode 100644 index 00000000..7bee489c --- /dev/null +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/TracingMetadata.java @@ -0,0 +1,5 @@ +package io.kurrent.dbclient.serialization; + +public class TracingMetadata { + +} From 4d730b77a0b64660ef45518f177291458dcce0d5 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Wed, 19 Mar 2025 15:18:53 +0100 Subject: [PATCH 6/8] [DEVEX-250] Fixed mishap in mapping metadata instead of data in tracing context --- .../io/kurrent/dbclient/AppendToStream.java | 4 +- .../io/kurrent/dbclient/ClientTelemetry.java | 2 +- .../kurrent/dbclient/KurrentDBClientBase.java | 2 +- .../java/io/kurrent/dbclient/Message.java | 6 +- .../MessageSerializationContext.java | 2 +- .../serialization/MessageSerializer.java | 90 ++++++++++++++++++- .../MessageSerializerBuilder.java | 4 +- .../serialization/MessageSerializerImpl.java | 29 ------ .../serialization/SchemaRegistry.java | 2 +- 9 files changed, 100 insertions(+), 41 deletions(-) delete mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java index 0f066ea5..3ddd75ff 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java @@ -103,8 +103,8 @@ private CompletableFuture append(ManagedChannel channel, List tryInjectTracingContext(Span span, List { diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/Message.java b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java index 2792798b..85b8aebb 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/Message.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java @@ -102,7 +102,7 @@ public static Message from(Object data, Object metadata, UUID messageId) { * * @return The message domain data. */ - public Object getData() { + public Object data() { return data; } @@ -111,7 +111,7 @@ public Object getData() { * * @return The message metadata, may be null. */ - public Object getMetadata() { + public Object metadata() { return metadata; } @@ -120,7 +120,7 @@ public Object getMetadata() { * * @return The message ID. */ - public UUID getMessageId() { + public UUID messageId() { return messageId; } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java index e6eeb283..94f42358 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java @@ -7,7 +7,7 @@ public MessageSerializationContext(MessageTypeNamingResolutionContext namingReso this.namingResolution = namingResolution; } - public MessageTypeNamingResolutionContext getNamingResolution() { + public MessageTypeNamingResolutionContext namingResolution() { return namingResolution; } } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java index b5e47bf0..6250e7e3 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java @@ -3,13 +3,101 @@ import io.kurrent.dbclient.Message; import io.kurrent.dbclient.MessageData; +import java.util.Arrays; import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; public interface MessageSerializer { MessageSerializer with(OperationSerializationSettings serializationSettings); - MessageData serialize(Message value, MessageSerializationContext context); + MessageData serialize(Message message, MessageSerializationContext context); List serialize(List messages, MessageSerializationContext serializationContext); } +class MessageSerializerImpl implements MessageSerializer { + private final SchemaRegistry schemaRegistry; + private final KurrentDBClientSerializationSettings serializationSettings; + // TODO: Ensure that settings are aligned between clients + private final JacksonSerializer metadataSerializer; + private final String contentType; + + public MessageSerializerImpl(SchemaRegistry schemaRegistry, KurrentDBClientSerializationSettings serializationSettings) { + this.schemaRegistry = schemaRegistry; + this.serializationSettings = serializationSettings; + this.metadataSerializer = new JacksonSerializer(); + this.contentType = ContentTypeUtils.toMessageContentType(serializationSettings.defaultContentType()); + } + + public static MessageSerializer from(KurrentDBClientSerializationSettings settings) { + settings = settings != null ? settings: KurrentDBClientSerializationSettings.get(); + + return new MessageSerializerImpl(SchemaRegistry.from(settings), settings); + } + + @Override + public MessageSerializer with(OperationSerializationSettings serializationSettings) { + return this; + } + + @Override + public MessageData serialize(Message message, MessageSerializationContext context) { + Object data = message.data(); + Object metadata = message.metadata(); + UUID messageId = message.messageId(); + + String messageType = schemaRegistry.resolveTypeName( + data.getClass(), + context.namingResolution() + ); + + byte[] serializedData = schemaRegistry + .getSerializer(serializationSettings.defaultContentType()) + .serialize(data); + + byte[] serializedMetadata = metadata != null + ? metadataSerializer.serialize(metadata) + : new byte[0]; + + return new MessageData( + messageType, + serializedData, + serializedMetadata, + messageId, + contentType + ); + } + + @Override + public List serialize(List messages, MessageSerializationContext serializationContext) { + return messages.stream().map(m -> serialize(m, serializationContext)).collect(Collectors.toList()); + } +} + +class ContentTypeUtils { + public static String toMessageContentType(ContentType contentType) { + switch (contentType) { + case JSON: + return "application/json"; + case BYTES: + return "application/octet-stream"; + default: + throw new IllegalArgumentException("Unknown content type: " + contentType); + } + } + + public static ContentType fromMessageContentType(String contentTypeString) { + if (contentTypeString == null || contentTypeString.isEmpty()) { + return ContentType.JSON; + } + + if ("application/json".equals(contentTypeString)) { + return ContentType.JSON; + } else if ("application/octet-stream".equals(contentTypeString)) { + return ContentType.BYTES; + } else { + throw new IllegalArgumentException("Unknown content type: " + contentTypeString); + } + } +} \ No newline at end of file diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java index 5c52a1e6..58cc4895 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java @@ -1,7 +1,7 @@ package io.kurrent.dbclient.serialization; public final class MessageSerializerBuilder { - public static MessageSerializer get() { - return new MessageSerializerImpl(); + public static MessageSerializer get(KurrentDBClientSerializationSettings settings) { + return MessageSerializerImpl.from(settings); } } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java deleted file mode 100644 index e256be26..00000000 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.kurrent.dbclient.serialization; - -import io.kurrent.dbclient.Message; -import io.kurrent.dbclient.MessageData; - -import java.util.List; -import java.util.stream.Collectors; - -class MessageSerializerImpl implements MessageSerializer { - Serializer serializer = new JacksonSerializer(); - - @Override - public MessageSerializer with(OperationSerializationSettings serializationSettings) { - return this; - } - - @Override - public MessageData serialize(Message value, MessageSerializationContext context) { - return MessageData - .builderAsJson(value.getData().getClass().getTypeName(), serializer.serialize(value)) - .build(); - } - - @Override - public List serialize(List messages, MessageSerializationContext serializationContext) { - return messages.stream().map(m -> serialize(m, serializationContext)).collect(Collectors.toList()); - } - -} diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java index 42f9f9ee..f2fe5a1c 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java @@ -15,7 +15,7 @@ public SchemaRegistry( this.messageTypeNamingStrategy = messageTypeNamingStrategy; } - public Serializer serializer(ContentType schemaType) { + public Serializer getSerializer(ContentType schemaType) { return serializers.get(schemaType); } From ce0858b877f5483f468a5b76c14a14aee8ea9319 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 20 Mar 2025 10:32:59 +0100 Subject: [PATCH 7/8] [DEVEX-250] Added Message deserialization --- .../io/kurrent/dbclient/AbstractRead.java | 18 +++-- .../dbclient/AbstractRegularSubscription.java | 10 ++- ...stractSubscribePersistentSubscription.java | 21 ++++-- .../java/io/kurrent/dbclient/GrpcClient.java | 12 ++- .../io/kurrent/dbclient/KurrentDBClient.java | 11 ++- .../kurrent/dbclient/KurrentDBClientBase.java | 3 - .../java/io/kurrent/dbclient/Message.java | 6 +- .../java/io/kurrent/dbclient/OptionsBase.java | 2 +- ...tionsWithBackPressureAndSerialization.java | 29 +++++++ ...ionsWithPositionAndResolveLinkTosBase.java | 2 +- ...ithStartRevisionAndResolveLinkTosBase.java | 2 +- .../dbclient/ReadResponseObserver.java | 14 +++- .../io/kurrent/dbclient/ResolvedEvent.java | 55 ++++++++++++-- ...ubscribePersistentSubscriptionOptions.java | 21 ++++++ .../SubscribePersistentSubscriptionToAll.java | 2 +- ...bscribePersistentSubscriptionToStream.java | 2 +- .../serialization/MessageSerializer.java | 75 ++++++++++++++++++- .../serialization/SerializationTests.java | 4 +- 18 files changed, 241 insertions(+), 48 deletions(-) create mode 100644 db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRead.java b/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRead.java index 55e0ba8e..fb44f9a2 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRead.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRead.java @@ -10,9 +10,9 @@ abstract class AbstractRead implements Publisher { protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions; private final GrpcClient client; - private final OptionsWithBackPressure options; + private final OptionsWithBackPressureAndSerialization options; - protected AbstractRead(GrpcClient client, OptionsWithBackPressure options) { + protected AbstractRead(GrpcClient client, OptionsWithBackPressureAndSerialization options) { this.client = client; this.options = options; } @@ -27,13 +27,17 @@ protected AbstractRead(GrpcClient client, OptionsWithBackPressure options) { @Override public void subscribe(Subscriber subscriber) { - ReadResponseObserver observer = new ReadResponseObserver(options, new ReadStreamConsumer(subscriber)); + ReadResponseObserver observer = new ReadResponseObserver( + options, + new ReadStreamConsumer(subscriber), + this.client.getSerializer(options.serializationSettings().orElse(null)) + ); this.client.getWorkItemArgs().whenComplete((args, error) -> { - if (error != null) { - observer.onError(error); - return; - } + if (error != null) { + observer.onError(error); + return; + } StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder() .setOptions(createOptions()) diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java b/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java index 50e6c059..0a80a289 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java @@ -20,9 +20,9 @@ abstract class AbstractRegularSubscription { protected SubscriptionListener listener; protected Checkpointer checkpointer = null; private final GrpcClient client; - private final OptionsWithBackPressure options; + private final OptionsWithBackPressureAndSerialization options; - protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure options) { + protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressureAndSerialization options) { this.client = client; this.options = options; } @@ -72,6 +72,10 @@ private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture event); }); - return new ReadResponseObserver(this.options, consumer); + return new ReadResponseObserver( + this.options, + consumer, + this.client.getSerializer(options.serializationSettings().orElse(null)) + ); } } diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java b/db-client-java/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java index 22a36804..5a399903 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java @@ -1,14 +1,15 @@ package io.kurrent.dbclient; -import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent; -import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc; -import io.kurrent.dbclient.proto.shared.Shared; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; +import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent; +import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc; +import io.kurrent.dbclient.proto.shared.Shared; +import io.kurrent.dbclient.serialization.MessageSerializer; import java.util.concurrent.CompletableFuture; @@ -18,6 +19,7 @@ abstract class AbstractSubscribePersistentSubscription { private final String group; private final PersistentSubscriptionListener listener; private final SubscribePersistentSubscriptionOptions options; + private final MessageSerializer messageSerializer; static { defaultReadOptions = Persistent.ReadReq.Options.newBuilder() @@ -25,13 +27,18 @@ abstract class AbstractSubscribePersistentSubscription { .setStructured(Shared.Empty.getDefaultInstance())); } - public AbstractSubscribePersistentSubscription(GrpcClient client, String group, - SubscribePersistentSubscriptionOptions options, - PersistentSubscriptionListener listener) { + public AbstractSubscribePersistentSubscription( + GrpcClient client, + String group, + SubscribePersistentSubscriptionOptions options, + PersistentSubscriptionListener listener, + MessageSerializer messageSerializer + ) { this.client = client; this.group = group; this.options = options; this.listener = listener; + this.messageSerializer = messageSerializer; } protected abstract Persistent.ReadReq.Options.Builder createOptions(); @@ -91,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) { int retryCount = readResp.getEvent().hasNoRetryCount() ? 0 : readResp.getEvent().getRetryCount(); try { - ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent()); + ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent(), messageSerializer); ClientTelemetry.traceSubscribe( () -> listener.onEvent(this._subscription, retryCount, resolvedEvent), _subscription.getSubscriptionId(), diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/GrpcClient.java b/db-client-java/src/main/java/io/kurrent/dbclient/GrpcClient.java index 0cce2e00..13df1448 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/GrpcClient.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/GrpcClient.java @@ -3,6 +3,9 @@ import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.kurrent.dbclient.serialization.MessageSerializer; +import io.kurrent.dbclient.serialization.MessageSerializerBuilder; +import io.kurrent.dbclient.serialization.OperationSerializationSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,11 +22,14 @@ class GrpcClient { private final AtomicBoolean closed; private final LinkedBlockingQueue queue; private final KurrentDBClientSettings settings; + private final MessageSerializer serializer; GrpcClient(KurrentDBClientSettings settings, AtomicBoolean closed, LinkedBlockingQueue queue) { this.settings = settings; this.closed = closed; this.queue = queue; + + this.serializer = MessageSerializerBuilder.get(settings.getSerializationSettings()); } public boolean isShutdown() { @@ -101,7 +107,7 @@ public CompletableFuture runWithArgs(Function shutdown() { public KurrentDBClientSettings getSettings() { return this.settings; } + + public MessageSerializer getSerializer(OperationSerializationSettings serializationSettings) { + return this.serializer.with(serializationSettings); + } } \ No newline at end of file diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index ca708d77..af219ac9 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.json.JsonMapper; import io.kurrent.dbclient.serialization.MessageSerializationContext; +import io.kurrent.dbclient.serialization.MessageSerializer; import org.reactivestreams.Publisher; import java.util.*; @@ -140,11 +141,13 @@ public CompletableFuture appendToStream(String streamName, AppendTo if (options == null) options = AppendToStreamOptions.get(); - MessageSerializationContext serializationContext = new MessageSerializationContext(fromStreamName(streamName)); + MessageSerializationContext serializationContext = + new MessageSerializationContext(fromStreamName(streamName)); - Iterator messageData = options.serializationSettings() - .map(serializer::with) - .orElse(serializer) + MessageSerializer serializer = getGrpcClient() + .getSerializer(options.serializationSettings().orElse(null)); + + Iterator messageData = serializer .serialize(messages, serializationContext) .iterator(); diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java index 9f4e19eb..80b34ee0 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java @@ -16,7 +16,6 @@ public class KurrentDBClientBase { final Logger logger = LoggerFactory.getLogger(KurrentDBClientBase.class); final private GrpcClient client; - final protected MessageSerializer serializer; KurrentDBClientBase(KurrentDBClientSettings settings) { Discovery discovery; @@ -33,8 +32,6 @@ public class KurrentDBClientBase { this.client = service.getHandle(); CompletableFuture.runAsync(service, createConnectionLoopExecutor()); - - serializer = MessageSerializerBuilder.get(settings.getSerializationSettings()); } private Executor createConnectionLoopExecutor() { return Executors.newSingleThreadExecutor(r -> { diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/Message.java b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java index 85b8aebb..0895e7b3 100644 --- a/db-client-java/src/main/java/io/kurrent/dbclient/Message.java +++ b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java @@ -34,7 +34,7 @@ public Message(Object data, Object metadata, UUID messageId) { * *

Example: *

-     * // Create a message with a specific ID
+     * Create a message with a specific ID
      * UserRegistered userRegistered = new UserRegistered("123", "Alice");
      * Message message = Message.from(userRegistered);
      * 
@@ -53,7 +53,7 @@ public static Message from(Object data) { * *

Example: *

-     * // Create a message with a specific ID
+     * Create a message with a specific ID
      * UserRegistered userRegistered = new UserRegistered("123", "Alice");
      * UUID messageId = UUID.randomUUID();
      * Message message = Message.from(userRegistered, messageId);
@@ -74,7 +74,7 @@ public static Message from(Object data, UUID messageId) {
      *
      * 

Example: *

-     * // Create a message with data and metadata
+     * Create a message with data and metadata
      * OrderPlaced orderPlaced = new OrderPlaced("ORD-123", 99.99);
      * EventMetadata metadata = new EventMetadata(
      *     "user-456", 
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
index f7199a88..bb50b6d0 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
@@ -8,7 +8,7 @@ class OptionsBase {
     private final OperationKind kind;
     private UserCredentials credentials;
     private boolean requiresLeader;
-    private Map headers = new HashMap<>();
+    private final Map headers = new HashMap<>();
 
     protected OptionsBase() {
         this(OperationKind.Regular);
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java
new file mode 100644
index 00000000..eb9b790c
--- /dev/null
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java
@@ -0,0 +1,29 @@
+package io.kurrent.dbclient;
+
+import java.util.Optional;
+
+import io.kurrent.dbclient.serialization.OperationSerializationSettings;
+
+class OptionsWithBackPressureAndSerialization extends OptionsWithBackPressure {
+    public OperationSerializationSettings serializationSettings;
+
+    protected OptionsWithBackPressureAndSerialization(OperationKind kind) {
+        super(kind);
+    }
+    
+    /**
+     * Allows to customize or disable the automatic deserialization.
+     */
+    public Optional serializationSettings() {
+        return Optional.ofNullable(serializationSettings);
+    }
+
+    /**
+     * Customize or disable the automatic deserialization.
+     */
+    @SuppressWarnings("unchecked")
+    public T serializationSettings(OperationSerializationSettings serializationSettings) {
+        this.serializationSettings = serializationSettings;
+        return (T)this;
+    }
+}
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java
index 543126af..dc777fa5 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java
@@ -1,6 +1,6 @@
 package io.kurrent.dbclient;
 
-class OptionsWithPositionAndResolveLinkTosBase extends OptionsWithBackPressure {
+class OptionsWithPositionAndResolveLinkTosBase extends OptionsWithBackPressureAndSerialization {
     private StreamPosition position;
 
     protected OptionsWithPositionAndResolveLinkTosBase(OperationKind kind) {
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java
index 9b978972..5afd050a 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java
@@ -1,6 +1,6 @@
 package io.kurrent.dbclient;
 
-class OptionsWithStartRevisionAndResolveLinkTosBase extends OptionsWithBackPressure {
+class OptionsWithStartRevisionAndResolveLinkTosBase extends OptionsWithBackPressureAndSerialization {
     private StreamPosition startRevision;
 
     protected OptionsWithStartRevisionAndResolveLinkTosBase(OperationKind kind) {
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java b/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
index da6408c8..5f3b182b 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
@@ -7,6 +7,7 @@
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.ClientResponseObserver;
+import io.kurrent.dbclient.serialization.MessageSerializer;
 import org.reactivestreams.Subscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,11 +25,16 @@ class ReadResponseObserver implements ClientResponseObserver requestStream;
     private int outstandingRequests;
     private WorkItemArgs args;
-
-
-    public ReadResponseObserver(OptionsWithBackPressure options, StreamConsumer consumer) {
+    private final MessageSerializer messageSerializer;
+    
+    public ReadResponseObserver(
+            OptionsWithBackPressure options, 
+            StreamConsumer consumer, 
+            MessageSerializer messageSerializer
+    ) {
         this.options = options;
         this.consumer = consumer;
+        this.messageSerializer = messageSerializer;
     }
 
     public Subscription getSubscription() {
@@ -105,7 +111,7 @@ public void onNext(StreamsOuterClass.ReadResp value) {
         }
 
         if (value.hasEvent())
-            consumer.onEvent(ResolvedEvent.fromWire(value.getEvent()));
+            consumer.onEvent(ResolvedEvent.fromWire(value.getEvent(), messageSerializer));
         else if (value.hasConfirmation())
             consumer.onSubscriptionConfirmation(value.getConfirmation().getSubscriptionId());
         else if (value.hasCheckpoint()) {
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java b/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java
index 8f5a8e3d..652f404b 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java
@@ -2,10 +2,10 @@
 
 import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
 import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
+import io.kurrent.dbclient.serialization.MessageSerializer;
 
 import java.util.Objects;
 import java.util.Optional;
-import java.util.StringJoiner;
 
 /**
  * Represents an event with a potential link.
@@ -13,12 +13,17 @@
 public class ResolvedEvent {
     private final RecordedEvent event;
     private final RecordedEvent link;
-
     private final Position position;
+    private final Message message;
 
     public ResolvedEvent(RecordedEvent event, RecordedEvent link, Position position) {
+        this(event, link, null, position);
+    }
+
+    public ResolvedEvent(RecordedEvent event, RecordedEvent link, Message message, Position position) {
         this.event = event;
         this.link = link;
+        this.message = message;
         this.position = position;
     }
 
@@ -44,8 +49,26 @@ public RecordedEvent getOriginalEvent() {
         return this.link != null ? this.link : this.event;
     }
 
+
+    /**
+     * Returns the deserialized message
+     * It will be provided or equal to null, depending on the automatic deserialization settings you choose.
+     * If it's null, you can use OriginalEvent to deserialize it manually.
+     */
+    public Optional getMessage() {
+        return Optional.ofNullable(message);
+    }
+
+    /**
+     * Returns the deserialized message data.
+     */
+    public Optional getDeserializedData() {
+        return getMessage().map(Message::data);
+    }
+
     /**
      * Returns the transaction log position of the resolved event.
+     *
      * @see Position
      */
     public Optional getPosition() {
@@ -65,21 +88,41 @@ public int hashCode() {
         return Objects.hash(event, link);
     }
 
-    static ResolvedEvent fromWire(StreamsOuterClass.ReadResp.ReadEvent wireEvent) {
+    static ResolvedEvent fromWire(
+            StreamsOuterClass.ReadResp.ReadEvent wireEvent,
+            MessageSerializer messageSerializer
+    ) {
         RecordedEvent event = wireEvent.hasEvent() ? RecordedEvent.fromWire(wireEvent.getEvent()) : null;
         RecordedEvent link = wireEvent.hasLink() ? RecordedEvent.fromWire(wireEvent.getLink()) : null;
         Position position = wireEvent.hasNoPosition() ? null : new Position(wireEvent.getCommitPosition(), wireEvent.getCommitPosition());
 
-        return new ResolvedEvent(event, link, position);
+        return ResolvedEvent.from(event, link, position, messageSerializer);
     }
 
-    static ResolvedEvent fromWire(Persistent.ReadResp.ReadEvent wireEvent) {
+    static ResolvedEvent fromWire(
+            Persistent.ReadResp.ReadEvent wireEvent,
+            MessageSerializer messageSerializer
+    ) {
         RecordedEvent event = wireEvent.hasEvent() ? RecordedEvent.fromWire(wireEvent.getEvent()) : null;
         RecordedEvent link = wireEvent.hasLink() ? RecordedEvent.fromWire(wireEvent.getLink()) : null;
         Position position = wireEvent.hasNoPosition() ? null : new Position(wireEvent.getCommitPosition(), wireEvent.getCommitPosition());
 
 
-        return new ResolvedEvent(event, link, position);
+        return ResolvedEvent.from(event, link, position, messageSerializer);
+    }
+
+    static ResolvedEvent from(
+            RecordedEvent event,
+            RecordedEvent link,
+            Position position,
+            MessageSerializer messageSerializer
+    ) {
+        RecordedEvent originalEvent = link != null ? link : event;
+        Optional message = messageSerializer.tryDeserialize(originalEvent);
+
+        return message
+                .map(value -> new ResolvedEvent(event, link, value, position))
+                .orElseGet(() -> new ResolvedEvent(event, link, position));
     }
 
     @Override
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java b/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java
index 70cfa70a..9d61147e 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java
@@ -1,9 +1,14 @@
 package io.kurrent.dbclient;
 
+import io.kurrent.dbclient.serialization.OperationSerializationSettings;
+
+import java.util.Optional;
+
 /**
  * Options of the subscribe persistent subscription request.
  */
 public class SubscribePersistentSubscriptionOptions extends OptionsBase {
+    public OperationSerializationSettings serializationSettings;
     private int bufferSize;
 
     private SubscribePersistentSubscriptionOptions() {
@@ -29,4 +34,20 @@ public SubscribePersistentSubscriptionOptions bufferSize(int value) {
         bufferSize = value;
         return this;
     }
+
+
+    /**
+     * Allows to customize or disable the automatic deserialization.
+     */
+    public Optional serializationSettings() {
+        return Optional.ofNullable(serializationSettings);
+    }
+
+    /**
+     * Customize or disable the automatic deserialization.
+     */
+    public SubscribePersistentSubscriptionOptions serializationSettings(OperationSerializationSettings serializationSettings) {
+        this.serializationSettings = serializationSettings;
+        return this;
+    }   
 }
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java b/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java
index ba1abbb4..e7761661 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java
@@ -7,7 +7,7 @@ class SubscribePersistentSubscriptionToAll extends AbstractSubscribePersistentSu
     public SubscribePersistentSubscriptionToAll(GrpcClient connection, String group,
                                                 SubscribePersistentSubscriptionOptions options,
                                                 PersistentSubscriptionListener listener) {
-        super(connection, group, options, listener);
+        super(connection, group, options, listener, connection.getSerializer(options.serializationSettings().orElse(null)));
     }
 
     @Override
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java b/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java
index 7be619ca..547494d9 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java
@@ -10,7 +10,7 @@ class SubscribePersistentSubscriptionToStream extends AbstractSubscribePersisten
     public SubscribePersistentSubscriptionToStream(GrpcClient connection, String stream, String group,
                                                    SubscribePersistentSubscriptionOptions options,
                                                    PersistentSubscriptionListener listener) {
-        super(connection, group, options, listener);
+        super(connection, group, options, listener, connection.getSerializer(options.serializationSettings().orElse(null)));
 
         this.stream = stream;
     }
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java
index 6250e7e3..b20ca094 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java
@@ -2,9 +2,12 @@
 
 import io.kurrent.dbclient.Message;
 import io.kurrent.dbclient.MessageData;
+import io.kurrent.dbclient.RecordedEvent;
+import static io.kurrent.dbclient.serialization.ContentTypeUtils.*;
 
-import java.util.Arrays;
+import java.util.function.Consumer;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -14,6 +17,8 @@ public interface MessageSerializer {
     MessageData serialize(Message message, MessageSerializationContext context);
 
     List serialize(List messages, MessageSerializationContext serializationContext);
+
+    public Optional tryDeserialize(RecordedEvent record);
 }
 
 class MessageSerializerImpl implements MessageSerializer {
@@ -37,8 +42,24 @@ public static MessageSerializer from(KurrentDBClientSerializationSettings settin
     }
     
     @Override
-    public MessageSerializer with(OperationSerializationSettings serializationSettings) {
-        return this;
+    public MessageSerializer with(OperationSerializationSettings operationSettings) {
+        if (operationSettings == null) {
+            return this;
+        }
+
+        if (operationSettings.automaticDeserialization() == AutomaticDeserialization.DISABLED) {
+            return NullMessageSerializer.INSTANCE;
+        }
+
+        Consumer configureSettings = operationSettings.configureSettings();
+        if (configureSettings == null) {
+            return this;
+        }
+
+        KurrentDBClientSerializationSettings settings = serializationSettings.clone();
+        configureSettings.accept(settings);
+
+        return new MessageSerializerImpl(SchemaRegistry.from(settings), settings);
     }
 
     @Override
@@ -73,6 +94,54 @@ public MessageData serialize(Message message, MessageSerializationContext contex
     public List serialize(List messages, MessageSerializationContext serializationContext) {
         return messages.stream().map(m -> serialize(m, serializationContext)).collect(Collectors.toList());
     }
+
+    @Override
+    public Optional tryDeserialize(RecordedEvent record) {
+        Optional> messageClass = schemaRegistry.tryResolveDataJavaClass(record.getEventType());
+        if (!messageClass.isPresent()) {
+            return Optional.empty();
+        }
+
+        Object data = schemaRegistry
+                .getSerializer(fromMessageContentType(record.getContentType()))
+                .deserialize(messageClass.get(), record.getEventData());
+
+        if (data == null) {
+            return Optional.empty();
+        }
+
+        Optional> metadataClass = schemaRegistry.tryResolveMetadataJavaClass(record.getEventType());
+        
+        Object metadata = metadataClass.isPresent() && record.getUserMetadata().length > 0
+                ? metadataSerializer.deserialize(metadataClass.get(), record.getUserMetadata())
+                : null;
+
+        return Optional.of(Message.from(data, metadata, record.getEventId()));
+    }
+}
+
+class NullMessageSerializer implements MessageSerializer {
+    public static final NullMessageSerializer INSTANCE = new NullMessageSerializer();
+
+    @Override
+    public MessageData serialize(Message value, MessageSerializationContext context) {
+        throw new UnsupportedOperationException("Cannot serialize, automatic deserialization is disabled");
+    }
+
+    @Override
+    public List serialize(List messages, MessageSerializationContext serializationContext) {
+        throw new UnsupportedOperationException("Cannot serialize, automatic deserialization is disabled");
+    }
+
+    @Override
+    public Optional tryDeserialize(RecordedEvent eventRecord) {
+        return Optional.empty();
+    }
+
+    @Override
+    public MessageSerializer with(OperationSerializationSettings operationSettings) {
+        return this;
+    }
 }
 
 class ContentTypeUtils {
diff --git a/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java
index f07915a9..ac798e57 100644
--- a/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java
+++ b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java
@@ -20,12 +20,12 @@ default void testPlainJavaObjectsAreSerializedAndDeserializedUsingAutoSerializat
         
         // When
         AppendToStreamOptions appendOptions = AppendToStreamOptions.get()
-                .expectedRevision(ExpectedRevision.noStream());
+                .streamState(StreamState.noStream());
 
         WriteResult appendResult = client.appendToStream(streamName, appendOptions, expected)
                 .get();
 
-        Assertions.assertEquals(ExpectedRevision.expectedRevision(1), appendResult.getNextExpectedRevision());
+        Assertions.assertEquals(StreamState.streamRevision(1), appendResult.getNextExpectedRevision());
         
         // Ensure appended event is readable
         ReadResult result = client.readStream(streamName, ReadStreamOptions.get())

From 8c5a66a93cc01a597862c4934c9e62a7bc0b961f Mon Sep 17 00:00:00 2001
From: Oskar Dudycz 
Date: Wed, 26 Mar 2025 10:58:37 +0100
Subject: [PATCH 8/8] [DEVEX-250] Reshaped the API to handle regular and
 wrapped messages, together with explicit Message type

---
 .../io/kurrent/dbclient/AppendToStream.java   |  14 +-
 .../io/kurrent/dbclient/KurrentDBClient.java  | 199 +++++++++++++++---
 .../dbclient/OptionsWithStreamStateBase.java  |  14 +-
 .../serialization/SerializationTests.java     |  10 +-
 4 files changed, 196 insertions(+), 41 deletions(-)

diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java
index 3ddd75ff..9154fd0c 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java
@@ -17,12 +17,20 @@
 class AppendToStream {
     private final GrpcClient client;
     private final String streamName;
+    private final StreamState streamState;
     private final List events;
     private final AppendToStreamOptions options;
 
-    public AppendToStream(GrpcClient client, String streamName, Iterator events, AppendToStreamOptions options) {
+    public AppendToStream(
+            GrpcClient client, 
+            String streamName, 
+            StreamState streamState, 
+            Iterator events, 
+            AppendToStreamOptions options
+    ) {
         this.client = client;
         this.streamName = streamName;
+        this.streamState = streamState;
         this.events = new ArrayList<>();
         while (events.hasNext()) {
             this.events.add(events.next());
@@ -42,7 +50,7 @@ public CompletableFuture execute() {
 
     private CompletableFuture append(ManagedChannel channel, List events) {
         CompletableFuture result = new CompletableFuture<>();
-        StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
+        StreamsOuterClass.AppendReq.Options.Builder options = this.streamState.applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
                 .setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
                         .setStreamName(ByteString.copyFromUtf8(streamName))
                         .build()));
@@ -117,7 +125,7 @@ private CompletableFuture append(ManagedChannel channel, List appendToStream(String streamName, EventDat
     /**
      * Appends events to a given stream.
      *
+     * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
      * @param streamName stream's name.
-     * @param events     events to send.
+     * @param events     events to store.
      * @return a write result if successful.
      * @see WriteResult
      */
@@ -57,22 +58,24 @@ public CompletableFuture appendToStream(String streamName, Iterator
     /**
      * Appends events to a given stream.
      *
+     * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
      * @param streamName stream's name.
      * @param options    append stream request's options.
-     * @param events     events to send.
+     * @param events     events to store.
      * @return a write result if successful.
      * @see WriteResult
      */
     public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, EventData... events) {
         return this.appendToStream(streamName, options, Arrays.stream(events).iterator());
     }
-
+    
     /**
      * Appends events to a given stream.
      *
+     * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
      * @param streamName stream's name.
      * @param options    append stream request's options.
-     * @param events     events to send.
+     * @param events     events to store.
      * @return a write result if successful.
      * @see WriteResult
      */
@@ -85,73 +88,209 @@ public CompletableFuture appendToStream(String streamName, AppendTo
                         .map(EventData::toMessageData)
                         .iterator();
 
-        return new AppendToStream(this.getGrpcClient(), streamName, messageData, options).execute();
+        return new AppendToStream(this.getGrpcClient(), streamName, options.getStreamState(), messageData, options).execute();
     }
 
     /**
      * Appends messages to a given stream.
      *
-     * @param streamName stream's name.
-     * @param messages   messages to send.
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database
+     * @param messages    messages to store.
      * @return a write result if successful.
      * @see WriteResult
      */
-    public CompletableFuture appendToStream(String streamName, List messages) {
-        // TODO: Sort type erasure issue with Iterator
-        return this.appendToStream(streamName, AppendToStreamOptions.get(), messages);
+    public CompletableFuture appendToStream(
+            String streamName, 
+            StreamState streamState,
+            Object... messages
+    ) {
+        return this.appendToStream(
+                streamName, 
+                streamState, 
+                Arrays.stream(messages).collect(Collectors.toList()), 
+                AppendToStreamOptions.get()
+        );
     }
 
+
     /**
      * Appends messages to a given stream.
      *
-     * @param streamName stream's name.
-     * @param options    append stream request's options.
-     * @param messages   messages to send.
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database
+     * @param messages    messages to store.
      * @return a write result if successful.
      * @see WriteResult
      */
-    public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Message... messages) {
-        return this.appendToStream(streamName, options, Stream.of(messages).collect(Collectors.toList()));
+    public CompletableFuture appendToStream(
+            String streamName,
+            StreamState streamState,
+            Message... messages
+    ) {
+        List toAppend = Arrays.stream(messages).collect(Collectors.toList());
+        return this.appendToStream(
+                streamName,
+                streamState,
+                toAppend,
+                AppendToStreamOptions.get()
+        );
     }
 
     /**
      * Appends messages to a given stream.
      *
-     * @param streamName stream's name.
-     * @param options    append stream request's options.
-     * @param messages   messages to send.
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database
+     * @param messages    messages to store.
      * @return a write result if successful.
      * @see WriteResult
      */
-    public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Object... messages) {
-        return this.appendToStream(streamName, options, Stream.of(messages).map(Message::from).collect(Collectors.toList()));
+    public CompletableFuture appendToStream(
+            String streamName,
+            StreamState streamState,
+            Iterable messages
+    ) {
+        return this.appendToStream(
+                streamName,
+                streamState,
+                messages,
+                AppendToStreamOptions.get()
+        );
     }
 
+
     /**
      * Appends messages to a given stream.
      *
-     * @param streamName stream's name.
-     * @param options    append stream request's options.
-     * @param messages   messages to send.
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database
+     * @param messages    messages to store.
+     * @param options     append stream request's options.
+     * @return a write result if successful.
+     * @see WriteResult
+     */
+    public CompletableFuture appendToStream(
+            String streamName,
+            StreamState streamState,
+            Object[] messages,
+            AppendToStreamOptions options
+    ) {
+        List toAppend =
+                Arrays.stream(messages)
+                        .map(Message::from)
+                        .collect(Collectors.toList());
+
+        return this.appendToStream(
+                streamName,
+                streamState,
+                toAppend,
+                options
+        );
+    }
+
+    /**
+     * Appends messages to a given stream.
+     *
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database
+     * @param messages    messages to store.
+     * @param options     append stream request's options.
      * @return a write result if successful.
      * @see WriteResult
      */
-    public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, List messages) {
-        // TODO: Sort type erasure issue with Iterator
+    public CompletableFuture appendToStream(
+            String streamName,
+            StreamState streamState,
+            Iterable messages,
+            AppendToStreamOptions options
+    ) {
+        List toAppend =
+                StreamSupport.stream(messages.spliterator(), false)
+                        .map(Message::from)
+                        .collect(Collectors.toList());
+
+        return this.appendToStream(
+                streamName,
+                streamState,
+                toAppend,
+                options
+        );
+    }
+
+    /**
+     * Appends messages to a given stream.
+     *
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database
+     * @param messages    messages to store.
+     * @return a write result if successful.
+     * @see WriteResult
+     */
+    public CompletableFuture appendToStream(
+            String streamName,
+            StreamState streamState,
+            List messages
+    ) {
+        return this.appendToStream(streamName, streamState, messages, AppendToStreamOptions.get());
+    }
+
+    /**
+     * Appends messages to a given stream.
+     *
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database.
+     * @param messages    messages to store.
+     * @param options     append stream request's options.
+     * @return a write result if successful.
+     * @see WriteResult
+     */
+    public CompletableFuture appendToStream(
+            String streamName,
+            StreamState streamState,
+            Message[] messages,
+            AppendToStreamOptions options
+    ) {
+        List toAppend = Arrays.stream(messages).collect(Collectors.toList());
+
+        return this.appendToStream(
+                streamName,
+                streamState,
+                toAppend,
+                options
+        );
+    }
+
+    /**
+     * Appends messages to a given stream.
+     *
+     * @param streamName  stream's name.
+     * @param streamState expected stream's state in the database.
+     * @param messages    messages to store.
+     * @param options     append stream request's options.
+     * @return a write result if successful.
+     * @see WriteResult
+     */
+    public CompletableFuture appendToStream(
+            String streamName,
+            StreamState streamState,
+            List messages,
+            AppendToStreamOptions options
+    ) {
         if (options == null)
             options = AppendToStreamOptions.get();
 
-        MessageSerializationContext serializationContext = 
+        MessageSerializationContext serializationContext =
                 new MessageSerializationContext(fromStreamName(streamName));
 
         MessageSerializer serializer = getGrpcClient()
                 .getSerializer(options.serializationSettings().orElse(null));
-        
+
         Iterator messageData = serializer
                 .serialize(messages, serializationContext)
                 .iterator();
 
-        return new AppendToStream(this.getGrpcClient(), streamName, messageData, options).execute();
+        return new AppendToStream(this.getGrpcClient(), streamName, streamState, messageData, options).execute();
     }
 
     /**
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java
index 48eef358..8dcfcd5f 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java
@@ -1,5 +1,9 @@
 package io.kurrent.dbclient;
 
+/**
+ * @deprecated This class may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter
+ */
+@Deprecated
 class OptionsWithStreamStateBase extends OptionsBase {
     private StreamState streamState;
 
@@ -7,17 +11,22 @@ protected OptionsWithStreamStateBase() {
         this.streamState = StreamState.any();
     }
 
+    /**
+     * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
+     */
+    @Deprecated
     StreamState getStreamState() {
         return this.streamState;
     }
 
     /**
      * Asks the server to check that the stream receiving is at the expected state.
-
+     * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
      * @param state - expected revision.
      * @return updated options.
      */
     @SuppressWarnings("unchecked")
+    @Deprecated
     public T streamState(StreamState state) {
         this.streamState = state;
         return (T) this;
@@ -26,10 +35,11 @@ public T streamState(StreamState state) {
 
     /**
      * Asks the server to check that the stream receiving is at the given expected revision.
-
+     * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
      * @param revision - expected revision.
      * @return updated options.
      */
+    @Deprecated
     public T streamRevision(long revision) {
         return streamState(StreamState.streamRevision(revision));
     }
diff --git a/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java
index ac798e57..151773a4 100644
--- a/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java
+++ b/db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java
@@ -4,6 +4,7 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.IntStream;
@@ -16,13 +17,10 @@ default void testPlainJavaObjectsAreSerializedAndDeserializedUsingAutoSerializat
 
         // Given
         final String streamName = generateName();
-        final Object[] expected = generateMessages(2).toArray();
+        final List expected = new ArrayList<>(generateMessages(2));
         
         // When
-        AppendToStreamOptions appendOptions = AppendToStreamOptions.get()
-                .streamState(StreamState.noStream());
-
-        WriteResult appendResult = client.appendToStream(streamName, appendOptions, expected)
+        WriteResult appendResult = client.appendToStream(streamName, StreamState.noStream(), expected)
                 .get();
 
         Assertions.assertEquals(StreamState.streamRevision(1), appendResult.getNextExpectedRevision());
@@ -34,7 +32,7 @@ default void testPlainJavaObjectsAreSerializedAndDeserializedUsingAutoSerializat
         Assertions.assertEquals(2, result.getEvents().size());
     }
 
-    static List generateMessages(int count){
+    static List generateMessages(int count){
         return IntStream.range(0, count)
                 .mapToObj(x -> 
                         new UserRegistered(