feat(admin): add KIP-396 list/alter offsets APIs #3419
Conversation
Signed-off-by: DCjanus <DCjanus@dcjanus.com>
Force-pushed edad338 to 11e34c1 (compare)
// OffsetSpec specifies which offset to look up for a partition.
type OffsetSpec struct {
	timestamp int64
I’m not sure why we need a ton of constructors, when we can simply export this value.
After all, given the functionality provided, we can already arbitrarily mutate any given OffsetSpec and access the field at will:

offspec := OffsetSpecLatest()
offspec = OffsetSpecForTimestamp(arbitraryTimestamp)
go func() {
	offspec = OffsetSpecForTimestamp(OffsetNewest)
}()
ts := offspec.Timestamp() // write-read race condition
Agree that we should not mix two different input styles for the same concept (as noted in #3419 (comment)).
To stay consistent with existing Sarama APIs (e.g. Client.GetOffset(topic, partition, time int64) which uses OffsetOldest/OffsetNewest), I removed OffsetSpec and made ListOffsets take int64 directly (pass OffsetOldest/OffsetNewest or a millisecond timestamp).
Done in commit f3e40c7.
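To illustrate the convention adopted above, here is a minimal sketch of how a single int64 parameter covers all three query kinds. The sentinel values mirror Sarama's real OffsetNewest (-1) and OffsetOldest (-2) constants, but describeQuery is a hypothetical helper, not part of the PR:

```go
package main

import "fmt"

// Sarama's real sentinels are OffsetNewest = -1 and OffsetOldest = -2;
// redeclared here so the sketch is self-contained.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// describeQuery is a hypothetical helper showing how one int64 parameter
// can encode all three query kinds (latest, earliest, timestamp in ms),
// matching the Client.GetOffset convention the reply refers to.
func describeQuery(t int64) string {
	switch t {
	case OffsetNewest:
		return "latest"
	case OffsetOldest:
		return "earliest"
	default:
		return fmt.Sprintf("first offset at/after %d ms", t)
	}
}

func main() {
	fmt.Println(describeQuery(OffsetNewest))
	fmt.Println(describeQuery(1700000000000))
}
```

The tradeoff is that a plain int64 cannot statically distinguish "timestamp" from "sentinel", which is what the removed OffsetSpec type had encoded.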
close(results)

allResults := make(map[TopicPartitionID]*ListOffsetsResult, len(partitions))
var firstErr error
var errs []error
for res := range results {
	if res.err != nil {
		errs = append(errs, res.err)
	}
	...
}
return allResults, errors.Join(errs...)
Addressed in commit bce2339 by aggregating all errors with errors.Join.
for _, req := range requests {
	wg.Add(1)
	go func(req *brokerOffsetRequest) {
How many requests are we likely to be spinning off here?
It can happen that spinning off too many goroutines will actually be performance-degrading rather than performance-enhancing.
Thanks for the note. The Java AdminClient doesn’t appear to impose an explicit concurrency cap either — it groups by broker and issues one in‑flight request per broker via the admin I/O loop.
Key entry point (no concurrency limiting logic):
In practice, the Java I/O loop is equivalent to spawning goroutines in Go: both drive concurrent in‑flight requests without a per‑broker cap.
Given typical Kafka clusters are well below 1,000 brokers, I believe even 10,000 concurrent in‑flight requests should be relatively easy for modern hardware to handle.
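For reference, if a cap were ever wanted, a counting semaphore bounds the in-flight goroutines without changing the fan-out shape. This is a hypothetical alternative, not what the PR does:

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut launches one goroutine per request but caps concurrent in-flight
// work with a counting semaphore; a hypothetical variant for the case where
// broker counts grew large enough for unbounded goroutines to hurt.
func fanOut(requests []int, limit int, do func(int) int) []int {
	results := make([]int, len(requests))
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for i, req := range requests {
		wg.Add(1)
		go func(i, req int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			results[i] = do(req)
		}(i, req)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(fanOut([]int{1, 2, 3, 4}, 2, func(x int) int { return x * x }))
}
```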
ListOffsets(partitions map[TopicPartitionID]OffsetSpec, options *ListOffsetsOptions) (map[TopicPartitionID]*ListOffsetsResult, error)

// AlterConsumerGroupOffsets alters offsets for the specified group by committing the provided offsets and metadata.
// The request targets the group's coordinator and returns per-partition results in the response.
// This operation is not transactional so it may succeed for some partitions while failing for others.
AlterConsumerGroupOffsets(group string, offsets map[TopicPartitionID]OffsetAndMetadata, options *AlterConsumerGroupOffsetsOptions) (*OffsetCommitResponse, error)
Our current design uses map[string]map[int32]V; adding this TopicPartitionID might have been nice if we had started with it, but we haven’t really. https://pkg.go.dev/github.com/IBM/sarama#AlterPartitionReassignmentsResponse
Providing two different ways to do something is likely to increase confusion over any benefits of flattening the maps.
It also reduces the ability to iterate over topics individually, without needing to then iterate over all Topic × Partition combinations and select for Topic.
Thanks for the point about keeping the API shape consistent. I reverted to the existing map[string]map[int32]V style so callers can iterate by topic without scanning all partitions.
This removes TopicPartitionID from ListOffsets and aligns the input/return maps with the rest of Sarama’s admin APIs.
Done in commit be65797.
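The per-topic iteration benefit the reviewer describes can be sketched with the nested shape; the names and values here are illustrative, not Sarama's actual data:

```go
package main

import "fmt"

// partitionCount shows why the nested map[string]map[int32]V shape is handy:
// a caller reaches one topic's partitions directly, instead of scanning
// every topic-partition key of a flat map and filtering by topic.
func partitionCount(offsets map[string]map[int32]int64, topic string) int {
	return len(offsets[topic])
}

func main() {
	offsets := map[string]map[int32]int64{
		"orders": {0: 42, 1: 7},
		"logs":   {0: 100},
	}
	for partition, off := range offsets["orders"] {
		fmt.Printf("orders/%d -> %d\n", partition, off)
	}
	fmt.Println(partitionCount(offsets, "orders"))
}
```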
}

// ListOffsetsResult contains the response for a single topic partition.
type ListOffsetsResult struct {
I’m not sure this is an Offsets result, as it seems to be a single-offset result?
Renamed ListOffsetsResult to OffsetResult to reflect the single-partition result; done in commit f9150fe.
type brokerOffsetRequest struct {
	broker     *Broker
	request    *OffsetRequest
	partitions []TopicPartitionID
}

type brokerOffsetResult struct {
	result map[TopicPartitionID]*ListOffsetsResult
	err    error
}
We could also isolate/scope these types into the ListOffsets right?
Updated in commit 243445e (scoped the helper types inside ListOffsets).
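For readers unfamiliar with the scoping trick: Go allows type declarations inside a function body, which keeps helper types off the package API. A minimal sketch with illustrative names, not Sarama's actual internals:

```go
package main

import "fmt"

// listOffsets sketches the scoping idea: helper types declared inside the
// function body never leak into the package's exported (or even
// package-level) namespace.
func listOffsets() (map[string]int64, error) {
	type brokerOffsetResult struct {
		result map[string]int64
		err    error
	}
	res := brokerOffsetResult{result: map[string]int64{"topic-0/0": 42}}
	if res.err != nil {
		return nil, res.err
	}
	return res.result, nil
}

func main() {
	offsets, _ := listOffsets()
	fmt.Println(offsets["topic-0/0"])
}
```

One limitation worth knowing: methods cannot be declared on function-local types, so this only suits plain data carriers like the result struct here.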
req = &brokerOffsetRequest{
	broker: broker,
I’m not sure why we double track the broker? We’re using it as the key, and as a field in the value?
We could just for broker, req := range requests { … } below, right?
Addressed in commit ea32b6e by removing the redundant broker field and passing the broker via the map key in the loop.
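The resulting shape can be sketched as follows; the struct and function names are illustrative stand-ins for the PR's internals:

```go
package main

import "fmt"

type broker struct{ id int32 }

type brokerOffsetRequest struct {
	// note: no broker field; the requests map key already identifies
	// the broker, so storing it again would be redundant state.
	partitions []string
}

// dispatch receives each broker from the map key directly, as the
// review suggested with: for broker, req := range requests { ... }
func dispatch(requests map[*broker]*brokerOffsetRequest) int {
	total := 0
	for b, req := range requests {
		fmt.Printf("broker %d: %d partitions\n", b.id, len(req.partitions))
		total += len(req.partitions)
	}
	return total
}

func main() {
	b1 := &broker{id: 1}
	requests := map[*broker]*brokerOffsetRequest{
		b1: {partitions: []string{"t/0", "t/1"}},
	}
	fmt.Println(dispatch(requests))
}
```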
Force-pushed 2e502be to ad782bd (compare)
Co-authored-by: Cassondra Foesch <puellanivis@users.noreply.github.com> Signed-off-by: DCjanus <DCjanus@dcjanus.com>
Signed-off-by: DCjanus <DCjanus@dcjanus.com>
puellanivis left a comment:
Looks pretty good, just two things that are perhaps more style than substance.
for broker, req := range requests {
	wg.Add(1)
	go func(broker *Broker, req *brokerOffsetRequest) {
[nitpick] We’re compiling this under go.mod go version v1.24.0, which should be well enough past the change of semantics for loopvars that removed the need to pass these variables through arguments rather than accessing them by simple closure. 🤔 Though, I’m not sure how we would want to approach changing this common pattern, even if it is now unnecessary.
Thanks for the style note.
Personally, I prefer passing loop variables into goroutines so it is clear what is shared.
I also want to match the project style. If you think it is better to rely on the Go 1.24 behavior, I can change it — or you can push a commit since maintainer edits are enabled.
There was a problem hiding this comment.
The usual problem is that to know for sure which values are being put into these parameters, one has to drop all the way to the bottom of the go func() { … }() to check (though good here, you’re also shadowing the originals, so one cannot accidentally access the wrong one; whether one likes to be explicit or not, the implicit closure access is always there), and it requires repeating the type information of the parameters as well, which closure access avoids.
So, if we refactor a type name of one of the variables, this is then Yet Another Place where the type-name must be refactored.
PS: To be more clear, because I forgot to mention it, this is fine. 🤷♀️
Thanks for the clarification — you convinced me. I switched this to simple closure capture in 3d11625.
In a language without a “no implicit capture” guarantee, explicit parameters don’t really reduce review burden, since you still have to check for other implicit captures.
}
broker.handleThrottledResponse(resp)

partitionResults := make(map[string]map[int32]*OffsetResult)
As a seeming reversal of my prior guidance; since this type is function internal, we could also use the topicPartition type as a key here, and avoid needing more than the one map.
(One of those cases where blackbox internals can choose different paradigms from the public API.)
Thanks! Updated in 494f7c3.
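The internal composite-key idea can be sketched like this; the type and helper names are illustrative, not necessarily the PR's exact spelling:

```go
package main

import "fmt"

// topicPartition mirrors the function-internal composite key: any
// comparable struct can key a Go map, so one flat map replaces the
// nested topic -> partition -> value allocation inside the function.
type topicPartition struct {
	topic     string
	partition int32
}

func lookup(results map[topicPartition]int64, topic string, partition int32) (int64, bool) {
	v, ok := results[topicPartition{topic: topic, partition: partition}]
	return v, ok
}

func main() {
	results := map[topicPartition]int64{
		{topic: "orders", partition: 0}: 42,
	}
	v, ok := lookup(results, "orders", 0)
	fmt.Println(v, ok)
}
```

Since the flat map never crosses the public API boundary, the nested map[string]map[int32]V shape callers expect is only materialized once, at the end.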
Signed-off-by: DCjanus <DCjanus@dcjanus.com>
dnwe left a comment:
I did a quick first pass, one stylistic comment below, but also one main question here: is there a reason why we have the ca.retryOnError wrapper on AlterConsumerGroupOffsets, but not on any of the calls in ListOffsets?
if conf.Version.IsAtLeast(V0_9_0_0) {
	request.Version = 2
} else {
	request.Version = 1
}
if conf.Version.IsAtLeast(V0_11_0_0) {
	request.Version = 3
}
if conf.Version.IsAtLeast(V2_0_0_0) {
	request.Version = 4
}
if conf.Version.IsAtLeast(V2_1_0_0) {
	request.Version = 6
}
if conf.Version.IsAtLeast(V2_3_0_0) {
	request.Version = 7
}
Can we structure these in descending order like we do elsewhere in Sarama? e.g.,
sarama/offset_fetch_request.go, lines 23 to 46 in 2fb8eaf
It is somewhat stylistic (but also avoids multiple assignment). It is also useful to have the version-to-version additions (taken from Kafka's protocol json files) to show what changed between them too
You may also want to move this into offset_commit_request.go as a NewOffsetCommitRequest func that sets up the correct version etc.
- Switched to descending version checks with explicit version-diff comments.
- Moved the admin OffsetCommitRequest initializer to offset_commit_request.go as NewOffsetCommitRequest and used it from admin_offsets.go.

Done in commit c525eb0.
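The descending-check style being adopted can be sketched as below. The kafkaVersion enum is a simplified stand-in for Sarama's KafkaVersion type, and the mapping mirrors the cascade above; the two version-diff comments reflect the OffsetCommit protocol history:

```go
package main

import "fmt"

// Simplified stand-in for Sarama's KafkaVersion comparisons; the real code
// uses conf.Version.IsAtLeast(V2_3_0_0) etc. A descending switch makes the
// highest supported protocol version win with a single assignment, and
// leaves a natural spot for per-version change notes.
type kafkaVersion int

const (
	v0_9_0_0 kafkaVersion = iota + 1
	v0_11_0_0
	v2_0_0_0
	v2_1_0_0
	v2_3_0_0
)

func offsetCommitVersion(v kafkaVersion) int16 {
	switch {
	case v >= v2_3_0_0:
		return 7 // v7 adds GroupInstanceId for static membership
	case v >= v2_1_0_0:
		return 6 // v6 adds CommittedLeaderEpoch
	case v >= v2_0_0_0:
		return 4
	case v >= v0_11_0_0:
		return 3
	case v >= v0_9_0_0:
		return 2
	default:
		return 1
	}
}

func main() {
	fmt.Println(offsetCommitVersion(v2_3_0_0))
}
```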
I wanna add as an aside, but we can also switch to a more go-style:

switch {
case version.IsAtLeast(V2_5_0_0):
case version.IsAtLeast(V2_4_0_0):
...
}

But I’ve not mentioned this before, because the style already here is if … else if … else … cascades.
Thanks for the suggestion! I agree it’s more Go‑style.
To keep this PR focused and clean, I’d prefer not to change this here.
@dnwe if you also prefer the switch {} style, I can follow up with a small PR to convert similar cascades across the project.
Signed-off-by: DCjanus <DCjanus@dcjanus.com>
I missed the retry logic earlier; added it in commit 4446246.
Hi @dnwe, I saw the new AI-assistance guidance in #3452, including the note about not putting AI tools/models in Some commits in this PR include: I add that via a global I also understand why Before opening PRs, I still review and validate all changes myself. Given the new policy, what would you prefer for this PR? Happy to follow your preference. (Also, this comment was lightly polished with AI for clarity because English is not my native language, and I want to avoid ambiguity in technical communication.)
@dnwe Quick correction: for this PR, I had not enabled my AGENTS.md co-author rule yet, so these commits do not include an AI I still want to discuss the policy in general, but this specific PR is not an example of that case.
So, it’s inappropriate to attribute co-authorship to anything that is not a person, and thus not able to hold any copyrights. Having random public domain uncopyrightable code mixed into your PR muddies the protections and/or requirements that may be necessary for license compliance.
Signed-off-by: DCjanus <DCjanus@dcjanus.com>
var netErr net.Error
return errors.As(err, &netErr) && netErr.Timeout()
Ah, errors.AsType is in v1.26 so not available yet for us.
Summary

- Add ListOffsets and AlterConsumerGroupOffsets to support bulk offset queries and group offset commits (OffsetAndMetadata).

Key changes

- Add ListOffsets and AlterConsumerGroupOffsets to ClusterAdmin and wire protocol requests.

Constraints / tradeoffs

- ListOffsets currently accepts int64 queries (earliest/latest/timestamp) to stay consistent with existing offset conventions; we can introduce an explicit spec type in a follow-up if needed.

Notes