
Commit

add test
Signed-off-by: 童剑 <[email protected]>
bufferflies committed Feb 13, 2025
1 parent fdd2279 commit d0f2cae
Showing 3 changed files with 218 additions and 68 deletions.
94 changes: 75 additions & 19 deletions pkg/schedule/schedulers/balance_range.go
@@ -77,7 +77,7 @@ type balanceRangeSchedulerConfig struct {
type balanceRangeSchedulerJob struct {
JobID uint64 `json:"job-id"`
Role Role `json:"role"`
Engine string `json:"engine"`
Engine Engine `json:"engine"`
Timeout time.Duration `json:"timeout"`
Ranges []core.KeyRange `json:"ranges"`
Alias string `json:"alias"`
@@ -235,7 +235,7 @@ func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun b

downFilter := filter.NewRegionDownFilter()
replicaFilter := filter.NewRegionReplicatedFilter(cluster)
snapshotFilter := filter.NewSnapshotSendFilter(plan.stores, constant.Medium)
snapshotFilter := filter.NewSnapshotSendFilter(cluster.GetStores(), constant.Medium)
pendingFilter := filter.NewRegionPendingFilter()
baseRegionFilters := []filter.RegionFilter{downFilter, replicaFilter, snapshotFilter, pendingFilter}

@@ -276,19 +276,13 @@ func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun b
return []*operator.Operator{op}, nil
}
}

if err != nil {
log.Error("failed to prepare balance key range scheduler", errs.ZapError(err))
return nil, nil

}
return nil, nil
}

// transferPeer selects the best store to create a new peer to replace the old peer.
func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, dstStores []*core.StoreInfo) *operator.Operator {
excludeTargets := plan.region.GetStoreIDs()
if plan.job.Role != leader {
if plan.job.Role == leader {
excludeTargets = make(map[uint64]struct{})
}
conf := plan.GetSchedulerConfig()
@@ -312,8 +306,25 @@ func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, ds
log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))

oldPeer := plan.region.GetStorePeer(sourceID)
newPeer := &metapb.Peer{StoreId: plan.target.GetID(), Role: oldPeer.Role}
op, err := operator.CreateMovePeerOperator(s.GetName(), plan, plan.region, operator.OpRange, oldPeer.GetStoreId(), newPeer)
exist := false
if plan.job.Role == leader {
peers := plan.region.GetPeers()
for _, peer := range peers {
if peer.GetStoreId() == targetID {
exist = true
break
}
}
}
var op *operator.Operator
var err error
if exist {
op, err = operator.CreateTransferLeaderOperator(s.GetName(), plan, plan.region, plan.targetStoreID(), []uint64{}, operator.OpRange)
} else {
newPeer := &metapb.Peer{StoreId: plan.target.GetID(), Role: oldPeer.Role}
op, err = operator.CreateMovePeerOperator(s.GetName(), plan, plan.region, operator.OpRange, oldPeer.GetStoreId(), newPeer)
}

if err != nil {
balanceRangeCreateOpFailCounter.Inc()
return nil
@@ -336,8 +347,8 @@ type balanceRangeSchedulerPlan struct {
sche.SchedulerCluster
// stores is sorted by score desc
stores []*core.StoreInfo
// sourceMap records the storeID -> score
sourceMap map[uint64]int64
// scoreMap records the storeID -> score
scoreMap map[uint64]int64
source *core.StoreInfo
sourceScore int64
target *core.StoreInfo
@@ -359,7 +370,14 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen
if err != nil {
return nil, err
}
sources := filter.SelectSourceStores(cluster.GetStores(), s.filters, cluster.GetSchedulerConfig(), nil, nil)
filters := s.filters
if job.Engine == TiFlash {
filters = append(filters, filter.NewEngineFilter(balanceRangeName, filter.TiFlashEngineConstraint))
}
sources := filter.SelectSourceStores(cluster.GetStores(), filters, cluster.GetSchedulerConfig(), nil, nil)
if sources == nil {
return nil, errs.ErrStoresNotEnough.FastGenByArgs("no store to select")
}
storeInfos := make(map[uint64]*storeInfo, len(sources))
for _, source := range sources {
storeInfos[source.GetID()] = &storeInfo{store: source}
@@ -380,7 +398,10 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen
storeList = append(storeList, store)
}
sort.Slice(storeList, func(i, j int) bool {
return storeList[i].score > storeList[j].score
role := job.Role
iop := role.getStoreInfluence(opInfluence.GetStoreInfluence(storeList[i].store.GetID()))
jop := role.getStoreInfluence(opInfluence.GetStoreInfluence(storeList[j].store.GetID()))
return storeList[i].score+iop > storeList[j].score+jop
})
sourceMap := make(map[uint64]int64)
for _, store := range storeList {
@@ -398,7 +419,7 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen
return &balanceRangeSchedulerPlan{
SchedulerCluster: cluster,
stores: stores,
sourceMap: sourceMap,
scoreMap: sourceMap,
source: nil,
target: nil,
region: nil,
@@ -416,7 +437,7 @@ func (p *balanceRangeSchedulerPlan) targetStoreID() uint64 {
}

func (p *balanceRangeSchedulerPlan) score(storeID uint64) int64 {
return p.sourceMap[storeID]
return p.scoreMap[storeID]
}

func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool {
@@ -459,6 +480,41 @@ func (r Role) String() string {
}
}

type Engine int

const (
TiKV Engine = iota
TiFlash
Unknown
)

func (e Engine) String() string {
switch e {
case TiKV:
return "tikv"
case TiFlash:
return "tiflash"
default:
return "unknown"
}
}

func (e Engine) MarshalJSON() ([]byte, error) {
return []byte(`"` + e.String() + `"`), nil
}

// NewEngine creates a new engine.
func NewEngine(role string) Engine {
switch role {
case "tikv":
return TiKV
case "tiflash":
return TiFlash
default:
return Unknown
}
}

// JobStatus is the status of the job.
type JobStatus int

@@ -489,11 +545,11 @@ func (s JobStatus) MarshalJSON() ([]byte, error) {
func NewRole(role string) Role {
switch role {
case "leader":
return learner
return leader
case "follower":
return follower
case "learner":
return leader
return learner
default:
return unknown
}
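
The scheduler file above replaces the job's plain string engine field with a typed Engine enum that serializes to a lowercase name. Below is a minimal, self-contained sketch of that round trip; it re-declares the type and an anonymous job struct locally so it compiles outside the pd scheduler package, and it only illustrates the behavior added in the diff rather than being part of the commit.

package main

import (
	"encoding/json"
	"fmt"
)

// Engine mirrors the enum added in balance_range.go: TiKV = 0, TiFlash = 1, Unknown = 2.
type Engine int

const (
	TiKV Engine = iota
	TiFlash
	Unknown
)

func (e Engine) String() string {
	switch e {
	case TiKV:
		return "tikv"
	case TiFlash:
		return "tiflash"
	default:
		return "unknown"
	}
}

// MarshalJSON renders the engine as its lowercase name, matching the diff.
func (e Engine) MarshalJSON() ([]byte, error) {
	return []byte(`"` + e.String() + `"`), nil
}

// NewEngine parses a user-supplied engine name, falling back to Unknown.
func NewEngine(name string) Engine {
	switch name {
	case "tikv":
		return TiKV
	case "tiflash":
		return TiFlash
	default:
		return Unknown
	}
}

func main() {
	// A job configured for TiFlash serializes its engine as a readable string.
	job := struct {
		Engine Engine `json:"engine"`
	}{Engine: NewEngine("tiflash")}
	out, _ := json.Marshal(job)
	fmt.Println(string(out)) // {"engine":"tiflash"}
}
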
174 changes: 130 additions & 44 deletions pkg/schedule/schedulers/balance_range_test.go
@@ -15,40 +15,20 @@
package schedulers

import (
"context"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/storage"
"testing"
"fmt"
"github.com/tikv/pd/pkg/schedule/operator"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/storage"
)

type balanceRangeSchedulerTestSuite struct {
suite.Suite
cancel context.CancelFunc
tc *mockcluster.Cluster
oc *operator.Controller
}

func TestBalanceRangeSchedulerTestSuite(t *testing.T) {
suite.Run(t, new(balanceRangeSchedulerTestSuite))
}

func (suite *balanceRangeSchedulerTestSuite) SetupTest() {
suite.cancel, _, suite.tc, suite.oc = prepareSchedulersTest()
}

func (suite *balanceRangeSchedulerTestSuite) TearDownTest() {
suite.cancel()
}

func TestGetPeers(t *testing.T) {
re := require.New(t)
learner := &metapb.Peer{StoreId: 1, Id: 1, Role: metapb.PeerRole_Learner}
@@ -80,6 +60,39 @@ func TestGetPeers(t *testing.T) {
}
}

func TestJobStatus(t *testing.T) {
re := require.New(t)
conf := balanceRangeSchedulerConfig{}
for _, v := range []struct {
jobStatus JobStatus
begin bool
finish bool
}{
{
pending,
true,
false,
},
{
running,
false,
true,
},
{
finished,
false,
false,
},
} {
job := &balanceRangeSchedulerJob{
Status: v.jobStatus,
}
re.Equal(v.begin, conf.begin(job))
job.Status = v.jobStatus
re.Equal(v.finish, conf.finish(job))
}
}

func TestBalanceRangeShouldBalance(t *testing.T) {
re := require.New(t)
for _, v := range []struct {
@@ -106,27 +119,100 @@ func TestBalanceRangeShouldBalance(t *testing.T) {
}
}

//func TestBalanceRangePrepare(t *testing.T) {
// re := require.New(t)
// cancel, _, tc, oc := prepareSchedulersTest()
// defer cancel()
// // args: [role, engine, timeout, range1, range2, ...]
//}
func TestBalanceRangePlan(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
sc := newBalanceRangeScheduler(oc, &balanceRangeSchedulerConfig{}).(*balanceRangeScheduler)
for i := 1; i <= 3; i++ {
tc.AddLeaderStore(uint64(i), 0)
}
tc.AddLeaderRegionWithRange(1, "100", "110", 1, 2, 3)
job := &balanceRangeSchedulerJob{
Engine: TiKV,
Role: leader,
Ranges: []core.KeyRange{core.NewKeyRange("100", "110")},
}
plan, err := sc.prepare(tc, *operator.NewOpInfluence(), job)
re.NoError(err)
re.NotNil(plan)
re.Len(plan.stores, 3)
re.Len(plan.scoreMap, 3)
re.Equal(plan.scoreMap[1], int64(1))
}

func TestBalanceRangeSchedule(t *testing.T) {
func TestTIKVEngine(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
// args: [role, engine, timeout, alias, range1, range2, ...]
scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"leader", "tikv", "1h", "100", "200"}))
scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"leader", "tikv", "1h", "test", "100", "200"}))
re.Nil(err)
op, _ := scheduler.Schedule(tc, true)
re.Empty(op)
for i := 0; i <= 4; i++ {
tc.AddLeaderStore(uint64(int64(i)), i*10)
ops, _ := scheduler.Schedule(tc, true)
re.Empty(ops)
for i := 1; i <= 3; i++ {
tc.AddLeaderStore(uint64(i), 0)
}
tc.AddLeaderRegionWithRange(1, "100", "100", 1, 2, 3, 4)
tc.AddLeaderRegionWithRange(2, "110", "120", 1, 2, 3, 4)
op, _ = scheduler.Schedule(tc, true)
re.NotEmpty(op)
// add regions:
// store-1: 3 leader regions
// store-2: 2 leader regions
// store-3: 1 leader region
tc.AddLeaderRegionWithRange(1, "100", "110", 1, 2, 3)
tc.AddLeaderRegionWithRange(2, "110", "120", 1, 2, 3)
tc.AddLeaderRegionWithRange(3, "120", "140", 1, 2, 3)
tc.AddLeaderRegionWithRange(4, "140", "160", 2, 1, 3)
tc.AddLeaderRegionWithRange(5, "160", "180", 2, 1, 3)
tc.AddLeaderRegionWithRange(5, "180", "200", 3, 1, 2)
// case1: transfer leader from store 1 to store 3
ops, _ = scheduler.Schedule(tc, true)
re.NotEmpty(ops)
op := ops[0]
re.Equal(op.GetAdditionalInfo("sourceScore"), "3")
re.Equal(op.GetAdditionalInfo("targetScore"), "1")
re.Contains(op.Brief(), "transfer leader: store 1 to 3")
tc.AddLeaderStore(4, 0)

// case2: move peer from store 1 to store 4
ops, _ = scheduler.Schedule(tc, true)
re.NotEmpty(ops)
op = ops[0]
re.Equal(op.GetAdditionalInfo("sourceScore"), "3")
re.Equal(op.GetAdditionalInfo("targetScore"), "0")
re.Contains(op.Brief(), "mv peer: store [1] to [4]")
}

func TestTIFLASHEngine(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tikvCount := 3
for i := 1; i <= tikvCount; i++ {
tc.AddLeaderStore(uint64(i), 0)
}
for i := tikvCount + 1; i <= tikvCount+3; i++ {
tc.AddLabelsStore(uint64(i), 0, map[string]string{"engine": "tiflash"})
}
for i := 1; i <= 3; i++ {
tc.AddRegionWithLearner(uint64(i), 1, []uint64{2, 3}, []uint64{4})
}
startKey := fmt.Sprintf("%20d0", 1)
endKey := fmt.Sprintf("%20d0", 10)
tc.RuleManager.SetRule(&placement.Rule{
GroupID: "tiflash",
ID: "1",
Role: placement.Learner,
Count: 1,
StartKey: []byte(startKey),
EndKey: []byte(endKey),
LabelConstraints: []placement.LabelConstraint{
{Key: "engine", Op: "in", Values: []string{"tiflash"}},
},
})

scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "test", startKey, endKey}))
re.NoError(err)
ops, _ := scheduler.Schedule(tc, false)
re.NotEmpty(ops)
op := ops[0]
re.Equal(op.GetAdditionalInfo("sourceScore"), "3")
re.Contains(op.Brief(), "mv peer: store [4] to")
}
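
The TiKV test above drives both operator paths added to transferPeer: when a leader-balancing job picks a target store that already holds a peer of the region, only the leadership is transferred; otherwise the peer itself is moved. The sketch below reduces that branch to plain structs and strings as an illustration only; the metapb and operator packages are not reproduced, and pickOperator is a hypothetical helper that merely returns briefs matching the assertions in TestTIKVEngine.

package main

import "fmt"

// Peer and Region are stripped-down stand-ins for the metapb/core types used in the diff.
type Peer struct {
	StoreID uint64
}

type Region struct {
	ID    uint64
	Peers []Peer
}

// pickOperator mirrors the branch added in transferPeer: for a leader-balancing job,
// an existing peer on the target store means a cheap leader transfer is enough;
// otherwise the peer itself has to be moved to the target store.
func pickOperator(isLeaderJob bool, region Region, sourceID, targetID uint64) string {
	if isLeaderJob {
		for _, peer := range region.Peers {
			if peer.StoreID == targetID {
				return fmt.Sprintf("transfer leader: store %d to %d", sourceID, targetID)
			}
		}
	}
	return fmt.Sprintf("mv peer: store [%d] to [%d]", sourceID, targetID)
}

func main() {
	region := Region{ID: 1, Peers: []Peer{{StoreID: 1}, {StoreID: 2}, {StoreID: 3}}}
	// Target already holds a peer: case 1 in TestTIKVEngine.
	fmt.Println(pickOperator(true, region, 1, 3))
	// Target has no peer yet: case 2 in TestTIKVEngine.
	fmt.Println(pickOperator(true, region, 1, 4))
}
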