Skip to content

Commit

Permalink
Update deps again
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Jan 8, 2025
1 parent b1bdeaf commit ca3965f
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 107 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
k8s.io/apiserver v0.29.2
k8s.io/client-go v0.29.2
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
knative.dev/eventing v0.42.4-0.20241030064549-19e4c4bddaca
knative.dev/eventing v0.42.6-0.20241219120210-e387363f837c
knative.dev/hack v0.0.0-20240704013904-b9799599afcf
knative.dev/pkg v0.0.0-20240716082220-4355f0c73608
knative.dev/reconciler-test v0.0.0-20240716134925-00d94f40c470
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1213,8 +1213,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.42.4-0.20241030064549-19e4c4bddaca h1:i5PgRTbxNX1u7CnrWnQFFV0eGky09pU14NHzeacWwPY=
knative.dev/eventing v0.42.4-0.20241030064549-19e4c4bddaca/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k=
knative.dev/eventing v0.42.6-0.20241219120210-e387363f837c h1:KJkmRLCa5oGAvwCvVxEX3o2s2Vy1cmxsOUMrCFmQoSw=
knative.dev/eventing v0.42.6-0.20241219120210-e387363f837c/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k=
knative.dev/hack v0.0.0-20240704013904-b9799599afcf h1:n92FmZRywgtHso7pFAku7CW0qvRAs1hXtMQqO0R6eiE=
knative.dev/hack v0.0.0-20240704013904-b9799599afcf/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20240716082220-4355f0c73608 h1:BOiRzcnRS9Z5ruxlCiS/K1/Hb5bUN0X4W3xCegdcYQE=
Expand Down
2 changes: 1 addition & 1 deletion vendor/knative.dev/eventing/hack/e2e-debug.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ wait_until_pods_running knative-eventing || fail_test "Pods in knative-eventing

header "Running tests"

go test -tags=e2e -v -timeout=30m -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed"
go test -tags=e2e -v -timeout=30m -parallel=12 -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed"
6 changes: 3 additions & 3 deletions vendor/knative.dev/eventing/pkg/auth/token_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"time"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"

"github.com/coreos/go-oidc/v3/oidc"
"go.uber.org/zap"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/apis/feature"
listerseventingv1alpha1 "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
)
Expand All @@ -57,11 +57,11 @@ type IDToken struct {
AccessTokenHash string
}

func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier {
func NewOIDCTokenVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister) *OIDCTokenVerifier {
tokenHandler := &OIDCTokenVerifier{
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventpolicyinformer.Get(ctx).Lister(),
eventPolicyLister: eventPolicyLister,
}

if err := tokenHandler.initOIDCProvider(ctx); err != nil {
Expand Down

This file was deleted.

2 changes: 2 additions & 0 deletions vendor/knative.dev/eventing/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type VPodLister func() ([]VPod, error)
// Evictor allows for vreplicas to be evicted.
// For instance, the evictor is used by the statefulset scheduler to
// move vreplicas to pod with a lower ordinal.
//
// pod might be `nil`.
type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error

// Scheduler is responsible for placing VPods into real Kubernetes pods
Expand Down
56 changes: 23 additions & 33 deletions vendor/knative.dev/eventing/pkg/scheduler/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ type StateAccessor interface {
// It is used by for the scheduler and the autoscaler
type State struct {
// free tracks the free capacity of each pod.
//
// Including pods that might not exist anymore, it reflects the free capacity determined by
// placements in the vpod status.
FreeCap []int32

// schedulable pods tracks the pods that aren't being evicted.
SchedulablePods []int32

// LastOrdinal is the ordinal index corresponding to the last statefulset replica
// with placed vpods.
LastOrdinal int32

// Pod capacity.
Capacity int32

Expand Down Expand Up @@ -143,14 +142,10 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
return nil, err
}

free := make([]int32, 0)
freeCap := make([]int32, 0)
pending := make(map[types.NamespacedName]int32, 4)
expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
schedulablePods := sets.NewInt32()
last := int32(-1)

// keep track of (vpod key, podname) pairs with existing placements
withPlacement := make(map[types.NamespacedName]map[string]bool)

podSpread := make(map[types.NamespacedName]map[string]int32)

Expand All @@ -172,7 +167,7 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
freeCap = s.updateFreeCapacity(logger, freeCap, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
Expand All @@ -182,16 +177,13 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
pending[vpod.GetKey()] = pendingFromVPod(vpod)
expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()

withPlacement[vpod.GetKey()] = make(map[string]bool)
podSpread[vpod.GetKey()] = make(map[string]int32)

for i := 0; i < len(ps); i++ {
podName := ps[i].PodName
vreplicas := ps[i].VReplicas

free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)

withPlacement[vpod.GetKey()][podName] = true
freeCap = s.updateFreeCapacity(logger, freeCap, podName, vreplicas)

pod, err := s.podLister.Get(podName)
if err != nil {
Expand All @@ -204,8 +196,17 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
}
}

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}
state := &State{
FreeCap: freeCap,
SchedulablePods: schedulablePods.List(),
Capacity: s.capacity,
Replicas: scale.Spec.Replicas,
StatefulSetName: s.statefulSetName,
PodLister: s.podLister,
PodSpread: podSpread,
Pending: pending,
ExpectedVReplicaByVPod: expectedVReplicasByVPod,
}

logger.Infow("cluster state info", zap.Any("state", state))

Expand All @@ -219,23 +220,19 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, freeCap []int32, podName string, vreplicas int32) []int32 {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)
freeCap = grow(freeCap, ordinal, s.capacity)

free[ordinal] -= vreplicas
freeCap[ordinal] -= vreplicas

// Assert the pod is not overcommitted
if free[ordinal] < 0 {
if overcommit := freeCap[ordinal]; overcommit < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
last = ordinal
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("overcommit", overcommit))
}

return free, last
return freeCap
}

func (s *State) TotalPending() int32 {
Expand Down Expand Up @@ -283,23 +280,16 @@ func (s *State) MarshalJSON() ([]byte, error) {
type S struct {
FreeCap []int32 `json:"freeCap"`
SchedulablePods []int32 `json:"schedulablePods"`
LastOrdinal int32 `json:"lastOrdinal"`
Capacity int32 `json:"capacity"`
Replicas int32 `json:"replicas"`
NumZones int32 `json:"numZones"`
NumNodes int32 `json:"numNodes"`
NodeToZoneMap map[string]string `json:"nodeToZoneMap"`
StatefulSetName string `json:"statefulSetName"`
PodSpread map[string]map[string]int32 `json:"podSpread"`
NodeSpread map[string]map[string]int32 `json:"nodeSpread"`
ZoneSpread map[string]map[string]int32 `json:"zoneSpread"`
Pending map[string]int32 `json:"pending"`
}

sj := S{
FreeCap: s.FreeCap,
SchedulablePods: s.SchedulablePods,
LastOrdinal: s.LastOrdinal,
Capacity: s.Capacity,
Replicas: s.Replicas,
StatefulSetName: s.StatefulSetName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/integer"
Expand Down Expand Up @@ -250,17 +251,8 @@ func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State) error {
zap.Any("state", s),
)

// when there is only one pod there is nothing to move or number of pods is just enough!
if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 {
return nil
}

// Determine if there is enough free capacity to
// move all vreplicas placed in the last pod to pods with a lower ordinal
freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal)
usedInLastPod := s.Capacity - s.Free(s.LastOrdinal)

if freeCapacity >= usedInLastPod {
// Determine if there are vpods that need compaction
if s.Replicas != int32(len(s.FreeCap)) {
a.lastCompactAttempt = time.Now()
err := a.compact(s)
if err != nil {
Expand All @@ -285,9 +277,9 @@ func (a *autoscaler) compact(s *st.State) error {
for i := len(placements) - 1; i >= 0; i-- { //start from the last placement
ordinal := st.OrdinalFromPodName(placements[i].PodName)

if ordinal == s.LastOrdinal {
if ordinal >= s.Replicas {
pod, err = s.PodLister.Get(placements[i].PodName)
if err != nil {
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err)
}

Expand Down
3 changes: 1 addition & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ k8s.io/utils/pointer
k8s.io/utils/ptr
k8s.io/utils/strings/slices
k8s.io/utils/trace
# knative.dev/eventing v0.42.4-0.20241030064549-19e4c4bddaca
# knative.dev/eventing v0.42.6-0.20241219120210-e387363f837c
## explicit; go 1.22
knative.dev/eventing/cmd/event_display
knative.dev/eventing/cmd/heartbeats
Expand Down Expand Up @@ -1219,7 +1219,6 @@ knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker
knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake
knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger
knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake
knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy
knative.dev/eventing/pkg/client/injection/informers/factory
knative.dev/eventing/pkg/client/injection/informers/factory/fake
knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription
Expand Down

0 comments on commit ca3965f

Please sign in to comment.