diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go
index 61a800ebb9b..492d5e9bd1e 100644
--- a/pkg/schedule/operator/operator_controller.go
+++ b/pkg/schedule/operator/operator_controller.go
@@ -834,7 +834,7 @@ func (oc *Controller) GetHistory(start time.Time) []OpHistory {
 // returns false if the region does not meet the condition, it will calculate the influence of this region.
 type OpInfluenceOption func(region *core.RegionInfo) bool
 
-// WithRangeOption returns an OpInfluenceOption that filters the region by the label.
+// WithRangeOption returns an OpInfluenceOption that filters the region by the key ranges.
 func WithRangeOption(ranges []core.KeyRange) OpInfluenceOption {
 	return func(region *core.RegionInfo) bool {
 		for _, r := range ranges {
diff --git a/pkg/schedule/schedulers/balance_range.go b/pkg/schedule/schedulers/balance_range.go
index 3c170fa17f5..8fd5df1d361 100644
--- a/pkg/schedule/schedulers/balance_range.go
+++ b/pkg/schedule/schedulers/balance_range.go
@@ -39,8 +39,6 @@ import (
 	"github.com/tikv/pd/pkg/utils/syncutil"
 )
 
-const balanceRangeName = "balance-range-scheduler"
-
 type balanceRangeSchedulerHandler struct {
 	rd     *render.Render
 	config *balanceRangeSchedulerConfig
@@ -77,7 +75,7 @@ type balanceRangeSchedulerConfig struct {
 type balanceRangeSchedulerJob struct {
 	JobID   uint64          `json:"job-id"`
 	Role    Role            `json:"role"`
-	Engine  engine          `json:"engine"`
+	Engine  string          `json:"engine"`
 	Timeout time.Duration   `json:"timeout"`
 	Ranges  []core.KeyRange `json:"ranges"`
 	Alias   string          `json:"alias"`
@@ -98,6 +96,8 @@ func (conf *balanceRangeSchedulerConfig) begin(job *balanceRangeSchedulerJob) bo
 	job.Status = running
 	if err := conf.save(); err != nil {
 		log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
+		job.Status = pending
+		job.Start = nil
 		return false
 	}
 	return true
@@ -111,8 +111,11 @@
 	}
 	now := time.Now()
 	job.Finish = &now
+	job.Status = finished
 	if err := conf.save(); err != nil {
 		log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
+		job.Status = running
+		job.Finish = nil
 		return false
 	}
 	return true
@@ -276,7 +279,7 @@ func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool)
 		}
 		// Check region leader
 		if plan.region.GetLeader() == nil {
-			log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID()))
+			log.Warn("region has no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID()))
 			balanceRangeNoLeaderCounter.Inc()
 			continue
 		}
@@ -294,6 +297,7 @@ func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, ds
 	excludeTargets := plan.region.GetStoreIDs()
 	if plan.job.Role == leader {
 		excludeTargets = make(map[uint64]struct{})
+		excludeTargets[plan.region.GetLeader().GetStoreId()] = struct{}{}
 	}
 	conf := plan.GetSchedulerConfig()
 	filters := []filter.Filter{
@@ -372,71 +376,62 @@ type balanceRangeSchedulerPlan struct {
 	tolerate int64
 }
 
-type storeInfo struct {
-	store *core.StoreInfo
-	score int64
-}
-
 func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluence operator.OpInfluence, job *balanceRangeSchedulerJob) (*balanceRangeSchedulerPlan, error) {
-	krs := core.NewKeyRanges(job.Ranges)
-	scanRegions, err := cluster.BatchScanRegions(krs)
-	if err != nil {
-		return nil, err
-	}
 	filters := s.filters
-	if job.Engine == tiflash {
-		filters = append(filters, filter.NewEngineFilter(balanceRangeName, filter.SpecialEngines))
+	switch job.Engine {
+	case core.EngineTiKV:
+		filters = append(filters, filter.NewEngineFilter(string(types.BalanceRangeScheduler), filter.NotSpecialEngines))
+	case core.EngineTiFlash:
+		filters = append(filters, filter.NewEngineFilter(string(types.BalanceRangeScheduler), filter.SpecialEngines))
+	default:
+		return nil, errs.ErrGetSourceStore.FastGenByArgs(job.Engine)
 	}
 	sources := filter.SelectSourceStores(cluster.GetStores(), filters, cluster.GetSchedulerConfig(), nil, nil)
 	if sources == nil {
 		return nil, errs.ErrStoresNotEnough.FastGenByArgs("no store to select")
 	}
-	storeInfos := make(map[uint64]*storeInfo, len(sources))
+
+	krs := core.NewKeyRanges(job.Ranges)
+	scanRegions, err := cluster.BatchScanRegions(krs)
+	if err != nil {
+		return nil, err
+	}
+
+	// storeID <--> score mapping
+	scoreMap := make(map[uint64]int64, len(sources))
 	for _, source := range sources {
-		storeInfos[source.GetID()] = &storeInfo{store: source}
+		scoreMap[source.GetID()] = 0
 	}
 	totalScore := int64(0)
 	for _, region := range scanRegions {
 		for _, peer := range job.Role.getPeers(region) {
-			storeInfos[peer.GetStoreId()].score += 1
+			scoreMap[peer.GetStoreId()] += 1
 			totalScore += 1
 		}
 	}
 
-	tolerate := int64(float64(len(scanRegions)) * adjustRatio)
-	if tolerate < 1 {
-		tolerate = 1
-	}
-	storeList := make([]*storeInfo, 0, len(storeInfos))
-	for storeID, store := range storeInfos {
-		if influence := opInfluence.GetStoreInfluence(storeID); influence != nil {
-			store.score += job.Role.getStoreInfluence(influence)
-		}
-		storeList = append(storeList, store)
-	}
-	sort.Slice(storeList, func(i, j int) bool {
+	sort.Slice(sources, func(i, j int) bool {
 		role := job.Role
-		iop := role.getStoreInfluence(opInfluence.GetStoreInfluence(storeList[i].store.GetID()))
-		jop := role.getStoreInfluence(opInfluence.GetStoreInfluence(storeList[j].store.GetID()))
-		return storeList[i].score+iop > storeList[j].score+jop
+		iop := role.getStoreInfluence(opInfluence.GetStoreInfluence(sources[i].GetID()))
+		jop := role.getStoreInfluence(opInfluence.GetStoreInfluence(sources[j].GetID()))
+		iScore := scoreMap[sources[i].GetID()]
+		jScore := scoreMap[sources[j].GetID()]
+		return iScore+iop > jScore+jop
 	})
-	sourceMap := make(map[uint64]int64)
-	for _, store := range storeList {
-		sourceMap[store.store.GetID()] = store.score
-	}
-	stores := make([]*core.StoreInfo, 0, len(storeList))
-	for _, store := range storeList {
-		stores = append(stores, store.store)
-	}
 	averageScore := int64(0)
-	if len(storeList) != 0 {
-		averageScore = totalScore / int64(len(storeList))
+	if len(sources) != 0 {
+		averageScore = totalScore / int64(len(sources))
+	}
+
+	tolerate := int64(float64(len(scanRegions)) * adjustRatio)
+	if tolerate < 1 {
+		tolerate = 1
 	}
 
 	return &balanceRangeSchedulerPlan{
 		SchedulerCluster: cluster,
-		stores:           stores,
-		scoreMap:         sourceMap,
+		stores:           sources,
+		scoreMap:         scoreMap,
 		source:           nil,
 		target:           nil,
 		region:           nil,
@@ -476,7 +471,7 @@ func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool {
 
 	shouldBalance := sourceScore >= targetScore
 	if !shouldBalance && log.GetLevel() <= zap.DebugLevel {
-		log.Debug("skip balance ",
+		log.Debug("skip balance",
 			zap.String("scheduler", scheduler),
 			zap.Uint64("region-id", p.region.GetID()),
 			zap.Uint64("source-store", p.sourceStoreID()),
@@ -497,7 +492,6 @@ type Role int
 
 const (
 	leader Role = iota
-	// include leader + voter
 	follower
 	learner
 	unknown
@@ -516,43 +510,6 @@ func (r Role) String() string {
 	}
 }
 
-// engine is the engine of the store.
-type engine int
-
-const (
-	tiKV engine = iota
-	tiflash
-	notSupported
-)
-
-func (e engine) String() string {
-	switch e {
-	case tiKV:
-		return "tikv"
-	case tiflash:
-		return "tiflash"
-	default:
-		return "not-supported"
-	}
-}
-
-// MarshalJSON marshals to json.
-func (e engine) MarshalJSON() ([]byte, error) {
-	return []byte(`"` + e.String() + `"`), nil
-}
-
-// NewEngine creates a new engine.
-func NewEngine(role string) engine {
-	switch role {
-	case "tikv":
-		return tiKV
-	case "tiflash":
-		return tiflash
-	default:
-		return notSupported
-	}
-}
-
 // JobStatus is the status of the job.
 type JobStatus int
 
diff --git a/pkg/schedule/schedulers/balance_range_test.go b/pkg/schedule/schedulers/balance_range_test.go
index 3193fe935ae..da558c0f84a 100644
--- a/pkg/schedule/schedulers/balance_range_test.go
+++ b/pkg/schedule/schedulers/balance_range_test.go
@@ -66,7 +66,7 @@ func TestJobStatus(t *testing.T) {
 	conf := &balanceRangeSchedulerConfig{
 		schedulerConfig: &baseSchedulerConfig{},
 	}
-	conf.init(balanceRangeName, s, conf)
+	conf.init(string(types.BalanceRangeScheduler), s, conf)
 	for _, v := range []struct {
 		jobStatus JobStatus
 		begin     bool
@@ -107,7 +107,7 @@ func TestBalanceRangePlan(t *testing.T) {
 	}
 	tc.AddLeaderRegionWithRange(1, "100", "110", 1, 2, 3)
 	job := &balanceRangeSchedulerJob{
-		Engine: tiKV,
+		Engine: core.EngineTiKV,
 		Role:   leader,
 		Ranges: []core.KeyRange{core.NewKeyRange("100", "110")},
 	}
diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go
index 463bf87a363..00313985edc 100644
--- a/pkg/schedule/schedulers/init.go
+++ b/pkg/schedule/schedulers/init.go
@@ -567,13 +567,12 @@ func schedulersRegister() {
 		if role == unknown {
 			return errs.ErrQueryUnescape.FastGenByArgs("role")
 		}
-		engineString, err := url.QueryUnescape(args[1])
+		engine, err := url.QueryUnescape(args[1])
 		if err != nil {
 			return errs.ErrQueryUnescape.Wrap(err)
 		}
-		engine := NewEngine(engineString)
-		if engine == notSupported {
-			return errs.ErrQueryUnescape.FastGenByArgs("engine")
+		if engine != core.EngineTiFlash && engine != core.EngineTiKV {
+			return errs.ErrQueryUnescape.FastGenByArgs("engine must be tikv or tiflash ")
 		}
 		timeout, err := url.QueryUnescape(args[2])
 		if err != nil {
@@ -596,10 +595,6 @@
 			id = conf.jobs[len(conf.jobs)-1].JobID + 1
 		}
 
-		if engine == tiflash && role != learner {
-			return errs.ErrURLParse.FastGenByArgs("TiFlash only support learner role")
-		}
-
 		job := &balanceRangeSchedulerJob{
 			Role:   role,
 			Engine: engine,
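
Review note (not part of the patch): the begin()/finish() hunks above add a revert-on-failed-persist step, so the in-memory job state is only kept if conf.save() succeeds. Below is a minimal, standalone Go sketch of that pattern; the names (jobStatus, job, config, save) are hypothetical stand-ins, not the PD types.

package main

import (
	"errors"
	"fmt"
	"time"
)

type jobStatus int

const (
	pending jobStatus = iota
	running
)

type job struct {
	Status jobStatus
	Start  *time.Time
}

type config struct {
	// save stands in for persisting the scheduler config; swap in a
	// failing implementation to exercise the rollback path.
	save func() error
}

// begin mutates the in-memory job first and undoes the mutation if the
// persist step fails, so memory and storage never disagree about the job.
func (c *config) begin(j *job) bool {
	if j.Status != pending {
		return false
	}
	now := time.Now()
	j.Start = &now
	j.Status = running
	if err := c.save(); err != nil {
		// Persistence failed: restore the previous in-memory state.
		j.Status = pending
		j.Start = nil
		return false
	}
	return true
}

func main() {
	j := &job{Status: pending}
	c := &config{save: func() error { return errors.New("persist failed") }}
	fmt.Println(c.begin(j))          // false: begin reports failure
	fmt.Println(j.Status == pending) // true: status was rolled back
	fmt.Println(j.Start == nil)      // true: start time was rolled back
}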