use informers for pod events instead of Listing #2177

Closed · wants to merge 1 commit
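The change replaces the end-of-test List of scheduler events (gatherScheduleTimes) with an event informer that runs for the whole measurement, so schedule timestamps are captured before the events' default 1h TTL can expire. Below is a minimal, self-contained sketch of that watch pattern; it uses plain client-go cache.NewInformer rather than clusterloader2's informer helper package, and the package name, watchSchedulerEvents, and the fmt handler are illustrative only, not part of the PR.

```go
// Package eventwatch is a hypothetical home for this sketch.
package eventwatch

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// watchSchedulerEvents streams scheduler-sourced Pod events as they happen,
// instead of Listing them once after the test has finished.
func watchSchedulerEvents(c kubernetes.Interface, namespace, schedulerName string, stopCh <-chan struct{}) {
	selector := fields.Set{
		"involvedObject.kind": "Pod",
		"source":              schedulerName,
	}.AsSelector().String()

	_, controller := cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				options.FieldSelector = selector
				return c.CoreV1().Events(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.FieldSelector = selector
				return c.CoreV1().Events(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Event{},
		0, // no periodic resync; add/update notifications are enough here
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				e := obj.(*corev1.Event)
				fmt.Printf("pod %s/%s scheduled at %v\n",
					e.InvolvedObject.Namespace, e.InvolvedObject.Name, e.FirstTimestamp)
			},
		},
	)
	controller.Run(stopCh)
}
```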
110 changes: 74 additions & 36 deletions clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go
@@ -61,6 +61,7 @@ func createPodStartupLatencyMeasurement() measurement.Measurement {
podStartupEntries: measurementutil.NewObjectTransitionTimes(podStartupLatencyMeasurementName),
podMetadata: measurementutil.NewPodsMetadata(podStartupLatencyMeasurementName),
eventQueue: workqueue.New(),
schedEventQueue: workqueue.New(),
}
}

@@ -70,12 +71,14 @@ type eventData struct {
}

type podStartupLatencyMeasurement struct {
selector *util.ObjectSelector
isRunning bool
stopCh chan struct{}
selector *util.ObjectSelector
isRunning bool
stopCh chan struct{}
stopSchedCh chan struct{}
// These queues can potentially grow indefinitely, because we put all changes here.
// Usually it's not a recommended pattern, but we need it for measuring PodStartupLatency.
eventQueue *workqueue.Type
schedEventQueue *workqueue.Type
podStartupEntries *measurementutil.ObjectTransitionTimes
podMetadata *measurementutil.PodsMetadata
threshold time.Duration
@@ -91,7 +94,7 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me
if err != nil {
return nil, err
}

schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
switch action {
case "start":
if err := p.selector.Parse(config.Params); err != nil {
@@ -101,9 +104,8 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me
if err != nil {
return nil, err
}
return nil, p.start(config.ClusterFramework.GetClientSets().GetClient())
return nil, p.start(config.ClusterFramework.GetClientSets().GetClient(), schedulerName)
case "gather":
schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
if err != nil {
return nil, err
}
@@ -124,7 +126,7 @@ func (p *podStartupLatencyMeasurement) String() string {
return podStartupLatencyMeasurementName + ": " + p.selector.String()
}

func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
func (p *podStartupLatencyMeasurement) start(c clientset.Interface, schedulerName string) error {
if p.isRunning {
klog.V(2).Infof("%s: pod startup latency measurement already running", p)
return nil
@@ -146,6 +148,29 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
p.addEvent,
)
go p.processEvents()

selector := fields.Set{
"involvedObject.kind": "Pod",
"source": schedulerName,
}.AsSelector().String()

p.stopSchedCh = make(chan struct{})

e := informer.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = selector
return c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = selector
return c.CoreV1().Events(p.selector.Namespace).Watch(context.TODO(), options)
},
},
p.addSchedEvent,
)
go p.processSchedEvents()
go e.Run(p.stopSchedCh)
return informer.StartAndSync(i, p.stopCh, informerSyncTimeout)
}
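For reference, the fields.Set construction above serializes to comma-joined equality terms that the API server applies server-side. A tiny sketch, assuming the measurement's defaultSchedulerName is "default-scheduler":

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	// Same construction as in start(); "default-scheduler" stands in for
	// the schedulerName parameter.
	selector := fields.Set{
		"involvedObject.kind": "Pod",
		"source":              "default-scheduler",
	}.AsSelector().String()

	// Prints the two equality terms joined by a comma, e.g.
	// "involvedObject.kind=Pod,source=default-scheduler"
	// (fields.Set does not guarantee term order).
	fmt.Println(selector)
}
```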

@@ -154,11 +179,36 @@ func (p *podStartupLatencyMeasurement) addEvent(_, obj interface{}) {
p.eventQueue.Add(event)
}

func (p *podStartupLatencyMeasurement) addSchedEvent(_, obj interface{}) {
event := &eventData{obj: obj, recvTime: time.Now()}
p.schedEventQueue.Add(event)
}
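Note that recvTime is stamped in the handler, at the moment the notification arrives, not when a worker later pops the item off the queue, so queueing delay does not skew the recorded receive times.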

func (p *podStartupLatencyMeasurement) processEvents() {
for p.processNextWorkItem() {
}
}

func (p *podStartupLatencyMeasurement) processSchedEvents() {
for p.processNextSchedWorkItem() {
}
}

func (p *podStartupLatencyMeasurement) processNextSchedWorkItem() bool {
item, quit := p.schedEventQueue.Get()
if quit {
return false
}
defer p.schedEventQueue.Done(item)
event, ok := item.(*eventData)
if !ok {
klog.Warningf("Couldn't convert work item to eventData: %v", item)
return true
}
p.processSchedEvent(event)
return true
}

func (p *podStartupLatencyMeasurement) processNextWorkItem() bool {
item, quit := p.eventQueue.Get()
if quit {
@@ -179,7 +229,9 @@ func (p *podStartupLatencyMeasurement) stop() {
if p.isRunning {
p.isRunning = false
close(p.stopCh)
close(p.stopSchedCh)
p.eventQueue.ShutDown()
p.schedEventQueue.ShutDown()
}
}
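Shutdown note: closing stopCh and stopSchedCh terminates both informers' Run loops, while ShutDown on each workqueue lets the processing goroutines drain anything still queued; their Get calls return quit=true only once the queue is shut down and empty, which ends the processEvents/processSchedEvents loops.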

@@ -230,10 +282,6 @@ func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier

p.stop()

if err := p.gatherScheduleTimes(c, schedulerName); err != nil {
return nil, err
}

checks := []podStartupLatencyCheck{
{
namePrefix: "",
@@ -270,32 +318,23 @@
return summaries, err
}

// TODO(#2006): gatherScheduleTimes is currently listing events at the end of the test.
// Given that events by default have 1h TTL, for measurements across longer periods
// it just returns incomplete results.
// Given that we don't need 100% accuracy, we should switch to a mechanism that is similar
// to the one that slo-monitor is using (added in #1477).
func (p *podStartupLatencyMeasurement) gatherScheduleTimes(c clientset.Interface, schedulerName string) error {
selector := fields.Set{
"involvedObject.kind": "Pod",
"source": schedulerName,
}.AsSelector().String()
options := metav1.ListOptions{FieldSelector: selector}
schedEvents, err := c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options)
if err != nil {
return err
func (p *podStartupLatencyMeasurement) processSchedEvent(event *eventData) {
obj := event.obj
if obj == nil {
return
}
for _, event := range schedEvents.Items {
key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name)
if _, exists := p.podStartupEntries.Get(key, createPhase); exists {
if !event.EventTime.IsZero() {
p.podStartupEntries.Set(key, schedulePhase, event.EventTime.Time)
} else {
p.podStartupEntries.Set(key, schedulePhase, event.FirstTimestamp.Time)
}
}
e, ok := obj.(*corev1.Event)
if !ok {
return
}
key := createMetaNamespaceKey(e.InvolvedObject.Namespace, e.InvolvedObject.Name)

if !e.EventTime.IsZero() {
p.podStartupEntries.Set(key, schedulePhase, e.EventTime.Time)
} else {
p.podStartupEntries.Set(key, schedulePhase, e.FirstTimestamp.Time)
}
return nil
}
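The EventTime/FirstTimestamp fallback above exists because newer API servers populate the microsecond-precision EventTime field, while events from older components only carry FirstTimestamp. A hypothetical helper expressing the same choice (eventTimestamp is not part of this PR):

```go
package eventwatch

import (
	"time"

	corev1 "k8s.io/api/core/v1"
)

// eventTimestamp prefers the microsecond-precision EventTime and falls
// back to the legacy FirstTimestamp, mirroring processSchedEvent.
func eventTimestamp(e *corev1.Event) time.Time {
	if !e.EventTime.IsZero() {
		return e.EventTime.Time
	}
	return e.FirstTimestamp.Time
}
```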

func (p *podStartupLatencyMeasurement) processEvent(event *eventData) {
@@ -310,7 +349,6 @@ func (p *podStartupLatencyMeasurement) processEvent(event *eventData) {

key := createMetaNamespaceKey(pod.Namespace, pod.Name)
p.podMetadata.SetStateless(key, isPodStateless(pod))

if pod.Status.Phase == corev1.PodRunning {
if _, found := p.podStartupEntries.Get(key, createPhase); !found {
p.podStartupEntries.Set(key, watchPhase, recvTime)