Merged
Changes from 18 commits
@@ -19,3 +19,4 @@ responsive.rs3.hostname=rs3
responsive.rs3.port=50051
responsive.rs3.logical.store.mapping=e2e:b1a45157-e2f0-4698-be0e-5bf3a9b8e9d1
responsive.rs3.tls.enabled=false
responsive.rs3.retry.timeout.ms=1800000
Contributor:
What default are we using? It should probably be set to about half the max poll interval.

Contributor Author:
I had it at 30s, which might be on the low side. Do we use the default poll interval of 5 min?

Contributor Author:
I changed the default to 2 minutes. That is close to half the poll interval and matches the default producer delivery timeout.
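
For reference, a sketch of how these numbers line up, assuming stock Kafka client defaults (the 2-minute default itself is only stated in this thread; the e2e config above overrides it to 30 minutes):

  # Kafka client defaults referenced in this thread:
  #   max.poll.interval.ms = 300000   (5 minutes, consumer default)
  #   delivery.timeout.ms  = 120000   (2 minutes, producer default)
  # New RS3 default, close to half the poll interval:
  #   responsive.rs3.retry.timeout.ms = 120000
  # The e2e override from this diff:
  responsive.rs3.retry.timeout.ms=1800000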

1 change: 1 addition & 0 deletions kafka-client/build.gradle.kts
@@ -151,6 +151,7 @@ dependencies {
testImplementation(testlibs.bundles.base)
testImplementation(testlibs.bundles.testcontainers)
testImplementation(libs.kafka.streams.test.utils)
testImplementation("io.grpc:grpc-inprocess:${libs.versions.grpc.orNull}")
testImplementation("software.amazon.awssdk:kms:2.20.0")
testImplementation("software.amazon.awssdk:sso:2.20.0")
}
@@ -17,15 +17,21 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSender;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,7 +120,7 @@ public RemoteWriteResult<Integer> postFlush(final long consumedOffset) {
return super.postFlush(consumedOffset);
}

Optional<Long> writtenOffset(final int pssId) {
public Optional<Long> writtenOffset(final int pssId) {
return writtenOffsets.get(pssId);
}

@@ -156,64 +162,137 @@ public CompletionStage<RemoteWriteResult<Integer>> flush() {
}
}

private static class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final StreamSender<WalEntry> streamSender;
private final CompletionStage<Optional<Long>> resultFuture;
private RS3KVTable table;
private static final class RS3StreamFactory {
private final UUID storeId;
private final RS3Client rs3Client;
private final int pssId;
private final LssId lssId;
private final long endOffset;
private final int kafkaPartition;
private final Optional<Long> expectedWrittenOffset;

private RS3KVWriter(
private RS3StreamFactory(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
final Optional<Long> expectedWrittenOffset
) {
this.table = Objects.requireNonNull(table);
this.storeId = storeId;
this.rs3Client = rs3Client;
this.pssId = pssId;
this.lssId = lssId;
this.endOffset = endOffset;
this.kafkaPartition = kafkaPartition;
final var sendRecv = rs3Client.writeWalSegmentAsync(
this.expectedWrittenOffset = expectedWrittenOffset;
}

StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync() {
return rs3Client.writeWalSegmentAsync(
storeId,
lssId,
pssId,
expectedWrittenOffset,
endOffset
);
this.streamSender = sendRecv.sender();
this.resultFuture = sendRecv.receiver();
}

Optional<Long> writeWalSegmentSync(List<WalEntry> entries) {
return rs3Client.writeWalSegment(
storeId,
lssId,
pssId,
expectedWrittenOffset,
endOffset,
entries
);
}

}

private static final class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final RS3StreamFactory streamFactory;
private final RS3KVTable table;
private final int kafkaPartition;
private final List<WalEntry> retryBuffer = new ArrayList<>();
private final StreamSenderMessageReceiver<WalEntry, Optional<Long>> sendRecv;

private RS3KVWriter(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
) {
this.table = Objects.requireNonNull(table);
this.streamFactory = new RS3StreamFactory(
storeId,
rs3Client,
pssId,
lssId,
endOffset,
expectedWrittenOffset
);
this.kafkaPartition = kafkaPartition;
this.sendRecv = streamFactory.writeWalSegmentAsync();
}

long endOffset() {
return endOffset;
return streamFactory.endOffset;
}

@Override
public void insert(final Bytes key, final byte[] value, final long timestampMs) {
streamSender.sendNext(table.insert(kafkaPartition, key, value, timestampMs));
maybeSendNext(table.insert(kafkaPartition, key, value, timestampMs));
}

@Override
public void delete(final Bytes key) {
streamSender.sendNext(table.delete(kafkaPartition, key));
maybeSendNext(table.delete(kafkaPartition, key));
}

private void maybeSendNext(WalEntry entry) {
retryBuffer.add(entry);
ifActiveStream(sender -> sender.sendNext(entry));
}

private void ifActiveStream(Consumer<StreamSender<WalEntry>> streamConsumer) {
if (sendRecv.isActive()) {
try {
streamConsumer.accept(sendRecv.sender());
} catch (final RS3TransientException e) {
// Retry the stream in flush()
}
}
}

@Override
public CompletionStage<RemoteWriteResult<Integer>> flush() {
streamSender.finish();
return resultFuture.thenApply(
flushedOffset -> {
LOG.debug("last flushed offset for pss/lss {}/{} is {}", pssId, lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
}
);
ifActiveStream(StreamSender::finish);
Contributor:
We probably need to catch exceptions from finish as well.

Contributor Author:
In ifActiveStream, we catch RS3TransientException.


return sendRecv.completion().handle((result, throwable) -> {
Optional<Long> flushedOffset = result;

var cause = throwable;
if (throwable instanceof CompletionException) {
cause = throwable.getCause();
}

if (cause instanceof RS3TransientException) {
flushedOffset = streamFactory.writeWalSegmentSync(retryBuffer);
Contributor:
The retries for this call happen in the gRPC client, right?

Contributor Author:
Yes, that's right. I was trying to keep the retry logic encapsulated in the gRPC client, but I couldn't think of a way to encapsulate retries for the async API.
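
As a rough sketch of that sync-side encapsulation, the client-internal retry loop might look like the following; apart from the exception types added in this PR, every name here (withRetries, doWrite, the backoff) is an illustrative assumption, not the actual GrpcRS3Client code:

  import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
  import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
  import java.time.Duration;
  import java.util.function.Supplier;

  // Hypothetical sketch only; the real retry logic lives inside the gRPC client.
  final class SyncRetrySketch {

    static <T> T withRetries(
        final Supplier<T> doWrite,    // e.g. a single writeWalSegment attempt
        final Duration retryTimeout,  // conceptually responsive.rs3.retry.timeout.ms
        final Duration backoff
    ) {
      final long deadlineNanos = System.nanoTime() + retryTimeout.toNanos();
      while (true) {
        try {
          return doWrite.get();
        } catch (final RS3TransientException e) {
          if (System.nanoTime() >= deadlineNanos) {
            // deadline exceeded: stop retrying and surface the failure
            throw new RS3Exception("retry timeout elapsed", e);
          }
          try {
            Thread.sleep(backoff.toMillis());  // fixed backoff keeps the sketch simple
          } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RS3Exception("interrupted while backing off", ie);
          }
        }
      }
    }
  }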

} else if (cause instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (cause != null) {
throw new RuntimeException(throwable);
}

LOG.debug("last flushed offset for pss/lss {}/{} is {}",
streamFactory.pssId, streamFactory.lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
});
}
}

}
@@ -24,9 +24,7 @@
public class RS3TableFactory {
private final GrpcRS3Client.Connector connector;

public RS3TableFactory(
GrpcRS3Client.Connector connector
) {
public RS3TableFactory(GrpcRS3Client.Connector connector) {
this.connector = connector;
}

@@ -12,6 +12,8 @@

package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Objects;

public class LssId {
private final int id;

@@ -22,4 +24,21 @@ public LssId(int id) {
public int id() {
return id;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final LssId lssId = (LssId) o;
return id == lssId.id;
}

@Override
public int hashCode() {
return Objects.hashCode(id);
}
}
@@ -23,4 +23,8 @@ public RS3Exception(String message) {
public RS3Exception(final Throwable cause) {
super(cause);
}

public RS3Exception(final String message, final Throwable cause) {
super(message, cause);
}
}
@@ -1,3 +1,15 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3.client;

public class RS3TimeoutException extends RS3Exception {
@@ -0,0 +1,21 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3.client;

public class RS3TransientException extends RS3Exception {
private static final long serialVersionUID = 0L;

public RS3TransientException(final Throwable cause) {
super(cause);
}
}
@@ -12,10 +12,54 @@

package dev.responsive.kafka.internal.db.rs3.client;

import java.util.concurrent.CompletionStage;

public interface StreamSender<S> {
/**
* Send the next message to the stream. If an exception is raised
* in this call, then the sender will no longer be usable and attempting
* to send another message will result in an {@link IllegalStateException}.
*
* @param msg the message to send to the stream
* @throws RS3Exception in the case of an unexpected error from RS3
* @throws IllegalStateException if invoked following a raised exception or
* a call to either {@link #finish()} or {@link #cancel()}.
*/
void sendNext(S msg);

/**
* This method marks the end of the stream indicating that there are no
* further messages to send.
*
* @throws RS3Exception in the case of an unexpected error from RS3
* @throws IllegalStateException if invoked following a raised exception in
* {@link #sendNext(Object)} or a call to {@link #cancel()}.
*/
void finish();

/**
* Cancel the stream. This is used to signal the server about unexpected
* client errors so that it can clean up resources.
*
* @throws RS3Exception in the case of an unexpected error from RS3
* @throws IllegalStateException if invoked following a raised exception in
* {@link #sendNext(Object)} or a call to {@link #finish()}.
*/
void cancel();

/**
* Get a completion stage tied to this stream sender. The stage is completed
* upon an explicit call to {@link #finish()}, {@link #cancel()}, or if an error
* is raised while sending or finishing.
*
* @return a completion stage for this sender
*/
CompletionStage<Void> completion();

/**
* Check whether the stream is complete (either finished or failed).
*
* @return true if the stream has finished or failed
*/
boolean isDone();
}
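
Read together, the contract above implies a lifecycle like the following. This is an illustrative sketch, not code from this PR; how the sender and entries are obtained is assumed:

  import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
  import dev.responsive.kafka.internal.db.rs3.client.StreamSender;
  import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
  import java.util.List;

  // Sketch of the intended call sequence: send zero or more messages, then finish.
  final class StreamSenderUsageSketch {

    static void writeAll(final StreamSender<WalEntry> sender, final List<WalEntry> entries) {
      try {
        for (final WalEntry entry : entries) {
          sender.sendNext(entry);  // may throw; the sender is unusable afterwards
        }
        sender.finish();           // marks end-of-stream and completes completion()
      } catch (final RS3Exception e) {
        // per the contract, cancel() must not be called after a raised exception;
        // completion() will already reflect the failure
      }
      sender.completion().whenComplete((ignored, error) ->
          System.out.println(error == null ? "stream finished" : "stream failed: " + error));
    }
  }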
@@ -13,24 +13,42 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class StreamSenderMessageReceiver<S, R> {
private final StreamSender<S> sender;
private final CompletionStage<R> message;
private final CompletableFuture<R> message;

public StreamSenderMessageReceiver(
final StreamSender<S> sender,
final CompletionStage<R> message) {
this.sender = Objects.requireNonNull(sender);
this.message = Objects.requireNonNull(message);
this.message = Objects.requireNonNull(message).toCompletableFuture();
}

public StreamSender<S> sender() {
return sender;
}

public CompletionStage<R> receiver() {
return message;
public boolean isActive() {
return !sender.isDone() && !message.isDone();
}

public CompletionStage<R> completion() {
final var future = new CompletableFuture<R>();
sender.completion().whenComplete((voidValue, sendException) -> {
if (sendException != null) {
future.completeExceptionally(sendException);
}
});
message.whenComplete((result, recvException) -> {
if (recvException != null) {
future.completeExceptionally(recvException);
} else {
future.complete(result);
}
});
return future;
}
}
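
A caller would typically consume the combined stage with handle(...), unwrapping the CompletionException wrapper the way RS3KVWriter.flush() does above. A minimal sketch; the wrapper method itself is illustrative:

  import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
  import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
  import java.util.Optional;
  import java.util.concurrent.CompletionException;
  import java.util.concurrent.CompletionStage;

  // Sketch: surface the receiver's result, rethrowing sender/receiver failures
  // with the CompletionException wrapper stripped.
  final class CompletionUsageSketch {

    static CompletionStage<Optional<Long>> flushedOffset(
        final StreamSenderMessageReceiver<WalEntry, Optional<Long>> sendRecv
    ) {
      return sendRecv.completion().handle((result, throwable) -> {
        if (throwable == null) {
          return result;
        }
        final Throwable cause = throwable instanceof CompletionException
            ? throwable.getCause()
            : throwable;
        throw cause instanceof RuntimeException
            ? (RuntimeException) cause
            : new RuntimeException(cause);
      });
    }
  }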