This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

az-aware-tightly-pack will attempt to schedule dynamic executors in the same zone #245

Draft · wants to merge 1 commit into base: master
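With this change the az-aware-tightly-pack binpacker is now marked IsSingleAz (but not the new RequiredSingleAz flag), so packing dynamic executors into a single AZ becomes a preference rather than a hard requirement. When no node in the target zone has room for a dynamic executor, rescheduleExecutor retries against the original, unfiltered node list and creates a demand in any zone; only the binpackers that set RequiredSingleAz (singleAZTightlyPack and singleAzMinimalFragmentation) still emit the single-AZ pack-failure metric and create a zone-specific demand.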
13 changes: 8 additions & 5 deletions internal/extender/binpack.go
@@ -34,14 +34,17 @@ type Binpacker struct {
     Name string
     BinpackFunc binpack.SparkBinPackFunction
     IsSingleAz bool
+    // RequiredSingleAz denotes that an application should only ever be scheduled in a single AZ (if IsSingleAz is true);
+    // otherwise this is merely a preference.
+    RequiredSingleAz bool
 }
 
 var binpackFunctions = map[string]*Binpacker{
-    tightlyPack: {tightlyPack, binpack.TightlyPack, false},
-    distributeEvenly: {distributeEvenly, binpack.DistributeEvenly, false},
-    azAwareTightlyPack: {azAwareTightlyPack, binpack.AzAwareTightlyPack, false},
-    singleAZTightlyPack: {singleAZTightlyPack, binpack.SingleAZTightlyPack, true},
-    singleAzMinimalFragmentation: {singleAzMinimalFragmentation, binpack.SingleAZMinimalFragmentation, true},
+    tightlyPack: {Name: tightlyPack, BinpackFunc: binpack.TightlyPack},
+    distributeEvenly: {Name: distributeEvenly, BinpackFunc: binpack.DistributeEvenly},
+    azAwareTightlyPack: {Name: azAwareTightlyPack, BinpackFunc: binpack.AzAwareTightlyPack, IsSingleAz: true},
+    singleAZTightlyPack: {Name: singleAZTightlyPack, BinpackFunc: binpack.SingleAZTightlyPack, IsSingleAz: true, RequiredSingleAz: true},
+    singleAzMinimalFragmentation: {Name: singleAzMinimalFragmentation, BinpackFunc: binpack.SingleAZMinimalFragmentation, IsSingleAz: true, RequiredSingleAz: true},
 }
 
 // SelectBinpacker selects the binpack function from the given name
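Taken together, IsSingleAz and RequiredSingleAz give three behaviours: no AZ constraint, single AZ preferred, and single AZ required. A minimal, self-contained Go sketch of that mapping follows; the type is reduced to the two flags, and the helper and the name strings are hypothetical, since the real decision happens in rescheduleExecutor in the next file:

package main

import "fmt"

// Binpacker mirrors the struct from internal/extender/binpack.go, reduced to the
// two flags that control AZ handling.
type Binpacker struct {
    Name             string
    IsSingleAz       bool
    RequiredSingleAz bool
}

// azMode is a hypothetical helper that names the scheduling behaviour implied by the flags.
func azMode(b Binpacker) string {
    switch {
    case b.IsSingleAz && b.RequiredSingleAz:
        return "single AZ required: fail the pack and demand capacity in that zone"
    case b.IsSingleAz:
        return "single AZ preferred: fall back to other zones, demand capacity in any zone"
    default:
        return "no AZ constraint"
    }
}

func main() {
    // Illustrative name strings; the repository's actual constants are not shown in the diff.
    for _, b := range []Binpacker{
        {Name: "tightly-pack"},
        {Name: "az-aware-tightly-pack", IsSingleAz: true},
        {Name: "single-az-tightly-pack", IsSingleAz: true, RequiredSingleAz: true},
    } {
        fmt.Printf("%-24s %s\n", b.Name, azMode(b))
    }
}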
62 changes: 50 additions & 12 deletions internal/extender/resource.go
@@ -563,6 +563,7 @@ func logApplicationPods(ctx context.Context, applicationPods []*v1.Pod) {
 }
 
 func (s *SparkSchedulerExtender) rescheduleExecutor(ctx context.Context, executor *v1.Pod, nodeNames []string, isExtraExecutor bool) (string, string, error) {
+    initialNodeNames := nodeNames
     driver, err := s.podLister.getDriverPodForExecutor(ctx, executor)
     if err != nil {
        return "", failureInternal, err
@@ -613,27 +614,64 @@ func (s *SparkSchedulerExtender) rescheduleExecutor(ctx context.Context, executor *v1.Pod, nodeNames []string, isExtraExecutor bool) (string, string, error) {
     usage.Add(overhead)
     availableResources := resources.AvailableForNodes(availableNodes, usage)
 
-    _, executorNodeNames := s.nodeSorter.PotentialNodes(availableNodesSchedulingMetadata, nodeNames)
+    name, reason, ok := s.findExecutorNode(
+        availableNodesSchedulingMetadata,
+        nodeNames,
+        executorResources,
+        availableResources,
+        isExtraExecutor)
+    if ok {
+        return name, reason, nil
+    }
 
-    for _, name := range executorNodeNames {
-        if !executorResources.GreaterThan(availableResources[name]) {
-            if isExtraExecutor {
-                return name, successScheduledExtraExecutor, nil
+    if shouldScheduleIntoSingleAZ {
+        if s.binpacker.RequiredSingleAz {
+            svc1log.FromContext(ctx).Info("Failed to find space in zone for additional executor, creating a demand", svc1log.SafeParam("zone", singleAzZone))
+            metrics.IncrementSingleAzDynamicAllocationPackFailure(ctx, singleAzZone)
+            demandZone := demandapi.Zone(singleAzZone)
+            s.createDemandForExecutorInSpecificZone(ctx, executor, executorResources, &demandZone)
+        } else {
+            name, reason, ok = s.findExecutorNode(
+                availableNodesSchedulingMetadata,
+                initialNodeNames,
+                executorResources,
+                availableResources,
+                isExtraExecutor,
+            )
+            if ok {
+                svc1log.FromContext(ctx).Info("Preferred single az scheduling resulted in an executor being scheduled in a different AZ")
+                return name, reason, nil
             }
-            return name, successRescheduled, nil
+
+            // we still create the demand in any AZ, as we want to maximize the chance of being able to provision a new
+            // node, even if that means paying for cross-az traffic
+            s.createDemandForExecutorInAnyZone(ctx, executor, executorResources)
         }
-    }
-    if shouldScheduleIntoSingleAZ {
-        svc1log.FromContext(ctx).Info("Failed to find space in zone for additional executor, creating a demand", svc1log.SafeParam("zone", singleAzZone))
-        metrics.IncrementSingleAzDynamicAllocationPackFailure(ctx, singleAzZone)
-        demandZone := demandapi.Zone(singleAzZone)
-        s.createDemandForExecutorInSpecificZone(ctx, executor, executorResources, &demandZone)
     } else {
        s.createDemandForExecutorInAnyZone(ctx, executor, executorResources)
     }
     return "", failureFit, werror.ErrorWithContextParams(ctx, "not enough capacity to reschedule the executor")
 }

+func (s *SparkSchedulerExtender) findExecutorNode(
+    availableNodesSchedulingMetadata resources.NodeGroupSchedulingMetadata,
+    nodeNames []string,
+    executorResources *resources.Resources,
+    availableResources resources.NodeGroupResources,
+    isExtraExecutor bool,
+) (string, string, bool) {
+    _, executorNodeNames := s.nodeSorter.PotentialNodes(availableNodesSchedulingMetadata, nodeNames)
+    for _, name := range executorNodeNames {
+        if !executorResources.GreaterThan(availableResources[name]) {
+            if isExtraExecutor {
+                return name, successScheduledExtraExecutor, true
+            }
+            return name, successRescheduled, true
+        }
+    }
+    return "", failureFit, false
+}
+
 func (s *SparkSchedulerExtender) isSuccessOutcome(outcome string) bool {
     return outcome == success || outcome == successAlreadyBound || outcome == successRescheduled || outcome == successScheduledExtraExecutor
 }
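The extracted findExecutorNode helper is the old first-fit loop: nodeSorter.PotentialNodes orders the candidates, the first node whose available resources cover the executor wins, and ok=false (failureFit) means "nothing fits" rather than an error; both success reasons still count as success for isSuccessOutcome. rescheduleExecutor uses that boolean to drive the new fallback. A minimal, runnable Go sketch of the single-AZ path of this control flow, using made-up types and helpers in place of pods, scheduling metadata, and demands:

package main

import "fmt"

// placement stands in for the (nodeName, ok) part of findExecutorNode's result.
type placement struct {
    node string
    ok   bool
}

// findNode stands in for findExecutorNode: the first candidate with enough free capacity wins.
func findNode(candidates []string, free map[string]int, need int) placement {
    for _, n := range candidates {
        if free[n] >= need {
            return placement{node: n, ok: true}
        }
    }
    return placement{}
}

// reschedule mirrors the fallback logic for a binpacker that targets a single zone.
func reschedule(allNodes, zoneNodes []string, free map[string]int, need int, requiredSingleAz bool) string {
    // First attempt: only nodes in the target zone.
    if p := findNode(zoneNodes, free, need); p.ok {
        return "scheduled in zone on " + p.node
    }
    if requiredSingleAz {
        // Hard constraint: give up and ask for capacity in that specific zone.
        return "no fit; create demand in the single zone"
    }
    // Preference only: retry against the original, unfiltered node list.
    if p := findNode(allNodes, free, need); p.ok {
        return "scheduled cross-AZ on " + p.node
    }
    // Still no fit: demand capacity in any zone.
    return "no fit; create demand in any zone"
}

func main() {
    free := map[string]int{"a-1": 0, "b-1": 4}
    fmt.Println(reschedule([]string{"a-1", "b-1"}, []string{"a-1"}, free, 2, false))
    fmt.Println(reschedule([]string{"a-1", "b-1"}, []string{"a-1"}, free, 2, true))
}

When the preferred-mode fallback also fails, a demand is still created in any zone, matching the comment in the diff about maximizing the chance of provisioning a new node even at the cost of cross-AZ traffic.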