diff --git a/chainservice/builder.go b/chainservice/builder.go index 65a0d531a7..c4a358e48f 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -798,8 +798,9 @@ func (builder *Builder) buildBlockTimeCalculator() (err error) { func (builder *Builder) buildConsensusComponent() error { p2pAgent := builder.cs.p2pAgent copts := []consensus.Option{ - consensus.WithBroadcast(func(msg proto.Message) error { - return p2pAgent.BroadcastOutbound(context.Background(), msg) + consensus.WithDefaultTopic(p2pAgent.DefaultTopic()), + consensus.WithBroadcast(func(topic string, msg proto.Message) error { + return p2pAgent.BroadcastOutbound(context.Background(), topic, msg) }), } if rDPoSProtocol := rolldpos.FindProtocol(builder.cs.registry); rDPoSProtocol != nil { diff --git a/chainservice/chainservice.go b/chainservice/chainservice.go index 3cf53c9ead..799d39753e 100644 --- a/chainservice/chainservice.go +++ b/chainservice/chainservice.go @@ -239,7 +239,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, archive bool) (*api.ServerV p2pAgent := cs.p2pAgent apiServerOptions := []api.Option{ api.WithBroadcastOutbound(func(ctx context.Context, chainID uint32, msg proto.Message) error { - return p2pAgent.BroadcastOutbound(ctx, msg) + return p2pAgent.BroadcastOutbound(ctx, p2pAgent.DefaultTopic(), msg) }), api.WithNativeElection(cs.electionCommittee), api.WithAPIStats(cs.apiStats), diff --git a/consensus/consensus.go b/consensus/consensus.go index 76bcdd4fd2..cc635196db 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -46,6 +46,7 @@ type IotxConsensus struct { } type optionParams struct { + defaultTopic string broadcastHandler scheme.Broadcast pp poll.Protocol rp *rp.Protocol @@ -54,6 +55,14 @@ type optionParams struct { // Option sets Consensus construction parameter. type Option func(op *optionParams) error +// WithDefaultTopic is an option to set consensus broadcast topic +func WithDefaultTopic(topic string) Option { + return func(ops *optionParams) error { + ops.defaultTopic = topic + return nil + } +} + // WithBroadcast is an option to add broadcast callback to Consensus func WithBroadcast(broadcastHandler scheme.Broadcast) Option { return func(ops *optionParams) error { @@ -140,6 +149,7 @@ func NewConsensus( SetBlockDeserializer(block.NewDeserializer(bc.EvmNetworkID())). SetClock(clock). SetBroadcast(ops.broadcastHandler). + SetDefaultTopic(ops.defaultTopic). SetDelegatesByEpochFunc(delegatesByEpochFunc). SetProposersByEpochFunc(proposersByEpochFunc). RegisterProtocol(ops.rp) @@ -171,7 +181,7 @@ func NewConsensus( } broadcastBlockCB := func(blk *block.Block) error { if blkPb := blk.ConvertToBlockPb(); blkPb != nil { - return ops.broadcastHandler(blkPb) + return ops.broadcastHandler(ops.defaultTopic, blkPb) } return nil } diff --git a/consensus/scheme/rolldpos/rolldpos.go b/consensus/scheme/rolldpos/rolldpos.go index 5ca1cdf3fe..d8406bad67 100644 --- a/consensus/scheme/rolldpos/rolldpos.go +++ b/consensus/scheme/rolldpos/rolldpos.go @@ -336,6 +336,7 @@ type ( chain ChainManager blockDeserializer *block.Deserializer broadcastHandler scheme.Broadcast + topic string clock clock.Clock // TODO: explorer dependency deleted at #1085, need to add api params rp *rolldpos.Protocol @@ -385,6 +386,11 @@ func (b *Builder) SetBroadcast(broadcastHandler scheme.Broadcast) *Builder { return b } +func (b *Builder) SetDefaultTopic(topic string) *Builder { + b.topic = topic + return b +} + // SetClock sets the clock func (b *Builder) SetClock(clock clock.Clock) *Builder { b.clock = clock @@ -435,6 +441,7 @@ func (b *Builder) Build() (*RollDPoS, error) { b.blockDeserializer, b.rp, b.broadcastHandler, + b.topic, b.delegatesByEpochFunc, b.proposersByEpochFunc, b.encodedAddr, diff --git a/consensus/scheme/rolldpos/rolldposctx.go b/consensus/scheme/rolldpos/rolldposctx.go index 62c9a4e3ef..b1d104df74 100644 --- a/consensus/scheme/rolldpos/rolldposctx.go +++ b/consensus/scheme/rolldpos/rolldposctx.go @@ -90,6 +90,7 @@ type ( chain ChainManager blockDeserializer *block.Deserializer broadcastHandler scheme.Broadcast + topic string roundCalc *roundCalculator eManagerDB db.KVStore toleratedOvertime time.Duration @@ -114,6 +115,7 @@ func NewRollDPoSCtx( blockDeserializer *block.Deserializer, rp *rolldpos.Protocol, broadcastHandler scheme.Broadcast, + topic string, delegatesByEpochFunc NodesSelectionByEpochFunc, proposersByEpochFunc NodesSelectionByEpochFunc, encodedAddr string, @@ -166,6 +168,7 @@ func NewRollDPoSCtx( chain: chain, blockDeserializer: blockDeserializer, broadcastHandler: broadcastHandler, + topic: topic, clock: clock, roundCalc: roundCalc, eManagerDB: eManagerDB, @@ -526,7 +529,8 @@ func (ctx *rollDPoSCtx) Commit(msg interface{}) (bool, error) { } // Broadcast the committed block to the network if blkProto := pendingBlock.ConvertToBlockPb(); blkProto != nil { - if err := ctx.broadcastHandler(blkProto); err != nil { + // TODO: broadcast to different topic after HF height + if err := ctx.broadcastHandler(ctx.topic, blkProto); err != nil { ctx.logger().Error( "error when broadcasting blkProto", zap.Error(err), @@ -571,7 +575,8 @@ func (ctx *rollDPoSCtx) Broadcast(endorsedMsg interface{}) { ctx.loggerWithStats().Error("failed to generate protobuf message", zap.Error(err)) return } - if err := ctx.broadcastHandler(msg); err != nil { + // TODO: broadcast to different topic after HF height + if err := ctx.broadcastHandler(ctx.topic, msg); err != nil { ctx.loggerWithStats().Error("fail to broadcast", zap.Error(err)) } } diff --git a/consensus/scheme/scheme.go b/consensus/scheme/scheme.go index cd3bb0f711..b9b11776e7 100644 --- a/consensus/scheme/scheme.go +++ b/consensus/scheme/scheme.go @@ -13,6 +13,10 @@ import ( "github.com/iotexproject/iotex-proto/golang/iotextypes" ) +var ( + BroadcastTopic = "broadcast+consensus" +) + // CreateBlockCB defines the callback to create a new block type CreateBlockCB func() (*block.Block, error) @@ -26,7 +30,7 @@ type ConsensusDoneCB func(*block.Block) error type BroadcastCB func(*block.Block) error // Broadcast sends a broadcast message to the whole network -type Broadcast func(msg proto.Message) error +type Broadcast func(string, proto.Message) error // Scheme is the interface that consensus schemes should implement type Scheme interface { diff --git a/nodeinfo/manager.go b/nodeinfo/manager.go index 693e2cc446..01430715ec 100644 --- a/nodeinfo/manager.go +++ b/nodeinfo/manager.go @@ -31,8 +31,9 @@ import ( type ( transmitter interface { - BroadcastOutbound(context.Context, proto.Message) error + BroadcastOutbound(context.Context, string, proto.Message) error UnicastOutbound(context.Context, peer.AddrInfo, proto.Message) error + DefaultTopic() string Info() (peer.AddrInfo, error) } @@ -54,6 +55,7 @@ type ( lifecycle.Lifecycle version string address string + topic string broadcastList atomic.Value // []string, whitelist to force enable broadcast nodeMap *lru.Cache transmitter transmitter @@ -86,6 +88,7 @@ func NewInfoManager(cfg *Config, t transmitter, ch chain, privKey crypto.Private privKey: privKey, version: version.PackageVersion, address: privKey.PublicKey().Address().String(), + topic: t.DefaultTopic(), getBroadcastListFunc: broadcastListFunc, } dm.broadcastList.Store([]string{}) @@ -169,7 +172,8 @@ func (dm *InfoManager) BroadcastNodeInfo(ctx context.Context) error { return err } // broadcast request meesage - if err := dm.transmitter.BroadcastOutbound(ctx, req); err != nil { + // TODO: use different topic after HF height + if err := dm.transmitter.BroadcastOutbound(ctx, dm.topic, req); err != nil { return err } // manually update self node info for broadcast message to myself will be ignored diff --git a/p2p/agent.go b/p2p/agent.go index c5038e97ca..616f921123 100644 --- a/p2p/agent.go +++ b/p2p/agent.go @@ -85,6 +85,7 @@ type ( ExternalHost string `yaml:"externalHost"` ExternalPort int `yaml:"externalPort"` BootstrapNodes []string `yaml:"bootstrapNodes"` + ExtraTopics []string `yaml:"extraTopics"` MasterKey string `yaml:"masterKey"` // master key will be PrivateKey if not set. // RelayType is the type of P2P network relay. By default, the value is empty, meaning disabled. Two relay types // are supported: active, nat. @@ -103,10 +104,12 @@ type ( Agent interface { lifecycle.StartStopper nodestats.StatsReporter - // BroadcastOutbound sends a broadcast message to the whole network - BroadcastOutbound(ctx context.Context, msg proto.Message) (err error) + // BroadcastOutbound sends a broadcast message to the topic + BroadcastOutbound(context.Context, string, proto.Message) (err error) // UnicastOutbound sends a unicast message to the given address UnicastOutbound(_ context.Context, peer peer.AddrInfo, msg proto.Message) (err error) + // DefaultTopic returns the default broadcast topic + DefaultTopic() string // Info returns agents' peer info. Info() (peer.AddrInfo, error) // Self returns the self network address @@ -163,7 +166,7 @@ func (*dummyAgent) Stop(context.Context) error { return nil } -func (*dummyAgent) BroadcastOutbound(ctx context.Context, msg proto.Message) error { +func (*dummyAgent) BroadcastOutbound(ctx context.Context, topic string, msg proto.Message) error { return nil } @@ -171,6 +174,8 @@ func (*dummyAgent) UnicastOutbound(_ context.Context, peer peer.AddrInfo, msg pr return nil } +func (*dummyAgent) DefaultTopic() string { return "" } + func (*dummyAgent) Info() (peer.AddrInfo, error) { return peer.AddrInfo{}, nil } @@ -235,61 +240,64 @@ func (p *agent) Start(ctx context.Context) error { if err != nil { return errors.Wrap(err, "error when instantiating Agent host") } - - if err := host.AddBroadcastPubSub(ctx, _broadcastTopic+p.topicSuffix, func(ctx context.Context, data []byte) (err error) { - // Blocking handling the broadcast message until the agent is started - <-ready - var ( - peerID string - broadcast iotexrpc.BroadcastMsg - latency int64 - ) - skip := false - defer func() { - // Skip accounting if the broadcast message is not handled - if skip { + // register to all topics + p.cfg.ExtraTopics = append(p.cfg.ExtraTopics, p.DefaultTopic()) + for _, topic := range p.cfg.ExtraTopics { + if err := host.AddBroadcastPubSub(ctx, topic, func(ctx context.Context, data []byte) (err error) { + // Blocking handling the broadcast message until the agent is started + <-ready + var ( + peerID string + broadcast iotexrpc.BroadcastMsg + latency int64 + ) + skip := false + defer func() { + // Skip accounting if the broadcast message is not handled + if skip { + return + } + status := _successStr + if err != nil { + status = _failureStr + } + _p2pMsgCounter.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), "in", peerID, status).Inc() + _p2pMsgLatency.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), status).Observe(float64(latency)) + }() + if err = proto.Unmarshal(data, &broadcast); err != nil { + err = errors.Wrap(err, "error when marshaling broadcast message") return } - status := _successStr - if err != nil { - status = _failureStr + // Skip the broadcast message if it's from the node itself + rawmsg, ok := p2p.GetBroadcastMsg(ctx) + if !ok { + err = errors.New("error when asserting broadcast msg context") + return + } + peerID = rawmsg.GetFrom().String() + if p.host.HostIdentity() == peerID { + skip = true + return + } + if broadcast.ChainId != p.chainID { + err = errors.Errorf("chain ID mismatch, received %d, expecting %d", broadcast.ChainId, p.chainID) + return } - _p2pMsgCounter.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), "in", peerID, status).Inc() - _p2pMsgLatency.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), status).Observe(float64(latency)) - }() - if err = proto.Unmarshal(data, &broadcast); err != nil { - err = errors.Wrap(err, "error when marshaling broadcast message") - return - } - // Skip the broadcast message if it's from the node itself - rawmsg, ok := p2p.GetBroadcastMsg(ctx) - if !ok { - err = errors.New("error when asserting broadcast msg context") - return - } - peerID = rawmsg.GetFrom().String() - if p.host.HostIdentity() == peerID { - skip = true - return - } - if broadcast.ChainId != p.chainID { - err = errors.Errorf("chain ID mismatch, received %d, expecting %d", broadcast.ChainId, p.chainID) - return - } - t := broadcast.GetTimestamp().AsTime() - latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds() + t := broadcast.GetTimestamp().AsTime() + latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds() - msg, err := goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody) - if err != nil { - err = errors.Wrap(err, "error when typifying broadcast message") + msg, err := goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody) + if err != nil { + err = errors.Wrap(err, "error when typifying broadcast message") + return + } + p.broadcastInboundHandler(ctx, broadcast.ChainId, peerID, msg) + p.qosMetrics.updateRecvBroadcast(time.Now()) return + }); err != nil { + return errors.Wrap(err, "error when adding broadcast pubsub") } - p.broadcastInboundHandler(ctx, broadcast.ChainId, peerID, msg) - p.qosMetrics.updateRecvBroadcast(time.Now()) - return - }); err != nil { - return errors.Wrap(err, "error when adding broadcast pubsub") } if err := host.AddUnicastPubSub(_unicastTopic+p.topicSuffix, func(ctx context.Context, peerInfo peer.AddrInfo, data []byte) (err error) { @@ -379,7 +387,7 @@ func (p *agent) Stop(ctx context.Context) error { return nil } -func (p *agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err error) { +func (p *agent) BroadcastOutbound(ctx context.Context, topic string, msg proto.Message) (err error) { _, span := tracer.NewSpan(ctx, "Agent.BroadcastOutbound") defer span.End() @@ -419,7 +427,7 @@ func (p *agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err e return } t := time.Now() - if err = host.Broadcast(ctx, _broadcastTopic+p.topicSuffix, data); err != nil { + if err = host.Broadcast(ctx, topic, data); err != nil { err = errors.Wrap(err, "error when sending broadcast message") p.qosMetrics.updateSendBroadcast(t, false) return @@ -473,6 +481,10 @@ func (p *agent) UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg pro return } +func (p *agent) DefaultTopic() string { + return _broadcastTopic + p.topicSuffix +} + func (p *agent) Info() (peer.AddrInfo, error) { if p.host == nil { return peer.AddrInfo{}, ErrAgentNotStarted diff --git a/server/itx/server.go b/server/itx/server.go index c3c346f751..ee6cb18640 100644 --- a/server/itx/server.go +++ b/server/itx/server.go @@ -20,6 +20,7 @@ import ( "github.com/iotexproject/iotex-core/v2/api" "github.com/iotexproject/iotex-core/v2/chainservice" "github.com/iotexproject/iotex-core/v2/config" + "github.com/iotexproject/iotex-core/v2/consensus/scheme" "github.com/iotexproject/iotex-core/v2/dispatcher" "github.com/iotexproject/iotex-core/v2/p2p" "github.com/iotexproject/iotex-core/v2/pkg/ha" @@ -66,6 +67,8 @@ func newServer(cfg config.Config, testing bool) (*Server, error) { case config.StandaloneScheme: p2pAgent = p2p.NewDummyAgent() default: + // TODO: add different topic depends on node type + cfg.Network.ExtraTopics = append(cfg.Network.ExtraTopics, scheme.BroadcastTopic) p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell) } chains := make(map[uint32]*chainservice.ChainService)