Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from c03fb0 to a1b3ef
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public Optional<byte[]> get(
return result;
}

@Override
public List<Store> listStores() {
final Instant start = Instant.now();
final List<Store> result = delegate.listStores();
getSensor.record(Duration.between(start, Instant.now()).toNanos());
return result;
}

public void close() {
this.metrics.removeSensor(GET_SENSOR_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,7 @@ Optional<byte[]> get(
byte[] key
);

List<Store> listStores();

void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.List;
import java.util.UUID;

public class Store {

private final UUID storeId;
private final List<Integer> pssIds;

public Store(final UUID storeId, final List<Integer> pssIds) {
this.storeId = storeId;
this.pssIds = pssIds;
}

public UUID storeId() {
return storeId;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final Store store = (Store) o;

if (!storeId.equals(store.storeId)) {
return false;
}
return pssIds.equals(store.pssIds);
}

@Override
public int hashCode() {
int result = storeId.hashCode();
result = 31 * result + pssIds.hashCode();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

package dev.responsive.kafka.internal.db.rs3.client.grpc;

import static dev.responsive.kafka.internal.utils.Utils.lssIdProto;
import static dev.responsive.kafka.internal.utils.Utils.uuidProtoToUuid;
import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import dev.responsive.kafka.api.config.ResponsiveConfig;
Expand All @@ -22,6 +26,7 @@
import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.Store;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.rs3.RS3Grpc;
import dev.responsive.rs3.Rs3;
Expand All @@ -31,6 +36,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;

Expand All @@ -57,7 +63,7 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();

final Rs3.GetOffsetsRequest request = Rs3.GetOffsetsRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setStoreId(uuidToUuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.build();
Expand Down Expand Up @@ -130,7 +136,7 @@ public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsyn
final var streamSender = new GrpcStreamSender<WalEntry, Rs3.WriteWALSegmentRequest>(
entry -> {
final var entryBuilder = Rs3.WriteWALSegmentRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setStoreId(uuidToUuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setEndOffset(endOffset)
Expand Down Expand Up @@ -209,7 +215,7 @@ public Optional<byte[]> get(
final byte[] key
) {
final var requestBuilder = Rs3.GetRequest.newBuilder()
.setStoreId(uuidProto(storeId))
.setStoreId(uuidToUuidProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setKey(ByteString.copyFrom(key));
Expand All @@ -229,17 +235,20 @@ public Optional<byte[]> get(
return Optional.of(keyValue.getValue().toByteArray());
}

private Rs3.UUID uuidProto(final UUID uuid) {
return Rs3.UUID.newBuilder()
.setHigh(uuid.getMostSignificantBits())
.setLow(uuid.getLeastSignificantBits())
.build();
}
@Override
public List<Store> listStores() {
final var request = Rs3.ListTablesRequest.newBuilder().build();
final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub();

private Rs3.LSSId lssIdProto(final LssId lssId) {
return Rs3.LSSId.newBuilder()
.setId(lssId.id())
.build();
final Rs3.ListTablesResult result = withRetry(
() -> stub.listTables(request),
() -> "ListTables()"
);

return result.getTablesList()
.stream()
.map(t -> new Store(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList()))
.collect(Collectors.toList());
}

private void addWalEntryToSegment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ class PssStubsProvider {
private static final Logger LOG = LoggerFactory.getLogger(PssStubsProvider.class);

private final ManagedChannel channel;
private final Stubs globalStubs;
private final ConcurrentMap<StubsKey, Stubs> stubs = new ConcurrentHashMap<>();

@VisibleForTesting
PssStubsProvider(final ManagedChannel channel) {
this.channel = channel;
this.globalStubs = new Stubs(
RS3Grpc.newBlockingStub(channel),
RS3Grpc.newStub(channel)
);
}

static PssStubsProvider connect(
Expand Down Expand Up @@ -60,6 +65,10 @@ Stubs stubs(final UUID storeId, final int pssId) {
});
}

Stubs globalStubs() {
return globalStubs;
}

@VisibleForTesting
static class Stubs {
private final RS3Grpc.RS3BlockingStub syncStub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

package dev.responsive.kafka.internal.utils;

import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.rs3.Rs3;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
Expand Down Expand Up @@ -126,4 +129,21 @@ public static Bytes incrementWithoutOverflow(final Bytes input) {
return null;
}
}

public static UUID uuidProtoToUuid(final Rs3.UUID uuid) {
return new UUID(uuid.getHigh(), uuid.getLow());
}

public static Rs3.UUID uuidToUuidProto(final UUID uuid) {
return Rs3.UUID.newBuilder()
.setHigh(uuid.getMostSignificantBits())
.setLow(uuid.getLeastSignificantBits())
.build();
}

public static Rs3.LSSId lssIdProto(final LssId lssId) {
return Rs3.LSSId.newBuilder()
.setId(lssId.id())
.build();
}
}
Loading
Loading