From f0fa8ae77f5207afc88dbd96a0f13be99773e1ef Mon Sep 17 00:00:00 2001 From: YoEight Date: Tue, 10 Jun 2025 22:43:47 -0400 Subject: [PATCH 01/16] feat: add multi stream append support --- .../kurrent/dbclient/AppendStreamRequest.java | 27 +++ .../io/kurrent/dbclient/KurrentDBClient.java | 4 + .../dbclient/MultiAppendWriteResult.java | 4 + .../kurrent/dbclient/MultiStreamAppend.java | 87 +++++++++ src/main/proto/dynamic-value.proto | 42 +++++ src/main/proto/streams.v2.proto | 170 ++++++++++++++++++ 6 files changed, 334 insertions(+) create mode 100644 src/main/java/io/kurrent/dbclient/AppendStreamRequest.java create mode 100644 src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java create mode 100644 src/main/java/io/kurrent/dbclient/MultiStreamAppend.java create mode 100644 src/main/proto/dynamic-value.proto create mode 100644 src/main/proto/streams.v2.proto diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamRequest.java b/src/main/java/io/kurrent/dbclient/AppendStreamRequest.java new file mode 100644 index 00000000..d2c666b3 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/AppendStreamRequest.java @@ -0,0 +1,27 @@ +package io.kurrent.dbclient; + +import java.util.Iterator; + +public class AppendStreamRequest { + private final String streamName; + private final Iterator events; + private final StreamState expectedState; + + public AppendStreamRequest(String streamName, Iterator events, StreamState expectedState) { + this.streamName = streamName; + this.events = events; + this.expectedState = expectedState; + } + + public String getStreamName() { + return streamName; + } + + public Iterator getEvents() { + return events; + } + + public StreamState getExpectedState() { + return expectedState; + } +} diff --git a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index 22cbe548..3cbea889 100644 --- a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ 
b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -75,6 +75,10 @@ public CompletableFuture appendToStream(String streamName, AppendTo return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); } + public CompletableFuture multiAppend(AppendToStreamOptions options, Iterator requests) { + return new MultiStreamAppend(this.getGrpcClient(), requests).execute(); + } + /** * Sets a stream's metadata. * @param streamName stream's name. diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java b/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java new file mode 100644 index 00000000..29dfeabb --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java @@ -0,0 +1,4 @@ +package io.kurrent.dbclient; + +public class MultiAppendWriteResult { +} diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java new file mode 100644 index 00000000..e3870e90 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -0,0 +1,87 @@ +package io.kurrent.dbclient; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.kurrentdb.v2.AppendRecord; +import io.kurrentdb.v2.MultiStreamAppendResponse; +import io.kurrentdb.v2.StreamsServiceGrpc; +import kurrentdb.protobuf.DynamicValueOuterClass; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +class MultiStreamAppend { + private final GrpcClient client; + private final Iterator requests; + + public MultiStreamAppend(GrpcClient client, Iterator requests) { + this.client = client; + this.requests = requests; + } + + public CompletableFuture execute() { + return this.client.run(this::append); + } + + private CompletableFuture append(ManagedChannel channel) { + CompletableFuture result = new CompletableFuture<>(); + 
StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(channel), this.client.getSettings(), new OptionsBase<>()); + StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); + + try { + while (this.requests.hasNext()) { + AppendStreamRequest request = this.requests.next(); + io.kurrentdb.v2.AppendStreamRequest.Builder builder = io.kurrentdb.v2.AppendStreamRequest.newBuilder() + .setStream(request.getStreamName()); + + while (request.getEvents().hasNext()) { + EventData event = request.getEvents().next(); + builder.addRecords(AppendRecord.newBuilder() + .setData(ByteString.copyFrom(event.getEventData())) + .setRecordId(event.getEventId().toString()) + .putProperties(SystemMetadataKeys.CONTENT_TYPE, DynamicValueOuterClass + .DynamicValue + .newBuilder() + .setStringValue(event.getContentType()) + .build()) + .putProperties(SystemMetadataKeys.TYPE, DynamicValueOuterClass + .DynamicValue + .newBuilder() + .setStringValue(event.getEventType()) + .build()) + .build()); + } + + requestStream.onNext(builder.build()); + } + + requestStream.onCompleted(); + } catch (StatusRuntimeException e) { + String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + requestStream.onError(reason); + result.completeExceptionally(reason); + } else { + requestStream.onError(e); + result.completeExceptionally(e); + } + } catch (RuntimeException e) { + requestStream.onError(e); + result.completeExceptionally(e); + } + + return result; + } + + public MultiAppendWriteResult onResponse(MultiStreamAppendResponse response) { + // Handle the response from the multi-stream 
append operation + // This could involve processing the results for each stream in the response + throw new RuntimeException("Not implemented"); + } +} diff --git a/src/main/proto/dynamic-value.proto b/src/main/proto/dynamic-value.proto new file mode 100644 index 00000000..04ce17e5 --- /dev/null +++ b/src/main/proto/dynamic-value.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/struct.proto"; + +package kurrentdb.protobuf; +option csharp_namespace = "KurrentDB.Protobuf"; + +message DynamicValue { + oneof kind { + // Represents a null value. + google.protobuf.NullValue null_value = 1; + + // Represents a 32-bit signed integer value. + sint32 int32_value = 2; + + // Represents a 64-bit signed integer value. + sint64 int64_value = 3; + + // Represents a byte array value. + bytes bytes_value = 4; + + // Represents a 64-bit double-precision floating-point value. + double double_value = 5; + + // Represents a 32-bit single-precision floating-point value + float float_value = 6; + + // Represents a string value. + string string_value = 7; + + // Represents a boolean value. + bool boolean_value = 8; + + // Represents a timestamp value. + google.protobuf.Timestamp timestamp_value = 9; + + // Represents a duration value. + google.protobuf.Duration duration_value = 10; + } +} \ No newline at end of file diff --git a/src/main/proto/streams.v2.proto b/src/main/proto/streams.v2.proto new file mode 100644 index 00000000..6aac3647 --- /dev/null +++ b/src/main/proto/streams.v2.proto @@ -0,0 +1,170 @@ +syntax = "proto3"; + +// +// This protocol is UNSTABLE in the sense of being subject to change. 
+// + +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.V2"; +option java_package = "io.kurrentdb.v2"; +option java_multiple_files = true; + +import "dynamic-value.proto"; + +service StreamsService { + // Executes an atomic operation to append records to multiple streams. + // This transactional method ensures that all appends either succeed + // completely, or are entirely rolled back, thereby maintaining strict data + // consistency across all involved streams. + rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); + + // Streaming version of MultiStreamAppend that allows clients to send multiple + // append requests over a single connection. When the stream completes, all + // records are appended transactionally (all succeed or fail together). + // Provides improved efficiency for high-throughput scenarios while + // maintaining the same transactional guarantees. + rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); +} + +// Record to be appended to a stream. +message AppendRecord { + // Universally Unique identifier for the record. Must be a guid. + // If not provided, the server will generate a new one. + optional string record_id = 1; + + // A collection of properties providing additional system information about the + // record. + map properties = 2; + + // The actual data payload of the record, stored as bytes. + bytes data = 3; +} + +// Constants that match the expected state of a stream during an +// append operation. It can be used to specify whether the stream should exist, +// not exist, or can be in any state. +enum ExpectedRevisionConstants { + // The stream should exist and have a single event. + EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; + + // It is not important whether the stream exists or not. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + + // The stream should not exist. If it does, the append will fail. 
+ EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; + + // The stream should exist + EXPECTED_REVISION_CONSTANTS_EXISTS = -4; +} + +// Represents the input for appending records to a specific stream. +message AppendStreamRequest { + // The name of the stream to append records to. + string stream = 1; + + // The records to append to the stream. + repeated AppendRecord records = 2; + + // The expected revision of the stream. If the stream's current revision does + // not match, the append will fail. + // The expected revision can also be one of the special values + // from ExpectedRevisionConstants. + // missing value means no expectation: same as EXPECTED_REVISION_CONSTANTS_ANY + optional sint64 expected_revision = 3; +} + +// Success represents the successful outcome of an append operation. +message AppendStreamSuccess { + // The name of the stream to which records were appended. + string stream = 1; + + // The position of the last appended record in the transaction. + int64 position = 2; + + // The revision of the stream after the append operation. + int64 stream_revision = 3; +} + +// Failure represents the detailed error information when an append operation fails. +message AppendStreamFailure { + // The name of the stream to which records failed to append. + string stream = 1; + + // The error details + oneof error { + // Failed because the actual stream revision didn't match the expected revision. + ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2; + + // Failed because the client lacks sufficient permissions. + ErrorDetails.AccessDenied access_denied = 3; + + // Failed because the target stream has been deleted. + ErrorDetails.StreamDeleted stream_deleted = 4; + + ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 5; + } +} + +// Represents the output of appending records to a specific stream. +message AppendStreamResponse { + // The result of the append operation. 
+ oneof result { + // Success represents the successful outcome of an append operation. + AppendStreamSuccess success = 1; + + // Failure represents the details of a failed append operation. + AppendStreamFailure failure = 2; + } +} + +// MultiStreamAppendRequest represents a request to append records to multiple streams. +message MultiStreamAppendRequest { + // A list of AppendStreamInput messages, each representing a stream to which records should be appended. + repeated AppendStreamRequest input = 1; +} + +// Response from the MultiStreamAppend operation. +message MultiStreamAppendResponse { + oneof result { + // Success represents the successful outcome of a multi-stream append operation. + Success success = 1; + + // Failure represents the details of a failed multi-stream append operation. + Failure failure = 2; + } + + message Success { + repeated AppendStreamSuccess output = 1; + } + + message Failure { + repeated AppendStreamFailure output = 1; + } +} + +// ErrorDetails provides detailed information about specific error conditions. +message ErrorDetails { + // When the user does not have sufficient permissions to perform the operation. + message AccessDenied { + // The reason for access denial. + string reason = 1; + } + + // When the stream has been deleted. + message StreamDeleted { + } + + // When the expected revision of the stream does not match the actual revision. + message WrongExpectedRevision { + // The actual revision of the stream. + int64 stream_revision = 1; + } + + // When the transaction exceeds the maximum size allowed + // (it's bigger than the configured chunk size). + message TransactionMaxSizeExceeded { + // The maximum allowed size of the transaction. 
+ int32 max_size = 1; + } +} \ No newline at end of file From aed7e4b87b79d8d704804a3408028a14981299ab Mon Sep 17 00:00:00 2001 From: YoEight Date: Wed, 11 Jun 2025 15:39:04 -0400 Subject: [PATCH 02/16] add more grpc parsing --- .../kurrent/dbclient/AppendStreamFailure.java | 36 +++++++++++++++++++ .../kurrent/dbclient/AppendStreamSuccess.java | 21 +++++++++++ .../MultiAppendStreamErrorVisitor.java | 8 +++++ .../dbclient/MultiAppendWriteResult.java | 18 ++++++++++ .../kurrent/dbclient/MultiStreamAppend.java | 19 ++++++++++ 5 files changed, 102 insertions(+) create mode 100644 src/main/java/io/kurrent/dbclient/AppendStreamFailure.java create mode 100644 src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java create mode 100644 src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java new file mode 100644 index 00000000..0d9d5728 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java @@ -0,0 +1,36 @@ +package io.kurrent.dbclient; + +public class AppendStreamFailure { + private final io.kurrentdb.v2.AppendStreamFailure inner; + + AppendStreamFailure(io.kurrentdb.v2.AppendStreamFailure inner) { + this.inner = inner; + } + + public String getStreamName() { + return this.inner.getStream(); + } + + public void visit(MultiAppendStreamErrorVisitor visitor) { + if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.WRONG_EXPECTED_REVISION) { + visitor.onWrongExpectedRevision(this.inner.getWrongExpectedRevision().getStreamRevision()); + return; + } + + if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) { + visitor.onAccessDenied(this.inner.getAccessDenied().getReason()); + } + + if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) { + visitor.onStreamDeleted(); + return; + } + + if 
(this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { + visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize()); + return; + } + + throw new IllegalArgumentException("Append failure does not match any known error type"); + } +} diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java b/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java new file mode 100644 index 00000000..cff5be7e --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java @@ -0,0 +1,21 @@ +package io.kurrent.dbclient; + +public class AppendStreamSuccess { + private final io.kurrentdb.v2.AppendStreamSuccess inner; + + AppendStreamSuccess(io.kurrentdb.v2.AppendStreamSuccess inner) { + this.inner = inner; + } + + public String getStreamName() { + return this.inner.getStream(); + } + + public long getStreamRevision() { + return this.inner.getStreamRevision(); + } + + public long getPosition() { + return this.inner.getPosition(); + } +} diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java b/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java new file mode 100644 index 00000000..02b4d70a --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java @@ -0,0 +1,8 @@ +package io.kurrent.dbclient; + +public interface MultiAppendStreamErrorVisitor { + default void onWrongExpectedRevision(long streamRevision) {} + default void onAccessDenied(String reason) {} + default void onStreamDeleted() {} + default void onTransactionMaxSizeExceeded(int maxSize) {} +} diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java b/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java index 29dfeabb..ad9f1dad 100644 --- a/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java +++ b/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java @@ -1,4 +1,22 @@ package io.kurrent.dbclient; 
+import java.util.List; +import java.util.Optional; + public class MultiAppendWriteResult { + private final List successes; + private final List failures; + + public MultiAppendWriteResult(List successes, List failures) { + this.successes = successes; + this.failures = failures; + } + + public Optional> getSuccesses() { + return Optional.ofNullable(successes); + } + + public Optional> getFailures() { + return Optional.ofNullable(failures); + } } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index e3870e90..4338d632 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -10,7 +10,9 @@ import io.kurrentdb.v2.StreamsServiceGrpc; import kurrentdb.protobuf.DynamicValueOuterClass; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CompletableFuture; class MultiStreamAppend { @@ -80,6 +82,23 @@ private CompletableFuture append(ManagedChannel channel) } public MultiAppendWriteResult onResponse(MultiStreamAppendResponse response) { + List failures = null; + List successes = null; + + if (response.hasFailure()) { + failures = new ArrayList<>(response.getFailure().getOutputCount()); + + for (io.kurrentdb.v2.AppendStreamFailure failure : response.getFailure().getOutputList()) { + failures.add(new AppendStreamFailure(failure)); + } + } else { + successes = new ArrayList<>(response.getSuccess().getOutputCount()); + + for (io.kurrentdb.v2.AppendStreamSuccess success : response.getSuccess().getOutputList()) { + successes.add(new AppendStreamSuccess(success)); + } + } + // Handle the response from the multi-stream append operation // This could involve processing the results for each stream in the response throw new RuntimeException("Not implemented"); From 70af183e4fea14447f9c73f7ecae9ae394ce406b Mon Sep 17 00:00:00 2001 From: YoEight Date: Wed, 11 Jun 2025 
16:19:26 -0400 Subject: [PATCH 03/16] fixup --- src/main/java/io/kurrent/dbclient/MultiStreamAppend.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index 4338d632..fbd05f42 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -99,8 +99,6 @@ public MultiAppendWriteResult onResponse(MultiStreamAppendResponse response) { } } - // Handle the response from the multi-stream append operation - // This could involve processing the results for each stream in the response - throw new RuntimeException("Not implemented"); + return new MultiAppendWriteResult(successes, failures); } } From 8b177119cb15fa24dc6f1367880cfcd4aa5bf32d Mon Sep 17 00:00:00 2001 From: YoEight Date: Wed, 11 Jun 2025 23:33:41 -0400 Subject: [PATCH 04/16] works but need to run the test only on CI version --- .github/workflows/tests.yml | 4 +- .../io/kurrent/dbclient/FeatureFlags.java | 1 + .../io/kurrent/dbclient/GossipClient.java | 2 +- .../java/io/kurrent/dbclient/GrpcUtils.java | 10 +++- .../kurrent/dbclient/MultiStreamAppend.java | 21 ++++--- .../io/kurrent/dbclient/ServerFeatures.java | 2 + .../kurrent/dbclient/SystemMetadataKeys.java | 2 + .../dbclient/MultiStreamAppendTests.java | 56 +++++++++++++++++++ 8 files changed, 84 insertions(+), 14 deletions(-) create mode 100644 src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0495aefc..bbbc8006 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - test: [Streams, PersistentSubscriptions, Telemetry] + test: [Streams, PersistentSubscriptions, Telemetry, MultiStreamAppend] runs-on: ubuntu-latest steps: @@ -101,7 +101,7 @@ jobs: strategy: fail-fast: false matrix: - 
test: [Streams, PersistentSubscriptions] + test: [Streams, PersistentSubscriptions, MultiStreamAppend] runs-on: ubuntu-latest steps: diff --git a/src/main/java/io/kurrent/dbclient/FeatureFlags.java b/src/main/java/io/kurrent/dbclient/FeatureFlags.java index 83e755a5..11b66517 100644 --- a/src/main/java/io/kurrent/dbclient/FeatureFlags.java +++ b/src/main/java/io/kurrent/dbclient/FeatureFlags.java @@ -7,5 +7,6 @@ class FeatureFlags { public final static int PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM = 8; public final static int PERSISTENT_SUBSCRIPTION_GET_INFO = 16; public final static int PERSISTENT_SUBSCRIPTION_TO_ALL = 32; + public final static int MULTI_STREAM_APPEND = 64; public final static int PERSISTENT_SUBSCRIPTION_MANAGEMENT = PERSISTENT_SUBSCRIPTION_LIST | PERSISTENT_SUBSCRIPTION_REPLAY | PERSISTENT_SUBSCRIPTION_GET_INFO | PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM; } diff --git a/src/main/java/io/kurrent/dbclient/GossipClient.java b/src/main/java/io/kurrent/dbclient/GossipClient.java index 3ba89b7c..cd84e5a7 100644 --- a/src/main/java/io/kurrent/dbclient/GossipClient.java +++ b/src/main/java/io/kurrent/dbclient/GossipClient.java @@ -21,7 +21,7 @@ class GossipClient { public GossipClient(KurrentDBClientSettings settings, ManagedChannel channel) { _channel = channel; - _stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), (long)settings.getGossipTimeout()); + _stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), settings.getGossipTimeout()); } public void shutdown() { diff --git a/src/main/java/io/kurrent/dbclient/GrpcUtils.java b/src/main/java/io/kurrent/dbclient/GrpcUtils.java index 9dce2240..2779f9b2 100644 --- a/src/main/java/io/kurrent/dbclient/GrpcUtils.java +++ b/src/main/java/io/kurrent/dbclient/GrpcUtils.java @@ -113,10 +113,14 @@ static public StreamsOuterClass.ReadReq.Options.StreamOptions toStreamOptions(St } static public , O> S configureStub(S stub, 
KurrentDBClientSettings settings, OptionsBase options) { - return configureStub(stub, settings, options, null); + return configureStub(stub, settings, options, null, true); } - static public , O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase options, Long forceDeadlineInMs) { + static public , O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase options, long forceDeadlineInMs) { + return configureStub(stub, settings, options, forceDeadlineInMs, true); + } + + static public , O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase options, Long forceDeadlineInMs, boolean forwardRequiresLeader) { S finalStub = stub; ConnectionMetadata metadata = new ConnectionMetadata(); @@ -146,7 +150,7 @@ static public , O> S configureStub(S stub, Kurren metadata.authenticated(credentials); } - if (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER) { + if (forwardRequiresLeader && (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER)) { metadata.requiresLeader(); } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index fbd05f42..17967a52 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -1,7 +1,6 @@ package io.kurrent.dbclient; import com.google.protobuf.ByteString; -import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -25,12 +24,18 @@ public MultiStreamAppend(GrpcClient client, Iterator reques } public CompletableFuture execute() { - return this.client.run(this::append); + return this.client.runWithArgs(this::append); } - private CompletableFuture append(ManagedChannel channel) { + private CompletableFuture append(WorkItemArgs args) { CompletableFuture result = new CompletableFuture<>(); - 
StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(channel), this.client.getSettings(), new OptionsBase<>()); + + if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) { + result.completeExceptionally(new UnsupportedOperationException("Multi-stream append is not supported by the server")); + return result; + } + + StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(args.getChannel()), this.client.getSettings(), new OptionsBase<>(), null, false); StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); try { @@ -44,15 +49,15 @@ private CompletableFuture append(ManagedChannel channel) builder.addRecords(AppendRecord.newBuilder() .setData(ByteString.copyFrom(event.getEventData())) .setRecordId(event.getEventId().toString()) - .putProperties(SystemMetadataKeys.CONTENT_TYPE, DynamicValueOuterClass + .putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValueOuterClass .DynamicValue .newBuilder() - .setStringValue(event.getContentType()) + .setBytesValue(ByteString.copyFromUtf8(event.getContentType())) .build()) - .putProperties(SystemMetadataKeys.TYPE, DynamicValueOuterClass + .putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValueOuterClass .DynamicValue .newBuilder() - .setStringValue(event.getEventType()) + .setBytesValue(ByteString.copyFromUtf8(event.getEventType())) .build()) .build()); } diff --git a/src/main/java/io/kurrent/dbclient/ServerFeatures.java b/src/main/java/io/kurrent/dbclient/ServerFeatures.java index 12d377c4..4943c187 100644 --- a/src/main/java/io/kurrent/dbclient/ServerFeatures.java +++ b/src/main/java/io/kurrent/dbclient/ServerFeatures.java @@ -95,6 +95,8 @@ private static CompletableFuture getSupportedFeaturesInternal(Server default: break; } + } else if (method.getMethodName().equals("multistreamappendsession")) { + features |= FeatureFlags.MULTI_STREAM_APPEND; } } diff --git 
a/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java b/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java index d5ae2cc7..d46a02dc 100644 --- a/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java +++ b/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java @@ -5,4 +5,6 @@ class SystemMetadataKeys { static final String CREATED = "created"; static final String IS_JSON = "is-json"; static final String TYPE = "type"; + static final String SCHEMA_NAME = "$schema.name"; + static final String DATA_FORMAT = "$schema.data-format"; } diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java new file mode 100644 index 00000000..07f21f4c --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -0,0 +1,56 @@ +package io.kurrent.dbclient; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class MultiStreamAppendTests implements ConnectionAware { + static private Database database; + static private Logger logger; + + @BeforeAll + public static void setup() { + database = DatabaseFactory.spawn(); + logger = LoggerFactory.getLogger(MultiStreamAppendTests.class); + } + + @Override + public Database getDatabase() { + return database; + } + + @Override + public Logger getLogger() { + return logger; + } + + @AfterAll + public static void cleanup() { + database.dispose(); + } + + @Test + public void testMultiStreamAppend() throws ExecutionException, InterruptedException { + KurrentDBClient client = getDefaultClient(); + + List requests = new ArrayList<>(); + + List events = new ArrayList<>(); + for (int i = 0; i < 10; i++) + events.add(EventData.builderAsBinary("created", new 
byte[0]).build()); + + requests.add(new AppendStreamRequest("foobar", events.iterator(), StreamState.any())); + requests.add(new AppendStreamRequest("baz", events.iterator(), StreamState.any())); + + MultiAppendWriteResult result = client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get(); + + Assertions.assertTrue(result.getSuccesses().isPresent()); + } +} From 8594144e61cc30737a1af4e27d70d7bb64438d4e Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 13:56:10 -0400 Subject: [PATCH 05/16] repo needs a bit of a revamp about its github actions --- docker-compose.yml | 34 +++++++++------------------------- vars.env | 2 +- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index e64e7e83..8d0f0ddd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.5" - services: volumes-provisioner: image: hasnat/volumes-provisioner @@ -26,14 +24,14 @@ services: - volumes-provisioner esdb-node1: &template - image: ${KURRENTDB_IMAGE:-docker.kurrent.io/eventstore/eventstoredb-ee:lts} + image: ${KURRENTDB_DOCKER_REGISTRY:-docker.kurrent.io}/${KURRENTDB_DOCKER_REPO:-eventstore}/${KURRENTDB_DOCKER_CONTAINER:-eventstoredb-ee}:${KURRENTDB_DOCKER_CONTAINER_VERSION:-lts} env_file: - vars.env environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.12:2113,172.30.240.13:2113 - EVENTSTORE_INT_IP=172.30.240.11 - - EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node1/node.crt - - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node1/node.key + - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node1/node.crt + - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node1/node.key - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2111 ports: - 2111:2113 @@ -41,56 +39,42 @@ services: clusternetwork: ipv4_address: 172.30.240.11 volumes: - - ./certs:/etc/eventstore/certs + - ./certs:/etc/kurrentdb/certs restart: unless-stopped depends_on: - cert-gen esdb-node2: <<: *template 
- env_file: - - vars.env environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.13:2113 - EVENTSTORE_INT_IP=172.30.240.12 - - EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node2/node.crt - - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node2/node.key + - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node2/node.crt + - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node2/node.key - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2112 ports: - 2112:2113 networks: clusternetwork: ipv4_address: 172.30.240.12 - volumes: - - ./certs:/etc/eventstore/certs - restart: unless-stopped - depends_on: - - cert-gen esdb-node3: <<: *template - env_file: - - vars.env environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.12:2113 - EVENTSTORE_INT_IP=172.30.240.13 - - EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node3/node.crt - - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node3/node.key + - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node3/node.crt + - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node3/node.key - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2113 ports: - 2113:2113 networks: clusternetwork: ipv4_address: 172.30.240.13 - volumes: - - ./certs:/etc/eventstore/certs - restart: unless-stopped - depends_on: - - cert-gen networks: clusternetwork: - name: eventstoredb.local + name: kurrentdb.local driver: bridge ipam: driver: default diff --git a/vars.env b/vars.env index 7883b8c5..3b7be956 100644 --- a/vars.env +++ b/vars.env @@ -2,7 +2,7 @@ EVENTSTORE_CLUSTER_SIZE=3 EVENTSTORE_RUN_PROJECTIONS=All EVENTSTORE_INT_TCP_PORT=1112 EVENTSTORE_HTTP_PORT=2113 -EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH=/etc/eventstore/certs/ca +EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH=/etc/kurrentdb/certs/ca EVENTSTORE_DISCOVER_VIA_DNS=false EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true EVENTSTORE_ADVERTISE_HOST_TO_CLIENT_AS=localhost From 92adc86877bf4f7580ede1e553bfe9af3a3a8f3a Mon Sep 17 00:00:00 
2001 From: YoEight Date: Thu, 12 Jun 2025 19:16:11 -0400 Subject: [PATCH 06/16] let's hope it works the first time! --- .github/workflows/ci.yml | 2 +- .github/workflows/load-configuration.yml | 46 ++++++++++++++++++++++++ .github/workflows/lts.yml | 2 +- .github/workflows/previous-lts.yml | 2 +- .github/workflows/tests.yml | 18 +++++++--- docker-compose.yml | 2 +- 6 files changed, 64 insertions(+), 8 deletions(-) create mode 100644 .github/workflows/load-configuration.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9dba48c9..5a4d4f68 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,5 +15,5 @@ jobs: name: Tests (CI) uses: ./.github/workflows/tests.yml with: - image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES).ci.fullname }} + runtime: ci secrets: inherit diff --git a/.github/workflows/load-configuration.yml b/.github/workflows/load-configuration.yml new file mode 100644 index 00000000..bd1eb7d6 --- /dev/null +++ b/.github/workflows/load-configuration.yml @@ -0,0 +1,46 @@ +name: Load KurrentDB Runtime Configuration +on: + workflow_dispatch: + inputs: + runtime: + description: "The runtime's name. 
Current options are: `ci`, `previous-lts`, `latest`" + type: string + + outputs: + runtime: + description: The runtime's name + value: ${{ input.runtime }} + + registry: + description: The Docker registry + value: $${{ jobs.load.outputs.registry }} + + image: + description: The Docker image + value: $${{ jobs.load.outputs.image }} + + tag: + description: The Docker image tag + value: $${{ jobs.load.outputs.tag }} + + full_image_name: + description: The full Docker image name (including registry, image, and tag) + value: $${{ jobs.load.outputs.full_image_name }} + +jobs: + load: + runs-on: ubuntu-latest + outputs: + registry: ${{ steps.set.outputs.registry }} + image: ${{ steps.set.outputs.image }} + tag: ${{ steps.set.outputs.tag }} + full_image_name: ${{ steps.set.outputs.full_image_name }} + + steps: + - name: Set KurrentDB Runtime Configuration Properties + id: set + run: | + echo "registry=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].registry }}" >> $GITHUB_OUTPUT + echo "tag=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].tag }}" >> $GITHUB_OUTPUT + echo "image=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].image }}" >> $GITHUB_OUTPUT + echo "full_image_name=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].fullname }}" >> $GITHUB_OUTPUT diff --git a/.github/workflows/lts.yml b/.github/workflows/lts.yml index c0936f87..b2aba969 100644 --- a/.github/workflows/lts.yml +++ b/.github/workflows/lts.yml @@ -15,7 +15,7 @@ jobs: name: Tests (LTS) uses: ./.github/workflows/tests.yml with: - image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES).lts.fullname }} + runtime: lts secrets: inherit # Will be removed in the future diff --git a/.github/workflows/previous-lts.yml b/.github/workflows/previous-lts.yml index 0fd060c2..f1941397 100644 --- a/.github/workflows/previous-lts.yml +++ b/.github/workflows/previous-lts.yml @@ -15,5 +15,5 @@ jobs: name: Tests (Previous LTS) uses: ./.github/workflows/tests.yml with: - image: ${{ 
fromJSON(vars.KURRENTDB_DOCKER_IMAGES)['previous-lts'].fullname }} + runtime: previous-lts secrets: inherit diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index bbbc8006..e5bc19e0 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,12 +3,18 @@ name: tests workflow on: workflow_call: inputs: - image: + runtime: required: true type: string jobs: + load_configuration: + uses: ./.github/workflows/load-configuration.yml + with: + runtime: ${{ inputs.runtime }} + single_node: + needs: load_configuration name: Single node strategy: @@ -41,7 +47,7 @@ jobs: - name: Execute Gradle build run: ./gradlew ci --tests ${{ matrix.test }}Tests env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_IMAGE: ${{ needs.load_configuration.outputs.full_image_name }} - uses: actions/upload-artifact@v4 if: failure() @@ -51,6 +57,7 @@ jobs: if-no-files-found: error secure: + needs: load_configuration name: Secure strategy: @@ -86,7 +93,7 @@ jobs: - name: Execute Gradle build run: ./gradlew ci --tests ${{ matrix.test }}Tests env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_IMAGE: ${{ needs.load_configuration.outputs.full_image_name }} SECURE: true - uses: actions/upload-artifact@v4 @@ -96,6 +103,7 @@ jobs: path: /tmp/esdb_logs.tar.gz cluster: + needs: load_configuration name: Cluster strategy: @@ -117,7 +125,9 @@ jobs: - name: Set up cluster with Docker Compose run: docker compose up -d env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_DOCKER_REGISTRY: ${{ needs.load_configuration.outputs.registry }} + KURRENTDB_DOCKER_IMAGE: ${{ needs.load_configuration.outputs.image }} + KURRENTDB_DOCKER_TAG: ${{ needs.load_configuration.outputs.tag }} - name: Set up JDK 8 uses: actions/setup-java@v3 diff --git a/docker-compose.yml b/docker-compose.yml index 8d0f0ddd..7854cbfb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,7 +24,7 @@ services: - volumes-provisioner esdb-node1: &template - image: 
${KURRENTDB_DOCKER_REGISTRY:-docker.kurrent.io}/${KURRENTDB_DOCKER_REPO:-eventstore}/${KURRENTDB_DOCKER_CONTAINER:-eventstoredb-ee}:${KURRENTDB_DOCKER_CONTAINER_VERSION:-lts} + image: ${KURRENTDB_DOCKER_REGISTRY:-docker.kurrent.io/eventstore}/${KURRENTDB_DOCKER_IMAGE:-eventstoredb-ee}:${KURRENTDB_DOCKER_TAG:-lts} env_file: - vars.env environment: From f9b37b6432aadfcc1fef4d7dc43c8c15ca220f92 Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 19:19:15 -0400 Subject: [PATCH 07/16] use the right workflow type now --- .github/workflows/load-configuration.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/load-configuration.yml b/.github/workflows/load-configuration.yml index bd1eb7d6..de022ace 100644 --- a/.github/workflows/load-configuration.yml +++ b/.github/workflows/load-configuration.yml @@ -1,6 +1,6 @@ name: Load KurrentDB Runtime Configuration on: - workflow_dispatch: + workflow_call: inputs: runtime: description: "The runtime's name. Current options are: `ci`, `previous-lts`, `latest`" From ff8ec75e8670cb5d01c841f8ad7735fc236f34eb Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 19:20:23 -0400 Subject: [PATCH 08/16] wrong variable name --- .github/workflows/load-configuration.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/load-configuration.yml b/.github/workflows/load-configuration.yml index de022ace..14720a06 100644 --- a/.github/workflows/load-configuration.yml +++ b/.github/workflows/load-configuration.yml @@ -9,7 +9,7 @@ on: outputs: runtime: description: The runtime's name - value: ${{ input.runtime }} + value: ${{ inputs.runtime }} registry: description: The Docker registry From eb655fff624764a9c75805300b81fc2910b71ae7 Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 19:21:12 -0400 Subject: [PATCH 09/16] fixup --- .github/workflows/load-configuration.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/.github/workflows/load-configuration.yml b/.github/workflows/load-configuration.yml index 14720a06..3c17b530 100644 --- a/.github/workflows/load-configuration.yml +++ b/.github/workflows/load-configuration.yml @@ -40,7 +40,7 @@ jobs: - name: Set KurrentDB Runtime Configuration Properties id: set run: | - echo "registry=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].registry }}" >> $GITHUB_OUTPUT - echo "tag=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].tag }}" >> $GITHUB_OUTPUT - echo "image=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].image }}" >> $GITHUB_OUTPUT - echo "full_image_name=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[input.runtime].fullname }}" >> $GITHUB_OUTPUT + echo "registry=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].registry }}" >> $GITHUB_OUTPUT + echo "tag=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].tag }}" >> $GITHUB_OUTPUT + echo "image=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].image }}" >> $GITHUB_OUTPUT + echo "full_image_name=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].fullname }}" >> $GITHUB_OUTPUT From 76664b405fa03c7195fc18bbf3be4c5f56d36b5b Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 19:22:35 -0400 Subject: [PATCH 10/16] didn't see that extra $ in the outputs --- .github/workflows/load-configuration.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/load-configuration.yml b/.github/workflows/load-configuration.yml index 3c17b530..a2995585 100644 --- a/.github/workflows/load-configuration.yml +++ b/.github/workflows/load-configuration.yml @@ -13,19 +13,19 @@ on: registry: description: The Docker registry - value: $${{ jobs.load.outputs.registry }} + value: ${{ jobs.load.outputs.registry }} image: description: The Docker image - value: $${{ jobs.load.outputs.image }} + value: ${{ jobs.load.outputs.image }} tag: description: The Docker image tag - value: $${{ 
jobs.load.outputs.tag }} + value: ${{ jobs.load.outputs.tag }} full_image_name: description: The full Docker image name (including registry, image, and tag) - value: $${{ jobs.load.outputs.full_image_name }} + value: ${{ jobs.load.outputs.full_image_name }} jobs: load: @@ -40,7 +40,7 @@ jobs: - name: Set KurrentDB Runtime Configuration Properties id: set run: | - echo "registry=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].registry }}" >> $GITHUB_OUTPUT - echo "tag=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].tag }}" >> $GITHUB_OUTPUT - echo "image=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].image }}" >> $GITHUB_OUTPUT - echo "full_image_name=$${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].fullname }}" >> $GITHUB_OUTPUT + echo "registry=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].registry }}" >> $GITHUB_OUTPUT + echo "tag=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].tag }}" >> $GITHUB_OUTPUT + echo "image=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].image }}" >> $GITHUB_OUTPUT + echo "full_image_name=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].fullname }}" >> $GITHUB_OUTPUT From 7b6f8224f976a2b3f50779547d312f4c1e53ec41 Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 20:56:36 -0400 Subject: [PATCH 11/16] fixup leftover issues --- .github/workflows/lts.yml | 4 +++- .github/workflows/plugins-tests.yml | 12 ++++++++++-- .../io/kurrent/dbclient/MultiStreamAppendTests.java | 6 ++++++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/.github/workflows/lts.yml b/.github/workflows/lts.yml index b2aba969..afb923b2 100644 --- a/.github/workflows/lts.yml +++ b/.github/workflows/lts.yml @@ -24,5 +24,7 @@ jobs: uses: ./.github/workflows/plugins-tests.yml with: - image: "docker.eventstore.com/eventstore-ee/eventstoredb-commercial:24.2.0-jammy" + registry: "docker.eventstore.com/eventstore-ee + image: eventstoredb-commercial + tag: 
:24.2.0-jammy secrets: inherit diff --git a/.github/workflows/plugins-tests.yml b/.github/workflows/plugins-tests.yml index 08aed4c8..fed0338a 100644 --- a/.github/workflows/plugins-tests.yml +++ b/.github/workflows/plugins-tests.yml @@ -3,9 +3,15 @@ name: enterprise plugins tests workflow on: workflow_call: inputs: + registry: + required: true + type: string image: required: true type: string + tag: + required: true + type: string jobs: single_node: @@ -46,7 +52,7 @@ jobs: - name: Execute Gradle build run: ./gradlew ci --tests ${{ matrix.test }}Tests env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_IMAGE: ${{inputs.registry}}/${{ inputs.image }}:${{ inputs.tag }} SECURE: true - uses: actions/upload-artifact@v4 @@ -81,7 +87,9 @@ jobs: - name: Set up cluster with Docker Compose run: docker compose up -d env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_DOCKER_REGISTRY: ${{ inputs.registry }} + KURRENTDB_DOCKER_IMAGE: ${{ inputs.image }} + KURRENTDB_DOCKER_TAG: ${{ inputs.tag }} - name: Generate user certificates run: docker compose --file configure-user-certs-for-tests.yml up diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java index 07f21f4c..ef7bed14 100644 --- a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; public class MultiStreamAppendTests implements ConnectionAware { @@ -40,6 +41,11 @@ public static void cleanup() { public void testMultiStreamAppend() throws ExecutionException, InterruptedException { KurrentDBClient client = getDefaultClient(); + Optional version = client.getServerVersion().get(); + + if (!version.isPresent() || version.get().isLessThan(25, 0, 0)) + return; + List requests = new ArrayList<>(); List events = new 
ArrayList<>(); From bddcb3a41ce0c9be4f793795de01951621317e75 Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 21:10:59 -0400 Subject: [PATCH 12/16] fixup --- .github/workflows/lts.yml | 4 ++-- .github/workflows/tests.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/lts.yml b/.github/workflows/lts.yml index afb923b2..4b1fc04b 100644 --- a/.github/workflows/lts.yml +++ b/.github/workflows/lts.yml @@ -24,7 +24,7 @@ jobs: uses: ./.github/workflows/plugins-tests.yml with: - registry: "docker.eventstore.com/eventstore-ee + registry: docker.eventstore.com/eventstore-ee image: eventstoredb-commercial - tag: :24.2.0-jammy + tag: 24.2.0-jammy secrets: inherit diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e5bc19e0..6b2845ac 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -109,7 +109,7 @@ jobs: strategy: fail-fast: false matrix: - test: [Streams, PersistentSubscriptions, MultiStreamAppendTests] + test: [Streams, PersistentSubscriptions, MultiStreamAppend] runs-on: ubuntu-latest steps: From b7ae1a2c89acfbd3ad2de9ad16c8ac1760031289 Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 22:10:44 -0400 Subject: [PATCH 13/16] fix deprecated options usage --- .github/workflows/tests.yml | 7 ------- docker-compose.yml | 12 ++++++------ vars.env | 4 ++-- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6b2845ac..92ce26e8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -49,13 +49,6 @@ jobs: env: KURRENTDB_IMAGE: ${{ needs.load_configuration.outputs.full_image_name }} - - uses: actions/upload-artifact@v4 - if: failure() - with: - name: esdb_logs.tar.gz - path: /tmp/esdb_logs.tar.gz - if-no-files-found: error - secure: needs: load_configuration name: Secure diff --git a/docker-compose.yml b/docker-compose.yml index 7854cbfb..344f19f3 100644 --- 
a/docker-compose.yml +++ b/docker-compose.yml @@ -29,10 +29,10 @@ services: - vars.env environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.12:2113,172.30.240.13:2113 - - EVENTSTORE_INT_IP=172.30.240.11 + - EVENTSTORE_REPLICATION_IP=172.30.240.11 - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node1/node.crt - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node1/node.key - - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2111 + - EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2111 ports: - 2111:2113 networks: @@ -48,10 +48,10 @@ services: <<: *template environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.13:2113 - - EVENTSTORE_INT_IP=172.30.240.12 + - EVENTSTORE_REPLICATION_IP=172.30.240.12 - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node2/node.crt - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node2/node.key - - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2112 + - EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2112 ports: - 2112:2113 networks: @@ -62,10 +62,10 @@ services: <<: *template environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.12:2113 - - EVENTSTORE_INT_IP=172.30.240.13 + - EVENTSTORE_REPLICATION_IP=172.30.240.13 - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node3/node.crt - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node3/node.key - - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2113 + - EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2113 ports: - 2113:2113 networks: diff --git a/vars.env b/vars.env index 3b7be956..a3ecdad9 100644 --- a/vars.env +++ b/vars.env @@ -1,7 +1,7 @@ EVENTSTORE_CLUSTER_SIZE=3 EVENTSTORE_RUN_PROJECTIONS=All -EVENTSTORE_INT_TCP_PORT=1112 -EVENTSTORE_HTTP_PORT=2113 +EVENTSTORE_REPLICATION_PORT=1112 +EVENTSTORE_NODE_PORT=2113 EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH=/etc/kurrentdb/certs/ca EVENTSTORE_DISCOVER_VIA_DNS=false EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true From 15a5fc22bce7bfedf622d7e5b3c1d82f41907497 Mon Sep 17 00:00:00 2001 From: YoEight Date: 
Thu, 12 Jun 2025 22:36:27 -0400 Subject: [PATCH 14/16] better message when we skip test on unsupported server versions --- .../io/kurrent/dbclient/ServerVersion.java | 11 ++++++ .../dbclient/MultiStreamAppendTests.java | 38 ++++++++++++++++--- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/kurrent/dbclient/ServerVersion.java b/src/main/java/io/kurrent/dbclient/ServerVersion.java index f61f9a1d..64c16d69 100644 --- a/src/main/java/io/kurrent/dbclient/ServerVersion.java +++ b/src/main/java/io/kurrent/dbclient/ServerVersion.java @@ -58,6 +58,17 @@ public boolean isLessOrEqualThan(int major, int minor) { return true; } + public boolean isGreaterOrEqualThan(int major, int minor) { + int cmp; + if ((cmp = Integer.compare(this.major, major)) != 0) + return cmp > 0; + + if ((cmp = Integer.compare(this.minor, minor)) != 0) + return cmp > 0; + + return true; + } + @Override public String toString() { return "ServerVersion{" + diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java index ef7bed14..ce2efae9 100644 --- a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -1,9 +1,6 @@ package io.kurrent.dbclient; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +40,10 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept Optional version = client.getServerVersion().get(); - if (!version.isPresent() || version.get().isLessThan(25, 0, 0)) - return; + Assumptions.assumeTrue( + !version.isPresent() || version.get().isLessThan(25, 0, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); List requests = new ArrayList<>(); @@ -59,4 
+58,31 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept Assertions.assertTrue(result.getSuccesses().isPresent()); } + + @Test + public void testMultiStreamAppendWhenUnsupported() throws ExecutionException, InterruptedException { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream is supported server versions greater or equal to 25.0.0" + ); + + List requests = new ArrayList<>(); + + List events = new ArrayList<>(); + for (int i = 0; i < 10; i++) + events.add(EventData.builderAsBinary("created", new byte[0]).build()); + + requests.add(new AppendStreamRequest("foobar", events.iterator(), StreamState.any())); + requests.add(new AppendStreamRequest("baz", events.iterator(), StreamState.any())); + + ExecutionException e = Assertions.assertThrows( + ExecutionException.class, + () -> client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get()); + + Assertions.assertInstanceOf(UnsupportedOperationException.class, e.getCause()); + } } + From 83434deb9712765a1bbb4c644f35eee143a33136 Mon Sep 17 00:00:00 2001 From: YoEight Date: Thu, 12 Jun 2025 22:50:21 -0400 Subject: [PATCH 15/16] inversed logic in assumptions --- src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java index ce2efae9..122ce7ca 100644 --- a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -41,7 +41,7 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept Optional version = client.getServerVersion().get(); Assumptions.assumeTrue( - !version.isPresent() || version.get().isLessThan(25, 0, 0), + 
version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), "Multi-stream append is not supported server versions below 25.0.0" ); @@ -64,7 +64,7 @@ public void testMultiStreamAppendWhenUnsupported() throws ExecutionException, In KurrentDBClient client = getDefaultClient(); Optional version = client.getServerVersion().get(); - Assumptions.assumeTrue( + Assumptions.assumeFalse( version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), "Multi-stream is supported server versions greater or equal to 25.0.0" ); From 446589f6095dcea70d8cec5b590cc7ab7a1b026f Mon Sep 17 00:00:00 2001 From: William Chong Date: Mon, 21 Jul 2025 09:48:02 +0400 Subject: [PATCH 16/16] Update proto files --- .../kurrent/dbclient/AppendStreamFailure.java | 16 +- .../kurrent/dbclient/AppendStreamSuccess.java | 4 +- .../dbclient/ConnectionSettingsBuilder.java | 2 +- .../kurrent/dbclient/ContentTypeMapper.java | 20 + .../MultiAppendStreamErrorVisitor.java | 4 +- .../kurrent/dbclient/MultiStreamAppend.java | 26 +- src/main/proto/dynamic-value.proto | 42 -- .../{ => kurrentdb/protocol/v1}/code.proto | 0 .../{ => kurrentdb/protocol/v1}/gossip.proto | 2 +- .../protocol/v1}/persistent.proto | 2 +- .../protocol/v1}/projectionmanagement.proto | 2 +- .../protocol/v1}/serverfeatures.proto | 2 +- .../{ => kurrentdb/protocol/v1}/shared.proto | 0 .../{ => kurrentdb/protocol/v1}/status.proto | 2 +- .../{ => kurrentdb/protocol/v1}/streams.proto | 4 +- .../proto/kurrentdb/protocol/v2/core.proto | 82 +++ .../protocol/v2/features/service.proto | 144 ++++++ .../protocol/v2/streams/shared.proto | 118 +++++ .../protocol/v2/streams/streams.proto | 481 ++++++++++++++++++ src/main/proto/streams.v2.proto | 170 ------- .../misc/ParseValidConnectionStringTests.java | 2 +- 21 files changed, 879 insertions(+), 246 deletions(-) create mode 100644 src/main/java/io/kurrent/dbclient/ContentTypeMapper.java delete mode 100644 src/main/proto/dynamic-value.proto rename src/main/proto/{ => 
kurrentdb/protocol/v1}/code.proto (100%) rename src/main/proto/{ => kurrentdb/protocol/v1}/gossip.proto (94%) rename src/main/proto/{ => kurrentdb/protocol/v1}/persistent.proto (99%) rename src/main/proto/{ => kurrentdb/protocol/v1}/projectionmanagement.proto (98%) rename src/main/proto/{ => kurrentdb/protocol/v1}/serverfeatures.proto (91%) rename src/main/proto/{ => kurrentdb/protocol/v1}/shared.proto (100%) rename src/main/proto/{ => kurrentdb/protocol/v1}/status.proto (97%) rename src/main/proto/{ => kurrentdb/protocol/v1}/streams.proto (98%) create mode 100644 src/main/proto/kurrentdb/protocol/v2/core.proto create mode 100644 src/main/proto/kurrentdb/protocol/v2/features/service.proto create mode 100644 src/main/proto/kurrentdb/protocol/v2/streams/shared.proto create mode 100644 src/main/proto/kurrentdb/protocol/v2/streams/streams.proto delete mode 100644 src/main/proto/streams.v2.proto diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java index 0d9d5728..28218bef 100644 --- a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java +++ b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java @@ -1,9 +1,9 @@ package io.kurrent.dbclient; public class AppendStreamFailure { - private final io.kurrentdb.v2.AppendStreamFailure inner; + private final io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner; - AppendStreamFailure(io.kurrentdb.v2.AppendStreamFailure inner) { + AppendStreamFailure(io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner) { this.inner = inner; } @@ -12,21 +12,21 @@ public String getStreamName() { } public void visit(MultiAppendStreamErrorVisitor visitor) { - if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.WRONG_EXPECTED_REVISION) { - visitor.onWrongExpectedRevision(this.inner.getWrongExpectedRevision().getStreamRevision()); + if (this.inner.getErrorCase() == 
io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_REVISION_CONFLICT) { + visitor.onWrongExpectedRevision(this.inner.getStreamRevisionConflict().getStreamRevision()); return; } - if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) { - visitor.onAccessDenied(this.inner.getAccessDenied().getReason()); + if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) { + visitor.onAccessDenied(this.inner.getAccessDenied()); } - if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) { + if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) { visitor.onStreamDeleted(); return; } - if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { + if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize()); return; } diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java b/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java index cff5be7e..be55e3ba 100644 --- a/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java +++ b/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java @@ -1,9 +1,9 @@ package io.kurrent.dbclient; public class AppendStreamSuccess { - private final io.kurrentdb.v2.AppendStreamSuccess inner; + private final io.kurrentdb.protocol.streams.v2.AppendStreamSuccess inner; - AppendStreamSuccess(io.kurrentdb.v2.AppendStreamSuccess inner) { + AppendStreamSuccess(io.kurrentdb.protocol.streams.v2.AppendStreamSuccess inner) { this.inner = inner; } diff --git a/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java b/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java index df4cfe4e..48340435 100644 
--- a/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java +++ b/src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java @@ -17,7 +17,7 @@ public class ConnectionSettingsBuilder { private static final Logger logger = LoggerFactory.getLogger(ConnectionSettingsBuilder.class); private static final Set SUPPORTED_PROTOCOLS = new HashSet<>(Arrays.asList( - "esdb", "esdb+discover", "kurrent", "kurrent+discover", "kdb", "kdb+discover", "kurrentdb", "kurrentdb+discover" + "esdb", "esdb+discover", "kurrentdb", "kurrent+discover", "kdb", "kdb+discover", "kurrentdb", "kurrentdb+discover" )); private boolean _dnsDiscover = false; diff --git a/src/main/java/io/kurrent/dbclient/ContentTypeMapper.java b/src/main/java/io/kurrent/dbclient/ContentTypeMapper.java new file mode 100644 index 00000000..a74bb316 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/ContentTypeMapper.java @@ -0,0 +1,20 @@ +package io.kurrent.dbclient; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class ContentTypeMapper { + private static final Map CONTENT_TYPE_MAP; + + static { + Map map = new HashMap<>(); + map.put("application/json", "Json"); + map.put("application/octet-stream", "Binary"); + CONTENT_TYPE_MAP = Collections.unmodifiableMap(map); + } + + public static String toSchemaDataFormat(String contentType) { + return CONTENT_TYPE_MAP.getOrDefault(contentType, contentType); + } +} diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java b/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java index 02b4d70a..97b360ff 100644 --- a/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java +++ b/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java @@ -1,8 +1,10 @@ package io.kurrent.dbclient; +import io.kurrentdb.protocol.streams.v2.ErrorDetails; + public interface MultiAppendStreamErrorVisitor { default void onWrongExpectedRevision(long streamRevision) {} - default void 
onAccessDenied(String reason) {} + default void onAccessDenied(ErrorDetails.AccessDenied detail) {} default void onStreamDeleted() {} default void onTransactionMaxSizeExceeded(int maxSize) {} } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index 17967a52..9c434a7b 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -4,10 +4,10 @@ import io.grpc.Metadata; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import io.kurrentdb.v2.AppendRecord; -import io.kurrentdb.v2.MultiStreamAppendResponse; -import io.kurrentdb.v2.StreamsServiceGrpc; -import kurrentdb.protobuf.DynamicValueOuterClass; +import io.kurrentdb.protocol.DynamicValue; +import io.kurrentdb.protocol.streams.v2.AppendRecord; +import io.kurrentdb.protocol.streams.v2.MultiStreamAppendResponse; +import io.kurrentdb.protocol.streams.v2.StreamsServiceGrpc; import java.util.ArrayList; import java.util.Iterator; @@ -36,12 +36,12 @@ private CompletableFuture append(WorkItemArgs args) { } StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(args.getChannel()), this.client.getSettings(), new OptionsBase<>(), null, false); - StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); + StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); try { while (this.requests.hasNext()) { AppendStreamRequest request = this.requests.next(); - io.kurrentdb.v2.AppendStreamRequest.Builder builder = io.kurrentdb.v2.AppendStreamRequest.newBuilder() + io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder() .setStream(request.getStreamName()); while (request.getEvents().hasNext()) { @@ -49,15 
+49,13 @@ private CompletableFuture append(WorkItemArgs args) { builder.addRecords(AppendRecord.newBuilder() .setData(ByteString.copyFrom(event.getEventData())) .setRecordId(event.getEventId().toString()) - .putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValueOuterClass - .DynamicValue + .putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue .newBuilder() - .setBytesValue(ByteString.copyFromUtf8(event.getContentType())) + .setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType())) .build()) - .putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValueOuterClass - .DynamicValue + .putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue .newBuilder() - .setBytesValue(ByteString.copyFromUtf8(event.getEventType())) + .setStringValue(event.getEventType()) .build()) .build()); } @@ -93,13 +91,13 @@ public MultiAppendWriteResult onResponse(MultiStreamAppendResponse response) { if (response.hasFailure()) { failures = new ArrayList<>(response.getFailure().getOutputCount()); - for (io.kurrentdb.v2.AppendStreamFailure failure : response.getFailure().getOutputList()) { + for (io.kurrentdb.protocol.streams.v2.AppendStreamFailure failure : response.getFailure().getOutputList()) { failures.add(new AppendStreamFailure(failure)); } } else { successes = new ArrayList<>(response.getSuccess().getOutputCount()); - for (io.kurrentdb.v2.AppendStreamSuccess success : response.getSuccess().getOutputList()) { + for (io.kurrentdb.protocol.streams.v2.AppendStreamSuccess success : response.getSuccess().getOutputList()) { successes.add(new AppendStreamSuccess(success)); } } diff --git a/src/main/proto/dynamic-value.proto b/src/main/proto/dynamic-value.proto deleted file mode 100644 index 04ce17e5..00000000 --- a/src/main/proto/dynamic-value.proto +++ /dev/null @@ -1,42 +0,0 @@ -syntax = "proto3"; - -import "google/protobuf/duration.proto"; -import "google/protobuf/timestamp.proto"; -import "google/protobuf/struct.proto"; - -package kurrentdb.protobuf; -option 
csharp_namespace = "KurrentDB.Protobuf"; - -message DynamicValue { - oneof kind { - // Represents a null value. - google.protobuf.NullValue null_value = 1; - - // Represents a 32-bit signed integer value. - sint32 int32_value = 2; - - // Represents a 64-bit signed integer value. - sint64 int64_value = 3; - - // Represents a byte array value. - bytes bytes_value = 4; - - // Represents a 64-bit double-precision floating-point value. - double double_value = 5; - - // Represents a 32-bit single-precision floating-point value - float float_value = 6; - - // Represents a string value. - string string_value = 7; - - // Represents a boolean value. - bool boolean_value = 8; - - // Represents a timestamp value. - google.protobuf.Timestamp timestamp_value = 9; - - // Represents a duration value. - google.protobuf.Duration duration_value = 10; - } -} \ No newline at end of file diff --git a/src/main/proto/code.proto b/src/main/proto/kurrentdb/protocol/v1/code.proto similarity index 100% rename from src/main/proto/code.proto rename to src/main/proto/kurrentdb/protocol/v1/code.proto diff --git a/src/main/proto/gossip.proto b/src/main/proto/kurrentdb/protocol/v1/gossip.proto similarity index 94% rename from src/main/proto/gossip.proto rename to src/main/proto/kurrentdb/protocol/v1/gossip.proto index 4faaa61c..94804fe8 100644 --- a/src/main/proto/gossip.proto +++ b/src/main/proto/kurrentdb/protocol/v1/gossip.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package event_store.client.gossip; option java_package = "io.kurrent.dbclient.proto.gossip"; -import "shared.proto"; +import "kurrentdb/protocol/v1/shared.proto"; service Gossip { rpc Read (event_store.client.Empty) returns (ClusterInfo); diff --git a/src/main/proto/persistent.proto b/src/main/proto/kurrentdb/protocol/v1/persistent.proto similarity index 99% rename from src/main/proto/persistent.proto rename to src/main/proto/kurrentdb/protocol/v1/persistent.proto index 22762bcd..59ad36a3 100644 --- a/src/main/proto/persistent.proto +++ 
b/src/main/proto/kurrentdb/protocol/v1/persistent.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package event_store.client.persistent_subscriptions; option java_package = "io.kurrent.dbclient.proto.persistentsubscriptions"; -import "shared.proto"; +import "kurrentdb/protocol/v1/shared.proto"; service PersistentSubscriptions { rpc Create (CreateReq) returns (CreateResp); diff --git a/src/main/proto/projectionmanagement.proto b/src/main/proto/kurrentdb/protocol/v1/projectionmanagement.proto similarity index 98% rename from src/main/proto/projectionmanagement.proto rename to src/main/proto/kurrentdb/protocol/v1/projectionmanagement.proto index 88badaa0..b61e940b 100644 --- a/src/main/proto/projectionmanagement.proto +++ b/src/main/proto/kurrentdb/protocol/v1/projectionmanagement.proto @@ -3,7 +3,7 @@ package event_store.client.projections; option java_package = "io.kurrent.dbclient.proto.projections"; import "google/protobuf/struct.proto"; -import "shared.proto"; +import "kurrentdb/protocol/v1/shared.proto"; service Projections { rpc Create (CreateReq) returns (CreateResp); diff --git a/src/main/proto/serverfeatures.proto b/src/main/proto/kurrentdb/protocol/v1/serverfeatures.proto similarity index 91% rename from src/main/proto/serverfeatures.proto rename to src/main/proto/kurrentdb/protocol/v1/serverfeatures.proto index e0bede70..38b1ecff 100644 --- a/src/main/proto/serverfeatures.proto +++ b/src/main/proto/kurrentdb/protocol/v1/serverfeatures.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package event_store.client.server_features; option java_package = "io.kurrent.dbclient.proto.serverfeatures"; -import "shared.proto"; +import "kurrentdb/protocol/v1/shared.proto"; service ServerFeatures { rpc GetSupportedMethods (event_store.client.Empty) returns (SupportedMethods); diff --git a/src/main/proto/shared.proto b/src/main/proto/kurrentdb/protocol/v1/shared.proto similarity index 100% rename from src/main/proto/shared.proto rename to 
src/main/proto/kurrentdb/protocol/v1/shared.proto diff --git a/src/main/proto/status.proto b/src/main/proto/kurrentdb/protocol/v1/status.proto similarity index 97% rename from src/main/proto/status.proto rename to src/main/proto/kurrentdb/protocol/v1/status.proto index 4bd4614c..2875e016 100644 --- a/src/main/proto/status.proto +++ b/src/main/proto/kurrentdb/protocol/v1/status.proto @@ -17,7 +17,7 @@ syntax = "proto3"; package google.rpc; import "google/protobuf/any.proto"; -import "code.proto"; +import "kurrentdb/protocol/v1/code.proto"; option cc_enable_arenas = true; option go_package = "google.golang.org/genproto/googleapis/rpc/status;status"; diff --git a/src/main/proto/streams.proto b/src/main/proto/kurrentdb/protocol/v1/streams.proto similarity index 98% rename from src/main/proto/streams.proto rename to src/main/proto/kurrentdb/protocol/v1/streams.proto index 4ff613a7..d3c91287 100644 --- a/src/main/proto/streams.proto +++ b/src/main/proto/kurrentdb/protocol/v1/streams.proto @@ -2,8 +2,8 @@ syntax = "proto3"; package event_store.client.streams; option java_package = "io.kurrent.dbclient.proto.streams"; -import "shared.proto"; -import "status.proto"; +import "kurrentdb/protocol/v1/shared.proto"; +import "kurrentdb/protocol/v1/status.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; diff --git a/src/main/proto/kurrentdb/protocol/v2/core.proto b/src/main/proto/kurrentdb/protocol/v2/core.proto new file mode 100644 index 00000000..b4626652 --- /dev/null +++ b/src/main/proto/kurrentdb/protocol/v2/core.proto @@ -0,0 +1,82 @@ +syntax = "proto3"; + +package kurrentdb.protocol; + +option csharp_namespace = "KurrentDB.Protocol"; +option java_package = "io.kurrentdb.protocol"; +option java_multiple_files = true; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/struct.proto"; +import "google/protobuf/descriptor.proto"; + 
+//=================================================================== +// Error Annotations +//=================================================================== + +message ErrorAnnotations { + // Identifies the error condition. + string code = 1; + + // Severity of the error. + Severity severity = 2; + + // Human-readable message that describes the error condition. + optional string message = 3; + + enum Severity { + // The error is recoverable, the operation failed but the session can continue. + RECOVERABLE = 0; + // The error is fatal and the session should be terminated. + FATAL = 1; + } +} + +// Extend the MessageOptions to include error information. +extend google.protobuf.MessageOptions { + // Provides additional information about the error condition. + optional ErrorAnnotations error_info = 50000; +} + +//=================================================================== +// Dynamic values map +//=================================================================== + +// Represents a list of dynamically typed values. +message DynamicValueList { + // Repeated property of dynamically typed values. + repeated DynamicValue values = 1; +} + +// Represents a map of dynamically typed values. +message DynamicValueMap { + // A map of string keys to dynamically typed values. + map values = 1; +} + +// Represents a dynamic value +message DynamicValue { + oneof kind { + // Represents a null value. + google.protobuf.NullValue null_value = 1; + // Represents a 32-bit signed integer value. + sint32 int32_value = 2; + // Represents a 64-bit signed integer value. + sint64 int64_value = 3; + // Represents a byte array value. + bytes bytes_value = 4; + // Represents a 64-bit double-precision floating-point value. + double double_value = 5; + // Represents a 32-bit single-precision floating-point value + float float_value = 6; + // Represents a string value. + string string_value = 7; + // Represents a boolean value. 
+ bool boolean_value = 8; + // Represents a timestamp value. + google.protobuf.Timestamp timestamp_value = 9; + // Represents a duration value. + google.protobuf.Duration duration_value = 10; + } +} diff --git a/src/main/proto/kurrentdb/protocol/v2/features/service.proto b/src/main/proto/kurrentdb/protocol/v2/features/service.proto new file mode 100644 index 00000000..4db99a80 --- /dev/null +++ b/src/main/proto/kurrentdb/protocol/v2/features/service.proto @@ -0,0 +1,144 @@ +syntax = "proto3"; + +/** + * KurrentDB Server Features Protocol + * + * This protocol defines services and messages for discovering server features + * in a KurrentDB environment. It enables clients to adapt their behavior based + * on server features, their enablement status, and requirements. + */ +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.Features.V2"; +option java_package = "io.kurrentdb.protocol.features.v2"; +option java_multiple_files = true; + +import "google/protobuf/timestamp.proto"; +import "kurrentdb/protocol/v2/core.proto"; + +/** + * Service for retrieving information about the server, including features + * and metadata. + */ +service ServerInfoService { + // Retrieves server information and available features + rpc GetServerInfo(ServerInfoRequest) returns (ServerInfoResponse) {} +} + +/** + * Contains server version information, build details, and compatibility requirements. + */ +message ServerMetadata { + // Semantic version of the server software + string version = 1; + + // Build identifier or hash + string build = 2; + + // Minimum client version required for compatibility + string min_compatible_client_version = 3; + + // Unique identifier for this server node + string node_id = 4; +} + +/** + * Request message for retrieving server information. 
+ */ +message ServerInfoRequest { + // Client version making the request + string client_version = 1; + + // Unique client identifier + string client_id = 2; +} + +/** + * Response containing server information. + */ +message ServerInfoResponse { + // Server information and features + ServerInfo info = 1; +} + +/** + * Top-level server information container including metadata + * and available features. + */ +message ServerInfo { + // Server metadata (version, build info, etc.) + ServerMetadata metadata = 1; + + // Features organized by namespace + map features = 2; +} + +/** + * Container for features within a specific namespace. + */ +message FeaturesList { + // Features in this namespace + repeated Feature features = 1; +} + +/** + * Defines a specific server feature with its enablement status, + * requirements, and metadata. + */ +message Feature { + // Unique identifier for this feature + string name = 1; + + // Human-readable description of the feature + optional string description = 2; + + // Whether this feature is currently enabled + bool enabled = 3; + + // Whether this feature is deprecated and may be removed in future versions + bool deprecated = 4; + + // Requirements associated with this feature that clients must satisfy + repeated FeatureRequirement requirements = 5; + + // Whether clients can request changes to this feature's enabled status + bool client_configurable = 6; + + // For temporary features, indicates when the feature will no longer be available + optional google.protobuf.Timestamp available_until = 7; +} + +/** + * Defines a requirement that must be satisfied to use a feature. + * Requirements can be optional, required, or prohibited. 
+ */ +message FeatureRequirement { + // Unique identifier for this requirement + string name = 1; + + // The value of this requirement, which can contain various data types + DynamicValue value = 2; + + // Enforcement level for this requirement + PolicyStatus policy_status = 3; + + // Human-readable description of the requirement + optional string description = 4; + + // Message shown when the requirement is violated + optional string violation_message = 5; +} + +/** + * Defines how requirements are enforced. + */ +enum PolicyStatus { + // Feature is optional with no warnings + OPTIONAL = 0; + + // Feature must be enabled; operations rejected if disabled + REQUIRED = 3; + + // Feature must be disabled; operations rejected if enabled + PROHIBITED = 4; +} diff --git a/src/main/proto/kurrentdb/protocol/v2/streams/shared.proto b/src/main/proto/kurrentdb/protocol/v2/streams/shared.proto new file mode 100644 index 00000000..e70ba804 --- /dev/null +++ b/src/main/proto/kurrentdb/protocol/v2/streams/shared.proto @@ -0,0 +1,118 @@ +syntax = "proto3"; + +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.Streams.V2"; +option java_package = "io.kurrentdb.protocol.streams.v2"; +option java_multiple_files = true; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/descriptor.proto"; +import "kurrentdb/protocol/v2/core.proto"; + +// ErrorDetails provides detailed information about specific error conditions. +message ErrorDetails { + // When the user does not have sufficient permissions to perform the + // operation. + message AccessDenied { + option (error_info) = { + code : "ACCESS_DENIED", + severity : RECOVERABLE, + message : "The user does not have sufficient permissions to perform the operation." + }; + } + + // When the stream has been deleted. + message StreamDeleted { + option (error_info) = { + code : "STREAM_DELETED", + severity : RECOVERABLE, + message : "The stream has been soft deleted. 
It will not be visible in the stream list, until it is restored by appending to it again." + }; + + // The name of the stream that was deleted. + optional string stream = 1; + + // The time when the stream was deleted. + google.protobuf.Timestamp deleted_at = 2; + } + + // When the stream has been tombstoned. + message StreamTombstoned { + option (error_info) = { + code : "STREAM_TOMBSTONED", + severity : RECOVERABLE, + message : "The stream has been tombstoned and cannot be used anymore." + }; + + // The name of the stream that was tombstoned. + optional string stream = 1; + + // The time when the stream was tombstoned. + google.protobuf.Timestamp tombstoned_at = 2; + } + + // When the stream is not found. + message StreamNotFound { + option (error_info) = { + code : "STREAM_NOT_FOUND", + severity : RECOVERABLE, + message : "The specified stream was not found." + }; + + // The name of the stream that was not found. + optional string stream = 1; + } + + // When the expected revision of the stream does not match the actual + // revision. + message StreamRevisionConflict { + option (error_info) = { + code : "REVISION_CONFLICT", + severity : RECOVERABLE, + message : "The actual stream revision does not match the expected revision." + }; + + // The actual revision of the stream. + int64 stream_revision = 1 [jstype = JS_STRING]; + } + + // When the transaction exceeds the maximum size allowed + // (its bigger than the configured chunk size). + message TransactionMaxSizeExceeded { + option (error_info) = { + code : "TRANSACTION_MAX_SIZE_EXCEEDED", + severity : FATAL, + message : "The transaction exceeds the maximum size allowed." + }; + + // The maximum allowed size of the transaction. + uint32 max_size = 1; + } + + // When the user is not found. + message UserNotFound { + option (error_info) = { + code : "USER_NOT_FOUND", + severity : RECOVERABLE, + message : "The specified user was not found." + }; + } + + // When the user is not authenticated. 
+ message NotAuthenticated { + option (error_info) = { + code : "NOT_AUTHENTICATED", + severity : RECOVERABLE, + message : "The user is not authenticated." + }; + } + + message LogPositionNotFound { + option (error_info) = { + code : "LOG_POSITION_NOT_FOUND", + severity : RECOVERABLE, + message : "The specified log position was not found." + }; + } +} diff --git a/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto b/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto new file mode 100644 index 00000000..7fe57056 --- /dev/null +++ b/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto @@ -0,0 +1,481 @@ +syntax = "proto3"; + +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.Streams.V2"; +option java_package = "io.kurrentdb.protocol.streams.v2"; +option java_multiple_files = true; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/descriptor.proto"; + +import "kurrentdb/protocol/v2/streams/shared.proto"; +import "kurrentdb/protocol/v2/core.proto"; + +service StreamsService { + // Executes an atomic operation to append records to multiple streams. + // This transactional method ensures that all appends either succeed + // completely, or are entirely rolled back, thereby maintaining strict data + // consistency across all involved streams. + rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); + + // Streaming version of MultiStreamAppend that allows clients to send multiple + // append requests over a single connection. When the stream completes, all + // records are appended transactionally (all succeed or fail together). + // Provides improved efficiency for high-throughput scenarios while + // maintaining the same transactional guarantees. + rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); + +// // Appends records to a specific stream. 
+// rpc AppendStream(AppendStreamRequest) returns (AppendStreamResponse); + +// // Append batches of records to a stream continuously, while guaranteeing pipelined +// // requests are processed in order. If any request fails, the session is terminated. +// rpc AppendStreamSession(stream AppendStreamRequest) returns (stream AppendStreamResponse); + +// // Retrieve a batch of records +// rpc ReadStream(ReadRequest) returns (ReadResponse); + + // Retrieve batches of records continuously. + rpc ReadSession(ReadRequest) returns (stream ReadResponse); +} + +//=================================================================== +// Append Operations +//=================================================================== + +// Record to be appended to a stream. +message AppendRecord { + // Universally Unique identifier for the record. + // If not provided, the server will generate a new one. + optional string record_id = 1; + +// // The name of the stream to append the record to. +// optional string stream = 6; +// +// // The name of the schema in the registry that defines the structure of the record. +// string schema_name = 4; +// +// // The format of the data in the record. +// SchemaDataFormat data_format = 5; + + // A collection of properties providing additional information about the + // record. This can include user-defined metadata or system properties. + // System properties are prefixed with "$." to avoid conflicts with user-defined properties. + // For example, "$schema.name" or "$schema.data-format". + map properties = 2; + + // The actual data payload of the record, stored as bytes. + bytes data = 3; +} + +// Constants that match the expected state of a stream during an +// append operation. It can be used to specify whether the stream should exist, +// not exist, or can be in any state. 
+enum ExpectedRevisionConstants { + // The stream should exist and the expected revision should match the current + EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; + // It is not important whether the stream exists or not. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + // The stream should not exist. If it does, the append will fail. + EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; + // The stream should exist + EXPECTED_REVISION_CONSTANTS_EXISTS = -4; +} + +// Represents the input for appending records to a specific stream. +message AppendStreamRequest { + // The name of the stream to append records to. + string stream = 1; + // The records to append to the stream. + repeated AppendRecord records = 2; + // The expected revision of the stream. If the stream's current revision does + // not match, the append will fail. + // The expected revision can also be one of the special values + // from ExpectedRevisionConstants. + // Missing value means no expectation, the same as EXPECTED_REVISION_CONSTANTS_ANY + optional sint64 expected_revision = 3 [jstype = JS_STRING]; +} + +// Success represents the successful outcome of an append operation. +message AppendStreamSuccess { + // The name of the stream to which records were appended. + string stream = 1; + // The position of the last appended record in the stream. + int64 position = 2 [jstype = JS_STRING]; + // The expected revision of the stream after the append operation. + int64 stream_revision = 3 [jstype = JS_STRING]; +} + +// Failure represents the detailed error information when an append operation fails. +message AppendStreamFailure { + // The name of the stream to which records were appended. + string stream = 1; + + // The error details + oneof error { + // Failed because the actual stream revision didn't match the expected revision. + ErrorDetails.StreamRevisionConflict stream_revision_conflict = 2; + // Failed because the client lacks sufficient permissions. 
+ ErrorDetails.AccessDenied access_denied = 3; + // Failed because the target stream has been deleted. + ErrorDetails.StreamDeleted stream_deleted = 4; + // Failed because the stream was not found. + ErrorDetails.StreamNotFound stream_not_found = 5; + // Failed because the transaction exceeded the maximum size allowed + ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 6; + } +} + +// AppendStreamResponse represents the output of appending records to a specific +// stream. +message AppendStreamResponse { + // The result of the append operation. + oneof result { + // Success represents the successful outcome of an append operation. + AppendStreamSuccess success = 1; + // Failure represents the details of a failed append operation. + AppendStreamFailure failure = 2; + } +} + +// MultiStreamAppendRequest represents a request to append records to multiple streams. +message MultiStreamAppendRequest { + // A list of AppendStreamInput messages, each representing a stream to which records should be appended. + repeated AppendStreamRequest input = 1; +} + +// Response from the MultiStreamAppend operation. +message MultiStreamAppendResponse { + oneof result { + // Success represents the successful outcome of a multi-stream append operation. + Success success = 1; + // Failure represents the details of a failed multi-stream append operation. + Failure failure = 2; + } + + message Success { + repeated AppendStreamSuccess output = 1; + } + + message Failure { + repeated AppendStreamFailure output = 1; + } +} + +//=================================================================== +// Read Operations +//=================================================================== + +// The scope of the read filter determines where the filter will be applied. 
+enum ReadFilterScope { + READ_FILTER_SCOPE_UNSPECIFIED = 0; + // The filter will be applied to the record stream name + READ_FILTER_SCOPE_STREAM = 1; + // The filter will be applied to the record schema name + READ_FILTER_SCOPE_SCHEMA_NAME = 2; + // The filter will be applied to the properties of the record + READ_FILTER_SCOPE_PROPERTIES = 3; + // The filter will be applied to all the record properties + // including the stream and schema name + READ_FILTER_SCOPE_RECORD = 4; +} + +// The filter to apply when reading records from the database +// The combination of stream scope and literal expression indicates a direct stream name match, +// while a regex expression indicates a pattern match across multiple streams. +message ReadFilter { + // The scope of the filter. + ReadFilterScope scope = 1; + // The expression can be a regular expression or a literal value. + // If it starts with "~" it will be considered a regex. + string expression = 2; + + // // The optional name of the record property to filter on. + // optional string property_name = 3; + + // The optional property names to filter on. + repeated string property_names = 4; +} + +// Record retrieved from the database. +message Record { + // The unique identifier of the record in the database. + string record_id = 1; + // The position of the record in the database. + int64 position = 5 [jstype = JS_STRING]; + // The actual data payload of the record, stored as bytes. + bytes data = 2; + // Additional information about the record. + map properties = 3; + // When the record was created. + google.protobuf.Timestamp timestamp = 4; + // The stream to which the record belongs. + optional string stream = 6; + // The revision of the stream created when the record was appended. + optional int64 stream_revision = 7 [jstype = JS_STRING]; +} + +// The direction in which to read records from the database (forwards or backwards). 
+enum ReadDirection { + READ_DIRECTION_FORWARDS = 0; + READ_DIRECTION_BACKWARDS = 1; +} + +// The position from which to start reading records. +// This can be either the earliest or latest position in the stream. +enum ReadPositionConstants { + READ_POSITION_CONSTANTS_UNSPECIFIED = 0; + READ_POSITION_CONSTANTS_EARLIEST = 1; + READ_POSITION_CONSTANTS_LATEST = 2; +} + +// Represents the successful outcome of a read operation. +message ReadSuccess { + repeated Record records = 1; +} + +// Represents the detailed error information when a read operation fails. +message ReadFailure { + // The error details + oneof error { + // Failed because the client lacks sufficient permissions. + ErrorDetails.AccessDenied access_denied = 1; + // Failed because the target stream has been deleted. + ErrorDetails.StreamDeleted stream_deleted = 2; + // Failed because the expected stream revision did not match the actual revision. + ErrorDetails.StreamNotFound stream_not_found = 3; + } +} + +message ReadRequest { + // The filter to apply when reading records. + optional ReadFilter filter = 1; + // The starting position of the log from which to read records. + optional int64 start_position = 2 [jstype = JS_STRING]; + // Limit how many records can be returned. + // This will get capped at the default limit, + // which is up to 1000 records. + optional int64 limit = 3 [jstype = JS_STRING]; + // The direction in which to read the stream (forwards or backwards). + ReadDirection direction = 4; + // Heartbeats can be enabled to monitor end-to-end session health. + HeartbeatOptions heartbeats = 5; + // The number of records to read in a single batch. + int32 batch_size = 6; +} + +//message SubscriptionConfirmed { +// // The subscription ID that was confirmed. +// string subscription_id = 1; +// // The position of the last record read by the server. +// optional int64 position = 2 [jstype = JS_STRING]; +// // When the subscription was confirmed. 
+// google.protobuf.Timestamp timestamp = 3; +//} + +// Read session response. +message ReadResponse { + oneof result { + // Success represents the successful outcome of an read operation. + ReadSuccess success = 1; + // Failure represents the details of a failed read operation. + ReadFailure failure = 2; + // Heartbeat represents the health check of the read operation when + // the server has not found any records matching the filter for the specified + // period of time or records threshold. + // A heartbeat will be sent when the initial switch to real-time tailing happens. + Heartbeat heartbeat = 3; + } +} + +// A health check will be sent when the server has not found any records +// matching the filter for the specified period of time or records threshold. A +// heartbeat will be sent when the initial switch to real-time tailing happens. +message HeartbeatOptions { + bool enable = 1; + optional google.protobuf.Duration period = 2; // 30 seconds + optional int32 records_threshold = 3; // 500 +} + +enum HeartbeatType { + HEARTBEAT_TYPE_UNSPECIFIED = 0; + HEARTBEAT_TYPE_CHECKPOINT = 1; + HEARTBEAT_TYPE_CAUGHT_UP = 2; + HEARTBEAT_TYPE_FELL_BEHIND = 3; +} + +message Heartbeat { + // This indicates whether the subscription is caught up, fell behind, or + // the filter has not been satisfied after a period of time or records threshold. + HeartbeatType type = 1; + // Checkpoint for resuming reads. + // It will always be populated unless the database is empty. + int64 position = 2 [jstype = JS_STRING]; + // When the heartbeat was sent. 
+ google.protobuf.Timestamp timestamp = 3; +} + +////=================================================================== +//// Read Operations +////=================================================================== +// +//enum ConsumeFilterScope { +// CONSUME_FILTER_SCOPE_UNSPECIFIED = 0; +// // The filter will be applied to the stream name +// CONSUME_FILTER_SCOPE_STREAM = 1; +// // The filter will be applied to the record schema name +// CONSUME_FILTER_SCOPE_RECORD = 2; +// // The filter will be applied to the properties of record +// CONSUME_FILTER_SCOPE_PROPERTIES = 3; +// // The filter will be applied to the record data +// CONSUME_FILTER_SCOPE_DATA = 4; +//} +// +//// The filter to apply when reading records from the database +//// It applies to a stream or a record +//message ConsumeFilter { +// // The scope of the filter. +// ConsumeFilterScope scope = 1; +// // The expression can be a regular expression, a jsonpath expression, or a literal value. +// // if it starts with "~" it will be considered a regex and if it starts with "$" it will be considered a jsonpath filter, else its a literal. +// string expression = 2; +// // The name of the record property to filter on. +// optional string property_name = 3; +//} +// +//// Record retrieved from the database. +//message Record { +// // The unique identifier of the record in the database. +// string record_id = 1; +// // The position of the record in the database. +// int64 position = 5 [jstype = JS_STRING]; +// // The actual data payload of the record, stored as bytes. +// bytes data = 2; +// // Additional information about the record. +// map properties = 3; +// // When the record was created. +// google.protobuf.Timestamp timestamp = 4; +// // The stream to which the record belongs. +// optional string stream = 6; +// // The revision of the stream created when the record was appended. +// optional int64 stream_revision = 7 [jstype = JS_STRING]; +//} +// +////// A batch of records. 
+////message RecordBatch { +//// repeated Record records = 1; +////} +// +//// The direction in which to read records from the database (forwards or backwards). +//enum ReadDirection { +// READ_DIRECTION_FORWARDS = 0; +// READ_DIRECTION_BACKWARDS = 1; +//} +// +//// The position from which to start reading records. +//// This can be either the earliest or latest position in the stream. +//enum ReadPositionConstants { +// READ_POSITION_CONSTANTS_UNSPECIFIED = 0; +// READ_POSITION_CONSTANTS_EARLIEST = 1; +// READ_POSITION_CONSTANTS_LATEST = 2; +//} +// +//message ReadStreamRequest { +// // The filter to apply when reading records. +// optional ConsumeFilter filter = 1; +// // The starting position of the log from which to read records. +// optional int64 start_position = 2 [jstype = JS_STRING]; +// // Limit how many records can be returned. +// // This will get capped at the default limit, +// // which is up to 1000 records. +// optional int64 limit = 3 [jstype = JS_STRING]; +// // The direction in which to read the stream (forwards or backwards). +// ReadDirection direction = 4; +//} +// +//message ReadStreamSuccess { +// repeated Record records = 1; +//} +// +//// Represents the detailed error information when a read operation fails. +//message ReadStreamFailure { +// // The error details +// oneof error { +// // Failed because the client lacks sufficient permissions. +// ErrorDetails.AccessDenied access_denied = 3; +// // Failed because the target stream has been deleted. +// ErrorDetails.StreamDeleted stream_deleted = 4; +// } +//} +//message ReadStreamResponse { +// // The result of the read operation. +// oneof result { +// // Success represents the successful outcome of an read operation. +// ReadStreamSuccess success = 1; +// // Failure represents the details of a failed read operation. 
+// ReadStreamFailure failure = 2;
+// // Heartbeat represents the health check of the read operation when
+// // the server has not found any records matching the filter for the specified
+// // period of time or records threshold.
+// // A heartbeat will be sent when the initial switch to real-time tailing happens.
+// Heartbeat heartbeat = 3;
+// }
+//}
+//
+//message ReadSessionRequest {
+// // The filter to apply when reading records.
+// optional ConsumeFilter filter = 1;
+// // The starting position of the log from which to read records.
+// optional int64 start_position = 2 [jstype = JS_STRING];
+// // Limit how many records can be returned.
+// // This will get capped at the default limit,
+// // which is up to 1000 records.
+// optional int64 limit = 3 [jstype = JS_STRING];
+// // The direction in which to read the stream (forwards or backwards).
+// ReadDirection direction = 4;
+// // Heartbeats can be enabled to monitor end-to-end session health.
+// HeartbeatOptions heartbeats = 5;
+//}
+//
+//// Read session response.
+//message ReadSessionResponse {
+// oneof result {
+// // Success represents the successful outcome of a read operation.
+// ReadStreamSuccess success = 1;
+// // Failure represents the details of a failed read operation.
+// ReadStreamFailure failure = 2;
+// // Heartbeat represents the health check of the read operation when
+// // the server has not found any records matching the filter for the specified
+// // period of time or records threshold.
+// // A heartbeat will be sent when the initial switch to real-time tailing happens.
+// Heartbeat heartbeat = 3;
+// }
+//}
+//
+//// A health check will be sent when the server has not found any records
+//// matching the filter for the specified period of time or records threshold. A
+//// heartbeat will be sent when the initial switch to real-time tailing happens.
+//message HeartbeatOptions { +// bool enable = 1; +// //optional google.protobuf.Duration period = 2; +// optional int32 records_threshold = 3; // 1000 +//} +// +//enum HeartbeatType { +// HEARTBEAT_TYPE_UNSPECIFIED = 0; +// HEARTBEAT_TYPE_CHECKPOINT = 1; +// HEARTBEAT_TYPE_CAUGHT_UP = 2; +//} +// +//message Heartbeat { +// // This indicates whether the subscription is caught up, fell behind, or +// // the filter has not been satisfied after a period of time or records threshold. +// HeartbeatType type = 1; +// // Checkpoint for resuming reads. +// // It will always be populated unless the database is empty. +// int64 position = 2 [jstype = JS_STRING]; +// // When the heartbeat was sent. +// google.protobuf.Timestamp timestamp = 3; +//} diff --git a/src/main/proto/streams.v2.proto b/src/main/proto/streams.v2.proto deleted file mode 100644 index 6aac3647..00000000 --- a/src/main/proto/streams.v2.proto +++ /dev/null @@ -1,170 +0,0 @@ -syntax = "proto3"; - -// -// This protocol is UNSTABLE in the sense of being subject to change. -// - -package kurrentdb.protocol.v2; - -option csharp_namespace = "KurrentDB.Protocol.V2"; -option java_package = "io.kurrentdb.v2"; -option java_multiple_files = true; - -import "dynamic-value.proto"; - -service StreamsService { - // Executes an atomic operation to append records to multiple streams. - // This transactional method ensures that all appends either succeed - // completely, or are entirely rolled back, thereby maintaining strict data - // consistency across all involved streams. - rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); - - // Streaming version of MultiStreamAppend that allows clients to send multiple - // append requests over a single connection. When the stream completes, all - // records are appended transactionally (all succeed or fail together). - // Provides improved efficiency for high-throughput scenarios while - // maintaining the same transactional guarantees. 
- rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); -} - -// Record to be appended to a stream. -message AppendRecord { - // Universally Unique identifier for the record. Must be a guid. - // If not provided, the server will generate a new one. - optional string record_id = 1; - - // A collection of properties providing additional system information about the - // record. - map properties = 2; - - // The actual data payload of the record, stored as bytes. - bytes data = 3; -} - -// Constants that match the expected state of a stream during an -// append operation. It can be used to specify whether the stream should exist, -// not exist, or can be in any state. -enum ExpectedRevisionConstants { - // The stream should exist and have a single event. - EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; - - // It is not important whether the stream exists or not. - EXPECTED_REVISION_CONSTANTS_ANY = -2; - - // The stream should not exist. If it does, the append will fail. - EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; - - // The stream should exist - EXPECTED_REVISION_CONSTANTS_EXISTS = -4; -} - -// Represents the input for appending records to a specific stream. -message AppendStreamRequest { - // The name of the stream to append records to. - string stream = 1; - - // The records to append to the stream. - repeated AppendRecord records = 2; - - // The expected revision of the stream. If the stream's current revision does - // not match, the append will fail. - // The expected revision can also be one of the special values - // from ExpectedRevisionConstants. - // missing value means no expectation: same as EXPECTED_REVISION_CONSTANTS_ANY - optional sint64 expected_revision = 3; -} - -// Success represents the successful outcome of an append operation. -message AppendStreamSuccess { - // The name of the stream to which records were appended. - string stream = 1; - - // The position of the last appended record in the transaction. 
- int64 position = 2; - - // The revision of the stream after the append operation. - int64 stream_revision = 3; -} - -// Failure represents the detailed error information when an append operation fails. -message AppendStreamFailure { - // The name of the stream to which records failed to append. - string stream = 1; - - // The error details - oneof error { - // Failed because the actual stream revision didn't match the expected revision. - ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2; - - // Failed because the client lacks sufficient permissions. - ErrorDetails.AccessDenied access_denied = 3; - - // Failed because the target stream has been deleted. - ErrorDetails.StreamDeleted stream_deleted = 4; - - ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 5; - } -} - -// Represents the output of appending records to a specific stream. -message AppendStreamResponse { - // The result of the append operation. - oneof result { - // Success represents the successful outcome of an append operation. - AppendStreamSuccess success = 1; - - // Failure represents the details of a failed append operation. - AppendStreamFailure failure = 2; - } -} - -// MultiStreamAppendRequest represents a request to append records to multiple streams. -message MultiStreamAppendRequest { - // A list of AppendStreamInput messages, each representing a stream to which records should be appended. - repeated AppendStreamRequest input = 1; -} - -// Response from the MultiStreamAppend operation. -message MultiStreamAppendResponse { - oneof result { - // Success represents the successful outcome of a multi-stream append operation. - Success success = 1; - - // Failure represents the details of a failed multi-stream append operation. 
- Failure failure = 2; - } - - message Success { - repeated AppendStreamSuccess output = 1; - } - - message Failure { - repeated AppendStreamFailure output = 1; - } -} - -// ErrorDetails provides detailed information about specific error conditions. -message ErrorDetails { - // When the user does not have sufficient permissions to perform the operation. - message AccessDenied { - // The reason for access denial. - string reason = 1; - } - - // When the stream has been deleted. - message StreamDeleted { - } - - // When the expected revision of the stream does not match the actual revision. - message WrongExpectedRevision { - // The actual revision of the stream. - int64 stream_revision = 1; - } - - // When the transaction exceeds the maximum size allowed - // (it's bigger than the configured chunk size). - message TransactionMaxSizeExceeded { - // The maximum allowed size of the transaction. - int32 max_size = 1; - } -} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/misc/ParseValidConnectionStringTests.java b/src/test/java/io/kurrent/dbclient/misc/ParseValidConnectionStringTests.java index 8000293f..d5ed09fe 100644 --- a/src/test/java/io/kurrent/dbclient/misc/ParseValidConnectionStringTests.java +++ b/src/test/java/io/kurrent/dbclient/misc/ParseValidConnectionStringTests.java @@ -18,7 +18,7 @@ public class ParseValidConnectionStringTests { private final JsonMapper mapper = new JsonMapper(); - private static final List PROTOCOLS = Arrays.asList("esdb", "kurrent", "kdb"); + private static final List PROTOCOLS = Arrays.asList("esdb", "kurrentdb", "kdb"); public static Stream validConnectionStrings() { List baseConnectionStrings = Arrays.asList(