Skip to content

Commit 12021e6

Browse files
authored
Merge pull request kurrent-io#295 from rickardoberg/batchingreactivestream
Support back-pressure on reads and subscriptions.
2 parents cdd1ca2 + 693a35e commit 12021e6

22 files changed

+539
-372
lines changed

Diff for: db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java

+13-80
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@
33
import com.eventstore.dbclient.proto.shared.Shared;
44
import com.eventstore.dbclient.proto.streams.StreamsGrpc;
55
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
6-
import io.grpc.Metadata;
7-
import io.grpc.StatusRuntimeException;
8-
import io.grpc.stub.ClientCallStreamObserver;
9-
import io.grpc.stub.ClientResponseObserver;
106
import org.reactivestreams.Publisher;
117
import org.reactivestreams.Subscriber;
128

@@ -16,9 +12,9 @@ abstract class AbstractRead implements Publisher<ReadMessage> {
1612
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;
1713

1814
private final GrpcClient client;
19-
private final OptionsBase<?> options;
15+
private final OptionsWithBackPressure<?> options;
2016

21-
protected AbstractRead(GrpcClient client, OptionsBase<?> options) {
17+
protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {
2218
this.client = client;
2319
this.options = options;
2420
}
@@ -32,86 +28,23 @@ protected AbstractRead(GrpcClient client, OptionsBase<?> options) {
3228
public abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();
3329

3430
@Override
35-
@SuppressWarnings("unchecked")
3631
public void subscribe(Subscriber<? super ReadMessage> subscriber) {
37-
ReadSubscription readSubscription = new ReadSubscription(subscriber);
38-
subscriber.onSubscribe(readSubscription);
32+
ReadResponseObserver observer = new ReadResponseObserver(options, new ReadStreamConsumer(subscriber));
33+
34+
this.client.getWorkItemArgs().whenComplete((args, error) -> {
35+
if (error != null) {
36+
observer.onError(error);
37+
return;
38+
}
3939

40-
CompletableFuture<ReadSubscription> result = new CompletableFuture<>();
41-
this.client.run(channel -> {
4240
StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder()
4341
.setOptions(createOptions())
4442
.build();
4543

46-
StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub(channel), this.client.getSettings(), this.options);
47-
48-
client.read(request, new ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp>() {
49-
@Override
50-
public void beforeStart(ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream) {
51-
readSubscription.setStreamObserver(requestStream);
52-
}
53-
54-
private boolean completed = false;
55-
56-
@Override
57-
public void onNext(StreamsOuterClass.ReadResp value) {
58-
if (this.completed) {
59-
return;
60-
}
61-
if (value.hasStreamNotFound()) {
62-
StreamNotFoundException streamNotFoundException = new StreamNotFoundException();
63-
handleError(streamNotFoundException);
64-
return;
65-
}
66-
67-
try {
68-
readSubscription.onNext(new ReadMessage(value));
69-
} catch (Throwable t) {
70-
handleError(t);
71-
}
72-
}
73-
74-
@Override
75-
public void onCompleted() {
76-
if (this.completed) {
77-
return;
78-
}
79-
this.completed = true;
80-
result.complete(readSubscription);
81-
readSubscription.onCompleted();
82-
}
83-
84-
@Override
85-
public void onError(Throwable t) {
86-
if (this.completed) {
87-
return;
88-
}
89-
90-
if (t instanceof StatusRuntimeException) {
91-
StatusRuntimeException e = (StatusRuntimeException) t;
92-
String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
93-
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));
94-
95-
if (leaderHost != null && leaderPort != null) {
96-
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
97-
handleError(reason);
98-
return;
99-
}
100-
}
101-
102-
handleError(t);
103-
}
104-
105-
private void handleError(Throwable t) {
106-
this.completed = true;
107-
result.completeExceptionally(t);
108-
readSubscription.onError(t);
109-
}
110-
});
111-
return result;
112-
}).exceptionally(t -> {
113-
readSubscription.onError(t);
114-
return readSubscription;
44+
StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
45+
observer.onConnected(args);
46+
subscriber.onSubscribe(observer.getSubscription());
47+
client.read(request, observer);
11548
});
11649
}
11750
}

Diff for: db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java

+29-118
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,19 @@
33
import com.eventstore.dbclient.proto.shared.Shared;
44
import com.eventstore.dbclient.proto.streams.StreamsGrpc;
55
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
6-
import io.grpc.Metadata;
7-
import io.grpc.Status;
8-
import io.grpc.StatusRuntimeException;
9-
import io.grpc.stub.ClientCallStreamObserver;
10-
import io.grpc.stub.ClientResponseObserver;
11-
import org.slf4j.Logger;
12-
import org.slf4j.LoggerFactory;
136

14-
import javax.validation.constraints.NotNull;
157
import java.util.concurrent.CompletableFuture;
168

179
abstract class AbstractRegularSubscription {
18-
private static Logger logger = LoggerFactory.getLogger(AbstractRegularSubscription.class);
1910
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;
2011
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultSubscribeOptions;
2112

2213
protected SubscriptionListener listener;
2314
protected Checkpointer checkpointer = null;
2415
private final GrpcClient client;
25-
private final OptionsBase<?> options;
16+
private final OptionsWithBackPressure<?> options;
2617

27-
protected AbstractRegularSubscription(GrpcClient client, OptionsBase<?> options) {
18+
protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure<?> options) {
2819
this.client = client;
2920
this.options = options;
3021
}
@@ -40,120 +31,40 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsBase<?> options)
4031

4132
protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();
4233

43-
@SuppressWarnings("unchecked")
4434
public CompletableFuture<Subscription> execute() {
45-
return this.client.run(channel -> {
46-
StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
47-
.setOptions(createOptions())
48-
.build();
49-
50-
StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub(channel), this.client.getSettings(), this.options);
51-
52-
CompletableFuture<Subscription> future = new CompletableFuture<>();
53-
ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp> observer = new ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp>() {
54-
private boolean _confirmed;
55-
private Subscription _subscription;
56-
private ClientCallStreamObserver<StreamsOuterClass.ReadReq> _requestStream;
57-
58-
@Override
59-
public void beforeStart(ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream) {
60-
this._requestStream = requestStream;
61-
}
62-
63-
@Override
64-
public void onNext(@NotNull StreamsOuterClass.ReadResp readResp) {
65-
if (!_confirmed && readResp.hasConfirmation()) {
66-
this._confirmed = true;
67-
this._subscription = new Subscription(this._requestStream,
68-
readResp.getConfirmation().getSubscriptionId(), checkpointer);
69-
future.complete(this._subscription);
70-
listener.onConfirmation(this._subscription);
71-
return;
72-
}
73-
74-
if (!_confirmed && readResp.hasEvent()) {
75-
onError(new IllegalStateException("Unconfirmed subscription received event"));
76-
return;
77-
}
78-
79-
if (_confirmed && readResp.hasCheckpoint()) {
80-
Checkpointer checkpointer = this._subscription.getCheckpointer();
81-
if (checkpointer == null) {
82-
return;
83-
}
84-
85-
StreamsOuterClass.ReadResp.Checkpoint checkpoint = readResp.getCheckpoint();
86-
Position checkpointPos = new Position(checkpoint.getCommitPosition(), checkpoint.getPreparePosition());
87-
checkpointer.onCheckpoint(this._subscription, checkpointPos);
88-
return;
89-
}
90-
91-
if (_confirmed && readResp.hasCaughtUp()) {
92-
listener.onCaughtUp(_subscription);
93-
return;
94-
}
95-
96-
if (_confirmed && readResp.hasFellBehind()) {
97-
listener.onFellBehind(_subscription);
98-
return;
99-
}
35+
CompletableFuture<Subscription> future = new CompletableFuture<>();
10036

101-
if (_confirmed && !readResp.hasEvent()) {
102-
logger.warn(
103-
String.format("Confirmed subscription %s received non-{event,checkpoint} variant",
104-
_subscription.getSubscriptionId()));
105-
return;
106-
}
37+
this.client.getWorkItemArgs().whenComplete((args, error) -> {
38+
if (error != null) {
39+
future.completeExceptionally(error);
40+
return;
41+
}
10742

108-
try {
109-
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
110-
ClientTelemetry.traceSubscribe(
111-
() -> listener.onEvent(this._subscription, resolvedEvent),
112-
_subscription.getSubscriptionId(),
113-
channel,
114-
client.getSettings(),
115-
options.getCredentials(),
116-
resolvedEvent.getEvent());
117-
} catch (Exception e) {
118-
onError(e);
119-
}
120-
}
43+
ReadResponseObserver observer = createObserver(args, future);
44+
observer.onConnected(args);
12145

122-
@Override
123-
public void onError(Throwable throwable) {
124-
if (!_confirmed) {
125-
future.completeExceptionally(throwable);
126-
}
127-
128-
Throwable error = throwable;
129-
if (error instanceof StatusRuntimeException) {
130-
StatusRuntimeException sre = (StatusRuntimeException) error;
131-
String desc = sre.getStatus().getDescription();
132-
if (sre.getStatus().getCode() == Status.Code.CANCELLED && desc != null && desc.equals("user-initiated")) {
133-
listener.onCancelled(this._subscription, null);
134-
return;
135-
}
136-
137-
String leaderHost = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
138-
String leaderPort = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));
139-
140-
if (leaderHost != null && leaderPort != null) {
141-
error = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
142-
}
143-
}
144-
145-
listener.onCancelled(this._subscription, error);
146-
}
147-
148-
@Override
149-
public void onCompleted() {
150-
// Subscriptions should only complete on error.
151-
}
152-
};
46+
StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
47+
.setOptions(createOptions())
48+
.build();
15349

50+
StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
15451
streamsClient.read(readReq, observer);
52+
});
15553

156-
return future;
54+
return future;
55+
}
56+
57+
private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture<Subscription> future) {
58+
StreamConsumer consumer = new SubscriptionStreamConsumer(this.listener, this.checkpointer, future, (subscriptionId, event, action) -> {
59+
ClientTelemetry.traceSubscribe(
60+
action,
61+
subscriptionId,
62+
args.getChannel(),
63+
client.getSettings(),
64+
options.getCredentials(),
65+
event);
15766
});
67+
68+
return new ReadResponseObserver(this.options, consumer);
15869
}
15970
}

Diff for: db-client-java/src/main/java/com/eventstore/dbclient/ConnectionService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ public void process(RunWorkItem args) {
175175
this.channelId,
176176
this.connection.getCurrentChannel(),
177177
this.connection.getLastConnectedEndpoint(),
178-
this.serverInfo);
178+
this.serverInfo,
179+
this.queue);
179180

180181
args.getItem().accept(workArgs, null);
181182
}

0 commit comments

Comments
 (0)