Skip to content

Commit b16a402

Browse files
feat: add server call stream observer for rx (#6)
* feat: add server call stream observer for rx * refactor: make constructor public
1 parent 899ce73 commit b16a402

File tree

9 files changed

+213
-12
lines changed

9 files changed

+213
-12
lines changed

build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,11 @@ subprojects {
1515
license.set(License.APACHE_2_0)
1616
}
1717
}
18+
19+
pluginManager.withPlugin("java") {
20+
configure<JavaPluginExtension> {
21+
sourceCompatibility = JavaVersion.VERSION_11
22+
targetCompatibility = JavaVersion.VERSION_11
23+
}
24+
}
1825
}

grpc-client-rx-utils/build.gradle.kts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ plugins {
66
}
77

88
dependencies {
9-
api("io.reactivex.rxjava3:rxjava:3.0.5")
10-
api("io.grpc:grpc-stub:1.31.1")
9+
api("io.reactivex.rxjava3:rxjava:3.0.6")
10+
api("io.grpc:grpc-stub:1.32.1")
1111
api(project(":grpc-context-utils"))
12-
implementation("io.grpc:grpc-context:1.31.1")
12+
implementation("io.grpc:grpc-context:1.32.1")
1313

14-
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
15-
testImplementation("org.mockito:mockito-core:3.5.0")
16-
testImplementation("org.mockito:mockito-junit-jupiter:3.5.0")
14+
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
15+
testImplementation("org.mockito:mockito-core:3.5.11")
16+
testImplementation("org.mockito:mockito-junit-jupiter:3.5.11")
1717
}
1818

1919
tasks.test {

grpc-client-utils/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ dependencies {
1313
// End Logging
1414

1515
// grpc
16-
implementation("io.grpc:grpc-core:1.31.1")
16+
implementation("io.grpc:grpc-core:1.32.1")
1717

18-
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
18+
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
1919
testImplementation("org.mockito:mockito-core:3.4.4")
2020
}
2121

grpc-context-utils/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ tasks.test {
1111

1212
dependencies {
1313
// grpc
14-
implementation("io.grpc:grpc-core:1.31.1")
14+
implementation("io.grpc:grpc-core:1.32.1")
1515

1616
// Logging
1717
implementation("org.slf4j:slf4j-api:1.7.30")
1818
// End Logging
1919

20-
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
20+
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
2121
}

grpc-server-rx-utils/build.gradle.kts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
plugins {
2+
`java-library`
3+
jacoco
4+
id("org.hypertrace.publish-plugin")
5+
id("org.hypertrace.jacoco-report-plugin")
6+
}
7+
8+
dependencies {
9+
api("io.reactivex.rxjava3:rxjava:3.0.6")
10+
api("io.grpc:grpc-stub:1.32.1")
11+
12+
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
13+
testImplementation("org.mockito:mockito-core:3.5.11")
14+
testImplementation("org.mockito:mockito-junit-jupiter:3.5.11")
15+
}
16+
17+
tasks.test {
18+
useJUnitPlatform()
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.hypertrace.core.grpcutils.server.rx;
2+
3+
import io.grpc.stub.ServerCallStreamObserver;
4+
import io.reactivex.rxjava3.annotations.NonNull;
5+
import io.reactivex.rxjava3.core.CompletableObserver;
6+
import io.reactivex.rxjava3.core.MaybeObserver;
7+
import io.reactivex.rxjava3.core.Observer;
8+
import io.reactivex.rxjava3.core.SingleObserver;
9+
import io.reactivex.rxjava3.observers.DefaultObserver;
10+
11+
public class ServerCallStreamRxObserver<T> extends DefaultObserver<T>
12+
implements Observer<T>, MaybeObserver<T>, SingleObserver<T>, CompletableObserver {
13+
14+
private final ServerCallStreamObserver<T> serverCallStreamObserver;
15+
16+
public ServerCallStreamRxObserver(ServerCallStreamObserver<T> serverCallStreamObserver) {
17+
this.serverCallStreamObserver = serverCallStreamObserver;
18+
}
19+
20+
@Override
21+
protected void onStart() {
22+
this.serverCallStreamObserver.setOnCancelHandler(this::cancel);
23+
}
24+
25+
@Override
26+
public void onSuccess(@NonNull T value) {
27+
this.onNext(value);
28+
this.onComplete();
29+
}
30+
31+
@Override
32+
public void onNext(@NonNull T value) {
33+
this.serverCallStreamObserver.onNext(value);
34+
}
35+
36+
@Override
37+
public void onError(@NonNull Throwable throwable) {
38+
this.serverCallStreamObserver.onError(throwable);
39+
}
40+
41+
@Override
42+
public void onComplete() {
43+
this.serverCallStreamObserver.onCompleted();
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package org.hypertrace.core.grpcutils.server.rx;
2+
3+
import static org.mockito.ArgumentMatchers.any;
4+
import static org.mockito.Mockito.inOrder;
5+
import static org.mockito.Mockito.verify;
6+
import static org.mockito.Mockito.verifyNoMoreInteractions;
7+
8+
import io.grpc.stub.ServerCallStreamObserver;
9+
import io.reactivex.rxjava3.core.Completable;
10+
import io.reactivex.rxjava3.core.Maybe;
11+
import io.reactivex.rxjava3.core.Observable;
12+
import io.reactivex.rxjava3.core.Single;
13+
import java.util.Arrays;
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.extension.ExtendWith;
16+
import org.mockito.ArgumentCaptor;
17+
import org.mockito.InOrder;
18+
import org.mockito.Mock;
19+
import org.mockito.junit.jupiter.MockitoExtension;
20+
21+
@ExtendWith(MockitoExtension.class)
22+
class ServerCallStreamRxObserverTest {
23+
24+
@Mock ServerCallStreamObserver<Object> mockGrpcObserver;
25+
26+
@Test
27+
void propagatesObservableValues() {
28+
Observable.just("foo", "bar")
29+
.blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
30+
verifyValuesAndCompletion("foo", "bar");
31+
}
32+
33+
@Test
34+
void propagatesEmptyObservable() {
35+
Observable.empty().blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
36+
verifyValuesAndCompletion();
37+
}
38+
39+
@Test
40+
void propagatesSingleValue() {
41+
Single.just("single")
42+
.blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
43+
verifyValuesAndCompletion("single");
44+
}
45+
46+
@Test
47+
void propagatesMaybeValue() {
48+
Maybe.just("maybe").blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
49+
verifyValuesAndCompletion("maybe");
50+
}
51+
52+
@Test
53+
void propagatesEmptyMaybe() {
54+
Maybe.empty().blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
55+
verifyValuesAndCompletion();
56+
}
57+
58+
@Test
59+
void propagatesCompletableCompletion() {
60+
Completable.complete()
61+
.blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
62+
verifyValuesAndCompletion();
63+
}
64+
65+
@Test
66+
void propagatesObservableError() {
67+
Throwable error = new IllegalArgumentException("observable");
68+
Observable.error(error)
69+
.blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
70+
verifyThrows(error);
71+
}
72+
73+
@Test
74+
void propagateSingleError() {
75+
Throwable error = new IllegalArgumentException("single");
76+
Single.error(error).blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
77+
verifyThrows(error);
78+
}
79+
80+
@Test
81+
void propagatesMaybeError() {
82+
Throwable error = new IllegalArgumentException("maybe");
83+
Maybe.error(error).blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
84+
verifyThrows(error);
85+
}
86+
87+
@Test
88+
void propagatesCompletableError() {
89+
Throwable error = new IllegalArgumentException("completable");
90+
Completable.error(error)
91+
.blockingSubscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
92+
verifyThrows(error);
93+
}
94+
95+
@Test
96+
void propagatesCancellationRequest() {
97+
Observable.just("first", "second")
98+
.doAfterNext(
99+
value -> {
100+
// Capture the cancellation handler and invoke it to prevent values beyond the first
101+
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
102+
verify(this.mockGrpcObserver).setOnCancelHandler(captor.capture());
103+
captor.getValue().run();
104+
})
105+
.subscribe(new ServerCallStreamRxObserver<>(this.mockGrpcObserver));
106+
107+
verify(this.mockGrpcObserver).onNext("first");
108+
verifyNoMoreInteractions(this.mockGrpcObserver);
109+
}
110+
111+
private void verifyValuesAndCompletion(String... values) {
112+
InOrder callOrder = inOrder(this.mockGrpcObserver);
113+
114+
callOrder.verify(this.mockGrpcObserver).setOnCancelHandler(any());
115+
Arrays.asList(values).forEach(value -> callOrder.verify(this.mockGrpcObserver).onNext(value));
116+
callOrder.verify(this.mockGrpcObserver).onCompleted();
117+
118+
verifyNoMoreInteractions(this.mockGrpcObserver);
119+
}
120+
121+
private void verifyThrows(Throwable throwable) {
122+
InOrder callOrder = inOrder(this.mockGrpcObserver);
123+
124+
callOrder.verify(this.mockGrpcObserver).setOnCancelHandler(any());
125+
callOrder.verify(this.mockGrpcObserver).onError(throwable);
126+
127+
verifyNoMoreInteractions(this.mockGrpcObserver);
128+
}
129+
}

grpc-server-utils/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ dependencies {
1717
// End Logging
1818

1919
// grpc
20-
implementation("io.grpc:grpc-core:1.31.1")
20+
implementation("io.grpc:grpc-core:1.32.1")
2121

22-
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
22+
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
2323
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ plugins {
1414

1515
include(":grpc-client-utils")
1616
include(":grpc-client-rx-utils")
17+
include(":grpc-server-rx-utils")
1718
include(":grpc-context-utils")
1819
include(":grpc-server-utils")

0 commit comments

Comments
 (0)