Skip to content

Commit 094039d

Browse files
Merge pull request #5 from hypertrace/add-stream-utils
feat: add rxjava grpc utils
2 parents 7c51358 + e84d867 commit 094039d

File tree

11 files changed

+350
-7
lines changed

11 files changed

+350
-7
lines changed

grpc-client-rx-utils/build.gradle.kts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
plugins {
2+
`java-library`
3+
jacoco
4+
id("org.hypertrace.publish-plugin")
5+
id("org.hypertrace.jacoco-report-plugin")
6+
}
7+
8+
dependencies {
9+
api("io.reactivex.rxjava3:rxjava:3.0.5")
10+
api("io.grpc:grpc-stub:1.31.1")
11+
api(project(":grpc-context-utils"))
12+
implementation("io.grpc:grpc-context:1.31.1")
13+
14+
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
15+
testImplementation("org.mockito:mockito-core:3.5.0")
16+
testImplementation("org.mockito:mockito-junit-jupiter:3.5.0")
17+
}
18+
19+
tasks.test {
20+
useJUnitPlatform()
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.hypertrace.core.grpcutils.client.rx;
2+
3+
import io.grpc.Context;
4+
import io.grpc.stub.StreamObserver;
5+
import io.reactivex.rxjava3.core.Completable;
6+
import io.reactivex.rxjava3.core.Observable;
7+
import io.reactivex.rxjava3.core.Single;
8+
import java.util.concurrent.Callable;
9+
import java.util.function.Consumer;
10+
import org.hypertrace.core.grpcutils.context.RequestContext;
11+
12+
class DefaultGrpcRxExecutionContext implements GrpcRxExecutionContext {
13+
14+
private final RequestContext requestContext;
15+
16+
DefaultGrpcRxExecutionContext(RequestContext requestContext) {
17+
this.requestContext = requestContext;
18+
}
19+
20+
@Override
21+
public <TResp> Single<TResp> call(Callable<TResp> callable) {
22+
return Single.fromCallable(buildContext().wrap(callable));
23+
}
24+
25+
@Override
26+
public Completable run(Runnable runnable) {
27+
return Completable.fromRunnable(buildContext().wrap(runnable));
28+
}
29+
30+
@Override
31+
public <TResponse> Observable<TResponse> stream(
32+
Consumer<StreamObserver<TResponse>> streamConsumer) {
33+
return Observable.create(
34+
emitter ->
35+
buildContext()
36+
.run(() -> streamConsumer.accept(new StreamingClientResponseObserver<>(emitter))));
37+
}
38+
39+
private Context buildContext() {
40+
return Context.current().withValue(RequestContext.CURRENT, this.requestContext);
41+
}
42+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.hypertrace.core.grpcutils.client.rx;
2+
3+
import io.grpc.stub.StreamObserver;
4+
import io.reactivex.rxjava3.core.Completable;
5+
import io.reactivex.rxjava3.core.Observable;
6+
import io.reactivex.rxjava3.core.Single;
7+
import java.util.concurrent.Callable;
8+
import java.util.function.Consumer;
9+
import org.hypertrace.core.grpcutils.context.RequestContext;
10+
11+
/** An execution context that can turn various types of executions into their Rx equivalents. */
12+
public interface GrpcRxExecutionContext {
13+
/**
14+
* Executes the given callable in this execution context, returning any result or error as a
15+
* {@link Single}.
16+
*/
17+
<TResp> Single<TResp> call(Callable<TResp> callable);
18+
19+
/**
20+
* Executes the given runnable in this execution context, triggering completion or error once the
21+
* call completes.
22+
*/
23+
Completable run(Runnable runnable);
24+
25+
/**
26+
* Provides a stream observer to the provided consumer, converting the result into an Observable.
27+
*/
28+
<TResponse> Observable<TResponse> stream(Consumer<StreamObserver<TResponse>> requestExecutor);
29+
30+
static GrpcRxExecutionContext forCurrentContext() {
31+
return forContext(RequestContext.CURRENT.get());
32+
}
33+
34+
static GrpcRxExecutionContext forContext(RequestContext requestContext) {
35+
return new DefaultGrpcRxExecutionContext(requestContext);
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.hypertrace.core.grpcutils.client.rx;
2+
3+
import io.grpc.stub.ClientCallStreamObserver;
4+
import io.grpc.stub.ClientResponseObserver;
5+
import io.reactivex.rxjava3.core.ObservableEmitter;
6+
7+
/**
8+
* Adapter class for bridging from a GRPC stream to an Rx Observable.
9+
* @param <ReqT>
10+
* @param <RespT>
11+
*/
12+
public class StreamingClientResponseObserver<ReqT, RespT> implements ClientResponseObserver<ReqT, RespT> {
13+
14+
private final ObservableEmitter<RespT> emitter;
15+
private ClientCallStreamObserver<ReqT> requestStream;
16+
17+
StreamingClientResponseObserver(final ObservableEmitter<RespT> emitter) {
18+
this.emitter = emitter;
19+
this.emitter.setCancellable(
20+
() ->
21+
this.requestStream.cancel(
22+
"StreamingClientResponseObserver cancelling after emitter disposed", null));
23+
}
24+
25+
@Override
26+
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
27+
this.requestStream = requestStream;
28+
}
29+
30+
@Override
31+
public void onNext(RespT value) {
32+
if (!this.emitter.isDisposed()) {
33+
this.emitter.onNext(value);
34+
}
35+
}
36+
37+
@Override
38+
public void onError(Throwable t) {
39+
if (!this.emitter.isDisposed()) {
40+
this.emitter.onError(t);
41+
}
42+
}
43+
44+
@Override
45+
public void onCompleted() {
46+
// This shouldn't generally happen - either an error or next response
47+
if (!this.emitter.isDisposed()) {
48+
emitter.onComplete();
49+
}
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.hypertrace.core.grpcutils.client.rx;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
5+
import static org.mockito.Mockito.times;
6+
import static org.mockito.Mockito.verify;
7+
import static org.mockito.Mockito.verifyNoInteractions;
8+
import static org.mockito.Mockito.when;
9+
10+
import io.reactivex.rxjava3.core.Completable;
11+
import io.reactivex.rxjava3.core.Observable;
12+
import io.reactivex.rxjava3.core.Single;
13+
import io.reactivex.rxjava3.observers.TestObserver;
14+
import java.util.List;
15+
import java.util.Optional;
16+
import org.hypertrace.core.grpcutils.context.RequestContext;
17+
import org.junit.jupiter.api.BeforeEach;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.api.extension.ExtendWith;
20+
import org.mockito.Mock;
21+
import org.mockito.junit.jupiter.MockitoExtension;
22+
23+
@ExtendWith(MockitoExtension.class)
24+
class DefaultGrpcRxExecutionContextTest {
25+
26+
private static final Optional<String> TEST_TENANT_ID_OPTIONAL = Optional.of("test-tenant-id");
27+
@Mock RequestContext mockContext;
28+
29+
@BeforeEach
30+
void beforeEach() {
31+
when(mockContext.getTenantId()).thenReturn(TEST_TENANT_ID_OPTIONAL);
32+
}
33+
34+
@Test
35+
void canRunInContext() {
36+
Completable completable =
37+
new DefaultGrpcRxExecutionContext(this.mockContext)
38+
.run(() -> RequestContext.CURRENT.get().getTenantId());
39+
verifyNoInteractions(this.mockContext);
40+
completable.subscribe();
41+
verify(this.mockContext, times(1)).getTenantId();
42+
}
43+
44+
@Test
45+
void canCallInContext() {
46+
Single<?> single =
47+
new DefaultGrpcRxExecutionContext(this.mockContext)
48+
.call(() -> RequestContext.CURRENT.get().getTenantId());
49+
verifyNoInteractions(this.mockContext);
50+
assertEquals(TEST_TENANT_ID_OPTIONAL, single.blockingGet());
51+
}
52+
53+
@Test
54+
void canStreamInContext() {
55+
Observable<Optional<String>> observable =
56+
new DefaultGrpcRxExecutionContext(this.mockContext)
57+
.stream(
58+
observer -> {
59+
observer.onNext(RequestContext.CURRENT.get().getTenantId());
60+
observer.onCompleted();
61+
});
62+
verifyNoInteractions(this.mockContext);
63+
assertIterableEquals(List.of(TEST_TENANT_ID_OPTIONAL), observable.blockingIterable());
64+
}
65+
66+
@Test
67+
void canPropagateErrors() {
68+
Completable completable =
69+
new DefaultGrpcRxExecutionContext(this.mockContext)
70+
.run(
71+
() -> {
72+
RequestContext.CURRENT.get().getTenantId();
73+
throw new UnsupportedOperationException();
74+
});
75+
TestObserver<?> testObserver = new TestObserver<>();
76+
completable.subscribe(testObserver);
77+
testObserver.assertError(UnsupportedOperationException.class);
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.hypertrace.core.grpcutils.client.rx;
2+
3+
import static org.junit.jupiter.api.Assertions.assertSame;
4+
5+
import io.grpc.Context;
6+
import org.hypertrace.core.grpcutils.context.RequestContext;
7+
import org.junit.jupiter.api.Test;
8+
import org.junit.jupiter.api.extension.ExtendWith;
9+
import org.mockito.Mock;
10+
import org.mockito.junit.jupiter.MockitoExtension;
11+
12+
@ExtendWith(MockitoExtension.class)
13+
class GrpcRxExecutionContextTest {
14+
15+
@Mock RequestContext firstMockContext;
16+
17+
@Mock RequestContext secondMockContext;
18+
19+
@Test
20+
void canCreateExecutionContextWithCurrentContext() throws Exception {
21+
assertSame(
22+
this.firstMockContext,
23+
Context.current()
24+
.withValue(RequestContext.CURRENT, this.firstMockContext)
25+
.call(
26+
() -> GrpcRxExecutionContext.forCurrentContext().call(RequestContext.CURRENT::get))
27+
.blockingGet());
28+
}
29+
30+
@Test
31+
void canCreateExecutionContextWithProvidedContext() throws Exception {
32+
assertSame(
33+
this.secondMockContext,
34+
Context.current()
35+
.withValue(RequestContext.CURRENT, this.firstMockContext)
36+
.call(
37+
() ->
38+
GrpcRxExecutionContext.forContext(this.secondMockContext)
39+
.call(RequestContext.CURRENT::get))
40+
.blockingGet());
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package org.hypertrace.core.grpcutils.client.rx;
2+
3+
import static org.mockito.ArgumentMatchers.any;
4+
import static org.mockito.Mockito.verify;
5+
import static org.mockito.Mockito.verifyNoInteractions;
6+
7+
import io.grpc.stub.ClientCallStreamObserver;
8+
import io.reactivex.rxjava3.core.Observable;
9+
import io.reactivex.rxjava3.observers.TestObserver;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.ExtendWith;
13+
import org.mockito.Mock;
14+
import org.mockito.junit.jupiter.MockitoExtension;
15+
16+
@ExtendWith(MockitoExtension.class)
17+
class StreamingClientResponseObserverTest {
18+
19+
@Mock ClientCallStreamObserver<String> requestStreamObserver;
20+
21+
private StreamingClientResponseObserver<String, String> responseObserver;
22+
private TestObserver<String> testObserver;
23+
24+
@BeforeEach
25+
void beforeEach() {
26+
Observable<String> observable =
27+
Observable.create(
28+
observer -> this.responseObserver = new StreamingClientResponseObserver<>(observer));
29+
this.testObserver = new TestObserver<>();
30+
observable.subscribe(this.testObserver);
31+
this.responseObserver.beforeStart(this.requestStreamObserver);
32+
}
33+
34+
@Test
35+
void returnsValues() {
36+
this.responseObserver.onNext("first");
37+
this.testObserver.assertValue("first");
38+
39+
this.responseObserver.onNext("second");
40+
this.testObserver.assertValueAt(1, "second");
41+
this.testObserver.assertNotComplete();
42+
43+
verifyNoInteractions(requestStreamObserver);
44+
}
45+
46+
@Test
47+
void propagatesExceptionOnError() {
48+
Throwable t = new Exception("error");
49+
this.responseObserver.onError(t);
50+
51+
this.testObserver.assertError(t);
52+
this.testObserver.assertNoValues();
53+
verify(requestStreamObserver).cancel(any(), any());
54+
}
55+
56+
@Test
57+
void propagatesCompletion() {
58+
this.responseObserver.onCompleted();
59+
60+
this.testObserver.assertComplete();
61+
this.testObserver.assertNoValues();
62+
63+
verify(requestStreamObserver).cancel(any(), any());
64+
}
65+
66+
@Test
67+
void propagatesCancellation() {
68+
this.testObserver.dispose();
69+
verify(requestStreamObserver).cancel(any(), any());
70+
}
71+
}

grpc-client-utils/build.gradle.kts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@ plugins {
55
id("org.hypertrace.jacoco-report-plugin")
66
}
77

8-
tasks.test {
9-
useJUnitPlatform()
10-
}
11-
128
dependencies {
139
implementation(project(":grpc-context-utils"))
1410

@@ -17,9 +13,12 @@ dependencies {
1713
// End Logging
1814

1915
// grpc
20-
implementation("io.grpc:grpc-core:1.31.0")
16+
implementation("io.grpc:grpc-core:1.31.1")
2117

2218
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
2319
testImplementation("org.mockito:mockito-core:3.4.4")
2420
}
2521

22+
tasks.test {
23+
useJUnitPlatform()
24+
}

grpc-context-utils/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ tasks.test {
1111

1212
dependencies {
1313
// grpc
14-
implementation("io.grpc:grpc-core:1.31.0")
14+
implementation("io.grpc:grpc-core:1.31.1")
1515

1616
// Logging
1717
implementation("org.slf4j:slf4j-api:1.7.30")

grpc-server-utils/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ dependencies {
1717
// End Logging
1818

1919
// grpc
20-
implementation("io.grpc:grpc-core:1.31.0")
20+
implementation("io.grpc:grpc-core:1.31.1")
2121

2222
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
2323
}

0 commit comments

Comments
 (0)