Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from f385e7 to 59f088
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.Range;
import dev.responsive.kafka.internal.db.rs3.client.RangeBound;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
Expand Down Expand Up @@ -137,8 +138,7 @@ public KeyValueIterator<Bytes, byte[]> range(
final Bytes to,
final long streamTimeMs
) {
final RangeBound fromBound = RangeBound.inclusive(from.get());
final RangeBound toBound = RangeBound.exclusive(to.get());
final var range = new Range(RangeBound.inclusive(from.get()), RangeBound.exclusive(to.get()));
final List<KeyValueIterator<Bytes, byte[]>> pssIters = new ArrayList<>();

for (int pssId : pssPartitioner.pssForLss(this.lssId)) {
Expand All @@ -147,8 +147,7 @@ public KeyValueIterator<Bytes, byte[]> range(
lssId,
pssId,
flushManager.writtenOffset(pssId),
fromBound,
toBound
range
));
}
return new MergeKeyValueIterator<>(pssIters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
Expand Down Expand Up @@ -76,12 +77,14 @@ public static UUID createStore(
: Optional.empty();

final var options = new CreateStoreOptions(
numKafkaPartitions,
CreateStoreTypes.StoreType.BASIC,
ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(),
defaultTtl,
Optional.empty()
);

final var result = rs3Client.createStore(storeName, numKafkaPartitions, options);
final var result = rs3Client.createStore(storeName, options);
LOG.info("Created store {} ({}) with {} logical shards and {} physical shards",
storeName, result.storeId(), numKafkaPartitions, result.pssIds().size());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dev.responsive.kafka.internal.db.rs3.client;

import dev.responsive.rs3.Rs3;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -12,21 +12,39 @@ public enum ClockType {
STREAM_TIME
}

public enum StoreType {
BASIC,
WINDOW
}

public static class CreateStoreOptions {

private final int logicalShards;
private final StoreType storeType;
private final Optional<ClockType> clockType;
private final Optional<Long> defaultTtl;
private final Optional<Integer> filterBitsPerKey;

private final Optional<SlateDbStorageOptions> slateDbOptions;

public CreateStoreOptions(
final int logicalShards,
final StoreType storeType,
final Optional<ClockType> clockType,
final Optional<Long> defaultTtl,
final Optional<Integer> filterBitsPerKey
final Optional<SlateDbStorageOptions> slateDbOptions
) {
this.clockType = clockType;
this.defaultTtl = defaultTtl;
this.filterBitsPerKey = filterBitsPerKey;
this.logicalShards = logicalShards;
this.storeType = Objects.requireNonNull(storeType);
this.clockType = Objects.requireNonNull(clockType);
this.defaultTtl = Objects.requireNonNull(defaultTtl);
this.slateDbOptions = Objects.requireNonNull(slateDbOptions);
}

public int logicalShards() {
return logicalShards;
}

public StoreType storeType() {
return storeType;
}

public Optional<ClockType> clockType() {
Expand All @@ -37,27 +55,19 @@ public Optional<Long> defaultTtl() {
return defaultTtl;
}

public Optional<Integer> filterBitsPerKey() {
return filterBitsPerKey;
}

public Rs3.CreateStoreOptions toProto() {
final var builder = Rs3.CreateStoreOptions.newBuilder();
clockType.ifPresent(
type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal()))
);
defaultTtl.ifPresent(builder::setDefaultTtl);
filterBitsPerKey.ifPresent(builder::setFilterBitsPerKey);
return builder.build();
public Optional<SlateDbStorageOptions> slateDbOptions() {
return slateDbOptions;
}

@Override
public String toString() {
return "CreateStoreOptions{"
+ "clockType=" + clockType
+ ", defaultTtl=" + defaultTtl
+ ", filterBitsPerKey=" + filterBitsPerKey
+ '}';
return "CreateStoreOptions{" +
"logicalShards=" + logicalShards +
", storeType=" + storeType +
", clockType=" + clockType +
", defaultTtl=" + defaultTtl +
", slateDbOptions=" + slateDbOptions +
'}';
}

@Override
Expand All @@ -68,24 +78,54 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

final CreateStoreOptions that = (CreateStoreOptions) o;
return logicalShards == that.logicalShards && storeType == that.storeType
&& Objects.equals(clockType, that.clockType) && Objects.equals(
defaultTtl,
that.defaultTtl
) && Objects.equals(slateDbOptions, that.slateDbOptions);
}

if (!clockType.equals(that.clockType)) {
return false;
@Override
public int hashCode() {
return Objects.hash(logicalShards, storeType, clockType, defaultTtl, slateDbOptions);
}
}

public static class SlateDbStorageOptions {
private final Optional<Integer> filterBitsPerKey;

public SlateDbStorageOptions(
final Optional<Integer> filterBitsPerKey) {
this.filterBitsPerKey = Objects.requireNonNull(filterBitsPerKey);
}

public Optional<Integer> filterBitsPerKey() {
return filterBitsPerKey;
}

@Override
public String toString() {
return "SlateDbStorageOptions{"
+ "filterBitsPerKey=" + filterBitsPerKey
+ '}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!defaultTtl.equals(that.defaultTtl)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
return filterBitsPerKey.equals(that.filterBitsPerKey);
final SlateDbStorageOptions that = (SlateDbStorageOptions) o;
return Objects.equals(filterBitsPerKey, that.filterBitsPerKey);
}

@Override
public int hashCode() {
int result = clockType.hashCode();
result = 31 * result + defaultTtl.hashCode();
result = 31 * result + filterBitsPerKey.hashCode();
return result;
return Objects.hashCode(filterBitsPerKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Arrays;
import java.util.Objects;

public class Delete extends WalEntry {
private final byte[] key;

public Delete(final byte[] key) {
this.key = Objects.requireNonNull(key);
}

public byte[] key() {
return key;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Delete delete = (Delete) o;
return Objects.deepEquals(key, delete.key);
}

@Override
public int hashCode() {
return Arrays.hashCode(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,14 @@ public KeyValueIterator<Bytes, byte[]> range(
final LssId lssId,
final int pssId,
final Optional<Long> expectedWrittenOffset,
final RangeBound from,
final RangeBound to
final Range range
) {
return delegate.range(
storeId,
lssId,
pssId,
expectedWrittenOffset,
from,
to
range
);
}

Expand All @@ -121,10 +119,9 @@ public List<Store> listStores() {
@Override
public CreateStoreResult createStore(
final String storeName,
final int logicalShards,
final CreateStoreOptions options
) {
return delegate.createStore(storeName, logicalShards, options);
return delegate.createStore(storeName, options);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,37 @@

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

public class Put extends WalEntry {
private final byte[] key;
private final byte[] value;

public Put(final byte[] key, final byte[] value) {
this.key = Objects.requireNonNull(key);
this.value = value;
this.value = Objects.requireNonNull(value);
}

public byte[] key() {
return key;
}

public Optional<byte[]> value() {
return Optional.ofNullable(value);
public byte[] value() {
return value;
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Put)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final Put put = (Put) o;
return Objects.deepEquals(key, put.key) && Objects.deepEquals(value, put.value);
return Objects.deepEquals(key, put.key) && Objects.deepEquals(
value,
put.value
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,13 @@ KeyValueIterator<Bytes, byte[]> range(
LssId lssId,
int pssId,
Optional<Long> expectedWrittenOffset,
RangeBound from,
RangeBound to
Range range
);

List<Store> listStores();

CreateStoreResult createStore(
String storeId,
int logicalShards,
CreateStoreOptions options
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Arrays;
import java.util.Objects;

public class Range {
private final RangeBound start;
Expand Down Expand Up @@ -61,4 +62,24 @@ public Boolean map(final RangeBound.Unbounded b) {
});
}

public static Range unbounded() {
return new Range(RangeBound.unbounded(), RangeBound.unbounded());
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Range range = (Range) o;
return Objects.equals(start, range.start) && Objects.equals(end, range.end);
}

@Override
public int hashCode() {
return Objects.hash(start, end);
}
}
Loading
Loading