Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_HOSTNAME_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_PORT_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_RETRY_TIMEOUT_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_TLS_ENABLED_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE;
import static dev.responsive.kafka.internal.config.ResponsiveStreamsConfig.validateNoStorageStreamsConfig;
Expand All @@ -44,6 +45,7 @@
import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.db.rs3.RS3TableFactory;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.license.LicenseAuthenticator;
import dev.responsive.kafka.internal.license.LicenseChecker;
import dev.responsive.kafka.internal.license.model.LicenseDocument;
Expand Down Expand Up @@ -576,11 +578,14 @@ public Params build() {
LOG.info("Using rs3 responsive store");
final var rs3Host = responsiveConfig.getString(RS3_HOSTNAME_CONFIG);
final var rs3Port = responsiveConfig.getInt(RS3_PORT_CONFIG);
final var useTls = responsiveConfig.getBoolean(RS3_TLS_ENABLED_CONFIG);
final var rs3Connector = new GrpcRS3Client.Connector(time, rs3Host, rs3Port);
// RS3_RETRY_TIMEOUT_CONFIG is defined with Type.LONG, so the parsed value is a Long.
// Reading it with getInt() would fail with a ClassCastException at runtime.
rs3Connector.retryTimeoutMs(responsiveConfig.getLong(RS3_RETRY_TIMEOUT_CONFIG));
rs3Connector.useTls(responsiveConfig.getBoolean(RS3_TLS_ENABLED_CONFIG));

sessionClients = new SessionClients(
Optional.empty(),
Optional.empty(),
Optional.of(new RS3TableFactory(rs3Host, rs3Port, useTls)),
Optional.of(new RS3TableFactory(rs3Connector)),
storageBackend,
admin
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public class ResponsiveConfig extends AbstractConfig {

public static final String RS3_TLS_ENABLED_CONFIG = "responsive.rs3.tls.enabled";
private static final String RS3_TLS_ENABLED_DOC = "Enables/disable tls for rs3 connection";
public static final boolean RS3_TLS_ENABLED_DEFAULT = true;

public static final String RS3_RETRY_TIMEOUT_CONFIG = "responsive.rs3.retry.timeout.ms";
private static final String RS3_RETRY_TIMEOUT_DOC = "Timeout in milliseconds for retries when RS3 endpoint is unavailable";
public static final int RS3_RETRY_TIMEOUT_DEFAULT = 30000;

// ------------------ ScyllaDB specific configurations ----------------------

Expand Down Expand Up @@ -620,9 +625,16 @@ public class ResponsiveConfig extends AbstractConfig {
).define(
RS3_TLS_ENABLED_CONFIG,
Type.BOOLEAN,
true,
RS3_TLS_ENABLED_DEFAULT,
Importance.MEDIUM,
RS3_TLS_ENABLED_DOC
).define(
RS3_RETRY_TIMEOUT_CONFIG,
Type.LONG,
RS3_RETRY_TIMEOUT_DEFAULT,
atLeast(0),
Importance.MEDIUM,
RS3_RETRY_TIMEOUT_DOC
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,16 @@
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.kafka.common.config.ConfigException;

public class RS3TableFactory {
private final String rs3Host;
private final int rs3Port;
private final boolean useTls;
private final GrpcRS3Client.Connector connector;

public RS3TableFactory(
final String rs3Host,
final int rs3Port,
final boolean useTls
GrpcRS3Client.Connector connector
) {
this.rs3Host = Objects.requireNonNull(rs3Host);
this.rs3Port = rs3Port;
this.useTls = useTls;
this.connector = connector;
}

public RemoteKVTable<WalEntry> kvTable(
Expand All @@ -52,10 +45,7 @@ public RemoteKVTable<WalEntry> kvTable(

final UUID storeId = UUID.fromString(storeIdHex);
final PssPartitioner pssPartitioner = new PssDirectPartitioner();
final var rs3Client = GrpcRS3Client.connect(
String.format("%s:%d", rs3Host, rs3Port),
useTls
);
final var rs3Client = connector.connect();
return new RS3KVTable(
name,
storeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
public class RS3Exception extends RuntimeException {
private static final long serialVersionUID = 0L;

/**
 * Creates an exception that carries only a detail message.
 *
 * @param message the detail message passed to {@link RuntimeException}
 */
public RS3Exception(final String message) {
  super(message);
}

/**
 * Creates an exception that wraps an underlying failure.
 *
 * @param cause the failure passed to {@link RuntimeException} as the cause
 */
public RS3Exception(
    final Throwable cause
) {
  super(cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dev.responsive.kafka.internal.db.rs3.client;

/**
 * An {@link RS3Exception} subtype used to signal that an RS3 operation timed out.
 */
public class RS3TimeoutException extends RS3Exception {
  private static final long serialVersionUID = 0L;

  /**
   * Creates a timeout exception that carries only a detail message.
   *
   * @param message the detail message
   */
  public RS3TimeoutException(final String message) {
    super(message);
  }

  /**
   * Creates a timeout exception that wraps an underlying failure.
   *
   * @param cause the failure to record as the cause
   */
  public RS3TimeoutException(final Throwable cause) {
    super(cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,62 @@

package dev.responsive.kafka.internal.db.rs3.client.grpc;

import static io.grpc.Status.UNAVAILABLE;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.rs3.RS3Grpc;
import dev.responsive.rs3.Rs3;
import io.grpc.StatusRuntimeException;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;

public class GrpcRS3Client implements RS3Client {
static final long WAL_OFFSET_NONE = Long.MAX_VALUE;

private final PssStubsProvider stubs;
private final Time time;
private final long retryTimeoutMs;

@VisibleForTesting
GrpcRS3Client(final PssStubsProvider stubs) {
GrpcRS3Client(final PssStubsProvider stubs, final Time time, final long retryTimeoutMs) {
this.stubs = Objects.requireNonNull(stubs);
this.time = Objects.requireNonNull(time);
this.retryTimeoutMs = retryTimeoutMs;
}

/**
 * Closes this client by closing the underlying stubs provider.
 */
@Override
public void close() {
  stubs.close();
}

public static RS3Client connect(
final String target,
final boolean useTls
) {
return new GrpcRS3Client(PssStubsProvider.connect(target, useTls));
}

@Override
public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, final int pssId) {
final Rs3.GetOffsetsResult result;
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();
try {
result = stub.getOffsets(Rs3.GetOffsetsRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.build());
} catch (final StatusRuntimeException e) {
throw new RS3Exception(e);
}

final Rs3.GetOffsetsRequest request = Rs3.GetOffsetsRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.build();
final Rs3.GetOffsetsResult result = withRetry(
() -> stub.getOffsets(request),
() -> "GetOffsets(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"
);
checkField(result::hasWrittenOffset, "writtenOffset");
checkField(result::hasFlushedOffset, "flushedOffset");
return new CurrentOffsets(
Expand All @@ -76,6 +78,35 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f
);
}

/**
 * Runs {@code supplier}, retrying with exponential backoff for as long as it fails with
 * gRPC status code {@code UNAVAILABLE} and the retry window of {@code retryTimeoutMs}
 * has not elapsed.
 *
 * @param supplier the call to attempt
 * @param opDescription lazily builds the operation description used in the timeout message
 * @param <T> the result type of the call
 * @return the value returned by the first successful attempt
 * @throws RS3Exception if an attempt fails with any status code other than UNAVAILABLE
 * @throws RS3TimeoutException if the retry window elapses without a successful attempt;
 *     its cause is the last UNAVAILABLE failure
 */
private <T> T withRetry(final Supplier<T> supplier, final Supplier<String> opDescription) {
  // Using Kafka default backoff settings initially. We can pull them up
  // if there is ever strong reason.
  final var backoff = new ExponentialBackoff(50, 2, 1000, 0.2);
  final var startTimeMs = time.milliseconds();
  final var deadlineMs = startTimeMs + retryTimeoutMs;

  var retries = 0;
  long currentTimeMs;
  StatusRuntimeException lastFailure;

  do {
    try {
      return supplier.get();
    } catch (final StatusRuntimeException e) {
      // Compare status codes, not Status instances: a Status carrying a description or
      // cause is a different object from the Status.UNAVAILABLE constant, so a reference
      // comparison would treat real UNAVAILABLE errors as non-retriable.
      if (e.getStatus().getCode() != UNAVAILABLE.getCode()) {
        throw new RS3Exception(e);
      }
      lastFailure = e;
    }

    retries += 1;
    currentTimeMs = time.milliseconds();
    // Never sleep past the deadline; sleeps for 0ms once the deadline has passed.
    time.sleep(Math.min(
        backoff.backoff(retries),
        Math.max(0, deadlineMs - currentTimeMs))
    );
  } while (currentTimeMs - startTimeMs < retryTimeoutMs);

  // Keep the last UNAVAILABLE failure as the cause so the root problem is not lost.
  final var timeout =
      new RS3TimeoutException("Timeout while attempting operation " + opDescription.get());
  timeout.initCause(lastFailure);
  throw timeout;
}

@Override
public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync(
final UUID storeId,
Expand Down Expand Up @@ -153,21 +184,19 @@ public Optional<byte[]> get(
final Optional<Long> expectedWrittenOffset,
final byte[] key
) {
final Instant start = Instant.now();
final var requestBuilder = Rs3.GetRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setKey(ByteString.copyFrom(key));
expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset);
final var request = requestBuilder.build();
final Rs3.GetResult result;
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();
try {
result = stub.get(request);
} catch (final StatusRuntimeException e) {
throw new RS3Exception(e);
}

final Rs3.GetResult result = withRetry(
() -> stub.get(request),
() -> "Get(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"
);
if (!result.hasResult()) {
return Optional.empty();
}
Expand Down Expand Up @@ -210,4 +239,41 @@ private void checkField(final Supplier<Boolean> check, final String field) {
throw new RuntimeException("rs3 resp proto missing field " + field);
}
}

public static class Connector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this added so that we have a hook to plug in a mock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added mainly because there are more parameters now. The static connect worked nicely when there were only two parameters, but it started feeling a little clumsy with the additions from the patch. Having a place to hook in mocks could be useful as well I guess.

private final Time time;
private final String host;
private final int port;

private boolean useTls = ResponsiveConfig.RS3_TLS_ENABLED_DEFAULT;
private long retryTimeoutMs = ResponsiveConfig.RS3_RETRY_TIMEOUT_DEFAULT;

/**
 * Creates a connector for the given endpoint. TLS and the retry timeout keep their
 * defaults until changed through the setters.
 *
 * @param time clock handed to clients created by {@link #connect()}; must not be null
 * @param host host part of the connection target; must not be null
 * @param port port part of the connection target
 */
public Connector(final Time time, final String host, final int port) {
  this.time = Objects.requireNonNull(time);
  this.host = Objects.requireNonNull(host);
  this.port = port;
}

/**
 * Sets whether connections opened by {@link #connect()} use TLS.
 *
 * @param useTls the TLS flag passed to the stubs provider on connect
 */
public void useTls(final boolean useTls) {
  this.useTls = useTls;
}

/**
 * Sets the retry timeout handed to clients created by {@link #connect()}.
 *
 * @param retryTimeoutMs the retry timeout in milliseconds
 */
public void retryTimeoutMs(final long retryTimeoutMs) {
  this.retryTimeoutMs = retryTimeoutMs;
}

/**
 * Connects to {@code host:port} using the configured TLS setting and wraps the resulting
 * stubs in a client that uses this connector's clock and retry timeout.
 *
 * @return a newly connected client
 */
public RS3Client connect() {
  final var stubsProvider = PssStubsProvider.connect(
      String.format("%s:%d", host, port),
      useTls
  );
  return new GrpcRS3Client(stubsProvider, time, retryTimeoutMs);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -51,7 +52,7 @@ public class RS3KVTableTest {
private static final int PARTITION_ID = 8;

private String testName;
private RS3Client pocketClient;
private RS3Client rs3Client;
private final PssPartitioner pssPartitioner = new PssDirectPartitioner();
private final Metrics metrics = new Metrics();

Expand All @@ -70,7 +71,10 @@ public void setup(
testName = info.getTestMethod().orElseThrow().getName();
final int port = rs3Container.getMappedPort(50051);
this.rs3Container = rs3Container;
pocketClient = GrpcRS3Client.connect(String.format("localhost:%d", port), false);
final GrpcRS3Client.Connector connector =
new GrpcRS3Client.Connector(new MockTime(), "localhost", port);
connector.useTls(false);
rs3Client = connector.connect();
final ResponsiveMetrics responsiveMetrics = new ResponsiveMetrics(metrics);
responsiveMetrics.initializeTags(
"application-id",
Expand All @@ -92,7 +96,7 @@ public void setup(
this.table = new RS3KVTable(
testName,
STORE_ID,
pocketClient,
rs3Client,
pssPartitioner,
responsiveMetrics,
scopeBuilder
Expand All @@ -102,7 +106,7 @@ public void setup(
@AfterEach
public void teardown() {
System.out.println(rs3Container.getLogs());
pocketClient.close();
rs3Client.close();
}

@Test
Expand All @@ -126,7 +130,7 @@ public void shouldReadWriteFromKVStore() throws InterruptedException, ExecutionE
}

@Test
public void shouldWriteToPocketStore() throws InterruptedException, ExecutionException {
public void shouldWriteToStore() throws InterruptedException, ExecutionException {
// given:
final var flushManager = table.init(PARTITION_ID);
final var tablePartitioner = flushManager.partitioner();
Expand All @@ -141,7 +145,7 @@ public void shouldWriteToPocketStore() throws InterruptedException, ExecutionExc
flushManager.postFlush(10);

// then:
final var result = pocketClient.get(
final var result = rs3Client.get(
STORE_ID,
new LssId(PARTITION_ID),
pss,
Expand All @@ -156,7 +160,7 @@ public void shouldRestoreFromLowestPssWrittenOffset() {
// given:
int endOffset = 100;
for (final int pssId : pssPartitioner.pssForLss(new LssId(PARTITION_ID))) {
pocketClient.writeWalSegment(
rs3Client.writeWalSegment(
STORE_ID,
new LssId(PARTITION_ID),
pssId,
Expand All @@ -182,7 +186,7 @@ public void shouldRestoreFromStartIfLowestPssFlushedOffsetIsUnspecified() {
pssPartitioner.pssForLss(new LssId(PARTITION_ID)));
allPssExcept1.remove();
for (final int pssId : allPssExcept1) {
pocketClient.writeWalSegment(
rs3Client.writeWalSegment(
STORE_ID,
new LssId(PARTITION_ID),
pssId,
Expand Down
Loading
Loading