Skip to content

Commit

Permalink
Upgrade tests account for last event being interrupted (knative#7447) (
Browse files Browse the repository at this point in the history
…#458)

* Fix reporting index of the failed step

The warning had a wrong index:
{"level":"info","ts":"2023-11-13T14:00:03.816770975Z","caller":"sender/services.go:207","msg":"Sending
step event #16691 to
\"http://sut-kn-channel.eventing-e2e0.svc.cluster.local\""}
{"level":"warn","ts":"2023-11-13T14:00:03.821102525Z","caller":"sender/services.go:102","msg":"Could
not send step event 16690, retrying (1): Post
\"http://sut-kn-channel.eventing-e2e0.svc.cluster.local\": dial tcp
172.30.99.91:80: connect: connection refused"}

* Upgrade tests account for last event being interrupted

Co-authored-by: Martin Gencur <[email protected]>
  • Loading branch information
openshift-cherrypick-robot and mgencur authored Jan 2, 2024
1 parent 06c1c47 commit 33dfdce
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
6 changes: 5 additions & 1 deletion test/upgrade/prober/wathola/event/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ func (f *finishedStore) RegisterFinished(finished *Finished) {
log.Infof("waiting additional %v to be sure all events came", d)
time.Sleep(d)
receivedEvents := f.steps.Count()
if receivedEvents != finished.EventsSent {

if receivedEvents != finished.EventsSent &&
// If sending was interrupted, tolerate one more received
// event as there's no way to check if the last event is delivered or not.
!(finished.SendingInterrupted && receivedEvents == finished.EventsSent+1) {
f.errors.throwUnexpected("expecting to have %v unique events received, "+
"but received %v unique events", finished.EventsSent, receivedEvents)
f.reportViolations(finished)
Expand Down
1 change: 1 addition & 0 deletions test/upgrade/prober/wathola/event/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Finished struct {
EventsSent int
TotalRequests int
UnavailablePeriods []UnavailablePeriod
SendingInterrupted bool
}

// Type returns a type of a event
Expand Down
12 changes: 10 additions & 2 deletions test/upgrade/prober/wathola/sender/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type sender struct {
totalRequests int
// unavailablePeriods is an array for non-zero retries for each event
unavailablePeriods []event.UnavailablePeriod
// sendingInterrupted indicates whether sending last event was interrupted by shutdown
sendingInterrupted bool
}

func (s *sender) SendContinually() {
Expand Down Expand Up @@ -90,6 +92,7 @@ func (s *sender) SendContinually() {
Period: time.Since(start),
LastErr: err.Error(),
})
s.sendingInterrupted = true
}
return
default:
Expand All @@ -100,7 +103,7 @@ func (s *sender) SendContinually() {
start = time.Now()
}
log.Warnf("Could not send step event %v, retrying (%d): %v",
s.eventsSent, retry, err)
currentStep.Number, retry, err)
retry++
lastErr = err
} else {
Expand Down Expand Up @@ -219,7 +222,12 @@ func (s *sender) sendFinished() {
if s.eventsSent == 0 {
return
}
finished := event.Finished{EventsSent: s.eventsSent, TotalRequests: s.totalRequests, UnavailablePeriods: s.unavailablePeriods}
finished := event.Finished{
EventsSent: s.eventsSent,
TotalRequests: s.totalRequests,
UnavailablePeriods: s.unavailablePeriods,
SendingInterrupted: s.sendingInterrupted,
}
endpoint := senderConfig.Address
ce := NewCloudEvent(finished, event.FinishedType)
ctx, span := PopulateSpanWithEvent(context.Background(), ce, Name)
Expand Down

0 comments on commit 33dfdce

Please sign in to comment.