@@ -180,12 +180,13 @@ public class ResponsiveConfig extends AbstractConfig {
// ------------------ WindowStore configurations ----------------------

public static final String MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG = "responsive.mongo.windowed.key.timestamp.first";
- private static final boolean MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DEFAULT = false;
private static final boolean MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DEFAULT = true;
private static final String MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DOC = "Whether to put the window start timestamp "
    + "first in the composite windowed key format for MongoDB. This can be toggled true/false to get better "
    + "performance depending on the density of unique keys per window, and should be experimented "
-   + "with for best results. However it is important to note that this cannot be changed for "
-   + "an active application. Messing with this can corrupt existing state!";
    + "with for best results. Must be true for any application that uses range queries on window stores. "
    + "It is also important to note that this cannot be changed for an active application -- flipping "
    + "this can corrupt existing state.";

public static final String WINDOW_BLOOM_FILTER_COUNT_CONFIG = "responsive.window.bloom.filter.count";
private static final int WINDOW_BLOOM_FILTER_COUNT_DEFAULT = 0;
@@ -935,6 +935,22 @@ public KeyValueIterator<WindowedKey, byte[]> backFetchAll(
);
}

@Override
public KeyValueIterator<WindowedKey, byte[]> all(
final int kafkaPartition,
final long streamTime
) {
throw new UnsupportedOperationException("all is not yet supported for Cassandra backends");
}

@Override
public KeyValueIterator<WindowedKey, byte[]> backAll(
final int kafkaPartition,
final long streamTime
) {
throw new UnsupportedOperationException("backAll is not yet supported for Cassandra backends");
}

private static KeyValue<WindowedKey, byte[]> windowRows(final Row row) {
final long startTs = row.getInstant(WINDOW_START.column()).toEpochMilli();
final Bytes key = Bytes.wrap(row.getByteBuffer(DATA_KEY.column()).array());
@@ -18,6 +18,7 @@

import dev.responsive.kafka.internal.utils.WindowedKey;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;

public interface RemoteWindowedTable<S> extends RemoteTable<WindowedKey, S> {
@@ -157,4 +158,22 @@ KeyValueIterator<WindowedKey, byte[]> backFetchAll(
long timeFrom,
long timeTo
);

/**
 * Retrieves all windows of the given {@code kafkaPartition} across all keys and timestamps.
 *
 * @param kafkaPartition the kafka partition
 * @param streamTime     the current observed stream-time, used to limit the scan to active segments
 *
 * @return a forwards iterator over all the windows and values previously set
 */
KeyValueIterator<WindowedKey, byte[]> all(int kafkaPartition, long streamTime);

/**
 * Retrieves all windows of the given {@code kafkaPartition} across all keys and timestamps.
 *
 * @param kafkaPartition the kafka partition
 * @param streamTime     the current observed stream-time, used to limit the scan to active segments
 *
 * @return a backwards iterator over all the windows and values previously set
 */
KeyValueIterator<WindowedKey, byte[]> backAll(int kafkaPartition, long streamTime);
}
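A hypothetical caller of the new methods (the table, partition, and stream-time values are stand-ins for illustration; KeyValueIterator is Closeable, so try-with-resources applies):

// Hypothetical usage sketch of all(); `table` and the argument values are
// invented for illustration.
final int kafkaPartition = 0;
final long streamTime = System.currentTimeMillis();
try (KeyValueIterator<WindowedKey, byte[]> iter = table.all(kafkaPartition, streamTime)) {
  while (iter.hasNext()) {
    final KeyValue<WindowedKey, byte[]> next = iter.next();
    // next.key.key is the data key, next.key.windowStartMs the window start
  }
}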
@@ -21,6 +21,7 @@
import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
import static dev.responsive.kafka.internal.db.partitioning.SegmentPartitioner.UNINITIALIZED_STREAM_TIME;
import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;
import static dev.responsive.kafka.internal.stores.SegmentedOperations.MIN_KEY;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;

@@ -52,6 +53,7 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
@@ -457,13 +459,37 @@ public KeyValueIterator<WindowedKey, byte[]> backFetch(
@Override
public KeyValueIterator<WindowedKey, byte[]> fetchRange(
final int kafkaPartition,
-   final Bytes fromKey,
-   final Bytes toKey,
    final Bytes keyFrom,
    final Bytes keyTo,

[Contributor] question about the state store api in general - what are these cross-key APIs actually used for? All the kafka streams processors are scoped to one key afaik.

    final long timeFrom,
    final long timeTo
) {
- throw new UnsupportedOperationException("fetchRange not yet supported for Mongo backends");
  final List<KeyValueIterator<WindowedKey, byte[]>> segmentIterators = new LinkedList<>();
final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition);

for (final var segment : partitioner.range(kafkaPartition, timeFrom, timeTo)) {
final var segmentWindows = partitionSegments.segmentWindows.get(segment);

    // Since we use a flat keyspace by concatenating the timestamp with the data key and have
    // variable length data keys, it's impossible to request only valid data that's within
    // the given bounds. Instead issue a broad request from the valid bounds and then post-filter

[Contributor] question to clarify my understanding - why does the variability of the length of the data key matter? even if we knew all the keys were the same size, this filter would still pick up too much data right? if the key is first, you'd still pick up a bunch of time windows you don't want. And if the time is first you'd still pick up a bunch of keys you don't want.
final var lowerBound = compositeKey(keyFrom, timeFrom);
final var upperBound = compositeKey(keyTo, timeTo);

    final FindIterable<WindowDoc> fetchResults = segmentWindows.find(
        Filters.and(
            Filters.gte(WindowDoc.ID, lowerBound),
            Filters.lte(WindowDoc.ID, upperBound))
    );

[Contributor] I'm wondering if we should include these fields outside of the key and filter on them server-side. This has the potential to jack up our data transfer costs.

[Contributor (author)] Not sure I understand the question -- what are "these fields"?

And what do you mean by including them "outside the key" -- are you talking about the composite key, and moving the timestamp to a separate field rather than a composite? I don't see how that would affect our data transfer costs for range queries, as in all cases where we return the key we would need to return the timestamp as well.

I'm wondering if there might be a case for point lookups, though. Since AFAICT there's no way to return a doc without sending the key/ID, whereas if the timestamp was an additional field then maybe it's possible to filter that out. Is that what you're getting at?

Is it even true that there's no way to do a point lookup without returning the key/ID in the response? I'm assuming that it's possible to filter out other fields besides the key, but even that I don't know for sure 🤔

segmentIterators.add(
Iterators.filterKv(
Iterators.kv(fetchResults.iterator(), MongoWindowedTable::windowFromDoc),
kv -> filterFetchRange(kv, timeFrom, timeTo, keyFrom, keyTo, timestampFirstOrder))
);
}
return Iterators.wrapped(segmentIterators);
}
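On the data-transfer thread above: the MongoDB Java driver does allow dropping fields from a response via projections, including the _id, so a point lookup could in principle return only the value. A hedged sketch (the "value" field name is assumed from WindowDoc's usage elsewhere in this diff; whether the savings matter here is untested):

// Sketch: server-side projection that excludes _id and returns only the value
// field. Projections.fields/include/excludeId come from
// com.mongodb.client.model.Projections; `key` and `windowStartMs` are
// stand-ins for illustration.
final WindowDoc doc = segmentWindows
    .find(Filters.eq(WindowDoc.ID, compositeKey(key, windowStartMs)))
    .projection(Projections.fields(Projections.include("value"), Projections.excludeId()))
    .first();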

@Override
@@ -483,7 +509,35 @@ public KeyValueIterator<WindowedKey, byte[]> fetchAll(
final long timeFrom,
final long timeTo
) {
throw new UnsupportedOperationException("fetchAll not yet supported for Mongo backends");
if (!timestampFirstOrder) {
throw new UnsupportedOperationException("Range queries such as fetchAll require stores to be "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is because mongo doesn't let you scan the whole table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this was a "prevent users from shooting themselves in the foot" moment -- it's possible, just really inefficient since we have to scan everything in between (see comment about variable length keys).

Of course if we're going to go the route of scanning everything in order to enable partition scaling, then this constraint doesn't make any sense. I'll remove it

+ "configured with timestamp-first order");
}

final List<KeyValueIterator<WindowedKey, byte[]>> segmentIterators = new LinkedList<>();
final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition);

for (final var segment : partitioner.range(kafkaPartition, timeFrom, timeTo)) {
final var segmentWindows = partitionSegments.segmentWindows.get(segment);

// To avoid scanning the entire segment, we use the bytewise "minimum key" to start the scan
// at the lower time bound. Since there's no corresponding "maximum key" given the variable
// length keys, we have to set the upper bound at timeTo + 1, while using strict comparison
// (ie #lt rather than #lte) to exclude said upper bound
final var lowerBound = compositeKey(MIN_KEY, timeFrom);
final var upperBound = compositeKey(MIN_KEY, timeTo + 1);

final FindIterable<WindowDoc> fetchResults = segmentWindows.find(
Filters.and(
Filters.gte(WindowDoc.ID, lowerBound),
Filters.lt(WindowDoc.ID, upperBound))
);

segmentIterators.add(
Iterators.kv(fetchResults.iterator(), MongoWindowedTable::windowFromDoc)
);
}
return Iterators.wrapped(segmentIterators);
}
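The MIN_KEY / timeTo + 1 trick can be sanity-checked in isolation. A toy check, assuming the timestamp-first layout sketched earlier and non-negative timestamps (only then does big-endian byte order agree with numeric order):

// Every composite key with timestamp <= timeTo sorts strictly below
// compositeKey(MIN_KEY, timeTo + 1), regardless of the variable-length
// data key that follows the timestamp.
final long timeTo = 100L;
final byte[] upper = CompositeKeyLayouts.timestampFirst(new byte[0], timeTo + 1);
final byte[] inRange = CompositeKeyLayouts.timestampFirst("any-key".getBytes(), timeTo);
assert java.util.Arrays.compareUnsigned(inRange, upper) < 0;  // included by #lt
assert java.util.Arrays.compareUnsigned(upper, upper) == 0;   // excluded by #lt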

@Override
Expand All @@ -495,6 +549,34 @@ public KeyValueIterator<WindowedKey, byte[]> backFetchAll(
throw new UnsupportedOperationException("backFetchAll not yet supported for MongoDB backends");
}

@Override
public KeyValueIterator<WindowedKey, byte[]> all(
final int kafkaPartition,
final long streamTime

[Contributor] don't we need to filter on this time?

) {
final List<KeyValueIterator<WindowedKey, byte[]>> segmentIterators = new LinkedList<>();
final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition);

for (final var segment : partitioner.activeSegments(kafkaPartition, streamTime)) {
final var segmentWindows = partitionSegments.segmentWindows.get(segment);

final FindIterable<WindowDoc> fetchResults = segmentWindows.find();

segmentIterators.add(
Iterators.kv(fetchResults.iterator(), MongoWindowedTable::windowFromDoc)
);
}
return Iterators.wrapped(segmentIterators);
}

@Override
public KeyValueIterator<WindowedKey, byte[]> backAll(
final int kafkaPartition,
final long streamTime
) {
throw new UnsupportedOperationException("backAll not yet supported for MongoDB backends");
}

public BasicDBObject compositeKey(final WindowedKey windowedKey) {
return compositeKey(windowedKey.key, windowedKey.windowStartMs);
}
@@ -511,5 +593,22 @@ private static KeyValue<WindowedKey, byte[]> windowFromDoc(final WindowDoc windowDoc
return new KeyValue<>(WindowDoc.windowedKey(windowDoc.id), windowDoc.value);
}

private static boolean filterFetchRange(
    final WindowedKey windowedKey,
    final long timeFrom,
    final long timeTo,
    final Bytes keyFrom,
    final Bytes keyTo,
    final boolean timestampFirstOrder
) {
  // If we use timestamp-first order, then the upper/lower bounds guarantee the timestamps are
  // valid, so we only need to filter out keys that fall outside the (inclusive) key bounds --
  // and vice versa for key-first order
  if (timestampFirstOrder) {
    return windowedKey.key.compareTo(keyFrom) >= 0 && windowedKey.key.compareTo(keyTo) <= 0;
  } else {
    return windowedKey.windowStartMs >= timeFrom && windowedKey.windowStartMs <= timeTo;
  }
}
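A hypothetical spot-check of the post-filter (values invented for illustration):

// With timestamp-first order the Mongo bounds already constrained the
// timestamps, so only the key is re-checked here.
final WindowedKey wk = new WindowedKey(Bytes.wrap("b".getBytes()), 50L);
final boolean kept = filterFetchRange(
    wk,
    0L, 100L,                    // timeFrom, timeTo
    Bytes.wrap("a".getBytes()),  // keyFrom
    Bytes.wrap("c".getBytes()),  // keyTo
    true);                       // timestampFirstOrder
// kept == true, since "a" <= "b" <= "c" bytewise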

}

@@ -342,6 +342,20 @@ public KeyValueIterator<K, Result<K>> range(final K from, final K to) {
);
}

public KeyValueIterator<K, Result<K>> range(
final K from,
final K to,
final boolean fromInclusive,
final boolean toInclusive
) {
return Iterators.kv(
Iterators.filter(
buffer.getReader().subMap(from, fromInclusive, to, toInclusive).entrySet().iterator(),
e -> keySpec.retain(e.getKey())),
result -> new KeyValue<>(result.getKey(), result.getValue())
);
}
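A hedged usage sketch of the new overload, mirroring how SegmentedOperations#fetchAll (later in this diff) pairs it with a timeTo + 1 upper bound (variable names illustrative):

// Half-open scan [lowerBound, upperBound): toInclusive = false compensates
// for the +1 applied to the upper bound's timestamp.
final KeyValueIterator<WindowedKey, Result<WindowedKey>> iter =
    buffer.range(lowerBound, upperBound, true, false);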

public KeyValueIterator<K, Result<K>> range(
final K from,
final K to,
@@ -270,7 +270,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
- return windowOperations.all();
  return windowOperations.all(observedStreamTime);
}

@Override
@@ -55,6 +55,9 @@

public class SegmentedOperations implements WindowOperations {

// The "minimum" possible key when comparing bytewise, used to define range query bounds
public static final Bytes MIN_KEY = Bytes.wrap(new byte[0]);

@SuppressWarnings("rawtypes")
private final InternalProcessorContext context;
private final ResponsiveWindowParams params;
@@ -267,12 +270,26 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(
final long timeFrom,
final long timeTo
) {
throw new UnsupportedOperationException("Not yet implemented");
final WindowedKey lowerBound = new WindowedKey(MIN_KEY, timeFrom);
final WindowedKey upperBound = new WindowedKey(MIN_KEY, timeTo + 1);

// set toInclusive bound to false due to the +1 in the upper bound
return Iterators.windowedKey(
new LocalRemoteKvIterator<>(
buffer.range(lowerBound, upperBound, true, false),
table.fetchAll(changelog.partition(), timeFrom, timeTo)),
params.windowSize()
);
}

@Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
-   throw new UnsupportedOperationException("Not yet implemented");
public KeyValueIterator<Windowed<Bytes>, byte[]> all(final long streamTime) {
return Iterators.windowedKey(
new LocalRemoteKvIterator<>(
buffer.all(),
table.all(changelog.partition(), streamTime)),
params.windowSize()
);
}

@Override
@@ -53,7 +53,7 @@ KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(
final long timeTo
);

- KeyValueIterator<Windowed<Bytes>, byte[]> all();
KeyValueIterator<Windowed<Bytes>, byte[]> all(final long streamTime);

WindowStoreIterator<byte[]> backwardFetch(
final Bytes key,