
Commit 13e31a0

fix: unittest

Signed-off-by: chyezh <[email protected]>
1 parent 1a6e2cd

19 files changed (+858, -155 lines)

internal/flushcommon/pipeline/data_sync_service.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -410,6 +410,7 @@ func NewDataSyncServiceWithMetaCache(metaCache metacache.MetaCache) *DataSyncService {
 	return &DataSyncService{metacache: metaCache}
 }
 
+// NewEmptyStreamingNodeDataSyncService is used to create a new data sync service when an incoming create collection message is received.
 func NewEmptyStreamingNodeDataSyncService(
 	initCtx context.Context,
 	pipelineParams *util.PipelineParams,
```
internal/flushcommon/util/checkpoint_updater.go

Lines changed: 17 additions & 2 deletions
```diff
@@ -48,8 +48,9 @@ type ChannelCheckpointUpdater struct {
 	tasks      map[string]*channelCPUpdateTask
 	notifyChan chan struct{}
 
-	closeCh   chan struct{}
-	closeOnce sync.Once
+	closeCh            chan struct{}
+	closeOnce          sync.Once
+	updateDoneCallback func(*msgpb.MsgPosition)
 }
 
 func NewChannelCheckpointUpdater(broker broker.Broker) *ChannelCheckpointUpdater {
@@ -61,6 +62,17 @@ func NewChannelCheckpointUpdater(broker broker.Broker) *ChannelCheckpointUpdater {
 	}
 }
 
+// NewChannelCheckpointUpdaterWithCallback creates a ChannelCheckpointUpdater with a callback function
+func NewChannelCheckpointUpdaterWithCallback(broker broker.Broker, updateDoneCallback func(*msgpb.MsgPosition)) *ChannelCheckpointUpdater {
+	return &ChannelCheckpointUpdater{
+		broker:             broker,
+		tasks:              make(map[string]*channelCPUpdateTask),
+		closeCh:            make(chan struct{}),
+		notifyChan:         make(chan struct{}, 1),
+		updateDoneCallback: updateDoneCallback,
+	}
+}
+
 func (ccu *ChannelCheckpointUpdater) Start() {
 	log.Info("channel checkpoint updater start")
 	ticker := time.NewTicker(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second))
@@ -134,6 +146,9 @@ func (ccu *ChannelCheckpointUpdater) updateCheckpoints(tasks []*channelCPUpdateTask) {
 		for _, task := range tasks {
 			task.callback()
 			finished.Insert(task.pos.GetChannelName(), task)
+			if ccu.updateDoneCallback != nil {
+				ccu.updateDoneCallback(task.pos)
+			}
 		}
 	}(tasks)
 }
```
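For orientation, the sketch below shows the callback pattern this diff introduces: an updater persists a checkpoint and then fires an optional update-done callback. It is a minimal standalone illustration; the `position` type and `updater` struct are simplified stand-ins for `msgpb.MsgPosition` and `ChannelCheckpointUpdater`, not the real Milvus types.

```go
package main

import (
	"fmt"
	"sync"
)

// position is a simplified stand-in for msgpb.MsgPosition.
type position struct {
	ChannelName string
	Timestamp   uint64
}

// updater mirrors the shape of ChannelCheckpointUpdater: it persists
// checkpoints and, if configured, reports each finished update.
type updater struct {
	mu                 sync.Mutex
	checkpoints        map[string]position
	updateDoneCallback func(position) // optional; may be nil
}

func newUpdaterWithCallback(cb func(position)) *updater {
	return &updater{
		checkpoints:        make(map[string]position),
		updateDoneCallback: cb,
	}
}

func (u *updater) update(pos position) {
	u.mu.Lock()
	u.checkpoints[pos.ChannelName] = pos // "persist" the checkpoint
	u.mu.Unlock()
	// Fire the callback only after the update has completed, matching
	// the hook added in updateCheckpoints above.
	if u.updateDoneCallback != nil {
		u.updateDoneCallback(pos)
	}
}

func main() {
	u := newUpdaterWithCallback(func(pos position) {
		fmt.Printf("checkpoint updated: %s @ %d\n", pos.ChannelName, pos.Timestamp)
	})
	u.update(position{ChannelName: "by-dev-rootcoord-dml_0", Timestamp: 100})
}
```

The nil check mirrors the one added in updateCheckpoints, so callers that use the plain NewChannelCheckpointUpdater constructor are unaffected.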

internal/rootcoord/meta_table.go

Lines changed: 5 additions & 0 deletions
```diff
@@ -925,6 +925,11 @@ func (mt *MetaTable) GetPChannelInfo(ctx context.Context, pchannel string) *rootcoordpb.GetPChannelInfoResponse {
 		Collections: make([]*rootcoordpb.CollectionInfoOnPChannel, 0),
 	}
 	for _, collInfo := range mt.collID2Meta {
+		if collInfo.State != pb.CollectionState_CollectionCreated {
+			// streamingnode: skip non-created collections when recovering;
+			// the streamingnode will receive the createCollectionMessage to recover if the collection is still creating.
+			continue
+		}
 		if idx := lo.IndexOf(collInfo.PhysicalChannelNames, pchannel); idx >= 0 {
 			partitions := make([]*rootcoordpb.PartitionInfoOnPChannel, 0, len(collInfo.Partitions))
 			for _, part := range collInfo.Partitions {
```
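The intent of the filter: collections that have not reached the Created state are omitted from the pchannel recovery info, because a streamingnode recovers those from the createCollectionMessage still present in the WAL. Below is a simplified sketch of that selection logic, using hypothetical stand-in types rather than the real rootcoord metadata.

```go
package main

import "fmt"

// collectionState is a stand-in for pb.CollectionState.
type collectionState int

const (
	collectionCreating collectionState = iota
	collectionCreated
	collectionDropping
)

type collectionMeta struct {
	Name             string
	State            collectionState
	PhysicalChannels []string
}

// collectionsOnPChannel mimics the filtering added to GetPChannelInfo:
// only collections that have reached the Created state are recovered
// from meta; creating ones are recovered from the WAL's create message.
func collectionsOnPChannel(metas []collectionMeta, pchannel string) []string {
	var out []string
	for _, m := range metas {
		if m.State != collectionCreated {
			continue // skip: streamingnode recovers it via createCollectionMessage
		}
		for _, ch := range m.PhysicalChannels {
			if ch == pchannel {
				out = append(out, m.Name)
				break
			}
		}
	}
	return out
}

func main() {
	metas := []collectionMeta{
		{Name: "created", State: collectionCreated, PhysicalChannels: []string{"pchan-0"}},
		{Name: "creating", State: collectionCreating, PhysicalChannels: []string{"pchan-0"}},
	}
	fmt.Println(collectionsOnPChannel(metas, "pchan-0")) // [created]
}
```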

internal/streamingnode/server/flusher/flusherimpl/data_service_wrapper.go

Lines changed: 16 additions & 5 deletions
```diff
@@ -9,12 +9,21 @@ import (
 	"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
 )
 
-// dataSyncServiceWrapper wraps DataSyncService with related fields.
+// newDataSyncServiceWrapper creates a new data sync service wrapper.
+func newDataSyncServiceWrapper(input chan<- *msgstream.MsgPack, ds *pipeline.DataSyncService) *dataSyncServiceWrapper {
+	handler := adaptor.NewBaseMsgPackAdaptorHandler()
+	return &dataSyncServiceWrapper{
+		input:   input,
+		handler: handler,
+		ds:      ds,
+	}
+}
+
+// dataSyncServiceWrapper wraps DataSyncService and related input channel.
 type dataSyncServiceWrapper struct {
-	input          chan<- *msgstream.MsgPack
-	handler        *adaptor.BaseMsgPackAdaptorHandler
-	ds             *pipeline.DataSyncService
-	startMessageID message.MessageID
+	input   chan<- *msgstream.MsgPack
+	handler *adaptor.BaseMsgPackAdaptorHandler
+	ds      *pipeline.DataSyncService
 }
 
 // Start starts the data sync service.
@@ -30,6 +39,7 @@ func (ds *dataSyncServiceWrapper) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error {
 	case <-ctx.Done():
 		return ctx.Err()
 	case ds.input <- ds.handler.PendingMsgPack.Next():
+		// The input channel will never get stuck because the data sync service will consume the message continuously.
 		ds.handler.PendingMsgPack.UnsafeAdvance()
 	}
 }
@@ -38,6 +48,7 @@ func (ds *dataSyncServiceWrapper) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error {
 
 // Close close the input channel and gracefully close the data sync service.
 func (ds *dataSyncServiceWrapper) Close() {
+	// The input channel should be closed first, otherwise the flowgraph in datasync service will be blocked.
 	close(ds.input)
 	ds.ds.GracefullyClose()
 }
```
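The comment added to Close records an ordering invariant: the producer side must close the input channel before the consumer is shut down, or the consumer's final drain could block forever. Here is a minimal stdlib-only sketch of that shutdown order, with a toy consumer standing in for the flowgraph.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer stands in for the data sync service's flowgraph: it drains
// an input channel until the channel is closed, then returns.
type consumer struct {
	input <-chan string
	done  sync.WaitGroup
}

func (c *consumer) start() {
	c.done.Add(1)
	go func() {
		defer c.done.Done()
		for msg := range c.input { // exits only when input is closed
			fmt.Println("consumed:", msg)
		}
	}()
}

// gracefullyClose waits for the drain goroutine to finish.
func (c *consumer) gracefullyClose() {
	c.done.Wait()
}

func main() {
	input := make(chan string, 10)
	c := &consumer{input: input}
	c.start()

	input <- "msgpack-1"

	// Mirror dataSyncServiceWrapper.Close: close the input first so the
	// consumer's range loop can terminate, then wait for it to drain.
	close(input)
	c.gracefullyClose()
}
```

Reversing the two steps would deadlock: gracefullyClose would wait on a goroutine whose range loop can only end once the channel is closed.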

internal/streamingnode/server/flusher/flusherimpl/flusher_components.go

Lines changed: 23 additions & 85 deletions
```diff
@@ -24,88 +24,28 @@ import (
 	"github.com/milvus-io/milvus/pkg/streaming/util/message"
 	"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
 	"github.com/milvus-io/milvus/pkg/util/conc"
-	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/retry"
 )
 
-// buildFlusherComponents builds the components of the flusher.
-func (impl *WALFlusherImpl) buildFlusherComponents(ctx context.Context, l wal.WAL) (*flusherComponents, error) {
-	// Get all existed vchannels of the pchannel.
-	vchannels, err := impl.getVchannels(ctx, l.Channel().Name)
-	if err != nil {
-		impl.logger.Warn("get vchannels failed", zap.Error(err))
-		return nil, err
-	}
-	impl.logger.Info("fetch vchannel done", zap.Int("vchannelNum", len(vchannels)))
-
-	// Get all the recovery info of the recoverable vchannels.
-	recoverInfos, err := impl.getRecoveryInfos(ctx, vchannels)
-	if err != nil {
-		impl.logger.Warn("get recovery info failed", zap.Error(err))
-		return nil, err
-	}
-	impl.logger.Info("fetch recovery info done", zap.Int("recoveryInfoNum", len(recoverInfos)))
-
-	// build up components
-	dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
-	if err != nil {
-		impl.logger.Warn("flusher recovery is canceled before data coord client ready", zap.Error(err))
-		return nil, err
-	}
-	impl.logger.Info("data coord client ready")
-
-	broker := broker.NewCoordBroker(dc, paramtable.GetNodeID())
-	chunkManager := resource.Resource().ChunkManager()
-	syncMgr := syncmgr.NewSyncManager(chunkManager)
-	wbMgr := writebuffer.NewManager(syncMgr)
-	wbMgr.Start()
-	cpUpdater := util.NewChannelCheckpointUpdater(broker)
-	go cpUpdater.Start()
-
-	fc := &flusherComponents{
-		wal:          l,
-		broker:       broker,
-		syncMgr:      syncMgr,
-		wbMgr:        wbMgr,
-		cpUpdater:    cpUpdater,
-		chunkManager: chunkManager,
-		dataServices: make(map[string]*dataSyncServiceWrapper),
-		logger:       impl.logger,
-	}
-	impl.logger.Info("flusher components intiailizing done")
-	if err := fc.recover(ctx, recoverInfos); err != nil {
-		impl.logger.Warn("flusher recovery is canceled before recovery done, recycle the resource", zap.Error(err))
-		fc.Close()
-		impl.logger.Info("flusher recycle the resource done")
-		return nil, err
-	}
-	impl.logger.Info("flusher recovery done")
-	return fc, nil
-}
-
 // flusherComponents is the components of the flusher.
 type flusherComponents struct {
-	wal          wal.WAL
-	broker       broker.Broker
-	syncMgr      syncmgr.SyncManager
-	wbMgr        writebuffer.BufferManager
-	cpUpdater    *util.ChannelCheckpointUpdater
-	chunkManager storage.ChunkManager
-	dataServices map[string]*dataSyncServiceWrapper
-	logger       *log.MLogger
+	wal               wal.WAL
+	broker            broker.Broker
+	syncMgr           syncmgr.SyncManager
+	wbMgr             writebuffer.BufferManager
+	cpUpdater         *util.ChannelCheckpointUpdater
+	chunkManager      storage.ChunkManager
+	dataServices      map[string]*dataSyncServiceWrapper
+	checkpointManager *pchannelCheckpointManager
+	logger            *log.MLogger
 }
 
-// GetMinimumStartMessage gets the minimum start message of all the data services.
-func (impl *flusherComponents) GetMinimumStartMessage() message.MessageID {
-	var startMessageID message.MessageID
-	for _, ds := range impl.dataServices {
-		if startMessageID == nil || ds.startMessageID.LT(startMessageID) {
-			startMessageID = ds.startMessageID
-		}
-	}
-	return startMessageID
+// StartMessageID returns the start message id of the flusher after recovering.
+func (impl *flusherComponents) StartMessageID() message.MessageID {
+	return impl.checkpointManager.StartMessageID()
 }
 
+// WhenCreateCollection handles the create collection message.
 func (impl *flusherComponents) WhenCreateCollection(createCollectionMsg message.ImmutableCreateCollectionMessageV1) {
 	if _, ok := impl.dataServices[createCollectionMsg.VChannel()]; ok {
 		impl.logger.Info("the data sync service of current vchannel is built, skip it", zap.String("vchannel", createCollectionMsg.VChannel()))
@@ -172,8 +112,10 @@ func (impl *flusherComponents) WhenDropCollection(vchannel string) {
 		delete(impl.dataServices, vchannel)
 		impl.logger.Info("drop data sync service", zap.String("vchannel", vchannel))
 	}
+	impl.checkpointManager.DropVChannel(vchannel)
 }
 
+// HandleMessage handles the plain message.
 func (impl *flusherComponents) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error {
 	vchannel := msg.VChannel()
 	if vchannel == "" {
@@ -185,6 +127,7 @@ func (impl *flusherComponents) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error {
 	return impl.dataServices[vchannel].HandleMessage(ctx, msg)
 }
 
+// broadcastToAllDataSyncService broadcasts the message to all data sync services.
 func (impl *flusherComponents) broadcastToAllDataSyncService(ctx context.Context, msg message.ImmutableMessage) error {
 	for _, ds := range impl.dataServices {
 		if err := ds.HandleMessage(ctx, msg); err != nil {
@@ -194,17 +137,14 @@
 	return nil
 }
 
+// addNewDataSyncService adds a new data sync service to the components when a new collection is created.
 func (impl *flusherComponents) addNewDataSyncService(
 	createCollectionMsg message.ImmutableCreateCollectionMessageV1,
 	input chan<- *msgstream.MsgPack,
 	ds *pipeline.DataSyncService,
 ) {
-	newDS := &dataSyncServiceWrapper{
-		input:          input,
-		handler:        adaptor.NewBaseMsgPackAdaptorHandler(),
-		ds:             ds,
-		startMessageID: createCollectionMsg.LastConfirmedMessageID(),
-	}
+	impl.checkpointManager.AddVChannel(createCollectionMsg.VChannel(), createCollectionMsg.LastConfirmedMessageID())
+	newDS := newDataSyncServiceWrapper(input, ds)
 	newDS.Start()
 	impl.dataServices[createCollectionMsg.VChannel()] = newDS
 	impl.logger.Info("create data sync service done", zap.String("vchannel", createCollectionMsg.VChannel()))
@@ -219,6 +159,7 @@ func (impl *flusherComponents) Close() {
 	impl.wbMgr.Stop()
 	impl.cpUpdater.Close()
 	impl.syncMgr.Close()
+	impl.checkpointManager.Close()
 }
 
 // recover recover the components of the flusher.
@@ -254,6 +195,7 @@ func (impl *flusherComponents) recover(ctx context.Context, recoverInfos map[string]*datapb.GetChannelRecoveryInfoResponse) error {
 	return nil
 }
 
+// buildDataSyncServiceWithRetry builds the data sync service with retry.
 func (impl *flusherComponents) buildDataSyncServiceWithRetry(ctx context.Context, recoverInfo *datapb.GetChannelRecoveryInfoResponse) (*dataSyncServiceWrapper, error) {
 	var ds *dataSyncServiceWrapper
 	err := retry.Do(ctx, func() error {
@@ -267,6 +209,7 @@ func (impl *flusherComponents) buildDataSyncServiceWithRetry(ctx context.Context, recoverInfo *datapb.GetChannelRecoveryInfoResponse) (*dataSyncServiceWrapper, error) {
 	return ds, nil
 }
 
+// buildDataSyncService builds the data sync service with the given recovery info.
 func (impl *flusherComponents) buildDataSyncService(ctx context.Context, recoverInfo *datapb.GetChannelRecoveryInfoResponse) (*dataSyncServiceWrapper, error) {
 	// Build and add pipeline.
 	input := make(chan *msgstream.MsgPack, 10)
@@ -300,10 +243,5 @@ func (impl *flusherComponents) buildDataSyncService(ctx context.Context, recoverInfo *datapb.GetChannelRecoveryInfoResponse) (*dataSyncServiceWrapper, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &dataSyncServiceWrapper{
-		input:          input,
-		handler:        adaptor.NewBaseMsgPackAdaptorHandler(),
-		ds:             ds,
-		startMessageID: adaptor.MustGetMessageIDFromMQWrapperIDBytes(impl.wal.WALName(), recoverInfo.GetInfo().GetSeekPosition().GetMsgID()),
-	}, nil
+	return newDataSyncServiceWrapper(input, ds), nil
 }
```
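With the per-service startMessageID field removed, the minimum-start-message computation that GetMinimumStartMessage used to perform presumably moves behind pchannelCheckpointManager.StartMessageID, which this commit does not show. Below is a rough sketch of what such a manager could look like, under assumed simplified types (messageID stands in for message.MessageID).

```go
package main

import (
	"fmt"
	"sync"
)

// messageID is a simplified stand-in for message.MessageID; Milvus IDs
// are opaque and compared via LT.
type messageID uint64

func (m messageID) LT(other messageID) bool { return m < other }

// checkpointManager sketches the role of pchannelCheckpointManager:
// one start position per vchannel, with the pchannel-level start being
// the minimum across them.
type checkpointManager struct {
	mu     sync.Mutex
	starts map[string]messageID
}

func newCheckpointManager() *checkpointManager {
	return &checkpointManager{starts: make(map[string]messageID)}
}

func (c *checkpointManager) AddVChannel(vchannel string, start messageID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.starts[vchannel] = start
}

func (c *checkpointManager) DropVChannel(vchannel string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.starts, vchannel)
}

// StartMessageID returns the minimum start ID across all vchannels,
// i.e. the safe position for the flusher to resume consuming the WAL.
func (c *checkpointManager) StartMessageID() (messageID, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var min messageID
	found := false
	for _, id := range c.starts {
		if !found || id.LT(min) {
			min = id
			found = true
		}
	}
	return min, found
}

func main() {
	m := newCheckpointManager()
	m.AddVChannel("vchan-1", 42)
	m.AddVChannel("vchan-2", 17)
	if id, ok := m.StartMessageID(); ok {
		fmt.Println("resume from:", id) // resume from: 17
	}
}
```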
