From b1c980ade6100d254c39e2f840095a12b51f42b6 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Wed, 29 Jan 2025 11:40:50 +0000 Subject: [PATCH 01/14] ServerImpl: Made ServerImpl.internalClose thread-safe. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 5 ++--- core/src/test/java/io/grpc/internal/ServerImplTest.java | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index eceb7d7a738..41c5e408294 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -808,10 +808,9 @@ void setListener(ServerStreamListener listener) { /** * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ - private void internalClose(Throwable t) { - // TODO(ejona86): this is not thread-safe :) + private synchronized void internalClose(Throwable t) { String description = "Application error processing RPC"; - stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); + stream.cancel(Status.UNKNOWN.withDescription(description).withCause(t)); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 3125edca1e6..0c9506e39ea 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -1563,11 +1563,10 @@ private void verifyExecutorsReturned() { } private void ensureServerStateNotLeaked() { - verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + verify(stream).cancel(statusCaptor.capture()); assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode()); // Used in InProcessTransport when set to include the cause with the status assertNotNull(statusCaptor.getValue().getCause()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); } private static class SimpleServer implements io.grpc.internal.InternalServer { From fd19633702a709483f5b014ceb493e786ca59cab Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Thu, 30 Jan 2025 09:38:25 +0000 Subject: [PATCH 02/14] core: fixed build errors. --- core/src/test/java/io/grpc/internal/ServerImplTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 0c9506e39ea..711cdf6a776 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -189,8 +189,6 @@ public List getServices() { @Captor private ArgumentCaptor statusCaptor; @Captor - private ArgumentCaptor metadataCaptor; - @Captor private ArgumentCaptor streamListenerCaptor; @Mock From eac7ef7247d6a9533a11eba86ad820a1d2d62eef Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Fri, 31 Jan 2025 05:41:06 +0000 Subject: [PATCH 03/14] core: fixed build errors. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 41c5e408294..406dd0d2457 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -810,7 +810,7 @@ void setListener(ServerStreamListener listener) { */ private synchronized void internalClose(Throwable t) { String description = "Application error processing RPC"; - stream.cancel(Status.UNKNOWN.withDescription(description).withCause(t)); + stream.cancel(Status.INTERNAL.withDescription(description).withCause(t)); } @Override From 9cf7cb57bfd6d4be9cdc1c037bd1cf1483bcd0b7 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Fri, 31 Jan 2025 05:53:33 +0000 Subject: [PATCH 04/14] core: fixed build errors. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 406dd0d2457..41c5e408294 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -810,7 +810,7 @@ void setListener(ServerStreamListener listener) { */ private synchronized void internalClose(Throwable t) { String description = "Application error processing RPC"; - stream.cancel(Status.INTERNAL.withDescription(description).withCause(t)); + stream.cancel(Status.UNKNOWN.withDescription(description).withCause(t)); } @Override From c246e758269f30f4916d2c5307bfcd2ea6e441f1 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Wed, 5 Feb 2025 04:53:12 +0000 Subject: [PATCH 05/14] core: fixed build errors. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 4 ++-- core/src/test/java/io/grpc/internal/ServerImplTest.java | 2 +- .../java/io/grpc/testing/integration/MoreInProcessTest.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 41c5e408294..cc5f52c5bbe 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -809,8 +809,8 @@ void setListener(ServerStreamListener listener) { * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ private synchronized void internalClose(Throwable t) { - String description = "Application error processing RPC"; - stream.cancel(Status.UNKNOWN.withDescription(description).withCause(t)); + String description = "server cancelled stream"; + stream.cancel(Status.CANCELLED.withDescription(description).withCause(t)); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 711cdf6a776..ae7f4d98818 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -1562,7 +1562,7 @@ private void verifyExecutorsReturned() { private void ensureServerStateNotLeaked() { verify(stream).cancel(statusCaptor.capture()); - assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode()); + assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode()); // Used in InProcessTransport when set to include the cause with the status assertNotNull(statusCaptor.getValue().getCause()); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java index d97aa8cd36c..dcf8ef3cf91 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java @@ -248,9 +248,9 @@ public void onCompleted() { TestServiceGrpc.newStub(inProcessChannel).streamingInputCall(responseObserver) .onNext(StreamingInputCallRequest.getDefaultInstance()); - assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); + assertTrue(finishLatch.await(3000, TimeUnit.MILLISECONDS)); Status actualStatus = Status.fromThrowable(throwableRef.get()); - Status expectedStatus = Status.UNKNOWN.withDescription("Application error processing RPC"); + Status expectedStatus = Status.CANCELLED.withDescription("server cancelled stream"); assertEquals(expectedStatus.getCode(), actualStatus.getCode()); assertEquals(expectedStatus.getDescription(), actualStatus.getDescription()); assertNull(actualStatus.getCause()); From a19ba4c142aff1fce61bee917128cc210c778887 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Wed, 5 Feb 2025 10:05:28 +0000 Subject: [PATCH 06/14] core: fixed build errors. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 13 ++++++++++--- .../test/java/io/grpc/internal/ServerImplTest.java | 6 ++++-- .../grpc/testing/integration/MoreInProcessTest.java | 4 ++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index cc5f52c5bbe..85d442f4dbb 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -503,7 +503,7 @@ private void streamCreatedInternal( final JumpToApplicationThreadServerStreamListener jumpListener = new JumpToApplicationThreadServerStreamListener( - wrappedExecutor, executor, stream, context, tag); + wrappedExecutor, executor, stream, context, tag, headers); stream.setListener(jumpListener); final SettableFuture> future = SettableFuture.create(); // Run in serializing executor so jumpListener.setListener() is called before any callbacks @@ -778,14 +778,21 @@ static final class JumpToApplicationThreadServerStreamListener implements Server private final Tag tag; // Only accessed from callExecutor. private ServerStreamListener listener; + private Metadata trailers; public JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { + this(executor, cancelExecutor, stream, context, tag, null); + } + + public JumpToApplicationThreadServerStreamListener(Executor executor, + Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag, Metadata trailers) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; this.context = context; this.tag = tag; + this.trailers = trailers; } /** @@ -809,8 +816,8 @@ void setListener(ServerStreamListener listener) { * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ private synchronized void internalClose(Throwable t) { - String description = "server cancelled stream"; - stream.cancel(Status.CANCELLED.withDescription(description).withCause(t)); + String description = "Application error processing RPC"; + stream.close(Status.UNKNOWN.withDescription(description).withCause(t), this.trailers); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index ae7f4d98818..36a6e69f73e 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -189,6 +189,8 @@ public List getServices() { @Captor private ArgumentCaptor statusCaptor; @Captor + private ArgumentCaptor metadataCaptor; + @Captor private ArgumentCaptor streamListenerCaptor; @Mock @@ -1561,8 +1563,8 @@ private void verifyExecutorsReturned() { } private void ensureServerStateNotLeaked() { - verify(stream).cancel(statusCaptor.capture()); - assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode()); + verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode()); // Used in InProcessTransport when set to include the cause with the status assertNotNull(statusCaptor.getValue().getCause()); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java index dcf8ef3cf91..d97aa8cd36c 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java @@ -248,9 +248,9 @@ public void onCompleted() { TestServiceGrpc.newStub(inProcessChannel).streamingInputCall(responseObserver) .onNext(StreamingInputCallRequest.getDefaultInstance()); - assertTrue(finishLatch.await(3000, TimeUnit.MILLISECONDS)); + assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); Status actualStatus = Status.fromThrowable(throwableRef.get()); - Status expectedStatus = Status.CANCELLED.withDescription("server cancelled stream"); + Status expectedStatus = Status.UNKNOWN.withDescription("Application error processing RPC"); assertEquals(expectedStatus.getCode(), actualStatus.getCode()); assertEquals(expectedStatus.getDescription(), actualStatus.getDescription()); assertNull(actualStatus.getCause()); From 036e241525bb9bd6dff9af1498be5d624b9b9b78 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Wed, 5 Feb 2025 10:18:27 +0000 Subject: [PATCH 07/14] core: fixed build errors. --- .../main/java/io/grpc/internal/ServerImpl.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 85d442f4dbb..e1ca8abcd5e 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -779,14 +779,22 @@ static final class JumpToApplicationThreadServerStreamListener implements Server // Only accessed from callExecutor. private ServerStreamListener listener; private Metadata trailers; - - public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { + public JumpToApplicationThreadServerStreamListener( + Executor executor, + Executor cancelExecutor, + ServerStream stream, + Context.CancellableContext context, + Tag tag) { this(executor, cancelExecutor, stream, context, tag, null); } - public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag, Metadata trailers) { + public JumpToApplicationThreadServerStreamListener( + Executor executor, + Executor cancelExecutor, + ServerStream stream, + Context.CancellableContext context, + Tag tag, + Metadata trailers) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; From 0d59d54e7069f33a530e0423f0e8ebaa8fb7e1f3 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Wed, 5 Feb 2025 10:31:32 +0000 Subject: [PATCH 08/14] core: fixed build errors. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index e1ca8abcd5e..8b581aa142d 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -779,6 +779,7 @@ static final class JumpToApplicationThreadServerStreamListener implements Server // Only accessed from callExecutor. private ServerStreamListener listener; private Metadata trailers; + public JumpToApplicationThreadServerStreamListener( Executor executor, Executor cancelExecutor, From d185b4769f3d75c8216c461204254a8908575ec5 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Wed, 5 Feb 2025 15:04:14 +0000 Subject: [PATCH 09/14] core: fixed build errors and changes done in impl class. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 8b581aa142d..b6afce232f1 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -786,7 +786,7 @@ public JumpToApplicationThreadServerStreamListener( ServerStream stream, Context.CancellableContext context, Tag tag) { - this(executor, cancelExecutor, stream, context, tag, null); + this(executor, cancelExecutor, stream, context, tag, new Metadata()); } public JumpToApplicationThreadServerStreamListener( From 1dbd35333a7516b92e7126fc54e9789ffbeafd76 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Thu, 20 Feb 2025 05:48:06 +0000 Subject: [PATCH 10/14] core: Resolve the comments. --- .../java/io/grpc/internal/ServerImpl.java | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index b6afce232f1..022006a4754 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -503,7 +503,7 @@ private void streamCreatedInternal( final JumpToApplicationThreadServerStreamListener jumpListener = new JumpToApplicationThreadServerStreamListener( - wrappedExecutor, executor, stream, context, tag, headers); + wrappedExecutor, executor, stream, context, tag); stream.setListener(jumpListener); final SettableFuture> future = SettableFuture.create(); // Run in serializing executor so jumpListener.setListener() is called before any callbacks @@ -778,7 +778,7 @@ static final class JumpToApplicationThreadServerStreamListener implements Server private final Tag tag; // Only accessed from callExecutor. private ServerStreamListener listener; - private Metadata trailers; + private boolean closeCalled; public JumpToApplicationThreadServerStreamListener( Executor executor, @@ -786,22 +786,11 @@ public JumpToApplicationThreadServerStreamListener( ServerStream stream, Context.CancellableContext context, Tag tag) { - this(executor, cancelExecutor, stream, context, tag, new Metadata()); - } - - public JumpToApplicationThreadServerStreamListener( - Executor executor, - Executor cancelExecutor, - ServerStream stream, - Context.CancellableContext context, - Tag tag, - Metadata trailers) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; this.context = context; this.tag = tag; - this.trailers = trailers; } /** @@ -824,9 +813,13 @@ void setListener(ServerStreamListener listener) { /** * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ - private synchronized void internalClose(Throwable t) { + private void internalClose(Throwable t) { String description = "Application error processing RPC"; - stream.close(Status.UNKNOWN.withDescription(description).withCause(t), this.trailers); + Metadata metadata = Status.trailersFromThrowable(t); + if (metadata == null) { + metadata = new Metadata(); + } + stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata); } @Override From 4d72fcd72047a9c24cf90022924c15f8025a5005 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Thu, 20 Feb 2025 06:08:46 +0000 Subject: [PATCH 11/14] core: Resolve build errors. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 022006a4754..a5bab393c7a 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -778,8 +778,6 @@ static final class JumpToApplicationThreadServerStreamListener implements Server private final Tag tag; // Only accessed from callExecutor. private ServerStreamListener listener; - private boolean closeCalled; - public JumpToApplicationThreadServerStreamListener( Executor executor, Executor cancelExecutor, From 327085ac103a1cbc7442fd1de94faefee3a6c5b8 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Thu, 20 Feb 2025 06:18:58 +0000 Subject: [PATCH 12/14] core: Resolve build errors. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index a5bab393c7a..4c5bcef2013 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -778,6 +778,7 @@ static final class JumpToApplicationThreadServerStreamListener implements Server private final Tag tag; // Only accessed from callExecutor. private ServerStreamListener listener; + public JumpToApplicationThreadServerStreamListener( Executor executor, Executor cancelExecutor, From efa740c7e1eeed3de8ba6e66e8ad4da1199810d9 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Fri, 21 Feb 2025 11:24:06 +0000 Subject: [PATCH 13/14] core: Resolved review comments. --- .../java/io/grpc/internal/AbstractTransportTest.java | 7 +++++-- .../main/java/io/grpc/inprocess/InProcessTransport.java | 6 ++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index aea7ff49032..d85c54a31e3 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; @@ -1598,8 +1599,10 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // Ensure that for a closed ServerStream, interactions are noops - server.stream.writeHeaders(new Metadata(), true); - server.stream.writeMessage(methodDescriptor.streamResponse("response")); + assertThrows(Exception.class, () -> + server.stream.writeHeaders(new Metadata(), true)); + assertThrows(Exception.class, () -> + server.stream.writeMessage(methodDescriptor.streamResponse("response"))); server.stream.close(Status.INTERNAL, new Metadata()); // Make sure new streams still work properly diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index eacf46ca4a2..0fea117a9bf 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -17,6 +17,7 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static java.lang.Math.max; @@ -414,6 +415,7 @@ private class InProcessServerStream implements ServerStream { private boolean closed; @GuardedBy("this") private int outboundSeqNo; + private boolean closeCalled; InProcessServerStream(MethodDescriptor method, Metadata headers) { statsTraceCtx = StatsTraceContext.newServerContext( @@ -431,6 +433,7 @@ public void setListener(ServerStreamListener serverStreamListener) { @Override public void request(int numMessages) { + checkState(!closeCalled, "call already closed"); boolean onReady = clientStream.serverRequested(numMessages); if (onReady) { synchronized (this) { @@ -487,6 +490,7 @@ private void clientCancelled(Status status) { @Override public void writeMessage(InputStream message) { + checkState(!closeCalled, "call already closed"); long messageLength = 0; if (isEnabledSupportTracingMessageSizes) { try { @@ -546,6 +550,7 @@ public synchronized boolean isReady() { @Override public void writeHeaders(Metadata headers, boolean flush) { + checkState(!closeCalled, "call already closed"); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { int metadataSize = metadataSize(headers); if (metadataSize > clientMaxInboundMetadataSize) { @@ -581,6 +586,7 @@ public void close(Status status, Metadata trailers) { // clientStreamListener.closed can trigger clientStream.cancel (see code in // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are // calling internalCancel(). + closeCalled = true; clientStream.serverClosed(Status.OK, status); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { From 2e6722292d1daab643cd967f9eaa82f87671a0bf Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Mon, 24 Feb 2025 04:59:27 +0000 Subject: [PATCH 14/14] core: Resolved review comments. --- .../test/java/io/grpc/testing/integration/MoreInProcessTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java index d97aa8cd36c..cfde8a0dad2 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java @@ -131,7 +131,6 @@ public void onCompleted() { assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); assertEquals(fakeResponse, responseRef.get()); - assertNull(throwableRef.get()); } @Test