Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from f385e7 to 59f088
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.Range;
import dev.responsive.kafka.internal.db.rs3.client.RangeBound;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
Expand Down Expand Up @@ -137,8 +138,7 @@ public KeyValueIterator<Bytes, byte[]> range(
final Bytes to,
final long streamTimeMs
) {
final RangeBound fromBound = RangeBound.inclusive(from.get());
final RangeBound toBound = RangeBound.exclusive(to.get());
final var range = new Range(RangeBound.inclusive(from.get()), RangeBound.exclusive(to.get()));
final List<KeyValueIterator<Bytes, byte[]>> pssIters = new ArrayList<>();

for (int pssId : pssPartitioner.pssForLss(this.lssId)) {
Expand All @@ -147,8 +147,7 @@ public KeyValueIterator<Bytes, byte[]> range(
lssId,
pssId,
flushManager.writtenOffset(pssId),
fromBound,
toBound
range
));
}
return new MergeKeyValueIterator<>(pssIters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
Expand Down Expand Up @@ -76,12 +77,14 @@ public static UUID createStore(
: Optional.empty();

final var options = new CreateStoreOptions(
numKafkaPartitions,
CreateStoreTypes.StoreType.BASIC,
ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(),
defaultTtl,
Optional.empty()
);

final var result = rs3Client.createStore(storeName, numKafkaPartitions, options);
final var result = rs3Client.createStore(storeName, options);
LOG.info("Created store {} ({}) with {} logical shards and {} physical shards",
storeName, result.storeId(), numKafkaPartitions, result.pssIds().size());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dev.responsive.kafka.internal.db.rs3.client;

import dev.responsive.rs3.Rs3;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -12,21 +12,39 @@ public enum ClockType {
STREAM_TIME
}

public enum StoreType {
BASIC,
WINDOW
}

public static class CreateStoreOptions {

private final int logicalShards;
private final StoreType storeType;
private final Optional<ClockType> clockType;
private final Optional<Long> defaultTtl;
private final Optional<Integer> filterBitsPerKey;

private final Optional<SlateDbStorageOptions> slateDbOptions;

public CreateStoreOptions(
final int logicalShards,
final StoreType storeType,
final Optional<ClockType> clockType,
final Optional<Long> defaultTtl,
final Optional<Integer> filterBitsPerKey
final Optional<SlateDbStorageOptions> slateDbOptions
) {
this.clockType = clockType;
this.defaultTtl = defaultTtl;
this.filterBitsPerKey = filterBitsPerKey;
this.logicalShards = logicalShards;
this.storeType = Objects.requireNonNull(storeType);
this.clockType = Objects.requireNonNull(clockType);
this.defaultTtl = Objects.requireNonNull(defaultTtl);
this.slateDbOptions = Objects.requireNonNull(slateDbOptions);
}

public int logicalShards() {
return logicalShards;
}

public StoreType storeType() {
return storeType;
}

public Optional<ClockType> clockType() {
Expand All @@ -37,26 +55,18 @@ public Optional<Long> defaultTtl() {
return defaultTtl;
}

public Optional<Integer> filterBitsPerKey() {
return filterBitsPerKey;
}

public Rs3.CreateStoreOptions toProto() {
final var builder = Rs3.CreateStoreOptions.newBuilder();
clockType.ifPresent(
type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal()))
);
defaultTtl.ifPresent(builder::setDefaultTtl);
filterBitsPerKey.ifPresent(builder::setFilterBitsPerKey);
return builder.build();
public Optional<SlateDbStorageOptions> slateDbOptions() {
return slateDbOptions;
}

@Override
public String toString() {
return "CreateStoreOptions{"
+ "clockType=" + clockType
+ "logicalShards=" + logicalShards
+ ", storeType=" + storeType
+ ", clockType=" + clockType
+ ", defaultTtl=" + defaultTtl
+ ", filterBitsPerKey=" + filterBitsPerKey
+ ", slateDbOptions=" + slateDbOptions
+ '}';
}

Expand All @@ -68,24 +78,54 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

final CreateStoreOptions that = (CreateStoreOptions) o;
return logicalShards == that.logicalShards && storeType == that.storeType
&& Objects.equals(clockType, that.clockType) && Objects.equals(
defaultTtl,
that.defaultTtl
) && Objects.equals(slateDbOptions, that.slateDbOptions);
}

if (!clockType.equals(that.clockType)) {
return false;
@Override
public int hashCode() {
return Objects.hash(logicalShards, storeType, clockType, defaultTtl, slateDbOptions);
}
}

public static class SlateDbStorageOptions {
private final Optional<Integer> filterBitsPerKey;

public SlateDbStorageOptions(
final Optional<Integer> filterBitsPerKey) {
this.filterBitsPerKey = Objects.requireNonNull(filterBitsPerKey);
}

public Optional<Integer> filterBitsPerKey() {
return filterBitsPerKey;
}

@Override
public String toString() {
return "SlateDbStorageOptions{"
+ "filterBitsPerKey=" + filterBitsPerKey
+ '}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!defaultTtl.equals(that.defaultTtl)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
return filterBitsPerKey.equals(that.filterBitsPerKey);
final SlateDbStorageOptions that = (SlateDbStorageOptions) o;
return Objects.equals(filterBitsPerKey, that.filterBitsPerKey);
}

@Override
public int hashCode() {
int result = clockType.hashCode();
result = 31 * result + defaultTtl.hashCode();
result = 31 * result + filterBitsPerKey.hashCode();
return result;
return Objects.hashCode(filterBitsPerKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Arrays;
import java.util.Objects;

public class Delete extends WalEntry {
private final byte[] key;

public Delete(final byte[] key) {
this.key = Objects.requireNonNull(key);
}

public byte[] key() {
return key;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Delete delete = (Delete) o;
return Objects.deepEquals(key, delete.key);
}

@Override
public int hashCode() {
return Arrays.hashCode(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,14 @@ public KeyValueIterator<Bytes, byte[]> range(
final LssId lssId,
final int pssId,
final Optional<Long> expectedWrittenOffset,
final RangeBound from,
final RangeBound to
final Range range
) {
return delegate.range(
storeId,
lssId,
pssId,
expectedWrittenOffset,
from,
to
range
);
}

Expand All @@ -121,10 +119,9 @@ public List<Store> listStores() {
@Override
public CreateStoreResult createStore(
final String storeName,
final int logicalShards,
final CreateStoreOptions options
) {
return delegate.createStore(storeName, logicalShards, options);
return delegate.createStore(storeName, options);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,37 @@

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

public class Put extends WalEntry {
private final byte[] key;
private final byte[] value;

public Put(final byte[] key, final byte[] value) {
this.key = Objects.requireNonNull(key);
this.value = value;
this.value = Objects.requireNonNull(value);
}

public byte[] key() {
return key;
}

public Optional<byte[]> value() {
return Optional.ofNullable(value);
public byte[] value() {
return value;
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Put)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final Put put = (Put) o;
return Objects.deepEquals(key, put.key) && Objects.deepEquals(value, put.value);
return Objects.deepEquals(key, put.key) && Objects.deepEquals(
value,
put.value
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,13 @@ KeyValueIterator<Bytes, byte[]> range(
LssId lssId,
int pssId,
Optional<Long> expectedWrittenOffset,
RangeBound from,
RangeBound to
Range range
);

List<Store> listStores();

CreateStoreResult createStore(
String storeId,
int logicalShards,
CreateStoreOptions options
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Arrays;
import java.util.Objects;

public class Range {
private final RangeBound start;
Expand Down Expand Up @@ -61,4 +62,24 @@ public Boolean map(final RangeBound.Unbounded b) {
});
}

public static Range unbounded() {
return new Range(RangeBound.unbounded(), RangeBound.unbounded());
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Range range = (Range) o;
return Objects.equals(start, range.start) && Objects.equals(end, range.end);
}

@Override
public int hashCode() {
return Objects.hash(start, end);
}
}
Loading
Loading