Rs3 Window Operations #443
ableegoldman
left a comment
Mostly minor comments, from a high level this all looks good
kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java
kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowFlushManager.java
    final long batchOffset,
    final Integer failedTablePartition
) {
  return String.format("");
I forget exactly how this gets used, but we probably want to log at least the batch offset and failedTablePartition, and probably the current streamTime as well. We should definitely include the table name if it's not already in the error message (it may be worth comparing the other implementations to see what this info includes and what it doesn't).
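To make the suggestion concrete, here is a minimal sketch of what a populated message could look like. The class name, method name, parameter list, and message layout are all hypothetical and not taken from the PR; the real method may receive different arguments, and the table name may already be covered by the surrounding error message.

```java
import java.util.Locale;

// Hypothetical helper: names and message layout are illustrative assumptions,
// not the actual RS3WindowFlushManager code.
final class FailedFlushInfoSketch {

  private FailedFlushInfoSketch() {
  }

  static String failedFlushInfo(
      final String tableName,
      final long batchOffset,
      final Integer failedTablePartition,
      final long streamTime
  ) {
    // %s (rather than %d) is used for the partition so that a null value
    // renders as "null" instead of throwing.
    return String.format(
        Locale.ROOT,
        "table=%s, batchOffset=%d, failedTablePartition=%s, streamTime=%d",
        tableName,
        batchOffset,
        failedTablePartition,
        streamTime
    );
  }
}
```

A call such as `FailedFlushInfoSketch.failedFlushInfo("orders-window", 42L, 3, 1000L)` would put all four values into one log-friendly line.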
@Override
public void writeAdded(final WindowedKey key) {
  streamTime = Long.max(streamTime, key.windowStartMs);
I know it's not actually being used anywhere at the moment, so it doesn't matter too much, but to avoid confusion in the future we might want to name this more specifically to indicate that it isn't the persisted streamTime but rather the stream-time of the current pending batch (e.g. batchStreamTime or pendingStreamTime, or something better).
OTOH, just thinking about how we'll want to use the streamTime in the future, maybe we actually do want the persisted streamTime only, to ensure it increases monotonically? Or will SlateDB handle that for us automatically 🤔
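One way to picture the distinction is to track the two values separately. The sketch below is only an illustration under assumed names (`StreamTimeTrackerSketch`, `flushSucceeded`, and so on are hypothetical); it does not reflect how the flush manager or SlateDB actually handles stream time.

```java
// Hypothetical tracker separating the pending batch's stream time from the
// last persisted stream time. All names here are illustrative assumptions.
final class StreamTimeTrackerSketch {

  private long persistedStreamTime;
  private long pendingStreamTime;

  StreamTimeTrackerSketch(final long persistedStreamTime) {
    this.persistedStreamTime = persistedStreamTime;
    // The pending value starts from the persisted one, so it can never
    // fall below what has already been flushed.
    this.pendingStreamTime = persistedStreamTime;
  }

  // Called for each write added to the current (unflushed) batch.
  void writeAdded(final long windowStartMs) {
    pendingStreamTime = Math.max(pendingStreamTime, windowStartMs);
  }

  // Called once the batch has been durably written.
  void flushSucceeded() {
    persistedStreamTime = pendingStreamTime;
  }

  long persistedStreamTime() {
    return persistedStreamTime;
  }

  long pendingStreamTime() {
    return pendingStreamTime;
  }
}
```

Because the pending value only ever grows from the persisted one, the persisted value increases monotonically across successful flushes in this sketch, regardless of the order in which window start times arrive.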
kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTable.java
kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTable.java
kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowedKeySerde.java
kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3Writer.java
final var scopeBuilder = responsiveMetrics.storeLevelMetricScopeBuilder(
    Utils.extractThreadIdFromThreadName(Thread.currentThread().getName()),
    changelog,
    params.name().tableName()
I know we use #tableName for the existing caller of this method, but I think it should actually be #kafkaName (here and in PartitionedOperations).
We seem to use the Kafka store name rather than the table name for these metrics elsewhere.
  throw new UnsupportedOperationException("Duplicate retention is not yet supported in RS3");
}

// TODO: Pass through retention period once we have support for TTL
FYI, I'm actually doing this as part of the CreateStore implementation.
It only sets the default TTL for the store (so we still need the STREAM_TIME clock type for full window store TTL support), but we can do the same thing here. I can handle this as part of integrating the CreateStore API with the RS3 window stores after both PRs are merged.
As I was discussing the patch with @ableegoldman a bit, I realized there is a fork in the road that I probably haven't given enough consideration to. I've taken the approach here of making the RS3 server agnostic to window store semantics. The window start time is baked directly into the key that gets shipped to RS3 and the server knows nothing about it. An alternative is to involve the server directly. We could pass through the window start time explicitly in the
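As a rough illustration of the first approach (the server only ever sees opaque bytes), the sketch below appends the window start time to the record key. The byte layout, class name, and method names are assumptions made for this example; the actual RS3WindowedKeySerde may use a different format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical encoding: raw key bytes followed by an 8-byte big-endian
// window start timestamp. The layout is an assumption for illustration only.
final class WindowedKeyEncodingSketch {

  private WindowedKeyEncodingSketch() {
  }

  static byte[] encode(final byte[] key, final long windowStartMs) {
    return ByteBuffer.allocate(key.length + Long.BYTES)
        .put(key)
        .putLong(windowStartMs)
        .array();
  }

  static byte[] decodeKey(final byte[] encoded) {
    // Everything except the trailing 8 bytes is the original key.
    return Arrays.copyOfRange(encoded, 0, encoded.length - Long.BYTES);
  }

  static long decodeWindowStartMs(final byte[] encoded) {
    // Absolute read of the trailing 8 bytes.
    return ByteBuffer.wrap(encoded).getLong(encoded.length - Long.BYTES);
  }
}
```

With a layout like this, the server needs no knowledge of windows at all, and the client is responsible for splitting the key and timestamp apart again on reads.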
…3/RS3WindowFlushManager.java Co-authored-by: A. Sophie Blee-Goldman <[email protected]>
…3/RS3WindowTable.java Co-authored-by: A. Sophie Blee-Goldman <[email protected]>
  );
}

private UUID lookupStoreId(
FYI, I'm just about to merge the rs3 PR that converts store ids from UUID to string. I haven't done the responsive-pub one yet, but it probably makes sense to drop the UUID and use a string for the window store now so you don't have to rebase this.
kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java
ableegoldman
left a comment
LGTM!
final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore(
    storeName,
    CreateStoreTypes.StoreType.WINDOW,
    Optional.of(ClockType.STREAM_TIME),
Just a heads up: we still throw a ConfigErr on the server side if stream time is used, so we'll need to implement that as a follow-up.
Adds support for window tables. Basically a big bunch of banal boilerplate to send window timestamps as needed in the gRPC protocol.