This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

improvement: Introduce the idea of GangScaling #259

Open
wants to merge 2 commits into base: master
1 change: 1 addition & 0 deletions cmd/server.go
@@ -207,6 +207,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
install.ExecutorPrioritizedNodeLabel,
),
wasteMetricsReporter,
install.GangScaling,
)

resourceReporter := metrics.NewResourceReporter(
3 changes: 2 additions & 1 deletion config/config.go
@@ -26,7 +26,8 @@ type Install struct {
config.Runtime `yaml:",inline"`
Kubeconfig string `yaml:"kube-config,omitempty"`
FIFO bool `yaml:"fifo,omitempty"`
FifoConfig FifoConfig `yaml:"fifo-config,omitempty"`
FifoConfig FifoConfig `yaml:"fifo-config,omitempty"`
GangScaling bool `yaml:"gang-scaling,omitempty"`
QPS float32 `yaml:"qps,omitempty"`
Burst int `yaml:"burst,omitempty"`
BinpackAlgo string `yaml:"binpack,omitempty"`
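For context, gang scaling is opt-in via the new install flag: when enabled, the extender binpacks the driver with however many executors currently fit and creates a partial demand for the executors that are still missing (see internal/extender/resource.go below). A minimal sketch of the relevant install-config excerpt, assuming the keys map one-to-one from the yaml struct tags above; the values and the rest of the install configuration are illustrative:

```yaml
# Hypothetical install-config excerpt; keys taken from the Install struct tags above.
fifo: true          # existing FIFO scheduling toggle
gang-scaling: true  # new flag; defaults to false when omitted
```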
22 changes: 22 additions & 0 deletions internal/extender/demand.go
@@ -59,6 +59,28 @@ func (s *SparkSchedulerExtender) createDemandForExecutorInAnyZone(ctx context.Co
s.createDemandForExecutorInSpecificZone(ctx, executorPod, executorResources, nil)
}

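// createPartialDemand creates a demand for executorCount executors (and no driver unit) in the given zone on behalf of the driver pod; gang scaling uses it to request capacity for the executors that could not be packed.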
func (s *SparkSchedulerExtender) createPartialDemand(
ctx context.Context,
driverPod *v1.Pod,
executorResources *resources.Resources,
executorCount int,
zone *demandapi.Zone) {
if !s.demands.CRDExists() {
return
}
units := []demandapi.DemandUnit{
{
Count: executorCount,
Resources: demandapi.ResourceList{
demandapi.ResourceCPU: executorResources.CPU,
demandapi.ResourceMemory: executorResources.Memory,
demandapi.ResourceNvidiaGPU: executorResources.NvidiaGPU,
},
},
}
s.createDemand(ctx, driverPod, units, zone)
}

func (s *SparkSchedulerExtender) createDemandForExecutorInSpecificZone(ctx context.Context, executorPod *v1.Pod, executorResources *resources.Resources, zone *demandapi.Zone) {
if !s.demands.CRDExists() {
return
1 change: 1 addition & 0 deletions internal/extender/extendertest/extender_test_utils.go
@@ -151,6 +151,7 @@ func NewTestExtender(binpackAlgo string, objects ...runtime.Object) (*Harness, e
instanceGroupLabel,
sort.NewNodeSorter(nil, nil),
wasteMetricsReporter,
false, // isGangScaling
)

unschedulablePodMarker := extender.NewUnschedulablePodMarker(
79 changes: 70 additions & 9 deletions internal/extender/resource.go
@@ -79,6 +79,7 @@ type SparkSchedulerExtender struct {
overheadComputer *OverheadComputer
lastRequest time.Time
instanceGroupLabel string
isGangScaling bool

wasteMetricsReporter *metrics.WasteMetricsReporter
}
@@ -100,7 +101,8 @@ func NewExtender(
overheadComputer *OverheadComputer,
instanceGroupLabel string,
nodeSorter *ns.NodeSorter,
wasteMetricsReporter *metrics.WasteMetricsReporter) *SparkSchedulerExtender {
wasteMetricsReporter *metrics.WasteMetricsReporter,
isGangScaling bool) *SparkSchedulerExtender {
return &SparkSchedulerExtender{
nodeLister: nodeLister,
podLister: podLister,
@@ -118,6 +120,7 @@
instanceGroupLabel: instanceGroupLabel,
nodeSorter: nodeSorter,
wasteMetricsReporter: wasteMetricsReporter,
isGangScaling: isGangScaling,
}
}

@@ -316,14 +319,7 @@ func (s *SparkSchedulerExtender) selectDriverNode(
}
}

packingResult := s.binpacker.BinpackFunc(
ctx,
applicationResources.driverResources,
applicationResources.executorResources,
applicationResources.minExecutorCount,
driverNodeNames,
executorNodeNames,
availableNodesSchedulingMetadata)
packingResult := s.getPackingResult(ctx, applicationResources, driverNodeNames, executorNodeNames, availableNodesSchedulingMetadata)
efficiency := computeAvgPackingEfficiencyForResult(availableNodesSchedulingMetadata, packingResult)

svc1log.FromContext(ctx).Debug("binpacking result",
@@ -364,6 +360,14 @@ func (s *SparkSchedulerExtender) selectDriverNode(
if err != nil {
return "", failureInternal, err
}
// Then we consider creating a partial demand for any executors that could not be packed
if s.isGangScaling {
differenceBetweenAllocationAndDesired := applicationResources.minExecutorCount - len(packingResult.ExecutorNodes)
if differenceBetweenAllocationAndDesired > 0 {
zone := s.getZone(packingResult.DriverNode, availableNodesSchedulingMetadata)
s.createPartialDemand(ctx, driver, applicationResources.executorResources, differenceBetweenAllocationAndDesired, zone)
}
}
return packingResult.DriverNode, success, nil
}

@@ -703,3 +707,60 @@ func (s *SparkSchedulerExtender) rescheduleExecutorWithMinimalFragmentation(
func (s *SparkSchedulerExtender) isSuccessOutcome(outcome string) bool {
return outcome == success || outcome == successAlreadyBound || outcome == successRescheduled || outcome == successScheduledExtraExecutor
}

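// getPackingResult binpacks the application. Without gang scaling it packs with the full minExecutorCount; with gang scaling it retries with progressively smaller executor counts and returns the first packing that has capacity (or the final zero-executor attempt if none does).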
func (s *SparkSchedulerExtender) getPackingResult(
ctx context.Context,
applicationResources *sparkApplicationResources,
driverNodeNames []string,
executorNodeNames []string,
availableNodesSchedulingMetadata resources.NodeGroupSchedulingMetadata) *binpack.PackingResult {
if !s.isGangScaling {
return s.binpacker.BinpackFunc(
ctx,
applicationResources.driverResources,
applicationResources.executorResources,
applicationResources.minExecutorCount,
driverNodeNames,
executorNodeNames,
availableNodesSchedulingMetadata)
}
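// Relax the executor count one at a time, from minExecutorCount down to zero, until the binpacker reports capacity.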
executorCount := applicationResources.minExecutorCount
if executorCount < 0 {
executorCount = 0
}
var packingResult *binpack.PackingResult
for ; executorCount >= 0; executorCount-- {
packingResult = s.binpacker.BinpackFunc(
ctx,
applicationResources.driverResources,
applicationResources.executorResources,
executorCount,
driverNodeNames,
executorNodeNames,
availableNodesSchedulingMetadata)
if packingResult.HasCapacity {
break
}
}
return packingResult
}

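// getZone returns the zone of the chosen driver node so the partial demand can be pinned to it. It returns nil unless the binpacker is single-AZ and the node has a zone label.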
func (s *SparkSchedulerExtender) getZone(driverNodeName string, metadata resources.NodeGroupSchedulingMetadata) *demandapi.Zone {
if !s.binpacker.IsSingleAz {
return nil
}
nodeSchedulingMetadata, ok := metadata[driverNodeName]
if !ok || nodeSchedulingMetadata == nil {
return nil
}
if nodeSchedulingMetadata.ZoneLabel == "" {
return nil
}
demandZone := demandapi.Zone(nodeSchedulingMetadata.ZoneLabel)
return &demandZone
}