diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index 967a55da..c70994c2 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -24,9 +24,7 @@ import ( "github.com/ipfs/go-merkledag" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" @@ -77,10 +75,6 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, thisCids := df(ctx, b, instances[:1]) allCids = append(allCids, thisCids...) } - ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) - - allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(), - ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() runtime.GC() b.ResetTimer() @@ -105,7 +99,7 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, timer := time.NewTimer(30 * time.Second) start := time.Now() for j := 0; j < numfiles; j++ { - _, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewTestTypedVoucher(), allCids[j], allSelector) + _, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewTestTypedVoucher(), allCids[j], selectorparse.CommonSelector_ExploreAllRecursively) if err != nil { b.Fatalf("received error on request: %s", err.Error()) } diff --git a/benchmarks/testinstance/testinstance.go b/benchmarks/testinstance/testinstance.go index 24ce4691..bfc72747 100644 --- a/benchmarks/testinstance/testinstance.go +++ b/benchmarks/testinstance/testinstance.go @@ -21,9 +21,9 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer/v2" tn "github.com/filecoin-project/go-data-transfer/v2/benchmarks/testnet" dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl" - dtnet "github.com/filecoin-project/go-data-transfer/v2/network" "github.com/filecoin-project/go-data-transfer/v2/testutil" gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync" + dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" ) // TempDirGenerator is any interface that can generate temporary directories diff --git a/benchmarks/testnet/interface.go b/benchmarks/testnet/interface.go index dc5cf55c..72858184 100644 --- a/benchmarks/testnet/interface.go +++ b/benchmarks/testnet/interface.go @@ -4,7 +4,7 @@ import ( gsnet "github.com/ipfs/go-graphsync/network" "github.com/libp2p/go-libp2p-core/peer" - dtnet "github.com/filecoin-project/go-data-transfer/v2/network" + dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" ) // Network is an interface for generating graphsync network interfaces diff --git a/benchmarks/testnet/peernet.go b/benchmarks/testnet/peernet.go index c6ca1778..ba83ff09 100644 --- a/benchmarks/testnet/peernet.go +++ b/benchmarks/testnet/peernet.go @@ -7,7 +7,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock" - dtnet "github.com/filecoin-project/go-data-transfer/v2/network" + dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" ) type peernet struct { diff --git a/impl/initiating_test.go b/impl/initiating_test.go index 04113267..f6c4f0ec 100644 --- a/impl/initiating_test.go +++ b/impl/initiating_test.go @@ -11,6 +11,7 @@ import ( dss "github.com/ipfs/go-datastore/sync" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" @@ -333,7 +334,7 @@ func TestDataTransferInitiating(t *testing.T) { events: make(chan datatransfer.EventCode, len(verify.expectedEvents)), } ev.setup(t, dt) - h.stor = testutil.AllSelector() + h.stor = selectorparse.CommonSelector_ExploreAllRecursively h.voucher = testutil.NewTestTypedVoucher() h.voucherResult = testutil.NewTestTypedVoucher() require.NoError(t, err) @@ -580,7 +581,7 @@ func TestDataTransferRestartInitiating(t *testing.T) { ev.setup(t, dt) // setup voucher processing - h.stor = testutil.AllSelector() + h.stor = selectorparse.CommonSelector_ExploreAllRecursively h.voucher = testutil.NewTestTypedVoucher() require.NoError(t, h.dt.RegisterVoucherType(h.voucher.Type, h.voucherValidator)) h.voucherResult = testutil.NewTestTypedVoucher() diff --git a/impl/responding_test.go b/impl/responding_test.go index a7ca4e3e..1537a485 100644 --- a/impl/responding_test.go +++ b/impl/responding_test.go @@ -13,6 +13,7 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -617,7 +618,7 @@ func TestDataTransferResponding(t *testing.T) { events: make(chan datatransfer.EventCode, len(verify.expectedEvents)), } ev.setup(t, dt) - h.stor = testutil.AllSelector() + h.stor = selectorparse.CommonSelector_ExploreAllRecursively h.voucher = testutil.NewTestTypedVoucher() h.baseCid = testutil.GenerateCids(1)[0] h.id = datatransfer.TransferID(rand.Int31()) @@ -999,7 +1000,7 @@ func TestDataTransferRestartResponding(t *testing.T) { events: make(chan datatransfer.EventCode, len(verify.expectedEvents)), } ev.setup(t, dt) - h.stor = testutil.AllSelector() + h.stor = selectorparse.CommonSelector_ExploreAllRecursively h.voucher = testutil.NewTestTypedVoucher() h.baseCid = testutil.GenerateCids(1)[0] h.id = datatransfer.TransferID(rand.Int31()) diff --git a/impl/utils.go b/impl/utils.go index 1a2b1351..518b9e0c 100644 --- a/impl/utils.go +++ b/impl/utils.go @@ -83,4 +83,3 @@ func (m *manager) cancelMessage(chid datatransfer.ChannelID) datatransfer.Messag } return message.CancelResponse(chid.ID) } - diff --git a/testutil/fixtures/lorem.txt b/itest/fixtures/lorem.txt similarity index 100% rename from testutil/fixtures/lorem.txt rename to itest/fixtures/lorem.txt diff --git a/testutil/fixtures/lorem_large.txt b/itest/fixtures/lorem_large.txt similarity index 100% rename from testutil/fixtures/lorem_large.txt rename to itest/fixtures/lorem_large.txt diff --git a/testutil/gstestdata.go b/itest/gstestdata.go similarity index 93% rename from testutil/gstestdata.go rename to itest/gstestdata.go index e7cf1fe1..e60391d8 100644 --- a/testutil/gstestdata.go +++ b/itest/gstestdata.go @@ -1,4 +1,4 @@ -package testutil +package itest import ( "bytes" @@ -29,37 +29,30 @@ import ( "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer/v2" - "github.com/filecoin-project/go-data-transfer/v2/network" gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync" "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync/extension" + "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" ) -var allSelector datamodel.Node - const loremFile = "lorem.txt" +const loremFileTransferBytes = 20439 -func init() { - ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) - allSelector = ssb.ExploreRecursive(selector.RecursionLimitNone(), - ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() -} +const loremLargeFile = "lorem_large.txt" +const loremLargeFileTransferBytes = 217452 const unixfsChunkSize uint64 = 1 << 10 const unixfsLinksPerLevel = 1024 var extsForProtocol = map[protocol.ID]graphsync.ExtensionName{ - datatransfer.ProtocolDataTransfer1_2: extension.ExtensionDataTransfer1_1, + network.ProtocolDataTransfer1_2: extension.ExtensionDataTransfer1_1, + network.ProtocolFilDataTransfer1_2: extension.ExtensionDataTransfer1_1, } // GraphsyncTestingData is a test harness for testing data transfer on top of @@ -83,7 +76,6 @@ type GraphsyncTestingData struct { GsNet2 gsnet.GraphSyncNetwork DtNet1 network.DataTransferNetwork DtNet2 network.DataTransferNetwork - AllSelector datamodel.Node OrigBytes []byte TempDir1 string TempDir2 string @@ -152,7 +144,6 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T, host1Protocols [ require.NoError(t, err) gsData.TempDir2 = tempdir // create a selector for the whole UnixFS dag - gsData.AllSelector = allSelector gsData.host1Protocols = host1Protocols gsData.host2Protocols = host2Protocols return gsData diff --git a/itest/integration_test.go b/itest/integration_test.go index 6850d2db..8cb3bb89 100644 --- a/itest/integration_test.go +++ b/itest/integration_test.go @@ -29,6 +29,7 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -39,19 +40,12 @@ import ( "github.com/filecoin-project/go-data-transfer/v2/channelmonitor" . "github.com/filecoin-project/go-data-transfer/v2/impl" "github.com/filecoin-project/go-data-transfer/v2/message" - "github.com/filecoin-project/go-data-transfer/v2/network" "github.com/filecoin-project/go-data-transfer/v2/testutil" tp "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync" "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync/extension" + "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" ) -const loremFile = "lorem.txt" -const loremFileTransferBytes = 20439 - -const loremLargeFile = "lorem_large.txt" - -// const loremLargeFileTransferBytes = 217452 - // nil means use the default protocols // tests data transfer for the following protocol combinations: // default protocol -> default protocols @@ -61,7 +55,10 @@ var protocolsForTest = map[string]struct { host1Protocols []protocol.ID host2Protocols []protocol.ID }{ - "(v1.2 -> v1.2)": {nil, nil}, + "(wrapped v1.2 -> wrapped v1.2)": {nil, nil}, + "(v1.2 -> wrapped v1.2)": {[]protocol.ID{network.ProtocolFilDataTransfer1_2}, nil}, + "(wrapped v1.2 -> v1.2)": {nil, []protocol.ID{network.ProtocolFilDataTransfer1_2}}, + "(v1.2 -> v1.2)": {[]protocol.ID{network.ProtocolFilDataTransfer1_2}, []protocol.ID{network.ProtocolFilDataTransfer1_2}}, } // tests data transfer for the protocol combinations that support restart messages @@ -69,7 +66,10 @@ var protocolsForRestartTest = map[string]struct { host1Protocols []protocol.ID host2Protocols []protocol.ID }{ - "(v1.2 -> v1.2)": {nil, nil}, + "(wrapped v1.2 -> wrapped v1.2)": {nil, nil}, + "(v1.2 -> wrapped v1.2)": {[]protocol.ID{network.ProtocolFilDataTransfer1_2}, nil}, + "(wrapped v1.2 -> v1.2)": {nil, []protocol.ID{network.ProtocolFilDataTransfer1_2}}, + "(v1.2 -> v1.2)": {[]protocol.ID{network.ProtocolFilDataTransfer1_2}, []protocol.ID{network.ProtocolFilDataTransfer1_2}}, } func TestRoundTrip(t *testing.T) { @@ -138,7 +138,7 @@ func TestRoundTrip(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, ps.host1Protocols, ps.host2Protocols) + gsData := NewGraphsyncTestingData(ctx, t, ps.host1Protocols, ps.host2Protocols) host1 := gsData.Host1 // initiator, data sender host2 := gsData.Host2 // data recipient @@ -205,7 +205,7 @@ func TestRoundTrip(t *testing.T) { } else { sourceDagService = gsData.DagService1 } - root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile) + root, origBytes := LoadUnixFSFile(ctx, t, sourceDagService, loremFile) rootCid := root.(cidlink.Link).Cid var destDagService ipldformat.DAGService @@ -232,11 +232,11 @@ func TestRoundTrip(t *testing.T) { if data.isPull { sv.ExpectSuccessPull() require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } else { sv.ExpectSuccessPush() require.NoError(t, dt2.RegisterVoucherType(testutil.TestVoucherType, sv)) - chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } require.NoError(t, err) opens := 0 @@ -260,7 +260,7 @@ func TestRoundTrip(t *testing.T) { } } require.Equal(t, sentIncrements, receivedIncrements) - testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes) + VerifyHasFile(ctx, t, destDagService, root, origBytes) if data.isPull { assert.Equal(t, chid.Initiator, host2.ID()) } else { @@ -294,7 +294,7 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender host2 := gsData.Host2 // data recipient @@ -331,7 +331,7 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) { sv := testutil.NewStubbedValidator() sv.StubResult(datatransfer.ValidationResult{Accepted: true}) - root, origBytes := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) + root, origBytes := LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) rootCid := root.(cidlink.Link).Cid destDagServices := make([]ipldformat.DAGService, 0, data.requestCount) @@ -363,14 +363,14 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) { sv.ExpectSuccessPull() require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) for i := 0; i < data.requestCount; i++ { - _, err = dt2.OpenPullDataChannel(ctx, host1.ID(), vouchers[i], rootCid, gsData.AllSelector) + _, err = dt2.OpenPullDataChannel(ctx, host1.ID(), vouchers[i], rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) } } else { sv.ExpectSuccessPush() require.NoError(t, dt2.RegisterVoucherType(testutil.TestVoucherType, sv)) for i := 0; i < data.requestCount; i++ { - _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), vouchers[i], rootCid, gsData.AllSelector) + _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), vouchers[i], rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) } } @@ -389,7 +389,7 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) { } } for _, destDagService := range destDagServices { - testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes) + VerifyHasFile(ctx, t, destDagService, root, origBytes) } }) } @@ -414,7 +414,7 @@ func TestManyReceiversAtOnce(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender tp1 := gsData.SetupGSTransportHost1() @@ -490,21 +490,21 @@ func TestManyReceiversAtOnce(t *testing.T) { sv := testutil.NewStubbedValidator() sv.StubResult(datatransfer.ValidationResult{Accepted: true}) - root, origBytes := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) + root, origBytes := LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) rootCid := root.(cidlink.Link).Cid if data.isPull { sv.ExpectSuccessPull() require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) for i, receiver := range receivers { - _, err = receiver.OpenPullDataChannel(ctx, host1.ID(), vouchers[i], rootCid, gsData.AllSelector) + _, err = receiver.OpenPullDataChannel(ctx, host1.ID(), vouchers[i], rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) } } else { sv.ExpectSuccessPush() for i, receiver := range receivers { require.NoError(t, receiver.RegisterVoucherType(testutil.TestVoucherType, sv)) - _, err = dt1.OpenPushDataChannel(ctx, hosts[i].ID(), vouchers[i], rootCid, gsData.AllSelector) + _, err = dt1.OpenPushDataChannel(ctx, hosts[i].ID(), vouchers[i], rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) } } @@ -523,7 +523,7 @@ func TestManyReceiversAtOnce(t *testing.T) { } } for _, destDagService := range destDagServices { - testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes) + VerifyHasFile(ctx, t, destDagService, root, origBytes) } }) } @@ -692,7 +692,7 @@ func TestAutoRestart(t *testing.T) { // The retry config for the network layer: make 5 attempts, backing off by 1s each time netRetry := network.RetryParameters(time.Second, time.Second, 5, 1) - gsData := testutil.NewGraphsyncTestingData(ctx, t, ps.host1Protocols, ps.host2Protocols) + gsData := NewGraphsyncTestingData(ctx, t, ps.host1Protocols, ps.host2Protocols) gsData.DtNet1 = network.NewFromLibp2pHost(gsData.Host1, netRetry) initiatorHost := gsData.Host1 // initiator, data sender responderHost := gsData.Host2 // data recipient @@ -745,7 +745,7 @@ func TestAutoRestart(t *testing.T) { destDagService = gsData.DagService1 } - root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile) + root, origBytes := LoadUnixFSFile(ctx, t, sourceDagService, loremFile) rootCid := root.(cidlink.Link).Cid require.NoError(t, initiator.RegisterVoucherType(testutil.TestVoucherType, sv)) @@ -770,10 +770,10 @@ func TestAutoRestart(t *testing.T) { var chid datatransfer.ChannelID if tc.isPush { // Open a push channel - chid, err = initiator.OpenPushDataChannel(ctx, responderHost.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = initiator.OpenPushDataChannel(ctx, responderHost.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } else { // Open a pull channel - chid, err = initiator.OpenPullDataChannel(ctx, responderHost.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = initiator.OpenPullDataChannel(ctx, responderHost.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } require.NoError(t, err) @@ -842,7 +842,7 @@ func TestAutoRestart(t *testing.T) { } // Verify that the file was transferred to the destination node - testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes) + VerifyHasFile(ctx, t, destDagService, root, origBytes) }) } } @@ -866,7 +866,7 @@ func TestAutoRestartAfterBouncingInitiator(t *testing.T) { // The retry config for the network layer: make 5 attempts, backing off by 1s each time netRetry := network.RetryParameters(time.Second, time.Second, 5, 1) - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) gsData.DtNet1 = network.NewFromLibp2pHost(gsData.Host1, netRetry) initiatorHost := gsData.Host1 // initiator, data sender responderHost := gsData.Host2 // data recipient @@ -938,7 +938,7 @@ func TestAutoRestartAfterBouncingInitiator(t *testing.T) { destDagService = gsData.DagService1 } - root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremLargeFile) + root, origBytes := LoadUnixFSFile(ctx, t, sourceDagService, loremLargeFile) rootCid := root.(cidlink.Link).Cid require.NoError(t, initiator.RegisterVoucherType(testutil.TestVoucherType, sv)) @@ -947,10 +947,10 @@ func TestAutoRestartAfterBouncingInitiator(t *testing.T) { var chid datatransfer.ChannelID if isPush { // Open a push channel - chid, err = initiator.OpenPushDataChannel(ctx, responderHost.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = initiator.OpenPushDataChannel(ctx, responderHost.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } else { // Open a pull channel - chid, err = initiator.OpenPullDataChannel(ctx, responderHost.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = initiator.OpenPullDataChannel(ctx, responderHost.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } require.NoError(t, err) @@ -1058,7 +1058,7 @@ func TestAutoRestartAfterBouncingInitiator(t *testing.T) { } // Verify that the file was transferred to the destination node - testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes) + VerifyHasFile(ctx, t, destDagService, root, origBytes) } t.Run("push", func(t *testing.T) { @@ -1084,7 +1084,7 @@ func TestRoundTripCancelledRequest(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender host2 := gsData.Host2 @@ -1124,7 +1124,7 @@ func TestRoundTripCancelledRequest(t *testing.T) { dt2.SubscribeToEvents(subscriber) voucher := testutil.NewTestTypedVoucherWith("applesauce") sv := testutil.NewStubbedValidator() - root, _ := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) + root, _ := LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) rootCid := root.(cidlink.Link).Cid var chid datatransfer.ChannelID @@ -1132,12 +1132,12 @@ func TestRoundTripCancelledRequest(t *testing.T) { sv.ExpectSuccessPull() sv.StubResult(datatransfer.ValidationResult{Accepted: true, ForcePause: true}) require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } else { sv.ExpectSuccessPush() sv.StubResult(datatransfer.ValidationResult{Accepted: true, ForcePause: true}) require.NoError(t, dt2.RegisterVoucherType(testutil.TestVoucherType, sv)) - chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } require.NoError(t, err) opens := 0 @@ -1310,7 +1310,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender root := gsData.LoadUnixFSFile(t, false) @@ -1391,7 +1391,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) { require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) for providerFinished != nil || clientFinished != nil { @@ -1429,7 +1429,7 @@ func TestPauseAndResume(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender host2 := gsData.Host2 // data recipient @@ -1500,11 +1500,11 @@ func TestPauseAndResume(t *testing.T) { if isPull { sv.ExpectSuccessPull() require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } else { sv.ExpectSuccessPush() require.NoError(t, dt2.RegisterVoucherType(testutil.TestVoucherType, sv)) - chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } require.NoError(t, err) opens := 0 @@ -1572,7 +1572,7 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) { // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender host2 := gsData.Host2 // data recipient @@ -1604,13 +1604,13 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) { dt2.SubscribeToEvents(subscriber) voucher := testutil.NewTestTypedVoucherWith("applesauce") - root, _ := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) + root, _ := LoadUnixFSFile(ctx, t, gsData.DagService1, loremFile) rootCid := root.(cidlink.Link).Cid if isPull { - _, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) + _, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } else { - _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, gsData.AllSelector) + _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) } require.NoError(t, err) opens := 0 @@ -1641,7 +1641,7 @@ func TestDataTransferSubscribing(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host2 := gsData.Host2 tp1 := gsData.SetupGSTransportHost1() @@ -1673,7 +1673,7 @@ func TestDataTransferSubscribing(t *testing.T) { } unsub1 := dt1.SubscribeToEvents(subscribe1) unsub2 := dt1.SubscribeToEvents(subscribe2) - _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, baseCid, gsData.AllSelector) + _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, baseCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) select { case <-ctx.Done(): @@ -1702,7 +1702,7 @@ func TestDataTransferSubscribing(t *testing.T) { } unsub3 := dt1.SubscribeToEvents(subscribe3) unsub4 := dt1.SubscribeToEvents(subscribe4) - _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), voucher, baseCid, gsData.AllSelector) + _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), voucher, baseCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) select { case <-ctx.Done(): @@ -1769,7 +1769,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator and data sender host2 := gsData.Host2 // data recipient, makes graphsync request for data voucher := testutil.NewTestTypedVoucher() @@ -1780,7 +1780,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { r := &receiver{ messageReceived: make(chan receivedMessage), } - dtnet2.SetDelegate(r) + dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r) gsr := &fakeGraphSyncReceiver{ receivedMessages: make(chan receivedGraphSyncMessage), @@ -1795,7 +1795,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { require.NoError(t, err) t.Run("when request is initiated", func(t *testing.T) { - _, err := dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, link.(cidlink.Link).Cid, gsData.AllSelector) + _, err := dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, link.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) var messageReceived receivedMessage @@ -1810,7 +1810,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { require.NoError(t, err) nd, err := response.ToIPLD() require.NoError(t, err) - request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: nd, }) @@ -1829,7 +1829,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { require.NoError(t, err) nd, err := response.ToIPLD() require.NoError(t, err) - request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: nd, }) @@ -1849,7 +1849,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator and data sender host2 := gsData.Host2 // data recipient, makes graphsync request for data voucher := testutil.NewTestTypedVoucherWith("applesauce") @@ -1860,7 +1860,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) { r := &receiver{ messageReceived: make(chan receivedMessage), } - dtnet2.SetDelegate(r) + dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r) gsr := &fakeGraphSyncReceiver{ receivedMessages: make(chan receivedGraphSyncMessage), @@ -1880,7 +1880,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) { } gs1.RegisterIncomingRequestHook(validateHook) - _, err := dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, link.(cidlink.Link).Cid, gsData.AllSelector) + _, err := dt1.OpenPushDataChannel(ctx, host2.ID(), voucher, link.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) select { @@ -1889,7 +1889,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) { case <-r.messageReceived: } - request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31())) + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively, graphsync.Priority(rand.Int31())) builder := gsmsg.NewBuilder() builder.AddRequest(request) gsmessage, err := builder.Build() @@ -1905,10 +1905,10 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { //create network ctx := context.Background() testCases := map[string]struct { - test func(*testing.T, *testutil.GraphsyncTestingData, datatransfer.Transport, ipld.Link, datatransfer.TransferID, *fakeGraphSyncReceiver) + test func(*testing.T, *GraphsyncTestingData, datatransfer.Transport, ipld.Link, datatransfer.TransferID, *fakeGraphSyncReceiver) }{ "When a pull request is initiated and validated": { - test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { + test: func(t *testing.T, gsData *GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { sv := testutil.NewStubbedValidator() sv.ExpectSuccessPull() sv.StubResult(datatransfer.ValidationResult{Accepted: true}) @@ -1919,11 +1919,11 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) voucher := testutil.NewTestTypedVoucher() - request, err := message.NewRequest(id, false, true, &voucher, testutil.GenerateCids(1)[0], gsData.AllSelector) + request, err := message.NewRequest(id, false, true, &voucher, testutil.GenerateCids(1)[0], selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) nd, err := request.ToIPLD() require.NoError(t, err) - gsRequest := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + gsRequest := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: nd, }) @@ -1939,7 +1939,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { }, }, "When request is initiated, but fails validation": { - test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { + test: func(t *testing.T, gsData *GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { sv := testutil.NewStubbedValidator() sv.ExpectErrorPull() dt1, err := NewDataTransfer(gsData.DtDs2, gsData.Host2.ID(), tp2) @@ -1947,12 +1947,12 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { testutil.StartAndWaitForReady(ctx, t, dt1) require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) voucher := testutil.NewTestTypedVoucher() - dtRequest, err := message.NewRequest(id, false, true, &voucher, testutil.GenerateCids(1)[0], gsData.AllSelector) + dtRequest, err := message.NewRequest(id, false, true, &voucher, testutil.GenerateCids(1)[0], selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) nd, err := dtRequest.ToIPLD() require.NoError(t, err) - request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: nd, }) @@ -1975,7 +1975,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) // setup receiving peer to just record message coming in gsr := &fakeGraphSyncReceiver{ @@ -2002,7 +2002,7 @@ func TestMultipleMessagesInExtension(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) defer cancel() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender root := gsData.LoadUnixFSFile(t, false) @@ -2114,7 +2114,7 @@ func TestMultipleMessagesInExtension(t *testing.T) { require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv)) voucher := testutil.NewTestTypedVoucherWith("applesauce") - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) // Expect the client to receive a response voucher, the provider to complete the transfer and @@ -2144,7 +2144,7 @@ func TestMultipleParallelTransfers(t *testing.T) { ctx := context.Background() - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender tp1 := gsData.SetupGSTransportHost1() @@ -2244,7 +2244,7 @@ func TestMultipleParallelTransfers(t *testing.T) { rootCid := root.(cidlink.Link).Cid voucher := testutil.NewTestTypedVoucher() - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) + chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(t, err) close(chidReceived) // Expect the client to receive a response voucher, the provider to complete the transfer and @@ -2273,7 +2273,7 @@ func TestMultipleParallelTransfers(t *testing.T) { } } sv.VerifyExpectations(t) - testutil.VerifyHasFile(gsData.Ctx, t, gsData.DagService2, root, origBytes) + VerifyHasFile(gsData.Ctx, t, gsData.DagService2, root, origBytes) }) } } diff --git a/itest/restart_integration_test.go b/itest/restart_integration_test.go index d63fd728..68e02263 100644 --- a/itest/restart_integration_test.go +++ b/itest/restart_integration_test.go @@ -10,6 +10,7 @@ import ( ipldformat "github.com/ipfs/go-ipld-format" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -42,7 +43,7 @@ func TestRestartPush(t *testing.T) { stopAt: 20, openPushF: func(rh *restartHarness) datatransfer.ChannelID { voucher := testutil.NewTestTypedVoucherWith("applesauce") - chid, err := rh.dt1.OpenPushDataChannel(rh.testCtx, rh.peer2, voucher, rh.rootCid, rh.gsData.AllSelector) + chid, err := rh.dt1.OpenPushDataChannel(rh.testCtx, rh.peer2, voucher, rh.rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(rh.t, err) return chid }, @@ -84,7 +85,7 @@ func TestRestartPush(t *testing.T) { stopAt: 20, openPushF: func(rh *restartHarness) datatransfer.ChannelID { voucher := testutil.NewTestTypedVoucherWith("applesauce") - chid, err := rh.dt1.OpenPushDataChannel(rh.testCtx, rh.peer2, voucher, rh.rootCid, rh.gsData.AllSelector) + chid, err := rh.dt1.OpenPushDataChannel(rh.testCtx, rh.peer2, voucher, rh.rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(rh.t, err) return chid }, @@ -259,7 +260,7 @@ func TestRestartPush(t *testing.T) { // verify all cids are present on the receiver - testutil.VerifyHasFile(rh.testCtx, t, rh.destDagService, rh.root, rh.origBytes) + VerifyHasFile(rh.testCtx, t, rh.destDagService, rh.root, rh.origBytes) rh.sv.VerifyExpectations(t) // we should ONLY see two opens and two completes @@ -296,7 +297,7 @@ func TestRestartPull(t *testing.T) { stopAt: 40, openPullF: func(rh *restartHarness) datatransfer.ChannelID { voucher := testutil.NewTestTypedVoucherWith("applesauce") - chid, err := rh.dt2.OpenPullDataChannel(rh.testCtx, rh.peer1, voucher, rh.rootCid, rh.gsData.AllSelector) + chid, err := rh.dt2.OpenPullDataChannel(rh.testCtx, rh.peer1, voucher, rh.rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(rh.t, err) return chid }, @@ -335,7 +336,7 @@ func TestRestartPull(t *testing.T) { stopAt: 40, openPullF: func(rh *restartHarness) datatransfer.ChannelID { voucher := testutil.NewTestTypedVoucherWith("applesauce") - chid, err := rh.dt2.OpenPullDataChannel(rh.testCtx, rh.peer1, voucher, rh.rootCid, rh.gsData.AllSelector) + chid, err := rh.dt2.OpenPullDataChannel(rh.testCtx, rh.peer1, voucher, rh.rootCid, selectorparse.CommonSelector_ExploreAllRecursively) require.NoError(rh.t, err) return chid }, @@ -502,7 +503,7 @@ func TestRestartPull(t *testing.T) { _, _, err = waitF(10*time.Second, 2) require.NoError(t, err) - testutil.VerifyHasFile(rh.testCtx, t, rh.destDagService, rh.root, rh.origBytes) + VerifyHasFile(rh.testCtx, t, rh.destDagService, rh.root, rh.origBytes) rh.sv.VerifyExpectations(t) // we should ONLY see two opens and two completes @@ -536,7 +537,7 @@ type restartHarness struct { peer1 peer.ID peer2 peer.ID - gsData *testutil.GraphsyncTestingData + gsData *GraphsyncTestingData dt1 datatransfer.Manager dt2 datatransfer.Manager sv *testutil.StubbedValidator @@ -553,7 +554,7 @@ func newRestartHarness(t *testing.T) *restartHarness { ctx, cancel := context.WithTimeout(ctx, 120*time.Second) // Setup host - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) + gsData := NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender host2 := gsData.Host2 // data recipient peer1 := host1.ID() @@ -576,7 +577,7 @@ func newRestartHarness(t *testing.T) *restartHarness { require.NoError(t, dt2.RegisterVoucherType(testutil.TestVoucherType, sv)) sourceDagService := gsData.DagService1 - root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, largeFile) + root, origBytes := LoadUnixFSFile(ctx, t, sourceDagService, largeFile) rootCid := root.(cidlink.Link).Cid destDagService := gsData.DagService2 diff --git a/message.go b/message.go index d54eabb1..cabb82ea 100644 --- a/message.go +++ b/message.go @@ -1,17 +1,51 @@ package datatransfer import ( + "errors" + "fmt" "io" + "strconv" + "strings" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime/datamodel" - "github.com/libp2p/go-libp2p-core/protocol" ) +type Version struct { + Major uint64 + Minor uint64 + Patch uint64 +} + +func (mv Version) String() string { + return fmt.Sprintf("%d.%d.%d", mv.Major, mv.Minor, mv.Patch) +} + +// MessageVersionFromString parses a string into a message version +func MessageVersionFromString(versionString string) (Version, error) { + versions := strings.Split(versionString, ".") + if len(versions) != 3 { + return Version{}, errors.New("not a version string") + } + major, err := strconv.ParseUint(versions[0], 10, 0) + if err != nil { + return Version{}, errors.New("unable to parse major version") + } + minor, err := strconv.ParseUint(versions[1], 10, 0) + if err != nil { + return Version{}, errors.New("unable to parse major version") + } + patch, err := strconv.ParseUint(versions[2], 10, 0) + if err != nil { + return Version{}, errors.New("unable to parse major version") + } + return Version{Major: major, Minor: minor, Patch: patch}, nil +} + var ( - // ProtocolDataTransfer1_2 is the protocol identifier for the latest - // version of data-transfer (supports do-not-send-first-blocks extension) - ProtocolDataTransfer1_2 protocol.ID = "/fil/datatransfer/1.2.0" + // DataTransfer1_2 is the identifier for the current + // supported version of data-transfer + DataTransfer1_2 Version = Version{1, 2, 0} ) // Message is a message for the data transfer protocol @@ -26,7 +60,16 @@ type Message interface { TransferID() TransferID ToNet(w io.Writer) error ToIPLD() (datamodel.Node, error) - MessageForProtocol(targetProtocol protocol.ID) (newMsg Message, err error) + MessageForVersion(targetProtocol Version) (newMsg Message, err error) + Version() Version + WrappedForTransport(transportID TransportID, transportVersion Version) TransportedMessage +} + +// TransportedMessage is a message that can also report how it was transported +type TransportedMessage interface { + Message + TransportID() TransportID + TransportVersion() Version } // Request is a response message for the data transfer protocol diff --git a/message/message.go b/message/message.go index 438e6913..639046a4 100644 --- a/message/message.go +++ b/message/message.go @@ -23,5 +23,6 @@ var CancelResponse = message1_1.CancelResponse var UpdateResponse = message1_1.UpdateResponse var FromNet = message1_1.FromNet var FromIPLD = message1_1.FromIPLD +var FromNetWrapped = message1_1.FromNetWrapped var CompleteResponse = message1_1.CompleteResponse var CancelRequest = message1_1.CancelRequest diff --git a/message/message1_1prime/message.go b/message/message1_1prime/message.go index b740bc01..36438107 100644 --- a/message/message1_1prime/message.go +++ b/message/message1_1prime/message.go @@ -198,6 +198,10 @@ func FromNet(r io.Reader) (datatransfer.Message, error) { } tresp := tm.(*TransferMessage1_1) + return fromMessage(tresp) +} + +func fromMessage(tresp *TransferMessage1_1) (datatransfer.Message, error) { if (tresp.IsRequest && tresp.Request == nil) || (!tresp.IsRequest && tresp.Response == nil) { return nil, xerrors.Errorf("invalid/malformed message") } @@ -208,6 +212,36 @@ func FromNet(r io.Reader) (datatransfer.Message, error) { return tresp.Response, nil } +func fromWrappedMessage(wtresp *WrappedTransferMessage1_1) (datatransfer.TransportedMessage, error) { + tresp := wtresp.Message + if (tresp.IsRequest && tresp.Request == nil) || (!tresp.IsRequest && tresp.Response == nil) { + return nil, xerrors.Errorf("invalid/malformed message") + } + + if tresp.IsRequest { + return &WrappedTransferRequest1_1{ + tresp.Request, + wtresp.TransportVersion, + wtresp.TransportID, + }, nil + } + return &WrappedTransferResponse1_1{ + tresp.Response, + wtresp.TransportID, + wtresp.TransportVersion, + }, nil +} + +// FromNetWrraped can read a network stream to deserialize a message + transport ID +func FromNetWrapped(r io.Reader) (datatransfer.TransportedMessage, error) { + tm, err := ipldutils.FromReader(r, &WrappedTransferMessage1_1{}) + if err != nil { + return nil, err + } + wtresp := tm.(*WrappedTransferMessage1_1) + return fromWrappedMessage(wtresp) +} + // FromNet can read a network stream to deserialize a GraphSyncMessage func FromIPLD(node datamodel.Node) (datatransfer.Message, error) { if tn, ok := node.(schema.TypedNode); ok { // shouldn't need this if from Graphsync @@ -218,13 +252,5 @@ func FromIPLD(node datamodel.Node) (datatransfer.Message, error) { return nil, err } tresp := tm.(*TransferMessage1_1) - - if (tresp.IsRequest && tresp.Request == nil) || (!tresp.IsRequest && tresp.Response == nil) { - return nil, xerrors.Errorf("invalid/malformed message") - } - - if tresp.IsRequest { - return tresp.Request, nil - } - return tresp.Response, nil + return fromMessage(tresp) } diff --git a/message/message1_1prime/message_test.go b/message/message1_1prime/message_test.go index 7ef75e53..33050deb 100644 --- a/message/message1_1prime/message_test.go +++ b/message/message1_1prime/message_test.go @@ -504,6 +504,75 @@ func TestToNetFromNetEquivalency(t *testing.T) { deserializedRequest, ok = deserialized.(datatransfer.Request) require.True(t, ok) + require.Equal(t, deserializedRequest.TransferID(), request.TransferID()) + require.Equal(t, deserializedRequest.IsCancel(), request.IsCancel()) + require.Equal(t, deserializedRequest.IsRequest(), request.IsRequest()) + }) + t.Run("round-trip with wrapping", func(t *testing.T) { + transportID := datatransfer.TransportID("applesauce") + transportVersion := datatransfer.Version{Major: 1, Minor: 5, Patch: 0} + baseCid := testutil.GenerateCids(1)[0] + selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node() + isPull := false + id := datatransfer.TransferID(rand.Int31()) + accepted := false + voucher := testutil.NewTestTypedVoucher() + voucherResult := testutil.NewTestTypedVoucher() + request, err := message1_1.NewRequest(id, false, isPull, &voucher, baseCid, selector) + require.NoError(t, err) + wrequest := request.WrappedForTransport(transportID, transportVersion) + buf := new(bytes.Buffer) + err = wrequest.ToNet(buf) + require.NoError(t, err) + require.Greater(t, buf.Len(), 0) + deserialized, err := message1_1.FromNetWrapped(buf) + require.NoError(t, err) + + require.Equal(t, transportID, deserialized.TransportID()) + require.Equal(t, transportVersion, deserialized.TransportVersion()) + deserializedRequest, ok := deserialized.(datatransfer.Request) + require.True(t, ok) + + require.Equal(t, deserializedRequest.TransferID(), request.TransferID()) + require.Equal(t, deserializedRequest.IsCancel(), request.IsCancel()) + require.Equal(t, deserializedRequest.IsPull(), request.IsPull()) + require.Equal(t, deserializedRequest.IsRequest(), request.IsRequest()) + require.Equal(t, deserializedRequest.BaseCid(), request.BaseCid()) + testutil.AssertEqualTestVoucher(t, request, deserializedRequest) + testutil.AssertEqualSelector(t, request, deserializedRequest) + + response, err := message1_1.NewResponse(id, accepted, false, &voucherResult) + require.NoError(t, err) + wresponse := response.WrappedForTransport(transportID, transportVersion) + err = wresponse.ToNet(buf) + require.NoError(t, err) + deserialized, err = message1_1.FromNetWrapped(buf) + require.NoError(t, err) + require.Equal(t, transportID, deserialized.TransportID()) + require.Equal(t, transportVersion, deserialized.TransportVersion()) + + deserializedResponse, ok := deserialized.(datatransfer.Response) + require.True(t, ok) + + require.Equal(t, deserializedResponse.TransferID(), response.TransferID()) + require.Equal(t, deserializedResponse.Accepted(), response.Accepted()) + require.Equal(t, deserializedResponse.IsRequest(), response.IsRequest()) + require.Equal(t, deserializedResponse.IsUpdate(), response.IsUpdate()) + require.Equal(t, deserializedResponse.IsPaused(), response.IsPaused()) + testutil.AssertEqualTestVoucherResult(t, response, deserializedResponse) + + request = message1_1.CancelRequest(id) + wrequest = request.WrappedForTransport(transportID, transportVersion) + err = wrequest.ToNet(buf) + require.NoError(t, err) + deserialized, err = message1_1.FromNetWrapped(buf) + require.NoError(t, err) + require.Equal(t, transportID, deserialized.TransportID()) + require.Equal(t, transportVersion, deserialized.TransportVersion()) + + deserializedRequest, ok = deserialized.(datatransfer.Request) + require.True(t, ok) + require.Equal(t, deserializedRequest.TransferID(), request.TransferID()) require.Equal(t, deserializedRequest.IsCancel(), request.IsCancel()) require.Equal(t, deserializedRequest.IsRequest(), request.IsRequest()) diff --git a/message/message1_1prime/schema.ipldsch b/message/message1_1prime/schema.ipldsch index d5f9f87a..32683663 100644 --- a/message/message1_1prime/schema.ipldsch +++ b/message/message1_1prime/schema.ipldsch @@ -1,6 +1,7 @@ type PeerID string # peer.ID, really should be bytes (this is non-utf8) but is string for backward compat type TransferID int type TypeIdentifier string +type TransportID string type ChannelID struct { Initiator PeerID @@ -35,3 +36,15 @@ type TransferMessage1_1 struct { Request nullable TransferRequest Response nullable TransferResponse } + +type Version struct { + Major Int + Minor Int + Patch Int +} representation tuple + +type WrappedTransferMessage1_1 struct { + TransportID TransportID (rename "ID") + TransportVersion Version (rename "TV") + Message TransferMessage1_1 (rename "Msg") +} \ No newline at end of file diff --git a/message/message1_1prime/transfer_message.go b/message/message1_1prime/transfer_message.go index 0944212b..46304f31 100644 --- a/message/message1_1prime/transfer_message.go +++ b/message/message1_1prime/transfer_message.go @@ -57,3 +57,17 @@ func (tm *TransferMessage1_1) ToNet(w io.Writer) error { } return ipldutils.NodeToWriter(i, w) } + +type WrappedTransferMessage1_1 struct { + TransportID string + TransportVersion datatransfer.Version + Message TransferMessage1_1 +} + +func (wtm *WrappedTransferMessage1_1) BindnodeSchema() string { + return string(embedSchema) +} + +func (wtm *WrappedTransferMessage1_1) toIPLD() (schema.TypedNode, error) { + return ipldutils.ToNode(wtm) +} diff --git a/message/message1_1prime/transfer_request.go b/message/message1_1prime/transfer_request.go index a01e2452..f2f1560a 100644 --- a/message/message1_1prime/transfer_request.go +++ b/message/message1_1prime/transfer_request.go @@ -6,7 +6,6 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/schema" - "github.com/libp2p/go-libp2p-core/protocol" xerrors "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer/v2" @@ -29,15 +28,27 @@ type TransferRequest1_1 struct { RestartChannel datatransfer.ChannelID } -func (trq *TransferRequest1_1) MessageForProtocol(targetProtocol protocol.ID) (datatransfer.Message, error) { - switch targetProtocol { - case datatransfer.ProtocolDataTransfer1_2: +func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) { + switch version { + case datatransfer.DataTransfer1_2: return trq, nil default: return nil, xerrors.Errorf("protocol not supported") } } +func (trq *TransferRequest1_1) Version() datatransfer.Version { + return datatransfer.DataTransfer1_2 +} + +func (trq *TransferRequest1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage { + return &WrappedTransferRequest1_1{ + TransferRequest1_1: trq, + transportID: string(transportID), + transportVersion: transportVersion, + } +} + // IsRequest always returns true in this case because this is a transfer request func (trq *TransferRequest1_1) IsRequest() bool { return true @@ -163,3 +174,48 @@ func (trq *TransferRequest1_1) ToNet(w io.Writer) error { } return ipldutils.NodeToWriter(i, w) } + +// WrappedTransferRequest1_1 is used to serialize a request along with a +// transport id +type WrappedTransferRequest1_1 struct { + *TransferRequest1_1 + transportVersion datatransfer.Version + transportID string +} + +func (trq *WrappedTransferRequest1_1) TransportID() datatransfer.TransportID { + return datatransfer.TransportID(trq.transportID) +} + +func (trq *WrappedTransferRequest1_1) TransportVersion() datatransfer.Version { + return trq.transportVersion +} + +func (trq *WrappedTransferRequest1_1) toIPLD() (schema.TypedNode, error) { + msg := WrappedTransferMessage1_1{ + TransportID: trq.transportID, + TransportVersion: trq.transportVersion, + Message: TransferMessage1_1{ + IsRequest: true, + Request: trq.TransferRequest1_1, + Response: nil, + }, + } + return msg.toIPLD() +} + +func (trq *WrappedTransferRequest1_1) ToIPLD() (datamodel.Node, error) { + msg, err := trq.toIPLD() + if err != nil { + return nil, err + } + return msg.Representation(), nil +} + +func (trq *WrappedTransferRequest1_1) ToNet(w io.Writer) error { + i, err := trq.toIPLD() + if err != nil { + return err + } + return ipldutils.NodeToWriter(i, w) +} diff --git a/message/message1_1prime/transfer_request_test.go b/message/message1_1prime/transfer_request_test.go index 2cff27e3..410917b8 100644 --- a/message/message1_1prime/transfer_request_test.go +++ b/message/message1_1prime/transfer_request_test.go @@ -13,7 +13,7 @@ import ( "github.com/filecoin-project/go-data-transfer/v2/testutil" ) -func TestRequestMessageForProtocol(t *testing.T) { +func TestRequestMessageForVersion(t *testing.T) { baseCid := testutil.GenerateCids(1)[0] selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node() isPull := true @@ -24,7 +24,8 @@ func TestRequestMessageForProtocol(t *testing.T) { request, err := message1_1.NewRequest(id, false, isPull, &voucher, baseCid, selector) require.NoError(t, err) - out12, err := request.MessageForProtocol(datatransfer.ProtocolDataTransfer1_2) + // v1.2 new protocol + out12, err := request.MessageForVersion(datatransfer.DataTransfer1_2) require.NoError(t, err) require.Equal(t, request, out12) @@ -38,4 +39,17 @@ func TestRequestMessageForProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, selector, n) require.Equal(t, testutil.TestVoucherType, req.VoucherType()) + + wrappedOut12 := out12.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion) + require.Equal(t, datatransfer.LegacyTransportID, wrappedOut12.TransportID()) + require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut12.TransportVersion()) + + // random protocol should fail + _, err = request.MessageForVersion(datatransfer.Version{ + Major: rand.Uint64(), + Minor: rand.Uint64(), + Patch: rand.Uint64(), + }) + require.Error(t, err) + } diff --git a/message/message1_1prime/transfer_response.go b/message/message1_1prime/transfer_response.go index 1431ff72..590a68ff 100644 --- a/message/message1_1prime/transfer_response.go +++ b/message/message1_1prime/transfer_response.go @@ -5,7 +5,6 @@ import ( "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/schema" - "github.com/libp2p/go-libp2p-core/protocol" xerrors "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer/v2" @@ -87,15 +86,26 @@ func (trsp *TransferResponse1_1) EmptyVoucherResult() bool { return trsp.VoucherTypeIdentifier == datatransfer.EmptyTypeIdentifier } -func (trsp *TransferResponse1_1) MessageForProtocol(targetProtocol protocol.ID) (datatransfer.Message, error) { - switch targetProtocol { - case datatransfer.ProtocolDataTransfer1_2: +func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) { + switch version { + case datatransfer.DataTransfer1_2: return trsp, nil default: - return nil, xerrors.Errorf("protocol %s not supported", targetProtocol) + return nil, xerrors.Errorf("protocol %s not supported", version) } } +func (trsp *TransferResponse1_1) Version() datatransfer.Version { + return datatransfer.DataTransfer1_2 +} + +func (trsp *TransferResponse1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage { + return &WrappedTransferResponse1_1{ + TransferResponse1_1: trsp, + transportID: string(transportID), + transportVersion: transportVersion, + } +} func (trsp *TransferResponse1_1) toIPLD() (schema.TypedNode, error) { msg := TransferMessage1_1{ IsRequest: false, @@ -121,3 +131,47 @@ func (trsp *TransferResponse1_1) ToNet(w io.Writer) error { } return ipldutils.NodeToWriter(i, w) } + +// WrappedTransferResponse1_1 is used to serialize a response along with a +// transport id +type WrappedTransferResponse1_1 struct { + *TransferResponse1_1 + transportID string + transportVersion datatransfer.Version +} + +func (trsp *WrappedTransferResponse1_1) TransportID() datatransfer.TransportID { + return datatransfer.TransportID(trsp.transportID) +} +func (trsp *WrappedTransferResponse1_1) TransportVersion() datatransfer.Version { + return trsp.transportVersion +} + +func (trsp *WrappedTransferResponse1_1) toIPLD() (schema.TypedNode, error) { + msg := WrappedTransferMessage1_1{ + TransportID: trsp.transportID, + TransportVersion: trsp.transportVersion, + Message: TransferMessage1_1{ + IsRequest: false, + Request: nil, + Response: trsp.TransferResponse1_1, + }, + } + return msg.toIPLD() +} + +func (trsp *WrappedTransferResponse1_1) ToIPLD() (datamodel.Node, error) { + msg, err := trsp.toIPLD() + if err != nil { + return nil, err + } + return msg.Representation(), nil +} + +func (trsp *WrappedTransferResponse1_1) ToNet(w io.Writer) error { + msg, err := trsp.toIPLD() + if err != nil { + return err + } + return ipldutils.NodeToWriter(msg, w) +} diff --git a/message/message1_1prime/transfer_response_test.go b/message/message1_1prime/transfer_response_test.go index 9b979371..71fcf703 100644 --- a/message/message1_1prime/transfer_response_test.go +++ b/message/message1_1prime/transfer_response_test.go @@ -11,14 +11,14 @@ import ( "github.com/filecoin-project/go-data-transfer/v2/testutil" ) -func TestResponseMessageForProtocol(t *testing.T) { +func TestResponseMessageForVersion(t *testing.T) { id := datatransfer.TransferID(rand.Int31()) voucherResult := testutil.NewTestTypedVoucher() response, err := message1_1.NewResponse(id, false, true, &voucherResult) // not accepted require.NoError(t, err) - // v1.2 protocol - out, err := response.MessageForProtocol(datatransfer.ProtocolDataTransfer1_2) + // v1.2 new protocol + out, err := response.MessageForVersion(datatransfer.DataTransfer1_2) require.NoError(t, err) require.Equal(t, response, out) @@ -28,8 +28,15 @@ func TestResponseMessageForProtocol(t *testing.T) { require.Equal(t, testutil.TestVoucherType, resp.VoucherResultType()) require.True(t, resp.IsValidationResult()) - // random protocol - out, err = response.MessageForProtocol("RAND") + wrappedOut := out.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion) + require.Equal(t, datatransfer.LegacyTransportID, wrappedOut.TransportID()) + require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut.TransportVersion()) + + // random protocol should fail + _, err = response.MessageForVersion(datatransfer.Version{ + Major: rand.Uint64(), + Minor: rand.Uint64(), + Patch: rand.Uint64(), + }) require.Error(t, err) - require.Nil(t, out) } diff --git a/testutil/fakegraphsync.go b/testutil/fakegraphsync.go index f0ac8ab0..758a3f40 100644 --- a/testutil/fakegraphsync.go +++ b/testutil/fakegraphsync.go @@ -13,6 +13,7 @@ import ( "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" @@ -430,7 +431,7 @@ func NewFakeRequest(id graphsync.RequestID, extensions map[graphsync.ExtensionNa return &fakeRequest{ id: id, root: GenerateCids(1)[0], - selector: allSelector, + selector: selectorparse.CommonSelector_ExploreAllRecursively, priority: graphsync.Priority(rand.Int()), extensions: extensions, requestType: graphsync.RequestTypeNew, diff --git a/testutil/faketransport.go b/testutil/faketransport.go index 6bd822ac..7e0d20c8 100644 --- a/testutil/faketransport.go +++ b/testutil/faketransport.go @@ -54,6 +54,16 @@ func NewFakeTransport() *FakeTransport { return &FakeTransport{} } +// ID is a unique identifier for this transport +func (ft *FakeTransport) ID() datatransfer.TransportID { + return "fake" +} + +// Versions indicates what versions of this transport are supported +func (ft *FakeTransport) Versions() []datatransfer.Version { + return []datatransfer.Version{{Major: 1, Minor: 1, Patch: 0}} +} + // Capabilities tells datatransfer what kinds of capabilities this transport supports func (ft *FakeTransport) Capabilities() datatransfer.TransportCapabilities { return datatransfer.TransportCapabilities{ diff --git a/testutil/testnet.go b/testutil/testnet.go deleted file mode 100644 index 88d30a07..00000000 --- a/testutil/testnet.go +++ /dev/null @@ -1,71 +0,0 @@ -package testutil - -import ( - "context" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - - datatransfer "github.com/filecoin-project/go-data-transfer/v2" - "github.com/filecoin-project/go-data-transfer/v2/network" -) - -// FakeSentMessage is a recording of a message sent on the FakeNetwork -type FakeSentMessage struct { - PeerID peer.ID - Message datatransfer.Message -} - -// FakeNetwork is a network that satisfies the DataTransferNetwork interface but -// does not actually do anything -type FakeNetwork struct { - PeerID peer.ID - SentMessages []FakeSentMessage - Delegate network.Receiver -} - -// NewFakeNetwork returns a new fake data transfer network instance -func NewFakeNetwork(id peer.ID) *FakeNetwork { - return &FakeNetwork{PeerID: id} -} - -var _ network.DataTransferNetwork = (*FakeNetwork)(nil) - -// SendMessage sends a GraphSync message to a peer. -func (fn *FakeNetwork) SendMessage(ctx context.Context, p peer.ID, m datatransfer.Message) error { - fn.SentMessages = append(fn.SentMessages, FakeSentMessage{p, m}) - return nil -} - -// SetDelegate registers the Reciver to handle messages received from the -// network. -func (fn *FakeNetwork) SetDelegate(receiver network.Receiver) { - fn.Delegate = receiver -} - -// ConnectTo establishes a connection to the given peer -func (fn *FakeNetwork) ConnectTo(_ context.Context, _ peer.ID) error { - panic("not implemented") -} - -func (fn *FakeNetwork) ConnectWithRetry(ctx context.Context, p peer.ID) error { - panic("implement me") -} - -// ID returns a stubbed id for host of this network -func (fn *FakeNetwork) ID() peer.ID { - return fn.PeerID -} - -// Protect does nothing on the fake network -func (fn *FakeNetwork) Protect(id peer.ID, tag string) { -} - -// Unprotect does nothing on the fake network -func (fn *FakeNetwork) Unprotect(id peer.ID, tag string) bool { - return false -} - -func (fn *FakeNetwork) Protocol(ctx context.Context, id peer.ID) (protocol.ID, error) { - return datatransfer.ProtocolDataTransfer1_2, nil -} diff --git a/testutil/testutil.go b/testutil/testutil.go index dec67a72..0c143969 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -10,10 +10,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" - "github.com/ipld/go-ipld-prime/datamodel" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/jbenet/go-random" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" @@ -102,13 +98,6 @@ func AssertEqualSelector(t *testing.T, expectedRequest datatransfer.Request, req require.Equal(t, expectedSelector, selector) } -// AllSelector just returns a new instance of a "whole dag selector" -func AllSelector() datamodel.Node { - ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) - return ssb.ExploreRecursive(selector.RecursionLimitNone(), - ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() -} - // StartAndWaitForReady is a utility function to start a module and verify it reaches the ready state func StartAndWaitForReady(ctx context.Context, t *testing.T, manager datatransfer.Manager) { ready := make(chan error, 1) diff --git a/transport.go b/transport.go index dfadad56..546c2574 100644 --- a/transport.go +++ b/transport.go @@ -6,6 +6,17 @@ import ( ipld "github.com/ipld/go-ipld-prime" ) +// TransportID identifies a unique transport +type TransportID string + +// LegacyTransportID is the only transport for the fil/data-transfer protocol -- +// i.e. graphsync +const LegacyTransportID TransportID = "graphsync" + +// LegacyTransportVersion is the only transport version for the fil/data-transfer protocol -- +// i.e. graphsync 1.0.0 +var LegacyTransportVersion Version = Version{1, 0, 0} + // EventsHandler are semantic data transfer events that happen as a result of transport events type EventsHandler interface { // ChannelState queries for the current channel state @@ -94,6 +105,12 @@ and send messages. Beyond that, additional commands may or may not be supported. Whether a command is supported can be determined ahead by calling Capabilities(). */ type Transport interface { + // ID is a unique identifier for this transport + ID() TransportID + + // Versions indicates what versions of this transport are supported + Versions() []Version + // Capabilities tells datatransfer what kinds of capabilities this transport supports Capabilities() TransportCapabilities // OpenChannel opens a channel on a given transport to move data back and forth. diff --git a/transport/graphsync/extension/gsextension.go b/transport/graphsync/extension/gsextension.go index a8c34c35..dcf8153e 100644 --- a/transport/graphsync/extension/gsextension.go +++ b/transport/graphsync/extension/gsextension.go @@ -5,7 +5,6 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipld/go-ipld-prime/datamodel" - "github.com/libp2p/go-libp2p-core/protocol" datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/filecoin-project/go-data-transfer/v2/message" @@ -21,10 +20,10 @@ const ( ) // ProtocolMap maps graphsync extensions to their libp2p protocols -var ProtocolMap = map[graphsync.ExtensionName]protocol.ID{ - ExtensionIncomingRequest1_1: datatransfer.ProtocolDataTransfer1_2, - ExtensionOutgoingBlock1_1: datatransfer.ProtocolDataTransfer1_2, - ExtensionDataTransfer1_1: datatransfer.ProtocolDataTransfer1_2, +var ProtocolMap = map[graphsync.ExtensionName]datatransfer.Version{ + ExtensionIncomingRequest1_1: datatransfer.DataTransfer1_2, + ExtensionOutgoingBlock1_1: datatransfer.DataTransfer1_2, + ExtensionDataTransfer1_1: datatransfer.DataTransfer1_2, } // ToExtensionData converts a message to a graphsync extension @@ -35,7 +34,7 @@ func ToExtensionData(msg datatransfer.Message, supportedExtensions []graphsync.E if !ok { return nil, errors.New("unsupported protocol") } - versionedMsg, err := msg.MessageForProtocol(protoID) + versionedMsg, err := msg.MessageForVersion(protoID) if err != nil { continue } diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 058a2aec..07fe307e 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -15,13 +15,16 @@ import ( "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer/v2" - "github.com/filecoin-project/go-data-transfer/v2/network" dtchannel "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync/dtchannel" "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync/extension" + "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" ) var log = logging.Logger("dt_graphsync") +var transportID datatransfer.TransportID = "graphsync" +var supportedVersions = []datatransfer.Version{{Major: 1, Minor: 0, Patch: 0}} + // When restarting a data transfer, we cancel the existing graphsync request // before opening a new one. // This constant defines the maximum time to wait for the request to be @@ -103,6 +106,14 @@ func NewTransport(peerID peer.ID, gs graphsync.GraphExchange, dtNet network.Data return t } +func (t *Transport) ID() datatransfer.TransportID { + return transportID +} + +func (t *Transport) Versions() []datatransfer.Version { + return supportedVersions +} + func (t *Transport) Capabilities() datatransfer.TransportCapabilities { return datatransfer.TransportCapabilities{ Pausable: true, @@ -125,7 +136,7 @@ func (t *Transport) OpenChannel( channel.Selector(), req) } - return t.dtNet.SendMessage(ctx, channel.OtherPeer(), req) + return t.dtNet.SendMessage(ctx, channel.OtherPeer(), transportID, req) } // RestartChannel restarts a channel on the initiator side @@ -136,7 +147,7 @@ func (t *Transport) RestartChannel( req datatransfer.Request) error { log.Debugf("%s: re-establishing connection to %s", channelState.ChannelID(), channelState.OtherPeer()) start := time.Now() - err := t.dtNet.ConnectWithRetry(ctx, channelState.OtherPeer()) + err := t.dtNet.ConnectWithRetry(ctx, channelState.OtherPeer(), transportID) if err != nil { return xerrors.Errorf("%s: failed to reconnect to peer %s after %s: %w", channelState.ChannelID(), channelState.OtherPeer(), time.Since(start), err) @@ -155,7 +166,7 @@ func (t *Transport) RestartChannel( channelState.Selector(), req) } - return t.dtNet.SendMessage(ctx, channelState.OtherPeer(), req) + return t.dtNet.SendMessage(ctx, channelState.OtherPeer(), transportID, req) } func (t *Transport) openRequest( @@ -201,7 +212,7 @@ func (t *Transport) UpdateChannel(ctx context.Context, chid datatransfer.Channel ch, err := t.getDTChannel(chid) if err != nil { if update.SendMessage != nil && !update.Closed { - return t.dtNet.SendMessage(ctx, t.otherPeer(chid), update.SendMessage) + return t.dtNet.SendMessage(ctx, t.otherPeer(chid), transportID, update.SendMessage) } return err } @@ -221,7 +232,7 @@ func (t *Transport) UpdateChannel(ctx context.Context, chid datatransfer.Channel } if update.SendMessage != nil { - if err := t.dtNet.SendMessage(ctx, t.otherPeer(chid), update.SendMessage); err != nil { + if err := t.dtNet.SendMessage(ctx, t.otherPeer(chid), transportID, update.SendMessage); err != nil { return err } } @@ -239,7 +250,7 @@ func (t *Transport) UpdateChannel(ctx context.Context, chid datatransfer.Channel // SendMessage sends a data transfer message over the channel to the other peer func (t *Transport) SendMessage(ctx context.Context, chid datatransfer.ChannelID, msg datatransfer.Message) error { - return t.dtNet.SendMessage(ctx, t.otherPeer(chid), msg) + return t.dtNet.SendMessage(ctx, t.otherPeer(chid), transportID, msg) } // CleanupChannel is called on the otherside of a cancel - removes any associated @@ -286,7 +297,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error { t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterNetworkErrorListener(t.gsNetworkSendErrorListener)) t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterReceiverNetworkErrorListener(t.gsNetworkReceiveErrorListener)) - t.dtNet.SetDelegate(&receiver{t}) + t.dtNet.SetDelegate(transportID, supportedVersions, &receiver{t}) return nil } diff --git a/transport/graphsync/receiver.go b/transport/graphsync/receiver.go index 585a8859..ba9b1639 100644 --- a/transport/graphsync/receiver.go +++ b/transport/graphsync/receiver.go @@ -49,7 +49,7 @@ func (r *receiver) receiveRequest(ctx context.Context, initiator peer.ID, incomi if err != nil { if !initiateGraphsyncRequest { if response != nil { - return r.transport.dtNet.SendMessage(ctx, initiator, response) + return r.transport.dtNet.SendMessage(ctx, initiator, transportID, response) } return receiveErr } @@ -84,7 +84,7 @@ func (r *receiver) receiveRequest(ctx context.Context, initiator peer.ID, incomi return err } } else { - if err := r.transport.dtNet.SendMessage(ctx, initiator, response); err != nil { + if err := r.transport.dtNet.SendMessage(ctx, initiator, transportID, response); err != nil { return err } } diff --git a/network/interface.go b/transport/helpers/network/interface.go similarity index 52% rename from network/interface.go rename to transport/helpers/network/interface.go index 965a337d..022a1a37 100644 --- a/network/interface.go +++ b/transport/helpers/network/interface.go @@ -2,6 +2,8 @@ package network import ( "context" + "errors" + "strings" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -9,6 +11,32 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer/v2" ) +const ( + // ProtocolFilDataTransfer1_2 is the legacy filecoin data transfer protocol + // that assumes a graphsync transport + ProtocolFilDataTransfer1_2 protocol.ID = "/fil/datatransfer/1.2.0" + // ProtocolDataTransfer1_2 is the protocol identifier for current data transfer + // protocol which wraps transport information in the protocol + ProtocolDataTransfer1_2 protocol.ID = "/datatransfer/1.2.0" +) + +// ProtocolDescription describes how you are connected to a given +// peer on a given transport, if at all +type ProtocolDescription struct { + IsLegacy bool + MessageVersion datatransfer.Version + TransportVersion datatransfer.Version +} + +// MessageVersion extracts the message version from the full protocol +func MessageVersion(protocol protocol.ID) (datatransfer.Version, error) { + protocolParts := strings.Split(string(protocol), "/") + if len(protocolParts) == 0 { + return datatransfer.Version{}, errors.New("no protocol to parse") + } + return datatransfer.MessageVersionFromString(protocolParts[len(protocolParts)-1]) +} + // DataTransferNetwork provides network connectivity for GraphSync. type DataTransferNetwork interface { Protect(id peer.ID, tag string) @@ -18,11 +46,12 @@ type DataTransferNetwork interface { SendMessage( context.Context, peer.ID, + datatransfer.TransportID, datatransfer.Message) error // SetDelegate registers the Reciver to handle messages received from the // network. - SetDelegate(Receiver) + SetDelegate(datatransfer.TransportID, []datatransfer.Version, Receiver) // ConnectTo establishes a connection to the given peer ConnectTo(context.Context, peer.ID) error @@ -30,14 +59,14 @@ type DataTransferNetwork interface { // ConnectWithRetry establishes a connection to the given peer, retrying if // necessary, and opens a stream on the data-transfer protocol to verify // the peer will accept messages on the protocol - ConnectWithRetry(ctx context.Context, p peer.ID) error + ConnectWithRetry(ctx context.Context, p peer.ID, transportID datatransfer.TransportID) error // ID returns the peer id of this libp2p host ID() peer.ID // Protocol returns the protocol version of the peer, connecting to // the peer if necessary - Protocol(context.Context, peer.ID) (protocol.ID, error) + Protocol(context.Context, peer.ID, datatransfer.TransportID) (ProtocolDescription, error) } // Receiver is an interface for receiving messages from the GraphSyncNetwork. @@ -53,6 +82,4 @@ type Receiver interface { incoming datatransfer.Response) ReceiveRestartExistingChannelRequest(ctx context.Context, sender peer.ID, incoming datatransfer.Request) - - ReceiveError(error) } diff --git a/network/libp2p_impl.go b/transport/helpers/network/libp2p_impl.go similarity index 55% rename from network/libp2p_impl.go rename to transport/helpers/network/libp2p_impl.go index 1408ab2e..d4fe0edb 100644 --- a/network/libp2p_impl.go +++ b/transport/helpers/network/libp2p_impl.go @@ -2,8 +2,10 @@ package network import ( "context" + "errors" "fmt" "io" + "strings" "time" logging "github.com/ipfs/go-log/v2" @@ -16,7 +18,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/filecoin-project/go-data-transfer/v2/message" @@ -43,7 +44,12 @@ const defaultMaxAttemptDuration = 5 * time.Minute const defaultBackoffFactor = 5 var defaultDataTransferProtocols = []protocol.ID{ - datatransfer.ProtocolDataTransfer1_2, + ProtocolDataTransfer1_2, + ProtocolFilDataTransfer1_2, +} + +func isLegacyProtocol(protocol protocol.ID) bool { + return protocol == ProtocolFilDataTransfer1_2 } // Option is an option for configuring the libp2p storage market network @@ -51,26 +57,26 @@ type Option func(*libp2pDataTransferNetwork) // DataTransferProtocols OVERWRITES the default libp2p protocols we use for data transfer with the given protocols. func DataTransferProtocols(protocols []protocol.ID) Option { - return func(impl *libp2pDataTransferNetwork) { - impl.setDataTransferProtocols(protocols) + return func(dtnet *libp2pDataTransferNetwork) { + dtnet.setDataTransferProtocols(protocols) } } // SendMessageParameters changes the default parameters around sending messages func SendMessageParameters(openStreamTimeout time.Duration, sendMessageTimeout time.Duration) Option { - return func(impl *libp2pDataTransferNetwork) { - impl.sendMessageTimeout = sendMessageTimeout - impl.openStreamTimeout = openStreamTimeout + return func(dtnet *libp2pDataTransferNetwork) { + dtnet.sendMessageTimeout = sendMessageTimeout + dtnet.openStreamTimeout = openStreamTimeout } } // RetryParameters changes the default parameters around connection reopening func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) Option { - return func(impl *libp2pDataTransferNetwork) { - impl.maxStreamOpenAttempts = attempts - impl.minAttemptDuration = minDuration - impl.maxAttemptDuration = maxDuration - impl.backoffFactor = backoffFactor + return func(dtnet *libp2pDataTransferNetwork) { + dtnet.maxStreamOpenAttempts = attempts + dtnet.minAttemptDuration = minDuration + dtnet.maxAttemptDuration = maxDuration + dtnet.backoffFactor = backoffFactor } } @@ -85,6 +91,8 @@ func NewFromLibp2pHost(host host.Host, options ...Option) DataTransferNetwork { minAttemptDuration: defaultMinAttemptDuration, maxAttemptDuration: defaultMaxAttemptDuration, backoffFactor: defaultBackoffFactor, + receivers: make(map[protocol.ID]receiverData), + transportProtocols: make(map[datatransfer.TransportID]transportProtocols), } dataTransferNetwork.setDataTransferProtocols(defaultDataTransferProtocols) @@ -95,44 +103,54 @@ func NewFromLibp2pHost(host host.Host, options ...Option) DataTransferNetwork { return &dataTransferNetwork } +type transportProtocols struct { + protocols []protocol.ID + protocolStrings []string +} + +type receiverData struct { + ProtocolDescription + transportID datatransfer.TransportID + receiver Receiver +} + // libp2pDataTransferNetwork transforms the libp2p host interface, which sends and receives // NetMessage objects, into the data transfer network interface. type libp2pDataTransferNetwork struct { host host.Host // inbound messages from the network are forwarded to the receiver - receiver Receiver - + receivers map[protocol.ID]receiverData + transportProtocols map[datatransfer.TransportID]transportProtocols openStreamTimeout time.Duration sendMessageTimeout time.Duration maxStreamOpenAttempts float64 minAttemptDuration time.Duration maxAttemptDuration time.Duration dtProtocols []protocol.ID - dtProtocolStrings []string backoffFactor float64 } -func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.ID, protocols ...protocol.ID) (network.Stream, error) { +func (dtnet *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.ID, protocols ...protocol.ID) (network.Stream, error) { b := &backoff.Backoff{ - Min: impl.minAttemptDuration, - Max: impl.maxAttemptDuration, - Factor: impl.backoffFactor, + Min: dtnet.minAttemptDuration, + Max: dtnet.maxAttemptDuration, + Factor: dtnet.backoffFactor, Jitter: true, } start := time.Now() for { - tctx, cancel := context.WithTimeout(ctx, impl.openStreamTimeout) + tctx, cancel := context.WithTimeout(ctx, dtnet.openStreamTimeout) defer cancel() // will use the first among the given protocols that the remote peer supports at := time.Now() - s, err := impl.host.NewStream(tctx, id, protocols...) + s, err := dtnet.host.NewStream(tctx, id, protocols...) if err == nil { nAttempts := b.Attempt() + 1 if b.Attempt() > 0 { log.Debugf("opened stream to %s on attempt %g of %g after %s", - id, nAttempts, impl.maxStreamOpenAttempts, time.Since(start)) + id, nAttempts, dtnet.maxStreamOpenAttempts, time.Since(start)) } return s, err @@ -140,13 +158,13 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I // b.Attempt() starts from zero nAttempts := b.Attempt() + 1 - if nAttempts >= impl.maxStreamOpenAttempts { - return nil, xerrors.Errorf("exhausted %g attempts but failed to open stream to %s, err: %w", impl.maxStreamOpenAttempts, id, err) + if nAttempts >= dtnet.maxStreamOpenAttempts { + return nil, fmt.Errorf("exhausted %g attempts but failed to open stream to %s, err: %w", dtnet.maxStreamOpenAttempts, id, err) } d := b.Duration() log.Warnf("failed to open stream to %s on attempt %g of %g after %s, waiting %s to try again, err: %s", - id, nAttempts, impl.maxStreamOpenAttempts, time.Since(at), d, err) + id, nAttempts, dtnet.maxStreamOpenAttempts, time.Since(at), d, err) select { case <-ctx.Done(): @@ -159,6 +177,7 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I func (dtnet *libp2pDataTransferNetwork) SendMessage( ctx context.Context, p peer.ID, + transportID datatransfer.TransportID, outgoing datatransfer.Message) error { ctx, span := otel.Tracer("data-transfer").Start(ctx, "sendMessage", trace.WithAttributes( @@ -173,22 +192,36 @@ func (dtnet *libp2pDataTransferNetwork) SendMessage( )) defer span.End() - s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...) + + transportProtocols, ok := dtnet.transportProtocols[transportID] + if !ok { + return datatransfer.ErrUnsupported + } + s, err := dtnet.openStream(ctx, p, transportProtocols.protocols...) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } - outgoing, err = outgoing.MessageForProtocol(s.Protocol()) + receiverData, ok := dtnet.receivers[s.Protocol()] + if !ok { + // this shouldn't happen, but let's be careful just in case to avoid a panic + err := errors.New("no receiver set") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + outgoing, err = outgoing.MessageForVersion(receiverData.MessageVersion) if err != nil { - err = xerrors.Errorf("failed to convert message for protocol: %w", err) + err = fmt.Errorf("failed to convert message for protocol: %w", err) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } - if err = dtnet.msgToStream(ctx, s, outgoing); err != nil { + if err = dtnet.msgToStream(ctx, s, outgoing, receiverData); err != nil { if err2 := s.Reset(); err2 != nil { log.Error(err) span.RecordError(err2) @@ -203,9 +236,55 @@ func (dtnet *libp2pDataTransferNetwork) SendMessage( return s.Close() } -func (dtnet *libp2pDataTransferNetwork) SetDelegate(r Receiver) { - dtnet.receiver = r - for _, p := range dtnet.dtProtocols { +func (dtnet *libp2pDataTransferNetwork) SetDelegate(transportID datatransfer.TransportID, versions []datatransfer.Version, r Receiver) { + transportProtocols := transportProtocols{} + for _, dtProtocol := range dtnet.dtProtocols { + messageVersion, _ := MessageVersion(dtProtocol) + if isLegacyProtocol(dtProtocol) { + if transportID == datatransfer.LegacyTransportID { + supportsLegacyVersion := false + for _, version := range versions { + if version == datatransfer.LegacyTransportVersion { + supportsLegacyVersion = true + break + } + } + if !supportsLegacyVersion { + continue + } + dtnet.receivers[dtProtocol] = receiverData{ + ProtocolDescription: ProtocolDescription{ + IsLegacy: true, + TransportVersion: datatransfer.LegacyTransportVersion, + MessageVersion: messageVersion, + }, + transportID: transportID, + receiver: r, + } + transportProtocols.protocols = append(transportProtocols.protocols, dtProtocol) + transportProtocols.protocolStrings = append(transportProtocols.protocolStrings, string(dtProtocol)) + } + } else { + for _, version := range versions { + joinedProtocol := strings.Join([]string{string(dtProtocol), string(transportID), version.String()}, "/") + dtnet.receivers[protocol.ID(joinedProtocol)] = receiverData{ + ProtocolDescription: ProtocolDescription{ + IsLegacy: false, + TransportVersion: version, + MessageVersion: messageVersion, + }, + transportID: transportID, + receiver: r, + } + transportProtocols.protocols = append(transportProtocols.protocols, protocol.ID(joinedProtocol)) + transportProtocols.protocolStrings = append(transportProtocols.protocolStrings, joinedProtocol) + } + } + } + + dtnet.transportProtocols[transportID] = transportProtocols + + for _, p := range transportProtocols.protocols { dtnet.host.SetStreamHandler(p, dtnet.handleNewStream) } } @@ -217,10 +296,14 @@ func (dtnet *libp2pDataTransferNetwork) ConnectTo(ctx context.Context, p peer.ID // ConnectWithRetry establishes a connection to the given peer, retrying if // necessary, and opens a stream on the data-transfer protocol to verify // the peer will accept messages on the protocol -func (dtnet *libp2pDataTransferNetwork) ConnectWithRetry(ctx context.Context, p peer.ID) error { +func (dtnet *libp2pDataTransferNetwork) ConnectWithRetry(ctx context.Context, p peer.ID, transportID datatransfer.TransportID) error { + transportProtocols, ok := dtnet.transportProtocols[transportID] + if !ok { + return datatransfer.ErrUnsupported + } // Open a stream over the data-transfer protocol, to make sure that the // peer is listening on the protocol - s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...) + s, err := dtnet.openStream(ctx, p, transportProtocols.protocols...) if err != nil { return err } @@ -234,25 +317,31 @@ func (dtnet *libp2pDataTransferNetwork) ConnectWithRetry(ctx context.Context, p func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) { defer s.Close() // nolint: errcheck,gosec - if dtnet.receiver == nil { + if len(dtnet.receivers) == 0 { s.Reset() // nolint: errcheck,gosec return } + receiverData, ok := dtnet.receivers[s.Protocol()] + if !ok { + s.Reset() // nolint: errcheck,gosec + return + } p := s.Conn().RemotePeer() + // if we have no transport handler, reset the stream for { var received datatransfer.Message var err error - switch s.Protocol() { - case datatransfer.ProtocolDataTransfer1_2: + if receiverData.IsLegacy { received, err = message.FromNet(s) + } else { + received, err = message.FromNetWrapped(s) } if err != nil { if err != io.EOF && err != io.ErrUnexpectedEOF { s.Reset() // nolint: errcheck,gosec - go dtnet.receiver.ReceiveError(err) - log.Debugf("net handleNewStream from %s error: %s", p, err) + log.Errorf("net handleNewStream from %s error: %s", p, err) } return } @@ -264,15 +353,15 @@ func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) { receivedRequest, ok := received.(datatransfer.Request) if ok { if receivedRequest.IsRestartExistingChannelRequest() { - dtnet.receiver.ReceiveRestartExistingChannelRequest(ctx, p, receivedRequest) + receiverData.receiver.ReceiveRestartExistingChannelRequest(ctx, p, receivedRequest) } else { - dtnet.receiver.ReceiveRequest(ctx, p, receivedRequest) + receiverData.receiver.ReceiveRequest(ctx, p, receivedRequest) } } } else { receivedResponse, ok := received.(datatransfer.Response) if ok { - dtnet.receiver.ReceiveResponse(ctx, p, receivedResponse) + receiverData.receiver.ReceiveResponse(ctx, p, receivedResponse) } } } @@ -290,7 +379,7 @@ func (dtnet *libp2pDataTransferNetwork) Unprotect(id peer.ID, tag string) bool { return dtnet.host.ConnManager().Unprotect(id, tag) } -func (dtnet *libp2pDataTransferNetwork) msgToStream(ctx context.Context, s network.Stream, msg datatransfer.Message) error { +func (dtnet *libp2pDataTransferNetwork) msgToStream(ctx context.Context, s network.Stream, msg datatransfer.Message, receiverData receiverData) error { if msg.IsRequest() { log.Debugf("Outgoing request message for transfer ID: %d", msg.TransferID()) } @@ -308,49 +397,52 @@ func (dtnet *libp2pDataTransferNetwork) msgToStream(ctx context.Context, s netwo } }() - switch s.Protocol() { - case datatransfer.ProtocolDataTransfer1_2: - default: - return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol()) + if !receiverData.IsLegacy { + msg = msg.WrappedForTransport(receiverData.transportID, receiverData.TransportVersion) } if err := msg.ToNet(s); err != nil { log.Debugf("error: %s", err) return err } - return nil } -func (impl *libp2pDataTransferNetwork) Protocol(ctx context.Context, id peer.ID) (protocol.ID, error) { +func (dtnet *libp2pDataTransferNetwork) Protocol(ctx context.Context, id peer.ID, transportID datatransfer.TransportID) (ProtocolDescription, error) { + transportProtocols, ok := dtnet.transportProtocols[transportID] + if !ok { + return ProtocolDescription{}, datatransfer.ErrUnsupported + } + // Check the cache for the peer's protocol version - firstProto, err := impl.host.Peerstore().FirstSupportedProtocol(id, impl.dtProtocolStrings...) + firstProto, err := dtnet.host.Peerstore().FirstSupportedProtocol(id, transportProtocols.protocolStrings...) if err != nil { - return "", err + return ProtocolDescription{}, err } if firstProto != "" { - return protocol.ID(firstProto), nil + receiverData, ok := dtnet.receivers[protocol.ID(firstProto)] + if !ok { + return ProtocolDescription{}, err + } + return receiverData.ProtocolDescription, nil } // The peer's protocol version is not in the cache, so connect to the peer. // Note that when the stream is opened, the peer's protocol will be added // to the cache. - s, err := impl.openStream(ctx, id, impl.dtProtocols...) + s, err := dtnet.openStream(ctx, id, dtnet.dtProtocols...) if err != nil { - return "", err + return ProtocolDescription{}, err } _ = s.Close() - - return s.Protocol(), nil + receiverData, ok := dtnet.receivers[s.Protocol()] + if !ok { + return ProtocolDescription{}, err + } + return receiverData.ProtocolDescription, nil } -func (impl *libp2pDataTransferNetwork) setDataTransferProtocols(protocols []protocol.ID) { - impl.dtProtocols = append([]protocol.ID{}, protocols...) - - // Keep a string version of the protocols for performance reasons - impl.dtProtocolStrings = make([]string, 0, len(impl.dtProtocols)) - for _, proto := range impl.dtProtocols { - impl.dtProtocolStrings = append(impl.dtProtocolStrings, string(proto)) - } +func (dtnet *libp2pDataTransferNetwork) setDataTransferProtocols(protocols []protocol.ID) { + dtnet.dtProtocols = append([]protocol.ID{}, protocols...) } diff --git a/network/libp2p_impl_test.go b/transport/helpers/network/libp2p_impl_test.go similarity index 91% rename from network/libp2p_impl_test.go rename to transport/helpers/network/libp2p_impl_test.go index 646ba915..13deb960 100644 --- a/network/libp2p_impl_test.go +++ b/transport/helpers/network/libp2p_impl_test.go @@ -21,8 +21,8 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/filecoin-project/go-data-transfer/v2/message" "github.com/filecoin-project/go-data-transfer/v2/message/types" - "github.com/filecoin-project/go-data-transfer/v2/network" "github.com/filecoin-project/go-data-transfer/v2/testutil" + "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" ) // Receiver is an interface for receiving messages from the DataTransferNetwork. @@ -91,8 +91,8 @@ func TestMessageSendAndReceive(t *testing.T) { messageReceived: make(chan struct{}), connectedPeers: make(chan peer.ID, 2), } - dtnet1.SetDelegate(r) - dtnet2.SetDelegate(r) + dtnet1.SetDelegate("graphsync", []datatransfer.Version{datatransfer.LegacyTransportVersion}, r) + dtnet2.SetDelegate("graphsync", []datatransfer.Version{datatransfer.LegacyTransportVersion}, r) err = dtnet1.ConnectTo(ctx, host2.ID()) require.NoError(t, err) @@ -105,7 +105,7 @@ func TestMessageSendAndReceive(t *testing.T) { voucher := testutil.NewTestTypedVoucher() request, err := message.NewRequest(id, false, isPull, &voucher, baseCid, selector) require.NoError(t, err) - require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), request)) + require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), "graphsync", request)) select { case <-ctx.Done(): @@ -134,7 +134,7 @@ func TestMessageSendAndReceive(t *testing.T) { voucherResult := testutil.NewTestTypedVoucher() response, err := message.ValidationResultResponse(types.NewMessage, id, datatransfer.ValidationResult{Accepted: accepted, VoucherResult: &voucherResult}, nil, false) require.NoError(t, err) - require.NoError(t, dtnet2.SendMessage(ctx, host1.ID(), response)) + require.NoError(t, dtnet2.SendMessage(ctx, host1.ID(), "graphsync", response)) select { case <-ctx.Done(): @@ -161,7 +161,7 @@ func TestMessageSendAndReceive(t *testing.T) { Responder: peers[1], ID: id} request := message.RestartExistingChannelRequest(chId) - require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), request)) + require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), "graphsync", request)) select { case <-ctx.Done(): @@ -263,8 +263,8 @@ func TestSendMessageRetry(t *testing.T) { messageReceived: make(chan struct{}), connectedPeers: make(chan peer.ID, 2), } - dtnet1.SetDelegate(r) - dtnet2.SetDelegate(r) + dtnet1.SetDelegate("graphsync", []datatransfer.Version{datatransfer.LegacyTransportVersion}, r) + dtnet2.SetDelegate("graphsync", []datatransfer.Version{datatransfer.LegacyTransportVersion}, r) err = dtnet1.ConnectTo(ctx, host2.ID()) require.NoError(t, err) @@ -277,7 +277,7 @@ func TestSendMessageRetry(t *testing.T) { request, err := message.NewRequest(id, false, isPull, &voucher, baseCid, selector) require.NoError(t, err) - err = dtnet1.SendMessage(ctx, host2.ID(), request) + err = dtnet1.SendMessage(ctx, host2.ID(), "graphsync", request) if !tcase.expSuccess { require.Error(t, err) return