Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ dependencies {
testImplementation(testlibs.bundles.base)
testImplementation(testlibs.bundles.testcontainers)
testImplementation(libs.kafka.streams.test.utils)
testImplementation("io.grpc:grpc-inprocess:${libs.versions.grpc.orNull}")
testImplementation("software.amazon.awssdk:kms:2.20.0")
testImplementation("software.amazon.awssdk:sso:2.20.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSender;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,7 +119,7 @@ public RemoteWriteResult<Integer> postFlush(final long consumedOffset) {
return super.postFlush(consumedOffset);
}

Optional<Long> writtenOffset(final int pssId) {
public Optional<Long> writtenOffset(final int pssId) {
return writtenOffsets.get(pssId);
}

Expand Down Expand Up @@ -156,64 +161,131 @@ public CompletionStage<RemoteWriteResult<Integer>> flush() {
}
}

private static class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final StreamSender<WalEntry> streamSender;
private final CompletionStage<Optional<Long>> resultFuture;
private RS3KVTable table;
private static final class RS3StreamFactory {
private final UUID storeId;
private final RS3Client rs3Client;
private final int pssId;
private final LssId lssId;
private final long endOffset;
private final int kafkaPartition;
private final Optional<Long> expectedWrittenOffset;

private RS3KVWriter(
private RS3StreamFactory(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
final Optional<Long> expectedWrittenOffset
) {
this.table = Objects.requireNonNull(table);
this.storeId = storeId;
this.rs3Client = rs3Client;
this.pssId = pssId;
this.lssId = lssId;
this.endOffset = endOffset;
this.kafkaPartition = kafkaPartition;
final var sendRecv = rs3Client.writeWalSegmentAsync(
this.expectedWrittenOffset = expectedWrittenOffset;
}

StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync() {
return rs3Client.writeWalSegmentAsync(
storeId,
lssId,
pssId,
expectedWrittenOffset,
endOffset
);
this.streamSender = sendRecv.sender();
this.resultFuture = sendRecv.receiver();
}

/**
 * Writes the given entries through the client's blocking {@code writeWalSegment} call,
 * using the store id, LSS/PSS ids, expected written offset and end offset captured by
 * this factory.
 *
 * @param entries the WAL entries to write as one segment
 * @return whatever {@code rs3Client.writeWalSegment} returns for this segment
 */
Optional<Long> writeWalSegmentSync(final List<WalEntry> entries) {
  final Optional<Long> flushedOffset = rs3Client.writeWalSegment(
      storeId,
      lssId,
      pssId,
      expectedWrittenOffset,
      endOffset,
      entries
  );
  return flushedOffset;
}

}

private static final class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final RS3StreamFactory streamFactory;
private final RS3KVTable table;
private final int kafkaPartition;
private final List<WalEntry> retryBuffer = new ArrayList<>();
private final StreamSenderMessageReceiver<WalEntry, Optional<Long>> sendRecv;

private RS3KVWriter(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
) {
this.table = Objects.requireNonNull(table);
this.streamFactory = new RS3StreamFactory(
storeId,
rs3Client,
pssId,
lssId,
endOffset,
expectedWrittenOffset
);
this.kafkaPartition = kafkaPartition;
this.sendRecv = streamFactory.writeWalSegmentAsync();
}

long endOffset() {
return endOffset;
return streamFactory.endOffset;
}

@Override
public void insert(final Bytes key, final byte[] value, final long timestampMs) {
streamSender.sendNext(table.insert(kafkaPartition, key, value, timestampMs));
maybeSendNext(table.insert(kafkaPartition, key, value, timestampMs));
}

@Override
public void delete(final Bytes key) {
streamSender.sendNext(table.delete(kafkaPartition, key));
maybeSendNext(table.delete(kafkaPartition, key));
}

/**
* Records the entry in {@code retryBuffer} and then forwards it to the stream sender
* if the stream is still active.
*
* @param entry the WAL entry to buffer and, when possible, send
*/
private void maybeSendNext(WalEntry entry) {
// Buffer unconditionally and before sending, so the entry is retained even when the
// stream is inactive or the send attempt fails.
retryBuffer.add(entry);
ifActiveStream(sender -> sender.sendNext(entry));
}

/**
 * Applies {@code streamConsumer} to the stream sender, but only while
 * {@code sendRecv.isActive()} reports true. An {@link RS3TransientException} raised by
 * the consumer is logged and not rethrown; any other exception propagates to the caller.
 *
 * @param streamConsumer the action to run against the active sender
 */
private void ifActiveStream(Consumer<StreamSender<WalEntry>> streamConsumer) {
  if (!sendRecv.isActive()) {
    return;
  }

  try {
    streamConsumer.accept(sendRecv.sender());
  } catch (final RS3TransientException e) {
    // Deliberately not rethrown: the stream is retried in flush(). Log the failure so
    // the reason for falling back is visible instead of being silently dropped.
    LOG.warn("Transient error on write stream for pss/lss {}/{}; deferring retry to flush()",
        streamFactory.pssId, streamFactory.lssId, e);
  }
}

@Override
public CompletionStage<RemoteWriteResult<Integer>> flush() {
streamSender.finish();
return resultFuture.thenApply(
flushedOffset -> {
LOG.debug("last flushed offset for pss/lss {}/{} is {}", pssId, lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
}
);
ifActiveStream(StreamSender::finish);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably need to catch exceptions from finish as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ifActiveStream, we catch RS3TransientException.


return sendRecv.receiver().handle((result, throwable) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's a failure when sending the stream, is it guaranteed to be propagated to the GrpcMessageReceiver and therefore result in handle being called with the error here? I can't find any docs that say that's the case. Therefore, we probably want to fall back to the sync path if we ever hit any errors when sending the stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I handled this in the latest commit by tracking a completion for both the send and recv sides. If either fails, then the combined completion fails as well.

Optional<Long> flushedOffset = result;
if (throwable instanceof RS3TransientException) {
flushedOffset = streamFactory.writeWalSegmentSync(retryBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retries for this call happen in the gRPC client, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's right. I was trying to keep retry logic encapsulated in the grpc client. But there's not really a way I could think of to encapsulate retries for the async API.

} else if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (throwable != null) {
throw new RuntimeException(throwable);
}

LOG.debug("last flushed offset for pss/lss {}/{} is {}",
streamFactory.pssId, streamFactory.lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
public class RS3TableFactory {
private final GrpcRS3Client.Connector connector;

public RS3TableFactory(
GrpcRS3Client.Connector connector
) {
public RS3TableFactory(GrpcRS3Client.Connector connector) {
this.connector = connector;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Objects;

public class LssId {
private final int id;

Expand All @@ -22,4 +24,21 @@ public LssId(int id) {
public int id() {
return id;
}

/**
 * Compares by value: two {@code LssId} instances are equal when they have the same
 * runtime class and the same {@code id}.
 *
 * @param o the object to compare against, may be {@code null}
 * @return {@code true} if {@code o} is an {@code LssId} of the same class with an equal id
 */
@Override
public boolean equals(final Object o) {
  if (this == o) {
    return true;
  }
  final boolean sameType = o != null && getClass() == o.getClass();
  return sameType && id == ((LssId) o).id;
}

/**
* Hash code derived solely from {@code id}, the same field that {@code equals} compares.
*
* @return the hash of {@code id}
*/
@Override
public int hashCode() {
return Objects.hashCode(id);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3.client;

public class RS3TimeoutException extends RS3Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3.client;

/**
* An {@link RS3Exception} subtype that wraps an underlying cause.
*
* <p>NOTE(review): the name suggests it marks failures that are safe to retry; confirm
* against the places where it is thrown.
*/
public class RS3TransientException extends RS3Exception {
private static final long serialVersionUID = 0L;

/**
* Creates the exception from the underlying failure.
*
* @param cause the throwable that triggered this exception
*/
public RS3TransientException(final Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,42 @@
package dev.responsive.kafka.internal.db.rs3.client;

/**
* Sending side of a message stream.
*
* @param <S> the type of message sent on the stream
*/
public interface StreamSender<S> {
/**
* Send the next message to the stream. If an exception is raised
* in this call, then the sender will no longer be usable and attempting
* to send another message will result in an {@link IllegalStateException}.
*
* @param msg the message to send to the stream
* @throws RS3Exception in the case of an unexpected error from RS3
* @throws IllegalStateException if invoked following a raised exception or
* a call to either {@link #finish()}, or {@link #cancel()}.
*/
void sendNext(S msg);

/**
* This method marks the end of the stream indicating that there are no
* further messages to send.
*
* @throws RS3Exception in the case of an unexpected error from RS3
* @throws IllegalStateException if invoked following a raised exception in
* {@link #sendNext(Object)} or a call to {@link #cancel()}.
*/
void finish();

/**
* Cancel the stream. This is used to signal the server about unexpected
* client errors so that it can clean up resources.
*
* @throws RS3Exception in the case of an unexpected error from RS3
* @throws IllegalStateException if invoked following a raised exception in
* {@link #sendNext(Object)} or a call to {@link #finish()}.
*/
void cancel();

/**
* Check whether the stream is active and available for sending or finishing.
*
* @return true if the stream is available, false otherwise
*/
boolean isActive();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class StreamSenderMessageReceiver<S, R> {
private final StreamSender<S> sender;
private final CompletionStage<R> message;
private final CompletableFuture<R> message;

public StreamSenderMessageReceiver(
final StreamSender<S> sender,
final CompletionStage<R> message) {
this.sender = Objects.requireNonNull(sender);
this.message = Objects.requireNonNull(message);
this.message = Objects.requireNonNull(message).toCompletableFuture();
}

public StreamSender<S> sender() {
Expand All @@ -33,4 +34,8 @@ public StreamSender<S> sender() {
/**
* Returns the receiving side of the stream.
*
* @return the completion stage held in {@code message}
*/
public CompletionStage<R> receiver() {
return message;
}

/**
* Reports whether the stream can still be used.
*
* @return {@code true} only while the sender reports itself active and the
* {@code message} future is not yet done
*/
public boolean isActive() {
return sender.isActive() && !message.isDone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@

package dev.responsive.kafka.internal.db.rs3.client.grpc;

import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -35,13 +32,7 @@ public synchronized void onNext(T result) {

@Override
public synchronized void onError(final Throwable throwable) {
if (throwable instanceof StatusRuntimeException || throwable instanceof StatusException) {
error = new RS3Exception(throwable);
} else if (throwable instanceof RuntimeException) {
error = (RuntimeException) throwable;
} else {
error = new RuntimeException(throwable);
}
error = GrpcRs3Util.wrapThrowable(throwable);
complete();
}

Expand Down
Loading
Loading