From 09f48bc9374c921058f5680605432b720703a0da Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 2 Jul 2021 14:55:09 -0700 Subject: [PATCH 1/2] fix(deps): update graphsync --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index cf61f158..74269fe5 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-ds-badger v0.2.3 - github.com/ipfs/go-graphsync v0.6.4 + github.com/ipfs/go-graphsync v0.6.5-0.20210702215108-234bf651a8a7 github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index d76c44e5..a7400a34 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8= -github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg= +github.com/ipfs/go-graphsync v0.6.5-0.20210702215108-234bf651a8a7 h1:IIv1uDZYxWqQ1BIynIn+5MTYdxi2c2+O3GO+uq2t3Hw= +github.com/ipfs/go-graphsync v0.6.5-0.20210702215108-234bf651a8a7/go.mod h1:GdHT8JeuIZ0R4lSjFR16Oe4zPi5dXwKi9zR9ADVlcdk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ= From 2b03c2747d908d0e48769dee88ee5e9d9ef33986 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 2 Jul 2021 14:55:57 -0700 Subject: [PATCH 2/2] Revert "Fire a transfer queued event when a transfer is queued in Graphsync (#221)" This reverts commit 7794046e5c17b37a7792a3cf643893faad5431ee. --- channels/channels.go | 4 --- channels/channels_fsm.go | 7 ----- channels/channels_test.go | 11 ------- events.go | 4 --- impl/events.go | 4 --- testutil/fakegraphsync.go | 9 ------ transport.go | 3 -- transport/graphsync/graphsync.go | 35 --------------------- transport/graphsync/graphsync_test.go | 45 --------------------------- 9 files changed, 122 deletions(-) diff --git a/channels/channels.go b/channels/channels.go index ab4dc866..1e80f3d2 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -215,10 +215,6 @@ func (c *Channels) Accept(chid datatransfer.ChannelID) error { return c.send(chid, datatransfer.Accept) } -func (c *Channels) TransferRequestQueued(chid datatransfer.ChannelID) error { - return c.send(chid, datatransfer.TransferRequestQueued) -} - // Restart marks a data transfer as restarted func (c *Channels) Restart(chid datatransfer.ChannelID) error { return c.send(chid, datatransfer.Restart) diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 02768eee..79711ebc 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -34,13 +34,6 @@ var ChannelEvents = fsm.Events{ chst.AddLog("") return nil }), - - fsm.Event(datatransfer.TransferRequestQueued).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { - chst.Message = "" - chst.AddLog("") - return nil - }), - fsm.Event(datatransfer.Restart).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.Message = "" chst.AddLog("") diff --git a/channels/channels_test.go b/channels/channels_test.go index 82c8bc6b..eaf9e7ab 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -127,17 +127,6 @@ func TestChannels(t *testing.T) { require.True(t, xerrors.As(err, new(*channels.ErrNotFound))) }) - t.Run("transfer queued", func(t *testing.T) { - state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}) - require.NoError(t, err) - require.Equal(t, state.Status(), datatransfer.Ongoing) - - err = channelList.TransferRequestQueued(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}) - require.NoError(t, err) - state = checkEvent(ctx, t, received, datatransfer.TransferRequestQueued) - require.Equal(t, state.Status(), datatransfer.Ongoing) - }) - t.Run("updating send/receive values", func(t *testing.T) { ds := dss.MutexWrap(datastore.NewMapDatastore()) dir := os.TempDir() diff --git a/events.go b/events.go index 12158565..5ec62fd4 100644 --- a/events.go +++ b/events.go @@ -102,9 +102,6 @@ const ( // ReceiveDataError indicates that the transport layer had an error // receiving data from the remote peer ReceiveDataError - - // TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer - TransferRequestQueued ) // Events are human readable names for data transfer events @@ -137,7 +134,6 @@ var Events = map[EventCode]string{ RequestTimedOut: "RequestTimedOut", SendDataError: "SendDataError", ReceiveDataError: "ReceiveDataError", - TransferRequestQueued: "TransferRequestQueued", } // Event is a struct containing information about a data transfer event diff --git a/impl/events.go b/impl/events.go index 13b972ae..377d6203 100644 --- a/impl/events.go +++ b/impl/events.go @@ -160,10 +160,6 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra return nil, nil } -func (m *manager) OnTransferQueued(chid datatransfer.ChannelID) { - m.channels.TransferRequestQueued(chid) -} - func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error { if response.IsCancel() { log.Infof("channel %s: received cancel response, cancelling channel", chid) diff --git a/testutil/fakegraphsync.go b/testutil/fakegraphsync.go index 0348b44e..32f4ba18 100644 --- a/testutil/fakegraphsync.go +++ b/testutil/fakegraphsync.go @@ -106,7 +106,6 @@ type FakeGraphSync struct { OutgoingRequestHook graphsync.OnOutgoingRequestHook IncomingBlockHook graphsync.OnIncomingBlockHook OutgoingBlockHook graphsync.OnOutgoingBlockHook - IncomingRequestQueuedHook graphsync.OnIncomingRequestQueuedHook IncomingRequestHook graphsync.OnIncomingRequestHook CompletedResponseListener graphsync.OnResponseCompletedListener RequestUpdatedHook graphsync.OnRequestUpdatedHook @@ -287,14 +286,6 @@ func (fgs *FakeGraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingR } } -// RegisterIncomingRequestQueuedHook adds a hook that runs when an incoming GS request is queued. -func (fgs *FakeGraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc { - fgs.IncomingRequestQueuedHook = hook - return func() { - fgs.IncomingRequestQueuedHook = nil - } -} - // RegisterIncomingResponseHook adds a hook that runs when a response is received func (fgs *FakeGraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc { fgs.IncomingResponseHook = hook diff --git a/transport.go b/transport.go index 7fbee093..275b7ab5 100644 --- a/transport.go +++ b/transport.go @@ -39,9 +39,6 @@ type EventsHandler interface { // OnDataSent is called when we send data for the given channel ID OnDataSent(chid ChannelID, link ipld.Link, size uint64) error - // OnTransferQueued is called when a new data transfer request is queued in the transport layer. - OnTransferQueued(chid ChannelID) - // OnRequestReceived is called when we receive a new request to send data // for the given channel ID // return values are: diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 34cd8d1f..ca65da50 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -285,7 +285,6 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error { } t.events = events - t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestQueuedHook(t.gsReqQueuedHook)) t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestHook(t.gsReqRecdHook)) t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterCompletedResponseListener(t.gsCompletedResponseListener)) t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingBlockHook(t.gsIncomingBlockHook)) @@ -449,40 +448,6 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData } } -// gsReqQueuedHook is called when graphsync enqueues an incoming request for data -func (t *Transport) gsReqQueuedHook(p peer.ID, request graphsync.RequestData) { - msg, err := extension.GetTransferData(request, t.supportedExtensions) - if err != nil { - log.Errorf("failed GetTransferData, req=%+v, err=%s", request, err) - } - // extension not found; probably not our request. - if msg == nil { - return - } - - var chid datatransfer.ChannelID - if msg.IsRequest() { - // when a data transfer request comes in on graphsync, the remote peer - // initiated a pull - chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID} - request := msg.(datatransfer.Request) - if request.IsNew() { - log.Infof("%s, pull request queued, req=%+v", chid, request) - t.events.OnTransferQueued(chid) - } - } else { - // when a data transfer response comes in on graphsync, this node - // initiated a push, and the remote peer responded with a request - // for data - chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p} - response := msg.(datatransfer.Response) - if response.IsNew() { - log.Infof("%s, GS pull request queued in response to our push, req=%+v", chid, request) - t.events.OnTransferQueued(chid) - } - } -} - // gsReqRecdHook is called when graphsync receives an incoming request for data func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { // if this is a push request the sender is us. diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 3916c1d7..c6a832ce 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -304,7 +304,6 @@ func TestManager(t *testing.T) { require.NoError(t, gsData.outgoingBlockHookActions.TerminationError) }, }, - "incoming gs request with recognized dt response will record outgoing blocks": { requestConfig: gsRequestConfig{ dtIsResponse: true, @@ -486,7 +485,6 @@ func TestManager(t *testing.T) { require.True(t, events.ChannelCompletedSuccess) }, }, - "recognized incoming request will record unsuccessful request completion": { responseConfig: gsResponseConfig{ status: graphsync.RequestCompletedPartial, @@ -588,35 +586,6 @@ func TestManager(t *testing.T) { gsData.fgs.AssertNoPauseResponseReceived(t) }, }, - - "incoming request can be queued": { - action: func(gsData *harness) { - gsData.incomingRequestQueuedHook() - }, - check: func(t *testing.T, events *fakeEvents, gsData *harness) { - require.True(t, events.TransferQueuedCalled) - require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other}, - events.TransferQueuedChannelID) - }, - }, - - "incoming request with dtResponse can be queued": { - requestConfig: gsRequestConfig{ - dtIsResponse: true, - }, - responseConfig: gsResponseConfig{ - dtIsResponse: true, - }, - action: func(gsData *harness) { - gsData.incomingRequestQueuedHook() - }, - check: func(t *testing.T, events *fakeEvents, gsData *harness) { - require.True(t, events.TransferQueuedCalled) - require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, - events.TransferQueuedChannelID) - }, - }, - "recognized incoming request can be resumed": { action: func(gsData *harness) { gsData.incomingRequestHook() @@ -631,7 +600,6 @@ func TestManager(t *testing.T) { gsData.fgs.AssertResumeResponseReceived(gsData.ctx, t) }, }, - "unrecognized request cannot be resumed": { check: func(t *testing.T, events *fakeEvents, gsData *harness) { err := gsData.transport.ResumeChannel(gsData.ctx, @@ -1050,9 +1018,6 @@ type fakeEvents struct { OnReceiveDataErrorCalled bool OnReceiveDataErrorChannelID datatransfer.ChannelID - TransferQueuedCalled bool - TransferQueuedChannelID datatransfer.ChannelID - ChannelCompletedSuccess bool RequestReceivedRequest datatransfer.Request RequestReceivedResponse datatransfer.Response @@ -1072,11 +1037,6 @@ func (fe *fakeEvents) OnRequestTimedOut(chid datatransfer.ChannelID, err error) return nil } -func (fe *fakeEvents) OnTransferQueued(chid datatransfer.ChannelID) { - fe.TransferQueuedCalled = true - fe.TransferQueuedChannelID = chid -} - func (fe *fakeEvents) OnRequestDisconnected(chid datatransfer.ChannelID, err error) error { return nil } @@ -1169,11 +1129,6 @@ func (ha *harness) outgoingBlockHook() { func (ha *harness) incomingRequestHook() { ha.fgs.IncomingRequestHook(ha.other, ha.request, ha.incomingRequestHookActions) } - -func (ha *harness) incomingRequestQueuedHook() { - ha.fgs.IncomingRequestQueuedHook(ha.other, ha.request) -} - func (ha *harness) requestUpdatedHook() { ha.fgs.RequestUpdatedHook(ha.other, ha.request, ha.updatedRequest, ha.requestUpdatedHookActions) }