From 90d8a716977a93ec666df32f17b9ddc5c71b651e Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 21 Jan 2021 14:31:29 -0800 Subject: [PATCH 1/2] WIP --- graphsync.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/graphsync.go b/graphsync.go index 96a9ecd1..237e48ca 100644 --- a/graphsync.go +++ b/graphsync.go @@ -46,6 +46,10 @@ const ( // for requests that have the same key. The data for the extension is a string key ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key") + // ExtensionMetadataOnly tells graphsync to send only metadata in a response, + // no blocks + ExtensionMetadataOnly = ExtensionName("graphsync/metadata-only") + // GraphSync Response Status Codes // Informational Response Codes (partial) From 0f752bfcb6f218accacb0b3028b2df1490a3a15b Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 21 Jan 2021 19:18:44 -0800 Subject: [PATCH 2/2] feat(responsemanager): support NoBlocks extension --- graphsync.go | 4 +- responsemanager/queryexecutor.go | 12 ++++ .../responseassembler/peerlinktracker.go | 28 +++++--- .../responseassembler/responseassembler.go | 5 ++ .../responseassembler_test.go | 66 +++++++++++++++++++ responsemanager/responsemanager.go | 1 + responsemanager/responsemanager_test.go | 31 +++++++++ 7 files changed, 136 insertions(+), 11 deletions(-) diff --git a/graphsync.go b/graphsync.go index 237e48ca..40cc6c4d 100644 --- a/graphsync.go +++ b/graphsync.go @@ -46,9 +46,9 @@ const ( // for requests that have the same key. The data for the extension is a string key ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key") - // ExtensionMetadataOnly tells graphsync to send only metadata in a response, + // ExtensionNoBlocks tells graphsync to send only metadata in a response, // no blocks - ExtensionMetadataOnly = ExtensionName("graphsync/metadata-only") + ExtensionNoBlocks = ExtensionName("graphsync/no-blocks") // GraphSync Response Status Codes diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 9e7d6054..308e94e1 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -140,6 +140,9 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context, if err := qe.processDedupByKey(request, p, failNotifee); err != nil { return nil, nil, false, err } + if err := qe.processNoBlocks(request, p, failNotifee); err != nil { + return nil, nil, false, err + } if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil { return nil, nil, false, err } @@ -174,6 +177,15 @@ func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p pee return nil } +func (qe *queryExecutor) processNoBlocks(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { + _, has := request.Extension(graphsync.ExtensionNoBlocks) + if !has { + return nil + } + qe.responseAssembler.IgnoreAllBlocks(p, request.ID()) + return nil +} + func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) if !has { diff --git a/responsemanager/responseassembler/peerlinktracker.go b/responsemanager/responseassembler/peerlinktracker.go index 3d3c5bbe..f7d60ce3 100644 --- a/responsemanager/responseassembler/peerlinktracker.go +++ b/responsemanager/responseassembler/peerlinktracker.go @@ -10,17 +10,19 @@ import ( ) type peerLinkTracker struct { - linkTrackerLk sync.RWMutex - linkTracker *linktracker.LinkTracker - altTrackers map[string]*linktracker.LinkTracker - dedupKeys map[graphsync.RequestID]string + linkTrackerLk sync.RWMutex + linkTracker *linktracker.LinkTracker + altTrackers map[string]*linktracker.LinkTracker + dedupKeys map[graphsync.RequestID]string + noBlockRequests map[graphsync.RequestID]struct{} } func newTracker() *peerLinkTracker { return &peerLinkTracker{ - linkTracker: linktracker.New(), - dedupKeys: make(map[graphsync.RequestID]string), - altTrackers: make(map[string]*linktracker.LinkTracker), + linkTracker: linktracker.New(), + dedupKeys: make(map[graphsync.RequestID]string), + altTrackers: make(map[string]*linktracker.LinkTracker), + noBlockRequests: make(map[graphsync.RequestID]struct{}), } } @@ -44,6 +46,12 @@ func (prs *peerLinkTracker) DedupKey(requestID graphsync.RequestID, key string) } } +func (prs *peerLinkTracker) IgnoreAllBlocks(requestID graphsync.RequestID) { + prs.linkTrackerLk.Lock() + defer prs.linkTrackerLk.Unlock() + prs.noBlockRequests[requestID] = struct{}{} +} + // IgnoreBlocks indicates that a list of keys should be ignored when sending blocks func (prs *peerLinkTracker) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) { prs.linkTrackerLk.Lock() @@ -74,6 +82,7 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool { delete(prs.altTrackers, key) } } + delete(prs.noBlockRequests, requestID) return allBlocks } @@ -81,9 +90,10 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool { func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID, link ipld.Link, hasBlock bool) (isUnique bool) { prs.linkTrackerLk.Lock() + defer prs.linkTrackerLk.Unlock() linkTracker := prs.getLinkTracker(requestID) - isUnique = linkTracker.BlockRefCount(link) == 0 + _, noBlockRequest := prs.noBlockRequests[requestID] + isUnique = linkTracker.BlockRefCount(link) == 0 && !noBlockRequest linkTracker.RecordLinkTraversal(requestID, link, hasBlock) - prs.linkTrackerLk.Unlock() return } diff --git a/responsemanager/responseassembler/responseassembler.go b/responsemanager/responseassembler/responseassembler.go index 761f744d..85df1066 100644 --- a/responsemanager/responseassembler/responseassembler.go +++ b/responsemanager/responseassembler/responseassembler.go @@ -86,6 +86,11 @@ func (ra *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Request ra.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links) } +// IgnoreAllBlocks indicates no blocks should be sent for a given request +func (ra *ResponseAssembler) IgnoreAllBlocks(p peer.ID, requestID graphsync.RequestID) { + ra.GetProcess(p).(*peerLinkTracker).IgnoreAllBlocks(requestID) +} + // Transaction builds a response, and queues it for sending in the next outgoing message func (ra *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error { rb := &responseBuilder{ diff --git a/responsemanager/responseassembler/responseassembler_test.go b/responsemanager/responseassembler/responseassembler_test.go index 0c5b0c40..5ad9efab 100644 --- a/responsemanager/responseassembler/responseassembler_test.go +++ b/responsemanager/responseassembler/responseassembler_test.go @@ -259,6 +259,72 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) { } +func TestResponseAssemblerIgnoreAllBlocks(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + p := testutil.GeneratePeers(1)[0] + requestID1 := graphsync.RequestID(rand.Int31()) + requestID2 := graphsync.RequestID(rand.Int31()) + blks := testutil.GenerateBlocksOfSize(5, 100) + links := make([]ipld.Link, 0, len(blks)) + for _, block := range blks { + links = append(links, cidlink.Link{Cid: block.Cid()}) + } + fph := newFakePeerHandler(ctx, t) + responseAssembler := New(ctx, fph) + + responseAssembler.IgnoreAllBlocks(p, requestID1) + + var bd1, bd2, bd3 graphsync.BlockData + err := responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error { + bd1 = b.SendResponse(links[0], blks[0].RawData()) + return nil + }) + require.NoError(t, err) + + assertSentNotOnWire(t, bd1, blks[0]) + fph.RefuteBlocks() + fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse}) + + err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error { + bd1 = b.SendResponse(links[0], blks[0].RawData()) + return nil + }) + require.NoError(t, err) + fph.AssertResponses(expectedResponses{ + requestID2: graphsync.PartialResponse, + }) + + err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error { + bd2 = b.SendResponse(links[1], blks[1].RawData()) + bd3 = b.SendResponse(links[2], blks[2].RawData()) + b.FinishRequest() + return nil + }) + require.NoError(t, err) + + assertSentNotOnWire(t, bd1, blks[0]) + assertSentNotOnWire(t, bd2, blks[1]) + assertSentNotOnWire(t, bd3, blks[2]) + + fph.RefuteBlocks() + fph.AssertResponses(expectedResponses{ + requestID1: graphsync.RequestCompletedFull, + }) + + err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error { + b.SendResponse(links[3], blks[3].RawData()) + b.FinishRequest() + return nil + }) + require.NoError(t, err) + + fph.AssertBlocks(blks[3]) + fph.AssertResponses(expectedResponses{requestID2: graphsync.RequestCompletedFull}) + +} + func TestResponseAssemblerDupKeys(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 608ae232..9cdb9905 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -106,6 +106,7 @@ type NetworkErrorListeners interface { // ResponseAssembler is an interface that returns sender interfaces for peer responses. type ResponseAssembler interface { DedupKey(p peer.ID, requestID graphsync.RequestID, key string) + IgnoreAllBlocks(p peer.ID, requestID graphsync.RequestID) IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link) Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error } diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index dcfc216b..98c31c34 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -304,6 +304,25 @@ func TestValidationAndExtensions(t *testing.T) { td.assertCompleteRequestWithSuccess() td.assertDedupKey("applesauce") }) + t.Run("no-blocks extension", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := td.newResponseManager() + responseManager.Startup() + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + requests := []gsmsg.GraphSyncRequest{ + gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), + graphsync.ExtensionData{ + Name: graphsync.ExtensionNoBlocks, + Data: nil, + }), + } + responseManager.ProcessRequests(td.ctx, td.p, requests) + td.assertCompleteRequestWithSuccess() + td.assertNoBlocks() + }) t.Run("test pause/resume", func(t *testing.T) { td := newTestData(t) defer td.cancel() @@ -712,6 +731,7 @@ type fakeResponseAssembler struct { pausedRequests chan pausedRequest clearedRequests chan clearedRequest ignoredLinks chan []ipld.Link + noBlocks chan struct{} notifeePublisher *testutil.MockPublisher dedupKeys chan string } @@ -730,6 +750,10 @@ func (fra *fakeResponseAssembler) DedupKey(p peer.ID, requestID graphsync.Reques fra.dedupKeys <- key } +func (fra *fakeResponseAssembler) IgnoreAllBlocks(p peer.ID, requestID graphsync.RequestID) { + fra.noBlocks <- struct{}{} +} + type sentResponse struct { requestID graphsync.RequestID link ipld.Link @@ -854,6 +878,7 @@ type testData struct { clearedRequests chan clearedRequest ignoredLinks chan []ipld.Link dedupKeys chan string + noBlocks chan struct{} responseAssembler *fakeResponseAssembler queryQueue *fakeQueryQueue extensionData []byte @@ -900,6 +925,7 @@ func newTestData(t *testing.T) testData { td.clearedRequests = make(chan clearedRequest, 1) td.ignoredLinks = make(chan []ipld.Link, 1) td.dedupKeys = make(chan string, 1) + td.noBlocks = make(chan struct{}, 1) td.blockSends = make(chan graphsync.BlockData, td.blockChainLength*2) td.completedResponseStatuses = make(chan graphsync.ResponseStatusCode, 1) td.networkErrorChan = make(chan error, td.blockChainLength*2) @@ -913,6 +939,7 @@ func newTestData(t *testing.T) testData { ignoredLinks: td.ignoredLinks, dedupKeys: td.dedupKeys, notifeePublisher: td.notifeePublisher, + noBlocks: td.noBlocks, } td.queryQueue = &fakeQueryQueue{} @@ -1077,6 +1104,10 @@ func (td *testData) assertDedupKey(key string) { require.Equal(td.t, key, dedupKey) } +func (td *testData) assertNoBlocks() { + testutil.AssertDoesReceive(td.ctx, td.t, td.noBlocks, "should ignore sending all blocks") +} + func (td *testData) assertIgnoredCids(set *cid.Set) { var lastLinks []ipld.Link testutil.AssertReceive(td.ctx, td.t, td.ignoredLinks, &lastLinks, "should send ignored links")