Skip to content

Commit

Permalink
Gossipsub v2.0: Handle INEED and send the message
Browse files Browse the repository at this point in the history
  • Loading branch information
ppopth committed Jan 22, 2025
1 parent 39b2e0d commit 3a7aa65
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 1 deletion.
32 changes: 32 additions & 0 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {

iwant := gs.handleIHave(rpc.from, ctl)
ihave := gs.handleIWant(rpc.from, ctl)
ihave = append(ihave, gs.handleINeed(rpc.from, ctl)...)
prune := gs.handleGraft(rpc.from, ctl)
gs.handleIAnnounce(rpc.from, ctl)
gs.handlePrune(rpc.from, ctl)
Expand Down Expand Up @@ -922,6 +923,37 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
return msgs
}

func (gs *GossipSubRouter) handleINeed(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
ihave := make(map[string]*pb.Message)
for _, ineed := range ctl.GetIneed() {
mid := ineed.GetMessageID()
// Check if that peer has sent IDONTWANT before, if so don't send them the message
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
continue
}

msg, _, ok := gs.mcache.GetForPeer(mid, p)
if !ok {
continue
}
if !gs.p.peerFilter(p, msg.GetTopic()) {
continue
}
ihave[mid] = msg.Message
}

if len(ihave) == 0 {
return nil
}

msgs := make([]*pb.Message, 0, len(ihave))
for _, msg := range ihave {
msgs = append(msgs, msg)
}

return msgs
}

func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
var prune []string

Expand Down
171 changes: 170 additions & 1 deletion gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3338,7 +3338,7 @@ func TestGossipsubPruneMeshCorrectly(t *testing.T) {
}
}

// Test that IANNOUNCE is sent to mesh peers
// Test that IANNOUNCE is sent to mesh peers and no message is sent if it doesn't send INEED
func TestGossipsubIannounceMeshPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -3421,6 +3421,89 @@ func TestGossipsubIannounceMeshPeer(t *testing.T) {
<-ctx.Done()
}

// Test that IANNOUNCE is sent to mesh peers and the message is sent after sending INEED
func TestGossipsubIannounceIneedMeshPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 2)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

params := DefaultGossipSubParams()
params.Dannounce = params.D
psub := getGossipsub(ctx, hosts[0], WithGossipSubParams(params), WithMessageIdFn(msgID))
_, err := psub.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

// Wait a bit after the last message before checking we got the right messages
msgTimer := time.NewTimer(1 * time.Second)

// Checks we received the right messages
msgCount := 0
checkMsgs := func() {
if msgCount != 1 {
t.Fatalf("Expected one message received, got %d", msgCount)
}
}

// Wait for the timer to expire
go func() {
select {
case <-msgTimer.C:
checkMsgs()
cancel()
return
case <-ctx.Done():
checkMsgs()
}
}()

newMockGS(ctx, t, hosts[1], func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the first peer connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the first peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
})

go func() {
// Wait for a short interval to make sure the first peer
// received and processed the subscribe + graft
time.Sleep(100 * time.Millisecond)
// Publish messages from the first peer
data := []byte("mymessage")
psub.Publish("foobar", data)
}()
}
}
if len(irpc.GetControl().GetIannounce()) > 0 {
var ineeds []*pb.ControlINeed
for _, iannounce := range irpc.GetControl().GetIannounce() {
mid := iannounce.GetMessageID()
ineed := &pb.ControlINeed{
MessageID: &mid,
}
ineeds = append(ineeds, ineed)
}
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Ineed: ineeds},
})
}
msgCount += len(irpc.GetPublish())
})

connect(t, hosts[0], hosts[1])

<-ctx.Done()
}

// Test that IANNOUNCE is sent to direct peers
func TestGossipsubIannounceDirectPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -4018,6 +4101,92 @@ func TestGossipsubIneedIndirectNonmeshPeers(t *testing.T) {
<-ctx.Done()
}

func TestSparseGossipsubV2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 20)

params := DefaultGossipSubParams()
params.Dannounce = params.D
psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params))

var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs = append(msgs, subch)
}

sparseConnect(t, hosts)

// wait for heartbeats to build mesh
time.Sleep(time.Second * 2)

for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))

owner := mrand.Intn(len(psubs))

psubs[owner].Publish("foobar", msg)

for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
}

func TestDenseGossipsubV2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 20)

params := DefaultGossipSubParams()
params.Dannounce = params.D
psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params))

var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs = append(msgs, subch)
}

denseConnect(t, hosts)

// wait for heartbeats to build mesh
time.Sleep(time.Second * 2)

for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))

owner := mrand.Intn(len(psubs))

psubs[owner].Publish("foobar", msg)

for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
}

func BenchmarkAllocDoDropRPC(b *testing.B) {
gs := GossipSubRouter{tracer: &pubsubTracer{}}

Expand Down

0 comments on commit 3a7aa65

Please sign in to comment.