-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Topic split merge #1707
Topic split merge #1707
Conversation
github.com/ydb-platform/ydb-go-sdk/v3/tracecompatible changesTopic.OnReaderEndPartitionSession: added summaryBase version: v3.104.8-0.20250327190759-68da27dfe590 (master) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces new topic partition session events to support split and merge operations along with refinements in the reader trace and batching logic, as well as updates for auto-partitioning settings. Key changes include:
- Adding new callbacks and trace methods for handling end partition sessions and GRPC message events.
- Enhancing the batching mechanism to support partition flush operations.
- Updating integration tests and client/GRPC wrapper logic to accommodate the new events and settings.
Reviewed Changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
trace/topic_gtrace.go | Adds composition for OnReaderEndPartitionSession, OnReaderSentGRPCMessage, and OnReaderReceiveGRPCMessage. |
trace/topic.go | Introduces related event info structs and callbacks. |
tests/integration/topic_partitions_balanced_test.go | Adds new integration tests for split and merge partition flows. |
internal/xslices/delete_old.go, delete.go | Adds conditional implementation of Delete based on Go version. |
internal/topic/topicreaderinternal/stream_reader_impl.go | Processes EndPartitionSession events and flushes sessions accordingly. |
internal/topic/topicreaderinternal/reader.go | Updates the raw stream connector signature to pass readerID and trace. |
internal/topic/topicreaderinternal/batcher.go | Enhances batcher with sessionsForFlush management and updated lookup. |
internal/topic/topicreadercommon/read_partition_session.go | Adds new methods to mark partition sessions as having no more messages. |
internal/grpcwrapper/rawtopic/rawtopicreader/* | Updates message handling to include EndPartitionSession and GRPC trace events. |
internal/grpcwrapper/rawtopic/controlplane_types.go | Adds new fields and error handling for auto-partitioning settings. |
internal/grpcwrapper/rawtopic/client.go | Updates the StreamRead signature to accept readerID and tracer. |
Comments suppressed due to low confidence (1)
internal/topic/topicreaderinternal/reader.go:26
- [nitpick] The type name 'TopicSteamReaderConnect' appears to contain a typo; consider renaming it to 'TopicStreamReaderConnect' for improved clarity.
type TopicSteamReaderConnect func(connectionCtx context.Context, readerID int64, tracer *trace.Topic) (topicreadercommon.RawTopicReaderStream, error)
if len(b.sessionsForFlush) > 0 && b.sessionsForFlush[0] == res.Key { | ||
b.sessionsForFlush = xslices.Delete(b.sessionsForFlush, 0, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code only checks and removes a flush session if it is the first element in sessionsForFlush. If flush sessions can occur in any order, consider iterating over the slice to remove any matching session instead of only checking index 0.
if len(b.sessionsForFlush) > 0 && b.sessionsForFlush[0] == res.Key { | |
b.sessionsForFlush = xslices.Delete(b.sessionsForFlush, 0, 1) | |
for i, session := range b.sessionsForFlush { | |
if session == res.Key { | |
b.sessionsForFlush = xslices.Delete(b.sessionsForFlush, i, 1) | |
break | |
} |
Copilot is powered by AI, so mistakes are possible. Review output carefully before use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- change array during iteration is bad idea
- the code flush sessions in order - it is ok
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #1707 +/- ##
==========================================
- Coverage 71.51% 71.40% -0.12%
==========================================
Files 381 383 +2
Lines 39688 39845 +157
==========================================
+ Hits 28383 28450 +67
- Misses 10176 10251 +75
- Partials 1129 1144 +15
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Pull request type
Please check the type of change your PR introduces:
What is the current behavior?
Issue Number: N/A
What is the new behavior?
Other information