Skip to content

Topic split merge #1707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Supported topic split merge server feature for topic reader (no api changed)

## v3.104.7
* Added public type alias `ydb.Params` to `internal/params.Parameters` for external usage

Expand Down
12 changes: 10 additions & 2 deletions internal/grpcwrapper/rawtopic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func (c *Client) DropTopic(
return res, err
}

func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.StreamReader, error) {
func (c *Client) StreamRead(
ctxStreamLifeTime context.Context,
readerID int64,
tracer *trace.Topic,
) (rawtopicreader.StreamReader, error) {
protoResp, err := c.service.StreamRead(ctxStreamLifeTime)
if err != nil {
return rawtopicreader.StreamReader{}, xerrors.WithStackTrace(
Expand All @@ -98,7 +102,11 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
)
}

return rawtopicreader.StreamReader{Stream: protoResp}, nil
return rawtopicreader.StreamReader{
Stream: protoResp,
ReaderID: readerID,
Tracer: tracer,
}, nil
}

func (c *Client) StreamWrite(
Expand Down
91 changes: 86 additions & 5 deletions internal/grpcwrapper/rawtopic/controlplane_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)

var errUnexpectedNilPartitioningSettings = xerrors.Wrap(errors.New("ydb: unexpected nil partitioning settings"))
var (
errUnexpectedNilPartitioningSettings = xerrors.Wrap(errors.New("ydb: unexpected nil partitioning settings"))
errUnexpecredNilAutoPartitioningSettings = xerrors.Wrap(errors.New("ydb: unexpected nil auto-partitioning settings"))
errUnexpectedNilAutoPartitionWriteSpeed = xerrors.Wrap(errors.New("ydb: unexpected nil auto-partition write speed"))
)

type Consumer struct {
Name string
Expand Down Expand Up @@ -47,8 +51,10 @@ const (
)

type PartitioningSettings struct {
MinActivePartitions int64
PartitionCountLimit int64
MinActivePartitions int64
MaxActivePartitions int64
PartitionCountLimit int64
AutoPartitioningSettings AutoPartitioningSettings
}

func (s *PartitioningSettings) FromProto(proto *Ydb_Topic.PartitioningSettings) error {
Expand All @@ -57,26 +63,101 @@ func (s *PartitioningSettings) FromProto(proto *Ydb_Topic.PartitioningSettings)
}

s.MinActivePartitions = proto.GetMinActivePartitions()
s.MaxActivePartitions = proto.GetMaxActivePartitions()
s.PartitionCountLimit = proto.GetPartitionCountLimit() //nolint:staticcheck

return nil
}

func (s *PartitioningSettings) ToProto() *Ydb_Topic.PartitioningSettings {
return &Ydb_Topic.PartitioningSettings{
MinActivePartitions: s.MinActivePartitions,
PartitionCountLimit: s.PartitionCountLimit,
MinActivePartitions: s.MinActivePartitions,
MaxActivePartitions: s.MaxActivePartitions,
PartitionCountLimit: s.PartitionCountLimit,
AutoPartitioningSettings: s.AutoPartitioningSettings.ToProto(),
}
}

type AutoPartitioningSettings struct {
AutoPartitioningStrategy AutoPartitioningStrategy
AutoPartitioningWriteSpeedStrategy AutoPartitioningWriteSpeedStrategy
}

func (s *AutoPartitioningSettings) ToProto() *Ydb_Topic.AutoPartitioningSettings {
if s == nil {
return nil
}

return &Ydb_Topic.AutoPartitioningSettings{
Strategy: s.AutoPartitioningStrategy.ToProto(),
PartitionWriteSpeed: s.AutoPartitioningWriteSpeedStrategy.ToProto(),
}
}

func (s *AutoPartitioningSettings) FromProto(proto *Ydb_Topic.AutoPartitioningSettings) error {
if proto == nil {
return xerrors.WithStackTrace(errUnexpecredNilAutoPartitioningSettings)
}
s.AutoPartitioningStrategy = AutoPartitioningStrategy(proto.GetStrategy())

if proto.GetPartitionWriteSpeed() != nil {
if err := s.AutoPartitioningWriteSpeedStrategy.FromProto(proto.GetPartitionWriteSpeed()); err != nil {
return err
}
}

return nil
}

type AutoPartitioningStrategy int32

const (
AutoPartitioningStrategyUnspecified = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_UNSPECIFIED) //nolint:lll
AutoPartitioningStrategyDisabled = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_DISABLED) //nolint:lll
AutoPartitioningStrategyScaleUpAndDown = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN) //nolint:lll
AutoPartitioningStrategyPaused = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_PAUSED) //nolint:lll
)

func (s AutoPartitioningStrategy) ToProto() Ydb_Topic.AutoPartitioningStrategy {
return Ydb_Topic.AutoPartitioningStrategy(s)
}

type AutoPartitioningWriteSpeedStrategy struct {
StabilizationWindow rawoptional.Duration
UpUtilizationPercent int32
DownUtilizationPercent int32
}

func (s *AutoPartitioningWriteSpeedStrategy) ToProto() *Ydb_Topic.AutoPartitioningWriteSpeedStrategy {
return &Ydb_Topic.AutoPartitioningWriteSpeedStrategy{
StabilizationWindow: s.StabilizationWindow.ToProto(),
UpUtilizationPercent: s.UpUtilizationPercent,
DownUtilizationPercent: s.DownUtilizationPercent,
}
}

func (s *AutoPartitioningWriteSpeedStrategy) FromProto(speed *Ydb_Topic.AutoPartitioningWriteSpeedStrategy) error {
if speed == nil {
return xerrors.WithStackTrace(errUnexpectedNilAutoPartitionWriteSpeed)
}

s.StabilizationWindow.MustFromProto(speed.GetStabilizationWindow())
s.UpUtilizationPercent = speed.GetUpUtilizationPercent()
s.DownUtilizationPercent = speed.GetDownUtilizationPercent()

return nil
}

type AlterPartitioningSettings struct {
SetMinActivePartitions rawoptional.Int64
SetMaxActivePartitions rawoptional.Int64
SetPartitionCountLimit rawoptional.Int64
}

func (s *AlterPartitioningSettings) ToProto() *Ydb_Topic.AlterPartitioningSettings {
return &Ydb_Topic.AlterPartitioningSettings{
SetMinActivePartitions: s.SetMinActivePartitions.ToProto(),
SetMaxActivePartitions: s.SetMaxActivePartitions.ToProto(),
SetPartitionCountLimit: s.SetPartitionCountLimit.ToProto(),
}
}
29 changes: 27 additions & 2 deletions internal/grpcwrapper/rawtopic/rawtopicreader/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
errUnexpectedProtoNilStartPartitionSessionRequest = xerrors.Wrap(errors.New("ydb: unexpected proto nil start partition session request")) //nolint:lll
errUnexpectedNilPartitionSession = xerrors.Wrap(errors.New("ydb: unexpected proto nil partition session in start partition session request")) //nolint:lll
errUnexpectedGrpcNilStopPartitionSessionRequest = xerrors.Wrap(errors.New("ydb: unexpected grpc nil stop partition session request")) //nolint:lll
errUnexpectedGrpcNilEndPartitionSession = xerrors.Wrap(errors.New("ydb: unexpected grpc nil end partition session")) //nolint:lll
)

type PartitionSessionID int64
Expand Down Expand Up @@ -92,12 +93,14 @@ type InitRequest struct {

TopicsReadSettings []TopicReadSettings

Consumer string
Consumer string
AutoPartitioningSupport bool
}

func (r *InitRequest) toProto() *Ydb_Topic.StreamReadMessage_InitRequest {
p := &Ydb_Topic.StreamReadMessage_InitRequest{
Consumer: r.Consumer,
Consumer: r.Consumer,
AutoPartitioningSupport: r.AutoPartitioningSupport,
}

p.TopicsReadSettings = make([]*Ydb_Topic.StreamReadMessage_InitRequest_TopicReadSettings, len(r.TopicsReadSettings))
Expand Down Expand Up @@ -477,3 +480,25 @@ func (r *StopPartitionSessionResponse) toProto() *Ydb_Topic.StreamReadMessage_St
PartitionSessionId: r.PartitionSessionID.ToInt64(),
}
}

type EndPartitionSession struct {
serverMessageImpl

rawtopiccommon.ServerMessageMetadata

PartitionSessionID PartitionSessionID
AdjacentPartitionIDs []int64
ChildPartitionIDs []int64
}

func (r *EndPartitionSession) fromProto(proto *Ydb_Topic.StreamReadMessage_EndPartitionSession) error {
if proto == nil {
return xerrors.WithStackTrace(errUnexpectedGrpcNilEndPartitionSession)
}

r.PartitionSessionID.FromInt64(proto.GetPartitionSessionId())
r.AdjacentPartitionIDs = proto.GetAdjacentPartitionIds()
r.ChildPartitionIDs = proto.GetChildPartitionIds()

return nil
}
86 changes: 64 additions & 22 deletions internal/grpcwrapper/rawtopic/rawtopicreader/rawtopicreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var ErrUnexpectedMessageType = errors.New("unexpected message type")
Expand All @@ -22,16 +23,36 @@ type GrpcStream interface {
}

type StreamReader struct {
Stream GrpcStream
Stream GrpcStream
ReaderID int64

Tracer *trace.Topic
sessionID string
sentMessageCount int
receiveMessageCount int
}

func (s StreamReader) CloseSend() error {
return s.Stream.CloseSend()
}

//nolint:funlen
func (s StreamReader) Recv() (ServerMessage, error) {
func (s StreamReader) Recv() (_ ServerMessage, resErr error) {
grpcMess, err := s.Stream.Recv()

defer func() {
s.receiveMessageCount++

trace.TopicOnReaderReceiveGRPCMessage(
s.Tracer,
s.ReaderID,
s.sessionID,
s.receiveMessageCount,
grpcMess,
resErr,
)
}()

if xerrors.Is(err, io.EOF) {
return nil, err
}
Expand All @@ -53,6 +74,8 @@ func (s StreamReader) Recv() (ServerMessage, error) {

switch m := grpcMess.GetServerMessage().(type) {
case *Ydb_Topic.StreamReadMessage_FromServer_InitResponse:
s.sessionID = m.InitResponse.GetSessionId()

resp := &InitResponse{}
resp.ServerMessageMetadata = meta
resp.fromProto(m.InitResponse)
Expand Down Expand Up @@ -82,6 +105,16 @@ func (s StreamReader) Recv() (ServerMessage, error) {
}

return req, nil

case *Ydb_Topic.StreamReadMessage_FromServer_EndPartitionSession:
req := &EndPartitionSession{}
req.ServerMessageMetadata = meta
if err = req.fromProto(m.EndPartitionSession); err != nil {
return nil, err
}

return req, nil

case *Ydb_Topic.StreamReadMessage_FromServer_CommitOffsetResponse:
resp := &CommitOffsetResponse{}
resp.ServerMessageMetadata = meta
Expand Down Expand Up @@ -113,66 +146,75 @@ func (s StreamReader) Recv() (ServerMessage, error) {
}
}

func (s StreamReader) Send(msg ClientMessage) (err error) {
//nolint:funlen
func (s StreamReader) Send(msg ClientMessage) (resErr error) {
defer func() {
err = xerrors.Transport(err)
resErr = xerrors.Transport(resErr)
}()

var grpcMess *Ydb_Topic.StreamReadMessage_FromClient
switch m := msg.(type) {
case *InitRequest:
grpcMess := &Ydb_Topic.StreamReadMessage_FromClient{
grpcMess = &Ydb_Topic.StreamReadMessage_FromClient{
ClientMessage: &Ydb_Topic.StreamReadMessage_FromClient_InitRequest{InitRequest: m.toProto()},
}

return s.Stream.Send(grpcMess)
case *ReadRequest:
grpcMess := &Ydb_Topic.StreamReadMessage_FromClient{
grpcMess = &Ydb_Topic.StreamReadMessage_FromClient{
ClientMessage: &Ydb_Topic.StreamReadMessage_FromClient_ReadRequest{ReadRequest: m.toProto()},
}

return s.Stream.Send(grpcMess)
case *StartPartitionSessionResponse:
grpcMess := &Ydb_Topic.StreamReadMessage_FromClient{
grpcMess = &Ydb_Topic.StreamReadMessage_FromClient{
ClientMessage: &Ydb_Topic.StreamReadMessage_FromClient_StartPartitionSessionResponse{
StartPartitionSessionResponse: m.toProto(),
},
}

return s.Stream.Send(grpcMess)
case *StopPartitionSessionResponse:
grpcMess := &Ydb_Topic.StreamReadMessage_FromClient{
grpcMess = &Ydb_Topic.StreamReadMessage_FromClient{
ClientMessage: &Ydb_Topic.StreamReadMessage_FromClient_StopPartitionSessionResponse{
StopPartitionSessionResponse: m.toProto(),
},
}

return s.Stream.Send(grpcMess)
case *CommitOffsetRequest:
grpcMess := &Ydb_Topic.StreamReadMessage_FromClient{
grpcMess = &Ydb_Topic.StreamReadMessage_FromClient{
ClientMessage: &Ydb_Topic.StreamReadMessage_FromClient_CommitOffsetRequest{
CommitOffsetRequest: m.toProto(),
},
}

return s.Stream.Send(grpcMess)
case *PartitionSessionStatusRequest:
grpcMess := &Ydb_Topic.StreamReadMessage_FromClient{
grpcMess = &Ydb_Topic.StreamReadMessage_FromClient{
ClientMessage: &Ydb_Topic.StreamReadMessage_FromClient_PartitionSessionStatusRequest{
PartitionSessionStatusRequest: m.toProto(),
},
}

return s.Stream.Send(grpcMess)
case *UpdateTokenRequest:
grpcMess := &Ydb_Topic.StreamReadMessage_FromClient{
grpcMess = &Ydb_Topic.StreamReadMessage_FromClient{
ClientMessage: &Ydb_Topic.StreamReadMessage_FromClient_UpdateTokenRequest{
UpdateTokenRequest: m.ToProto(),
},
}
}

return s.Stream.Send(grpcMess)
default:
return xerrors.WithStackTrace(fmt.Errorf("ydb: send unexpected message type: %v", reflect.TypeOf(msg)))
if grpcMess == nil {
resErr = xerrors.WithStackTrace(fmt.Errorf("ydb: send unexpected message type: %v", reflect.TypeOf(msg)))
} else {
resErr = s.Stream.Send(grpcMess)
}

s.sentMessageCount++
trace.TopicOnReaderSentGRPCMessage(
s.Tracer,
s.ReaderID,
s.sessionID,
s.sentMessageCount,
grpcMess,
resErr,
)

return resErr
}

type ClientMessage interface {
Expand Down
8 changes: 6 additions & 2 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,14 @@ func (c *Client) StartReader(
readSelectors topicoptions.ReadSelectors,
opts ...topicoptions.ReaderOption,
) (*topicreader.Reader, error) {
var connector topicreaderinternal.TopicSteamReaderConnect = func(ctx context.Context) (
var connector topicreaderinternal.TopicSteamReaderConnect = func(
ctx context.Context,
readerID int64,
tracer *trace.Topic,
) (
topicreadercommon.RawTopicReaderStream, error,
) {
return c.rawClient.StreamRead(ctx)
return c.rawClient.StreamRead(ctx, readerID, tracer)
}

defaultOpts := []topicoptions.ReaderOption{
Expand Down
Loading
Loading