Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RangeBound;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration;
import dev.responsive.kafka.internal.utils.MergeKeyValueIterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -133,7 +137,21 @@ public KeyValueIterator<Bytes, byte[]> range(
final Bytes to,
final long streamTimeMs
) {
throw new UnsupportedOperationException();
final RangeBound fromBound = RangeBound.inclusive(from.get());
final RangeBound toBound = RangeBound.exclusive(to.get());
final List<KeyValueIterator<Bytes, byte[]>> pssIters = new ArrayList<>();

for (int pssId : pssPartitioner.pssForLss(this.lssId)) {
pssIters.add(rs3Client.range(
storeId,
lssId,
pssId,
flushManager.writtenOffset(pssId),
fromBound,
toBound
));
}
return new MergeKeyValueIterator<>(pssIters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.UUID;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;

public class MeteredRS3Client implements RS3Client {
public static final String GROUP_NAME = "rs3-table-metrics";
Expand Down Expand Up @@ -90,6 +92,25 @@ public Optional<byte[]> get(
return result;
}

@Override
public KeyValueIterator<Bytes, byte[]> range(
final UUID storeId,
final LssId lssId,
final int pssId,
final Optional<Long> expectedWrittenOffset,
final RangeBound from,
final RangeBound to
) {
return delegate.range(
storeId,
lssId,
pssId,
expectedWrittenOffset,
from,
to
);
}

@Override
public List<Store> listStores() {
final Instant start = Instant.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;

public interface RS3Client {
CurrentOffsets getCurrentOffsets(UUID storeId, LssId lssId, int pssId);
Expand Down Expand Up @@ -44,6 +46,15 @@ Optional<byte[]> get(
byte[] key
);

KeyValueIterator<Bytes, byte[]> range(
UUID storeId,
LssId lssId,
int pssId,
Optional<Long> expectedWrittenOffset,
RangeBound from,
RangeBound to
);

List<Store> listStores();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ public class RS3TransientException extends RS3Exception {
public RS3TransientException(final Throwable cause) {
super(cause);
}

public RS3TransientException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Arrays;

public class Range {
private final RangeBound start;
private final RangeBound end;

public Range(RangeBound start, RangeBound end) {
this.start = start;
this.end = end;
}

public RangeBound start() {
return start;
}

public RangeBound end() {
return end;
}

public boolean contains(byte[] key) {
return greaterThanStartBound(key) && lessThanEndBound(key);
}

public boolean greaterThanStartBound(byte[] key) {
return start.map(new RangeBound.Mapper<>() {
@Override
public Boolean map(final RangeBound.InclusiveBound b) {
return Arrays.compare(b.key(), key) <= 0;
}

@Override
public Boolean map(final RangeBound.ExclusiveBound b) {
return Arrays.compare(b.key(), key) < 0;
}

@Override
public Boolean map(final RangeBound.Unbounded b) {
return true;
}
});
}

public boolean lessThanEndBound(byte[] key) {
return end.map(new RangeBound.Mapper<>() {
@Override
public Boolean map(final RangeBound.InclusiveBound b) {
return Arrays.compare(b.key(), key) >= 0;
}

@Override
public Boolean map(final RangeBound.ExclusiveBound b) {
return Arrays.compare(b.key(), key) > 0;
}

@Override
public Boolean map(final RangeBound.Unbounded b) {
return true;
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2025 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3.client;

import java.util.Arrays;
import java.util.Objects;

public interface RangeBound {

<T> T map(Mapper<T> mapper);

static Unbounded unbounded() {
return Unbounded.INSTANCE;
}

static InclusiveBound inclusive(byte[] key) {
return new InclusiveBound(key);
}

static ExclusiveBound exclusive(byte[] key) {
return new ExclusiveBound(key);
}

class InclusiveBound implements RangeBound {
private final byte[] key;

public InclusiveBound(final byte[] key) {
this.key = key;
}

public byte[] key() {
return key;
}

@Override
public <T> T map(final Mapper<T> mapper) {
return mapper.map(this);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final InclusiveBound that = (InclusiveBound) o;
return Objects.deepEquals(key, that.key);
}

@Override
public int hashCode() {
return Arrays.hashCode(key);
}
}

class ExclusiveBound implements RangeBound {
private final byte[] key;

public ExclusiveBound(final byte[] key) {
this.key = key;
}

public byte[] key() {
return key;
}

@Override
public <T> T map(final Mapper<T> mapper) {
return mapper.map(this);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ExclusiveBound that = (ExclusiveBound) o;
return Objects.deepEquals(key, that.key);
}

@Override
public int hashCode() {
return Arrays.hashCode(key);
}
}

class Unbounded implements RangeBound {
private static final Unbounded INSTANCE = new Unbounded();

private Unbounded() {}

@Override
public <T> T map(final Mapper<T> mapper) {
return mapper.map(this);
}
}

interface Mapper<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe call this a Visitor instead? This looks like the classic visitor pattern to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a visitor pattern, but I'm using it as a poor man's version of pattern matching to implement a map function. I thought using the Visitor name made its usage feel a little too vague. I don't feel strongly about it though..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to you - I felt the opposite 😛 . I was a bit confused at first and had to look at how it was used but it would've felt obvious if it was named Visitor. But that could just be me. We had the Visitor pattern drilled into us working on ksql syntax trees 😄

T map(InclusiveBound b);

T map(ExclusiveBound b);

T map(Unbounded b);
}

}
Loading
Loading