Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from c03fb0 to eab539
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public Optional<byte[]> get(
return result;
}

@Override
public List<Store> listStores() {
final Instant start = Instant.now();
final List<Store> result = delegate.listStores();
getSensor.record(Duration.between(start, Instant.now()).toNanos());
return result;
}

public void close() {
this.metrics.removeSensor(GET_SENSOR_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,7 @@ Optional<byte[]> get(
byte[] key
);

List<Store> listStores();

void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.List;
import java.util.UUID;

public class Store {

private final UUID storeId;
private final List<Integer> pssIds;

public Store(final UUID storeId, final List<Integer> pssIds) {
this.storeId = storeId;
this.pssIds = pssIds;
}

public UUID storeId() {
return storeId;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final Store store = (Store) o;

if (!storeId.equals(store.storeId)) {
return false;
}
return pssIds.equals(store.pssIds);
}

@Override
public int hashCode() {
int result = storeId.hashCode();
result = 31 * result + pssIds.hashCode();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

package dev.responsive.kafka.internal.db.rs3.client.grpc;

import static dev.responsive.kafka.internal.utils.Utils.lssIdProto;
import static dev.responsive.kafka.internal.utils.Utils.uuidProtoToUuid;
import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import dev.responsive.kafka.api.config.ResponsiveConfig;
Expand All @@ -21,6 +25,7 @@
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.Store;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.rs3.RS3Grpc;
Expand All @@ -31,6 +36,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;

Expand All @@ -57,7 +63,7 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();

final Rs3.GetOffsetsRequest request = Rs3.GetOffsetsRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setStoreId(uuidToUuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.build();
Expand Down Expand Up @@ -130,7 +136,7 @@ public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsyn
final var streamSender = new GrpcStreamSender<WalEntry, Rs3.WriteWALSegmentRequest>(
entry -> {
final var entryBuilder = Rs3.WriteWALSegmentRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setStoreId(uuidToUuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setEndOffset(endOffset)
Expand Down Expand Up @@ -209,7 +215,7 @@ public Optional<byte[]> get(
final byte[] key
) {
final var requestBuilder = Rs3.GetRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setStoreId(uuidToUuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setKey(ByteString.copyFrom(key));
Expand All @@ -229,17 +235,20 @@ public Optional<byte[]> get(
return Optional.of(keyValue.getValue().toByteArray());
}

private Rs3.UUID uuidProto(final UUID uuid) {
return Rs3.UUID.newBuilder()
.setHigh(uuid.getMostSignificantBits())
.setLow(uuid.getLeastSignificantBits())
.build();
}
@Override
public List<Store> listStores() {
final var request = Rs3.ListStoresRequest.newBuilder().build();
final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub();

private Rs3.LSSId lssIdProto(final LssId lssId) {
return Rs3.LSSId.newBuilder()
.setId(lssId.id())
.build();
final Rs3.ListStoresResult result = withRetry(
() -> stub.listStores(request),
() -> "ListStores()"
);

return result.getStoresList()
.stream()
.map(t -> new Store(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList()))
.collect(Collectors.toList());
}

private void addWalEntryToSegment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ class PssStubsProvider {
private static final Logger LOG = LoggerFactory.getLogger(PssStubsProvider.class);

private final ManagedChannel channel;
private final Stubs globalStubs;
private final ConcurrentMap<StubsKey, Stubs> stubs = new ConcurrentHashMap<>();

@VisibleForTesting
PssStubsProvider(final ManagedChannel channel) {
this.channel = channel;
this.globalStubs = new Stubs(
RS3Grpc.newBlockingStub(channel),
RS3Grpc.newStub(channel)
);
}

static PssStubsProvider connect(
Expand Down Expand Up @@ -60,6 +65,10 @@ Stubs stubs(final UUID storeId, final int pssId) {
});
}

Stubs globalStubs() {
return globalStubs;
}

@VisibleForTesting
static class Stubs {
private final RS3Grpc.RS3BlockingStub syncStub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

package dev.responsive.kafka.internal.utils;

import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.rs3.Rs3;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
Expand Down Expand Up @@ -126,4 +129,21 @@ public static Bytes incrementWithoutOverflow(final Bytes input) {
return null;
}
}

public static UUID uuidProtoToUuid(final Rs3.UUID uuid) {
return new UUID(uuid.getHigh(), uuid.getLow());
}

public static Rs3.UUID uuidToUuidProto(final UUID uuid) {
return Rs3.UUID.newBuilder()
.setHigh(uuid.getMostSignificantBits())
.setLow(uuid.getLeastSignificantBits())
.build();
}

public static Rs3.LSSId lssIdProto(final LssId lssId) {
return Rs3.LSSId.newBuilder()
.setId(lssId.id())
.build();
}
}
Loading
Loading