Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from 55c8b0 to 92841a
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public KeyValueIterator<Bytes, byte[]> range(
}

@Override
public List<Store> listStores() {
public List<StoreInfo> listStores() {
return delegate.listStores();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ KeyValueIterator<Bytes, byte[]> range(
Range range
);

List<Store> listStores();
List<StoreInfo> listStores();

CreateStoreResult createStore(
String storeId,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package dev.responsive.kafka.internal.db.rs3.client;

import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.StoreType;
import java.util.List;
import java.util.UUID;

public class StoreInfo {

public enum Status {
CREATING,
READY,
DELETED
}

private final String storeName;
private final UUID storeId;
private final StoreType storeType;
private final Status status;
private final List<Integer> pssIds;
private final String createOptions;

public StoreInfo(
final String storeName,
final UUID storeId,
final StoreType storeType,
final Status status,
final List<Integer> pssIds,
final String createOptions
) {
this.storeName = storeName;
this.storeId = storeId;
this.storeType = storeType;
this.status = status;
this.pssIds = List.copyOf(pssIds);
this.createOptions = createOptions;
}

public String storeName() {
return storeName;
}

public UUID storeId() {
return storeId;
}

public StoreType storeType() {
return storeType;
}

public Status status() {
return status;
}

public List<Integer> pssIds() {
return pssIds;
}

public String createOptions() {
return createOptions;
}

@Override
public String toString() {
return "StoreInfo{"
+ "storeName='" + storeName + '\''
+ ", storeId=" + storeId
+ ", storeType=" + storeType
+ ", status=" + status
+ ", pssIds=" + pssIds
+ ", createOptions=" + createOptions
+ '}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final StoreInfo storeInfo = (StoreInfo) o;

if (!storeName.equals(storeInfo.storeName)) {
return false;
}
if (!storeId.equals(storeInfo.storeId)) {
return false;
}
if (!storeType.equals(storeInfo.storeType)) {
return false;
}
if (!status.equals(storeInfo.status)) {
return false;
}
if (!pssIds.equals(storeInfo.pssIds)) {
return false;
}
return createOptions.equals(storeInfo.createOptions);
}

@Override
public int hashCode() {
int result = storeName.hashCode();
result = 31 * result + storeId.hashCode();
result = 31 * result + storeType.hashCode();
result = 31 * result + status.hashCode();
result = 31 * result + pssIds.hashCode();
result = 31 * result + createOptions.hashCode();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicDeleteProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeStatusFromProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeTypeFromProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto;
import static dev.responsive.kafka.internal.utils.Utils.lssIdProto;
Expand All @@ -34,7 +36,7 @@
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.Range;
import dev.responsive.kafka.internal.db.rs3.client.RangeBound;
import dev.responsive.kafka.internal.db.rs3.client.Store;
import dev.responsive.kafka.internal.db.rs3.client.StoreInfo;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.rs3.RS3Grpc;
Expand Down Expand Up @@ -272,7 +274,7 @@ public Optional<byte[]> get(
}

@Override
public List<Store> listStores() {
public List<StoreInfo> listStores() {
final var request = Rs3.ListStoresRequest.newBuilder().build();
final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub();

Expand All @@ -283,7 +285,14 @@ public List<Store> listStores() {

return result.getStoresList()
.stream()
.map(t -> new Store(t.getStoreName(), uuidFromProto(t.getStoreId()), t.getPssIdsList()))
.map(t -> new StoreInfo(
t.getStoreName(),
uuidFromProto(t.getStoreId()),
storeTypeFromProto(t.getStoreType()),
storeStatusFromProto(t.getStatus()),
t.getPssIdsList(),
t.getOptions())
)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StoreInfo;
import dev.responsive.rs3.Rs3;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -102,7 +103,7 @@ public static Rs3.WriteWALSegmentRequest.Delete basicDeleteProto(final Delete de
.build();
}

public static Rs3.StoreType storeTypeProto(final CreateStoreTypes.StoreType storeType) {
public static Rs3.StoreType storeTypeToProto(final CreateStoreTypes.StoreType storeType) {
switch (storeType) {
case BASIC:
return Rs3.StoreType.BASIC;
Expand All @@ -113,12 +114,36 @@ public static Rs3.StoreType storeTypeProto(final CreateStoreTypes.StoreType stor
}
}

public static CreateStoreTypes.StoreType storeTypeFromProto(final Rs3.StoreType storeType) {
switch (storeType) {
case BASIC:
return CreateStoreTypes.StoreType.BASIC;
case WINDOW:
return CreateStoreTypes.StoreType.WINDOW;
default:
throw new IllegalArgumentException("Unknown store type: " + storeType);
}
}

public static StoreInfo.Status storeStatusFromProto(final Rs3.StoreInfo.Status status) {
switch (status) {
case CREATING:
return StoreInfo.Status.CREATING;
case READY:
return StoreInfo.Status.READY;
case DELETING:
return StoreInfo.Status.DELETED;
default:
throw new IllegalArgumentException("Unknown store status: " + status);
}
}

public static Rs3.CreateStoreOptions createStoreOptionsProto(
CreateStoreTypes.CreateStoreOptions options
) {
final var builder = Rs3.CreateStoreOptions.newBuilder();
builder.setLogicalShards(options.logicalShards());
builder.setStoreType(storeTypeProto(options.storeType()));
builder.setStoreType(storeTypeToProto(options.storeType()));
options.clockType().ifPresent(
type -> builder.setClockType(Rs3.ClockType.forNumber(type.ordinal()))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.StoreType;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException;
import dev.responsive.kafka.internal.db.rs3.client.Range;
import dev.responsive.kafka.internal.db.rs3.client.StoreInfo;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.rs3.RS3Grpc;
import dev.responsive.rs3.Rs3;
import dev.responsive.rs3.Rs3.CreateStoreResult;
import dev.responsive.rs3.Rs3.ListStoresResult;
import dev.responsive.rs3.Rs3.StoreInfo;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -879,11 +880,15 @@ public void shouldTimeoutAfterObserverOnError() {
@Test
public void shouldListStores() {
// given:
final String createOptions = "opts-stub";
when(stub.listStores(any())).thenReturn(
ListStoresResult.newBuilder()
.addStores(StoreInfo.newBuilder()
.addStores(Rs3.StoreInfo.newBuilder()
.setStoreName(STORE_NAME)
.setStoreId(uuidToProto(STORE_ID))
.setStatus(Rs3.StoreInfo.Status.READY)
.setStoreType(Rs3.StoreType.BASIC)
.setOptions(createOptions)
.addAllPssIds(List.of(PSS_ID, PSS_ID_2))
.build()
).build()
Expand All @@ -893,9 +898,9 @@ public void shouldListStores() {
final var result = client.listStores();

// then:
final var expected = new dev.responsive.kafka.internal.db.rs3.client.Store(
STORE_NAME, STORE_ID, List.of(PSS_ID, PSS_ID_2)
);
final var expected = new StoreInfo(
STORE_NAME, STORE_ID, StoreType.BASIC, StoreInfo.Status.READY, List.of(PSS_ID, PSS_ID_2),
createOptions);

assertThat(result.size(), is(1));
assertThat(result.get(0), equalTo(expected));
Expand All @@ -919,15 +924,30 @@ public void shouldHandleEmptyStoresList() {
@Test
public void shouldRetryListStores() {
// given:
final String createOptions = "opts-stub";
when(stub.listStores(any()))
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE))
.thenReturn(ListStoresResult.newBuilder().build());
.thenReturn(ListStoresResult.newBuilder()
.addStores(Rs3.StoreInfo.newBuilder()
.setStoreName(STORE_NAME)
.setStoreId(uuidToProto(STORE_ID))
.setStatus(Rs3.StoreInfo.Status.READY)
.setStoreType(Rs3.StoreType.BASIC)
.setOptions(createOptions)
.addAllPssIds(List.of(PSS_ID, PSS_ID_2))
.build()
).build());

// when:
final var result = client.listStores();

// then:
assertThat(result.size(), is(0));
final var expected = new StoreInfo(
STORE_NAME, STORE_ID, StoreType.BASIC, StoreInfo.Status.READY, List.of(PSS_ID, PSS_ID_2),
createOptions);

assertThat(result.size(), is(1));
assertThat(result.get(0), equalTo(expected));
verify(stub, times(2)).listStores(Rs3.ListStoresRequest.newBuilder().build());
}

Expand Down
Loading