block unmatched empty domains
leoryu committed Jan 12, 2025
1 parent 31a62ea commit 420766f
Showing 6 changed files with 80 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -471,7 +471,7 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error {
func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
for _, pod := range pods {
- if err, requirements := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
+ if requirements, err := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
log.FromContext(ctx).WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).Error(err, "failed getting volume topology requirements")
} else {
schedulablePods[pod] = requirements
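The hunk above swaps GetVolumeRequirements over to Go's conventional (value, error) return order, so the caller binds the requirements first and checks the error second. Below is a minimal, self-contained sketch of that caller pattern; the pod and requirement types are simplified stand-ins, not the real Kubernetes or Karpenter types.

package main

import "fmt"

type pod struct{ namespace, name string }

type requirement struct {
	key    string
	values []string
}

// getVolumeRequirements stands in for volumeTopology.GetVolumeRequirements:
// the requirements come first, the error last.
func getVolumeRequirements(p *pod) ([]requirement, error) {
	if p.name == "" {
		return nil, fmt.Errorf("pod has no name")
	}
	return []requirement{{key: "topology.kubernetes.io/zone", values: []string{"zone-a"}}}, nil
}

func main() {
	pods := []*pod{
		{namespace: "default", name: "web-0"},
		{namespace: "default", name: ""},
	}
	schedulable := make(map[*pod][]requirement)
	for _, p := range pods {
		// Pods whose volume requirements cannot be resolved are logged and skipped,
		// mirroring the loop in convertToPodVolumeRequirements.
		if reqs, err := getVolumeRequirements(p); err != nil {
			fmt.Printf("failed getting volume topology requirements for %s/%s: %v\n", p.namespace, p.name, err)
		} else {
			schedulable[p] = reqs
		}
	}
	fmt.Println("schedulable pods:", len(schedulable))
}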
@@ -168,7 +168,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
clock := &clock.RealClock{}
cluster = state.NewCluster(clock, client, cloudProvider)
domains := map[string]sets.Set[string]{}
- topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
+ podsVolumeRequirements := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
+ topology, err := scheduling.NewTopology(ctx, client, cluster, domains, podsVolumeRequirements)
if err != nil {
b.Fatalf("creating topology, %s", err)
}
8 changes: 4 additions & 4 deletions pkg/controllers/provisioning/scheduling/topology.go
@@ -178,7 +178,7 @@ func (t *Topology) AddRequirements(podRequirements, nodeRequirements scheduling.
if nodeRequirements.Has(topology.Key) {
nodeDomains = nodeRequirements.Get(topology.Key)
}
- domains := topology.Get(p, podDomains, nodeDomains)
+ domains := topology.Get(p, podDomains, nodeDomains, len(t.podVolumeRequirements[p]) != 0)
if domains.Len() == 0 {
return nil, topologyError{
topology: topology,
@@ -249,7 +249,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
return err
}

- tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])
+ tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])

hash := tg.Hash()
if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -327,7 +327,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
var topologyGroups []*TopologyGroup
for _, cs := range p.Spec.TopologySpreadConstraints {
- topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
+ topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, t.cluster, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
}
return topologyGroups
}
@@ -364,7 +364,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
if err != nil {
return nil, err
}
- topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
+ topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
}
}
return topologyGroups, nil
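All three NewTopologyGroup call sites above now pass t.cluster, giving each topology group a view of the existing nodes. The following is a rough sketch of why that handle is needed, using simplified stand-in types rather than the real state.Cluster API: the group buckets existing nodes by the topology key, which is the lookup nextDomainTopologySpread performs before deciding which empty domains to block.

package main

import "fmt"

type node struct{ labels map[string]string }

type cluster struct{ nodes []*node }

type topologyGroup struct {
	key     string
	cluster *cluster
}

// existingDomains groups the cluster's nodes by the topology key, mirroring
// the map the new nextDomainTopologySpread builds from t.cluster.Nodes().
func (t *topologyGroup) existingDomains() map[string][]*node {
	out := map[string][]*node{}
	if t.cluster == nil {
		return out
	}
	for _, n := range t.cluster.nodes {
		if n == nil {
			continue
		}
		if v, ok := n.labels[t.key]; ok {
			out[v] = append(out[v], n)
		}
	}
	return out
}

func main() {
	c := &cluster{nodes: []*node{
		{labels: map[string]string{"topology.kubernetes.io/zone": "zone-a"}},
		{labels: map[string]string{"topology.kubernetes.io/zone": "zone-b"}},
	}}
	tg := &topologyGroup{key: "topology.kubernetes.io/zone", cluster: c}
	for domain, nodes := range tg.existingDomains() {
		fmt.Println(domain, len(nodes))
	}
}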
81 changes: 68 additions & 13 deletions pkg/controllers/provisioning/scheduling/topologygroup.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/scheduling"
)

@@ -59,6 +60,7 @@ type TopologyGroup struct {
Type TopologyType
maxSkew int32
minDomains *int32
+ cluster *state.Cluster
namespaces sets.Set[string]
selector labels.Selector
rawSelector *metav1.LabelSelector
@@ -69,7 +71,7 @@ type TopologyGroup struct {
emptyDomains sets.Set[string] // domains for which we know that no pod exists
}

- func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
+ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, cluster *state.Cluster, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
domainCounts := map[string]int32{}
for domain := range domains {
domainCounts[domain] = 0
@@ -86,6 +88,7 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
return &TopologyGroup{
Type: topologyType,
Key: topologyKey,
+ cluster: cluster,
namespaces: namespaces,
selector: selector,
rawSelector: labelSelector,
@@ -98,10 +101,10 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
}
}

- func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement {
+ func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirements bool) *scheduling.Requirement {
switch t.Type {
case TopologyTypeSpread:
- return t.nextDomainTopologySpread(pod, podDomains, nodeDomains)
+ return t.nextDomainTopologySpread(pod, podDomains, nodeDomains, hasVolumeRequirements)
case TopologyTypePodAffinity:
return t.nextDomainAffinity(pod, podDomains, nodeDomains)
case TopologyTypePodAntiAffinity:
@@ -175,34 +178,78 @@ func (t *TopologyGroup) Hash() uint64 {
}

// nextDomainTopologySpread returns a scheduling.Requirement that includes a node domain that a pod should be scheduled to.
- // If there are multiple eligible domains, we return all eligible domains that satisfies the `maxSkew` configuration.
+ // If there are multiple eligible domains, we return any random domain that satisfies the `maxSkew` configuration.
// If there are no eligible domains, we return a `DoesNotExist` requirement, implying that we could not satisfy the topologySpread requirement.
+ // nolint:gocyclo
- func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement {
+ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirement bool) *scheduling.Requirement {
+ var nodes = make(map[string][]*v1.Node)
+ var blockedDomains = sets.New[string]()
+ var candidateDomains = []string{}
+ var firstDomains = []string{}
+
+ if t.cluster != nil {
+ for _, node := range t.cluster.Nodes() {
+ if node == nil || node.Node == nil {
+ continue
+ }
+ if _, ok := node.Node.GetLabels()[t.Key]; !ok {
+ continue
+ }
+ nodes[node.Node.GetLabels()[t.Key]] = append(nodes[node.Node.GetLabels()[t.Key]], node.Node)
+ }
+ }
+ // Empty domains whose existing nodes all fail to match the pod should be excluded from the skew calculations.
+ for _, domain := range t.emptyDomains.UnsortedList() {
+ // No existing node has this domain, so it comes from a nodeclaim and may be created for the first time.
+ if len(nodes[domain]) == 0 {
+ // If the pod has a volume requirement, block this first-time domain: its skew is always 0, which may break the skew calculations.
+ if hasVolumeRequirement {
+ firstDomains = append(firstDomains, domain)
+ } else {
+ continue
+ }
+ }
+ var needBlock = true
+ for _, node := range nodes[domain] {
+ if node.GetLabels()[t.Key] == domain && t.nodeFilter.Matches(node) {
+ needBlock = false
+ break
+ }
+ }
+ if needBlock {
+ blockedDomains.Insert(domain)
+ }
+ }
// min count is calculated across all domains
- min := t.domainMinCount(podDomains)
+ min := t.domainMinCount(podDomains, blockedDomains)
selfSelecting := t.selects(pod)

- candidateDomains := []string{}
+ minDomain := ""
+ minCount := int32(math.MaxInt32)

// If we are explicitly selecting on specific node domains ("In" requirement),
// this is going to be more efficient to iterate through
// This is particularly useful when considering the hostname topology key that can have a
// lot of t.domains but only a single nodeDomain
if nodeDomains.Operator() == v1.NodeSelectorOpIn {
for _, domain := range nodeDomains.Values() {
- if count, ok := t.domains[domain]; ok {
+ if count, ok := t.domains[domain]; ok && !blockedDomains.Has(domain) {
if selfSelecting {
count++
}
if count-min <= t.maxSkew {
candidateDomains = append(candidateDomains, domain)
+ if count < minCount {
+ minDomain = domain
+ minCount = count
+ }
}
}
}
} else {
for domain := range t.domains {
// but we can only choose from the node domains
- if nodeDomains.Has(domain) {
+ if nodeDomains.Has(domain) && !blockedDomains.Has(domain) {
// comment from kube-scheduler regarding the viable choices to schedule to based on skew is:
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
count := t.domains[domain]
@@ -211,18 +258,26 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo
}
if count-min <= t.maxSkew {
candidateDomains = append(candidateDomains, domain)
+ if count < minCount {
+ minDomain = domain
+ minCount = count
+ }
}
}
}
}
- if len(candidateDomains) == 0 {
+ if minDomain == "" && len(firstDomains) == 0 {
// avoids an error message about 'zone in [""]', preferring 'zone in []'
return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpDoesNotExist)
}
- return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, candidateDomains...)
+ // For pods with volume requirements, return every candidate domain (including first-time domains).
+ if hasVolumeRequirement {
+ return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, append(firstDomains, candidateDomains...)...)
+ }
+ return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, minDomain)
}

- func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 {
+ func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement, blockedDomains sets.Set[string]) int32 {
// hostname based topologies always have a min pod count of zero since we can create one
if t.Key == v1.LabelHostname {
return 0
Expand All @@ -232,7 +287,7 @@ func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 {
var numPodSupportedDomains int32
// determine our current min count
for domain, count := range t.domains {
- if domains.Has(domain) {
+ if domains.Has(domain) && !blockedDomains.Has(domain) {
numPodSupportedDomains++
if count < min {
min = count
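The kube-scheduler rule quoted in the hunk above ('existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'), combined with the new blocked-domain handling, can be seen in a small worked example. This is a standalone sketch with made-up counts, not the real TopologyGroup code; the blocked set stands in for blockedDomains derived from unmatched empty domains.

package main

import "fmt"

func main() {
	// Pods already matching the spread selector, per domain.
	counts := map[string]int32{"zone-a": 3, "zone-b": 1, "zone-c": 1}
	// zone-c is blocked: none of its existing nodes match the pod's node filter.
	blocked := map[string]bool{"zone-c": true}
	const maxSkew int32 = 1

	// The global minimum is taken over the domains that are not blocked.
	minCount := int32(1 << 30)
	for d, c := range counts {
		if !blocked[d] && c < minCount {
			minCount = c
		}
	}

	var candidates []string
	for d, c := range counts {
		if blocked[d] {
			continue
		}
		// +1 because the pod being scheduled matches its own selector.
		if c+1-minCount <= maxSkew {
			candidates = append(candidates, d)
		}
	}
	// zone-b qualifies (1+1-1 <= 1); zone-a would push the skew to 3.
	fmt.Println("min:", minCount, "candidates:", candidates)
}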
@@ -51,6 +51,7 @@ func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter {
}

// Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology
+ // TODO: Node filter should respect the nodeAffinityPolicy/nodeTaintsPolicy fields in the future.
func (t TopologyNodeFilter) Matches(node *v1.Node) bool {
return t.MatchesRequirements(scheduling.NewLabelRequirements(node.Labels))
}
8 changes: 4 additions & 4 deletions pkg/controllers/provisioning/scheduling/volumetopology.go
@@ -39,23 +39,23 @@ type VolumeTopology struct {
kubeClient client.Client
}

- func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) (error, []v1.NodeSelectorRequirement) {
+ func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) ([]v1.NodeSelectorRequirement, error) {
var requirements []v1.NodeSelectorRequirement
for _, volume := range pod.Spec.Volumes {
req, err := v.getRequirements(ctx, pod, volume)
if err != nil {
- return err, nil
+ return nil, err
}
requirements = append(requirements, req...)
}
if len(requirements) == 0 {
- return nil, requirements
+ return requirements, nil
}

log.FromContext(ctx).
WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).
V(1).Info(fmt.Sprintf("found requirements from pod volumes, %s", requirements))
- return nil, requirements
+ return requirements, nil
}

func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) ([]v1.NodeSelectorRequirement, error) {
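For context on what GetVolumeRequirements returns: it aggregates node selector requirements derived from the pod's volumes. The sketch below is a rough illustration only, with simplified stand-in types; the real getRequirements resolves PVCs, bound PVs, and storage classes, all of which is elided here. It only shows the shape of the output: a volume pinned to a zone becomes an "In" requirement on that zone.

package main

import "fmt"

type nodeSelectorRequirement struct {
	Key      string
	Operator string
	Values   []string
}

// persistentVolume is a stand-in carrying the required node affinity terms
// recorded on a bound volume, e.g. the zone it was provisioned in.
type persistentVolume struct {
	nodeAffinity []nodeSelectorRequirement
}

func requirementsForVolume(pv *persistentVolume) []nodeSelectorRequirement {
	if pv == nil {
		return nil
	}
	return pv.nodeAffinity
}

func main() {
	pv := &persistentVolume{nodeAffinity: []nodeSelectorRequirement{{
		Key:      "topology.kubernetes.io/zone",
		Operator: "In",
		Values:   []string{"zone-a"},
	}}}
	var requirements []nodeSelectorRequirement
	requirements = append(requirements, requirementsForVolume(pv)...)
	fmt.Printf("found requirements from pod volumes, %v\n", requirements)
}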
