@@ -17,10 +17,14 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSender;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
@@ -114,7 +118,7 @@ public RemoteWriteResult<Integer> postFlush(final long consumedOffset) {
return super.postFlush(consumedOffset);
}

Optional<Long> writtenOffset(final int pssId) {
public Optional<Long> writtenOffset(final int pssId) {
return writtenOffsets.get(pssId);
}

@@ -156,64 +160,140 @@ public CompletionStage<RemoteWriteResult<Integer>> flush() {
}
}

private static class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final StreamSender<WalEntry> streamSender;
private final CompletionStage<Optional<Long>> resultFuture;
private RS3KVTable table;
private static final class RS3StreamFactory {
private final UUID storeId;
private final RS3Client rs3Client;
private final int pssId;
private final LssId lssId;
private final long endOffset;
private final int kafkaPartition;
private final Optional<Long> expectedWrittenOffset;

private RS3KVWriter(
private RS3StreamFactory(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
final Optional<Long> expectedWrittenOffset
) {
this.table = Objects.requireNonNull(table);
this.storeId = storeId;
this.rs3Client = rs3Client;
this.pssId = pssId;
this.lssId = lssId;
this.endOffset = endOffset;
this.kafkaPartition = kafkaPartition;
final var sendRecv = rs3Client.writeWalSegmentAsync(
this.expectedWrittenOffset = expectedWrittenOffset;
}

StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync() {
return rs3Client.writeWalSegmentAsync(
storeId,
lssId,
pssId,
expectedWrittenOffset,
endOffset
);
}

Optional<Long> writeWalSegmentSync(List<WalEntry> entries) {
return rs3Client.writeWalSegment(
storeId,
lssId,
pssId,
expectedWrittenOffset,
endOffset,
entries
);
}

}

private static final class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final RS3StreamFactory streamFactory;
private final RS3KVTable table;
private final int kafkaPartition;
private final List<WalEntry> retryBuffer = new ArrayList<>();
private final StreamSender<WalEntry> streamSender;
private final CompletionStage<Optional<Long>> resultFuture;

private boolean isStreamActive = true;

private RS3KVWriter(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
) {
this.table = Objects.requireNonNull(table);
this.streamFactory = new RS3StreamFactory(
storeId,
rs3Client,
pssId,
lssId,
endOffset,
expectedWrittenOffset
);
this.kafkaPartition = kafkaPartition;

final var sendRecv = streamFactory.writeWalSegmentAsync();
this.streamSender = sendRecv.sender();
this.resultFuture = sendRecv.receiver();
}

long endOffset() {
return endOffset;
return streamFactory.endOffset;
}

@Override
public void insert(final Bytes key, final byte[] value, final long timestampMs) {
streamSender.sendNext(table.insert(kafkaPartition, key, value, timestampMs));
maybeSendNext(table.insert(kafkaPartition, key, value, timestampMs));
}

@Override
public void delete(final Bytes key) {
streamSender.sendNext(table.delete(kafkaPartition, key));
maybeSendNext(table.delete(kafkaPartition, key));
}

private void maybeSendNext(WalEntry entry) {
retryBuffer.add(entry);

if (isStreamActive) {
try {
streamSender.sendNext(entry);
} catch (IllegalStateException e) {
Contributor: why are we catching IllegalStateException here? I think streamSender should throw grpc exceptions if it can't send a frame

Contributor (Author): Yeah, I think you are right. I let OpenAI mislead me here, since it suggested that onError could be invoked internally by grpc. If that were the case, then subsequent calls would fail with IllegalStateException. Instead it looks like onNext should throw the status exception, and the caller is expected to either retry onNext or invoke onError to signal that it is giving up on the stream. I will revise the patch.

isStreamActive = false;
}
}
}
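
Following the thread above, a minimal sketch of the revision the author describes — catching the gRPC status exception thrown by sendNext rather than IllegalStateException. This is an assumed shape (requiring io.grpc.StatusRuntimeException), not the merged revision:

    // Hypothetical revision per the review thread: sendNext is expected to
    // throw StatusRuntimeException when a frame cannot be sent, so catch
    // that and fall back to the buffered retry path.
    private void maybeSendNext(final WalEntry entry) {
      retryBuffer.add(entry);
      if (isStreamActive) {
        try {
          streamSender.sendNext(entry);
        } catch (final StatusRuntimeException e) {
          // The stream is broken; keep buffering so flush() can replay
          // everything through the synchronous path.
          isStreamActive = false;
        }
      }
    }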

@Override
public CompletionStage<RemoteWriteResult<Integer>> flush() {
streamSender.finish();
return resultFuture.thenApply(
flushedOffset -> {
LOG.debug("last flushed offset for pss/lss {}/{} is {}", pssId, lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
}
);
if (isStreamActive) {
try {
streamSender.finish();
} catch (IllegalStateException e) {
isStreamActive = false;
}
}

return resultFuture.handle((result, throwable) -> {
Optional<Long> flushedOffset = result;
if (throwable instanceof RS3TransientException) {
flushedOffset = streamFactory.writeWalSegmentSync(retryBuffer);
Contributor: the retries for this call happen in the grpc client right?

Contributor (Author): Yes, that's right. I was trying to keep retry logic encapsulated in the grpc client, but I couldn't think of a way to encapsulate retries for the async API.

} else if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (throwable != null) {
throw new RuntimeException(throwable);
}

LOG.debug("last flushed offset for pss/lss {}/{} is {}",
streamFactory.pssId, streamFactory.lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
});
}
}

}
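
Taken together, the writer's contract is: buffer every entry locally while streaming it, and if the stream fails with a transient error, replay the whole buffer through the synchronous API (whose retries live in the gRPC client). A self-contained miniature of that recovery step, using a pre-failed future as a stand-in for the stream receiver and Optional.of(42L) as a stand-in for writeWalSegmentSync(retryBuffer):

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    public class TransientFallbackDemo {
      public static void main(String[] args) {
        // Stand-in for a stream that died mid-flight with a transient error.
        final CompletableFuture<Optional<Long>> receiver = new CompletableFuture<>();
        receiver.completeExceptionally(new RuntimeException("stream reset"));

        final Optional<Long> flushed = receiver.handle((result, t) -> {
          if (t != null) {
            // Stand-in for replaying buffered entries via the sync path.
            return Optional.of(42L);
          }
          return result;
        }).join();

        System.out.println("flushed offset: " + flushed); // Optional[42]
      }
    }

One subtlety worth verifying: handle only sees the raw exception when it is attached to the future that was failed directly (as here); a dependent stage would deliver it wrapped in CompletionException, which an instanceof RS3TransientException check would miss.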
@@ -24,9 +24,7 @@
public class RS3TableFactory {
private final GrpcRS3Client.Connector connector;

public RS3TableFactory(
GrpcRS3Client.Connector connector
) {
public RS3TableFactory(GrpcRS3Client.Connector connector) {
this.connector = connector;
}

@@ -12,6 +12,8 @@

package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Objects;

public class LssId {
private final int id;

@@ -22,4 +24,21 @@ public LssId(int id) {
public int id() {
return id;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final LssId lssId = (LssId) o;
return id == lssId.id;
}

@Override
public int hashCode() {
return Objects.hashCode(id);
}
}
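
With value-based equals and hashCode, LssId now behaves correctly as a hash-map key; a quick illustration (assumes java.util.HashMap/Map imports):

    // Two LssId instances with the same id are now equal, so lookups hit.
    final Map<LssId, Long> offsets = new HashMap<>();
    offsets.put(new LssId(3), 100L);
    final Long hit = offsets.get(new LssId(3)); // 100; null before this change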
@@ -1,3 +1,15 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3.client;

public class RS3TimeoutException extends RS3Exception {
@@ -0,0 +1,21 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3.client;

public class RS3TransientException extends RS3Exception {
private static final long serialVersionUID = 0L;

public RS3TransientException(final Throwable cause) {
super(cause);
}
}
@@ -12,9 +12,6 @@

package dev.responsive.kafka.internal.db.rs3.client.grpc;

import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -35,13 +32,7 @@ public synchronized void onNext(T result) {

@Override
public synchronized void onError(final Throwable throwable) {
if (throwable instanceof StatusRuntimeException || throwable instanceof StatusException) {
error = new RS3Exception(throwable);
} else if (throwable instanceof RuntimeException) {
error = (RuntimeException) throwable;
} else {
error = new RuntimeException(throwable);
}
error = GrpcRs3Util.wrapThrowable(throwable);
complete();
}
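
GrpcRs3Util.wrapThrowable itself is not shown in this diff. Judging from the inline logic it replaces here and the UNAVAILABLE check it replaces in withRetry below, a plausible sketch (not the actual implementation; requires io.grpc.Status, StatusException, StatusRuntimeException) is:

    // Inferred sketch of GrpcRs3Util.wrapThrowable: UNAVAILABLE status errors
    // become RS3TransientException so callers know a retry is safe, other
    // status errors become RS3Exception, everything else passes through.
    static RuntimeException wrapThrowable(final Throwable t) {
      final Status status;
      if (t instanceof StatusRuntimeException) {
        status = ((StatusRuntimeException) t).getStatus();
      } else if (t instanceof StatusException) {
        status = ((StatusException) t).getStatus();
      } else if (t instanceof RuntimeException) {
        return (RuntimeException) t;
      } else {
        return new RuntimeException(t);
      }
      return status.getCode() == Status.Code.UNAVAILABLE
          ? new RS3TransientException(t)
          : new RS3Exception(t);
    }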

@@ -12,22 +12,19 @@

package dev.responsive.kafka.internal.db.rs3.client.grpc;

import static io.grpc.Status.UNAVAILABLE;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.rs3.RS3Grpc;
import dev.responsive.rs3.Rs3;
import io.grpc.StatusRuntimeException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -78,7 +75,7 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f
);
}

private <T> T withRetry(Supplier<T> supplier, Supplier<String> opDescription) {
private <T> T withRetry(Supplier<T> grpcOperation, Supplier<String> opDescription) {
// Using Kafka default backoff settings initially. We can pull them up
// if there is ever strong reason.
final var backoff = new ExponentialBackoff(50, 2, 1000, 0.2);
@@ -90,10 +87,11 @@ private <T> T withRetry(Supplier<T> supplier, Supplier<String> opDescription) {

do {
try {
return supplier.get();
} catch (final StatusRuntimeException e) {
if (e.getStatus() != UNAVAILABLE) {
throw new RS3Exception(e);
return grpcOperation.get();
} catch (final Throwable t) {
var wrappedException = GrpcRs3Util.wrapThrowable(t);
if (!(wrappedException instanceof RS3TransientException)) {
throw wrappedException;
}
}

Contributor: we should log something here about the retries we're doing

@@ -107,6 +105,7 @@ private <T> T withRetry(Supplier<T> supplier, Supplier<String> opDescription) {
throw new RS3TimeoutException("Timeout while attempting operation " + opDescription.get());
}
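
One way to pick up the logging suggestion above — a sketch of the retry loop with a per-attempt warning. It assumes the class keeps an SLF4J logger named LOG, "attempts" is a hypothetical local counter, and the backoff/deadline bookkeeping elided by the diff is unchanged:

    int attempts = 0;
    do {
      try {
        return grpcOperation.get();
      } catch (final Throwable t) {
        final var wrapped = GrpcRs3Util.wrapThrowable(t);
        if (!(wrapped instanceof RS3TransientException)) {
          throw wrapped;
        }
        attempts += 1;
        // Illustrative log line; wording and level are suggestions only.
        LOG.warn("Transient failure in {} (attempt {}), backing off before retry",
            opDescription.get(), attempts, wrapped);
      }
      // ... sleep per `backoff`, give up via RS3TimeoutException at the deadline ...
    } while (true);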


Contributor: should we wrap asyncStub.writeWALSegmentStream in withRetry to catch failures establishing the stream? I'm not sure if those get raised there or in the callbacks

@Override
public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync(
final UUID storeId,
@@ -150,7 +149,22 @@ public Optional<Long> writeWalSegment(
final int pssId,
final Optional<Long> expectedWrittenOffset,
final long endOffset,
final List<WalEntry> entries) {
final List<WalEntry> entries
) {
return withRetry(
() -> tryWriteWalSegment(storeId, lssId, pssId, expectedWrittenOffset, endOffset, entries),
() -> "WriteWalSegment(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"
);
}
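
With this wrapper in place, synchronous callers inherit the retry policy transparently; a hedged usage sketch (all identifiers are stand-ins for caller-side values):

    // Hypothetical caller: UNAVAILABLE errors are retried with backoff inside
    // the client; only non-transient errors or RS3TimeoutException escape.
    final Optional<Long> flushedOffset = rs3Client.writeWalSegment(
        storeId, lssId, pssId, Optional.empty(), endOffset, entries);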

private Optional<Long> tryWriteWalSegment(
final UUID storeId,
final LssId lssId,
final int pssId,
final Optional<Long> expectedWrittenOffset,
final long endOffset,
final List<WalEntry> entries
) {
final var senderReceiver = writeWalSegmentAsync(
storeId,
lssId,
@@ -163,6 +177,7 @@ public Optional<Long> writeWalSegment(
}
senderReceiver.sender().finish();
final Optional<Long> result;

try {
result = senderReceiver.receiver().toCompletableFuture().get();
} catch (final ExecutionException e) {
@@ -276,4 +291,5 @@ public RS3Client connect() {
}
}


}