Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to replicate import msg #39171

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ require (
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/magiconair/properties v1.8.5
github.com/milvus-io/milvus/pkg v0.0.2-0.20241126032235-cb6542339e84
github.com/pkg/errors v0.9.1
github.com/remeh/sizedwaitgroup v1.0.0
Expand Down Expand Up @@ -166,7 +167,6 @@ require (
github.com/leodido/go-urn v1.2.4 // indirect
github.com/linkedin/goavro/v2 v2.11.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-colorable v0.1.11 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand Down Expand Up @@ -271,6 +271,7 @@ replace (
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93
github.com/ianlancetaylor/cgosymbolizer => github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8 h1:boN3QhAWQU9O8EYQWxN7AEYav39PuD29QzZwTiI8Ca0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc h1:DhqkZ5yhNm9YMOJjxhaptgH2sgXmOYYIGv7Bj9JBZlo=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
Expand Down Expand Up @@ -630,8 +632,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
9 changes: 8 additions & 1 deletion internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ packages:
interfaces:
WALAccesser:
Utility:
Broadcast:
github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
interfaces:
Balancer:
github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster:
interfaces:
AppendOperator:
Watcher:
github.com/milvus-io/milvus/internal/streamingcoord/client:
interfaces:
Client:
BroadcastService:
github.com/milvus-io/milvus/internal/streamingcoord/client/broadcast:
interfaces:
Watcher:
github.com/milvus-io/milvus/internal/streamingnode/client/manager:
interfaces:
ManagerClient:
Expand All @@ -37,7 +42,6 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Expand Down Expand Up @@ -82,6 +86,9 @@ packages:
github.com/milvus-io/milvus/internal/util/searchutil/optimizers:
interfaces:
QueryHook:
# github.com/milvus-io/milvus/internal/flushcommon/util:
# interfaces:
# MsgHandler:
google.golang.org/grpc/resolver:
interfaces:
ClientConn:
Expand Down
9 changes: 0 additions & 9 deletions internal/coordinator/coordclient/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,20 @@ func EnableLocalClientRole(cfg *LocalClientRoleConfig) {

// RegisterQueryCoordServer register query coord server
func RegisterQueryCoordServer(server querypb.QueryCoordServer) {
if !enableLocal.EnableQueryCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient)
glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient})
log.Ctx(context.TODO()).Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegsterDataCoordServer register data coord server
func RegisterDataCoordServer(server datapb.DataCoordServer) {
if !enableLocal.EnableDataCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient)
glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient})
log.Ctx(context.TODO()).Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegisterRootCoordServer register root coord server
func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
if !enableLocal.EnableRootCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient)
glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient})
log.Ctx(context.TODO()).Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
Expand Down
7 changes: 4 additions & 3 deletions internal/coordinator/coordclient/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ func TestRegistry(t *testing.T) {
RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{})
RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{})
RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{})
assert.False(t, glocalClient.dataCoordClient.Ready())
assert.False(t, glocalClient.queryCoordClient.Ready())
assert.False(t, glocalClient.rootCoordClient.Ready())
assert.True(t, glocalClient.dataCoordClient.Ready())
assert.True(t, glocalClient.queryCoordClient.Ready())
assert.True(t, glocalClient.rootCoordClient.Ready())
ResetRegistration()

enableLocal = &LocalClientRoleConfig{}

Expand Down
32 changes: 32 additions & 0 deletions internal/coordinator/coordclient/test_utility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//go:build test
// +build test

package coordclient

import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

// ResetRegistration resets the global local client to initial state.
// This function is only used in test.
func ResetRegistration() {
glocalClient = &localClient{
queryCoordClient: syncutil.NewFuture[types.QueryCoordClient](),
dataCoordClient: syncutil.NewFuture[types.DataCoordClient](),
rootCoordClient: syncutil.NewFuture[types.RootCoordClient](),
}
}

// ResetQueryCoordRegistration resets the query coord client to initial state.
func ResetQueryCoordRegistration() {
glocalClient.queryCoordClient = syncutil.NewFuture[types.QueryCoordClient]()
}

func ResetRootCoordRegistration() {
glocalClient.rootCoordClient = syncutil.NewFuture[types.RootCoordClient]()
}

func ResetDataCoordRegistration() {
glocalClient.dataCoordClient = syncutil.NewFuture[types.DataCoordClient]()
}
44 changes: 42 additions & 2 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type compactionPlanContext interface {
getCompactionTasksNumBySignalID(signalID int64) int
getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo
removeTasksByChannel(channel string)
getCompactionTasksNum(filters ...compactionTaskFilter) int
}

var (
Expand Down Expand Up @@ -144,7 +145,7 @@ func (sna *SlotBasedNodeAssigner) pickAnyNode(task CompactionTask) (nodeID int64
}

type compactionPlanHandler struct {
queueTasks CompactionQueue
queueTasks *CompactionQueue

executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task
Expand Down Expand Up @@ -254,7 +255,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
// TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of.
capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt()
return &compactionPlanHandler{
queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
queueTasks: NewCompactionQueue(capacity, getPrioritizer()),
meta: meta,
sessions: sessions,
allocator: allocator,
Expand Down Expand Up @@ -778,6 +779,45 @@ func (c *compactionPlanHandler) checkDelay(t CompactionTask) {
}
}

func (c *compactionPlanHandler) getCompactionTasksNum(filters ...compactionTaskFilter) int {
cnt := 0
isMatch := func(task CompactionTask) bool {
for _, f := range filters {
if !f(task) {
return false
}
}
return true
}
c.queueTasks.ForEach(func(task CompactionTask) {
if isMatch(task) {
cnt += 1
}
})
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if isMatch(t) {
cnt += 1
}
}
c.executingGuard.RUnlock()
return cnt
}

type compactionTaskFilter func(task CompactionTask) bool

func CollectionIDCompactionTaskFilter(collectionID int64) compactionTaskFilter {
return func(task CompactionTask) bool {
return task.GetTaskProto().GetCollectionID() == collectionID
}
}

func L0CompactionCompactionTaskFilter() compactionTaskFilter {
return func(task CompactionTask) bool {
return task.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction
}
}

var (
ioPool *conc.Pool[any]
ioPoolInitOnce sync.Once
Expand Down
42 changes: 39 additions & 3 deletions internal/datacoord/compaction_policy_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,52 @@ type l0CompactionPolicy struct {
meta *meta

activeCollections *activeCollections

// key: collectionID, value: reference count
skipCompactionCollections map[int64]int
skipLocker sync.RWMutex
}

func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
activeCollections: newActiveCollections(),
meta: meta,
activeCollections: newActiveCollections(),
skipCompactionCollections: make(map[int64]int),
}
}

func (policy *l0CompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

func (policy *l0CompactionPolicy) AddSkipCollection(collectionID UniqueID) {
policy.skipLocker.Lock()
defer policy.skipLocker.Unlock()

if _, ok := policy.skipCompactionCollections[collectionID]; !ok {
policy.skipCompactionCollections[collectionID] = 1
} else {
policy.skipCompactionCollections[collectionID]++
}
}

func (policy *l0CompactionPolicy) RemoveSkipCollection(collectionID UniqueID) {
policy.skipLocker.Lock()
defer policy.skipLocker.Unlock()
refCount := policy.skipCompactionCollections[collectionID]
if refCount > 1 {
policy.skipCompactionCollections[collectionID]--
} else {
delete(policy.skipCompactionCollections, collectionID)
}
}

func (policy *l0CompactionPolicy) isSkipCollection(collectionID UniqueID) bool {
policy.skipLocker.RLock()
defer policy.skipLocker.RUnlock()
return policy.skipCompactionCollections[collectionID] > 0
}

// Notify policy to record the active updated(when adding a new L0 segment) collections.
func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) {
policy.activeCollections.Record(collectionID)
Expand All @@ -50,8 +83,11 @@ func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]
idleCollsSet := typeutil.NewUniqueSet(idleColls...)
activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{}
for collID, segments := range latestCollSegs {
policy.activeCollections.Read(collID)
if policy.isSkipCollection(collID) {
continue
}

policy.activeCollections.Read(collID)
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
Expand Down
24 changes: 24 additions & 0 deletions internal/datacoord/compaction_policy_l0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,30 @@ func (s *L0CompactionPolicySuite) TestTriggerIdle() {
for _, view := range cView.GetSegmentsView() {
s.Equal(datapb.SegmentLevel_L0, view.Level)
}

// test for skip collection
s.l0_policy.AddSkipCollection(1)
s.l0_policy.AddSkipCollection(1)
// Test for skip collection
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Empty(events)

// Test for skip collection with ref count
s.l0_policy.RemoveSkipCollection(1)
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Empty(events)

s.l0_policy.RemoveSkipCollection(1)
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(1, len(events))
gotViews, ok = events[TriggerTypeLevelZeroViewIDLE]
s.True(ok)
s.NotNil(gotViews)
s.Equal(1, len(gotViews))

log.Info("cView", zap.String("string", cView.String()))
}

Expand Down
58 changes: 58 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/magiconair/properties/assert"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -1122,3 +1123,60 @@ func TestCheckDelay(t *testing.T) {
}, nil, nil, nil, nil, nil)
handler.checkDelay(t3)
}

func TestGetCompactionTasksNum(t *testing.T) {
queueTasks := NewCompactionQueue(10, DefaultPrioritizer)
queueTasks.Enqueue(
newMixCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil),
)
queueTasks.Enqueue(
newL0CompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
}, nil, nil, nil),
)
queueTasks.Enqueue(
newClusteringCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 10,
Type: datapb.CompactionType_ClusteringCompaction,
}, nil, nil, nil, nil, nil),
)
executingTasks := make(map[int64]CompactionTask, 0)
executingTasks[1] = newMixCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
executingTasks[2] = newL0CompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
}, nil, nil, nil)

handler := &compactionPlanHandler{
queueTasks: queueTasks,
executingTasks: executingTasks,
}
t.Run("no filter", func(t *testing.T) {
i := handler.getCompactionTasksNum()
assert.Equal(t, 5, i)
})
t.Run("collection id filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(CollectionIDCompactionTaskFilter(1))
assert.Equal(t, 3, i)
})
t.Run("l0 compaction filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(L0CompactionCompactionTaskFilter())
assert.Equal(t, 2, i)
})
t.Run("collection id and l0 compaction filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(CollectionIDCompactionTaskFilter(1), L0CompactionCompactionTaskFilter())
assert.Equal(t, 1, i)
})
}
Loading
Loading