Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queries With No Blocks #146

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
// for requests that have the same key. The data for the extension is a string key
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")

// ExtensionNoBlocks tells graphsync to send only metadata in a response,
// no blocks
ExtensionNoBlocks = ExtensionName("graphsync/no-blocks")

// GraphSync Response Status Codes

// Informational Response Codes (partial)
Expand Down
12 changes: 12 additions & 0 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context,
if err := qe.processDedupByKey(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processNoBlocks(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
Expand Down Expand Up @@ -174,6 +177,15 @@ func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p pee
return nil
}

func (qe *queryExecutor) processNoBlocks(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
_, has := request.Extension(graphsync.ExtensionNoBlocks)
if !has {
return nil
}
qe.responseAssembler.IgnoreAllBlocks(p, request.ID())
return nil
}

func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
Expand Down
28 changes: 19 additions & 9 deletions responsemanager/responseassembler/peerlinktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ import (
)

type peerLinkTracker struct {
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
noBlockRequests map[graphsync.RequestID]struct{}
}

func newTracker() *peerLinkTracker {
return &peerLinkTracker{
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
noBlockRequests: make(map[graphsync.RequestID]struct{}),
}
}

Expand All @@ -44,6 +46,12 @@ func (prs *peerLinkTracker) DedupKey(requestID graphsync.RequestID, key string)
}
}

func (prs *peerLinkTracker) IgnoreAllBlocks(requestID graphsync.RequestID) {
prs.linkTrackerLk.Lock()
defer prs.linkTrackerLk.Unlock()
prs.noBlockRequests[requestID] = struct{}{}
}

// IgnoreBlocks indicates that a list of keys should be ignored when sending blocks
func (prs *peerLinkTracker) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
prs.linkTrackerLk.Lock()
Expand Down Expand Up @@ -74,16 +82,18 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
delete(prs.altTrackers, key)
}
}
delete(prs.noBlockRequests, requestID)
return allBlocks
}

// RecordLinkTraversal records whether a link is found for a request.
func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID,
link ipld.Link, hasBlock bool) (isUnique bool) {
prs.linkTrackerLk.Lock()
defer prs.linkTrackerLk.Unlock()
linkTracker := prs.getLinkTracker(requestID)
isUnique = linkTracker.BlockRefCount(link) == 0
_, noBlockRequest := prs.noBlockRequests[requestID]
isUnique = linkTracker.BlockRefCount(link) == 0 && !noBlockRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isUnique is now misleading. shouldSend?

linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prs.linkTrackerLk.Unlock()
return
}
5 changes: 5 additions & 0 deletions responsemanager/responseassembler/responseassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (ra *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Request
ra.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links)
}

// IgnoreAllBlocks indicates no blocks should be sent for a given request
func (ra *ResponseAssembler) IgnoreAllBlocks(p peer.ID, requestID graphsync.RequestID) {
ra.GetProcess(p).(*peerLinkTracker).IgnoreAllBlocks(requestID)
}

// Transaction builds a response, and queues it for sending in the next outgoing message
func (ra *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error {
rb := &responseBuilder{
Expand Down
66 changes: 66 additions & 0 deletions responsemanager/responseassembler/responseassembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,72 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) {

}

func TestResponseAssemblerIgnoreAllBlocks(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
requestID2 := graphsync.RequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
responseAssembler := New(ctx, fph)

responseAssembler.IgnoreAllBlocks(p, requestID1)

var bd1, bd2, bd3 graphsync.BlockData
err := responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd1 = b.SendResponse(links[0], blks[0].RawData())
return nil
})
require.NoError(t, err)

assertSentNotOnWire(t, bd1, blks[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we change this to assertNotSentOnWire? I assume this tests that the block is not sent at all, instead of it being sent but not on the wire like the name suggests.

fph.RefuteBlocks()
fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse})

err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
bd1 = b.SendResponse(links[0], blks[0].RawData())
return nil
})
require.NoError(t, err)
fph.AssertResponses(expectedResponses{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be an assertion that the blks[0] was sent? I would expect that it was since IgnoreAllBlocks has not been called for requestID2, and I assume that's the point of this second transaction. Some comments on what the different sections of this test are trying to achieve would be helpful.

requestID2: graphsync.PartialResponse,
})

err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd2 = b.SendResponse(links[1], blks[1].RawData())
bd3 = b.SendResponse(links[2], blks[2].RawData())
b.FinishRequest()
return nil
})
require.NoError(t, err)

assertSentNotOnWire(t, bd1, blks[0])
assertSentNotOnWire(t, bd2, blks[1])
assertSentNotOnWire(t, bd3, blks[2])

fph.RefuteBlocks()
fph.AssertResponses(expectedResponses{
requestID1: graphsync.RequestCompletedFull,
})

err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
b.SendResponse(links[3], blks[3].RawData())
b.FinishRequest()
return nil
})
require.NoError(t, err)

fph.AssertBlocks(blks[3])
fph.AssertResponses(expectedResponses{requestID2: graphsync.RequestCompletedFull})

}

func TestResponseAssemblerDupKeys(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
1 change: 1 addition & 0 deletions responsemanager/responsemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type NetworkErrorListeners interface {
// ResponseAssembler is an interface that returns sender interfaces for peer responses.
type ResponseAssembler interface {
DedupKey(p peer.ID, requestID graphsync.RequestID, key string)
IgnoreAllBlocks(p peer.ID, requestID graphsync.RequestID)
IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link)
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
}
Expand Down
31 changes: 31 additions & 0 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,25 @@ func TestValidationAndExtensions(t *testing.T) {
td.assertCompleteRequestWithSuccess()
td.assertDedupKey("applesauce")
})
t.Run("no-blocks extension", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := td.newResponseManager()
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0),
graphsync.ExtensionData{
Name: graphsync.ExtensionNoBlocks,
Data: nil,
}),
}
responseManager.ProcessRequests(td.ctx, td.p, requests)
td.assertCompleteRequestWithSuccess()
td.assertNoBlocks()
})
t.Run("test pause/resume", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
Expand Down Expand Up @@ -712,6 +731,7 @@ type fakeResponseAssembler struct {
pausedRequests chan pausedRequest
clearedRequests chan clearedRequest
ignoredLinks chan []ipld.Link
noBlocks chan struct{}
notifeePublisher *testutil.MockPublisher
dedupKeys chan string
}
Expand All @@ -730,6 +750,10 @@ func (fra *fakeResponseAssembler) DedupKey(p peer.ID, requestID graphsync.Reques
fra.dedupKeys <- key
}

func (fra *fakeResponseAssembler) IgnoreAllBlocks(p peer.ID, requestID graphsync.RequestID) {
fra.noBlocks <- struct{}{}
}

type sentResponse struct {
requestID graphsync.RequestID
link ipld.Link
Expand Down Expand Up @@ -854,6 +878,7 @@ type testData struct {
clearedRequests chan clearedRequest
ignoredLinks chan []ipld.Link
dedupKeys chan string
noBlocks chan struct{}
responseAssembler *fakeResponseAssembler
queryQueue *fakeQueryQueue
extensionData []byte
Expand Down Expand Up @@ -900,6 +925,7 @@ func newTestData(t *testing.T) testData {
td.clearedRequests = make(chan clearedRequest, 1)
td.ignoredLinks = make(chan []ipld.Link, 1)
td.dedupKeys = make(chan string, 1)
td.noBlocks = make(chan struct{}, 1)
td.blockSends = make(chan graphsync.BlockData, td.blockChainLength*2)
td.completedResponseStatuses = make(chan graphsync.ResponseStatusCode, 1)
td.networkErrorChan = make(chan error, td.blockChainLength*2)
Expand All @@ -913,6 +939,7 @@ func newTestData(t *testing.T) testData {
ignoredLinks: td.ignoredLinks,
dedupKeys: td.dedupKeys,
notifeePublisher: td.notifeePublisher,
noBlocks: td.noBlocks,
}
td.queryQueue = &fakeQueryQueue{}

Expand Down Expand Up @@ -1077,6 +1104,10 @@ func (td *testData) assertDedupKey(key string) {
require.Equal(td.t, key, dedupKey)
}

func (td *testData) assertNoBlocks() {
testutil.AssertDoesReceive(td.ctx, td.t, td.noBlocks, "should ignore sending all blocks")
}

func (td *testData) assertIgnoredCids(set *cid.Set) {
var lastLinks []ipld.Link
testutil.AssertReceive(td.ctx, td.t, td.ignoredLinks, &lastLinks, "should send ignored links")
Expand Down