address comment
Signed-off-by: 童剑 <[email protected]>
bufferflies committed Feb 18, 2025
1 parent 2854acd commit 40f0f56
Showing 4 changed files with 48 additions and 96 deletions.
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator_controller.go
@@ -834,7 +834,7 @@ func (oc *Controller) GetHistory(start time.Time) []OpHistory {
// returns false if the region does not meet the condition; otherwise it will calculate the influence of this region.
type OpInfluenceOption func(region *core.RegionInfo) bool

// WithRangeOption returns an OpInfluenceOption that filters the region by the label.
// WithRangeOption returns an OpInfluenceOption that filters the region by the key ranges.
func WithRangeOption(ranges []core.KeyRange) OpInfluenceOption {
return func(region *core.RegionInfo) bool {
for _, r := range ranges {
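For context, `WithRangeOption` returns a predicate that admits a region only when it falls inside one of the configured key ranges. Below is a minimal, self-contained sketch of that filtering shape; `KeyRange`, `RegionInfo`, and the containment check are simplified stand-ins for the pd `core` types, not the actual implementation:

```go
package main

import (
	"bytes"
	"fmt"
)

// KeyRange and RegionInfo are simplified stand-ins for the pd core types.
type KeyRange struct{ Start, End []byte }

type RegionInfo struct{ Start, End []byte }

// withRangeOption mirrors the documented behavior: a region passes only
// if it is contained in one of the configured key ranges.
func withRangeOption(ranges []KeyRange) func(*RegionInfo) bool {
	return func(region *RegionInfo) bool {
		for _, r := range ranges {
			// Region [Start, End) is contained in [r.Start, r.End).
			if bytes.Compare(region.Start, r.Start) >= 0 &&
				bytes.Compare(region.End, r.End) <= 0 {
				return true
			}
		}
		return false
	}
}

func main() {
	opt := withRangeOption([]KeyRange{{Start: []byte("100"), End: []byte("110")}})
	fmt.Println(opt(&RegionInfo{Start: []byte("102"), End: []byte("105")})) // true
	fmt.Println(opt(&RegionInfo{Start: []byte("120"), End: []byte("130")})) // false
}
```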
127 changes: 42 additions & 85 deletions pkg/schedule/schedulers/balance_range.go
@@ -39,8 +39,6 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

const balanceRangeName = "balance-range-scheduler"

type balanceRangeSchedulerHandler struct {
rd *render.Render
config *balanceRangeSchedulerConfig
@@ -77,7 +75,7 @@ type balanceRangeSchedulerConfig struct {
type balanceRangeSchedulerJob struct {
JobID uint64 `json:"job-id"`
Role Role `json:"role"`
Engine engine `json:"engine"`
Engine string `json:"engine"`
Timeout time.Duration `json:"timeout"`
Ranges []core.KeyRange `json:"ranges"`
Alias string `json:"alias"`
@@ -98,6 +96,8 @@ func (conf *balanceRangeSchedulerConfig) begin(job *balanceRangeSchedulerJob) bo
job.Status = running
if err := conf.save(); err != nil {
log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
job.Status = pending
job.Start = nil
return false
}
return true
@@ -111,8 +111,11 @@ func (conf *balanceRangeSchedulerConfig) finish(job *balanceRangeSchedulerJob) b
}
now := time.Now()
job.Finish = &now
job.Status = finished
if err := conf.save(); err != nil {
log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
job.Status = running
job.Finish = nil
return false
}
return true
@@ -276,7 +279,7 @@ func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool)
}
// Check region leader
if plan.region.GetLeader() == nil {
log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID()))
log.Warn("region has no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID()))
balanceRangeNoLeaderCounter.Inc()
continue
}
@@ -294,6 +297,7 @@ func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, ds
excludeTargets := plan.region.GetStoreIDs()
if plan.job.Role == leader {
excludeTargets = make(map[uint64]struct{})
excludeTargets[plan.region.GetLeader().GetStoreId()] = struct{}{}
}
conf := plan.GetSchedulerConfig()
filters := []filter.Filter{
@@ -372,71 +376,62 @@ type balanceRangeSchedulerPlan struct {
tolerate int64
}

type storeInfo struct {
store *core.StoreInfo
score int64
}

func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluence operator.OpInfluence, job *balanceRangeSchedulerJob) (*balanceRangeSchedulerPlan, error) {
krs := core.NewKeyRanges(job.Ranges)
scanRegions, err := cluster.BatchScanRegions(krs)
if err != nil {
return nil, err
}
filters := s.filters
if job.Engine == tiflash {
filters = append(filters, filter.NewEngineFilter(balanceRangeName, filter.SpecialEngines))
switch job.Engine {
case core.EngineTiKV:
filters = append(filters, filter.NewEngineFilter(string(types.BalanceRangeScheduler), filter.NotSpecialEngines))
case core.EngineTiFlash:
filters = append(filters, filter.NewEngineFilter(string(types.BalanceRangeScheduler), filter.SpecialEngines))
default:
return nil, errs.ErrGetSourceStore.FastGenByArgs(job.Engine)
}
sources := filter.SelectSourceStores(cluster.GetStores(), filters, cluster.GetSchedulerConfig(), nil, nil)
if sources == nil {
return nil, errs.ErrStoresNotEnough.FastGenByArgs("no store to select")
}
storeInfos := make(map[uint64]*storeInfo, len(sources))

krs := core.NewKeyRanges(job.Ranges)
scanRegions, err := cluster.BatchScanRegions(krs)
if err != nil {
return nil, err
}

// storeID <--> score mapping
scoreMap := make(map[uint64]int64, len(sources))
for _, source := range sources {
storeInfos[source.GetID()] = &storeInfo{store: source}
scoreMap[source.GetID()] = 0
}
totalScore := int64(0)
for _, region := range scanRegions {
for _, peer := range job.Role.getPeers(region) {
storeInfos[peer.GetStoreId()].score += 1
scoreMap[peer.GetStoreId()] += 1
totalScore += 1
}
}
tolerate := int64(float64(len(scanRegions)) * adjustRatio)
if tolerate < 1 {
tolerate = 1
}

storeList := make([]*storeInfo, 0, len(storeInfos))
for storeID, store := range storeInfos {
if influence := opInfluence.GetStoreInfluence(storeID); influence != nil {
store.score += job.Role.getStoreInfluence(influence)
}
storeList = append(storeList, store)
}
sort.Slice(storeList, func(i, j int) bool {
sort.Slice(sources, func(i, j int) bool {
role := job.Role
iop := role.getStoreInfluence(opInfluence.GetStoreInfluence(storeList[i].store.GetID()))
jop := role.getStoreInfluence(opInfluence.GetStoreInfluence(storeList[j].store.GetID()))
return storeList[i].score+iop > storeList[j].score+jop
iop := role.getStoreInfluence(opInfluence.GetStoreInfluence(sources[i].GetID()))
jop := role.getStoreInfluence(opInfluence.GetStoreInfluence(sources[j].GetID()))
iScore := scoreMap[sources[i].GetID()]
jScore := scoreMap[sources[j].GetID()]
return iScore+iop > jScore+jop
})
sourceMap := make(map[uint64]int64)
for _, store := range storeList {
sourceMap[store.store.GetID()] = store.score
}

stores := make([]*core.StoreInfo, 0, len(storeList))
for _, store := range storeList {
stores = append(stores, store.store)
}
averageScore := int64(0)
if len(storeList) != 0 {
averageScore = totalScore / int64(len(storeList))
if len(sources) != 0 {
averageScore = totalScore / int64(len(sources))
}

tolerate := int64(float64(len(scanRegions)) * adjustRatio)
if tolerate < 1 {
tolerate = 1
}
return &balanceRangeSchedulerPlan{
SchedulerCluster: cluster,
stores: stores,
scoreMap: sourceMap,
stores: sources,
scoreMap: scoreMap,
source: nil,
target: nil,
region: nil,
@@ -476,7 +471,7 @@ func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool {

shouldBalance := sourceScore >= targetScore
if !shouldBalance && log.GetLevel() <= zap.DebugLevel {
log.Debug("skip balance ",
log.Debug("skip balance",
zap.String("scheduler", scheduler),
zap.Uint64("region-id", p.region.GetID()),
zap.Uint64("source-store", p.sourceStoreID()),
@@ -497,7 +492,6 @@ type Role int

const (
leader Role = iota
// include leader + voter
follower
learner
unknown
@@ -516,43 +510,6 @@ func (r Role) String() string {
}
}

// engine is the engine of the store.
type engine int

const (
tiKV engine = iota
tiflash
notSupported
)

func (e engine) String() string {
switch e {
case tiKV:
return "tikv"
case tiflash:
return "tiflash"
default:
return "not-supported"
}
}

// MarshalJSON marshals to json.
func (e engine) MarshalJSON() ([]byte, error) {
return []byte(`"` + e.String() + `"`), nil
}

// NewEngine creates a new engine.
func NewEngine(role string) engine {
switch role {
case "tikv":
return tiKV
case "tiflash":
return tiflash
default:
return notSupported
}
}

// JobStatus is the status of the job.
type JobStatus int

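The largest change in this file replaces the intermediate `storeInfo` struct with a plain `scoreMap` keyed by store ID and sorts the filtered sources directly. A minimal sketch of that pattern follows; `Store`, `Region`, and the `influence` lookup are hypothetical simplifications of `core.StoreInfo`, the scanned regions, and `operator.OpInfluence`:

```go
package main

import (
	"fmt"
	"sort"
)

// Stand-ins for core.StoreInfo and the scanned regions; the real code
// derives the peers per store from job.Role.getPeers(region).
type Store struct{ ID uint64 }

type Region struct{ PeerStores []uint64 }

// prepareScores builds the storeID <--> score mapping and sorts the
// sources in place, mirroring the shape of the new prepare().
func prepareScores(sources []*Store, regions []*Region, influence map[uint64]int64) map[uint64]int64 {
	scoreMap := make(map[uint64]int64, len(sources))
	for _, s := range sources {
		scoreMap[s.ID] = 0
	}
	for _, r := range regions {
		for _, id := range r.PeerStores {
			scoreMap[id]++
		}
	}
	// Sort by score plus pending-operator influence, descending, so the
	// most loaded store is tried as the source first.
	sort.Slice(sources, func(i, j int) bool {
		return scoreMap[sources[i].ID]+influence[sources[i].ID] >
			scoreMap[sources[j].ID]+influence[sources[j].ID]
	})
	return scoreMap
}

func main() {
	sources := []*Store{{ID: 1}, {ID: 2}, {ID: 3}}
	regions := []*Region{{PeerStores: []uint64{1, 2}}, {PeerStores: []uint64{2}}}
	scores := prepareScores(sources, regions, map[uint64]int64{3: 5})
	for _, s := range sources {
		fmt.Println(s.ID, scores[s.ID]) // order: 3, 2, 1
	}
}
```

Sorting the sources directly gives the same ordering the old `storeList` produced, while dropping the wrapper type and the second pass that rebuilt `sourceMap`.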
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_range_test.go
@@ -66,7 +66,7 @@ func TestJobStatus(t *testing.T) {
conf := &balanceRangeSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
}
conf.init(balanceRangeName, s, conf)
conf.init(string(types.BalanceRangeScheduler), s, conf)
for _, v := range []struct {
jobStatus JobStatus
begin bool
@@ -107,7 +107,7 @@ func TestBalanceRangePlan(t *testing.T) {
}
tc.AddLeaderRegionWithRange(1, "100", "110", 1, 2, 3)
job := &balanceRangeSchedulerJob{
Engine: tiKV,
Engine: core.EngineTiKV,
Role: leader,
Ranges: []core.KeyRange{core.NewKeyRange("100", "110")},
}
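The test now passes `core.EngineTiKV`, a plain string constant, instead of the removed `tiKV` enum value. This relies on a plain `string` field serializing to the same quoted JSON the deleted custom `MarshalJSON` produced. A quick self-contained check, where `job` is a trimmed stand-in for `balanceRangeSchedulerJob`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-in for balanceRangeSchedulerJob after this commit.
type job struct {
	JobID  uint64 `json:"job-id"`
	Engine string `json:"engine"`
}

func main() {
	b, _ := json.Marshal(job{JobID: 1, Engine: "tikv"})
	fmt.Println(string(b)) // {"job-id":1,"engine":"tikv"}
}
```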
11 changes: 3 additions & 8 deletions pkg/schedule/schedulers/init.go
@@ -567,13 +567,12 @@ func schedulersRegister() {
if role == unknown {
return errs.ErrQueryUnescape.FastGenByArgs("role")
}
engineString, err := url.QueryUnescape(args[1])
engine, err := url.QueryUnescape(args[1])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}
engine := NewEngine(engineString)
if engine == notSupported {
return errs.ErrQueryUnescape.FastGenByArgs("engine")
if engine != core.EngineTiFlash && engine != core.EngineTiKV {
return errs.ErrQueryUnescape.FastGenByArgs("engine must be tikv or tiflash ")
}
timeout, err := url.QueryUnescape(args[2])
if err != nil {
@@ -596,10 +595,6 @@
id = conf.jobs[len(conf.jobs)-1].JobID + 1
}

if engine == tiflash && role != learner {
return errs.ErrURLParse.FastGenByArgs("TiFlash only support learner role")
}

job := &balanceRangeSchedulerJob{
Role: role,
Engine: engine,
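The registration path now validates the unescaped engine argument inline instead of round-tripping through the removed `engine` type. A minimal sketch of that parse-and-validate flow; `parseEngineArg`, the constants, and the error text here are illustrative, not the PD API:

```go
package main

import (
	"fmt"
	"net/url"
)

const (
	engineTiKV    = "tikv"
	engineTiFlash = "tiflash"
)

// parseEngineArg mirrors the new inline check: unescape the raw
// argument, then accept only the two supported engine names.
func parseEngineArg(raw string) (string, error) {
	engine, err := url.QueryUnescape(raw)
	if err != nil {
		return "", err
	}
	if engine != engineTiKV && engine != engineTiFlash {
		return "", fmt.Errorf("engine must be tikv or tiflash, got %q", engine)
	}
	return engine, nil
}

func main() {
	for _, raw := range []string{"tikv", "tiflash", "rocksdb"} {
		e, err := parseEngineArg(raw)
		fmt.Println(e, err)
	}
}
```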
