
Conversation

@hachikuji
Contributor

Retries for WriteWalSegments are not as straightforward as some of the blocking calls because we are using an asynchronous stream. The simplest approach I could come up with was to buffer the writes as they are sent. If an UNAVAILABLE error occurs while writing to the stream, we keep writing to the buffer but stop writing to the stream. Once all WAL entries have been received, the retry is performed using the synchronous writeWalSegments API.
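To make this concrete, here is a rough sketch of the idea. It reuses names that appear elsewhere in this diff (WalEntry, StreamSender, RS3TransientException, writeWalSegmentSync), but the surrounding class, the factory type, and the exact signatures are illustrative only, not the actual implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch only: buffer every entry, stop streaming on a transient failure,
// and replay the buffer through the synchronous API once the segment is done.
class BufferedWalSegmentWriter {
  private final List<WalEntry> retryBuffer = new ArrayList<>();
  private final StreamSender<WalEntry> streamSender;   // type taken from the diff
  private final WalSegmentStreamFactory streamFactory; // hypothetical factory type
  private boolean isStreamActive = true;

  BufferedWalSegmentWriter(
      final StreamSender<WalEntry> streamSender,
      final WalSegmentStreamFactory streamFactory
  ) {
    this.streamSender = streamSender;
    this.streamFactory = streamFactory;
  }

  void write(final WalEntry entry) {
    retryBuffer.add(entry);           // always buffer so a retry can replay everything
    if (isStreamActive) {
      try {
        streamSender.sendNext(entry);
      } catch (final RS3TransientException e) {
        isStreamActive = false;       // UNAVAILABLE: keep buffering, stop streaming
      }
    }
  }

  Optional<Long> complete() {
    if (!isStreamActive) {
      // All entries received; retry the whole segment with the blocking API.
      return streamFactory.writeWalSegmentSync(retryBuffer);
    }
    streamSender.finish();
    return Optional.empty();          // the flushed offset arrives via the receiver
  }
}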

import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;

public class RS3KVTableIntegrationTest {
Contributor Author

This shows up as an addition, but it is a simple rename. I thought IntegrationTest seemed more appropriate since we are using the RS3 container, and I wanted to keep the RS3KVTableTest name for a test that allows mocking dependencies. If the rename seems reasonable, I can submit it in a separate PR so that it doesn't show up in this diff.

Contributor

@rodesai left a comment

Thanks! My main question is how we actually get errors returned back to us when doing a streaming RPC. Is it always through the response callback, or can the stream sender throw exceptions directly? If it's the latter, then we wouldn't be handling those exceptions.

    return new RS3Exception(statusRuntimeException);
  }
} else if (t instanceof StatusException) {
  return new RS3Exception(t);
Contributor

should we check for status UNAVAILABLE here?
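For example, something along these lines, assuming we map UNAVAILABLE to RS3TransientException so the retry path can tell retriable failures apart (just a sketch; the method shape and constructors are assumed, not the PR's code):

import io.grpc.Status;
import io.grpc.StatusException;

// Sketch: map UNAVAILABLE to the transient exception, everything else to RS3Exception.
static RuntimeException wrap(final Throwable t) {
  if (t instanceof StatusException) {
    final StatusException statusException = (StatusException) t;
    if (statusException.getStatus().getCode() == Status.Code.UNAVAILABLE) {
      return new RS3TransientException(statusException);
    }
    return new RS3Exception(statusException);
  }
  return new RS3Exception(t);
}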

throw wrappedException;
}
}

Contributor

we should log something here about the retries we're doing
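For example, something like this inside the retry loop (LOG, attempt, and remainingMs are placeholder names, not from the patch; only opDescription appears in the diff):

// Sketch: make each retry visible before the timeout exception is thrown.
LOG.warn("Retrying RS3 operation {} after transient error (attempt {}, {} ms remaining)",
    opDescription.get(), attempt, remainingMs);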

throw new RS3TimeoutException("Timeout while attempting operation " + opDescription.get());
}


Contributor

should we wrap asyncStub.writeWALSegmentStream in withRetry to catch failures establishing the stream? I'm not sure if those get raised there or in the callbacks
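For example, something like this, assuming withRetry can take a supplier plus an operation description (I'm guessing at the signature here; only asyncStub.writeWALSegmentStream appears in the diff itself):

// Sketch: wrap stream creation so failures establishing the stream are retried too.
final var requestObserver = withRetry(
    () -> asyncStub.writeWALSegmentStream(responseObserver),
    () -> "open writeWALSegmentStream"
);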

if (isStreamActive) {
  try {
    streamSender.sendNext(entry);
  } catch (IllegalStateException e) {
Contributor

why are we catching IllegalStateException here? I think streamSender should throw grpc exceptions if it can't send a frame

Contributor Author

Yeah, I think you are right. I let OpenAI mislead me here: it suggested that onError could be invoked internally by gRPC, in which case subsequent calls would fail with IllegalStateException. Instead, it looks like onNext throws the status exception and the caller is expected to either retry onNext or invoke onError to signal that it is giving up on the stream. I will revise the patch.
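A sketch of what the revised handling might look like under that reading of the gRPC contract. The grpcObserver and protoFactory names come from the snippet below; the RS3TransientException/RS3Exception constructors are assumptions mirroring other parts of this thread.

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

// Sketch: let onNext surface the gRPC status, call onError to give up on the
// stream, and rethrow a typed exception for the caller's fallback logic.
public void sendNext(final WalEntry entry) {
  try {
    grpcObserver.onNext(protoFactory.apply(entry));
  } catch (final StatusRuntimeException e) {
    grpcObserver.onError(e);  // signal that we are abandoning the stream
    if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
      throw new RS3TransientException(e);
    }
    throw new RS3Exception(e);
  }
}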

}

@Test
public void shouldRetryPutWithNetworkInterruption() {
Contributor Author

These tests get us a little closer to simulating bad network behavior, but still rely on assumptions about how network errors are raised in the grpc stack.
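As one example of how a test can force such errors without a real network fault (an assumption about approach, not necessarily what these tests do), a gRPC ServerInterceptor can reject the first few calls with UNAVAILABLE and then let traffic through so the retry succeeds:

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: simulate a transient outage by failing the first N calls.
class FlakyServerInterceptor implements ServerInterceptor {
  private final AtomicInteger remainingFailures;

  FlakyServerInterceptor(final int failures) {
    this.remainingFailures = new AtomicInteger(failures);
  }

  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      final ServerCall<ReqT, RespT> call,
      final Metadata headers,
      final ServerCallHandler<ReqT, RespT> next
  ) {
    if (remainingFailures.getAndDecrement() > 0) {
      call.close(Status.UNAVAILABLE.withDescription("simulated network interruption"), new Metadata());
      return new ServerCall.Listener<ReqT>() { };
    }
    return next.startCall(call, headers);
  }
}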

return sendRecv.receiver().handle((result, throwable) -> {
  Optional<Long> flushedOffset = result;
  if (throwable instanceof RS3TransientException) {
    flushedOffset = streamFactory.writeWalSegmentSync(retryBuffer);
Contributor

the retries for this call happen in the grpc client right?

Contributor Author

Yes, that's right. I was trying to keep the retry logic encapsulated in the gRPC client, but I couldn't think of a way to encapsulate retries for the async API.

return RemoteWriteResult.success(kafkaPartition);
}
);
ifActiveStream(StreamSender::finish);
Contributor

we probably need to catch exceptions from finish as well

Contributor Author

In ifActiveStream, we catch RS3TransientException.
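For reference, a minimal sketch of what ifActiveStream could look like consistent with that: run the operation only while the stream is active, and deactivate it on a transient failure so the synchronous fallback takes over. The actual helper in the PR may differ.

import java.util.function.Consumer;

// Sketch: swallow the transient failure here and let the fallback path replay the buffer.
private void ifActiveStream(final Consumer<StreamSender<WalEntry>> operation) {
  if (!isStreamActive) {
    return;
  }
  try {
    operation.accept(streamSender);
  } catch (final RS3TransientException e) {
    isStreamActive = false;
  }
}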

);
ifActiveStream(StreamSender::finish);

return sendRecv.receiver().handle((result, throwable) -> {
Contributor

If there's a failure when sending the stream, is it guaranteed to be propagated to the GrpcMessageReceiver and therefore result in handle being called with the error here? I can't find any docs that say that's the case, so we probably want to fall back to the sync path if we ever hit any errors while sending the stream.

Contributor Author

I handled this in the latest commit by tracking a completion for both the send and recv sides. If either fails, then the combined completion fails as well.
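A sketch of that combined completion using CompletableFuture (streamFactory and retryBuffer mirror earlier snippets in this thread; this is an illustration, not the commit itself):

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Sketch: a failure on either the send side or the receive side fails the
// combined future, so the handle() callback always observes the error and can
// fall back to the synchronous write path.
CompletableFuture<Optional<Long>> combineSendAndRecv(
    final CompletableFuture<Void> sendCompletion,
    final CompletableFuture<Optional<Long>> recvCompletion
) {
  return sendCompletion
      .thenCombine(recvCompletion, (ignored, flushedOffset) -> flushedOffset)
      .handle((result, throwable) -> {
        if (throwable != null) {
          return streamFactory.writeWalSegmentSync(retryBuffer);
        }
        return result;
      });
}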

try {
  grpcObserver.onNext(protoFactory.apply(msg));
} catch (Exception e) {
  grpcObserver.onError(e);
Contributor

Related to my question below on how errors are surfaced: this grpcObserver is the gRPC side of the client->server stream. Does calling onError on it propagate the error to our side of the server->client stream?

Contributor

@rodesai left a comment

LGTM! The e2e test looks nice

responsive.rs3.port=50051
responsive.rs3.logical.store.mapping=e2e:b1a45157-e2f0-4698-be0e-5bf3a9b8e9d1
responsive.rs3.tls.enabled=false
responsive.rs3.retry.timeout.ms=1800000
Contributor

What default are we using? It should probably be set to roughly half the max poll interval.

Contributor Author

I had it at 30s, which might be on the low side. Do we use the default poll interval of 5min?

Contributor Author

I changed the default to 2 minutes. That is close to half the poll interval and matches the default producer delivery timeout.
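For reference, explicitly setting that 2-minute value in the same properties format as the e2e config above would be:

responsive.rs3.retry.timeout.ms=120000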

@hachikuji merged commit 0820608 into main on Mar 20, 2025
1 check passed
@hachikuji deleted the rs3-flush-retries branch on March 20, 2025 at 17:02