Skip to content

Commit 2cb04b8

Browse files
LukeAVanDrieBenjaminBraunDev
authored andcommitted
Fix(test): resolve data race in StreamedRequest (kubernetes-sigs#1727)
Replaces the boolean flag and timer goroutine with a channel and select statement to prevent a data race on the timeout in integration tests. This ensures safe concurrent access when handling test timeouts.
1 parent 82775c9 commit 2cb04b8

File tree

1 file changed

+32
-18
lines changed

1 file changed

+32
-18
lines changed

test/integration/util.go

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,35 +55,49 @@ func SendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient,
5555
return res, err
5656
}
5757

58-
func StreamedRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, requests []*extProcPb.ProcessingRequest, expectedResponses int) ([]*extProcPb.ProcessingResponse, error) {
58+
// StreamedRequest sends a series of requests and collects the specified number of responses.
59+
func StreamedRequest(
60+
t *testing.T,
61+
client extProcPb.ExternalProcessor_ProcessClient,
62+
requests []*extProcPb.ProcessingRequest,
63+
expectedResponses int,
64+
) ([]*extProcPb.ProcessingResponse, error) {
5965
for _, req := range requests {
6066
t.Logf("Sending request: %v", req)
6167
if err := client.Send(req); err != nil {
6268
t.Logf("Failed to send request %+v: %v", req, err)
6369
return nil, err
6470
}
6571
}
72+
6673
responses := []*extProcPb.ProcessingResponse{}
74+
for i := range expectedResponses {
75+
type recvResult struct {
76+
res *extProcPb.ProcessingResponse
77+
err error
78+
}
79+
recvChan := make(chan recvResult, 1)
6780

68-
// Make an incredible simple timeout func in the case where
69-
// there is less than the expected amount of responses; bail and fail.
70-
var simpleTimeout bool
71-
go func() {
72-
time.Sleep(10 * time.Second)
73-
simpleTimeout = true
74-
}()
81+
go func() {
82+
res, err := client.Recv()
83+
recvChan <- recvResult{res, err}
84+
}()
7585

76-
for range expectedResponses {
77-
if simpleTimeout {
78-
break
79-
}
80-
res, err := client.Recv()
81-
if err != nil && err != io.EOF {
82-
t.Logf("Failed to receive: %v", err)
83-
return nil, err
86+
select {
87+
case <-time.After(10 * time.Second):
88+
t.Logf("Timeout waiting for response %d of %d", i+1, expectedResponses)
89+
return responses, nil
90+
case result := <-recvChan:
91+
if result.err != nil {
92+
if result.err == io.EOF {
93+
return responses, nil
94+
}
95+
t.Logf("Failed to receive: %v", result.err)
96+
return nil, result.err
97+
}
98+
t.Logf("Received response %+v", result.res)
99+
responses = append(responses, result.res)
84100
}
85-
t.Logf("Received response %+v", res)
86-
responses = append(responses, res)
87101
}
88102
return responses, nil
89103
}

0 commit comments

Comments
 (0)