From 19a47a1ac1f58de30cc3d987850f39633fc51711 Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Wed, 26 Jun 2024 06:18:44 +0530
Subject: [PATCH] feat: modify peer-manager to consider relay target peers (#1135)

---
 logging/logging.go                            |   6 +
 waku/v2/node/wakunode2.go                     |   6 +-
 waku/v2/node/wakuoptions.go                   |   5 +-
 waku/v2/peermanager/peer_connector.go         |   6 +-
 waku/v2/peermanager/peer_discovery.go         |   2 +-
 waku/v2/peermanager/peer_manager.go           | 133 ++++++++++--------
 waku/v2/peermanager/peer_manager_test.go      |   6 +-
 waku/v2/peermanager/topic_event_handler.go    |   6 +-
 .../peermanager/topic_event_handler_test.go   |   9 +-
 waku/v2/protocol/filter/test_utils.go         |   2 +-
 .../legacy_store/waku_store_client_test.go    |   2 +-
 .../protocol/lightpush/waku_lightpush_test.go |   2 +-
 waku/v2/protocol/metadata/waku_metadata.go    |  19 ++-
 .../peer_exchange/waku_peer_exchange_test.go  |   4 +-
 waku/v2/protocol/store/client_test.go         |   2 +-
 15 files changed, 122 insertions(+), 88 deletions(-)

diff --git a/logging/logging.go b/logging/logging.go
index 7e872eefc..19732d55f 100644
--- a/logging/logging.go
+++ b/logging/logging.go
@@ -8,6 +8,7 @@ package logging
 
 import (
 	"encoding/hex"
+	"fmt"
 	"net"
 	"time"
 
@@ -147,3 +148,8 @@ func TCPAddr(key string, ip net.IP, port int) zap.Field {
 func UDPAddr(key string, ip net.IP, port int) zap.Field {
 	return zap.Stringer(key, &net.UDPAddr{IP: ip, Port: port})
 }
+
+func Uint64(key string, value uint64) zap.Field {
+	valueStr := fmt.Sprintf("%v", value)
+	return zap.String(key, valueStr)
+}
diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go
index e39c2f064..d8c280c17 100644
--- a/waku/v2/node/wakunode2.go
+++ b/waku/v2/node/wakunode2.go
@@ -256,7 +256,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 
 	w.metadata = metadata
 	//Initialize peer manager.
- w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log) + w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log) w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log) if err != nil { @@ -554,9 +554,9 @@ func (w *WakuNode) watchENRChanges(ctx context.Context) { currNodeVal := w.localNode.Node().String() if prevNodeVal != currNodeVal { if prevNodeVal == "" { - w.log.Info("enr record", logging.ENode("enr", w.localNode.Node())) + w.log.Info("local node enr record", logging.ENode("enr", w.localNode.Node())) } else { - w.log.Info("new enr record", logging.ENode("enr", w.localNode.Node())) + w.log.Info("local node new enr record", logging.ENode("enr", w.localNode.Node())) } prevNodeVal = currNodeVal } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 7c1c2e45d..26a82d0d0 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -45,6 +45,8 @@ const UserAgent string = "go-waku" const defaultMinRelayPeersToPublish = 0 const DefaultMaxConnectionsPerIP = 5 +const DefaultMaxConnections = 300 +const DefaultMaxPeerStoreCapacity = 300 type WakuNodeParameters struct { hostAddr *net.TCPAddr @@ -127,9 +129,10 @@ type WakuNodeOption func(*WakuNodeParameters) error // Default options used in the libp2p node var DefaultWakuNodeOptions = []WakuNodeOption{ WithPrometheusRegisterer(prometheus.NewRegistry()), - WithMaxPeerConnections(50), + WithMaxPeerConnections(DefaultMaxConnections), WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP), WithCircuitRelayParams(2*time.Second, 3*time.Minute), + WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), } diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index b5eb45aab..ebe808e85 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -18,7 +18,6 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" @@ -134,10 +133,9 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { triggerImmediateConnection := false //Not connecting to peer as soon as it is discovered, // rather expecting this to be pushed from PeerManager based on the need. 
- if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin { + if len(c.host.Network().Peers()) < c.pm.OutPeersTarget { triggerImmediateConnection = true } - c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) c.pm.AddDiscoveredPeer(p, triggerImmediateConnection) case <-time.After(1 * time.Second): @@ -238,7 +236,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) { func (c *PeerConnectionStrategy) dialPeers() { defer c.WaitGroup().Done() - maxGoRoutines := c.pm.OutRelayPeersTarget + maxGoRoutines := c.pm.OutPeersTarget if maxGoRoutines > maxActiveDials { maxGoRoutines = maxActiveDials } diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go index d1cedac29..ae18907c0 100644 --- a/waku/v2/peermanager/peer_discovery.go +++ b/waku/v2/peermanager/peer_discovery.go @@ -55,7 +55,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol] if !ok { - pm.logger.Error("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) + pm.logger.Warn("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) return nil, errors.New("cannot do on demand discovery for non-waku protocol") } iterator, err := pm.discoveryService.PeerIterator( diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 554688da8..173663c07 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -73,8 +73,8 @@ type PeerManager struct { maxPeers int maxRelayPeers int logger *zap.Logger - InRelayPeersTarget int - OutRelayPeersTarget int + InPeersTarget int + OutPeersTarget int host host.Host serviceSlots *ServiceSlots ctx context.Context @@ -85,6 +85,7 @@ type PeerManager struct { wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo TopicHealthNotifCh chan<- TopicHealthStatus rttCache *FastestPeerSelector + RelayEnabled bool } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -127,7 +128,7 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p) if err == nil { if pThreshold < relay.PeerPublishThreshold { - pm.logger.Debug("peer score below publish threshold", logging.HostID("peer", p), zap.Float64("score", pThreshold)) + pm.logger.Debug("peer score below publish threshold", zap.Stringer("peer", p), zap.Float64("score", pThreshold)) } else { healthyPeerCount++ } @@ -135,14 +136,15 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { if errors.Is(err, peerstore.ErrNotFound) { // For now considering peer as healthy if we can't fetch score. healthyPeerCount++ - pm.logger.Debug("peer score is not available yet", logging.HostID("peer", p)) + pm.logger.Debug("peer score is not available yet", zap.Stringer("peer", p)) } else { - pm.logger.Warn("failed to fetch peer score ", zap.Error(err), logging.HostID("peer", p)) + pm.logger.Warn("failed to fetch peer score ", zap.Error(err), zap.Stringer("peer", p)) } } } } //Update topic's health + //TODO: This should be done based on number of full-mesh peers. oldHealth := topic.healthStatus if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now. 
topic.healthStatus = UnHealthy @@ -174,31 +176,38 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) { } // NewPeerManager creates a new peerManager instance. -func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, logger *zap.Logger) *PeerManager { - - maxRelayPeers, _ := relayAndServicePeers(maxConnections) - inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers) - - if maxPeers == 0 || maxConnections > maxPeers { - maxPeers = maxConnsToPeerRatio * maxConnections +func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relayEnabled bool, logger *zap.Logger) *PeerManager { + var inPeersTarget, outPeersTarget, maxRelayPeers int + if relayEnabled { + maxRelayPeers, _ := relayAndServicePeers(maxConnections) + inPeersTarget, outPeersTarget = inAndOutRelayPeers(maxRelayPeers) + + if maxPeers == 0 || maxConnections > maxPeers { + maxPeers = maxConnsToPeerRatio * maxConnections + } + } else { + maxRelayPeers = 0 + inPeersTarget = 0 + //TODO: ideally this should be 2 filter peers per topic, 2 lightpush peers per topic and 2-4 store nodes per topic + outPeersTarget = 10 } - pm := &PeerManager{ logger: logger.Named("peer-manager"), metadata: metadata, maxRelayPeers: maxRelayPeers, - InRelayPeersTarget: inRelayPeersTarget, - OutRelayPeersTarget: outRelayPeersTarget, + InPeersTarget: inPeersTarget, + OutPeersTarget: outPeersTarget, serviceSlots: NewServiceSlot(), subRelayTopics: make(map[string]*NodeTopicDetails), maxPeers: maxPeers, wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{}, rttCache: NewFastestPeerSelector(logger), + RelayEnabled: relayEnabled, } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), zap.Int("maxRelayPeers", maxRelayPeers), - zap.Int("outRelayPeersTarget", outRelayPeersTarget), - zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget), + zap.Int("outPeersTarget", outPeersTarget), + zap.Int("inPeersTarget", pm.InPeersTarget), zap.Int("maxPeers", maxPeers)) return pm @@ -225,7 +234,7 @@ func (pm *PeerManager) Start(ctx context.Context) { pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) pm.ctx = ctx - if pm.sub != nil { + if pm.sub != nil && pm.RelayEnabled { go pm.peerEventLoop(ctx) } go pm.connectivityLoop(ctx) @@ -233,7 +242,7 @@ func (pm *PeerManager) Start(ctx context.Context) { // This is a connectivity loop, which currently checks and prunes inbound connections. 
func (pm *PeerManager) connectivityLoop(ctx context.Context) { - pm.connectToRelayPeers() + pm.connectToPeers() t := time.NewTicker(peerConnectivityLoopSecs * time.Second) defer t.Stop() for { @@ -241,7 +250,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) { case <-ctx.Done(): return case <-t.C: - pm.connectToRelayPeers() + pm.connectToPeers() } } } @@ -262,7 +271,7 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers } } else { pm.logger.Error("failed to retrieve peer direction", - logging.HostID("peerID", p), zap.Error(err)) + zap.Stringer("peerID", p), zap.Error(err)) } } return inPeers, outPeers, nil @@ -302,10 +311,10 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { // match those peers that are currently connected curPeerLen := pm.checkAndUpdateTopicHealth(topicInst) - if curPeerLen < waku_proto.GossipSubDMin { - pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health", + if curPeerLen < pm.OutPeersTarget { + pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh", zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), - zap.Int("optimumPeers", waku_proto.GossipSubDMin)) + zap.Int("targetPeers", pm.OutPeersTarget)) //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { @@ -315,35 +324,42 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { } pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) //Connect to eligible peers. - numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen + numPeersToConnect := pm.OutPeersTarget - curPeerLen if numPeersToConnect > notConnectedPeers.Len() { numPeersToConnect = notConnectedPeers.Len() } - pm.connectToPeers(notConnectedPeers[0:numPeersToConnect]) + pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } } } -// connectToRelayPeers ensures minimum D connections are there for each pubSubTopic. +// connectToPeers ensures minimum D connections are there for each pubSubTopic. // If not, initiates connections to additional peers. // It also checks for incoming relay connections and prunes once they cross inRelayTarget -func (pm *PeerManager) connectToRelayPeers() { - //Check for out peer connections and connect to more peers. - pm.ensureMinRelayConnsPerTopic() - - inRelayPeers, outRelayPeers := pm.getRelayPeers() - pm.logger.Debug("number of relay peers connected", - zap.Int("in", inRelayPeers.Len()), - zap.Int("out", outRelayPeers.Len())) - if inRelayPeers.Len() > 0 && - inRelayPeers.Len() > pm.InRelayPeersTarget { - pm.pruneInRelayConns(inRelayPeers) +func (pm *PeerManager) connectToPeers() { + if pm.RelayEnabled { + //Check for out peer connections and connect to more peers. + pm.ensureMinRelayConnsPerTopic() + + inRelayPeers, outRelayPeers := pm.getRelayPeers() + pm.logger.Debug("number of relay peers connected", + zap.Int("in", inRelayPeers.Len()), + zap.Int("out", outRelayPeers.Len())) + if inRelayPeers.Len() > 0 && + inRelayPeers.Len() > pm.InPeersTarget { + pm.pruneInRelayConns(inRelayPeers) + } + } else { + //TODO: Connect to filter peers per topic as of now. + //Fetch filter peers from peerStore, TODO: topics for lightNode not available here? + //Filter subscribe to notify peerManager whenever a new topic/shard is subscribed to. 
+ pm.logger.Debug("light mode..not doing anything") } } -// connectToPeers connects to peers provided in the list if the addresses have not expired. -func (pm *PeerManager) connectToPeers(peers peer.IDSlice) { +// connectToSpecifiedPeers connects to peers provided in the list if the addresses have not expired. +func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) { for _, peerID := range peers { peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host) if peerData == nil { @@ -377,16 +393,16 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { //TODO: Need to have more intelligent way of doing this, maybe peer scores. //TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers. pm.logger.Info("peer connections exceed target relay peers, hence pruning", - zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InRelayPeersTarget)) - for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ { + zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InPeersTarget)) + for pruningStartIndex := pm.InPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ { p := inRelayPeers[pruningStartIndex] err := pm.host.Network().ClosePeer(p) if err != nil { pm.logger.Warn("failed to disconnect connection towards peer", - logging.HostID("peerID", p)) + zap.Stringer("peerID", p)) } pm.logger.Debug("successfully disconnected connection towards peer", - logging.HostID("peerID", p)) + zap.Stringer("peerID", p)) } } @@ -394,7 +410,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { shards, err := wenr.RelaySharding(p.ENR.Record()) if err != nil { pm.logger.Error("could not derive relayShards from ENR", zap.Error(err), - logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } else { if shards != nil { p.PubsubTopics = make([]string, 0) @@ -404,7 +420,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { p.PubsubTopics = append(p.PubsubTopics, topicStr) } } else { - pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Debug("ENR doesn't have relay shards", zap.Stringer("peer", p.AddrInfo.ID)) } } supportedProtos := []protocol.ID{} @@ -430,6 +446,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { return } + //Check if the peer is already present, if so skip adding _, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID) if err == nil { @@ -447,16 +464,17 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { } //Peer is already in peer-store but stored ENR is older than discovered one. 
pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored", - logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq())) + zap.Stringer("peer", p.AddrInfo.ID), logging.Uint64("newENRSeq", p.ENR.Record().Seq()), logging.Uint64("storedENRSeq", enr.Record().Seq())) } else { - pm.logger.Info("peer already found in peerstore, but no new ENR", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Info("peer already found in peerstore, but no new ENR", zap.Stringer("peer", p.AddrInfo.ID)) } } else { //Peer is in peer-store but it doesn't have an enr pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding", - logging.HostID("peer", p.AddrInfo.ID)) + zap.Stringer("peer", p.AddrInfo.ID)) } } + pm.logger.Debug("adding discovered peer", zap.Stringer("peerID", p.AddrInfo.ID)) supportedProtos := []protocol.ID{} if len(p.PubsubTopics) == 0 && p.ENR != nil { @@ -467,14 +485,15 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...) if p.ENR != nil { + pm.logger.Debug("setting ENR for peer", zap.Stringer("peerID", p.AddrInfo.ID), zap.Stringer("enr", p.ENR)) err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) if err != nil { pm.logger.Error("could not store enr", zap.Error(err), - logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } } if connectNow { - pm.logger.Debug("connecting now to discovered peer", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Debug("connecting now to discovered peer", zap.Stringer("peer", p.AddrInfo.ID)) go pm.peerConnector.PushToChan(p) } } @@ -483,10 +502,10 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { // It also sets additional metadata such as origin and supported protocols func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { - pm.logger.Error("could not add peer as peer store capacity is reached", logging.HostID("peer", ID), zap.Int("capacity", pm.maxPeers)) + pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers)) return errors.New("peer store capacity reached") } - pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID)) + pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID)) if origin == wps.Static { pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL) } else { @@ -496,14 +515,14 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig } err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin) if err != nil { - pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID)) + pm.logger.Error("could not set origin", zap.Error(err), zap.Stringer("peer", ID)) return err } if len(protocols) > 0 { err = pm.host.Peerstore().AddProtocols(ID, protocols...) 
if err != nil { - pm.logger.Error("could not set protocols", zap.Error(err), logging.HostID("peer", ID)) + pm.logger.Error("could not set protocols", zap.Error(err), zap.Stringer("peer", ID)) return err } } @@ -515,7 +534,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig err = pm.host.Peerstore().(wps.WakuPeerstore).SetPubSubTopics(ID, pubSubTopics) if err != nil { pm.logger.Error("could not store pubSubTopic", zap.Error(err), - logging.HostID("peer", ID), zap.Strings("topics", pubSubTopics)) + zap.Stringer("peer", ID), zap.Strings("topics", pubSubTopics)) } return nil } @@ -593,7 +612,7 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { //For now adding the peer to serviceSlot which means the latest added peer would be given priority. //TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc. - pm.logger.Info("adding peer to service slots", logging.HostID("peer", peerID), + pm.logger.Info("adding peer to service slots", zap.Stringer("peer", peerID), zap.String("service", string(proto))) // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 5e73ef497..3c3596eb9 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -31,7 +31,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) { require.NoError(t, err) // host 1 is used by peer manager - pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm := NewPeerManager(10, 20, nil, true, utils.Logger()) pm.SetHost(h1) return ctx, pm, func() { @@ -229,7 +229,7 @@ func TestConnectToRelayPeers(t *testing.T) { defer deferFn() - pm.connectToRelayPeers() + pm.connectToPeers() } @@ -253,7 +253,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF err = wenr.Update(utils.Logger(), localNode, wenr.WithWakuRelaySharding(rs[0])) require.NoError(t, err) - pm := NewPeerManager(10, 20, nil, logger) + pm := NewPeerManager(10, 20, nil, true, logger) pm.SetHost(host) peerconn, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, logger) require.NoError(t, err) diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 3060bf7b9..1b965ef03 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -48,7 +48,9 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic]) - if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding. + //Leaving this logic based on gossipSubDMin as this is a good start for a subscribed topic. + // subsequent connectivity loop iteration would initiate more connections which should take it towards a healthy mesh. + if connectedPeers >= waku_proto.GossipSubDMin { // Should we use optimal number or define some sort of a config for the node to choose from? // A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has // or bandwidth it can support. 
@@ -70,7 +72,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic } //For now all peers are being given same priority, // Later we may want to choose peers that have more shards in common over others. - pm.connectToPeers(notConnectedPeers[0:numPeersToConnect]) + pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } else { triggerDiscovery = true } diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 8fc8da946..f072019bc 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -3,6 +3,9 @@ package peermanager import ( "context" "crypto/rand" + "testing" + "time" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -17,8 +20,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "testing" - "time" ) func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) { @@ -44,7 +45,7 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) { // Host 1 used by peer manager - pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm := NewPeerManager(10, 20, nil, true, utils.Logger()) pm.SetHost(*h) // Create a new relay event bus @@ -77,7 +78,7 @@ func TestSubscribeToRelayEvtBus(t *testing.T) { r, h1, _ := makeWakuRelay(t, log) // Host 1 used by peer manager - pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm := NewPeerManager(10, 20, nil, true, utils.Logger()) pm.SetHost(h1) // Create a new relay event bus diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go index 898eacf7a..361ab561b 100644 --- a/waku/v2/protocol/filter/test_utils.go +++ b/waku/v2/protocol/filter/test_utils.go @@ -165,7 +165,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { s.Require().NoError(err) b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) - pm := peermanager.NewPeerManager(5, 5, nil, s.Log) + pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log) filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) filterPush.SetHost(host) pm.SetHost(host) diff --git a/waku/v2/protocol/legacy_store/waku_store_client_test.go b/waku/v2/protocol/legacy_store/waku_store_client_test.go index c947a64dc..7d9ebcbad 100644 --- a/waku/v2/protocol/legacy_store/waku_store_client_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_client_test.go @@ -36,7 +36,7 @@ func TestQueryOptions(t *testing.T) { require.NoError(t, err) // Let peer manager reside at host - pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger()) + pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger()) pm.SetHost(host) // Add host2 to peerstore diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 8b3d93bb1..99525b1ec 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -237,7 +237,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { testContentTopic := "/test/10/my-lp-app/proto" // Prepare peer manager instance to include in test - pm := 
peermanager.NewPeerManager(10, 10, nil, utils.Logger()) + pm := peermanager.NewPeerManager(10, 10, nil, true, utils.Logger()) node1, sub1, host1 := makeWakuRelay(t, testTopic) defer node1.Stop() diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 228f4487f..875902034 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -125,27 +125,30 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) + logger.Debug("sending metadata request") err = writer.WriteMsg(request) if err != nil { logger.Error("writing request", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return nil, err } + logger.Debug("sent metadata request") response := &pb.WakuMetadataResponse{} err = reader.ReadMsg(response) if err != nil { logger.Error("reading response", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return nil, err } stream.Close() + logger.Debug("received metadata response") if response.ClusterId == nil { return nil, errors.New("node did not provide a waku clusterid") @@ -163,6 +166,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc rShardIDs = append(rShardIDs, uint16(i)) } } + logger.Debug("getting remote cluster and shards") rs, err := protocol.NewRelayShards(rClusterID, rShardIDs...) if err != nil { @@ -176,7 +180,7 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { logger := wakuM.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) request := &pb.WakuMetadataRequest{} - + logger.Debug("received metadata request from peer") writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) @@ -184,11 +188,10 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { if err != nil { logger.Error("reading request", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return } - response := new(pb.WakuMetadataResponse) clusterID, shards, err := wakuM.ClusterAndShards() @@ -205,10 +208,11 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { if err != nil { logger.Error("writing response", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return } + logger.Debug("sent metadata response to peer") stream.Close() } @@ -248,14 +252,15 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) { // Connected is called when a connection is opened func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) { go func() { + wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer())) // Metadata verification is done only if a clusterID is specified if wakuM.clusterID == 0 { return } peerID := cc.RemotePeer() - shard, err := wakuM.Request(wakuM.ctx, peerID) + if err != nil { wakuM.disconnectPeer(peerID, err) return diff --git 
a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 6b53bb8dd..92280810f 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -292,7 +292,7 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { require.NoError(t, err) // Prepare peer manager for host3 - pm3 := peermanager.NewPeerManager(10, 20, nil, log) + pm3 := peermanager.NewPeerManager(10, 20, nil, true, log) pm3.SetHost(host3) pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger()) require.NoError(t, err) @@ -367,7 +367,7 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) { require.NoError(t, err) // Prepare peer manager for host3 - pm3 := peermanager.NewPeerManager(10, 20, nil, log) + pm3 := peermanager.NewPeerManager(10, 20, nil, true, log) pm3.SetHost(host3) pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger()) require.NoError(t, err) diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go index 035b7a6fe..d851cfc4a 100644 --- a/waku/v2/protocol/store/client_test.go +++ b/waku/v2/protocol/store/client_test.go @@ -43,7 +43,7 @@ func TestStoreClient(t *testing.T) { err = wakuRelay.Start(context.Background()) require.NoError(t, err) - pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger()) + pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger()) pm.SetHost(host) err = pm.SubscribeToRelayEvtBus(wakuRelay.Events()) require.NoError(t, err)
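
Reviewer note: to exercise the new `NewPeerManager(maxConnections, maxPeers, metadata, relayEnabled, logger)` signature introduced in this patch, the minimal Go sketch below mirrors the constructor calls used in the updated tests. The bare libp2p host, the nil metadata and the example limits 10/20 are illustrative placeholders, not part of the patch.

```go
package main

import (
	"context"

	"github.com/libp2p/go-libp2p"
	"github.com/waku-org/go-waku/waku/v2/peermanager"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

func main() {
	// A bare libp2p host stands in for the Waku node's host in this sketch.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Relay-enabled node: in/out peer targets are derived from maxConnections (10)
	// and maxPeers (20) caps the peerstore, as in the updated unit tests.
	relayPM := peermanager.NewPeerManager(10, 20, nil, true, utils.Logger())
	relayPM.SetHost(h)
	relayPM.Start(ctx) // runs the connectivity loop (connectToPeers) shown above

	// Relay-disabled (light) node: relay mesh maintenance is skipped and a small
	// fixed outbound target is used for service peers instead.
	lightPM := peermanager.NewPeerManager(10, 20, nil, false, utils.Logger())
	lightPM.SetHost(h)
	_ = lightPM
}
```

Note that with relayEnabled=false the constructor sets OutPeersTarget to a fixed 10 (see the TODO in NewPeerManager), so the discovery-triggered dial check in peer_connector.go now compares against that target rather than GossipSubDMin.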
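
Similarly, a short sketch of the new logging.Uint64 helper added in logging/logging.go, which logs a uint64 as a string-valued zap field and is used above for the newENRSeq/storedENRSeq fields; the log message and the literal sequence number here are illustrative only.

```go
package main

import (
	"github.com/waku-org/go-waku/logging"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

func main() {
	logger := utils.Logger()

	var enrSeq uint64 = 7 // illustrative sequence number
	// logging.Uint64 stringifies the value via fmt.Sprintf, so it appears as a
	// string field in the structured log output rather than a numeric one.
	logger.Info("peer enr updated", logging.Uint64("newENRSeq", enrSeq))
}
```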