diff --git a/CODE-OF-CONDUCT.md b/CODE-OF-CONDUCT.md index d9e26032..7da407a4 100644 --- a/CODE-OF-CONDUCT.md +++ b/CODE-OF-CONDUCT.md @@ -1,6 +1,6 @@ # Contributor Covenant Code of Conduct -`KurrentDB-Client-Java` follows the widely-adopted Contributor Covenant Code of Conduct. +`Kurrent-Client-Java` follows the widely-adopted Contributor Covenant Code of Conduct. ## Our Pledge diff --git a/build.gradle b/build.gradle index 1beb8d7d..e7805f20 100644 --- a/build.gradle +++ b/build.gradle @@ -61,6 +61,7 @@ dependencies { implementation "io.grpc:grpc-stub:${grpcVersion}" implementation "io.grpc:grpc-protobuf:${grpcVersion}" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' implementation "org.slf4j:slf4j-api:2.0.17" implementation "org.bouncycastle:bcprov-jdk18on:1.80" implementation "org.bouncycastle:bcpkix-jdk18on:1.80" @@ -79,7 +80,6 @@ dependencies { testImplementation "org.reactivestreams:reactive-streams-tck:${reactiveStreamsApiVersion}" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation platform("com.fasterxml.jackson:jackson-bom:${jacksonVersion}") - testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' testImplementation "com.github.javafaker:javafaker:1.0.2" testImplementation 'org.slf4j:slf4j-simple:2.0.17' testImplementation "io.opentelemetry:opentelemetry-sdk" diff --git a/src/main/java/io/kurrent/dbclient/AbstractRead.java b/src/main/java/io/kurrent/dbclient/AbstractRead.java index 55e0ba8e..fb44f9a2 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractRead.java +++ b/src/main/java/io/kurrent/dbclient/AbstractRead.java @@ -10,9 +10,9 @@ abstract class AbstractRead implements Publisher { protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions; private final GrpcClient client; - private final OptionsWithBackPressure options; + private final OptionsWithBackPressureAndSerialization options; - protected AbstractRead(GrpcClient client, OptionsWithBackPressure options) { + protected AbstractRead(GrpcClient client, OptionsWithBackPressureAndSerialization options) { this.client = client; this.options = options; } @@ -27,13 +27,17 @@ protected AbstractRead(GrpcClient client, OptionsWithBackPressure options) { @Override public void subscribe(Subscriber subscriber) { - ReadResponseObserver observer = new ReadResponseObserver(options, new ReadStreamConsumer(subscriber)); + ReadResponseObserver observer = new ReadResponseObserver( + options, + new ReadStreamConsumer(subscriber), + this.client.getSerializer(options.serializationSettings().orElse(null)) + ); this.client.getWorkItemArgs().whenComplete((args, error) -> { - if (error != null) { - observer.onError(error); - return; - } + if (error != null) { + observer.onError(error); + return; + } StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder() .setOptions(createOptions()) diff --git a/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java b/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java index 34446efd..b837b246 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java +++ b/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java @@ -3,13 +3,6 @@ import io.kurrent.dbclient.proto.shared.Shared; import io.kurrent.dbclient.proto.streams.StreamsGrpc; import io.kurrent.dbclient.proto.streams.StreamsOuterClass; -import io.grpc.Metadata; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.grpc.ManagedChannel; import java.util.concurrent.CompletableFuture; @@ -21,9 +14,9 @@ abstract class AbstractRegularSubscription { protected SubscriptionListener listener; protected Checkpointer checkpointer = null; private final GrpcClient client; - private final OptionsWithBackPressure options; + private final OptionsWithBackPressureAndSerialization options; - protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure options) { + protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressureAndSerialization options) { this.client = client; this.options = options; } @@ -77,6 +70,10 @@ private ReadResponseObserver createObserver(ManagedChannel channel, CompletableF } ); - return new ReadResponseObserver(this.options, consumer); + return new ReadResponseObserver( + this.options, + consumer, + this.client.getSerializer(options.serializationSettings().orElse(null)) + ); } } diff --git a/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java b/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java index 22a36804..5a399903 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java +++ b/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java @@ -1,14 +1,15 @@ package io.kurrent.dbclient; -import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent; -import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc; -import io.kurrent.dbclient.proto.shared.Shared; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; +import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent; +import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc; +import io.kurrent.dbclient.proto.shared.Shared; +import io.kurrent.dbclient.serialization.MessageSerializer; import java.util.concurrent.CompletableFuture; @@ -18,6 +19,7 @@ abstract class AbstractSubscribePersistentSubscription { private final String group; private final PersistentSubscriptionListener listener; private final SubscribePersistentSubscriptionOptions options; + private final MessageSerializer messageSerializer; static { defaultReadOptions = Persistent.ReadReq.Options.newBuilder() @@ -25,13 +27,18 @@ abstract class AbstractSubscribePersistentSubscription { .setStructured(Shared.Empty.getDefaultInstance())); } - public AbstractSubscribePersistentSubscription(GrpcClient client, String group, - SubscribePersistentSubscriptionOptions options, - PersistentSubscriptionListener listener) { + public AbstractSubscribePersistentSubscription( + GrpcClient client, + String group, + SubscribePersistentSubscriptionOptions options, + PersistentSubscriptionListener listener, + MessageSerializer messageSerializer + ) { this.client = client; this.group = group; this.options = options; this.listener = listener; + this.messageSerializer = messageSerializer; } protected abstract Persistent.ReadReq.Options.Builder createOptions(); @@ -91,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) { int retryCount = readResp.getEvent().hasNoRetryCount() ? 0 : readResp.getEvent().getRetryCount(); try { - ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent()); + ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent(), messageSerializer); ClientTelemetry.traceSubscribe( () -> listener.onEvent(this._subscription, retryCount, resolvedEvent), _subscription.getSubscriptionId(), diff --git a/src/main/java/io/kurrent/dbclient/AppendToStream.java b/src/main/java/io/kurrent/dbclient/AppendToStream.java index 11ef0060..9154fd0c 100644 --- a/src/main/java/io/kurrent/dbclient/AppendToStream.java +++ b/src/main/java/io/kurrent/dbclient/AppendToStream.java @@ -17,12 +17,20 @@ class AppendToStream { private final GrpcClient client; private final String streamName; - private final List events; + private final StreamState streamState; + private final List events; private final AppendToStreamOptions options; - public AppendToStream(GrpcClient client, String streamName, Iterator events, AppendToStreamOptions options) { + public AppendToStream( + GrpcClient client, + String streamName, + StreamState streamState, + Iterator events, + AppendToStreamOptions options + ) { this.client = client; this.streamName = streamName; + this.streamState = streamState; this.events = new ArrayList<>(); while (events.hasNext()) { this.events.add(events.next()); @@ -40,9 +48,9 @@ public CompletableFuture execute() { this.options.getCredentials())); } - private CompletableFuture append(ManagedChannel channel, List events) { + private CompletableFuture append(ManagedChannel channel, List events) { CompletableFuture result = new CompletableFuture<>(); - StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder() + StreamsOuterClass.AppendReq.Options.Builder options = this.streamState.applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder() .setStreamIdentifier(Shared.StreamIdentifier.newBuilder() .setStreamName(ByteString.copyFromUtf8(streamName)) .build())); @@ -93,18 +101,18 @@ private CompletableFuture append(ManagedChannel channel, List append(ManagedChannel channel, List { + private OperationSerializationSettings serializationSettings = null; + private AppendToStreamOptions() { } + /** + * Returns optional serialization settings + */ + public Optional serializationSettings() { + return Optional.ofNullable(this.serializationSettings); + } + + /** + * Allows to customize or disable the automatic deserialization + * + * @param serializationSettings - expected revision. + * @return updated options. + */ + public AppendToStreamOptions serializationSettings(OperationSerializationSettings serializationSettings) { + this.serializationSettings = serializationSettings; + return this; + } + /** * Returns options with default values. */ diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java index 4d180053..26941881 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java @@ -28,15 +28,15 @@ private static Tracer getTracer() { ClientTelemetry.class.getPackage().getImplementationVersion()); } - private static List tryInjectTracingContext(Span span, List events) { - List injectedEvents = new ArrayList<>(); - for (EventData event : events) { + private static List tryInjectTracingContext(Span span, List events) { + List injectedEvents = new ArrayList<>(); + for (MessageData event : events) { boolean isJsonEvent = Objects.equals(event.getContentType(), ContentType.JSON); - injectedEvents.add(EventDataBuilder - .binary(event.getEventId(), event.getEventType(), event.getEventData(), isJsonEvent) - .metadataAsBytes(tryInjectTracingContext(span, event.getUserMetadata())) - .build()); + injectedEvents.add( + MessageDataBuilder + .with(event.getMessageType(), event.getMessageData(), tryInjectTracingContext(span, event.getMessageMetadata()), event.getMessageId(), isJsonEvent) + .build()); } return injectedEvents; } @@ -85,9 +85,9 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) { } static CompletableFuture traceAppend( - BiFunction, CompletableFuture> appendOperation, + BiFunction, CompletableFuture> appendOperation, ManagedChannel channel, - List events, String streamId, KurrentDBClientSettings settings, + List events, String streamId, KurrentDBClientSettings settings, UserCredentials optionalCallCredentials) { Span span = createSpan( ClientTelemetryConstants.Operations.APPEND, diff --git a/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java b/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java index df4cfe4e..dd02657b 100644 --- a/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java +++ b/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java @@ -2,6 +2,7 @@ import io.grpc.ClientInterceptor; +import io.kurrent.dbclient.serialization.KurrentDBClientSerializationSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ public class ConnectionSettingsBuilder { private List _interceptors = new ArrayList<>(); private String _tlsCaFile = null; private Set _features = new HashSet<>(); + private KurrentDBClientSerializationSettings _serializationSettings; ConnectionSettingsBuilder() {} @@ -60,7 +62,9 @@ public KurrentDBClientSettings buildConnectionSettings() { _defaultDeadline, _interceptors, _tlsCaFile, - _features); + _features, + _serializationSettings + ); } /** @@ -241,6 +245,15 @@ public ConnectionSettingsBuilder feature(String feature) { return this; } + /** + * Provides configuration options for messages serialization and deserialization in the KurrentDB client. + * If null, default settings are used. + */ + public ConnectionSettingsBuilder serialization(KurrentDBClientSerializationSettings serializationSettings) { + this._serializationSettings = serializationSettings; + return this; + } + void parseGossipSeed(String host) { String[] hostParts = host.split(":"); diff --git a/src/main/java/io/kurrent/dbclient/EventData.java b/src/main/java/io/kurrent/dbclient/EventData.java index ee94fe96..b99f153d 100644 --- a/src/main/java/io/kurrent/dbclient/EventData.java +++ b/src/main/java/io/kurrent/dbclient/EventData.java @@ -96,5 +96,9 @@ public static EventDataBuilder builderAsBinary(String eventType, byte[] eventDat public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, byte[] eventData) { return EventDataBuilder.binary(eventId, eventType, eventData); } + + public MessageData toMessageData() { + return new MessageData(eventType, eventData, userMetadata, eventId, contentType); + } } diff --git a/src/main/java/io/kurrent/dbclient/GrpcClient.java b/src/main/java/io/kurrent/dbclient/GrpcClient.java index 0cce2e00..13df1448 100644 --- a/src/main/java/io/kurrent/dbclient/GrpcClient.java +++ b/src/main/java/io/kurrent/dbclient/GrpcClient.java @@ -3,6 +3,9 @@ import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.kurrent.dbclient.serialization.MessageSerializer; +import io.kurrent.dbclient.serialization.MessageSerializerBuilder; +import io.kurrent.dbclient.serialization.OperationSerializationSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,11 +22,14 @@ class GrpcClient { private final AtomicBoolean closed; private final LinkedBlockingQueue queue; private final KurrentDBClientSettings settings; + private final MessageSerializer serializer; GrpcClient(KurrentDBClientSettings settings, AtomicBoolean closed, LinkedBlockingQueue queue) { this.settings = settings; this.closed = closed; this.queue = queue; + + this.serializer = MessageSerializerBuilder.get(settings.getSerializationSettings()); } public boolean isShutdown() { @@ -101,7 +107,7 @@ public CompletableFuture runWithArgs(Function shutdown() { public KurrentDBClientSettings getSettings() { return this.settings; } + + public MessageSerializer getSerializer(OperationSerializationSettings serializationSettings) { + return this.serializer.with(serializationSettings); + } } \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index 22cbe548..2a3f11d9 100644 --- a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -2,13 +2,16 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.json.JsonMapper; +import io.kurrent.dbclient.serialization.MessageSerializationContext; +import io.kurrent.dbclient.serialization.MessageSerializer; import org.reactivestreams.Publisher; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static io.kurrent.dbclient.serialization.MessageTypeNamingResolutionContext.fromStreamName; /** * Represents EventStoreDB client for stream operations. A client instance maintains a two-way communication to EventStoreDB. @@ -28,10 +31,12 @@ public static KurrentDBClient create(KurrentDBClientSettings settings) { /** * Appends events to a given stream. + * + * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter. * @param streamName stream's name. - * @param events events to send. - * @see WriteResult + * @param events events to store. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, EventData... events) { return this.appendToStream(streamName, Arrays.stream(events).iterator()); @@ -39,10 +44,12 @@ public CompletableFuture appendToStream(String streamName, EventDat /** * Appends events to a given stream. + * + * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter. * @param streamName stream's name. - * @param events events to send. - * @see WriteResult + * @param events events to store. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, Iterator events) { return this.appendToStream(streamName, AppendToStreamOptions.get(), events); @@ -50,37 +57,249 @@ public CompletableFuture appendToStream(String streamName, Iterator /** * Appends events to a given stream. + * + * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter. * @param streamName stream's name. - * @param options append stream request's options. - * @param events events to send. - * @see WriteResult + * @param options append stream request's options. + * @param events events to store. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, EventData... events) { return this.appendToStream(streamName, options, Arrays.stream(events).iterator()); } - + /** * Appends events to a given stream. + * + * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter. * @param streamName stream's name. - * @param options append stream request's options. - * @param events events to send. - * @see WriteResult + * @param options append stream request's options. + * @param events events to store. * @return a write result if successful. + * @see WriteResult */ public CompletableFuture appendToStream(String streamName, AppendToStreamOptions options, Iterator events) { if (options == null) options = AppendToStreamOptions.get(); - return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); + Iterator messageData = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(events, Spliterator.ORDERED), false) + .map(EventData::toMessageData) + .iterator(); + + return new AppendToStream(this.getGrpcClient(), streamName, options.getStreamState(), messageData, options).execute(); + } + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database + * @param messages messages to store. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + Object... messages + ) { + return this.appendToStream( + streamName, + streamState, + Arrays.stream(messages).collect(Collectors.toList()), + AppendToStreamOptions.get() + ); + } + + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database + * @param messages messages to store. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + Message... messages + ) { + List toAppend = Arrays.stream(messages).collect(Collectors.toList()); + return this.appendToStream( + streamName, + streamState, + toAppend, + AppendToStreamOptions.get() + ); + } + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database + * @param messages messages to store. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + Iterable messages + ) { + return this.appendToStream( + streamName, + streamState, + messages, + AppendToStreamOptions.get() + ); + } + + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database + * @param messages messages to store. + * @param options append stream request's options. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + Object[] messages, + AppendToStreamOptions options + ) { + List toAppend = + Arrays.stream(messages) + .map(Message::from) + .collect(Collectors.toList()); + + return this.appendToStream( + streamName, + streamState, + toAppend, + options + ); + } + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database + * @param messages messages to store. + * @param options append stream request's options. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + Iterable messages, + AppendToStreamOptions options + ) { + List toAppend = + StreamSupport.stream(messages.spliterator(), false) + .map(Message::from) + .collect(Collectors.toList()); + + return this.appendToStream( + streamName, + streamState, + toAppend, + options + ); + } + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database + * @param messages messages to store. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + List messages + ) { + return this.appendToStream(streamName, streamState, messages, AppendToStreamOptions.get()); + } + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database. + * @param messages messages to store. + * @param options append stream request's options. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + Message[] messages, + AppendToStreamOptions options + ) { + List toAppend = Arrays.stream(messages).collect(Collectors.toList()); + + return this.appendToStream( + streamName, + streamState, + toAppend, + options + ); + } + + /** + * Appends messages to a given stream. + * + * @param streamName stream's name. + * @param streamState expected stream's state in the database. + * @param messages messages to store. + * @param options append stream request's options. + * @return a write result if successful. + * @see WriteResult + */ + public CompletableFuture appendToStream( + String streamName, + StreamState streamState, + List messages, + AppendToStreamOptions options + ) { + if (options == null) + options = AppendToStreamOptions.get(); + + MessageSerializationContext serializationContext = + new MessageSerializationContext(fromStreamName(streamName)); + + MessageSerializer serializer = getGrpcClient() + .getSerializer(options.serializationSettings().orElse(null)); + + Iterator messageData = serializer + .serialize(messages, serializationContext) + .iterator(); + + return new AppendToStream(this.getGrpcClient(), streamName, streamState, messageData, options).execute(); } /** * Sets a stream's metadata. + * * @param streamName stream's name. - * @param metadata stream's metadata - * @see WriteResult + * @param metadata stream's metadata * @return a write result if successful. + * @see WriteResult */ public CompletableFuture setStreamMetadata(String streamName, StreamMetadata metadata) { return setStreamMetadata(streamName, null, metadata); @@ -88,11 +307,12 @@ public CompletableFuture setStreamMetadata(String streamName, Strea /** * Sets a stream's metadata. + * * @param streamName stream's name. - * @param options append stream request's options. - * @param metadata stream's metadata - * @see WriteResult + * @param options append stream request's options. + * @param metadata stream's metadata * @return a write result if successful. + * @see WriteResult */ public CompletableFuture setStreamMetadata(String streamName, AppendToStreamOptions options, StreamMetadata metadata) { JsonMapper mapper = new JsonMapper(); @@ -103,12 +323,13 @@ public CompletableFuture setStreamMetadata(String streamName, Appen } catch (JsonProcessingException e) { throw new RuntimeException(e); } - } + } /** * Reads events from a given stream. The reading can be done forwards and backwards. + * * @param streamName stream's name. - * @param options read request's operations. + * @param options read request's operations. */ public CompletableFuture readStream(String streamName, ReadStreamOptions options) { return readEventsFromPublisher(readStreamReactive(streamName, options)); @@ -116,6 +337,7 @@ public CompletableFuture readStream(String streamName, ReadStreamOpt /** * Reads events from a given stream. The reading can be done forwards and backwards. + * * @param streamName stream's name. */ public Publisher readStreamReactive(String streamName) { @@ -125,8 +347,9 @@ public Publisher readStreamReactive(String streamName) { /** * Reads events from a given stream. The reading can be done forwards and backwards. + * * @param streamName stream's name. - * @param options read request's operations. + * @param options read request's operations. */ public Publisher readStreamReactive(String streamName, ReadStreamOptions options) { if (options == null) @@ -137,6 +360,7 @@ public Publisher readStreamReactive(String streamName, ReadStreamOp /** * Reads stream's metadata. + * * @param streamName stream's name. * @see StreamMetadata */ @@ -146,8 +370,9 @@ public CompletableFuture getStreamMetadata(String streamName) { /** * Reads stream's metadata. + * * @param streamName stream's name. - * @param options read request's operations. + * @param options read request's operations. * @see StreamMetadata */ public CompletableFuture getStreamMetadata(String streamName, ReadStreamOptions options) { @@ -183,6 +408,7 @@ public CompletableFuture readAll() { /** * Reads events from the $all stream. The reading can be done forwards and backwards. + * * @param options options of the read $all request. */ public CompletableFuture readAll(ReadAllOptions options) { @@ -195,6 +421,7 @@ public Publisher readAllReactive() { /** * Reads events from the $all stream. The reading can be done forwards and backwards. + * * @param options options of the read $all request. */ public Publisher readAllReactive(ReadAllOptions options) { @@ -210,8 +437,9 @@ public Publisher readAllReactive(ReadAllOptions options) { * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param streamName stream's name. - * @param listener consumes a subscription's events. + * @param listener consumes a subscription's events. * @return a subscription handle. */ public CompletableFuture subscribeToStream(String streamName, SubscriptionListener listener) { @@ -224,9 +452,10 @@ public CompletableFuture subscribeToStream(String streamName, Subs * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param streamName stream's name. - * @param listener consumes a subscription's events. - * @param options a subscription request's options. + * @param listener consumes a subscription's events. + * @param options a subscription request's options. * @return a subscription handle. */ public CompletableFuture subscribeToStream(String streamName, SubscriptionListener listener, SubscribeToStreamOptions options) { @@ -242,6 +471,7 @@ public CompletableFuture subscribeToStream(String streamName, Subs * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param listener consumes a subscription's events. * @return a subscription handle. */ @@ -255,8 +485,9 @@ public CompletableFuture subscribeToAll(SubscriptionListener liste * event from the starting point onward. If events already exist, the handler will be called for each event one by * one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event * appears. + * * @param listener consumes a subscription's events. - * @param options subscription to $all request's options. + * @param options subscription to $all request's options. * @return a subscription handle. */ public CompletableFuture subscribeToAll(SubscriptionListener listener, SubscribeToAllOptions options) { @@ -274,9 +505,10 @@ public CompletableFuture subscribeToAll(SubscriptionListener liste * deleting the stream, you are able to write to it again, continuing from where it left off. *

* Note: Deletion is reversible until the scavenging process runs. + * * @param streamName stream's name - * @see DeleteResult * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture deleteStream(String streamName) { return this.deleteStream(streamName, DeleteStreamOptions.get()); @@ -290,10 +522,11 @@ public CompletableFuture deleteStream(String streamName) { * deleting the stream, you are able to write to it again, continuing from where it left off. *

* Note: soft deletion is reversible until the scavenging process runs. + * * @param streamName stream's name - * @param options delete stream request's options. - * @see DeleteResult + * @param options delete stream request's options. * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture deleteStream(String streamName, DeleteStreamOptions options) { if (options == null) @@ -309,9 +542,10 @@ public CompletableFuture deleteStream(String streamName, DeleteStr * written to again. Tombstone events are written with the event's type '$streamDeleted'. When a tombstoned stream * is read, the read will return a StreamDeleted error. *

+ * * @param streamName a stream's name. - * @see DeleteResult * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture tombstoneStream(String streamName) { return this.tombstoneStream(streamName, DeleteStreamOptions.get()); @@ -324,10 +558,11 @@ public CompletableFuture tombstoneStream(String streamName) { * written to again. Tombstone events are written with the event's type '$streamDeleted'. When a tombstoned stream * is read, the read will return a StreamDeleted error. *

+ * * @param streamName a stream's name. - * @param options delete stream request's options. - * @see DeleteResult + * @param options delete stream request's options. * @return if successful, delete result. + * @see DeleteResult */ public CompletableFuture tombstoneStream(String streamName, DeleteStreamOptions options) { if (options == null) diff --git a/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java b/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java index c552c3e7..80b34ee0 100644 --- a/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java +++ b/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java @@ -1,5 +1,7 @@ package io.kurrent.dbclient; +import io.kurrent.dbclient.serialization.MessageSerializer; +import io.kurrent.dbclient.serialization.MessageSerializerBuilder; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java b/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java index 44580d66..567a0174 100644 --- a/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java +++ b/src/main/java/io/kurrent/dbclient/KurrentDBClientSettings.java @@ -1,6 +1,7 @@ package io.kurrent.dbclient; import io.grpc.ClientInterceptor; +import io.kurrent.dbclient.serialization.KurrentDBClientSerializationSettings; import java.net.InetSocketAddress; import java.util.List; @@ -8,8 +9,6 @@ /** * Gathers all the settings related to a gRPC client with a KurrentDB database. - * EventStoreDBClientSettings} can only be created when parsing a connection string. - * * KurrentDBClientSettings supports a wide range of settings. If a setting is not mentioned in the connection * string, that setting default value is used. * @@ -41,6 +40,7 @@ public class KurrentDBClientSettings { private final List interceptors; private final String tlsCaFile; private final Set features; + private final KurrentDBClientSerializationSettings serializationSettings; /** * If the dns discovery is enabled. @@ -167,6 +167,12 @@ public String getTlsCaFile() { */ public Set getFeatures() { return features; } + /** + * Provides configuration options for messages serialization and deserialization in the KurrentDB client. + * If null, default settings are used. + */ + public KurrentDBClientSerializationSettings getSerializationSettings() { return serializationSettings; } + KurrentDBClientSettings( boolean dnsDiscover, int maxDiscoverAttempts, @@ -183,7 +189,8 @@ public String getTlsCaFile() { Long defaultDeadline, List interceptors, String tlsCaFile, - Set features + Set features, + KurrentDBClientSerializationSettings serializationSettings ) { this.dnsDiscover = dnsDiscover; this.maxDiscoverAttempts = maxDiscoverAttempts; @@ -201,6 +208,7 @@ public String getTlsCaFile() { this.interceptors = interceptors; this.tlsCaFile = tlsCaFile; this.features = features; + this.serializationSettings = serializationSettings; } /** diff --git a/src/main/java/io/kurrent/dbclient/Message.java b/src/main/java/io/kurrent/dbclient/Message.java new file mode 100644 index 00000000..0895e7b3 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/Message.java @@ -0,0 +1,150 @@ +package io.kurrent.dbclient; + +import java.util.Objects; +import java.util.UUID; + +/** + * Represents a message wrapper in the KurrentDB system, containing both domain data and optional metadata. + * Messages can represent events, commands, or other domain objects along with their associated metadata. + */ +public final class Message { + private final Object data; + private final Object metadata; + private final UUID messageId; + + /** + * Creates a new Message with the specified properties. + * + * @param data The message domain data. + * @param metadata Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information. + * @param messageId Unique identifier for this specific message instance. + */ + public Message(Object data, Object metadata, UUID messageId) { + this.data = data; + this.metadata = metadata; + this.messageId = messageId; + } + + /** + * Creates a new Message with the specified domain data and random ID, but without metadata. + * This factory method is a convenient shorthand when working with systems that don't require metadata. + * + * @param data The message domain data. + * @return A new immutable Message instance containing the provided data and ID with null metadata. + * + *

Example: + *

+     * Create a message with a specific ID
+     * UserRegistered userRegistered = new UserRegistered("123", "Alice");
+     * Message message = Message.from(userRegistered);
+     * 
+ */ + public static Message from(Object data) { + return from(data, null); + } + + /** + * Creates a new Message with the specified domain data and message ID, but without metadata. + * This factory method is a convenient shorthand when working with systems that don't require metadata. + * + * @param data The message domain data. + * @param messageId Unique identifier for this message instance. Must not be a nil UUID. + * @return A new immutable Message instance containing the provided data and ID with null metadata. + * + *

Example: + *

+     * Create a message with a specific ID
+     * UserRegistered userRegistered = new UserRegistered("123", "Alice");
+     * UUID messageId = UUID.randomUUID();
+     * Message message = Message.from(userRegistered, messageId);
+     * 
+ */ + public static Message from(Object data, UUID messageId) { + return from(data, null, messageId); + } + + /** + * Creates a new Message with the specified domain data and message ID and metadata. + * + * @param data The message domain data. + * @param metadata Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information. + * @param messageId Unique identifier for this specific message instance. If null, a random UUID will be generated. + * @return A new immutable Message instance with the specified properties. + * @throws IllegalArgumentException Thrown when messageId is explicitly set to a nil UUID, which is an invalid identifier. + * + *

Example: + *

+     * Create a message with data and metadata
+     * OrderPlaced orderPlaced = new OrderPlaced("ORD-123", 99.99);
+     * EventMetadata metadata = new EventMetadata(
+     *     "user-456", 
+     *     Instant.now(),
+     *     correlationId
+     * );
+     *
+     * // Let the system assign an ID automatically
+     * Message message = Message.from(orderPlaced, metadata);
+     *
+     * // Or specify a custom ID
+     * Message messageWithId = Message.from(orderPlaced, metadata, UUID.randomUUID());
+     * 
+ */ + public static Message from(Object data, Object metadata, UUID messageId) { + if (messageId != null && messageId.equals(new UUID(0, 0))) { + throw new IllegalArgumentException("Message ID cannot be a nil UUID."); + } + + return new Message(data, metadata, messageId != null ? messageId : UUID.randomUUID()); + } + + /** + * Gets the message domain data. + * + * @return The message domain data. + */ + public Object data() { + return data; + } + + /** + * Gets the message metadata. + * + * @return The message metadata, may be null. + */ + public Object metadata() { + return metadata; + } + + /** + * Gets the unique identifier for this message. + * + * @return The message ID. + */ + public UUID messageId() { + return messageId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Message message = (Message) o; + return Objects.equals(data, message.data) && + Objects.equals(metadata, message.metadata) && + Objects.equals(messageId, message.messageId); + } + + @Override + public int hashCode() { + return Objects.hash(data, metadata, messageId); + } + + @Override + public String toString() { + return "Message{" + + "data=" + data + + ", metadata=" + metadata + + ", messageId=" + messageId + + '}'; + } +} diff --git a/src/main/java/io/kurrent/dbclient/MessageData.java b/src/main/java/io/kurrent/dbclient/MessageData.java new file mode 100644 index 00000000..acd06a72 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MessageData.java @@ -0,0 +1,106 @@ +package io.kurrent.dbclient; + +import java.util.UUID; + +/** + * Represents a message that will be sent to KurrentDB. + */ +public final class MessageData { + private final UUID messageId; + private final String messageType; + private final String contentType; + private final byte[] messageData; + private final byte[] messageMetadata; + + public MessageData(String messageType, byte[] messageData) { + this(messageType, messageData, null, UUID.randomUUID(), ContentType.JSON); + } + + public MessageData(String messageType, byte[] messageData, byte[] userMetadata) { + this(messageType, messageData, userMetadata, UUID.randomUUID(), ContentType.JSON); + } + + public MessageData(String messageType, byte[] messageData, byte[] userMetadata, UUID messageId, String contentType) { + this.messageId = messageId; + this.messageType = messageType; + this.contentType = contentType; + this.messageData = messageData; + this.messageMetadata = userMetadata; + } + + /** + * Returns message's unique identifier + */ + public UUID getMessageId() { + return messageId; + } + + /** + * Returns message's type. + */ + public String getMessageType() { + return messageType; + } + + /** + * Returns message's content's type + */ + public String getContentType() { + return contentType; + } + + /** + * Returns message's payload data + */ + public byte[] getMessageData() { + return messageData; + } + + /** + * Returns message's custom user metadata. + */ + public byte[] getMessageMetadata() { + return messageMetadata; + } + + /** + * Configures a message data builder to host a JSON payload. + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsJson(String messageType, byte[] messageData) { + return MessageDataBuilder.json(messageType, messageData); + } + + /** + * Configures a message data builder to host a JSON payload. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsJson(String messageType, byte[] messageData, byte[] messageMetadata) { + return MessageDataBuilder.json(messageType, messageData, messageMetadata); + } + + /** + * Configures a message data builder to host a binary payload. + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsBinary(String messageType, byte[] messageData) { + return MessageDataBuilder.binary(messageType, messageData); + } + + /** + * Configures a message data builder to host a binary payload. + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder builderAsBinary(String messageType, byte[] messageData, byte[] messageMetadata) { + return MessageDataBuilder.binary(messageType, messageMetadata); + } +} diff --git a/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java b/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java new file mode 100644 index 00000000..7b0c7730 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java @@ -0,0 +1,153 @@ +package io.kurrent.dbclient; + +import java.util.UUID; + +/** + * Utility class to help building an MessageData. + */ +public class MessageDataBuilder { + private String messageType; + private byte[] messageData; + private byte[] messageMetadata; + private UUID messageId; + private String contentType; + + MessageDataBuilder() { + } + + /** + * Configures a message data builder to host a JSON payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder json(String messageType, byte[] messageData) { + return json(messageType, messageData, null, null); + } + + /** + * Configures a message data builder to host a JSON payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder json(String messageType, byte[] messageData, byte[] messageMetadata) { + return json(messageType, messageData, messageMetadata, null); + } + + /** + * Configures a message data builder to host a JSON payload. + * + * @param messageId message's id. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder json(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId) { + MessageDataBuilder self = new MessageDataBuilder(); + + self.messageType = messageType; + self.messageData = messageData; + self.messageMetadata = messageMetadata; + self.messageId = messageId; + self.contentType = ContentType.JSON; + + return self; + } + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @return a message data builder. + */ + public static MessageDataBuilder binary(String messageType, byte[] messageData) { + return binary(messageType, messageData, null, null); + } + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder binary(String messageType, byte[] messageData, byte[] messageMetadata) { + return binary(messageType, messageData, messageMetadata, null); + } + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageId message's id. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder binary(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId) { + MessageDataBuilder self = new MessageDataBuilder(); + + self.messageType = messageType; + self.messageData = messageData; + self.messageId = messageId; + self.messageMetadata = messageMetadata; + self.contentType = ContentType.BYTES; + + return self; + } + + + /** + * Configures a message data builder to host a binary payload. + * + * @param messageId message's id. + * @param messageType message's type. + * @param messageData message's payload. + * @param messageMetadata message's metadata payload. + * @return a message data builder. + */ + public static MessageDataBuilder with(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId, boolean isJson) { + MessageDataBuilder self = new MessageDataBuilder(); + + self.messageType = messageType; + self.messageData = messageData; + self.messageId = messageId; + self.messageMetadata = messageMetadata; + self.contentType = isJson ? ContentType.JSON : ContentType.BYTES; + + return self; + } + + /** + * Sets message's unique identifier. + */ + public MessageDataBuilder messageId(UUID messageId) { + this.messageId = messageId; + return this; + } + + /** + * Sets message's custom user metadata. + */ + public MessageDataBuilder messageMetadata(byte[] value) { + this.messageMetadata = value; + return this; + } + + /** + * Builds a message ready to be sent to KurrentDB. + * + * @see MessageData + */ + public MessageData build() { + UUID messageId = this.messageId == null ? UUID.randomUUID() : this.messageId; + return new MessageData(this.messageType, this.messageData, this.messageMetadata, messageId, this.contentType); + } +} diff --git a/src/main/java/io/kurrent/dbclient/OptionsBase.java b/src/main/java/io/kurrent/dbclient/OptionsBase.java index f7199a88..bb50b6d0 100644 --- a/src/main/java/io/kurrent/dbclient/OptionsBase.java +++ b/src/main/java/io/kurrent/dbclient/OptionsBase.java @@ -8,7 +8,7 @@ class OptionsBase { private final OperationKind kind; private UserCredentials credentials; private boolean requiresLeader; - private Map headers = new HashMap<>(); + private final Map headers = new HashMap<>(); protected OptionsBase() { this(OperationKind.Regular); diff --git a/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java b/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java new file mode 100644 index 00000000..eb9b790c --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java @@ -0,0 +1,29 @@ +package io.kurrent.dbclient; + +import java.util.Optional; + +import io.kurrent.dbclient.serialization.OperationSerializationSettings; + +class OptionsWithBackPressureAndSerialization extends OptionsWithBackPressure { + public OperationSerializationSettings serializationSettings; + + protected OptionsWithBackPressureAndSerialization(OperationKind kind) { + super(kind); + } + + /** + * Allows to customize or disable the automatic deserialization. + */ + public Optional serializationSettings() { + return Optional.ofNullable(serializationSettings); + } + + /** + * Customize or disable the automatic deserialization. + */ + @SuppressWarnings("unchecked") + public T serializationSettings(OperationSerializationSettings serializationSettings) { + this.serializationSettings = serializationSettings; + return (T)this; + } +} diff --git a/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java b/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java index 543126af..dc777fa5 100644 --- a/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java +++ b/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java @@ -1,6 +1,6 @@ package io.kurrent.dbclient; -class OptionsWithPositionAndResolveLinkTosBase extends OptionsWithBackPressure { +class OptionsWithPositionAndResolveLinkTosBase extends OptionsWithBackPressureAndSerialization { private StreamPosition position; protected OptionsWithPositionAndResolveLinkTosBase(OperationKind kind) { diff --git a/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java b/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java index 9b978972..5afd050a 100644 --- a/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java +++ b/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java @@ -1,6 +1,6 @@ package io.kurrent.dbclient; -class OptionsWithStartRevisionAndResolveLinkTosBase extends OptionsWithBackPressure { +class OptionsWithStartRevisionAndResolveLinkTosBase extends OptionsWithBackPressureAndSerialization { private StreamPosition startRevision; protected OptionsWithStartRevisionAndResolveLinkTosBase(OperationKind kind) { diff --git a/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java b/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java index 48eef358..8dcfcd5f 100644 --- a/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java +++ b/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java @@ -1,5 +1,9 @@ package io.kurrent.dbclient; +/** + * @deprecated This class may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter + */ +@Deprecated class OptionsWithStreamStateBase extends OptionsBase { private StreamState streamState; @@ -7,17 +11,22 @@ protected OptionsWithStreamStateBase() { this.streamState = StreamState.any(); } + /** + * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter. + */ + @Deprecated StreamState getStreamState() { return this.streamState; } /** * Asks the server to check that the stream receiving is at the expected state. - + * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter. * @param state - expected revision. * @return updated options. */ @SuppressWarnings("unchecked") + @Deprecated public T streamState(StreamState state) { this.streamState = state; return (T) this; @@ -26,10 +35,11 @@ public T streamState(StreamState state) { /** * Asks the server to check that the stream receiving is at the given expected revision. - + * @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter. * @param revision - expected revision. * @return updated options. */ + @Deprecated public T streamRevision(long revision) { return streamState(StreamState.streamRevision(revision)); } diff --git a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java index da6408c8..5f3b182b 100644 --- a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java +++ b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java @@ -7,6 +7,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; +import io.kurrent.dbclient.serialization.MessageSerializer; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,11 +25,16 @@ class ReadResponseObserver implements ClientResponseObserver requestStream; private int outstandingRequests; private WorkItemArgs args; - - - public ReadResponseObserver(OptionsWithBackPressure options, StreamConsumer consumer) { + private final MessageSerializer messageSerializer; + + public ReadResponseObserver( + OptionsWithBackPressure options, + StreamConsumer consumer, + MessageSerializer messageSerializer + ) { this.options = options; this.consumer = consumer; + this.messageSerializer = messageSerializer; } public Subscription getSubscription() { @@ -105,7 +111,7 @@ public void onNext(StreamsOuterClass.ReadResp value) { } if (value.hasEvent()) - consumer.onEvent(ResolvedEvent.fromWire(value.getEvent())); + consumer.onEvent(ResolvedEvent.fromWire(value.getEvent(), messageSerializer)); else if (value.hasConfirmation()) consumer.onSubscriptionConfirmation(value.getConfirmation().getSubscriptionId()); else if (value.hasCheckpoint()) { diff --git a/src/main/java/io/kurrent/dbclient/ResolvedEvent.java b/src/main/java/io/kurrent/dbclient/ResolvedEvent.java index 8f5a8e3d..652f404b 100644 --- a/src/main/java/io/kurrent/dbclient/ResolvedEvent.java +++ b/src/main/java/io/kurrent/dbclient/ResolvedEvent.java @@ -2,10 +2,10 @@ import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent; import io.kurrent.dbclient.proto.streams.StreamsOuterClass; +import io.kurrent.dbclient.serialization.MessageSerializer; import java.util.Objects; import java.util.Optional; -import java.util.StringJoiner; /** * Represents an event with a potential link. @@ -13,12 +13,17 @@ public class ResolvedEvent { private final RecordedEvent event; private final RecordedEvent link; - private final Position position; + private final Message message; public ResolvedEvent(RecordedEvent event, RecordedEvent link, Position position) { + this(event, link, null, position); + } + + public ResolvedEvent(RecordedEvent event, RecordedEvent link, Message message, Position position) { this.event = event; this.link = link; + this.message = message; this.position = position; } @@ -44,8 +49,26 @@ public RecordedEvent getOriginalEvent() { return this.link != null ? this.link : this.event; } + + /** + * Returns the deserialized message + * It will be provided or equal to null, depending on the automatic deserialization settings you choose. + * If it's null, you can use OriginalEvent to deserialize it manually. + */ + public Optional getMessage() { + return Optional.ofNullable(message); + } + + /** + * Returns the deserialized message data. + */ + public Optional getDeserializedData() { + return getMessage().map(Message::data); + } + /** * Returns the transaction log position of the resolved event. + * * @see Position */ public Optional getPosition() { @@ -65,21 +88,41 @@ public int hashCode() { return Objects.hash(event, link); } - static ResolvedEvent fromWire(StreamsOuterClass.ReadResp.ReadEvent wireEvent) { + static ResolvedEvent fromWire( + StreamsOuterClass.ReadResp.ReadEvent wireEvent, + MessageSerializer messageSerializer + ) { RecordedEvent event = wireEvent.hasEvent() ? RecordedEvent.fromWire(wireEvent.getEvent()) : null; RecordedEvent link = wireEvent.hasLink() ? RecordedEvent.fromWire(wireEvent.getLink()) : null; Position position = wireEvent.hasNoPosition() ? null : new Position(wireEvent.getCommitPosition(), wireEvent.getCommitPosition()); - return new ResolvedEvent(event, link, position); + return ResolvedEvent.from(event, link, position, messageSerializer); } - static ResolvedEvent fromWire(Persistent.ReadResp.ReadEvent wireEvent) { + static ResolvedEvent fromWire( + Persistent.ReadResp.ReadEvent wireEvent, + MessageSerializer messageSerializer + ) { RecordedEvent event = wireEvent.hasEvent() ? RecordedEvent.fromWire(wireEvent.getEvent()) : null; RecordedEvent link = wireEvent.hasLink() ? RecordedEvent.fromWire(wireEvent.getLink()) : null; Position position = wireEvent.hasNoPosition() ? null : new Position(wireEvent.getCommitPosition(), wireEvent.getCommitPosition()); - return new ResolvedEvent(event, link, position); + return ResolvedEvent.from(event, link, position, messageSerializer); + } + + static ResolvedEvent from( + RecordedEvent event, + RecordedEvent link, + Position position, + MessageSerializer messageSerializer + ) { + RecordedEvent originalEvent = link != null ? link : event; + Optional message = messageSerializer.tryDeserialize(originalEvent); + + return message + .map(value -> new ResolvedEvent(event, link, value, position)) + .orElseGet(() -> new ResolvedEvent(event, link, position)); } @Override diff --git a/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java b/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java index 70cfa70a..9d61147e 100644 --- a/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java +++ b/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionOptions.java @@ -1,9 +1,14 @@ package io.kurrent.dbclient; +import io.kurrent.dbclient.serialization.OperationSerializationSettings; + +import java.util.Optional; + /** * Options of the subscribe persistent subscription request. */ public class SubscribePersistentSubscriptionOptions extends OptionsBase { + public OperationSerializationSettings serializationSettings; private int bufferSize; private SubscribePersistentSubscriptionOptions() { @@ -29,4 +34,20 @@ public SubscribePersistentSubscriptionOptions bufferSize(int value) { bufferSize = value; return this; } + + + /** + * Allows to customize or disable the automatic deserialization. + */ + public Optional serializationSettings() { + return Optional.ofNullable(serializationSettings); + } + + /** + * Customize or disable the automatic deserialization. + */ + public SubscribePersistentSubscriptionOptions serializationSettings(OperationSerializationSettings serializationSettings) { + this.serializationSettings = serializationSettings; + return this; + } } diff --git a/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java b/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java index ba1abbb4..e7761661 100644 --- a/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java +++ b/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToAll.java @@ -7,7 +7,7 @@ class SubscribePersistentSubscriptionToAll extends AbstractSubscribePersistentSu public SubscribePersistentSubscriptionToAll(GrpcClient connection, String group, SubscribePersistentSubscriptionOptions options, PersistentSubscriptionListener listener) { - super(connection, group, options, listener); + super(connection, group, options, listener, connection.getSerializer(options.serializationSettings().orElse(null))); } @Override diff --git a/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java b/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java index 7be619ca..547494d9 100644 --- a/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java +++ b/src/main/java/io/kurrent/dbclient/SubscribePersistentSubscriptionToStream.java @@ -10,7 +10,7 @@ class SubscribePersistentSubscriptionToStream extends AbstractSubscribePersisten public SubscribePersistentSubscriptionToStream(GrpcClient connection, String stream, String group, SubscribePersistentSubscriptionOptions options, PersistentSubscriptionListener listener) { - super(connection, group, options, listener); + super(connection, group, options, listener, connection.getSerializer(options.serializationSettings().orElse(null))); this.stream = stream; } diff --git a/src/main/java/io/kurrent/dbclient/serialization/AutomaticDeserialization.java b/src/main/java/io/kurrent/dbclient/serialization/AutomaticDeserialization.java new file mode 100644 index 00000000..715b26cb --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/AutomaticDeserialization.java @@ -0,0 +1,31 @@ +package io.kurrent.dbclient.serialization; + +/** + * Controls whether the KurrentDB client should automatically deserialize message payloads + * into their corresponding Java types based on the configured type mappings. + */ +public enum AutomaticDeserialization { + /** + * Disables automatic deserialization. Messages will be returned in their raw serialized form, + * requiring manual deserialization by the application. Use this when you need direct access to the raw data + * or when working with messages that don't have registered type mappings. + */ + DISABLED(0), + + /** + * Enables automatic deserialization. The client will attempt to convert messages into their appropriate + * Java types using the configured serializers and type mappings. This simplifies working with strongly-typed + * domain messages but requires proper type registration. + */ + ENABLED(1); + + private final int value; + + AutomaticDeserialization(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/ClassProvider.java b/src/main/java/io/kurrent/dbclient/serialization/ClassProvider.java new file mode 100644 index 00000000..54629982 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/ClassProvider.java @@ -0,0 +1,21 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +public class ClassProvider { + public static Optional> getClassByName(String fullName) { + try { + return Optional.of(Class.forName(fullName)); + } catch (ClassNotFoundException e) { + ClassLoader contextLoader = Thread.currentThread().getContextClassLoader(); + if (contextLoader == null) + return Optional.empty(); + + try { + return Optional.ofNullable(contextLoader.loadClass(fullName)); + } catch (ClassNotFoundException ignored) { + return Optional.empty(); + } + } + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/ContentType.java b/src/main/java/io/kurrent/dbclient/serialization/ContentType.java new file mode 100644 index 00000000..617902bb --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/ContentType.java @@ -0,0 +1,16 @@ +package io.kurrent.dbclient.serialization; + +public enum ContentType { + JSON(1), + // PROTBUF(2), + // AVRO(3), + BYTES(4); + + private final int value; + + ContentType(final int newValue) { + value = newValue; + } + + public int getValue() { return value; } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/serialization/DefaultMessageTypeNamingStrategy.java b/src/main/java/io/kurrent/dbclient/serialization/DefaultMessageTypeNamingStrategy.java new file mode 100644 index 00000000..157c1778 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/DefaultMessageTypeNamingStrategy.java @@ -0,0 +1,41 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +/** + * Default implementation of MessageTypeNamingStrategy. + */ +public class DefaultMessageTypeNamingStrategy implements MessageTypeNamingStrategy { + private final Class defaultMetadataType; + + /** + * Creates a new strategy with the specified default metadata type. + * + * @param defaultMetadataType The default metadata type + */ + public DefaultMessageTypeNamingStrategy(Class defaultMetadataType) { + this.defaultMetadataType = defaultMetadataType != null ? defaultMetadataType : TracingMetadata.class; + } + + @Override + public String resolveTypeName(Class messageType, MessageTypeNamingResolutionContext resolutionContext) { + return resolutionContext.getCategoryName() + "-" + messageType.getName(); + } + + @Override + public Optional> tryResolveJavaClass(String messageTypeName) { + int categorySeparatorIndex = messageTypeName.indexOf('-'); + + if (categorySeparatorIndex == -1 || categorySeparatorIndex == messageTypeName.length() - 1) { + return Optional.empty(); + } + + String clrTypeName = messageTypeName.substring(categorySeparatorIndex + 1); + return ClassProvider.getClassByName(clrTypeName); + } + + @Override + public Optional> tryResolveMetadataJavaClass(String messageTypeName) { + return Optional.of(defaultMetadataType); + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java b/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java new file mode 100644 index 00000000..d6c73875 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/JacksonSerializer.java @@ -0,0 +1,71 @@ +package io.kurrent.dbclient.serialization; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Optional; + +public class JacksonSerializer implements Serializer { + public static class Settings { + public static final JsonMapper.Builder defaultBuilder = JsonMapper.builder() + .addModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .propertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE) + .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) + .serializationInclusion(JsonInclude.Include.NON_NULL) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + private JsonMapper.Builder jsonMapperBuilder = defaultBuilder; + + public JsonMapper.Builder jsonMapperBuilder() { + return jsonMapperBuilder; + } + + public void jsonMapperBuilder(JsonMapper.Builder jsonMapperBuilder) { + this.jsonMapperBuilder = jsonMapperBuilder; + } + } + + private static final Logger logger = LoggerFactory.getLogger(JacksonSerializer.class); + + private final JsonMapper jsonMapper; + + public JacksonSerializer() { + this(new Settings()); + } + + public JacksonSerializer(Settings settings) { + jsonMapper = settings.jsonMapperBuilder().build(); + } + + @Override + public byte[] serialize(Object value) { + try { + return jsonMapper.writeValueAsBytes(value); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public Optional deserialize(Class eventClass, byte[] data) { + try { + MessageType result = jsonMapper.readValue(data, eventClass); + + return Optional.ofNullable(result); + } catch (IOException e) { + logger.warn("Error deserializing event {}", eventClass.getName(), e); + return Optional.empty(); + } + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java b/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java new file mode 100644 index 00000000..599cd70f --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/KurrentDBClientSerializationSettings.java @@ -0,0 +1,340 @@ +package io.kurrent.dbclient.serialization; + +import com.fasterxml.jackson.databind.json.JsonMapper; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Provides configuration options for messages serialization and deserialization in the KurrentDB client. + */ +public class KurrentDBClientSerializationSettings implements Cloneable { + /** + * The serializer responsible for handling JSON-formatted data. This serializer is used both for + * serializing outgoing JSON messages and deserializing incoming JSON messages. If not specified, + * a default System.Text.Json serializer will be used with standard settings. + *

+ * That also allows you to bring your custom JSON serializer implementation (e.g. JSON.NET) + */ + private Serializer jsonSerializer; + + + /** + * The serializer responsible for handling binary data formats. This is used when working with + * binary-encoded messages rather than text-based formats (e.g. Protobuf or Avro). Required when storing + * or retrieving content with "application/octet-stream" content type + */ + private Serializer bytesSerializer; + + /** + * Determines which serialization format (JSON or binary) is used by default when writing messages + * where the content type isn't explicitly specified. The default content type is "application/json" + */ + private ContentType defaultContentType = ContentType.JSON; + + /** + * Defines the custom strategy used to map between the type name stored in messages and Java type names. + * If not provided the default {@link io.kurrent.dbclient.serialization.DefaultMessageTypeNamingStrategy} will be used. + * It resolves the class name to the format: "{stream category name}-{Class Message Type}". + * You can provide your own implementation of {@link io.kurrent.dbclient.serialization.MessageTypeNamingStrategy} + * and register it here to override the default behavior + */ + private MessageTypeNamingStrategy messageTypeNamingStrategy; + + /** + * Allows to register mapping of Java message types to their corresponding message type names used in serialized messages. + */ + private Map, String> messageTypeMap = new HashMap<>(); + + /** + * Registers Java message types that can be appended to the specific stream category. + * Types will have message type names resolved based on the used {@link io.kurrent.dbclient.serialization.MessageTypeNamingStrategy} + */ + private Map[]> categoryMessageTypesMap = new HashMap<>(); + + /** + * Specifies the Java type that should be used when deserializing metadata for all events. + * When set, the client will attempt to deserialize event metadata into this type. + * If not provided, {@link io.kurrent.dbclient.serialization.TracingMetadata} will be used. + */ + private Class defaultMetadataType; + + /** + * Creates a new instance of serialization settings with either default values or custom configuration. + * This factory method is the recommended way to create serialization settings for the KurrentDB client. + * + * @param configure Optional callback to customize the settings. If null, default settings are used. + * @return A fully configured instance ready to be used with the KurrentDB client. + *

+     * {@code
+     * KurrentDBClientSerializationSettings settings = KurrentDBClientSerializationSettings.get(options -> {
+     *     options.registerMessageType(UserRegistered.class, "user-registered");
+     *     options.registerMessageType(UserRoleAssigned.class, "user-role-assigned");
+     *     options.registerMessageTypeForCategory(UserRegistered.class, "user-registered");
+     * });
+     * }
+     * 
+ */ + public static KurrentDBClientSerializationSettings get( + Consumer configure + ) { + KurrentDBClientSerializationSettings settings = get(); + + configure.accept(settings); + + return settings; + } + + /** + * Creates a new instance of serialization settings with either default values. + * This factory method is the recommended way to create serialization settings for the KurrentDB client. + * + * @return A fully configured default instance ready to be used with the KurrentDB client. + */ + public static KurrentDBClientSerializationSettings get() { + return new KurrentDBClientSerializationSettings(); + } + + /** + * Configures the JSON serializer using custom options while inheriting from the default System.Text.Json settings. + * This allows fine-tuning serialization behavior such as case sensitivity, property naming, etc. + * + * @param configure A function that receives the default options and returns modified options. + * @return The current instance for method chaining. + * @example + *
+     * {@code
+     * settings.useJsonSettings(builder -> 
+     *     builder.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true)
+     *             .propertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE)
+     * );
+     * }
+     * 
+ */ + public KurrentDBClientSerializationSettings useJsonSettings( + Function configure + ) { + JacksonSerializer.Settings settings = new JacksonSerializer.Settings(); + settings.jsonMapperBuilder(configure.apply(settings.jsonMapperBuilder())); + + jsonSerializer = new JacksonSerializer(settings); + + return this; + } + + /** + * Sets a custom JSON serializer implementation. + * That also allows you to bring your custom JSON serializer implementation (e.g. Jackson) + * + * @param serializer The serializer to use for JSON content. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useJsonSerializer(Serializer serializer) { + jsonSerializer = serializer; + + return this; + } + + /** + * Sets a custom binary serializer implementation. + * That also allows you to bring your custom binary serializer implementation (e.g. Protobuf or Avro) + * + * @param serializer The serializer to use for binary content. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useBytesSerializer(Serializer serializer) { + bytesSerializer = serializer; + + return this; + } + + /** + * Configures a custom message type naming strategy. + * + * @param The type of naming strategy to use. + * @param strategyClass The class of the naming strategy to instantiate. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useMessageTypeNamingStrategy( + Class strategyClass + ) { + try { + return useMessageTypeNamingStrategy(strategyClass.getDeclaredConstructor().newInstance()); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new RuntimeException("Failed to instantiate message type naming strategy", e); + } + } + + /** + * Configures a custom message type naming strategy. + * + * @param messageTypeNamingStrategy The naming strategy instance to use. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useMessageTypeNamingStrategy( + MessageTypeNamingStrategy messageTypeNamingStrategy + ) { + this.messageTypeNamingStrategy = messageTypeNamingStrategy; + + return this; + } + + /** + * Associates a message type with a specific stream category to enable automatic deserialization. + * In event sourcing, streams are often prefixed with a category (e.g., "user-123", "order-456"). + * This method tells the client which message types can appear in streams of a given category. + * + * @param The event or message type that can appear in the category's streams. + * @param categoryName The category prefix (e.g., "user", "order", "account"). + * @param clazz The class representing the event type. + * @return The current instance for method chaining. + * @example + *
+     * {@code
+     * // Register event types that can appear in user streams
+     * settings.registerMessageTypeForCategory(UserCreated.class, "user")
+     *        .registerMessageTypeForCategory(UserUpdated.class, "user")
+     *        .registerMessageTypeForCategory(UserDeleted.class, "user");
+     * }
+     * 
+ */ + public KurrentDBClientSerializationSettings registerMessageTypeForCategory(Class clazz, String categoryName) { + return registerMessageTypeForCategory(categoryName, clazz); + } + + /** + * Registers multiple message types for a specific stream category. + * + * @param categoryName The category name to register the types with. + * @param types The message types to register. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings registerMessageTypeForCategory(String categoryName, Class... types) { + if (categoryMessageTypesMap.containsKey(categoryName)) { + Class[] current = categoryMessageTypesMap.get(categoryName); + Class[] combined = Arrays.copyOf(current, current.length + types.length); + System.arraycopy(types, 0, combined, current.length, types.length); + categoryMessageTypesMap.put(categoryName, combined); + } else { + categoryMessageTypesMap.put(categoryName, types); + } + + return this; + } + + /** + * Maps a Java type to a specific message type name that will be stored in the message metadata. + * This mapping is used during automatic deserialization, as it tells the client which Java class + * to instantiate when encountering a message with a particular type name in the database. + * + * @param The Java type to register (typically a message class). + * @param clazz The class representing the message type. + * @param typeName The string identifier to use for this type in the database. + * @return The current instance for method chaining. + * @remarks The type name is often different from the Java type name to support versioning and evolution + * of your domain model without breaking existing stored messages. + * @example + *
+     * {@code
+     * // Register me types with their corresponding type identifiers
+     * settings.registerMessageType(UserCreated.class, "user-created-v1")
+     *        .registerMessageType(OrderPlaced.class, "order-placed-v2");
+     * }
+     * 
+ */ + public KurrentDBClientSerializationSettings registerMessageType(Class clazz, String typeName) { + messageTypeMap.put(clazz, typeName); + + return this; + } + + /** + * Registers multiple message types with their corresponding type names. + * + * @param typeMap Map mapping types to their type names. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings registerMessageTypes(Map, String> typeMap) { + messageTypeMap.putAll(typeMap); + + return this; + } + + /** + * Configures a strongly-typed metadata class for all messages in the system. + * This enables accessing metadata properties in a type-safe manner rather than using dynamic objects. + * + * @param The metadata class type containing properties matching the expected metadata fields. + * @param clazz The class representing the metadata type. + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useMetadataType(Class clazz) { + defaultMetadataType = clazz; + + return this; + } + + /** + * Configures which serialization format (JSON or binary) is used by default when writing messages + * where the content type isn't explicitly specified. The default content type is "application/json" + * + * @param contentType The serialization format content type + * @return The current instance for method chaining. + */ + public KurrentDBClientSerializationSettings useContentType(ContentType contentType) { + defaultContentType = contentType; + + return this; + } + + @Override + public KurrentDBClientSerializationSettings clone() { + try { + KurrentDBClientSerializationSettings clone = (KurrentDBClientSerializationSettings) super.clone(); + clone.bytesSerializer = this.bytesSerializer; + clone.jsonSerializer = this.jsonSerializer; + clone.defaultContentType = this.defaultContentType; + clone.messageTypeMap = new HashMap<>(this.messageTypeMap); + clone.categoryMessageTypesMap = new HashMap<>(this.categoryMessageTypesMap); + clone.messageTypeNamingStrategy = this.messageTypeNamingStrategy; + clone.defaultMetadataType = this.defaultMetadataType; + return clone; + } catch (CloneNotSupportedException e) { + throw new InternalError(e); + } + } + + // Getters + public Optional jsonSerializer() { + return Optional.ofNullable(jsonSerializer); + } + + public Optional bytesSerializer() { + return Optional.ofNullable(bytesSerializer); + } + + public ContentType defaultContentType() { + return defaultContentType; + } + + public Optional messageTypeNamingStrategy() { + return Optional.ofNullable(messageTypeNamingStrategy); + } + + public Map, String> messageTypeMap() { + return messageTypeMap; + } + + public Map[]> categoryMessageTypesMap() { + return categoryMessageTypesMap; + } + + public Class defaultMetadataType() { + return defaultMetadataType; + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java b/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java new file mode 100644 index 00000000..94f42358 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/MessageSerializationContext.java @@ -0,0 +1,13 @@ +package io.kurrent.dbclient.serialization; + +public class MessageSerializationContext { + private final MessageTypeNamingResolutionContext namingResolution; + + public MessageSerializationContext(MessageTypeNamingResolutionContext namingResolution) { + this.namingResolution = namingResolution; + } + + public MessageTypeNamingResolutionContext namingResolution() { + return namingResolution; + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java b/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java new file mode 100644 index 00000000..b20ca094 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java @@ -0,0 +1,172 @@ +package io.kurrent.dbclient.serialization; + +import io.kurrent.dbclient.Message; +import io.kurrent.dbclient.MessageData; +import io.kurrent.dbclient.RecordedEvent; +import static io.kurrent.dbclient.serialization.ContentTypeUtils.*; + +import java.util.function.Consumer; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +public interface MessageSerializer { + MessageSerializer with(OperationSerializationSettings serializationSettings); + + MessageData serialize(Message message, MessageSerializationContext context); + + List serialize(List messages, MessageSerializationContext serializationContext); + + public Optional tryDeserialize(RecordedEvent record); +} + +class MessageSerializerImpl implements MessageSerializer { + private final SchemaRegistry schemaRegistry; + private final KurrentDBClientSerializationSettings serializationSettings; + // TODO: Ensure that settings are aligned between clients + private final JacksonSerializer metadataSerializer; + private final String contentType; + + public MessageSerializerImpl(SchemaRegistry schemaRegistry, KurrentDBClientSerializationSettings serializationSettings) { + this.schemaRegistry = schemaRegistry; + this.serializationSettings = serializationSettings; + this.metadataSerializer = new JacksonSerializer(); + this.contentType = ContentTypeUtils.toMessageContentType(serializationSettings.defaultContentType()); + } + + public static MessageSerializer from(KurrentDBClientSerializationSettings settings) { + settings = settings != null ? settings: KurrentDBClientSerializationSettings.get(); + + return new MessageSerializerImpl(SchemaRegistry.from(settings), settings); + } + + @Override + public MessageSerializer with(OperationSerializationSettings operationSettings) { + if (operationSettings == null) { + return this; + } + + if (operationSettings.automaticDeserialization() == AutomaticDeserialization.DISABLED) { + return NullMessageSerializer.INSTANCE; + } + + Consumer configureSettings = operationSettings.configureSettings(); + if (configureSettings == null) { + return this; + } + + KurrentDBClientSerializationSettings settings = serializationSettings.clone(); + configureSettings.accept(settings); + + return new MessageSerializerImpl(SchemaRegistry.from(settings), settings); + } + + @Override + public MessageData serialize(Message message, MessageSerializationContext context) { + Object data = message.data(); + Object metadata = message.metadata(); + UUID messageId = message.messageId(); + + String messageType = schemaRegistry.resolveTypeName( + data.getClass(), + context.namingResolution() + ); + + byte[] serializedData = schemaRegistry + .getSerializer(serializationSettings.defaultContentType()) + .serialize(data); + + byte[] serializedMetadata = metadata != null + ? metadataSerializer.serialize(metadata) + : new byte[0]; + + return new MessageData( + messageType, + serializedData, + serializedMetadata, + messageId, + contentType + ); + } + + @Override + public List serialize(List messages, MessageSerializationContext serializationContext) { + return messages.stream().map(m -> serialize(m, serializationContext)).collect(Collectors.toList()); + } + + @Override + public Optional tryDeserialize(RecordedEvent record) { + Optional> messageClass = schemaRegistry.tryResolveDataJavaClass(record.getEventType()); + if (!messageClass.isPresent()) { + return Optional.empty(); + } + + Object data = schemaRegistry + .getSerializer(fromMessageContentType(record.getContentType())) + .deserialize(messageClass.get(), record.getEventData()); + + if (data == null) { + return Optional.empty(); + } + + Optional> metadataClass = schemaRegistry.tryResolveMetadataJavaClass(record.getEventType()); + + Object metadata = metadataClass.isPresent() && record.getUserMetadata().length > 0 + ? metadataSerializer.deserialize(metadataClass.get(), record.getUserMetadata()) + : null; + + return Optional.of(Message.from(data, metadata, record.getEventId())); + } +} + +class NullMessageSerializer implements MessageSerializer { + public static final NullMessageSerializer INSTANCE = new NullMessageSerializer(); + + @Override + public MessageData serialize(Message value, MessageSerializationContext context) { + throw new UnsupportedOperationException("Cannot serialize, automatic deserialization is disabled"); + } + + @Override + public List serialize(List messages, MessageSerializationContext serializationContext) { + throw new UnsupportedOperationException("Cannot serialize, automatic deserialization is disabled"); + } + + @Override + public Optional tryDeserialize(RecordedEvent eventRecord) { + return Optional.empty(); + } + + @Override + public MessageSerializer with(OperationSerializationSettings operationSettings) { + return this; + } +} + +class ContentTypeUtils { + public static String toMessageContentType(ContentType contentType) { + switch (contentType) { + case JSON: + return "application/json"; + case BYTES: + return "application/octet-stream"; + default: + throw new IllegalArgumentException("Unknown content type: " + contentType); + } + } + + public static ContentType fromMessageContentType(String contentTypeString) { + if (contentTypeString == null || contentTypeString.isEmpty()) { + return ContentType.JSON; + } + + if ("application/json".equals(contentTypeString)) { + return ContentType.JSON; + } else if ("application/octet-stream".equals(contentTypeString)) { + return ContentType.BYTES; + } else { + throw new IllegalArgumentException("Unknown content type: " + contentTypeString); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java b/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java new file mode 100644 index 00000000..58cc4895 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerBuilder.java @@ -0,0 +1,7 @@ +package io.kurrent.dbclient.serialization; + +public final class MessageSerializerBuilder { + public static MessageSerializer get(KurrentDBClientSerializationSettings settings) { + return MessageSerializerImpl.from(settings); + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingResolutionContext.java b/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingResolutionContext.java new file mode 100644 index 00000000..7342ddce --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingResolutionContext.java @@ -0,0 +1,21 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Arrays; + +public class MessageTypeNamingResolutionContext { + private final String categoryName; + + public MessageTypeNamingResolutionContext(String streamName) { + this.categoryName = streamName; + } + + public String getCategoryName() { + return categoryName; + } + + public static MessageTypeNamingResolutionContext fromStreamName(String streamName) { + return new MessageTypeNamingResolutionContext( + Arrays.stream(streamName.split("-")).findFirst().orElse("no_stream_category") + ); + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingStrategy.java b/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingStrategy.java new file mode 100644 index 00000000..b2f3744e --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/MessageTypeNamingStrategy.java @@ -0,0 +1,79 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +/** + * Strategy for naming message types. + */ +public interface MessageTypeNamingStrategy { + /** + * Resolves a type name for the given message class. + * + * @param messageClass The message class to resolve a name for + * @param resolutionContext The context for resolution + * @return The resolved type name + */ + String resolveTypeName(Class messageClass, MessageTypeNamingResolutionContext resolutionContext); + + /** + * Tries to resolve a Java class from a message type name. + * + * @param messageTypeName Message type name to resolve + * @return Optional with resolved class, or empty if class wasn't found + */ + Optional> tryResolveJavaClass(String messageTypeName); + + /** + * Tries to resolve a Java metadata class from a message type name. + * + * @param messageTypeName Message type name to resolve + * @return Optional with resolved class, or empty if class wasn't found + */ + Optional> tryResolveMetadataJavaClass(String messageTypeName); +} + +/** + * Wrapper for message type naming strategies. + */ +class MessageTypeNamingStrategyWrapper implements MessageTypeNamingStrategy { + private final MessageTypeRegistry messageTypeRegistry; + private final MessageTypeNamingStrategy messageTypeNamingStrategy; + + /** + * Creates a new wrapper with the specified registry and strategy. + * + * @param messageTypeRegistry The message type registry + * @param messageTypeNamingStrategy The strategy to wrap + */ + public MessageTypeNamingStrategyWrapper( + MessageTypeRegistry messageTypeRegistry, + MessageTypeNamingStrategy messageTypeNamingStrategy) { + this.messageTypeRegistry = messageTypeRegistry; + this.messageTypeNamingStrategy = messageTypeNamingStrategy; + } + + @Override + public String resolveTypeName(Class messageType, MessageTypeNamingResolutionContext resolutionContext) { + return messageTypeRegistry.getOrAddTypeName( + messageType, + type -> messageTypeNamingStrategy.resolveTypeName(messageType, resolutionContext) + ); + } + + @Override + public Optional> tryResolveJavaClass(String messageTypeName) { + return messageTypeRegistry.getOrAddJavaClass( + messageTypeName, + name -> messageTypeNamingStrategy.tryResolveMetadataJavaClass(messageTypeName) + ); + } + + @Override + public Optional> tryResolveMetadataJavaClass(String messageTypeName) { + return messageTypeRegistry.getOrAddJavaClass( + messageTypeName + "-metadata", + name -> messageTypeNamingStrategy.tryResolveMetadataJavaClass(messageTypeName) + ); + } +} + diff --git a/src/main/java/io/kurrent/dbclient/serialization/MessageTypeRegistry.java b/src/main/java/io/kurrent/dbclient/serialization/MessageTypeRegistry.java new file mode 100644 index 00000000..4aa26b7b --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/MessageTypeRegistry.java @@ -0,0 +1,65 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +interface MessageTypeRegistry { + void register(Map, String> messageTypeMap); + + void register(Class messageType, String messageTypeName); + + Optional getTypeName(Class messageType); + + String getOrAddTypeName(Class javaClass, Function, String> getTypeName); + + Optional> getJavaClass(String messageTypeName); + + Optional> getOrAddJavaClass(String messageTypeName, Function>> getJavaClass); +} + +class MessageTypeRegistryImpl implements MessageTypeRegistry { + private final ConcurrentHashMap> classMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, String> typeNameMap = new ConcurrentHashMap<>(); + + + @Override + public void register(Map, String> messageTypeMap) { + for (Map.Entry, String> entry : messageTypeMap.entrySet()) { + register(entry.getKey(), entry.getValue()); + } + } + + @Override + public void register(Class messageType, String messageTypeName) { + classMap.put(messageTypeName, messageType); + typeNameMap.put(messageType, messageTypeName); + } + + @Override + public Optional getTypeName(Class messageType) { + return Optional.ofNullable(typeNameMap.getOrDefault(messageType, null)); + } + + @Override + public String getOrAddTypeName(Class javaClass, Function, String> getTypeName) { + return typeNameMap.computeIfAbsent( + javaClass, + c -> getTypeName.apply(javaClass) + ); + } + + @Override + public Optional> getJavaClass(String messageTypeName) { + return Optional.ofNullable(classMap.getOrDefault(messageTypeName, null)); + } + + @Override + public Optional> getOrAddJavaClass(String messageTypeName, Function>> getJavaClass) { + return Optional.ofNullable(classMap.computeIfAbsent( + messageTypeName, + c -> getJavaClass.apply(messageTypeName).orElse(null) + )); + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java b/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java new file mode 100644 index 00000000..c60cfe6e --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/OperationSerializationSettings.java @@ -0,0 +1,58 @@ +package io.kurrent.dbclient.serialization; + +import java.util.function.Consumer; + +/** + * Provides operation-specific serialization settings that override the global client configuration + * for individual operations like reading from or appending to streams. This allows fine-tuning + * serialization behavior on a per-operation basis without changing the client-wide settings. + */ +public class OperationSerializationSettings { + /** + * Controls whether messages should be automatically deserialized for this specific operation. + * When enabled (the default), messages will be converted to their appropriate Java types. + * When disabled, messages will be returned in their raw serialized form. + */ + private AutomaticDeserialization automaticDeserialization = AutomaticDeserialization.ENABLED; + + /** + * A callback that allows customizing serialization settings for this specific operation. + * This can be used to override type mappings, serializers, or other settings just for + * the scope of a single operation without affecting other operations. + */ + private Consumer configureSettings; + + /** + * A pre-configured settings instance that disables automatic deserialization. + * Use this when you need to access raw message data in its serialized form. + */ + public static final OperationSerializationSettings DISABLED = new OperationSerializationSettings(); + + static { + DISABLED.automaticDeserialization = AutomaticDeserialization.DISABLED; + } + + /** + * Creates operation-specific serialization settings with custom configuration while keeping + * automatic deserialization enabled. This allows operation-specific type mappings or + * serializer settings without changing the global client configuration. + * + * @param configure A callback to customize serialization settings for this operation. + * @return A configured instance of {@link OperationSerializationSettings} with enabled deserialization. + */ + public static OperationSerializationSettings configure(Consumer configure) { + OperationSerializationSettings settings = new OperationSerializationSettings(); + settings.automaticDeserialization = AutomaticDeserialization.ENABLED; + settings.configureSettings = configure; + return settings; + } + + // Getters + public AutomaticDeserialization automaticDeserialization() { + return automaticDeserialization; + } + + public Consumer configureSettings() { + return configureSettings; + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java b/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java new file mode 100644 index 00000000..f2fe5a1c --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/SchemaRegistry.java @@ -0,0 +1,89 @@ +package io.kurrent.dbclient.serialization; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class SchemaRegistry { + private final Map serializers; + private final MessageTypeNamingStrategy messageTypeNamingStrategy; + + public SchemaRegistry( + Map serializers, + MessageTypeNamingStrategy messageTypeNamingStrategy) { + this.serializers = serializers; + this.messageTypeNamingStrategy = messageTypeNamingStrategy; + } + + public Serializer getSerializer(ContentType schemaType) { + return serializers.get(schemaType); + } + + public String resolveTypeName(Class messageClass, MessageTypeNamingResolutionContext resolutionContext) { + return messageTypeNamingStrategy.resolveTypeName(messageClass, resolutionContext); + } + + public Optional> tryResolveDataJavaClass(String messageTypeName) { + return messageTypeNamingStrategy.tryResolveJavaClass(messageTypeName); + } + + public Optional> tryResolveMetadataJavaClass(String messageTypeName) { + return messageTypeNamingStrategy.tryResolveMetadataJavaClass(messageTypeName); + } + + public static SchemaRegistry from(KurrentDBClientSerializationSettings settings) { + MessageTypeNamingStrategy messageTypeNamingStrategy = + settings.messageTypeNamingStrategy() + .orElse(new DefaultMessageTypeNamingStrategy(settings.defaultMetadataType())); + + Map, String> categoriesTypeMap = resolveMessageTypeUsingNamingStrategy( + settings.categoryMessageTypesMap(), + messageTypeNamingStrategy + ); + + MessageTypeRegistry messageTypeRegistry = new MessageTypeRegistryImpl(); + messageTypeRegistry.register(settings.messageTypeMap()); + messageTypeRegistry.register(categoriesTypeMap); + + Map serializers = new HashMap<>(); + + serializers.put( + ContentType.JSON, + settings.jsonSerializer().orElse(new JacksonSerializer()) + ); + + serializers.put( + ContentType.BYTES, + settings.bytesSerializer().orElse(new JacksonSerializer()) + ); + + return new SchemaRegistry( + serializers, + new MessageTypeNamingStrategyWrapper( + messageTypeRegistry, + settings.messageTypeNamingStrategy() + .orElse(new DefaultMessageTypeNamingStrategy(settings.defaultMetadataType())) + ) + ); + } + + private static Map, String> resolveMessageTypeUsingNamingStrategy( + Map[]> categoryMessageTypesMap, + MessageTypeNamingStrategy messageTypeNamingStrategy + ) { + Map, String> result = new HashMap<>(); + + for (Map.Entry[]> entry : categoryMessageTypesMap.entrySet()) { + String category = entry.getKey(); + for (Class type : entry.getValue()) { + String typeName = messageTypeNamingStrategy.resolveTypeName( + type, + new MessageTypeNamingResolutionContext(category) + ); + result.put(type, typeName); + } + } + + return result; + } +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/Serializer.java b/src/main/java/io/kurrent/dbclient/serialization/Serializer.java new file mode 100644 index 00000000..7b87331f --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/Serializer.java @@ -0,0 +1,32 @@ +package io.kurrent.dbclient.serialization; + +import java.util.Optional; + +/// +/// Defines the core serialization capabilities required by the KurrentDB client. +/// Implementations of this interface handle the conversion between Java objects and their +/// binary representation for storage in and retrieval from the event store. +///
+/// The client ships default Jackson implementation, but custom implementations can be provided or other formats. +///
+public interface Serializer { + /// + /// Converts a Java object to its binary representation for storage in the event store. + /// + /// The object to serialize. This could be an event, command, or metadata object. + /// + /// A binary representation of the object that can be stored in KurrentDB. + /// + byte[] serialize(Object value); + + /// + /// Reconstructs a Java object from its binary representation retrieved from the event store. + /// + /// The binary data to deserialize, typically retrieved from a KurrentDB event. + /// The target Java type to deserialize the data into, determined from message type mappings. + /// + /// The deserialized object cast to the specified type, or null if the data cannot be deserialized. + /// The returned object will be an instance of the specified type or a compatible subtype. + /// + Optional deserialize(Class eventClass, byte[] data); +} diff --git a/src/main/java/io/kurrent/dbclient/serialization/TracingMetadata.java b/src/main/java/io/kurrent/dbclient/serialization/TracingMetadata.java new file mode 100644 index 00000000..7bee489c --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/serialization/TracingMetadata.java @@ -0,0 +1,5 @@ +package io.kurrent.dbclient.serialization; + +public class TracingMetadata { + +} diff --git a/src/test/java/io/kurrent/dbclient/StreamsTests.java b/src/test/java/io/kurrent/dbclient/StreamsTests.java index 571ef250..dafd3ebf 100644 --- a/src/test/java/io/kurrent/dbclient/StreamsTests.java +++ b/src/test/java/io/kurrent/dbclient/StreamsTests.java @@ -3,6 +3,7 @@ import io.kurrent.dbclient.streams.*; import io.kurrent.dbclient.streams.ReadStreamTests; +import io.kurrent.dbclient.streams.serialization.SerializationTests; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; @@ -16,6 +17,7 @@ public class StreamsTests implements DeadlineTests, InterceptorTests, MetadataTests, + SerializationTests, ClientLifecycleTests { static private Database database; static private Logger logger; diff --git a/src/test/java/io/kurrent/dbclient/samples/authentication/UserCertificate.java b/src/test/java/io/kurrent/dbclient/samples/authentication/UserCertificate.java index 023e6295..ede2d2e3 100644 --- a/src/test/java/io/kurrent/dbclient/samples/authentication/UserCertificate.java +++ b/src/test/java/io/kurrent/dbclient/samples/authentication/UserCertificate.java @@ -8,7 +8,7 @@ public class UserCertificate { private static void tracing() { // region client-with-user-certificates KurrentDBClientSettings settings = KurrentDBConnectionString - .parseOrThrow("kurrentdb://admin:changeit@{endpoint}?tls=true&userCertFile={pathToCaFile}&userKeyFile={pathToKeyFile}"); + .parseOrThrow("esdb://admin:changeit@{endpoint}?tls=true&userCertFile={pathToCaFile}&userKeyFile={pathToKeyFile}"); KurrentDBClient client = KurrentDBClient.create(settings); // endregion client-with-user-certificates } diff --git a/src/test/java/io/kurrent/dbclient/samples/reading_events/ReadingEvents.java b/src/test/java/io/kurrent/dbclient/samples/reading_events/ReadingEvents.java index 9098358d..02064b36 100644 --- a/src/test/java/io/kurrent/dbclient/samples/reading_events/ReadingEvents.java +++ b/src/test/java/io/kurrent/dbclient/samples/reading_events/ReadingEvents.java @@ -3,12 +3,9 @@ import io.kurrent.dbclient.*; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.reactivestreams.*; -import org.reactivestreams.Subscription; import java.util.concurrent.ExecutionException; -@SuppressWarnings("ALL") public class ReadingEvents { private static void readFromStream(KurrentDBClient client) throws ExecutionException, InterruptedException, JsonProcessingException { // region read-from-stream @@ -19,9 +16,6 @@ private static void readFromStream(KurrentDBClient client) throws ExecutionExcep ReadResult result = client.readStream("some-stream", options) .get(); - - // or using read reactive - Publisher publisher = client.readStreamReactive("some-stream", options); // endregion read-from-stream // region iterate-stream @@ -29,31 +23,6 @@ private static void readFromStream(KurrentDBClient client) throws ExecutionExcep RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent(); System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); // endregion iterate-stream } @@ -67,8 +36,6 @@ private static void readFromStreamPosition(KurrentDBClient client) throws Execut ReadResult result = client.readStream("some-stream", options) .get(); - // or using read reactive - Publisher publisher = client.readStreamReactive("some-stream", options); // endregion read-from-stream-position // region iterate-stream @@ -76,31 +43,6 @@ private static void readFromStreamPosition(KurrentDBClient client) throws Execut RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent(); System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); // endregion iterate-stream } @@ -113,9 +55,6 @@ private static void readStreamOverridingUserCredentials(KurrentDBClient client) ReadResult result = client.readStream("some-stream", options) .get(); - - // Or using reactive stream - Publisher publisher = client.readStreamReactive("some-stream", options); // endregion overriding-user-credentials } @@ -142,39 +81,6 @@ private static void readFromStreamPositionCheck(KurrentDBClient client) throws J RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent(); System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - Publisher publisher = client.readStreamReactive("some-stream", options); - - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - Throwable innerException = throwable.getCause(); - - if (innerException instanceof StreamNotFoundException) { - return; - } - // Handle other errors - } - - @Override - public void onComplete() { - } - }); // endregion checking-for-stream-presence } @@ -191,33 +97,6 @@ private static void readFromStreamBackwards(KurrentDBClient client) throws JsonP RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent(); System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - Publisher publisher = client.readStreamReactive("some-stream", options); - - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); // endregion reading-backwards } @@ -230,8 +109,6 @@ private static void readFromAllStream(KurrentDBClient client) throws JsonProcess ReadResult result = client.readAll(options) .get(); - // or using read reactive - Publisher publisher = client.readAllReactive(options); // endregion read-from-all-stream // region read-from-all-stream-iterate @@ -239,31 +116,6 @@ private static void readFromAllStream(KurrentDBClient client) throws JsonProcess RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent(); System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); // endregion read-from-all-stream-iterate } @@ -276,9 +128,6 @@ private static void readAllOverridingUserCredentials(KurrentDBClient client) thr ReadResult result = client.readAll(options) .get(); - - // or using read reactive - Publisher publisher = client.readAllReactive(options); // endregion read-all-overriding-user-credentials } @@ -298,38 +147,6 @@ private static void ignoreSystemEvents(KurrentDBClient client) throws JsonProces } System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - Publisher publisher = client.readAllReactive(options); - - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - - if (recordedEvent.getEventType().startsWith("$")) { - return; - } - - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); // endregion ignore-system-events } @@ -342,8 +159,6 @@ private static void readFromAllStreamBackwards(KurrentDBClient client) throws Js ReadResult result = client.readAll(options) .get(); - // or using read reactive - Publisher publisher = client.readAllReactive(options); // endregion read-from-all-stream-backwards // region read-from-all-stream-iterate @@ -351,31 +166,6 @@ private static void readFromAllStreamBackwards(KurrentDBClient client) throws Js RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent(); System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); // endregion read-from-all-stream-iterate } @@ -394,36 +184,6 @@ private static void filteringOutSystemEvents(KurrentDBClient client) throws Json } System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); } - - // or using read reactive - Publisher publisher = client.readAllReactive(options); - - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - } - - @Override - public void onNext(ReadMessage readMessage) { - RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent(); - if (!recordedEvent.getEventType().startsWith("$")) { - return; - } - try { - System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData())); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); } private static void readFromStreamResolvingLinkTos(KurrentDBClient client) throws JsonProcessingException, ExecutionException, InterruptedException { @@ -436,9 +196,6 @@ private static void readFromStreamResolvingLinkTos(KurrentDBClient client) throw ReadResult result = client.readAll(options) .get(); - // or using read reactive - Publisher publisher = client.readAllReactive(options); - // endregion read-from-all-stream-resolving-link-Tos for (ResolvedEvent resolvedEvent : result.getEvents()) { RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent(); diff --git a/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java b/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java new file mode 100644 index 00000000..151773a4 --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java @@ -0,0 +1,97 @@ +package io.kurrent.dbclient.streams.serialization; + +import io.kurrent.dbclient.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.IntStream; +import java.util.stream.Collectors; + +public interface SerializationTests extends ConnectionAware { + @Test + default void testPlainJavaObjectsAreSerializedAndDeserializedUsingAutoSerialization() throws Throwable { + KurrentDBClient client = getDatabase().defaultClient(); + + // Given + final String streamName = generateName(); + final List expected = new ArrayList<>(generateMessages(2)); + + // When + WriteResult appendResult = client.appendToStream(streamName, StreamState.noStream(), expected) + .get(); + + Assertions.assertEquals(StreamState.streamRevision(1), appendResult.getNextExpectedRevision()); + + // Ensure appended event is readable + ReadResult result = client.readStream(streamName, ReadStreamOptions.get()) + .get(); + + Assertions.assertEquals(2, result.getEvents().size()); + } + + static List generateMessages(int count){ + return IntStream.range(0, count) + .mapToObj(x -> + new UserRegistered( + UUID.randomUUID(), + new Address(UUID.randomUUID().toString(), UUID.randomUUID().hashCode()) + ) + ) + .collect(Collectors.toList()); + } + + class Address{ + String street; + int number; + + public Address(String street, int number) { + this.street = street; + this.number = number; + } + + public String getStreet() { + return street; + } + + public void setStreet(String street) { + this.street = street; + } + + public int getNumber() { + return number; + } + + public void setNumber(int number) { + this.number = number; + } + } + + class UserRegistered{ + UUID userId; + Address address; + + public UserRegistered(UUID userId, Address address) { + this.userId = userId; + this.address = address; + } + + public UUID getUserId() { + return userId; + } + + public void setUserId(UUID userId) { + this.userId = userId; + } + + public Address getAddress() { + return address; + } + + public void setAddress(Address address) { + this.address = address; + } + } +}