From 3eb7a41a251809948ab2eb4f87932f0f07872dad Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 6 Mar 2023 09:18:03 -0800 Subject: [PATCH] test(race): fix races in tests --- impl/initiating_test.go | 8 ++++ impl/responding_test.go | 2 + itest/integration_test.go | 44 ++++++++++++++---- testutil/faketransport.go | 16 +++++++ testutil/testnet.go | 10 ++-- testutil/testutil.go | 7 ++- transport/graphsync/graphsync_test.go | 66 +++++++++++++++++++++------ 7 files changed, 126 insertions(+), 27 deletions(-) diff --git a/impl/initiating_test.go b/impl/initiating_test.go index cd608c0..ad7ca23 100644 --- a/impl/initiating_test.go +++ b/impl/initiating_test.go @@ -205,9 +205,13 @@ func TestDataTransferInitiating(t *testing.T) { require.Equal(t, h.transport.ClosedChannels[0], channelID) require.Eventually(t, func() bool { + h.network.SentMessagesLk.Lock() + defer h.network.SentMessagesLk.Unlock() return len(h.network.SentMessages) == 2 }, 5*time.Second, 200*time.Millisecond) + h.network.SentMessagesLk.Lock() cancelMessage := h.network.SentMessages[1].Message + h.network.SentMessagesLk.Unlock() require.False(t, cancelMessage.IsUpdate()) require.False(t, cancelMessage.IsPaused()) require.True(t, cancelMessage.IsRequest()) @@ -261,10 +265,14 @@ func TestDataTransferInitiating(t *testing.T) { require.Equal(t, h.transport.ClosedChannels[0], channelID) require.Eventually(t, func() bool { + h.network.SentMessagesLk.Lock() + defer h.network.SentMessagesLk.Unlock() return len(h.network.SentMessages) == 1 }, 5*time.Second, 200*time.Millisecond) + h.network.SentMessagesLk.Lock() cancelMessage := h.network.SentMessages[0].Message + h.network.SentMessagesLk.Unlock() require.False(t, cancelMessage.IsUpdate()) require.False(t, cancelMessage.IsPaused()) require.True(t, cancelMessage.IsRequest()) diff --git a/impl/responding_test.go b/impl/responding_test.go index 1ee5fd2..dc29c9f 100644 --- a/impl/responding_test.go +++ b/impl/responding_test.go @@ -352,6 +352,8 @@ func TestDataTransferResponding(t *testing.T) { h.network.Delegate.ReceiveRequest(h.ctx, h.peers[1], h.pushRequest) _, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.cancelUpdate) require.NoError(t, err) + h.transport.TransportLk.Lock() + defer h.transport.TransportLk.Unlock() require.Len(t, h.transport.CleanedUpChannels, 1) require.Equal(t, channelID(h.id, h.peers), h.transport.CleanedUpChannels[0]) }, diff --git a/itest/integration_test.go b/itest/integration_test.go index 99691c6..4e51bcf 100644 --- a/itest/integration_test.go +++ b/itest/integration_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "math/rand" + "sync" "testing" "time" @@ -1431,6 +1432,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) { require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) var chid datatransfer.ChannelID + var chidLk sync.Mutex errChan := make(chan struct{}, 2) clientPausePoint := 0 clientFinished := make(chan struct{}, 1) @@ -1471,7 +1473,9 @@ func TestSimulatedRetrievalFlow(t *testing.T) { timer := time.NewTimer(config.unpauseResponderDelay) go func() { <-timer.C + chidLk.Lock() _ = dt1.ResumeDataTransferChannel(ctx, chid) + chidLk.Unlock() }() } if event.Code == datatransfer.NewVoucher && channelState.Queued() > 0 { @@ -1496,7 +1500,9 @@ func TestSimulatedRetrievalFlow(t *testing.T) { require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) + chidLk.Lock() chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) + chidLk.Unlock() require.NoError(t, err) for providerFinished != nil || clientFinished != nil { @@ -2129,6 +2135,7 @@ func TestMultipleMessagesInExtension(t *testing.T) { testutil.StartAndWaitForReady(ctx, t, dt2) var chid datatransfer.ChannelID + var chidLk sync.Mutex errChan := make(chan struct{}, 2) clientPausePoint := 0 @@ -2155,13 +2162,15 @@ func TestMultipleMessagesInExtension(t *testing.T) { finalVoucherResult := testutil.NewTestTypedVoucher() dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { + chidLk.Lock() + currentChid := chid + chidLk.Unlock() if event.Code == datatransfer.Error { errChan <- struct{}{} } // Here we verify reception of voucherResults by the client if event.Code == datatransfer.NewVoucherResult { voucherResult := channelState.LastVoucherResult() - require.NoError(t, err) // If this voucher result is the response voucher no action is needed // we just know that the provider has accepted the transfer and is sending blocks @@ -2174,7 +2183,7 @@ func TestMultipleMessagesInExtension(t *testing.T) { // to revalidate and unpause the transfer if clientPausePoint < 5 { if voucherResult.Equals(voucherResults[clientPausePoint]) { - _ = dt2.SendVoucher(ctx, chid, testutil.NewTestTypedVoucher()) + _ = dt2.SendVoucher(ctx, currentChid, testutil.NewTestTypedVoucher()) clientPausePoint++ } } @@ -2182,7 +2191,7 @@ func TestMultipleMessagesInExtension(t *testing.T) { // If this voucher result is the final voucher result we need // to send a new voucher to unpause the provider and complete the transfer if voucherResult.Equals(finalVoucherResult) { - _ = dt2.SendVoucher(ctx, chid, testutil.NewTestTypedVoucher()) + _ = dt2.SendVoucher(ctx, currentChid, testutil.NewTestTypedVoucher()) } } @@ -2200,6 +2209,9 @@ func TestMultipleMessagesInExtension(t *testing.T) { initialVoucherResult: &respVoucher, } dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { + chidLk.Lock() + currentChid := chid + chidLk.Unlock() if event.Code == datatransfer.Error { errChan <- struct{}{} } @@ -2208,23 +2220,25 @@ func TestMultipleMessagesInExtension(t *testing.T) { } if event.Code == datatransfer.NewVoucher && channelState.Queued() > 0 { vs := sv.nextStatus() - dt1.UpdateValidationStatus(ctx, chid, vs) + dt1.UpdateValidationStatus(ctx, currentChid, vs) } if event.Code == datatransfer.DataLimitExceeded { if nextVoucherResult < len(pausePoints) { - dt1.SendVoucherResult(ctx, chid, voucherResults[nextVoucherResult]) + dt1.SendVoucherResult(ctx, currentChid, voucherResults[nextVoucherResult]) nextVoucherResult++ } } if event.Code == datatransfer.BeginFinalizing { sv.requiresFinalization = false - dt1.SendVoucherResult(ctx, chid, finalVoucherResult) + dt1.SendVoucherResult(ctx, currentChid, finalVoucherResult) } }) require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) voucher := testutil.NewTestTypedVoucherWith("applesauce") + chidLk.Lock() chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) + chidLk.Unlock() require.NoError(t, err) // Expect the client to receive a response voucher, the provider to complete the transfer and @@ -2299,10 +2313,14 @@ func TestMultipleParallelTransfers(t *testing.T) { clientFinished := make(chan struct{}, 1) var chid datatransfer.ChannelID + var chidLk sync.Mutex chidReceived := make(chan struct{}) dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { <-chidReceived - if chid != channelState.ChannelID() { + chidLk.Lock() + currentChid := chid + chidLk.Unlock() + if currentChid != channelState.ChannelID() { return } if event.Code == datatransfer.Error { @@ -2324,7 +2342,7 @@ func TestMultipleParallelTransfers(t *testing.T) { // If this voucher result is the final voucher result we need // to send a new voucher to unpause the provider and complete the transfer if voucherResult.Equals(finalVoucherResult) { - _ = dt2.SendVoucher(ctx, chid, testutil.NewTestTypedVoucher()) + _ = dt2.SendVoucher(ctx, currentChid, testutil.NewTestTypedVoucher()) } } @@ -2336,7 +2354,10 @@ func TestMultipleParallelTransfers(t *testing.T) { providerFinished := make(chan struct{}, 1) dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { <-chidReceived - if chid != channelState.ChannelID() { + chidLk.Lock() + currentChid := chid + chidLk.Unlock() + if currentChid != channelState.ChannelID() { return } if event.Code == datatransfer.Error { @@ -2346,7 +2367,7 @@ func TestMultipleParallelTransfers(t *testing.T) { providerFinished <- struct{}{} } if event.Code == datatransfer.BeginFinalizing { - dt1.SendVoucherResult(ctx, chid, finalVoucherResult) + dt1.SendVoucherResult(ctx, currentChid, finalVoucherResult) } }) @@ -2354,7 +2375,10 @@ func TestMultipleParallelTransfers(t *testing.T) { rootCid := root.(cidlink.Link).Cid voucher := testutil.NewTestTypedVoucher() + chidLk.Lock() + var err error chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) + chidLk.Unlock() require.NoError(t, err) close(chidReceived) // Expect the client to receive a response voucher, the provider to complete the transfer and diff --git a/testutil/faketransport.go b/testutil/faketransport.go index 08ce64b..90a17a3 100644 --- a/testutil/faketransport.go +++ b/testutil/faketransport.go @@ -3,6 +3,7 @@ package testutil import ( "context" "errors" + "sync" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" @@ -29,6 +30,7 @@ type ResumedChannel struct { // FakeTransport is a fake transport with mocked results type FakeTransport struct { + TransportLk sync.Mutex OpenedChannels []OpenedChannel OpenChannelErr error ClosedChannels []datatransfer.ChannelID @@ -54,18 +56,24 @@ func NewFakeTransport() *FakeTransport { // request is push or pull -- OpenChannel is called by the party that is // intending to receive data func (ft *FakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root ipld.Link, stor datamodel.Node, channel datatransfer.ChannelState, msg datatransfer.Message) error { + ft.TransportLk.Lock() + defer ft.TransportLk.Unlock() ft.OpenedChannels = append(ft.OpenedChannels, OpenedChannel{dataSender, channelID, root, stor, channel, msg}) return ft.OpenChannelErr } // CloseChannel closes the given channel func (ft *FakeTransport) CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error { + ft.TransportLk.Lock() + defer ft.TransportLk.Unlock() ft.ClosedChannels = append(ft.ClosedChannels, chid) return ft.CloseChannelErr } // SetEventHandler sets the handler for events on channels func (ft *FakeTransport) SetEventHandler(events datatransfer.EventsHandler) error { + ft.TransportLk.Lock() + defer ft.TransportLk.Unlock() ft.EventHandler = events return ft.SetEventHandlerErr } @@ -76,18 +84,24 @@ func (ft *FakeTransport) Shutdown(ctx context.Context) error { // PauseChannel paused the given channel ID func (ft *FakeTransport) PauseChannel(ctx context.Context, chid datatransfer.ChannelID) error { + ft.TransportLk.Lock() + defer ft.TransportLk.Unlock() ft.PausedChannels = append(ft.PausedChannels, chid) return ft.PauseChannelErr } // ResumeChannel resumes the given channel func (ft *FakeTransport) ResumeChannel(ctx context.Context, msg datatransfer.Message, chid datatransfer.ChannelID) error { + ft.TransportLk.Lock() + defer ft.TransportLk.Unlock() ft.ResumedChannels = append(ft.ResumedChannels, ResumedChannel{chid, msg}) return ft.ResumeChannelErr } // CleanupChannel cleans up the given channel func (ft *FakeTransport) CleanupChannel(chid datatransfer.ChannelID) { + ft.TransportLk.Lock() + defer ft.TransportLk.Unlock() ft.CleanedUpChannels = append(ft.CleanedUpChannels, chid) } @@ -97,6 +111,8 @@ func RecordCustomizedTransfer() datatransfer.TransportOption { if !ok { return errors.New("incorrect transport") } + ft.TransportLk.Lock() + defer ft.TransportLk.Unlock() ft.CustomizedTransfers = append(ft.CustomizedTransfers, chid) return nil } diff --git a/testutil/testnet.go b/testutil/testnet.go index 400616e..a7e7ccd 100644 --- a/testutil/testnet.go +++ b/testutil/testnet.go @@ -2,6 +2,7 @@ package testutil import ( "context" + "sync" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -19,9 +20,10 @@ type FakeSentMessage struct { // FakeNetwork is a network that satisfies the DataTransferNetwork interface but // does not actually do anything type FakeNetwork struct { - PeerID peer.ID - SentMessages []FakeSentMessage - Delegate network.Receiver + PeerID peer.ID + SentMessagesLk sync.Mutex + SentMessages []FakeSentMessage + Delegate network.Receiver } // NewFakeNetwork returns a new fake data transfer network instance @@ -33,6 +35,8 @@ var _ network.DataTransferNetwork = (*FakeNetwork)(nil) // SendMessage sends a GraphSync message to a peer. func (fn *FakeNetwork) SendMessage(ctx context.Context, p peer.ID, m datatransfer.Message) error { + fn.SentMessagesLk.Lock() + defer fn.SentMessagesLk.Unlock() fn.SentMessages = append(fn.SentMessages, FakeSentMessage{p, m}) return nil } diff --git a/testutil/testutil.go b/testutil/testutil.go index 510be37..7c4879c 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sync" "testing" blocks "github.com/ipfs/go-block-format" @@ -24,12 +25,16 @@ var blockGenerator = blocksutil.NewBlockGenerator() // var prioritySeq int var seedSeq int64 +var seedLock sync.Mutex // RandomBytes returns a byte array of the given size with random values. func RandomBytes(n int64) []byte { data := new(bytes.Buffer) - random.WritePseudoRandomBytes(n, data, seedSeq) // nolint: gosec,errcheck + seedLock.Lock() + currentSeedSeq := seedSeq seedSeq++ + seedLock.Unlock() + random.WritePseudoRandomBytes(n, data, currentSeedSeq) // nolint: gosec,errcheck return data.Bytes() } diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 2c0cb8b..4bb881b 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -5,6 +5,7 @@ import ( "errors" "io" "math/rand" + "sync" "testing" "time" @@ -31,7 +32,7 @@ func TestManager(t *testing.T) { requestConfig gsRequestConfig responseConfig gsResponseConfig updatedConfig gsRequestConfig - events fakeEvents + events *fakeEvents action func(gsData *harness) check func(t *testing.T, events *fakeEvents, gsData *harness) protocol protocol.ID @@ -76,7 +77,7 @@ func TestManager(t *testing.T) { }, }, "gs request unrecognized opened channel will not record incoming blocks": { - events: fakeEvents{ + events: &fakeEvents{ OnChannelOpenedError: errors.New("Not recognized"), }, action: func(gsData *harness) { @@ -90,7 +91,7 @@ func TestManager(t *testing.T) { }, }, "gs incoming block with data receive error will halt request": { - events: fakeEvents{ + events: &fakeEvents{ OnDataReceivedError: errors.New("something went wrong"), }, action: func(gsData *harness) { @@ -206,7 +207,7 @@ func TestManager(t *testing.T) { }, }, "outgoing gs request with recognized dt response can send message on update": { - events: fakeEvents{ + events: &fakeEvents{ RequestReceivedResponse: testutil.NewDTResponse(t, datatransfer.TransferID(rand.Uint32())), }, requestConfig: gsRequestConfig{ @@ -229,7 +230,7 @@ func TestManager(t *testing.T) { requestConfig: gsRequestConfig{ dtIsResponse: true, }, - events: fakeEvents{ + events: &fakeEvents{ OnRequestReceivedErrors: []error{errors.New("something went wrong")}, }, action: func(gsData *harness) { @@ -246,7 +247,7 @@ func TestManager(t *testing.T) { action: func(gsData *harness) { gsData.incomingRequestHook() }, - events: fakeEvents{ + events: &fakeEvents{ RequestReceivedResponse: testutil.NewDTResponse(t, datatransfer.TransferID(rand.Uint32())), }, check: func(t *testing.T, events *fakeEvents, gsData *harness) { @@ -301,7 +302,7 @@ func TestManager(t *testing.T) { }, }, "unrecognized incoming dt request will terminate but send response": { - events: fakeEvents{ + events: &fakeEvents{ RequestReceivedResponse: testutil.NewDTResponse(t, datatransfer.TransferID(rand.Uint32())), OnRequestReceivedErrors: []error{errors.New("something went wrong")}, }, @@ -359,7 +360,7 @@ func TestManager(t *testing.T) { }, }, "outgoing data queued error will terminate request": { - events: fakeEvents{ + events: &fakeEvents{ OnDataQueuedError: errors.New("something went wrong"), }, action: func(gsData *harness) { @@ -373,7 +374,7 @@ func TestManager(t *testing.T) { }, }, "outgoing data queued error == pause will pause request": { - events: fakeEvents{ + events: &fakeEvents{ OnDataQueuedError: datatransfer.ErrPause, }, action: func(gsData *harness) { @@ -392,7 +393,7 @@ func TestManager(t *testing.T) { gsData.incomingRequestHook() gsData.outgoingBlockHook() }, - events: fakeEvents{ + events: &fakeEvents{ OnDataQueuedMessage: testutil.NewDTResponse(t, datatransfer.TransferID(rand.Uint32())), }, check: func(t *testing.T, events *fakeEvents, gsData *harness) { @@ -484,7 +485,7 @@ func TestManager(t *testing.T) { }, }, "incoming gs request with recognized dt request can send message on update": { - events: fakeEvents{ + events: &fakeEvents{ RequestReceivedResponse: testutil.NewDTResponse(t, datatransfer.TransferID(rand.Uint32())), }, action: func(gsData *harness) { @@ -830,8 +831,12 @@ func TestManager(t *testing.T) { close(requestReceived.ResponseErrChan) require.Eventually(t, func() bool { + events.DataLk.Lock() + defer events.DataLk.Unlock() return events.OnChannelCompletedCalled == true }, 2*time.Second, 100*time.Millisecond) + events.DataLk.Lock() + defer events.DataLk.Unlock() require.True(t, events.ChannelCompletedSuccess) }, }, @@ -857,8 +862,12 @@ func TestManager(t *testing.T) { close(requestReceived.ResponseErrChan) require.Eventually(t, func() bool { + events.DataLk.Lock() + defer events.DataLk.Unlock() return events.OnChannelCompletedCalled == true }, 2*time.Second, 100*time.Millisecond) + events.DataLk.Lock() + defer events.DataLk.Unlock() require.False(t, events.ChannelCompletedSuccess) }, }, @@ -913,8 +922,12 @@ func TestManager(t *testing.T) { close(requestReceived.ResponseErrChan) require.Eventually(t, func() bool { + events.DataLk.Lock() + defer events.DataLk.Unlock() return events.OnRequestCancelledCalled == true }, 2*time.Second, 100*time.Millisecond) + events.DataLk.Lock() + defer events.DataLk.Unlock() require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, events.OnRequestCancelledChannelId) }, }, @@ -1040,6 +1053,10 @@ func TestManager(t *testing.T) { t.Run(testCase, func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + events := data.events + if events == nil { + events = &fakeEvents{} + } peers := testutil.GeneratePeers(2) transferID := datatransfer.TransferID(rand.Uint32()) requestID := graphsync.NewRequestID() @@ -1073,16 +1090,17 @@ func TestManager(t *testing.T) { requestUpdatedHookActions: &testharness.FakeRequestUpdatedActions{}, incomingResponseHookActions: &testharness.FakeIncomingResponseHookActions{}, } - require.NoError(t, transport.SetEventHandler(&data.events)) + require.NoError(t, transport.SetEventHandler(events)) if data.action != nil { data.action(gsData) } - data.check(t, &data.events, gsData) + data.check(t, events, gsData) }) } } type fakeEvents struct { + DataLk sync.Mutex ChannelOpenedChannelID datatransfer.ChannelID RequestReceivedChannelID datatransfer.ChannelID ResponseReceivedChannelID datatransfer.ChannelID @@ -1117,12 +1135,16 @@ type fakeEvents struct { } func (fe *fakeEvents) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, size uint64, index int64, unique bool) (datatransfer.Message, error) { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnDataQueuedCalled = true return fe.OnDataQueuedMessage, fe.OnDataQueuedError } func (fe *fakeEvents) OnRequestCancelled(chid datatransfer.ChannelID, err error) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnRequestCancelledCalled = true fe.OnRequestCancelledChannelId = chid @@ -1130,6 +1152,8 @@ func (fe *fakeEvents) OnRequestCancelled(chid datatransfer.ChannelID, err error) } func (fe *fakeEvents) OnTransferInitiated(chid datatransfer.ChannelID) { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.TransferInitiatedCalled = true fe.TransferInitiatedChannelID = chid } @@ -1139,33 +1163,45 @@ func (fe *fakeEvents) OnRequestDisconnected(chid datatransfer.ChannelID, err err } func (fe *fakeEvents) OnSendDataError(chid datatransfer.ChannelID, err error) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnSendDataErrorCalled = true fe.OnSendDataErrorChannelID = chid return nil } func (fe *fakeEvents) OnReceiveDataError(chid datatransfer.ChannelID, err error) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnReceiveDataErrorCalled = true fe.OnReceiveDataErrorChannelID = chid return nil } func (fe *fakeEvents) OnChannelOpened(chid datatransfer.ChannelID) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.ChannelOpenedChannelID = chid return fe.OnChannelOpenedError } func (fe *fakeEvents) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, size uint64, index int64, unique bool) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnDataReceivedCalled = true return fe.OnDataReceivedError } func (fe *fakeEvents) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64, index int64, unique bool) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnDataSentCalled = true return nil } func (fe *fakeEvents) OnRequestReceived(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnRequestReceivedCallCount++ fe.RequestReceivedChannelID = chid fe.RequestReceivedRequest = request @@ -1177,6 +1213,8 @@ func (fe *fakeEvents) OnRequestReceived(chid datatransfer.ChannelID, request dat } func (fe *fakeEvents) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnResponseReceivedCallCount++ fe.ResponseReceivedResponse = response fe.ResponseReceivedChannelID = chid @@ -1188,6 +1226,8 @@ func (fe *fakeEvents) OnResponseReceived(chid datatransfer.ChannelID, response d } func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, completeErr error) error { + fe.DataLk.Lock() + defer fe.DataLk.Unlock() fe.OnChannelCompletedCalled = true fe.ChannelCompletedSuccess = completeErr == nil return fe.OnChannelCompletedErr