Skip to content

Feat/remove request queued hooks #223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,6 @@ func (c *Channels) Accept(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Accept)
}

func (c *Channels) TransferRequestQueued(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.TransferRequestQueued)
}

// Restart marks a data transfer as restarted
func (c *Channels) Restart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Restart)
Expand Down
7 changes: 0 additions & 7 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ var ChannelEvents = fsm.Events{
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.TransferRequestQueued).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.Restart).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
chst.AddLog("")
Expand Down
11 changes: 0 additions & 11 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,6 @@ func TestChannels(t *testing.T) {
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
})

t.Run("transfer queued", func(t *testing.T) {
state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
require.Equal(t, state.Status(), datatransfer.Ongoing)

err = channelList.TransferRequestQueued(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.TransferRequestQueued)
require.Equal(t, state.Status(), datatransfer.Ongoing)
})

t.Run("updating send/receive values", func(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
dir := os.TempDir()
Expand Down
4 changes: 0 additions & 4 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ const (
// ReceiveDataError indicates that the transport layer had an error
// receiving data from the remote peer
ReceiveDataError

// TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer
TransferRequestQueued
)

// Events are human readable names for data transfer events
Expand Down Expand Up @@ -137,7 +134,6 @@ var Events = map[EventCode]string{
RequestTimedOut: "RequestTimedOut",
SendDataError: "SendDataError",
ReceiveDataError: "ReceiveDataError",
TransferRequestQueued: "TransferRequestQueued",
}

// Event is a struct containing information about a data transfer event
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-graphsync v0.6.5-0.20210702215108-234bf651a8a7
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8=
github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg=
github.com/ipfs/go-graphsync v0.6.5-0.20210702215108-234bf651a8a7 h1:IIv1uDZYxWqQ1BIynIn+5MTYdxi2c2+O3GO+uq2t3Hw=
github.com/ipfs/go-graphsync v0.6.5-0.20210702215108-234bf651a8a7/go.mod h1:GdHT8JeuIZ0R4lSjFR16Oe4zPi5dXwKi9zR9ADVlcdk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down
4 changes: 0 additions & 4 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra
return nil, nil
}

func (m *manager) OnTransferQueued(chid datatransfer.ChannelID) {
m.channels.TransferRequestQueued(chid)
}

func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
if response.IsCancel() {
log.Infof("channel %s: received cancel response, cancelling channel", chid)
Expand Down
9 changes: 0 additions & 9 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ type FakeGraphSync struct {
OutgoingRequestHook graphsync.OnOutgoingRequestHook
IncomingBlockHook graphsync.OnIncomingBlockHook
OutgoingBlockHook graphsync.OnOutgoingBlockHook
IncomingRequestQueuedHook graphsync.OnIncomingRequestQueuedHook
IncomingRequestHook graphsync.OnIncomingRequestHook
CompletedResponseListener graphsync.OnResponseCompletedListener
RequestUpdatedHook graphsync.OnRequestUpdatedHook
Expand Down Expand Up @@ -287,14 +286,6 @@ func (fgs *FakeGraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingR
}
}

// RegisterIncomingRequestQueuedHook adds a hook that runs when an incoming GS request is queued.
func (fgs *FakeGraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
fgs.IncomingRequestQueuedHook = hook
return func() {
fgs.IncomingRequestQueuedHook = nil
}
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (fgs *FakeGraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
fgs.IncomingResponseHook = hook
Expand Down
3 changes: 0 additions & 3 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ type EventsHandler interface {
// OnDataSent is called when we send data for the given channel ID
OnDataSent(chid ChannelID, link ipld.Link, size uint64) error

// OnTransferQueued is called when a new data transfer request is queued in the transport layer.
OnTransferQueued(chid ChannelID)

// OnRequestReceived is called when we receive a new request to send data
// for the given channel ID
// return values are:
Expand Down
35 changes: 0 additions & 35 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
}
t.events = events

t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestQueuedHook(t.gsReqQueuedHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestHook(t.gsReqRecdHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterCompletedResponseListener(t.gsCompletedResponseListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingBlockHook(t.gsIncomingBlockHook))
Expand Down Expand Up @@ -449,40 +448,6 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData
}
}

// gsReqQueuedHook is called when graphsync enqueues an incoming request for data
func (t *Transport) gsReqQueuedHook(p peer.ID, request graphsync.RequestData) {
msg, err := extension.GetTransferData(request, t.supportedExtensions)
if err != nil {
log.Errorf("failed GetTransferData, req=%+v, err=%s", request, err)
}
// extension not found; probably not our request.
if msg == nil {
return
}

var chid datatransfer.ChannelID
if msg.IsRequest() {
// when a data transfer request comes in on graphsync, the remote peer
// initiated a pull
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID}
request := msg.(datatransfer.Request)
if request.IsNew() {
log.Infof("%s, pull request queued, req=%+v", chid, request)
t.events.OnTransferQueued(chid)
}
} else {
// when a data transfer response comes in on graphsync, this node
// initiated a push, and the remote peer responded with a request
// for data
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p}
response := msg.(datatransfer.Response)
if response.IsNew() {
log.Infof("%s, GS pull request queued in response to our push, req=%+v", chid, request)
t.events.OnTransferQueued(chid)
}
}
}

// gsReqRecdHook is called when graphsync receives an incoming request for data
func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
// if this is a push request the sender is us.
Expand Down
45 changes: 0 additions & 45 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ func TestManager(t *testing.T) {
require.NoError(t, gsData.outgoingBlockHookActions.TerminationError)
},
},

"incoming gs request with recognized dt response will record outgoing blocks": {
requestConfig: gsRequestConfig{
dtIsResponse: true,
Expand Down Expand Up @@ -486,7 +485,6 @@ func TestManager(t *testing.T) {
require.True(t, events.ChannelCompletedSuccess)
},
},

"recognized incoming request will record unsuccessful request completion": {
responseConfig: gsResponseConfig{
status: graphsync.RequestCompletedPartial,
Expand Down Expand Up @@ -588,35 +586,6 @@ func TestManager(t *testing.T) {
gsData.fgs.AssertNoPauseResponseReceived(t)
},
},

"incoming request can be queued": {
action: func(gsData *harness) {
gsData.incomingRequestQueuedHook()
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.TransferQueuedCalled)
require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other},
events.TransferQueuedChannelID)
},
},

"incoming request with dtResponse can be queued": {
requestConfig: gsRequestConfig{
dtIsResponse: true,
},
responseConfig: gsResponseConfig{
dtIsResponse: true,
},
action: func(gsData *harness) {
gsData.incomingRequestQueuedHook()
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.TransferQueuedCalled)
require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self},
events.TransferQueuedChannelID)
},
},

"recognized incoming request can be resumed": {
action: func(gsData *harness) {
gsData.incomingRequestHook()
Expand All @@ -631,7 +600,6 @@ func TestManager(t *testing.T) {
gsData.fgs.AssertResumeResponseReceived(gsData.ctx, t)
},
},

"unrecognized request cannot be resumed": {
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
err := gsData.transport.ResumeChannel(gsData.ctx,
Expand Down Expand Up @@ -1050,9 +1018,6 @@ type fakeEvents struct {
OnReceiveDataErrorCalled bool
OnReceiveDataErrorChannelID datatransfer.ChannelID

TransferQueuedCalled bool
TransferQueuedChannelID datatransfer.ChannelID

ChannelCompletedSuccess bool
RequestReceivedRequest datatransfer.Request
RequestReceivedResponse datatransfer.Response
Expand All @@ -1072,11 +1037,6 @@ func (fe *fakeEvents) OnRequestTimedOut(chid datatransfer.ChannelID, err error)
return nil
}

func (fe *fakeEvents) OnTransferQueued(chid datatransfer.ChannelID) {
fe.TransferQueuedCalled = true
fe.TransferQueuedChannelID = chid
}

func (fe *fakeEvents) OnRequestDisconnected(chid datatransfer.ChannelID, err error) error {
return nil
}
Expand Down Expand Up @@ -1169,11 +1129,6 @@ func (ha *harness) outgoingBlockHook() {
func (ha *harness) incomingRequestHook() {
ha.fgs.IncomingRequestHook(ha.other, ha.request, ha.incomingRequestHookActions)
}

func (ha *harness) incomingRequestQueuedHook() {
ha.fgs.IncomingRequestQueuedHook(ha.other, ha.request)
}

func (ha *harness) requestUpdatedHook() {
ha.fgs.RequestUpdatedHook(ha.other, ha.request, ha.updatedRequest, ha.requestUpdatedHookActions)
}
Expand Down